LeaderFollower多线程网络模型介绍

LeaderFollower多线程⽹络模型介绍之前分享过《轻量级web server Tornado代码分析》,介绍了⽬前我们采⽤nginx + tornado的⽅式搭建升级、配管、数据中⼼等各类服务组建客户端迭代体系。最近注意到,淘宝⽬前公开了其⽹络服务器源代码Tengine。根据官⽅介绍,Tengine 是由淘宝⽹发起的Web服务器项⽬。它在Nginx的基础上,针对⼤访问量⽹站的需求,添加了很多⾼级功能和特性。Tengine 的性能和稳定性已经在⼤型的⽹站如淘宝⽹,天猫商城等得到了很好的检验。它的最终⽬标是打造⼀个⾼效、稳定、安全、易⽤的Web平台。它们都采⽤了多线程⾮阻塞模式,并使⽤了LF模型。我最近整理了⼀下LF的相关资料,和⼤家分享⼀下。对淘宝开源的Tengine有兴趣的同学可以到这⾥checkout代码研究:
1、 引⾔
⼤家知道,多线程⽹络服务最简单的⽅式就是⼀个连接⼀个线程,这种模型当客户端连接数快速增长是就会出现性能瓶颈。当然,这时候,我们理所当然会考虑使⽤线程池,⽽任何池的使⽤,都会带来⼀个管理和切换的问题。 在java 1.4中引⼊了NIO编程模型,它采⽤了Reactor模式,或者说观察者模式,由于它的读写操作都是⽆阻塞的,使得我们能够只⽤⼀个线程处理所有的IO事件,这种处理⽅式是同步的。为了提⾼性能,当⼀个线程收到事件后,会考虑启动⼀个新的线程去处理,⽽⾃⼰继续等待下⼀个请求。这⾥可能会有性能问题,就是把⼯作交给别⼀个线程的时候的上下⽂切换,包括数据拷贝。今天向⼤家介绍⼀种Leader-Follower模型。
2、 基本思想
所有线程会有三种⾝份中的⼀种:leader和follower,以及⼀个⼲活中的状态:proccesser。它的基本原则就是,永远最多只有⼀个leader。⽽所有follower都在等待成为leader。线程池启动时会⾃动产⽣⼀个Leader负责等待⽹络IO事件,当有⼀个事件产⽣时,Leader线程⾸先通知⼀个Follower线程将其提拔为新的Leader,然后⾃⼰就去⼲活了,去处理这个⽹络事件,处理完毕后加⼊Follower线程等待队列,等待下次成为Leader。这种⽅法可以增强CPU⾼速缓存相似性,及消除动态内存分配和线程间的数据交换。
3、 原理分析
显然地,通过预先分配⼀个线程池,Leader/Follower设计避免了动态线程创建和销毁的额外开销。将线程放在⼀个⾃组织的池中,⽽且⽆需交换数据,这种⽅式将上下⽂切换、同步、数据移动和动态内存管理的开销都降到了最低。
不过,这种模式在处理短暂的、原⼦的、反复的和基于事件的动作上可以取得明显的性能提升,⽐如接收和分发⽹络事件或者向数据库存储⼤量数据记录。事件处理程序所提供的服务越多,其体积也就越⼤,⽽处理⼀个请求所需的时间越长,池中的线程占⽤的资源也就越多,同时也需要更多的线程。相应的,应⽤程序中其它功能可⽤的资源也就越少,从⽽影响到应⽤程序的总体性能、吞吐量、可扩展性和可⽤性。
在⼤多数LEADER/FOLLOWERS设计中共享的事件源封装在⼀个分配器组件中。如果在⼀个设计中联合使⽤了
LEADER/FOLLOWERS和REACTOR事件处理基础设施,由reactor组件进⾏分发。封装事件源将事件分离和分派机制与事件处理程序隔离开来。每个线程有两个⽅法:⼀个是join⽅法,使⽤这个⽅法可以把新初始化的线程加⼊到池中。新加⼊的线程将⾃⼰的执⾏挂起到线程池监听者条件(monitor condition)上,并开始等待被提升为新的Leader。在它变成⼀个Leader之后,它便可以访问共享的事件源,等待执⾏下⼀个到来的事件。另⼀个是promote_new_leader⽅法,当前的Leader线程使⽤这个⽅法可以提升新的Leader,其做法是通过线程池监听者条件通知休眠的Follower。收到通知的Follower继续执⾏(resume)线程池的join⽅法,访问共享事件源,并等待下⼀个事件的到来。
4、 代码演⽰
⾸先⽤⼀段简单的代码演⽰⼀下整个⾓⾊转换的过程。由于同⼀时刻只有⼀个leader,⽤⼀个互斥量就可以解决了。每个线程⼀直在做如下4个步骤循环:
public class WorkThread
{
真空装public static Mutex mutex = new Mutex();
public void start()
{
while (true)
{
// 等待成为leader
无油烟锅// 等待成为leader
waitToLeader();
飞星晒图机// ⽤select或epoll等⽅式等待消息处理
simulateReactor();
// 产⽣下⼀个leader
promoteNewLeader();
// 处理消息
simulateDojob();
}
}
private void simulateDojob()
{
}
private void promoteNewLeader()
{
Console.WriteLine(Thread.CurrentThread.Name + ": Release leadership to others..");
mutex.ReleaseMutex();
}
private void simulateReactor()
{
}
private void waitToLeader()
{
Console.WriteLine(Thread.CurrentThread.Name + ": Waiting to be Leader..");
mutex.WaitOne();
}
}
详细的代码可以参见附件。
5、 代码分析
接下来我们来看⼀下⼀个典型的开源代码实现:spserver。抄段官⽹的话,spserver 是⼀个实现了半同步/半异步(Half-Sync/Half-Async)和领导者/追随者(Leader/Follower) 模式的服务器框架,能够简化 TCP server 的开发⼯作。spserver 使⽤c++ 实现,⽬前实现了以下功能:
Ø  封装了 TCP server 中接受连接的功能
Ø  使⽤⾮阻塞型I/O和事件驱动模型,由主线程负责处理所有 TCP 连接上的数据读取和发送,因此连接数不受线程数的限制
Ø  主线程读取到的数据放⼊队列,由⼀个线程池处理实际的业务
Ø  ⼀个 http 服务器框架,即嵌⼊式 web 服务器
Spserver的每个版本都有⼀定的修改。早先版本V0.5还没有引⼊Leader/Follower模式,在V0.8版本中已经有了
sp_lfserver。在V0.9版本中将其改为了sp_iocplfserver,引⼊了iocp完成端⼝的名字,但事实上之前版本已经使⽤了完成端⼝的技术。简单地说,iocp就是事件io操作由操作系统完成,完成后才由线程接收处理事件。先看⼀下代码,server启动以后开始监听,并将线程池启动起来。线程⼊⼝函数lfHandler⼀直在循环执⾏handleOneEvent:
int SP_LFServer :: run()
{
int ret = 0;
int listenFD = -1;
ret = SP_IOUtils::tcpListen( mBindIP, mPort, &listenFD, 0 );
if( 0 == ret ) {塑料破碎机刀片
mThreadPool = new SP_ThreadPool( mMaxThreads );
for( int i = 0; i < mMaxThreads; i++ ) {
mThreadPool->dispatch( lfHandler, this );
}
}
return ret;
}
void SP_LFServer :: lfHandler( void * arg )
{
SP_LFServer * server = (SP_LFServer*)arg;
for( ; 0 == server->mIsShutdown; ) {
server->handleOneEvent();
}
}
接下来看⼀下handleOneEvent的处理,和上⾯的演⽰程序⼀样,先mutexlock争取leader权,然后去等待读、写事件,最后释放leadership给其它⼈,⾃⼰执⾏读完成事件处理函数task->run()或写事件的完成端⼝事件completionMessage,这个completionMessage会做⼀些清理⼯作,例如delete msg:
void SP_LFServer :: handleOneEvent()
{
SP_Task * task = NULL;
SP_Message * msg = NULL;
pthread_mutex_lock( &mMutex );
for( ; 0 == mIsShutdown && NULL == task && NULL == msg; ) {
if( mEventArg->getInputResultQueue()->getLength() > 0 ) {
task = (SP_Task*)mEventArg->getInputResultQueue()->pop();
} else if( mEventArg->getOutputResultQueue()->getLength() > 0 ) {
msg = (SP_Message*)mEventArg->getOutputResultQueue()->pop();
}
if( NULL == task && NULL == msg ) {
event_base_loop( mEventArg->getEventBase(), EVLOOP_ONCE );
}
}
pthread_mutex_unlock( &mMutex );
if( NULL != task ) task->run();
if( NULL != msg ) mCompletionHandler->completionMessage( msg );
}
6、 框架使⽤
和之前介绍的框架⼀样,采⽤spserver构建server⾮常快捷,如下,只要把SP_TestHandler⾥的⼏个处理事件实现即可。
class SP_TestHandler : public SP_Handler {
public:
SP_ TestHandler (){}
virtual ~SP_ TestHandler (){}
virtual int start( SP_Request * request, SP_Response * response ) {}
virtual int handle( SP_Request * request, SP_Response * response ) {}
virtual void error( SP_Response * response ) {}
virtual void timeout( SP_Response * response ) {}
virtual void close() {}
};
class SP_TestHandlerFactory : public SP_HandlerFactory {
public:
SP_ TestHandlerFactory () {}
virtual ~SP_ TestHandlerFactory () {}
virtual SP_Handler * create() const {
return new SP_TestHandler();
}
};
int main( int argc, char * argv[] )
手机盒
{
int port = 3333, maxThreads = 4, maxConnections = 20000;
int timeout = 120, reqQueueSize = 10000;
const char * serverType = "lf";
SP_IocpLFServer server( "", port, new SP_TestHandlerFactory() );
server.setTimeout( timeout );
server.setMaxThreads( maxThreads );
网络分配器server.setMaxConnections( maxConnections );
server.runForever();
return 0;
}
Spserver的代码可以在这⾥看到:齐名的⽹络编程模型:HAHS,翻译为半异步半同步模型。本⽂暂不作介绍。

本文发布于:2024-09-22 11:19:25,感谢您对本站的认可!

本文链接:https://www.17tex.com/tex/4/337089.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:线程   事件   数据
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议