今天是:
带着程序的旅程,每一行代码都是你前进的一步,每个错误都是你成长的机会,最终,你将抵达你的目的地。
title

rabbitmq一致性

1.为什么要使用Rabbitmq

  1. 异步通信:RabbitMQ是一个消息队列中间件,可以实现应用程序之间的异步通信。通过将消息发送到队列中,发送方可以继续执行其他操作,而不必等待接收方的响应。这种异步通信模式可以提高系统的可伸缩性和性能。

  2. 解耦应用:RabbitMQ可以将应用程序解耦。发送方和接收方之间通过消息队列通信,发送方不需要知道接收方的存在,也不需要了解接收方的具体实现。这样可以提高系统的灵活性和可维护性。

  3. 可靠性和持久化:RabbitMQ具有高可靠性和持久性特性。它可以确保消息的可靠传递,即使在发送方或接收方出现故障的情况下也能保证消息的不丢失。此外,RabbitMQ还支持将消息持久化到磁盘,以防止消息丢失。

  4. 负载均衡:RabbitMQ支持多个消费者同时监听同一个队列。当有多个消费者时,RabbitMQ会自动进行负载均衡,将消息均匀地分发给不同的消费者,以提高系统的处理能力和吞吐量。

  5. 灵活的消息模式:RabbitMQ支持多种消息模式,如点对点模式、发布/订阅模式和工作队列模式等。这些模式可以适用于不同类型的应用场景,提供了更多的灵活性和选择性。

2.消息的丢失造成的数据不一致性

  1. 生产者发送消息失败

    消息发送成功,业务执行失败会造成数据不一致的情况。
    业务执行成功,但是消息发送失败,同样会造成数据不一致

    该种情况下如果将业务执行和发送放在同一个事务中是不是就可以解决该问题。消息发送成功与否 是通过Publisher Confirms 来感知的,ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调。但是此时还未被路由到队列上,有可能在路由到队列上时失败。 但是将他们放在同一个事务中并不是一个好的方法,因为消息最大的特性就是要异步发送,如果放在事务中,相当于同步执行。这里我们应该采用发布事务消息来保证消息的发送依赖于业务执行成功与否。rabbitmq 本身不支持事务消息。不过我们可以通过设计一张表来存储半消息。

    springboot中设置确认回调和返回回调
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setMessageConverter(new SerializerMessageConverter());
            rabbitTemplate.setReturnsCallback((returnedMessage)->{
               logger.info("ReturnsCallback msg:{}", returnedMessage);
            });
            rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{
                if (ack) {
                    logger.info("消息成功发送到exchange,correlationData:{}",correlationData);
                } else {
                    logger.error("消息发送exchange失败,cause:{}",cause);
                }
            });
            return rabbitTemplate;
        }

     
  2. rabbitmq宕机消息未被持久

    默认情况下,RabbitMQ 中的消息是非持久化的,这意味着消息只存储在内存中,而不会被写入磁盘。如果 RabbitMQ 服务器崩溃或关闭,非持久化的消息将会丢失。为了避免这种情况,您可以在发布消息时将其设置为持久化,以确保消息在服务器重启后仍然可用。但是即使设置了持久化,也有可能还没有被持久就崩溃的情况。
    springboot设置持久化
    //交换器持久化  
      @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange("myTopicExchange",true,true);
        }
    //队列持久化
        @Bean
        public Queue topicQueue() {
            return QueueBuilder.durable("myTopicQueue")
                    .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
                    .withArgument("x-dead-letter-routing-key", DLX_ROUTEING_KEY)
                    .build();
        }

     
  3. 消费者消费消息业务还未执行宕机

    消费者在处理非常耗时的操作时,因为网络故障或服务器故障,导致超时未发送ACK,Rabbit broker 重新发送消息,或者 消费者执行到中途宕机,没有ack.消费者重新启动后重新消费导致数据不一致。    该种情况可能会导致消息重复消费消费。需要设计消费者的幂等性

    消费者手动ACK
    @Slf4j
    public class TopicCustomer implements ChannelAwareMessageListener {
    
        private static final int MAX_RETRY_ATTEMPTS = 5;
    
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            try{
                String body = new String(message.getBody());
                log.info("onMessage:{}", body);
                //int i = 10 / 0;
                /**
                 * 确认一条消息
                 * channel.basicAck(deliveryTag, false); <br>
                 * deliveryTag:该消息的index <br>
                 * multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息 <br>
                 */
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }catch(Exception e){
                int retryCount = (int) message.getMessageProperties().getHeaders().getOrDefault("retryCount", 0);
                if (retryCount < MAX_RETRY_ATTEMPTS) {
                    // Calculate the delay time using exponential backoff
                    long delay = (long) Math.pow(2, retryCount) * 1000;
                    // Add the retry count header
                    // Send the message to the back of the queue with a delay
                    Map<String, Object> headers = message.getMessageProperties().getHeaders();
                    headers.put("retryCount",retryCount + 1);
                    AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().headers(headers).build();
                    channel.basicPublish(
                            message.getMessageProperties().getReceivedExchange(),
                            message.getMessageProperties().getReceivedRoutingKey(),
                            basicProperties,
                            message.getBody()
                    );
                   log.info("Message processing failed, retrying in " + delay + "ms");
                } else {
                    // Max retry attempts reached, send manual negative acknowledgement to remove message from queue
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                    log.info("Message processing failed after " + MAX_RETRY_ATTEMPTS + " attempts, message removed from queue");
                }
            }
        }
    }
    

     
分享到:

专栏

类型标签

网站访问总量