RabbitMQ-Java-04-发布订阅模式

RabbitMQ-Java-04-发布订阅模式
说明
RabbitMQ-Java-04-发布订阅模式
本案例是⼀个Maven项⽬
假设你已经实现了上⼀节⼯作队列
官⽅⽂档已包含绝⼤多数本案例内容。请移步:docs.spring.io/spring-amqp/docs/current/reference/html/
核⼼概念
》原理
发布订阅模式核⼼是交换机Exchanges
当通过信道(channel)绑定了交换机(exchange)、队列(queue)、路由key(routing_key),就实现了发布订阅模式之前章节默认我已经使⽤了发布订阅模式,正式使⽤往往都是使⽤发布订阅模式。如
果不想使⽤发布订阅模式可以在发送消息的时候交换机传空字符串,路由key传队列名即可,这样消息默认⾛的都是默认交换机。
# 使⽤发布订阅模式
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, Bytes("UTF-8"));
# 不使⽤发布订阅模式
channel.basicPublish("", QUEUE_NAME, null, Bytes("UTF-8"));割礼龙凤斗
交换机的类型
扇出模式:fanout
交换机消息转发规则
⼴播到跟他绑定的所有队列
注意:这⾥是所有不同的队列。如果是同⼀个队列多个线程同时run(),最终还是会轮询分发到每个线程。
扇出交换机⽆视路由key。只要是该扇出交换机绑定的队列,不管有没有绑定路由key,所有消息都能收到。
核⼼代码提取
// 声明交换机(扇出模式)
直接模式:direct
交换机消息转发规则
匹配跟交换机绑定的队列
匹配队列绑定的路由key等于消息指定的路由key
直接交换机会严格判断消息绑定的路由和队列绑定的路由是否匹配
⼀个队列可以绑定多个路由key
电子发声挂图核⼼代码抽取
// 声明交换机(直接模式)
主题模式:topic
交换机消息转发规则
匹配跟交换机绑定的队列
匹配队列绑定的路由key经过模糊匹配后等于消息指定的路由key
主题模式路由模糊匹配规则
语法案例
英⽂句号表⽰词语连接符
英⽂句号连接的字符表⽰⼀个词语,词语可以是字母、数字、中⽂,每个词语最少1个字符匹配符号
扬卡洛夫*:星号匹配⼀个词语
婚姻保卫战片尾曲
#:井号匹配零个或多个词语
语法案例匹配规则举例
# 以下匹配规则都能匹配到上边的语法案例:
*.bb.*
aa.bb.*
*.*.cc
aa.#
#.cc
#
注意:路由长度最长255个字节
当⼀个队列绑定的路由key是#,那么这个队列将接受所有数据,等价于fanout
当⼀个队列绑定的路由key没有#和*,那么这个队列等价于direct
标题模式:headers / match
如何⽣成⼀个临时队列?
String queueName = channel.queueDeclare().getQueue()
操作步骤
》完整代码
⼯具类:RabbitMqUtils
asin.rabbit.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMqUtils {
public static Channel getChannel() throws Exception {
// 创建连接⼯⼚
ConnectionFactory factory = new ConnectionFactory();
// 配置
factory.setHost("192.168.3.202");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("/");
// 获取连接
Connection connection = wConnection();
// 获取信道
Channel channel = ateChannel();
return channel;
return channel;
}
}
扇出模式:fanout
Subscribe01
asin.rabbit.publish.fanout;
asin.rabbit.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
public class Subscribe01 {
// 交换机名
public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
// 队列名
public static final String QUEUE_NAME = "publish_subscribe_queue_01";
/
/ 路由key名
// public static final String ROUTING_KEY = "publish_subscribe_routing_01";
// 信道
public static Channel channel = null;
public static void main(String[] args) throws Exception {
// 初始化
init();
Channel channel = getChannel();
// 接收消息成功回调
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {            System.out.println("[*] 成功收到消息:" + new Body()));
};
// 接收消息失败回调
血清肌酸激酶CancelCallback cancelCallback = (String consumerTag) -> {
System.out.println("[*] 接收消息失败");
};
System.out.println("[*] Sub1正在等待接收消息。。。");
// 接收消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
/**
* 初始化
*
* @throws Exception
*/
public static void init() throws Exception {
Channel channel = getChannel();
// 声明交换机
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 绑定队列、交换机、路由key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 扇出模式路由key⽆效,随便绑    }
public static Channel getChannel() throws Exception {
if (channel == null) {
Channel());
}
return channel;
}
public static void setChannel(Channel channel) {
Subscribe01.channel = channel;
}
}
Subscribe02
asin.rabbit.publish.fanout;
asin.rabbit.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
public class Subscribe02 {
// 交换机名
public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
// 队列名
public static final String QUEUE_NAME = "publish_subscribe_queue_02";
// 路由key名
/
/ public static final String ROUTING_KEY = "publish_subscribe_routing_02";
// 信道
public static Channel channel = null;
public static void main(String[] args) throws Exception {
// 初始化
init();
Channel channel = getChannel();
// 接收消息成功回调
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
System.out.println("[*] 成功收到消息:" + new Body()));
};
/
/ 接收消息失败回调
CancelCallback cancelCallback = (String consumerTag) -> {
System.out.println("[*] 接收消息失败");
};
System.out.println("[*] Sub2正在等待接收消息。。。");
// 接收消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
/**
* 初始化
*
* @throws Exception
*/
public static void init() throws Exception {
Channel channel = getChannel();
// 声明交换机
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
惠尚学// 绑定队列、交换机、路由key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 扇出模式路由key⽆效,随便绑    }
public static Channel getChannel() throws Exception {
if (channel == null) {
Channel());
}
return channel;
}
public static void setChannel(Channel channel) {
Subscribe02.channel = channel;
}
}
Publish
asin.rabbit.publish.fanout;
asin.rabbit.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
public class Publish {
// 交换机名
public static final String EXCHANGE_NAME = "publish_subscribe_exchange";
// 路由key名
// public static final String ROUTING_KEY = "publish_subscribe_routing_01";
// 信道
public static Channel channel = null;
public static void main(String[] args) throws Exception {
/
/ 信道
Channel channel = getChannel();
System.out.println("[*] 正在等待控制台输⼊消息");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String input = ();
// 发消息
channel.basicPublish(EXCHANGE_NAME, "", null, Bytes("UTF-8"));
// 扇出模式就算绑定了⾮空路由key,所有队列也能接收到消息
// channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, Bytes("UTF-8"));
System.out.println("[*] 消息发送成功");
}
}

本文发布于:2024-09-22 04:21:11,感谢您对本站的认可!

本文链接:https://www.17tex.com/xueshu/18747.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:路由   消息   绑定
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议