Spring Boot 整合RabbitMQ
高级消息队列协议(AMQP)是面向消息中间件的与平台无关的有线级别协议。 Spring AMQP项目将Spring的核心概念应用于基于AMQP的消息传递解决方案的开发。 Spring Boot为通过RabbitMQ使用AMQP提供了许多便利,包括spring-boot-starter-amqp“ Starter”。
一.spring boot中中使用RabbitMQ
引入Strater依赖
implementation 'org.springframework.boot:spring-boot-starter-amqp'
1.rabbitmq配置
spring:
#MQ
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
# 指明采用发送者确认模式
publisher-confirm-type: CORRELATED
# 失败时返回消息
publisher-returns: true
virtual-host: /
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
# 每个容器的消费者数量控制,也就是线程池的大小
concurrency: 1
# acknowledge-mode: none
max-concurrency: 4
# 开启失败时的重试
retry:
enabled: true
max-attempts: 5
max-interval: 100000 # 重试最大间隔时间
initial-interval: 1000 # 重试初始间隔时间
# 预取的数量,spring amqp2.0开始默认值为250,之前默认为1,最好设置稍微大些
prefetch: 1
2.rabbitmq连接工厂配置
@Configuration
public class RabbitConfig {
Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private RabbitProperties rabbitProperties;
@Bean
public ConnectionFactory getConnectionFactory() {
com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory =
new com.rabbitmq.client.ConnectionFactory();
rabbitConnectionFactory.setHost(rabbitProperties.getHost());
rabbitConnectionFactory.setPort(rabbitProperties.getPort());
rabbitConnectionFactory.setVirtualHost(rabbitProperties.getVirtualHost());
DefaultCredentialsProvider credentialsProvider = new DefaultCredentialsProvider(rabbitProperties.getUsername(), rabbitProperties.getPassword());
rabbitConnectionFactory.setCredentialsProvider(credentialsProvider);
rabbitConnectionFactory.setAutomaticRecoveryEnabled(true);
rabbitConnectionFactory.setNetworkRecoveryInterval(5000);
ConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitConnectionFactory);
//((CachingConnectionFactory)connectionFactory).setPublisherConfirms(rabbitProperties.isPublisherConfirms());
((CachingConnectionFactory) connectionFactory).setPublisherReturns(rabbitProperties.isPublisherReturns());
((CachingConnectionFactory) connectionFactory).setPublisherConfirmType(rabbitProperties.getPublisherConfirmType());
return connectionFactory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(RabbitProperties rabbitProperties) {
SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(getConnectionFactory());
containerFactory.setConcurrentConsumers(1);
containerFactory.setMaxConcurrentConsumers(20);
containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//containerFactory.setRetryTemplate(rabbitRetryTemplate());
//containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
containerFactory.setTaskExecutor(taskExecutor());
containerFactory.setChannelTransacted(false);
containerFactory.setAdviceChain(retryInterceptor());
return containerFactory;
}
@Bean
public RetryOperationsInterceptor retryInterceptor() {
return RetryInterceptorBuilder.stateless()
.retryOperations(rabbitRetryTemplate())
.recoverer(new ImmediateRequeueMessageRecoverer())
.build();
}
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.initialize();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(10);
return executor;
}
@Bean
public RetryTemplate rabbitRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.registerListener(new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
// 执行之前调用 (返回false时会终止执行)
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
logger.warn("重试结束,已重试次数=[{}]", retryContext.getRetryCount());
}
@Override
public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
// 异常 都会调用
logger.error("-----第{}次调用", retryContext.getRetryCount());
}
});
retryTemplate.setBackOffPolicy(backOffPolicy());
retryTemplate.setRetryPolicy(retryPolicy());
return retryTemplate;
}
@Bean
public ExponentialBackOffPolicy backOffPolicy() {
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
long maxInterval = rabbitProperties.getListener().getSimple().getRetry().getMaxInterval().getSeconds();
long initialInterval = rabbitProperties.getListener().getSimple().getRetry().getInitialInterval().getSeconds();
double multiplier = rabbitProperties.getListener().getSimple().getRetry().getMultiplier();
// 重试间隔
backOffPolicy.setInitialInterval(initialInterval * 1000);
// 重试最大间隔
backOffPolicy.setMaxInterval(maxInterval * 1000);
// 重试间隔乘法策略
backOffPolicy.setMultiplier(multiplier);
return backOffPolicy;
}
@Bean
public SimpleRetryPolicy retryPolicy() {
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
int maxAttempts = rabbitProperties.getListener().getSimple().getRetry().getMaxAttempts();
retryPolicy.setMaxAttempts(maxAttempts);
return retryPolicy;
}
//声明队列
@Bean
public Queue saveDailyDataQueue() {
return new Queue("covid.daily.data.queue", true); // true表示持久化该队列
}
/**
* direct(直接):把消息路由到那些BindingKey和RoutingKey完全匹配的队列中;
*/
/**
* 声明topic交互器
* topic(主题):类似于direct,但可以使用通配符匹配规则(广播);
* BindingKey允许使用两种符号用于模糊匹配:“*”与“#”,“#”可匹配多个或零个单词;“*”可匹配一个单词。
*/
@Bean
TopicExchange topicExchange() {
return new TopicExchange("covid.daily.data.exchange");
}
//绑定
@Bean
public Binding bindingQueue() {
return BindingBuilder.bind(saveDailyDataQueue()).to(topicExchange()).with("covid.daily.data");
}
}
3.发送消息
public class CovidServiceImpl implements CovidService, RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
Logger logger = LoggerFactory.getLogger(CovidServiceImpl.class);
@Autowired
CovidRepository covidRepository;
@Autowired
RestTemplate restTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
//设置消息投递到queue失败回退时回调
rabbitTemplate.setReturnCallback(this);
//设置消息发送到exchange结果回调
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
logger.error("发送消息到交换器成功,correlationData=[{}]",correlationData);
} else {
logger.error("发送消息到交换器失败,原因=[{}]",cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.error("发送消息到队列失败,响应码=[{}],CorrelationId=[{}]",replyCode,message.getMessageProperties().getCorrelationId());
}
@Override
public void saveDailyData() {
String msgId = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(msgId);
Map<String,Object> map = new HashMap<>();
map.put("time",LocalDate.now());
rabbitTemplate.convertAndSend("covid.daily.data.exchange", "covid.daily.data", map, correlationId);
}
}
4.接受消息
注意使用@RabbitListener,@RabbitHandler 就可以。
@Component
@RabbitListener(queues = "covid.daily.data.queue")
public class CovidMsgReceiver {
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${covid.daily.data.api}")
private String covidDaliyDataApi;
@Autowired
CovidRepository covidRepository;
@Autowired
RestTemplate restTemplate;
Logger logger = LoggerFactory.getLogger(CovidMsgReceiver.class);
@RabbitHandler
public void onMessage(Map msg, Channel channel, Message message) throws IOException {
try {
logger.info("HelloReceiver收到 : " + msg +",收到时间"+new Date());
//消息发送成功,但是网络中断导致,无法接受怎么处理
Covid[] dailyList = restTemplate.getForObject(covidDaliyDataApi, Covid[].class);
logger.info("COVID daily data size ===>>>>>[{}]", dailyList.length);
covidRepository.saveAll(Arrays.asList(dailyList));
logger.info("COVID daily data 已写入数据库");
//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
logger.info("receiver success:"+msg);
} catch (IOException e) {
e.printStackTrace();
//丢弃这条消息
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
logger.info("receiver fail");
}
}
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("Message content : " + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("消息已确认");
}
@RabbitHandler
public void onMessage(String message) {
System.out.println(message);
}
@RabbitHandler
public void onMessage(byte[] message) {
System.out.println(new String(message));
}
}
RabbitListener源码解析
1.解析注解:Spring 在启动时会扫描带有 @RabbitListener
注解的方法,并解析这些注解
2.创建消息监听器容器:对于每个带有 @RabbitListener
注解的方法,Spring 会创建一个消息监听器容器。
3.启动消息监听器容器:容器启动后,会开始监听指定队列的消息。当队列中有消息时调用监听器方法来处理。
4.添加消息: spring启动时根据配置启动循环线程,循环线程处理broker投递的消息放入队列中
二.对于上面使用的详细解析
1.在上面的配置我们配置了 publisher-confirm-type: CORRELATED, publisher-returns: true 开启发送者确认模式。使用CorrelationData关联发送的消息
假设我们发送消息是将exchage 设置错误,将叫调用confirm方法,消息已发送到交换器但是设置路由key错误将会调用returnedMessage
rabbitTemplate.convertAndSend("covid.daily.data.exchange_fail", "covid.daily.data_fail", map, correlationId);
2.通过配置一下信息,我们设置消费者消费消息后需要手动确认,否则认为消息未消费成功。并且配置了消费重试次数为5.
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
# 每个容器的消费者数量控制,也就是线程池的大小
concurrency: 1
# acknowledge-mode: none
max-concurrency: 4
# 开启失败时的重试
retry:
enabled: true
max-attempts: 5
max-interval: 100000 # 重试最大间隔时间
initial-interval: 1000 # 重试初始间隔时间
通过使用channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 表示消息已处理成功。但是假设这个时候在调用远程api的过程网络中断了,这个时候怎么处理呢。
在前面我们配置了重试机制,通过重试拦截器来处理retryInterceptor,如果重试成功,那么正常消费消息,如果在重试次数用完之后任然没能消费消息,这个时候我们可以选择重新发送消息到队列中,或者将消息持久化,后面通过其他方式来处理记录的消息。通过配置RepublishMessageRecoverer,RejectAndDontRequeueRecoverer,ImmediateRequeueMessageRecoverer 可以冲入队列,拒绝消息等。
23:19:56.592 [taskExecutor-1] INFO com.zlennon.covid.common.CovidMsgReceiver.onMessage - HelloReceiver收到 : {time=2021-03-07},收到时间Sun Mar 07 23:19:56 CST 2021
23:19:56.593 [taskExecutor-1] ERROR com.zlennon.covid.config.RabbitConfig$$EnhancerBySpringCGLIB$$46ecb22d.onError - -----第2次调用
23:19:58.597 [taskExecutor-1] INFO com.zlennon.covid.common.CovidMsgReceiver.onMessage - HelloReceiver收到 : {time=2021-03-07},收到时间Sun Mar 07 23:19:58 CST 2021
23:19:58.600 [taskExecutor-1] ERROR com.zlennon.covid.config.RabbitConfig$$EnhancerBySpringCGLIB$$46ecb22d.onError - -----第3次调用
23:20:02.612 [taskExecutor-1] INFO com.zlennon.covid.common.CovidMsgReceiver.onMessage - HelloReceiver收到 : {time=2021-03-07},收到时间Sun Mar 07 23:20:02 CST 2021
23:20:02.616 [taskExecutor-1] ERROR com.zlennon.covid.config.RabbitConfig$$EnhancerBySpringCGLIB$$46ecb22d.onError - -----第4次调用
23:20:10.632 [taskExecutor-1] INFO com.zlennon.covid.common.CovidMsgReceiver.onMessage - HelloReceiver收到 : {time=2021-03-07},收到时间Sun Mar 07 23:20:10 CST 2021
23:20:10.634 [taskExecutor-1] ERROR com.zlennon.covid.config.RabbitConfig$$EnhancerBySpringCGLIB$$46ecb22d.onError - -----第5次调用
23:24:23.810 [taskExecutor-1] ERROR com.zlennon.covid.config.RabbitConfig$$EnhancerBySpringCGLIB$$46ecb22d.lambda$recoverer$0 - 消息参数==>[],失败原因=[Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://admin@127.0.0.1:5672/,1), conn: Proxy@2bd9722 Shared Rabbit Connection: SimpleConnection@54f3fd30 [delegate=amqp://admin@127.0.0.1:5672/, localPort= 56057], (Body:'[B@56cf1eca(byte[129])' MessageProperties [headers={spring_listener_return_correlation=f5ff3d09-09b0-4942-9387-3b7f876b8a4d, spring_returned_message_correlation=d8bf92c6-fb8d-488b-9401-09e17c888a8c}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=covid.daily.data.exchange, receivedRoutingKey=covid.daily.data, deliveryTag=6, consumerTag=amq.ctag-7eogtN6aqvpgJvqrLV6EEw, consumerQueue=covid.daily.data.queue])]
23:24:23.812 [taskExecutor-1] WARN com.zlennon.covid.config.RabbitConfig$$EnhancerBySpringCGLIB$$46ecb22d.close - 重试结束,已重试次数=[5]
分享到: