Sring整合RocketMQ实战之消息发送与消费

引入Pom文件

 <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.6.0</version>
</dependency>

消息发送

配置生产者

@Configuration
public class RocketConfig {
    
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQProducer defaultMQProducer(){
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("consumer-group");
        defaultMQProducer.setNamesrvAddr("NameServer地址");
        return defaultMQProducer;
    }
}

同步发送

@Component
public class RocketProducer {
    
    @Autowired
    private DefaultMQProducer mqProducer;
    
    /**
     * 同步发送,等待发送结果
     */
    public SendResult send(String topic, String tags,String keys, String context) throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
        Message message = new Message(topic, tags, keys, context.getBytes(RemotingHelper.DEFAULT_CHARSET));
        return mqProducer.send(message);
    }
    
    
}

异步发送

/**
 * 异步发送,直接返回。在callback处理发送结果
 */
public void sendAsync(String topic, String tags,String keys, String context) throws UnsupportedEncodingException, RemotingException, MQClientException, InterruptedException {
    Message message = new Message(topic, tags, keys, context.getBytes(RemotingHelper.DEFAULT_CHARSET));
    mqProducer.send(message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("Send message: " + message);
            System.out.println("Send Success: " + sendResult);
        }
        @Override
        public void onException(Throwable e) {
            System.out.println("Send message: " + message);
            System.out.println("Send Exception: " + e);
        }
    });
}

顺序消息

顺序消息的原理:消息发往同一个队列

 public SendResult sendOrderly(String topic, String tags, String keys, String context, Object arg) throws UnsupportedEncodingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
    Message message = new Message(topic, tags, keys, context.getBytes(RemotingHelper.DEFAULT_CHARSET));
    SendResult sendResult = mqProducer.send(message, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            Integer id = (Integer) arg;
            int index = id % mqs.size();
            return mqs.get(index);
        }
    }, arg);
    
    return sendResult;
}

消息消费

并发消费消息监听

@Component
public class RocketListener1 implements MessageListenerConcurrently {
    
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        MessageExt messageExt = list.get(0);
        System.out.println("RocketListener1:" + messageExt);
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}

顺序消费消息监听

单线程消费单个队列

@Component
public class RocketOrderlyListener implements MessageListenerOrderly {
    
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        MessageExt messageExt = msgs.get(0);
        System.out.println("RocketOrderlyListener:" + messageExt);
        return ConsumeOrderlyStatus.SUCCESS;
    }
}

广播模式消费

/**
 * 广播模式消费者
 * 同一消息会被同组下的所有消费者消费
 * @return DefaultMQPushConsumer
 * @throws MQClientException
 */
@Bean(initMethod = "start", destroyMethod = "shutdown")
public DefaultMQPushConsumer mpConsumer() throws MQClientException {
    DefaultMQPushConsumer mqConsumer = new DefaultMQPushConsumer("test-consumer-group3");
    mqConsumer.setNamesrvAddr("NamesrvAddr");
    mqConsumer.subscribe("Test", "*");
    mqConsumer.setMessageModel(MessageModel.BROADCASTING);
    mqConsumer.registerMessageListener(rocketListener2);
    return mqConsumer;
}

集群模式消费

/**
 * 集群模式消费
 * 同一消息只被同一消费组下的某一消费者消费,可被不同消费组同时消费
 * @return DefaultMQPushConsumer
 * @throws MQClientException
 */
@Bean(initMethod = "start", destroyMethod = "shutdown")
public DefaultMQPushConsumer defaultMpConsumer() throws MQClientException {
    DefaultMQPushConsumer defaultMpConsumer = new DefaultMQPushConsumer("test-consumer-group1");
    defaultMpConsumer.setNamesrvAddr("NamesrvAddr");
    defaultMpConsumer.subscribe("Test", "*");
    defaultMpConsumer.registerMessageListener(rocketListener1);
    return defaultMpConsumer;
}

https://juejin.im/post/5e216d11f265da3e36456611

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
iOS
0 条回复 A 作者 M 管理员
    所有的伟大,都源于一个勇敢的开始!
欢迎您,新朋友,感谢参与互动!欢迎您 {{author}},您在本站有{{commentsCount}}条评论