dubbo服务之底层通讯协议(Protocol)

dubbo服务之底层通讯协议(Protocol)
from /78.htm
我们先来到通讯协议的⼊⼝点吧。通过Protocol接⼝查通讯协议⼊⼝点,我们根据接⼝的export⽅法搜索发现⼊⼝了,在ServiceConfig的doExportUrlsFor1Protocol⽅法,如下图:
然后我们进⼊ port(invoker)⽅法发现有很多实现类,根据spi(不懂的请看之前写的容器篇)查看配置⽂件能到如下
registry=com.istry.integration.RegistryProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol  //这个是默认的,我们在Protocol接⼝上可以看到spi的注解filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=com.alibaba.dubbo.i.RmiProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
memcached=com.alibaba.dubbo.ached.MemcachedProtocol
redis=com.alibaba.dubbo.dis.RedisProtocol
rest=com.alibaba.dubbo.st.RestProtocol
进⼊port(Invoker<T> invoker)⽅法⾥⾯有个 openServer(url);
代码:
private void openServer(URL url) {
// find server.
String key = Address();
//client 也可以暴露⼀个只有server可以调⽤的服务。
boolean isServer = Parameter(Constants.IS_SERVER_KEY,true);
if (isServer) {气泡垫
ExchangeServer server = (key);
if (server == null) {
serverMap.put(key, createServer(url)); //createServer是创建服务
} else {
//server⽀持reset,配合override功能使⽤
}
360历史
}
}
继续进⼊createServer,上源码
private ExchangeServer createServer(URL url) {
//默认开启server关闭时发送readonly事件
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, String());
//默认开启heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = Parameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && ! ExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);        ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
滚动鼠标throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = Parameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader(Transporter.class).getSupportedExtensions();
if (!ains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
dubbo从要暴漏的服务的URL中取得相关的配置(host,port等)进⾏服务端server的创建,同上⾯的server = Exchangers.bind(url, requestHandler) 正式创建服务。
所以基本的创建步骤是
export()  -->  openServer()  -->  createServer()  -->  server = Exchangers.bind(url, requestHandler);
我们进⾏来看 Exchangers.bind(url, requestHandler)
源码:
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
然后通过getExchanger(url).bind(url, handler)的bing进⼊ HeaderExchanger类
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
在进⼊Transporters类的bing的
public static Server bind(URL url, handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
无水洗手液
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
通过bing可以知道他讲调⽤:GrizzlyTransporter,MinaTransporter,NettyTransporter 通过spi默认是调⽤NettyTransporter      到这⾥我们基本明⽩dubbo的通讯默认是交给了netty来处理,
我们在看下doOPen⽅法
丝光沸石
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = wCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = wCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));脱墨纸
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Con            bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = Channels();
// /browse/NETTY-365
// /browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", Decoder());//解码
pipeline.addLast("encoder", Encoder());//编码
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
了解netty的同学,肯定早已习惯这个⽅法的写法,就是创建了netty的server嘛,到这⾥dubbo的服务创建完毕了,这个时候控制台见打
印:
[DUBBO] Start NettyServer bind /0.0.0.0:20880, export /192.168.4.241:20880, dubbo version: 2.8.4, current host: 127.0.0.1
在下⾯netty的底层就不在这⾥讲解了本⼈内功不够

本文发布于:2024-09-20 14:38:04,感谢您对本站的认可!

本文链接:https://www.17tex.com/tex/4/108987.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:创建   服务   配置   同学   内功
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议