消息队列CMQ七大功能实践案例

消息队列CMQ七⼤功能实践案例
背景
消息队列,在业务解耦、削峰填⾕、流量控制、⼴播消息等场景下都有很好的应⽤,已经成为很多企业IT系统内部通信重要⼿段。
现有常⽤的开源消息中间件有RabbitMQ、Kafka、RocketMQ等,但各⾃有着不同的应⽤场景和特点,例如,Kafka注重的是消息的吞吐量,不保证消息存储的可靠性以及⼀致性,因此多⽤于⽇志系统数据的上报;RabbitMQ能保证消息可靠存储投递,但性能较差。
CMQ(Cloud Message Queue)是腾讯云开发的⼀款⾼可靠、⾼可⽤、⾼性能的分布式消息队列服务,具有低耦合、消息可靠、强⼀致性、可扩展性等特点,⽀持Push/Pull消费模型、消息回溯、延时消息、发布订阅、路由⼴播、消息加密等⼀系列功能,以满⾜更多的mq应⽤场景。
相对Kafka,CMQ更多注重消息⾼可靠的应⽤场景,例如⾦融、交易、订单等业务;相⽐RabbitMQ,CMQ在可⽤性和性能上做了很⼤的优化和提升。更详细的对⽐,请参考官⽹介绍。
本⽂先简单介绍CMQ底层的架构实现,然后着重结合CMQ的功能特点来介绍CMQ的实践案例,让⼤家快速理解和上⼿CMQ的开发。
底层架构
CMQ整体架构如上图所⽰,每个set由三个broker节点副本组成,保证消息的可靠存储以及⾼可⽤性,且基于raft算法保证数据的⼀致性。CMQ单个set 在CAP理论中优先保证了CP,当SET中过半数节点都正常⼯作时,才能进⾏消息的⽣产消费。
单水合肼实践案例u型吊臂
⼀、⼴播拉取消息模型
CMQ⽀持队列(queue)和主题(topic)两种模型,如下所⽰:
其中,queue模型是⼀对⼀的消息拉取(pull)模式,client端主动pull消息;⽽topic模型,也称发布/订阅模型,是⼀对多的消息推送(push)模式,CMQ服务端⼴播消息时,根据各个订阅地址主动推送消息给client。两种模型基本能满⾜⼤部分应⽤场景了,对⽐如下:
queue模型,client端可以灵活根据⾃⾝能⼒去消费pull消息,消息实时性依赖client的消费速度,如果消费速度⽐⽣产速度慢,会引起⼤量消息堆积。
topic模型,服务端主动推送消息,消息实时性⽐较⾼,但要求client性能上能及时处理⼤量推送过来的消息,并且在client发⽣故障的时候可能会导致丢消息(有消息重发策略做基本保障)。
对于topic模型,有以下特殊场景需求:吸脂器
client端想根据⾃⾝能⼒去pull消息
创建订阅的时候需要暴露client端的接收消息的地址,但在⼀些企业内⽹、vpc⽹络等特殊情况下,CMQ⽆法推送到,只能⽤pull⽅式获取消息。g蛋
针对以上特殊场景,CMQ结合queue和topic两种模型实现了⼀对多的⼴播拉取消息模型,如下所⽰:
topic的订阅者可以是⼀个queue实例,topic发布消息后,会⾃动将消息推送到queue,然后client和使⽤queue模型⼀样去消费消息即可。
# python sdk demo code: create subion of queue protocal
my_sub = _subion(topic_name, subion_name)
subion_meta = SubionMeta()
subion_meta.Endpoint = "queue1"
subion_meta.Protocal = "queue"
ate(subion_meta)
⼆、Pull长轮询
对于Queue模型,消费者需要pull获取消息,但问题是:消费者不知道队列什么时候有消息,只能不停轮询请求去pull,如果轮询间隔时间短,在队列长时间没有消息时会耗费消费者请求资源且效率低,如果轮询间隔时间长,则消费速度慢,消息实时性低,且造成消息⼤量堆积。
黄鳝精针对以上问题,CMQ解决⽅案是设计了长轮询功能。例如,假设设置队列长轮询时间为10s
当消费者pull消息时,如果队列中有消息则马上返回
如果队列暂时没有消息,消费者pull请求不会马上返回,⽽是会等待阻塞10s:当10s内有新的⽣产消息到达队列,CMQ 会马上将消息投递给正在阻塞等待的消费者,消费者端感知就是阻塞的pull请求被唤醒并且收到消息返回;当10s内队列都没有消息,则请求返回告诉消费者当前队列没有消息。
# python sdk demo code: receive message through long polling
pollingWaitSeconds = 3
recv_msg = ive_message(pollingWaitSeconds)
三、延时消息
CMQ提供延时消息功能:消息发送到队列后,从⼊队时间算起,消息在设置的延时时间后才对消费者可见,即才能被消费者消费到。延时消息功能可以很轻松实现⼀些定时任务的应⽤场景。
如上图所⽰,根据CMQ延迟消息功能实现的定时任务检查告警系统。
# python sdk demo code: send delayed message
msg_body = "I am delay message"
msg = Message(msg_body)
delaySeconds = 3
my_queue.send_message(msg, delaySeconds)
四、消息回溯
CMQ提供类似于Kafka的消息回溯能⼒,已经消费删除的消息是可以通过回溯来重新消费的。⽬前⽀持指定回溯时间点,在这个时间点开始被删除的消息可以重新消费到。此功能在⼀些⾦融业务对账、
业务系统重试等场景下有很好的实⽤性。
最⼤可回溯时间点 = 当前时间 - 设置的可回溯时长。消息⽣产时间在这个值之前的不可回溯,之后的可回溯,如下图所⽰:
小型塑料封口机
# python sdk demo code: rewind the queue
# backtrack one hour
backTrackingTime = int(time.time()) - 3600
windQueue(backTrackingTime)
五、Topic路由匹配
CMQ topic模型提供类似于RabbitMQ的消息路由匹配功能,在消息⼴播基础上实现了消息的⾃动分发。
订阅者可以指定bindingKey,即路由规则,如上所⽰,*(星号)可以匹配⼀个单词,#(井号)可以匹配⼀个或多个单词。例如,⽣产者发布⼀个消息,且消息的路由键(routingKey)是”ange.elephant”,
那么该消息只会推送给消费者C1;如果routingKey=”ange.rabbit”,则消息会推送给C1和C2;如果routingKey=”lazy.brown.fox”,则消息只会推送给C2。
# python sdk demo code: set topic-subion route-rule
my_sub = _subion(topic_name, subion_name)
subion_meta = SubionMeta()
subion_meta.Endpoint = "test"
subion_meta.Protocal = "http"
subion_meta.bindingKey = ['*.*.rabbit','lazy.#']
ate(subion_meta)
message = Message()
message.msgBody = "route msg test"
my_topic.publish_message(message, 'ange.rabbit')
六、超⼤消息传输
⽬前CMQ的队列消息⼤⼩最⼤限制为1MB,⽽当消息⼤⼩不超过64KB时,收发消息的最⼤QPS限制分别为正常的
5k(有特殊需求可调整),当消息⼤⼩超过64KB⽽⼩于1MB时,CMQ不保证收发消息的QPS性能。因此,⽀持⼤于64KB 的消息只是为了考虑业务偶尔传输少量⼤消息且不想做消息分⽚的应⽤场景。
⼀般来说,64KB的消息限制⼤⼩基本能满⾜⼤部分业务场景需求了,但在某些特殊场景下,消息数据⼤于64KB甚⾄⼤于1MB时,业务和CMQ如何⽀持这种超⼤消息的传输呢?这⾥有两种解决⽅案:
1.消息分⽚。类似IP数据包分⽚传输原理,⽣产者对消息分⽚标记后分别发送到队列,消费者从队列取出所有分⽚消息进⾏组装。个⼈⽅案如下:
每个消息body分为header和data两部分。其中,data就是原消息分⽚后的内容,header包含三个标记:业务指定消息的ID号,唯⼀记录⼀个消息的ID值,具有同⼀个ID号的消息分⽚才会在消费端重新组装;分⽚序号(从1开始),记录⼀个消息分⽚的次序编号,消费端依据分⽚序号依次组装消息;下⼀分⽚是否存在的标记,如果是,说明消息包还不完整,否则消息组装完毕。
由于可能存在多个消费者client,不同分⽚可能被不同client接收到,为了能够组装分⽚,需要⼀个集
中式的地⽅存储所有分⽚并最终组装成完整的消息包,但⽆疑⼤⼤增加了系统设计的复杂度。
2.COS代理存储(COS是腾讯云的对象存储服务)。类似编程中的指针原理,⽅案如下(具体代码实现参考附件):
⽣产者先把超⼤消息的数据以⽂件形式上传到COS,并返回消息⽂件的COS URL地址;
⽣产者将URL地址作为消息发送到CMQ队列中;
消费者从CMQ队列中读取消息,判断消息内容是否是COS的URL地址信息,如果是,则根据URL地址从COS下载相应的消息⽂件,并从⽂件中读取出超⼤消息的数据。
七、消息加密传输
腾讯云提供秘钥管理服务KMS,能对数据进⾏安全加密。CMQ消息加密功能有以下两种⽅案:
1.CMQ SDK客户端加密⽅案。客户端发送消息时,根据设置的CMK(KMS的秘钥ID)调⽤KMS⽣成数据秘钥接⼝,会返回数据秘钥的明⽂key以及加密后的密⽂key,使⽤明⽂key对消息进⾏本地加密,然后将加密的数据和密⽂key作为消息发送给CMQ;消费者接收消息时,先获取消息中的密⽂key,调⽤KMS接⼝解密(不必每次均调⽤,可做缓存)得到对应的明⽂key,最后根据明⽂key本地解密密⽂数据即可。具体代码实现参考附件。
2.CMQ服务端加密⽅案。该⽅案,由CMQ服务端和KMS服务打通,CMQ⾃动对消息加解密,⽤户⽆感知,例如,⽤户通过https接⼝发送消息,由CMQ⾃动加密后存储,通过https接⼝接收消息时,CMQ对消息⾃动解密后返回给⽤户。此功能正在开发中。

本文发布于:2024-09-21 10:42:00,感谢您对本站的认可!

本文链接:https://www.17tex.com/tex/3/183340.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   消费者   队列   功能
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议