kafka怎么保证数据消费一次且仅消费一次?使用消息队列如何保证幂等性?

kafka怎么保证数据消费⼀次且仅消费⼀次?使⽤消息队列如
何保证幂等性?
精确⼀次处理语义(exactly onece semantic–EOS),的EOS主要体现在3个⽅⾯:
1)producer 保证单个分区的只会发送⼀次,不会出现重复消息
2)事务(transation):保证的写⼊多个分区,即写⼊到多个分区的消息要么全部成功,要么全部回滚
3)流式EOS:流处理本质上可看成是“读取-处理-写⼊管道”。整个过程的操作是原⼦性。
幂等producer只能保证单分区上⽆重复消息;事务可以保证多分区写⼊消息的完整性;⽽流处理EOS保证的是端到端(E2E)消息处理的EOS。⽤户在使⽤过程中需要根据⾃⼰的需求选择不同的EOS。
以下是启⽤⽅法:
1)启⽤幂等producer:在producer程序中设置属性enabled.idempotence=true,但不要设置transational_id.注意是不要设置,⽽不是设置为空字符串。
2)启⽤事务⽀持:在producer程序中设置属性transcational.id为⼀个指定字符串(你可以认为这是你的额事务名称,故最好七个有意义的名字),同时设置enable.idempotence=true
3)启⽤流处理EOS:在Kafka Streams程序中设置processing.guarantee=exactly_once
关于幂等producer的⼀些讨论
所谓幂等producer指producer.send的逻辑是幂等的,即发送相同的Kafka消息,broker端不会重复写⼊消息。同⼀条消息Kafka保证底层⽇志中只会持久化⼀次,既不会丢失也不会重复。幂等性可以极⼤地减轻下游consumer系统实现消息去重的⼯作负担,因此是⾮常实⽤的功能。值得注意的是,幂等producer提供的语义保证是有条件的:
单分区幂等性:幂等producer⽆法实现多分区上的幂等性。如前所述,若要实现多分区上的原⼦性,需要引⼊事务。
单会话幂等性:幂等producer⽆法跨会话实现幂等性。即使同⼀个producer宕机并重启也⽆法保证消息的EOS语义
虽然有上⾯两个限制,幂等producer依然是⼀个⾮常实⽤的新功能。下⾯我们来讨论下它的设计原理。如果要实现幂等性, 通常都需要花费额外的空间来保存状态以执⾏消息去重。Kafka的幂等producer整体上也是这样的思想。
⾸先,producer对象引⼊了⼀个新的字段:Producer ID(下称PID),它唯⼀标识⼀个producer,当producer启动时Kafka会为每个producer分配⼀个PID(64位整数),因此PID的⽣成和分配对⽤户来说是完全透明的,⽤户⽆需考虑PID的事情,甚⾄都感受不到PID的存在。Kafka重构了消息格式(有兴趣的参见Kafka 0.11消息设计),引⼊了序列号字段(sequence number,下称seq number)来标识某个PID producer发送的消息。
和consumer端的offset类似,seq number从0开始计数并严格单调增加。同时在broker端会为每个PID(即每个producer)保存该producer发送过来的消息batch的某些元信息,⽐如PID信息、消息batch的起始seq number及结束seq number等。这样每当该PID发送新的消息batch时,Kafka broker就会对⽐这些信息,如果发⽣冲突(⽐如起始seq number和结束seq number与当前缓存的相同),那么broker就会拒绝这次写⼊请求。倘若没有冲突,那么broker端就会更新这部分缓存然后再开始写⼊消息。
这就是Kafka实现幂等producer的设计思路:
  1. 为每个producer设置唯⼀的PID;
  2. 引⼊seq number以及broker端seq number缓存更新机制来去重。
kafka怎样保证消息仅被消费⼀次?
在使⽤kafka时,⼤多数场景对于数据少量的不⼀致(重复或者丢失)并不关注,⽐如⽇志,因为不会影响最终的使⽤或者分析,但是在某些应⽤场景(⽐如业务数据),需要对任何⼀条消息都要做到精确⼀次的消费,才能保证系统的正确性,kafka并不提供准确⼀致的消费API,需要我们在实际使⽤时借⽤外部的⼀些⼿段来保证消费的精确性,下⾯我们介绍如何实现。
名词说明:
offset:如图所⽰
重复消费(最少⼀次消费语义实现):消费数据处理业务“完成后”,再提交offset。因为在提交offset的过程中,可能出现提交失败的情况,已经消费了数据,但是offset没提交。从⽽导致数据重复消费。
解决办法:
⾄少成功发送⼀次+去重操作(幂等性)
a,如何保证⾄少成功发送⼀次?
保证不丢失消息:
⽣产者(ack=all 代表⾄少成功发送⼀次)
消费者 (offset⼿动提交,业务逻辑成功处理后,提交offset)去重问题:消息可以使⽤唯⼀id标识
b,保证不重复消费:落表(主键或者唯⼀索引的⽅式,避免重复数据)。
业务逻辑处理(选择唯⼀主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插⼊Redis或Mongdb,再进⾏业务逻辑处理)。
丢失数据(最多⼀次消费语义实现):在消费数据业务“处理前”进⾏offset提交。因为在后续数据业务处理过程中,如果出现故障,没有消费到消息,那么将导致数据丢失。为了避免数据丢失,可以设置:
enable.automit=false  关闭⾃动提交位移,这样,在消息被完整处理之后再⼿动提交位移。
丢包:指发送⽅发送的数据未到达接收⽅。常见的丢包可能发⽣在发送端,⽹络,接收端。
解决⽅案:
对kafka进⾏限速,平滑流量
启⽤重试机制,重试间隔时间设置长⼀些。
Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功。
说明:假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出⼀个 leader ,但是 leader 的数据还有⼀些没有被follower 副本的同步的话,就会造成消息丢失。
解决办法就是,设置 acks = all。
acks 是 Kafka ⽣产者(Producer) 很重要的⼀个参数。
acks 的默认值即为1,代表我们的消息被leader副本接收之后就算被成功发送。当我们配置 acks = all 代表则所有副本都要接收到该消息之后该消息才算真正成功被发送。
从kafka的消费机制,我们可以得到是否能够精确的消费关键在消费进度信息的准确性,如果能够保证消费进度的准确性,也就保证了消费数据的准确性。
数据有状态:可以根据数据信息进⾏确认数据是否重复消费,这时候可以使⽤⼿动提交的最少⼀次消费语义实现,即使消费的数据有重复,可以通过状态进⾏数据去重,以达到幂等的效果。
存储数据容器具备幂等性:在数据存⼊的容器具备天然的幂等(⽐如ElasticSearch的put操作具备幂等性,相同的数据多次执⾏Put操作和⼀次执⾏Put操作的结果是⼀致的),这样的场景也可以使⽤⼿动提交的最少⼀次消费语义实现,由存储数据端来进⾏数据去重。
数据⽆状态,并且存储容器不具备幂等:这种场景需要⾃⾏控制offset的准确性,这⾥数据不具备状态,存储使⽤关系型数据库,⽐如MySQL。通过⾃⼰管理offset的⽅式,来确保数据和offset信息是同时变化,通过数据库事务的特性来保证⼀致性和原⼦性。
Kafka 的 ISR 机制是什么?
现在我们来看⼀个 Kafka 的核⼼机制,就是 ISR 机制。远程电源管理
这个机制简单来说,就是会⾃动给每个 Partition 维护⼀个 ISR 列表,这个列表⾥⼀定会有 Leader,然后还会包含跟 Leader 保持同步的Follower。
也就是说,只要 Leader 的某个 Follower ⼀直跟他保持数据同步,那么就会存在于 ISR 列表⾥。
但是如果 Follower 因为⾃⾝发⽣⼀些问题,导致不能及时的从 Leader 同步数据过去,那么这个 Follower 就会被认为是“out-of-sync”,被从 ISR 列表⾥踢出去。
所以⼤家先得明⽩这个 ISR 是什么,说⽩了,就是 Kafka ⾃动维护和监控哪些 Follower 及时的跟上了 Leader 的数据同步。
Kafka 写⼊的数据如何保证不丢失?
所以如果要让写⼊ Kafka 的数据不丢失,你需要保证如下⼏点:
* 每个 Partition 都⾄少得有 1 个 Follower 在 ISR 列表⾥,跟上了 Leader 的数据同步。
* 每次写⼊数据的时候,都要求⾄少写⼊ Partition Leader 成功,同时还有⾄少⼀个 ISR ⾥的 Follower 也写⼊成功,才算这个写⼊是成功了。
自动美甲机
* 如果不满⾜上述两个条件,那就⼀直写⼊失败,让⽣产系统不停的尝试重试,直到满⾜上述两个条件,然后才能认为写⼊成功。
* 按照上述思路去配置相应的参数,才能保证写⼊ Kafka 的数据不会丢失。
android智能电视好!现在咱们来分析⼀下上⾯⼏点要求。
第⼀条,必须要求⾄少⼀个 Follower 在 ISR 列表⾥。
那必须的啊,要是 Leader 没有 Follower 了,或者是 Follower 都没法及时同步 Leader 数据,那么这个事⼉肯定就没法弄下去了。
第⼆条,每次写⼊数据的时候,要求 Leader 写⼊成功以外,⾄少⼀个 ISR ⾥的 Follower 也写成功。
这个要求就是保证说,每次写数据,必须是 Leader 和 Follower 都写成功了,才能算是写成功,保证⼀条数据必须有两个以上的副本。
这个时候万⼀ Leader 宕机,就可以切换到那个 Follower 上去,那么 Follower 上是有刚写⼊的数据的,此时数据就不会丢失了。
简单总结⼀下:
消费端重复消费:很容易解决,建⽴去重表。
消费端丢失数据:也容易解决,关闭⾃动提交offset,处理完之后受到移位。
⽣产端重复发送:这个不重要,消费端消费之前从去重表中判重就可以。
⽣产端丢失数据:这个是最⿇烦的情况。
解决策略:饲料加工工艺
1.异步⽅式缓冲区满了,就阻塞在那,等着缓冲区可⽤,不能清空缓冲区。
2.发送消息之后回调函数,发送成功就发送下⼀条,发送失败就记在⽇志中,等着定时脚本来扫描。(发送失败可能并不真的发送失败,只是没收到反馈,定时脚本可能会重发)。
带风扇的安全帽如何保证有序:
如果有⼀个发送失败了,后⾯的就不能继续发了,不然重发的那个肯定乱序了。
⽣产者在收到发送成功的反馈之前,不能发下⼀条数据,但我感觉⽣产者是⼀个流,阻塞⽣产者感觉业务上不可⾏,怎么会因为⼀条消息发出去没收到反馈,就阻塞⽣产者。
同步发送模式:发出消息后,必须阻塞等待收到通知后,才发送下⼀条消息。
异步发送模式:⼀直往缓冲区写,然后⼀把写到队列中去。
两种都是各有利弊:
同步发送模式虽然吞吐量⼩,但是发⼀条收到确认后再发下⼀条,既能保证不丢失消息,⼜能保证顺序。
设置 acks = all。
设置 replication.factor >= 3
为了保证 leader 副本能有 follower 副本能同步消息,我们⼀般会为 topic 设置 replication.factor >= 3。这样就可以保证每个 分区(partition) ⾄少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。
高速车针
设置 plicas > 1
⼀般情况下我们还需要设置 plicas> 1 ,这样配置代表消息⾄少要被写⼊到 2 个副本才算是被成功发送。
plicas 的默认值为 1 ,在实际⽣产中应尽量避免默认值 1。
但是,为了保证整个 Kafka 服务的⾼可⽤性,你需要确保 replication.factor > plicas 。为什么呢?设想⼀下加⼊两者相等的话,只要是有⼀个副本挂掉,整个分区就⽆法正常⼯作了。这明显违反⾼可⽤性!⼀般推荐设置成 replication.factor =
plicas + 1。

本文发布于:2024-09-22 19:21:23,感谢您对本站的认可!

本文链接:https://www.17tex.com/tex/3/183308.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   消费   数据
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议