RabbitMQ消息队列,发送消息失败、消息持久化、消费者失败处理方法和发送消息

RabbitMQ消息队列,发送消息失败、消息持久化、消费者失
处理⽅法和发送消息
项⽬是使⽤springboot项⽬开发的,前是代码实现,后⾯有分析发送消息失败、消息持久化、消费者失败处理⽅法和发送消息解决⽅法及⼿动确认的模式
先引⼊l
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application 配置⽂件
spring:
rabbitmq:
host: IP地址
port: 5672
username: ⽤户名
password: 密码
RabbitConfig配置⽂件
slf4j.Slf4j;
import org.Binding;
import org.BindingBuilder;
import org.DirectExchange;
import org.Queue;
import org.springframework.tion.CachingConnectionFactory;
import org.springframework.tion.ConnectionFactory;
import org.springframework.RabbitTemplate;
import org.springframework.verter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.fig.ConfigurableBeanFactory;
import t.annotation.Bean;
import t.annotation.Configuration;
import t.annotation.Scope;
/**
Broker:它提供⼀种传输服务,它的⾓⾊就是维护⼀条从⽣产者到消费者的路线,保证数据能按照指定的⽅式进⾏传输,
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息的载体,每个消息都会被投到⼀个或多个队列。
Binding:绑定,它的作⽤就是把exchange和queue按照路由规则绑定起来.
Routing Key:路由关键字,exchange根据这个关键字进⾏消息投递。
vhost:虚拟主机,⼀个broker⾥可以有多个vhost,⽤作不同⽤户的权限分离。
Producer:消息⽣产者,就是投递消息的程序.
Consumer:消息消费者,就是接受消息的程序.
Channel:消息通道,在客户端的每个连接⾥,可建⽴多个channel.
*/
@Configuration
@Slf4j
public class RabbitConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
public static final String EXCHANGE_A = "my_mq_exchange_A";
public static final String EXCHANGE_B = "my_mq_exchange_B";
public static final String EXCHANGE_C = "my_mq_exchange_C";
public static final String QUEUE_A="QUEUE_A";
public static final String QUEUE_B="QUEUE_B";
public static final String QUEUE_C="QUEUE_C";
public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";
public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true); //设置发送消息失败重试
connectionFactory.setChannelCacheSize(100);//解决多线程发送消息
return connectionFactory;
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate(){
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMandatory(true); //设置发送消息失败重试
return template;
}
//配置使⽤json转递数据
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
环保电镀}
/*public SimpleMessageListenerContainer messageListenerContainer(){rgd-208
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());
adapter.setDefaultListenerMethod(new Jackson2JsonMessageConverter());
return container;
}*/
/**
* 针对消费者配置
* 1. 设置交换机类型
* 2. 将队列绑定到交换机
* FanoutExchange: 将消息分发到所有的绑定队列,⽆ routingkey的概念
* HeadersExchange: 通过添加属性key - value匹配
* DirectExchange: 按照routingkey分发到指定队列
* TopicExchange : 多关键字匹配
* @return
*/
@Bean
public DirectExchange defaultExchange(){
return new DirectExchange(EXCHANGE_A,true,false);
}
@Bean
public Queue queueA(){
return  new Queue(QUEUE_A,true);// 队列持久化
}
@Bean
public Queue queueB(){
return  new Queue(QUEUE_B,true);// 队列持久化
}
/**
* ⼀个交换机可以绑定多个消息队列,也就是消息通过⼀个交换机,可以分发到不同的队列当中去。
* @return
*/
@Bean
public Binding binding(){
return BindingBuilder.bind( queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);    }
@Bean
public Binding bindingB(){
return BindingBuilder.bind( queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);    }
}
⽣成者
slf4j.Slf4j;
import org.Message;
import org.springframework.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* ⽣产者
*/
@Component
@Slf4j
public class ProducerMessage implements  RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback{
private RabbitTemplate rabbitTemplate;
@Autowired
public ProducerMessage(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
人工抽脂rabbitTemplate.setConfirmCallback(this::confirm); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
rabbitTemplate.setReturnCallback(this::returnedMessage);
rabbitTemplate.setMandatory(true);
}
public void  sendMsg (Object content){
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
/**
* 消息发送到队列中,进⾏消息确认
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info(" 消息确认的id: " + correlationData);
if(ack){
压延加工log.info("消息发送成功");
//发送成功删除本地数据库存的消息
}else{
log.info("消息发送失败:id "+ correlationData +"消息发送失败的原因"+ cause);
/
/ 根据本地消息的状态为失败,可以⽤定时任务去处理数据
}
}
/**
* 消息发送失败返回监控
* @param message
* @param i
* @param s
* @param s1
* @param s2
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
方艺蒙
log.info("returnedMessage [消息从交换机到队列失败]  message:"+message);
}
}
消费者
import com.rabbitmq.client.Channel;
slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 消费者
*/
@Slf4j
@Component
public class ComsumerMessage {
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public void handleMessage(Message message,Channel channel) throws  IOException{
try {
String json = new Body());
JSONObject jsonObject = JSONObject.fromObject(json);
log.info("消息了【】handleMessage" +  json);
int i = 1/0;
//业务处理。
/**
* 防⽌重复消费,可以根据传过来的唯⼀ID先判断缓存数据中是否有数据
* 1、有数据则不消费,直接应答处理
* 2、缓存没有数据,则进⾏消费处理数据,处理完后⼿动应答
* 3、如果消息处理异常则,可以存⼊数据库中,⼿动处理(可以增加短信和邮件提醒功能)            */
//⼿动应答
channel.MessageProperties().getDeliveryTag(),false);
}catch (Exception e){
<("消费消息失败了【】error:"+ Body());
<("OrderConsumer  handleMessage {} , error:",message,e);
// 处理消息失败,将消息重新放回队列
channel.MessageProperties().getDeliveryTag(), false,true);
}
红外线烘干箱}
}
发送消息:调⽤⽣成的⽅法

本文发布于:2024-09-21 17:22:50,感谢您对本站的认可!

本文链接:https://www.17tex.com/tex/4/183098.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   发送   失败   处理   数据   队列
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议