基于MQTT和Kafka高并发场景下的消息路由方法[发明专利]

(19)中华人民共和国国家知识产权局
(12)发明专利申请
(10)申请公布号 (43)申请公布日 (21)申请号 201810465578.2
(22)申请日 2018.05.16
(71)申请人 华南理工大学
地址 510640 广东省广州市天河区五山路
381号
申请人 中山市华南理工大学现代产业技术
研究院
(72)发明人 陆璐 黄佳文 
(74)专利代理机构 广州市华学知识产权代理有
限公司 44245
代理人 陈宏升
(51)Int.Cl.
H04L  12/58(2006.01)
H04L  29/08(2006.01)
H04L  12/803(2013.01)
H04L  12/24(2006.01)
(54)发明名称
基于MQTT和Kafka高并发场景下的消息路由
方法
(57)摘要
本发明公开了基于MQTT和Kafka高并发场景
下的消息路由方法,该方法充分利用MQTT轻量级
协议在通信中支持数百万个设备同时连接的特
点,引入Kafka集以弥补MQTT协议不支持负载
均衡的缺陷,并通过磁盘顺序写速度快的特点来
应对高并发场景下的应用需求,大大提高了消息
的传送速度,支持实时数据流的保存和异步处
理。权利要求书2页  说明书4页  附图1页CN 108768826 A 2018.11.06
C N  108768826
A
1.基于MQTT和Kafka高并发场景下的消息路由方法,其特征在于,包括以下步骤:
利用Kafka的分区机制接收高并发场景下的用户消息,并以顺序写的方式写入磁盘,同时基于发布/订阅模式的消息队列进行保存,利用Kafka代理集实现负载均衡;
然后通过Kafka Stream将消息检索过滤,持久化存入数据库,并保持对接收端状态的监听;将过滤后的数据发送到MQTT服务器中,保存在不同的Topic下;
最后由MQTT协议中订阅了不同Topic的接收端进行匹配,获取消息,并且接收端在每次上线或下线时都将发送状态消息以更新在线列表。
2.根据权利要求1所述基于MQTT和Kafka高并发场景下的消息路由方法,其特征在于,所述利用Kafka的分区机制接收高并发场景下的用户消息,是依赖于磁盘顺序写的方式来存储和缓存消息的,且具有一定的时间期限;用户在客户端进行消息的发送时,指定消息要送达的分区,将partitioner.class设置为自定义的分区,并在partition()中设定消息发送到分区的具体规则。
3.根据权利要求2所述基于MQTT和Kafka高并发场景下的消息路由方法,其特征在于,所述用户消息,其记录以Key-Value键值对的方式进行发送,将发送者ID和接受者ID一同作为Key进行发布。
4.根据权利要求1所述基于MQTT和Kafka高并发场景下的消息路由方法,其特征在于,所述通过Kafka Stream将消息检索过滤,持久化存入数据库,并保持对接收端状态的监听,具体为:
对保存在每个分区下的消息记录,通过Kafka Stream建立一个流处理拓扑,并将键值对反序列化为数据对象的方式进行处理;
所述流处理拓扑中包含有一个Source处理节点、一个Sink处理节点以及M个自定义的处理节点,利用聚合操作可将消息记录为空的数据筛选过滤;其中M≥2;
过滤后的数据序列化后会发送至订阅了相应Topic的消费端,在消费端中,通过多线程将消息记录持久化数据库,并通过回调函数处理持久化后的结果;
另外,消费端还需订阅另外一个主题用于监听接收端的在线状态,并维持有一个在线列表。
5.根据权利要求1所述基于MQTT和Kafka高并发场景下的消息路由方法,其特征在于,所述将过滤后的数据发送到MQTT服务器中,保存在不同的Topic下,具体为:在Kafka的消费端,逐条消费事先保存在Kafka Broker分区中的消息,并提取每条消息的Key,Key中包含有该条消息的发送者ID和接收者ID;
发布消息指定Topic时,需将接收者ID作为发送者ID的父级Topic,并在Message Body 前加入发送者ID便于接收端解析;
接收者启动后将订阅以自身ID为第一层级的Topic,并使用通配符接收所有以自身ID 为第一层级Topic的消息;在对收到的消息解析其Message Body后,辨识出该消息内容的发送源。
6.根据权利要求1所述基于MQTT和Kafka高并发场景下的消息路由方法,其特征在于,所述接收端在每次上线或下线时都将发送状态消息以更新在线列表,具体为:启动接收者时需设定服务质量和是否清除
Session;因为接收端需获取离线消息,所以要保存Session,并将服务质量设定为At Least Once;
另外,设定遗嘱消息,当接收者下线时,将下线消息发给Kafka,避免消息在MQTT服务器中一直得不到消费,占用内存空间;当重新上线时,则再一次把消息发给Kafka,以更新其维持的在线的接收端的列表。
基于MQTT和Kafka高并发场景下的消息路由方法
技术领域
[0001]本发明涉及数据通信领域,特别涉及基于MQTT和Kafka高并发场景下的消息路由方法。
背景技术
[0002]随着互联网技术的飞速发展,智能终端设备的普及,以及业务需求的多样化,终端设备之间数据通信的重要性日益凸显,特别是在通过多种设备进行数据采集后再由计算设备进行统一处理的应用场景,如何保证实际应用中数据获取的实时性,高并发场景下的可用性,不稳定网络环境下离线消息存储成为了解决数据通信问题的关键。
[0003]MQTT协议是一种基于“发布/订阅”模式的即时通讯协议,可有效实现一对多或多对多的通信。它
设计在TCP协议之上,具有轻量、简单和易于实现的特点,旨在为低带宽和不稳定的网络环境中的终端设备提供可靠的网络服务。由于其消息标题可短至2个字节,因而具有非常小的通信开销。并且,其在设计之初就专门针对低功耗目标而进行了优化。更为独特的是,为了满足不同的场景需求,MQTT支持三种不同的服务质量。而且提供有遗嘱消息和会话保存功能,以针对不稳定的网络环境下,客户端与服务器断开后重新连接的解决方案。[0004]Kafka是一个支持分区存储、多副本的分布式消息系统,采用发布/订阅的消息处理模式,能有效应对代理宕机后的数据处理问题。Kafka以集的方式运行,由多个broker 共同构成。生产者将消息发送到特定的主题,再由订阅主题的消费者以poll的方式进行消费。其中,每个主题又被分成一个或多个的分区,每个分区由一系列有序、不可变的消息组成,是一个有序队列。特别的是,Kafka中以顺序写的方式写入磁盘,因而速度要比随机写入磁盘的方式快得多。除此之外,Kafka还引入消费组的概念,每个消费者都属于一个特定的消费组,同一分区下的一条消息只能被同一个消费组内一个消费者消费,但多个消费组可同时消费这一消息。
发明内容
[0005]本发明的主要目的在于克服现有技术的缺点与不足,提供基于MQTT和Kafka高并发场景下的消息路由方法,该方法充分利用MQTT轻量级协议在通信中支持数百万个设备同时连接的特点,引入Kafka集以弥补MQTT协议不支持负载均衡的缺陷,并通过磁盘顺序写速度快的特点来应对高并发场景下的应用需求,大大提高了消息的传送速度,支持实时数据流的保存和异步处理。
[0006]本发明的目的通过以下的技术方案实现:
[0007]基于MQTT和Kafka高并发场景下的消息路由方法,包括以下步骤:
[0008]利用Kafka的分区机制接收高并发场景下的用户消息,并以顺序写的方式写入磁盘,同时基于发布/订阅模式的消息队列进行保存,利用Kafka代理集实现负载均衡;[0009]然后通过Kafka Stream将消息检索过滤,持久化存入数据库,并保持对接收端状态的监听;将过滤后的数据发送到MQTT服务器中,保存在不同的Topic下;
[0010]最后由MQTT协议中订阅了不同Topic的接收端进行匹配,获取消息,并且接收端在每次上线或下线时都将发送状态消息以更新在线列表。
[0011]所述利用Kafka的分区机制接收高并发场景下的用户消息,是依赖于磁盘顺序写的方式来存储和缓存消息的,且具有一定的时间期限;由于Kafka中每个Topic下都有一个或多个分区,因而用户在客户端进行消息的发送时,可指定消息要送达的分区,将partitioner.class设置为自定义的分区策略实现,并在partition()中设定消息发送到分区的具体规则。因为在Kafka中创建主题是一种较为影响性能的操作,所以并不在Kafka 代理中对每个用户发起的会话请求都创建相应的主题,而采用一个主题下多个分区的方法,基于多个broker保存分区完成备份,并实现负载均衡。
[0012]所述用户消息,其记录以Key-Value键值对的方式进行发送,将发送者ID和接受者ID一同作为Key进行发布。这样使Kafka能够处理高并发的数据量,并承担消息存储的任务。[0013]所述通过Kafka Stream将消息检索过滤,持久化存入数据库,并保持对接收端状态的监听,具体为:
[0014]对保存在每个分区下的消息记录,通过Kafka Stream建立一个流处理拓扑,并将键值对反序列化为数据对象的方式进行处理;
[0015]所述流处理拓扑中包含有一个Source处理节点、一个Sink处理节点以及M个自定义的处理节点,利用聚合操作可将消息记录为空的数据筛选过滤;其中M≥2;
[0016]过滤后的数据序列化后会发送至订阅了相应Topic的消费端,在消费端中,通过多线程将消息记录持久化数据库,并通过回调函数处理持久化后的结果;
[0017]另外,消费端还需订阅另外一个主题用于监听接收端的在线状态,并维持有一个在线列表。
[0018]所述将过滤后的数据发送到MQTT服务器中,保存在不同的Topic下,具体为:[0019]在Kafka的消费端,逐条消费事先保存在Kafka Broker分区中的消息,并提取每条消息的Key,Key中包含有该条消息的发送者ID和接收者ID;
[0020]发布消息指定Topic时,需将接收者ID作为发送者ID的父级Topic,并在Message Body前加入发送
者ID便于接收端解析;
[0021]接收者启动后将订阅以自身ID为第一层级的Topic,并使用通配符接收所有以自身ID为第一层级Topic的消息;在对收到的消息解析其Message Body后,辨识出该消息内容的发送源。
[0022]由于MQTT支持多级Topic,且可用通配符匹配多层主题,因而在设计该消息路由系统时,关键在于如何使接收端分辨出具体的发送者。
[0023]所述接收端在每次上线或下线时都将发送状态消息以更新在线列表,具体为:[0024]启动接收者时需设定服务质量和是否清除Session;因为接收端需获取离线消息,所以要保存Session,并将服务质量设定为At Least Once,以减少消息的确认,提高Kafka 的并发能力;
[0025]另外,设定遗嘱消息,当接收者下线时,将下线消息发给Kafka,避免消息在MQTT服务器中一直得不到消费,占用内存空间;当重新上线时,则再一次把消息发给Kafka,以更新其维持的在线的接收端的列表。
[0026]本发明与现有技术相比,具有如下优点和有益效果:

本文发布于:2024-09-20 13:56:29,感谢您对本站的认可!

本文链接:https://www.17tex.com/xueshu/757045.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   场景   进行   并发   分区   接收端   处理
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议