springboot+rabbitmq如何实现消息确认机制(踩坑经验)

springboot+rabbitmq如何实现消息确认机制(踩坑经验)
本⽂收录在个⼈博客:p,技术资源共享,⼀起进步
最近部门号召⼤伙多组织⼀些技术分享会,说是要活跃公司的技术氛围,但早就看穿⼀切的我知道,这 T M 就是为了刷KPI。不过,话说回来这的确是件好事,与其开那些没味的扯⽪会,多做技术交流还是很有助于个⼈成长的。
于是乎我主动报名参加了分享,咳咳咳~ ,真的不是为了那点KPI,就是想和⼤伙⼀起学习学习!
这次我分享的是springboot + rabbitmq如何实现消息确认机制,以及在实际开发中的⼀点踩坑经验,其实整体的内容⽐较简单,有时候事情就是这么神奇,越是简单的东西就越容易出错。
可以看到使⽤了RabbitMQ以后,我们的业务链路明显变长了,虽然做到了系统间的解耦,但可能造成消息丢失的场景也增加了。例如:
消息⽣产者 - > rabbitmq服务器(消息发送失败)
rabbitmq服务器⾃⾝故障导致消息丢失
消息消费者 - > rabbitmq服务(消费消息失败)
所以说能不使⽤中间件就尽量不要⽤,如果为了⽤⽽⽤只会徒增烦恼。开启消息确认机制以后,尽管很⼤程度上保证了消息的准确送达,但由于频繁的确认交互,rabbitmq整体效率变低,吞吐量下降严重,不是⾮常重要的消息真⼼不建议你⽤消息确认机制。生态石笼网箱
下边我们先来实现springboot + rabbitmq消息确认机制,再对遇到的问题做具体分析。
⼀、准备环境
1、引⼊ rabbitmq 依赖包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、修改 application.properties 配置
配置中需要开启发送端和消费端的消息确认。
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirms=true
# 发送者开启 return 确认机制
noyes
spring.rabbitmq.publisher-returns=true
>>>>>>>>>>##
# 设置消费端⼿动 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否⽀持重试
spring.rabbitmq.enabled=true
3、定义 Exchange 和 Queue
定义交换机confirmTestExchange和队列confirm_test_queue,并将队列绑定在交换机上。
@Configuration
public class QueueConfig {
@Bean(name = "confirmTestQueue")
public Queue confirmTestQueue() {
return new Queue("confirm_test_queue", true, false, false);
}
@Bean(name = "confirmTestExchange")
public FanoutExchange confirmTestExchange() {
return new FanoutExchange("confirmTestExchange");
}
@Bean
public Binding confirmTestFanoutExchangeAndQueue(
@Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
@Qualifier("confirmTestQueue") Queue confirmTestQueue) {
return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
}
}
rabbitmq的消息确认分为两部分:发送消息确认和消息接收确认。
⼆、消息发送确认
发送消息确认:⽤来确认⽣产者producer将消息发送到broker,broker上的交换机exchange再投递给队列queue的过程中,消息是否成功投递。
消息从producer到rabbitmq broker有⼀个confirmCallback确认模式。
消息从exchange到queue投递失败有⼀个returnCallback退回模式。
我们可以利⽤这两个Callback来确保消的100%送达。
1、 ConfirmCallback确认模式
消息只要被rabbitmq broker接收到就会触发confirmCallback回调。
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
<("消息发送异常!");
} else {
log.info("发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}", Id(), ack, cause);
}
}
}
实现接⼝ConfirmCallback,重写其confirm()⽅法,⽅法内有三个参数correlationData、ack、cause。
correlationData:对象内部只有⼀个id属性,⽤来表⽰当前消息的唯⼀性。
ack:消息投递到broker的状态,true表⽰成功。
cause:表⽰投递失败的原因。
但消息被broker接收到只能表⽰已经到达 MQ服务器,并不能保证消息⼀定会被投递到⽬标queue⾥。所以接下来需要⽤到returnCallback。
2、 ReturnCallback 退回模式
如果消息未能投递到⽬标queue⾥将触发回调returnCallback,⼀旦向queue投递消息未成功,这⾥⼀般会记录下当前消息的详细投递数据,⽅便后续做重发或者补偿等操作。
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
}
}
实现接⼝ReturnCallback,重写returnedMessage()⽅法,⽅法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。
下边是具体的消息发送,在rabbitTemplate中设置Confirm和Return回调,我们通过setDeliveryMode()对消息做持久化处理,为了后续测试创建⼀个CorrelationData对象,添加⼀个id
为10000000000。
@Autowired
自动点火器private RabbitTemplate rabbitTemplate;磁性输送带
@Autowired
private ConfirmCallbackService confirmCallbackService;
@Autowired
private ReturnCallbackService returnCallbackService;
public void sendMessage(String exchange, String routingKey, Object msg) {
/**
* 确保消息发送失败后可以重新返回到队列中
* 注意:yml需要配置 publisher-returns: true
*/
rabbitTemplate.setMandatory(true);
/**
* 消费者确认收到消息后,⼿动ack回执回调处理
*/
rabbitTemplate.setConfirmCallback(confirmCallbackService);
/
奇石底座**
* 消息投递到队列失败回调处理
*/
rabbitTemplate.setReturnCallback(returnCallbackService);
/**
* 发送消息
*/
message -> {
return message;
},
new CorrelationData(UUID.randomUUID().toString()));
}
三、消息接收确认
消息接收确认要⽐消息发送确认简单⼀点,因为只有⼀个消息回执(ack)的过程。使⽤@RabbitHandler注解标注的⽅法要增加channel(信道)、message两个参数。
@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {纸张阻燃剂
try {
log.info("⼩富收到消息:{}", msg);
//TODO 具体业务
channel.MessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (MessageProperties().getRedelivered()) {
<("消息已重复处理失败,拒绝再次接收...");
channel.MessageProperties().getDeliveryTag(), false); // 拒绝消息
} else {
<("消息即将再次返回队列处理...");
channel.MessageProperties().getDeliveryTag(), false, true);
}
}
}
}
消费消息有三种回执⽅法,我们来分析⼀下每种⽅法的含义。
1、basicAck
basicAck:表⽰成功确认,使⽤此回执⽅法后,消息会被rabbitmq broker删除。
void basicAck(long deliveryTag, boolean multiple)
deliveryTag:表⽰消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。⼿动消息确认模式下,我们可以对指定deliveryTag的消息进⾏ack、nack、reject等操作。multiple:是否批量确认,值为true则会⼀次性ack所有⼩于当前消息deliveryTag的消息。
举个栗⼦:假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条
消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进⾏确认。
2、basicNack
basicNack:表⽰失败确认,⼀般在消费消息业务异常时⽤到此⽅法,可以将消息重新投递⼊队列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
deliveryTag:表⽰消息投递序号。
multiple:是否批量确认。
requeue:值为true消息将重新⼊队列。
3、basicReject
basicReject:拒绝消息,与basicNack区别在于不能进⾏批量操作,其他⽤法很相似。
void basicReject(long deliveryTag, boolean requeue)
deliveryTag:表⽰消息投递序号。
requeue:值为true消息将重新⼊队列。
四、测试
发送消息测试⼀下消息确认机制是否⽣效,从执⾏结果上看发送者发消息后成功回调,消费端成功的消费了消息。
⽤抓包⼯具Wireshark观察⼀下rabbitmq amqp协议交互的变化,也多了ack的过程。
五、踩坑⽇志
1、不消息确认
这是⼀个⾮常没技术含量的坑,但却是⾮常容易犯错的地⽅。
开启消息确认机制,消费消息别忘了channel.basicAck,否则消息会⼀直存在,导致重复消费。
2、消息⽆限投递
在我最开始接触消息确认机制的时候,消费端代码就像下边这样写的,思路很简单:处理完业务逻辑后确认消息,int a = 1 / 0发⽣异常后将消息重新投⼊队列。@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
try {
log.info("消费者 2 号收到:{}", msg);
int a = 1 / 0;
channel.MessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.MessageProperties().getDeliveryTag(), false, true);
}
}
但是有个问题是,业务代码⼀旦出现bug 99.9%的情况是不会⾃动修复,⼀条消息会被⽆限投递进队列,消费端⽆限执⾏,导致了死循环。
本地的CPU被瞬间打满了,⼤家可以想象⼀下当时在⽣产环境导致服务死机,我是有多慌。
⽽且rabbitmq management只有⼀条未被确认的消息。
经过测试分析发现,当消息重新投递到消息队列时,这条消息不会回到队列尾部,仍是在队列头部。
消费者会⽴刻消费这条消息,业务处理再抛出异常,消息再重新⼊队,如此反复进⾏。导致消息队列处理出现阻塞,导致正常消息也⽆法运⾏。
⽽我们当时的解决⽅案是,先将消息进⾏应答,此时消息队列会删除该条消息,同时我们再次发送该消息到消息队列,异常消息就放在了消息队列尾部,这样既保证消息不会丢失,⼜保证了正常业务的进⾏。
channel.MessageProperties().getDeliveryTag(), false);
// 重新发送消息到队尾
channel.MessageProperties().getReceivedExchange(),
但这种⽅法并没有解决根本问题,错误消息还是会时不时报错,后⾯优化设置了消息重试次数,达到了重试上限以后,⼿动确认,队列删除此消息,并将消息持久化⼊MySQL并推送报警,进⾏⼈⼯处理和定时任务做补偿。
3、重复消费
如何保证 MQ 的消费是幂等性,这个需要根据具体业务⽽定,可以借助MySQL、或者redis将消息持久化,通过再消息中的唯⼀性属性校验。
总结
到此这篇关于springboot + rabbitmq 如何实现消息确认机制(踩坑经验)的⽂章就介绍到这了,更多相关springboot rabbitmq 消息确认机制内容请搜索以前的⽂章或继续浏览下⾯的相关⽂章希望⼤家以后多多⽀持!

本文发布于:2024-09-22 15:49:40,感谢您对本站的认可!

本文链接:https://www.17tex.com/tex/1/183676.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   确认   队列   机制   投递   消费   发送   处理
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议