- 生产者发送消息失败
消息发送成功,业务执行失败会造成数据不一致的情况。
业务执行成功,但是消息发送失败,同样会造成数据不一致
该种情况下如果将业务执行和发送放在同一个事务中是不是就可以解决该问题。消息发送成功与否 是通过Publisher Confirms 来感知的,ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调。但是此时还未被路由到队列上,有可能在路由到队列上时失败。 但是将他们放在同一个事务中并不是一个好的方法,因为消息最大的特性就是要异步发送,如果放在事务中,相当于同步执行。这里我们应该采用发布事务消息来保证消息的发送依赖于业务执行成功与否。rabbitmq 本身不支持事务消息。不过我们可以通过设计一张表来存储半消息。
springboot中设置确认回调和返回回调
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setMessageConverter(new SerializerMessageConverter());
rabbitTemplate.setReturnsCallback((returnedMessage)->{
logger.info("ReturnsCallback msg:{}", returnedMessage);
});
rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{
if (ack) {
logger.info("消息成功发送到exchange,correlationData:{}",correlationData);
} else {
logger.error("消息发送exchange失败,cause:{}",cause);
}
});
return rabbitTemplate;
}
- rabbitmq宕机消息未被持久
默认情况下,RabbitMQ 中的消息是非持久化的,这意味着消息只存储在内存中,而不会被写入磁盘。如果 RabbitMQ 服务器崩溃或关闭,非持久化的消息将会丢失。为了避免这种情况,您可以在发布消息时将其设置为持久化,以确保消息在服务器重启后仍然可用。但是即使设置了持久化,也有可能还没有被持久就崩溃的情况。
springboot设置持久化
//交换器持久化
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("myTopicExchange",true,true);
}
//队列持久化
@Bean
public Queue topicQueue() {
return QueueBuilder.durable("myTopicQueue")
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
.withArgument("x-dead-letter-routing-key", DLX_ROUTEING_KEY)
.build();
}
- 消费者消费消息业务还未执行宕机
消费者在处理非常耗时的操作时,因为网络故障或服务器故障,导致超时未发送ACK,Rabbit broker 重新发送消息,或者 消费者执行到中途宕机,没有ack.消费者重新启动后重新消费导致数据不一致。 该种情况可能会导致消息重复消费消费。需要设计消费者的幂等性
消费者手动ACK
@Slf4j
public class TopicCustomer implements ChannelAwareMessageListener {
private static final int MAX_RETRY_ATTEMPTS = 5;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try{
String body = new String(message.getBody());
log.info("onMessage:{}", body);
//int i = 10 / 0;
/**
* 确认一条消息
* channel.basicAck(deliveryTag, false); <br>
* deliveryTag:该消息的index <br>
* multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息 <br>
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch(Exception e){
int retryCount = (int) message.getMessageProperties().getHeaders().getOrDefault("retryCount", 0);
if (retryCount < MAX_RETRY_ATTEMPTS) {
// Calculate the delay time using exponential backoff
long delay = (long) Math.pow(2, retryCount) * 1000;
// Add the retry count header
// Send the message to the back of the queue with a delay
Map<String, Object> headers = message.getMessageProperties().getHeaders();
headers.put("retryCount",retryCount + 1);
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().headers(headers).build();
channel.basicPublish(
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
basicProperties,
message.getBody()
);
log.info("Message processing failed, retrying in " + delay + "ms");
} else {
// Max retry attempts reached, send manual negative acknowledgement to remove message from queue
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
log.info("Message processing failed after " + MAX_RETRY_ATTEMPTS + " attempts, message removed from queue");
}
}
}
}