ApacheStorm实时流处理系统ACK机制以及源码分析

ApacheStorm实时流处理系统ACK机制以及源码分析
1.ACK机制简介
Storm的可靠性是指Storm会告知⽤户每⼀个消息单元是否在⼀个指定的时间(timeout)内被完全处理。完全处理的意思是该MessageId绑定的源Tuple以及由该源Tuple衍⽣的所有Tuple都经过了Topology中每⼀个应该到达的Bolt的处理。
注: timetout 可以通过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 来指定
Storm中的每⼀个Topology中都包含有⼀个Acker组件。Acker组件的任务就是跟踪从某个task中的Spout流出的每⼀个messageId所绑定的Tuple树中的所有Tuple的处理情况。如果在⽤户设置的最⼤超时时间内这些Tuple没有被完全处理,那么Acker会告诉Spout该消息处理失败,相反则会告知Spout该消息处理成功,它会分别调⽤Spout中的fail和ack⽅法。
Storm允许⽤户在Spout中发射⼀个新的源Tuple时为其指定⼀个MessageId,这个MessageId可以是任意的Object对象。多个源Tuple可以共⽤同⼀个MessageId,表⽰这多个源Tuple对⽤户来说是同⼀个消息单元,它们会被放到同⼀棵tuple树中,如下图所⽰:
在Spout中由message 1绑定的tuple1和tuple2分别经过bolt1和bolt2的处理,然后⽣成了两个新的Tuple,
并最终流向了bolt3。当bolt3处理完之后,称message 1被完全处理了。
2. Acker原理分析
storm⾥⾯有⼀类特殊的task称为Acker(acker bolt), 负责跟踪spout发出的每⼀个tuple的tuple树。当Acker发现⼀个tuple树已经处理完成了。它会发送⼀个消息给产⽣这个tuple的那个task。你可以通过Config.TOPOLOGY_ACKERS来设置⼀个topology⾥⾯的acker的数量, 默认值是1。 如果你的topology⾥⾯的tuple⽐较多的话, 那么把acker的数量设置多⼀点,效率会⾼⼀点。
理解storm的可靠性的最好的⽅法是来看看tuple和tuple树的⽣命周期, 当⼀个tuple被创建, 不管是spout还是bolt创建的, 它会被赋予⼀个64位的id,⽽acker就是利⽤这个id去跟踪所有的tuple的。每个tuple知道它的祖宗的id(从spout发出来的那个tuple的id), 每当你新发射⼀个tuple, 它的祖宗id都会传给这个新的tuple。所以当⼀个tuple被ack的时候,它会发⼀个消息给acker,告诉它这个tuple树发⽣了怎么样的变化。具体来说就是它告诉acker:  我已经完成了, 我有这些⼉⼦tuple, 你跟踪⼀下他们吧。
(spout-tuple-id, tmp-ack-val)
tmp-ark-val =  tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 ... )
tmp-ack-val是要ack的tuple的id与由它新创建的所有的tuple的id异或的结果
当⼀个tuple需要ack的时候,它到底选择哪个acker来发送这个信息呢?
storm使⽤ ⼀致性哈希来把⼀个spout-tuple-id对应到acker, 因为每⼀个tuple知道它所有的祖宗的tuple-id, 所以它⾃然可以算出要通知哪个Acker来ack。
注:⼀个tuple可能存在于多个tuple树,所有可能存在多个祖宗的tuple-id燃料棒
acker是怎么知道每⼀个spout tuple应该交给哪个task来处理?
当⼀个spout发射⼀个新的tuple, 它会简单的发⼀个消息给⼀个合适的acker,并且告诉acker它⾃⼰的id(taskid), 这样storm就有了taskid-tupleid的对应关系。 当acker发现⼀个树完成处理了, 它知道给哪个task发送成功的消息。
3.Acker的⾼效性
acker task并不显式的跟踪tuple树。对于那些有成千上万个节点的tuple树,把这么多的tuple信息都跟踪起来会耗费太多的内存。相反, acker⽤了⼀种不同的⽅式, 使得对于每个spout tuple所需要的内存量是恒定的(20 bytes) .  这个跟踪算法是storm如何⼯作的关键,并且也是它的主要突破。
压电陶瓷片
⼀个acker task存储了⼀个spout-tuple-id到⼀对值的⼀个mapping。这个对⼦的第⼀个值是创建这个tuple的taskid, 这个是⽤来在完成处理tuple的时候发送消息⽤的。 第⼆个值是⼀个64位的数字称作:ack val, ack val是整个tuple树的状态的⼀个表⽰,不管这棵树多⼤。它只是简单地把这棵树上的所有创建的tupleid/ack的tupleid⼀起异或(XOR)。
当⼀个acker task 发现⼀个 ack val变成0了, 它知道这棵树已经处理完成了。
例如下图是⼀个简单的Topology。
ack_val的初值为0,varl_x表⽰新产⽣的tuple id ,它们经过Spout,Bolt1,Bolt2,Bolt3 处理,并与arv_val异或,最终arv_val变为0,表⽰tuple1被成功处理。
下⾯看⼀个稍微复杂⼀点的例⼦:
msg1绑定了两个源tuple,它们的id分别为1001和1010.在经过Bolt1处理后新⽣成了tuple id为1110,新⽣成的tuple与传⼊的tuple 1001进⾏异或得到的值为0111,然后Bolt1通过spout-tuple-id映射到指定的Acker组件,向它发送消息,Acker组件将Bolt1传过来的值与ack_val异或,更新ack_val的值变为了0100。与此相同经过Bolt2处理后,ack_val的值变为0001。最后经Bolt3处理后ack_val 的值变为了0,
说明此时由msg1标识的Tuple处理成功,此时Acker组件会通过事先绑定的task id映射到对应的Spout,然后调⽤该Spout的ack⽅法。
其流程如下图所⽰:
4.ACK机制配置
Acker task是⾮常轻量级的, 所以⼀个topology⾥⾯不需要很多acker。你可以通过Strom UI(id: -1)来跟踪它的性能。 如果它的吞吐量看起来不正常,那么你就需要多加点acker了。
如果可靠性对你来说不是那么重要 — 你不太在意在⼀些失败的情况下损失⼀些数据, 那么你可以通过不跟踪这些tuple树来获取更好的性能。不去跟踪消息的话会使得系统⾥⾯的消息数量减少⼀半, 因为对于每⼀个tuple都要发送⼀个ack消息。并且它需要更少的id来保存下游的tuple, 减少带宽占⽤。
有三种⽅法可以去掉可靠性:
1.第⼀是把Config.TOPOLOGY_ACKERS 设置成 0. 在这种情况下, storm会在spout发射⼀个tuple之后马上调⽤spout的ack⽅法。也就是说这个tuple树不会被跟踪。
2.第⼆个⽅法是在tuple层⾯去掉可靠性。 你可以在发射tuple的时候不指定Messageid来达到不跟踪某个特定的spout tuple的⽬的。
3.最后⼀个⽅法是如果你对于⼀个tuple树⾥⾯的某⼀部分到底成不成功不是很关⼼,那么可以在发射这些tuple的时候unanchor它们。这样这些tuple就不在tuple树⾥⾯, 也就不会被跟踪了。
5.ACK机制源码分析
下⾯我们来分析⼀下Apache Storm 2.0.0-SNAPSHOT的ACK机制源码分析。
5.1 Acker类业务逻辑
Acker Bolt就是通过不断更新和检测跟踪值来判断该消息是否已经被完全成功处理的。RotatingMap主要⽤于消息的超时。AckerBolt不断的接受来⾃各个地⽅发送过来的Tuple,并且根据Tuple的getSourceStreamId 进⾏不同的逻辑操作。
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
* /licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
止吠项圈* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.daemon;
public class Acker implements IBolt {
哈特曼光阑private static final Logger LOG = Logger(Acker.class);
private static final long serialVersionUID = 4430906880683183091L;
public static final String ACKER_COMPONENT_ID = "__acker";
public static final String ACKER_INIT_STREAM_ID = "__ack_init";
public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
public static final String ACKER_RESET_TIMEOUT_STREAM_ID = "__ack_reset_timeout";
public static final int TIMEOUT_BUCKET_NUM = 3;
private OutputCollector collector;
private RotatingMap<Object, AckObject> pending;
private static class AckObject {
public long val = 0L;
public long startTime = Time.currentTimeMillis();无水氯化镁
public int spoutTask = -1;
public boolean failed = false;
/
/ val xor value
public void updateAck(Long value) {
val = Utils.bitXor(val, value);
val = Utils.bitXor(val, value);
}
}
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.pending = new RotatingMap<>(TIMEOUT_BUCKET_NUM);
}
@Override
public void execute(Tuple input) {
/**
*  Storm ACK 源码分析
*  1.。Acker Bolt就是通过不断更新和检测跟踪值来判断该消息是否已经被完全成功处理的。RotatingMap主要⽤于消息的超时。AckerBolt不断的接受
*  来⾃各个地⽅发送过来的Tuple,并且根据Tuple的getSourceStreamId 进⾏不同的逻辑操作
*/
if (TupleUtils.isTick(input)) {
/**
* 系统预定义流,⽤于消息超时。Acker Bolt会对成员变量pending进⾏旋转操作,然后退出execute⽅法,该操作将pending中最早的⼀个桶中的数据删除掉,            * 消息的超时。由于初始化RotatingM
ap时,未传⼊关于expire的回调⽅法,故该操作只是进⾏简单的删除。如果继续对已经删除掉的消息的Rootld进⾏Ack操作            * <RootId,跟踪值>对,但是由于数据已被删除过的原因,跟踪值基本上不会再回到零,所以Spout将永远也收不到它发送出去的这条消息的Ack。Spout会通过            * 将这条消息标记为处理失败,然后调⽤Spout的失败函数来决定对失败消息进⾏重传还是忽略。这个操作的结果是去除处于僵死状态的消息跟踪。
*/
Map<Object, AckObject> tmp = ate();
LOG.debug("Number of timeout tuples:{}", tmp.size());
return;
}
boolean resetTimeout = false;
String streamId = SourceStreamId();
Object id = Value(0);
AckObject curr = (id);
if (ACKER_INIT_STREAM_ID.equals(streamId)) {
/**
* Acker中AckObject初始化操作 Spout输⼊消息的模式为<RootId,RawAckValue,SpoutTaskId> AckerBolt会根据Rootld取出<RootId, AckValue> ,并进⾏更新操            * spoutTaskId
*/
if (curr == null) {
curr = new AckObject();
pending.put(id, curr);
}
curr.Long(1));
curr.spoutTask = Integer(2);
} else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
/**
* ID:输⼈消息的模式为<RootId,AckValue〉,与原有AckValue进⾏异或操作并存储。
*/
if (curr == null) {
curr = new AckObject();
pending.put(id, curr);
}
curr.Long(1));
} else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
// For the case that ack_fail message arrives before ack_init
/
**
* Acker收到Bolt或者Spout发送过来的Fail消息。输⼊消息的模式为< RootId >。设置failed 为true, 表⽰消息的处理已经失败。
*/
if (curr == null) {
curr = new AckObject();
}
curr.failed = true;
pending.put(id, curr);
} else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
/**
* AckerBolt 收到消息超时
* AckerBolt 收到消息超时
*/
resetTimeout = true;
if (curr != null) {
pending.put(id, curr);
} //else if it has not been added yet, there is no reason time it out later on
} else {
LOG.warn("Unknown source stream {} from task-{}", streamId, SourceTask());
return;
}
int task = curr.spoutTask;
if (curr != null && task >= 0
&& (curr.val == 0 || curr.failed || resetTimeout)) {
Values tuple = new Values(id, getTimeDeltaMillis(curr.startTime));
//若此时消息对应的跟踪值已经为零,那么Storm认为该消息以及所有衍⽣的消息都已被成功处理,这时会通过向ACK - STREAM流向Spout节点发送消息,模式            if (curr.val == 0) {
} else if (curr.failed) {
//若此时消息被标记为失败,那么Storm会通过FAIL-STREAM流向Spout发送消息,模式为< RootId>
} else if(resetTimeout) {
标准车当量数
//若此时消息被标记为发送超时,那么Storm通过ACKER_RESET_TIMEOUT_STREAM流将tuple发送给Spout
} else {
throw new IllegalStateException("The checks are inconsistent we reach what should be unreachable code.");
}
}
collector.ack(input);
}
@Override
public void cleanup() {
LOG.info("Acker: cleanup successfully");
}
private long getTimeDeltaMillis(long startTimeMillis) {
return Time.currentTimeMillis() - startTimeMillis;
}
}
1.Acker Bolt被初始化,定义⼀个this.pending = new RotatingMap<>(TIMEOUT_BUCKET_NUM)。RotatingMap旋转Map类
(系统预定义流,⽤于消息超时)。系统预定义流,⽤于消息超时。Acker Bolt会对成员变量pending进⾏旋转操作,然后退出execute⽅
法,该操作将pending中最早的⼀个桶中的数据删除掉,于是实现了。消息的超时。由于初始化RotatingMap时,未传⼊关于expire的回调
⽅法,故该操作只是进⾏简单的删除。如果继续对已经删除掉的消息的Rootld进⾏Ack操作,就会创建新的<RootId,跟踪值>对,但是由于
数据已被删除过的原因,跟踪值基本上不会再回到零,所以Spout将永远也收不到它发送出去的这条消息的Ack。Spout会通过⾃有的超时
机制,将这条消息标记为处理失败,然后调⽤Spout的失败函数来决定对失败消息进⾏重传还是忽略。这个操作的结果是去除处于僵死状态
的消息跟踪。
2. 对进⼊到Acker Bolt中不同流的Tuple进⾏不同的逻辑处理。当进⼊到Acker Bolt的streamID为__ack_init时,对Acker中
AckObject初始化操作。 Spout输⼊消息的模式为<RootId,RawAckValue,SpoutTaskId>, AckerBolt会根据Rootld取出<RootId,
AckValue> ,并进⾏更新操作以及设置。

本文发布于:2024-09-21 21:51:23,感谢您对本站的认可!

本文链接:https://www.17tex.com/tex/4/99165.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   跟踪   处理   发送   失败
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议