RocketMQ消息队列的最佳实践

RocketMQ消息队列的最佳实践
1 ⽣产者
1.1 发送消息注意事项
1 Tags的使⽤
⼀个应⽤尽可能⽤⼀个Topic,⽽消息⼦类型则可以⽤tags来标识。
tags可由应⽤⾃⾏设置,只有⽣产者在发送消息设置了tags,消费⽅在订阅消息时才可以利⽤tags通过broker做消息过滤:
message.setTags("TagA");
2 Keys的使⽤
每个消息在业务层⾯的唯⼀标识码要设置到keys字段,⽅便⽇后定位消息丢失问题。
服务器会为每个消息创建哈希索引,应⽤可以通过topic、key来查询这条消息内容,以及消息被谁消费。
哈希索引,请保证key尽可能唯⼀,避免潜在的哈希冲突。
// 订单Id
String orderId ="20034568923546";
message.setKeys(orderId);
3 ⽇志的打印
消息发送成功或者失败要打印消息⽇志,务必要打印SendResult和key字段。send消息⽅法只要不抛异常,就代表发送成功。发送成功会有多个状态,在sendResult⾥定义。以下对每个状态进⾏说明:
SEND_OK
消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启⽤同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。
FLUSH_DISK_TIMEOUT
消息发送成功但是服务器刷盘超时。此时消息已经进⼊服务器队列(内存),只有服务器宕机,消息
才会丢失。消息存储配置参数中可以设置刷盘⽅式和同步刷盘时间长度,如果Broker服务器设置了刷盘⽅式为同步刷盘,即FlushDiskType=SYNC_FLUSH(默认为异步刷盘⽅式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。
FLUSH_SLAVE_TIMEOUT
消息发送成功,但是服务器同步到Slave时超时。此时消息已经进⼊服务器队列,只有服务器宕机,消息才会丢失。如果Broker服务器的⾓⾊是同步Master,即SYNC_MASTER(默认是异步Master即ASYNC_MASTER),并且从Broker服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到Slave服务器超时。
SLAVE_NOT_AVAILABLE
消息发送成功,但是此时Slave不可⽤。如果Broker服务器的⾓⾊是同步Master,即SYNC_MASTER(默认是异步Master服务器即ASYNC_MASTER),但没有配置slave Broker服务器,则将返回该状态——⽆Slave服务器可⽤。
1.2 消息发送失败处理⽅式
Producer的send⽅法本⾝⽀持内部重试,重试逻辑如下:
⾄多重试2次(同步发送为2次,异步发送为0次)。
如果发送失败,则轮转到下⼀个Broker。这个⽅法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
如果本⾝向broker发送消息产⽣超时异常,就不会再重试。
⼀定程度上保证了消息可以发送成功。如果业务对消息可靠性要求⽐较⾼,建议应⽤增加相应的重试逻辑:⽐如调⽤send同步⽅法发送失败时,尝试将消息存储到DB,然后由后台线程定时重试,确保消息⼀定到达Broker。
那么DB重试⽅案为什么没有集成到MQ客户端内部,⽽要求应⽤⾃⼰完成?
MQ的客户端设计为⽆状态模式,⽅便任意的⽔平扩展,且对机器资源的消耗仅仅是cpu、内存、⽹络
如果MQ客户端内部集成⼀个KV存储模块,那么数据只有同步落盘才能较可靠,⽽同步落盘本⾝性能开销较⼤,所以通常会采⽤异步落盘,⼜由于应⽤关闭过程不受MQ运维⼈员控制,可能经常会发⽣ kill -9 这样暴⼒⽅式关闭,造成数据没有及时落盘⽽丢失
Producer所在机器的可靠性较低,⼀般为虚拟机,不适合存储重要数据。综上,推荐重试过程交由应⽤控制
1.3选择oneway形式发送
消息发送过程:
测脑龄客户端发送请求到服务器
服务器处理请求
服务器向客户端返回应答
所以,⼀次消息发送的耗时时间是上述三个步骤的总和,⽽某些场景要求耗时⾮常短,但是对可靠性要求并不⾼,例如⽇志收集类应⽤,此类应⽤可以采⽤oneway形式调⽤,oneway形式只发送请求不等待应答,⽽发送请求在客户端实现层⾯仅仅是⼀个os系统调⽤的开销,即将数据写⼊客户端的socket缓冲区,此过程耗时通常在微秒级。
2 消费者
2.1 消费过程幂等
RocketMQ⽆法避免消息重复(Exactly-Once),所以如果业务对消费重复⾮常敏感,务必要在业务
层⾯进⾏去重处理。可以借助关系数据库进⾏去重。⾸先需要确定消息的唯⼀键,可以是msgId,也可以是消息内容中的唯⼀标识字段,例如订单Id等。在消费之前判断唯⼀键是否在关系数据库中存在。如果不存在则插⼊,并消费,否则跳过。(实际过程要考虑原⼦性问题,判断是否存在可以尝试插⼊,如果报主键冲突,则插⼊失败,直接跳过)
msgId⼀定是全局唯⼀标识符,但是实际使⽤中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进⾏重复消费。
2.2 消费速度慢的处理⽅式
1 提⾼消费并⾏度
绝⼤部分消息消费⾏为都属于 IO 密集型,即可能是操作数据库,或者调⽤ RPC,这类消费⾏为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并⾏度,可以提⾼总的消费吞吐量,但是并⾏度增加到⼀定程度,反⽽会下降。所以,应⽤必须要设置合理的并⾏度。如下有⼏种修改消费并⾏度的⽅法:
同⼀个 ConsumerGroup 下,通过增加 Consumer 实例数量来提⾼并⾏度(需要注意的是超过订阅队列数的 Consumer 实例⽆效)。可以通过加机器,或者在已有机器启动多个进程的⽅式。
暗访摄像包
提⾼单个 Consumer 的消费并⾏线程,通过修改参数 consumeThreadMin、consumeThreadMax实现。
2 批量⽅式消费
某些业务流程如果⽀持批量⽅式消费,则可以很⼤程度上提⾼消费吞吐量,例如订单扣款类应⽤,⼀次处理⼀个订单耗时 1 s,⼀次处理10 个订单可能也只耗时 2 s,这样即可⼤幅度提⾼消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即⼀次只消费⼀条消息,例如设置为 N,那么每次消费的消息数⼩于等于 N。
3 跳过⾮重要消息
发⽣消息堆积时,如果消费速度⼀直追不上发送速度,如果业务对数据要求不⾼的话,可以选择丢弃不重要的消息。例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。⽰例代码如下:
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context){
long offset = (0).getQueueOffset();
String maxOffset =
<(0).getProperty(Message.PROPERTY_MAX_OFFSET);
long diff = Long.parseLong(maxOffset)- offset;
if(diff >100000){
// TODO 消息堆积情况的特殊处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// TODO 正常消费过程
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
4 优化每条消息消费过程
举例如下,某条消息的消费过程如下:
根据消息从 DB 查询【数据 1】
根据消息从 DB 查询【数据 2】
复杂的业务计算
向 DB 插⼊【数据 3】
向 DB 插⼊【数据 4】
这条消息的消费过程中有4次与 DB的 交互,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时
25ms,所以如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,即总体性能提⾼了 40%。所以应⽤如果对时延敏感的话,可以把DB部署在SSD硬盘,相⽐于SCSI磁盘,前者的RT会⼩很多。
2.3 消费打印⽇志
如果消息量较少,建议在消费⼊⼝⽅法打印消息,消费耗时等,⽅便后续排查问题。
public ConsumeConcurrentlyStatus consumeMessage(
卢允忠
List<MessageExt> msgs,
ConsumeConcurrentlyContext context){
log.info("RECEIVE_MSG_BEGIN: "+ String());
miad530// TODO 正常消费过程
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
如果能打印每条消息消费耗时,那么在排查消费慢等线上问题时,会更⽅便。
2.4 其他消费建议
对路网
1 关于消费者和订阅
第⼀件需要注意的事情是,不同的消费者组可以独⽴的消费⼀些 topic,并且每个消费者组都有⾃⼰的消费偏移量,请确保同⼀组内的每个消费者订阅信息保持⼀致。
2 关于有序消息
消费者将锁定每个消息队列,以确保他们被逐个消费,虽然这将会导致性能下降,但是当你关⼼消息顺序的时候会很有⽤。我们不建议抛出异常,你可以返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 作为替代。
3 关于并发消费
顾名思义,消费者将并发消费这些消息,建议你使⽤它来获得良好性能,我们不建议抛出异常,你可以返回ConsumeConcurrentlyStatus.RECONSUME_LATER 作为替代。
4 关于消费状态Consume Status
对于并发的消费,你可以返回 RECONSUME_LATER 来通知消费者现在不能消费这条消息,并且希望可以稍后重新消费它。然后,你可以继续消费其他消息。对于有序的消息,因为你关⼼它的顺序,所以不能跳过消息,但是你可以返回
SUSPEND_CURRENT_QUEUE_A_MOMENT 告诉消费者等待⽚刻。
5 关于Blocking
不建议阻塞,因为它会阻塞线程池,并最终可能会终⽌消费进程
6 关于线程数设置
消费者使⽤ ThreadPoolExecutor 在内部对消息进⾏消费,所以你可以通过设置 setConsumeThreadMin 或 setConsumeThreadMax 来改变它。
7 关于消费位点
当建⽴⼀个新的消费者组时,需要决定是否需要消费已经存在于 Broker 中的历史消息CONSUME_FROM_LAST_OFFSET 将会忽略历史消息,并消费之后⽣成的任何消息。CONSUME_FROM_FIRST_OFFSET 将会消费每个存在于 Broker 中的信息。你也可以使⽤CONSUME_FROM_TIMESTAMP 来消费在指定时间戳后产⽣的消息。
3 Broker
3.1 Broker ⾓⾊
Broker ⾓⾊分为 ASYNC_MASTER(异步主机)、SYNC_MASTER(同步主机)以及SLAVE(从机)。如果对消息的可靠性要求⽐较严格,可以采⽤ SYNC_MASTER加SLAVE的部署⽅式。如果对消息可靠性要求不⾼,可以采⽤ASYNC_MASTER加SLAVE的部署⽅式。如果只是测试⽅便,则可以选择仅ASYNC_MASTER或仅SYNC_MASTER的部署⽅式。
3.2 FlushDiskType
SYNC_FLUSH(同步刷新)相⽐于ASYNC_FLUSH(异步处理)会损失很多性能,但是也更可靠,所以需要根据实际的业务场景做好权衡。
3.3 Broker 配置
参数名默认值说明
listenPort10911接受客户端连接的监听端⼝
namesrvAddr null nameServer 地址
brokerIP1⽹卡的 InetAddress当前 broker 监听的 IP
全自动烧录机brokerIP2跟 brokerIP1 ⼀样存在主从 broker 时,如果在 broker 主节点上配置了 brokerIP2 属性,broker 从节点会连接主节点配置的 brokerIP2 进⾏同步
brokerName null broker 的名称brokerClusterName DefaultCluster本 broker 所属的 Cluser 名称brokerId0broker id, 0 表⽰ master, 其他的正整数表⽰ slave storePathCommitLog$HOME/store/commitlog/存储 commit log 的路径storePathConsumerQueue$HOME/store/consumequeue/存储 consume queue 的路径mappedFileSizeCommitLog1024 * 1024 * 1024(1G)commit log 的映射⽂件⼤⼩deleteWhen04在每天的什么时间删除已经超过⽂件保留时间的 commit log fileReservedTime72以⼩时计算的⽂件保留时间
brokerRole ASYNC_MASTER SYNC_MASTER/ASYNC_MASTER/SLAVE
flushDiskType ASYNC_FLUSH SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保证在收到确认⽣产者之前将消息刷盘。ASYNC_FLUSH 模式下的 broker 则利⽤刷盘⼀组消息的
模式,可以取得更好的性能。
参数名默认值说明
4 NameServer
R ocketMQ 中,Name Servers 被设计⽤来做简单的路由管理。其职责包括:
Brokers 定期向每个名称服务器注册路由数据。
名称服务器为客户端,包括⽣产者,消费者和命令⾏客户端提供最新的路由信息。
5 客户端配置
相对于RocketMQ的Broker集,⽣产者和消费者都是客户端。本⼩节主要描述⽣产者和消费者公共的⾏为配置。
5.1 客户端寻址⽅式
RocketMQ可以令客户端到Name Server, 然后通过Name Server再到Broker。如下所⽰有多种配置⽅式,优先级由⾼到低,⾼优先级会覆盖低优先级。
代码中指定Name Server地址,多个namesrv地址之间⽤分号分割
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
Java启动参数中指定Name Server地址
-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
环境变量指定Name Server地址
export  NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
HTTP静态服务器寻址(默认)
192.168.0.1:9876;192.168.0.2:9876
客户端默认每隔2分钟访问⼀次这个HTTP服务器,并更新本地的Name Server地址。URL已经在代码中硬编码,可通过修改/etc/hosts⽂件来改变要访问的服务器,例如在/etc/hosts增加如下配置:
10.232.22.67    jmenv.taobao
推荐使⽤HTTP静态服务器寻址⽅式,好处是客户端部署简单,且Name Server集可以热升级。
5.2 客户端配置
DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer都继承于ClientConfig 类,ClientConfig为客户端的公共配置类。客户端的配置都是get、set形式,每个参数都可以⽤spring来配置,也可以在代码中配置,例如namesrvAddr这个参数可以这样配置,producer.setNamesrvAddr(“192.168.0.1:9876”),其他参数同理。
1 客户端的公共配置
参数名默认值说明
namesrvAddr Name Server地址列表,多个NameServer地址⽤分号隔开

本文发布于:2024-09-22 03:50:45,感谢您对本站的认可!

本文链接:https://www.17tex.com/tex/3/183333.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   消费   发送   服务器   客户端   消费者
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议