今天是:
带着程序的旅程,每一行代码都是你前进的一步,每个错误都是你成长的机会,最终,你将抵达你的目的地。
title

Spring Boot 整合RabbitMQ

 高级消息队列协议(AMQP)是面向消息中间件的与平台无关的有线级别协议。 Spring AMQP项目将Spring的核心概念应用于基于AMQP的消息传递解决方案的开发。 Spring Boot为通过RabbitMQ使用AMQP提供了许多便利,包括spring-boot-starter-amqp“ Starter”。

一.spring boot中中使用RabbitMQ

引入Strater依赖

implementation 'org.springframework.boot:spring-boot-starter-amqp'

1.rabbitmq配置

spring:
  #MQ
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin
    #    指明采用发送者确认模式
    publisher-confirm-type: CORRELATED
    #    失败时返回消息
    publisher-returns: true
    virtual-host: /
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual
        #      每个容器的消费者数量控制,也就是线程池的大小
        concurrency: 1
        #       acknowledge-mode: none
        max-concurrency: 4
        # 开启失败时的重试
        retry:
          enabled: true
          max-attempts: 5
          max-interval: 100000   # 重试最大间隔时间
          initial-interval: 1000  # 重试初始间隔时间
        #          预取的数量,spring amqp2.0开始默认值为250,之前默认为1,最好设置稍微大些
        prefetch: 1

2.rabbitmq连接工厂配置


@Configuration
public class RabbitConfig {

    Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private RabbitProperties rabbitProperties;


    @Bean
    public ConnectionFactory getConnectionFactory() {
        com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory =
                new com.rabbitmq.client.ConnectionFactory();
        rabbitConnectionFactory.setHost(rabbitProperties.getHost());
        rabbitConnectionFactory.setPort(rabbitProperties.getPort());
        rabbitConnectionFactory.setVirtualHost(rabbitProperties.getVirtualHost());
        DefaultCredentialsProvider credentialsProvider = new DefaultCredentialsProvider(rabbitProperties.getUsername(), rabbitProperties.getPassword());
        rabbitConnectionFactory.setCredentialsProvider(credentialsProvider);

        rabbitConnectionFactory.setAutomaticRecoveryEnabled(true);
        rabbitConnectionFactory.setNetworkRecoveryInterval(5000);

        ConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitConnectionFactory);

        //((CachingConnectionFactory)connectionFactory).setPublisherConfirms(rabbitProperties.isPublisherConfirms());
        ((CachingConnectionFactory) connectionFactory).setPublisherReturns(rabbitProperties.isPublisherReturns());
        ((CachingConnectionFactory) connectionFactory).setPublisherConfirmType(rabbitProperties.getPublisherConfirmType());

        return connectionFactory;
    }


    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(RabbitProperties rabbitProperties) {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(getConnectionFactory());
        containerFactory.setConcurrentConsumers(1);
        containerFactory.setMaxConcurrentConsumers(20);
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //containerFactory.setRetryTemplate(rabbitRetryTemplate());
        //containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
        containerFactory.setTaskExecutor(taskExecutor());
        containerFactory.setChannelTransacted(false);
        containerFactory.setAdviceChain(retryInterceptor());
        return containerFactory;
    }

    @Bean
    public RetryOperationsInterceptor retryInterceptor() {

        return RetryInterceptorBuilder.stateless()
                .retryOperations(rabbitRetryTemplate())
                .recoverer(new ImmediateRequeueMessageRecoverer())
                .build();
    }



    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.initialize();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(10);
        return executor;
    }

        @Bean
    public RetryTemplate rabbitRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        retryTemplate.registerListener(new RetryListener() {
            @Override
            public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
                // 执行之前调用 (返回false时会终止执行)
                return true;
            }

            @Override
            public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
                logger.warn("重试结束,已重试次数=[{}]", retryContext.getRetryCount());
            }

            @Override
            public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
                //  异常 都会调用
                logger.error("-----第{}次调用", retryContext.getRetryCount());
            }
        });

        retryTemplate.setBackOffPolicy(backOffPolicy());
        retryTemplate.setRetryPolicy(retryPolicy());
        return retryTemplate;
    }

    @Bean
    public ExponentialBackOffPolicy backOffPolicy() {
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        long maxInterval = rabbitProperties.getListener().getSimple().getRetry().getMaxInterval().getSeconds();
        long initialInterval = rabbitProperties.getListener().getSimple().getRetry().getInitialInterval().getSeconds();
        double multiplier = rabbitProperties.getListener().getSimple().getRetry().getMultiplier();
        // 重试间隔
        backOffPolicy.setInitialInterval(initialInterval * 1000);
        // 重试最大间隔
        backOffPolicy.setMaxInterval(maxInterval * 1000);
        // 重试间隔乘法策略
        backOffPolicy.setMultiplier(multiplier);
        return backOffPolicy;
    }

    @Bean
    public SimpleRetryPolicy retryPolicy() {
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        int maxAttempts = rabbitProperties.getListener().getSimple().getRetry().getMaxAttempts();
        retryPolicy.setMaxAttempts(maxAttempts);
        return retryPolicy;
    }


    //声明队列
    @Bean
    public Queue saveDailyDataQueue() {
        return new Queue("covid.daily.data.queue", true); // true表示持久化该队列
    }


    /**
     * direct(直接):把消息路由到那些BindingKey和RoutingKey完全匹配的队列中;
     */

    /**
     * 声明topic交互器
     * topic(主题):类似于direct,但可以使用通配符匹配规则(广播);
     * BindingKey允许使用两种符号用于模糊匹配:“*”与“#”,“#”可匹配多个或零个单词;“*”可匹配一个单词。
     */
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("covid.daily.data.exchange");
    }

    //绑定
    @Bean
    public Binding bindingQueue() {
        return BindingBuilder.bind(saveDailyDataQueue()).to(topicExchange()).with("covid.daily.data");
    }


}

3.发送消息

public class CovidServiceImpl implements CovidService, RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    Logger logger = LoggerFactory.getLogger(CovidServiceImpl.class);

    @Autowired
    CovidRepository covidRepository;

    @Autowired
    RestTemplate restTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;


    @PostConstruct
    public void init() {
        //设置消息投递到queue失败回退时回调
        rabbitTemplate.setReturnCallback(this);
        //设置消息发送到exchange结果回调
        rabbitTemplate.setConfirmCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            logger.error("发送消息到交换器成功,correlationData=[{}]",correlationData);
        } else {
            logger.error("发送消息到交换器失败,原因=[{}]",cause);

        }

    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logger.error("发送消息到队列失败,响应码=[{}],CorrelationId=[{}]",replyCode,message.getMessageProperties().getCorrelationId());

    }


    @Override
    public void saveDailyData() {
        String msgId = UUID.randomUUID().toString();
        CorrelationData correlationId = new CorrelationData(msgId);
        Map<String,Object> map = new HashMap<>();
        map.put("time",LocalDate.now());
        rabbitTemplate.convertAndSend("covid.daily.data.exchange", "covid.daily.data", map, correlationId);
    }
}

4.接受消息

注意使用@RabbitListener,@RabbitHandler 就可以。

@Component
@RabbitListener(queues = "covid.daily.data.queue")
public class CovidMsgReceiver {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${covid.daily.data.api}")
    private String covidDaliyDataApi;
    @Autowired
    CovidRepository covidRepository;

    @Autowired
    RestTemplate restTemplate;

    Logger logger = LoggerFactory.getLogger(CovidMsgReceiver.class);
    @RabbitHandler
    public void onMessage(Map msg, Channel channel, Message message) throws IOException {



        try {
            logger.info("HelloReceiver收到  : " + msg +",收到时间"+new Date());
            //消息发送成功,但是网络中断导致,无法接受怎么处理
            Covid[] dailyList = restTemplate.getForObject(covidDaliyDataApi, Covid[].class);
            logger.info("COVID daily data size ===>>>>>[{}]", dailyList.length);
            covidRepository.saveAll(Arrays.asList(dailyList));
            logger.info("COVID daily data 已写入数据库");
            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            logger.info("receiver success:"+msg);
        } catch (IOException e) {
            e.printStackTrace();
            //丢弃这条消息
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
            logger.info("receiver fail");
        }

    }

    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        System.out.println("消息已确认");
    }

    @RabbitHandler
    public void onMessage(String message) {
        System.out.println(message);
    }

    @RabbitHandler
    public void onMessage(byte[] message) {
        System.out.println(new String(message));
    }
}
RabbitListener源码解析

1.解析注解:Spring 在启动时会扫描带有 @RabbitListener 注解的方法,并解析这些注解

2.创建消息监听器容器:对于每个带有 @RabbitListener 注解的方法,Spring 会创建一个消息监听器容器。

3.启动消息监听器容器:容器启动后,会开始监听指定队列的消息。当队列中有消息时调用监听器方法来处理。

4.添加消息: spring启动时根据配置启动循环线程,循环线程处理broker投递的消息放入队列中

 

二.对于上面使用的详细解析

1.在上面的配置我们配置了 publisher-confirm-type: CORRELATED, publisher-returns: true 开启发送者确认模式。使用CorrelationData关联发送的消息

假设我们发送消息是将exchage 设置错误,将叫调用confirm方法,消息已发送到交换器但是设置路由key错误将会调用returnedMessage

rabbitTemplate.convertAndSend("covid.daily.data.exchange_fail", "covid.daily.data_fail", map, correlationId);

2.通过配置一下信息,我们设置消费者消费消息后需要手动确认,否则认为消息未消费成功。并且配置了消费重试次数为5. 

    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual
        #      每个容器的消费者数量控制,也就是线程池的大小
        concurrency: 1
        #       acknowledge-mode: none
        max-concurrency: 4
        # 开启失败时的重试
        retry:
          enabled: true
          max-attempts: 5
          max-interval: 100000   # 重试最大间隔时间
          initial-interval: 1000  # 重试初始间隔时间

通过使用channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 表示消息已处理成功。但是假设这个时候在调用远程api的过程网络中断了,这个时候怎么处理呢。

在前面我们配置了重试机制,通过重试拦截器来处理retryInterceptor,如果重试成功,那么正常消费消息,如果在重试次数用完之后任然没能消费消息,这个时候我们可以选择重新发送消息到队列中,或者将消息持久化,后面通过其他方式来处理记录的消息。通过配置RepublishMessageRecoverer,RejectAndDontRequeueRecoverer,ImmediateRequeueMessageRecoverer 可以冲入队列,拒绝消息等。

23:19:56.592 [taskExecutor-1] INFO  com.zlennon.covid.common.CovidMsgReceiver.onMessage - HelloReceiver收到  : {time=2021-03-07},收到时间Sun Mar 07 23:19:56 CST 2021
23:19:56.593 [taskExecutor-1] ERROR com.zlennon.covid.config.RabbitConfig$$EnhancerBySpringCGLIB$$46ecb22d.onError - -----第2次调用
23:19:58.597 [taskExecutor-1] INFO  com.zlennon.covid.common.CovidMsgReceiver.onMessage - HelloReceiver收到  : {time=2021-03-07},收到时间Sun Mar 07 23:19:58 CST 2021
23:19:58.600 [taskExecutor-1] ERROR com.zlennon.covid.config.RabbitConfig$$EnhancerBySpringCGLIB$$46ecb22d.onError - -----第3次调用
23:20:02.612 [taskExecutor-1] INFO  com.zlennon.covid.common.CovidMsgReceiver.onMessage - HelloReceiver收到  : {time=2021-03-07},收到时间Sun Mar 07 23:20:02 CST 2021
23:20:02.616 [taskExecutor-1] ERROR com.zlennon.covid.config.RabbitConfig$$EnhancerBySpringCGLIB$$46ecb22d.onError - -----第4次调用
23:20:10.632 [taskExecutor-1] INFO  com.zlennon.covid.common.CovidMsgReceiver.onMessage - HelloReceiver收到  : {time=2021-03-07},收到时间Sun Mar 07 23:20:10 CST 2021
23:20:10.634 [taskExecutor-1] ERROR com.zlennon.covid.config.RabbitConfig$$EnhancerBySpringCGLIB$$46ecb22d.onError - -----第5次调用
23:24:23.810 [taskExecutor-1] ERROR com.zlennon.covid.config.RabbitConfig$$EnhancerBySpringCGLIB$$46ecb22d.lambda$recoverer$0 - 消息参数==>[],失败原因=[Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://admin@127.0.0.1:5672/,1), conn: Proxy@2bd9722 Shared Rabbit Connection: SimpleConnection@54f3fd30 [delegate=amqp://admin@127.0.0.1:5672/, localPort= 56057], (Body:'[B@56cf1eca(byte[129])' MessageProperties [headers={spring_listener_return_correlation=f5ff3d09-09b0-4942-9387-3b7f876b8a4d, spring_returned_message_correlation=d8bf92c6-fb8d-488b-9401-09e17c888a8c}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=covid.daily.data.exchange, receivedRoutingKey=covid.daily.data, deliveryTag=6, consumerTag=amq.ctag-7eogtN6aqvpgJvqrLV6EEw, consumerQueue=covid.daily.data.queue])]
23:24:23.812 [taskExecutor-1] WARN  com.zlennon.covid.config.RabbitConfig$$EnhancerBySpringCGLIB$$46ecb22d.close - 重试结束,已重试次数=[5]

 

分享到:

专栏

类型标签

网站访问总量