RocketMQ源码系列(二):RocketMQ路由中心NameServer

RocketMQ源码系列(⼆):RocketMQ路由中⼼NameServer
这篇⽂章主要讲解RocketMQ路由管理、服务注册及服务发现机制。
1、NameServer架构设计
消息中间件的设计思路⼀般是基于主题的订阅发布机制,消息⽣产者(Producer)发送某⼀主题的消息到消息服务器,消息服务器负责该消息的持久化存储,消息消费者(Consumer)订阅感兴趣的主题,消息服务器根据订阅信息(路由信息)将消息推送给消费者(push模式)或者消息消费者主动向消息服务器拉取消息(pull模式),从⽽实现消息⽣产者与消息消费者的解耦。为了避免因消息服务器的单点故障导致的整个系统瘫痪,通常会部署多台消息服务器共同承担消息的存储。那么消息⽣产者如何知道消息要发往哪台消息服务器呢?如
果某⼀台消息服务器宕机了,⽣产者如何在不重启服务的情况下感知呢?
为了解决上述问题,NameServer设计成⽀持集模式,路由管理、服务注册、服务发现架构,如下图:
Broker消息服务器在启动时向所有NameServer注册,消息⽣产者 在发送消息之前先从NameServer获取Broker服务器的地址列表,然后根据负载算法从列表中选择⼀台消息服务器发送消息。NameServer与 每台Broker服务器保持长连接,并间隔10s检测Broker是否存活,如果检测到Broker宕机,则从路由注册表中将其移除,但是路由变化不会马上通知消息⽣产者。这样设计是为了降低 NameServer实现的复杂性,因此需要在消息发送端提供容错机制来保证消息发送的⾼可⽤性。 NameServer本⾝的⾼可⽤性可通过部署多台NameServer服务器来实现,但彼此之间互不通信。虽然NameServer服务器之间在某⼀时刻 的数据并不会完全相同,但对消息发送不会造成重⼤影响,⽆⾮就是短暂造成消息发送不均衡,这也是RocketMQ NameServer设计的⼀个亮点。
消息客户端与NameServer、Broker的交互:
Broker每隔30s向NameServer集的每⼀台机器发送⼼跳包, 包含⾃⾝创建的topic路由等信息。
消息客户端每隔30s向NameServer更新对应topic的路由信息。
NameServer收到Broker发送的⼼跳包时会记录时间戳。
NameServer每隔10s会扫描⼀次brokerLiveTable(存放⼼跳包的时间戳信息),如果在120s内没有收到⼼跳包,则认为Broker失效,更新topic的路由信息,将失效的Broker信息移除。
2、NameServer启动流程源码分析
namesrv模块下,到NameServer启动类NamesrvStartup.java,重点关注NameServer相关启动参数。
⾸先是解析配置⽂件,需要填充NamesrvConfig、 NettyServerConfig属性值
⽅法流转:main0->createNamesrvController
先创建NamesrvConfig(NameServer业务参 数)、NettyServerConfig(NameServer⽹络参数),然后在解析启动 时把指定的配置⽂件或启动命令中的选项值填充到NamesrvConfig、 NettyServerConfig对象中。参数来源有如下两种⽅式:
-c configFile通过-c命令指定配置⽂件的路径。
使⽤“–属性名 属性值”命令,例如 --listenPort 9876。
部分代码如下:
final NamesrvConfig namesrvConfig =new NamesrvConfig();
final NettyServerConfig nettyServerConfig =new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if(commandLine.hasOption('c')){
String file = OptionValue('c');
if(file !=null){
InputStream in =new BufferedInputStream(new FileInputStream(file));
properties =new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
if(commandLine.hasOption('p')){
InternalLogger console = Logger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
}
MixAll.properties2Object(ServerUtilmandLine2Properties(commandLine), namesrvConfig);
chdtv
NamesrvConfig对象中的默认参数:
/**
* RocketMQ主⽬录,通过Drocketmq.home.dir=path或设置环境变量ROCKETMQ_HOME可以配置RocketMQ的主⽬录
*/
private String rocketmqHome = Property(MixAll.ROCKETMQ_HOME_PROPERTY, v(MixAll.ROCKETMQ_HOME_ENV)); /**
* NameServer存储KV配置属性的持久化路径
*/
private String kvConfigPath = Property("user.home")+ File.separator +"namesrv"+ File.separator +"kvConfig.json";
/**
* NameServer默认配置⽂件路径。
* NameServer启动时如果要通过配置⽂件配置NameServer启动属性,请
* 使⽤-c选项
*/
private String configStorePath = Property("user.home")+ File.separator +"namesrv"+ File.separator +"namesrv.properties"; private String productEnvName ="center";
private boolean clusterTest =false;
隧道喷浆/**
* 是否⽀持顺序消息,默认是不⽀持
*/
private boolean orderMessageEnable =false;
NettyServerConfig对象中默认的参数:
/**
* NameServer监听端⼝,该值默认会被初始化为9876
*/
private int listenPort =8888;
/**
* Netty业务线程池线程个数
*/
private int serverWorkerThreads =8;
/**
* Netty public任务线程池
* 线程个数。Netty⽹络会根据业务类型创建不同的线程池,⽐如处理消
* 息发送、消息消费、⼼跳检测等。如果该业务类型(RequestCode)未
* 注册线程池,则由public线程池执⾏
*/
private int serverCallbackExecutorThreads =0;
/**
* I/O线程池线程个数,主要是
* NameServer、Broker端解析请求、返回相应的线程个数。这类线程主
* 要⽤于处理⽹络请求,先解析请求包,然后转发到各个业务线程池完
* 成具体的业务操作,最后将结果返回给调⽤⽅
*/
private int serverSelectorThreads =3;
/
**
* send oneway消息请求的并发度(Broker端参数)
*/
private int serverOnewaySemaphoreValue =256;
/**
* 异步消息发送的最⼤并发度
* (Broker端参数)
*/
private int serverAsyncSemaphoreValue =64;
/**
* ⽹络连接最⼤空闲时
* 间,默认为120s。如果连接空闲时间超过该参数设置的值,连接将被
* 关闭
*/
private int serverChannelMaxIdleTimeSeconds =120;
/**
* ⽹络socket发送缓存区⼤⼩,默认为64KB
*/
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
/**
* ⽹络socket接收缓存区⼤⼩,默认为64KB
*/
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
private int serverSocketBacklog = NettySystemConfig.socketBacklog;
/**
* ByteBuffer是否开启缓存,建议开启
*/
private boolean serverPooledByteBufAllocatorEnable =true;
/**
* make make install
*
*
* ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
* --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
*/
/**
* 是否启⽤Epoll I/O模型,Linux环境下建议开启
*/
private boolean useEpollNativeSelector =false;
注意:
在启动NameServer时,可以先使⽤./mqnameserver -c configFile -p 命令打印当前加载的配置属性。
根据启动属性创建NamesrvController实例并初始化, NameServerController实例为NameServer核⼼控制器:⽅法流转:main0->createNamesrvController->start->initialize
加载KV配置,先创建NettyServer⽹络处理对象,然后开启两个定时任务,在RocketMQ中此类定时任务统称为⼼跳检测。1)定时任务1:NameServer每隔10s扫描⼀次Broker,移除处于未 激活状态的Broker。
2)定时任务2:NameServer每隔10min打印⼀次KV配置。
public boolean initialize(){
//加载KV配置
this.kvConfigManager.load();
/**
* 创建NettyServer⽹络处理对象
*/
万能倒角机ingServer =new NettyRemotingServer(thistyServerConfig,this.brokerHousekeepingService);
滑水鞋ingExecutor =
/**设备防尘罩
* 定时任务1:NameServer每隔10s扫描⼀次Broker,移除处于未
* 激活状态的Broker
*/
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){
@Override
public void run(){
uteInfoManager.scanNotActiveBroker();
}
},5,10, TimeUnit.SECONDS);
/**
* 定时任务2::NameServer每隔10min打印⼀次KV配置
*/
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){
@Override
public void run(){
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
},1,10, TimeUnit.MINUTES);
if(TlsSystemConfig.tlsMode != TlsMode.DISABLED){
/
/ Register a listener to reload SslContext
try{
fileWatchService =new FileWatchService(
消音降噪
new String[]{
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener(){
boolean certChanged, keyChanged =false;
@Override
public void onChanged(String path){
if(path.equals(TlsSystemConfig.tlsServerTrustCertPath)){
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if(path.equals(TlsSystemConfig.tlsServerCertPath)){
certChanged =true;
}
if(path.equals(TlsSystemConfig.tlsServerKeyPath)){

本文发布于:2024-09-23 01:32:02,感谢您对本站的认可!

本文链接:https://www.17tex.com/tex/3/266282.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   服务器   启动
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议