博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
dubbo源码分析-服务端发布流程-笔记
阅读量:6846 次
发布时间:2019-06-26

本文共 25361 字,大约阅读时间需要 84 分钟。

hot3.png

Spring对外留出的扩展

  • dubbo是基于spring 配置来实现服务的发布的,那么一定是基于spring的扩展来写了一套自己的标签,那么spring是如何解析这些配置呢?具体细节就不在这里讲解,大家之前在学习spring源码的时候,应该有讲过。总的来说,就是可以通过spring的扩展机制来扩展自己的标签。大家在dubbo配置文件中看到的<dubbo:service> ,就是属于自定义扩展标签

要实现自定义扩展,有三个步骤(在spring中定义了两个接口,用来实现扩展)

  1. NamespaceHandler: 注册一堆BeanDefinitionParser,利用他们来进行解析
  2. BeanDefinitionParser:用于解析每个element的内容
  3. Spring默认会加载jar包下的META-INF/spring.handlers文件寻找对应的NamespaceHandler。

以下是Dubbo-config模块下的dubbo-config-spring

11da8fcef00dbe09ff2137e7ab397448f7b.jpg

Dubbo的接入实现

  • Dubbo中spring扩展就是使用spring的自定义类型,所以同样也有NamespaceHandler、BeanDefinitionParser。
  • 而NamespaceHandler是DubboNamespaceHandler
public class DubboNamespaceHandler extends NamespaceHandlerSupport {   static {      Version.checkDuplicate(DubboNamespaceHandler.class);   }   public void init() {        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));        registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));    }}
  • BeanDefinitionParser全部都使用了DubboBeanDefinitionParser,如果我们向看<dubbo:service>的配置,就直接看DubboBeanDefinitionParser中
  • 这个里面主要做了一件事,把不同的配置分别转化成spring容器中的bean对象
    • application对应ApplicationConfig
    • registry对应RegistryConfig
    • monitor对应MonitorConfig
    • provider对应ProviderConfig
    • consumer对应ConsumerConfig

为了在spring启动的时候,也相应的启动provider发布服务注册服务的过程,而同时为了让客户端在启动的时候自动订阅发现服务,加入了两个bean

ServiceBeanReferenceBean

  • 分别继承了ServiceConfig和ReferenceConfig
  • 同时还分别实现了InitializingBean、DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware
  • InitializingBean接口为bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候会执行该方法。

DisposableBean bean被销毁的时候,spring容器会自动执行destory方法,比如释放资源

ApplicationContextAware 实现了这个接口的bean,当spring容器初始化的时候,会自动的将ApplicationContext注入进来

ApplicationListener  ApplicationEvent事件监听,spring容器启动后会发一个事件通知

BeanNameAware 获得自身初始化时,本身的bean的id属性

那么基本的实现思路可以整理出来了

  1. 利用spring的解析收集xml中的配置信息,然后把这些配置信息存储到serviceConfig中
  2. 调用ServiceConfig的export方法来进行服务的发布和注册

服务的发布过程

  • serviceBean是服务发布的切入点,通过afterPropertiesSet方法,调用export()方法进行发布。
  • export为父类ServiceConfig中的方法,所以跳转到SeviceConfig类中的export方法

delay的使用

bae0f9a073d747f31564397807de7471cd6.jpg

我们发现,delay的作用就是延迟暴露,而延迟的方式也很直截了当,Thread.sleep(delay)

  1. export是synchronized修饰的方法。也就是说暴露的过程是原子操作,正常情况下不会出现锁竞争的问题,毕竟初始化过程大多数情况下都是单一线程操作,这里联想到了spring的初始化流程,也进行了加锁操作,这里也给我们平时设计一个不错的启示:初始化流程的性能调优优先级应该放的比较低,但是安全的优先级应该放的比较高!
  2. 继续看doExport()方法。同样是一堆初始化代码

export的过程

继续看doExport(),最终会调用到doExportUrls()中:

3d8ec5115b2597323cb19c3c5e5d103ddad.jpg

  • 这个protocols长这个样子<dubbo:protocol name="dubbo" port="20888" id="dubbo" /> protocols也是根据配置装配出来的。
  • 接下来让我们进入doExportUrlsFor1Protocol方法看看dubbo具体是怎么样将服务暴露出去的

最终实现逻辑

if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){    if (logger.isInfoEnabled()) {        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);    }    if (registryURLs != null && registryURLs.size() > 0            && url.getParameter("register", true)) {        for (URL registryURL : registryURLs) {            url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));            URL monitorUrl = loadMonitor(registryURL);            if (monitorUrl != null) {                url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());            }            if (logger.isInfoEnabled()) {                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);            }            //通过proxyFactory来获取Invoker对象            Invoker
invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); //注册服务 Exporter
exporter = protocol.export(invoker); //将exporter添加到list中 exporters.add(exporter); } } else { Invoker
invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); Exporter
exporter = protocol.export(invoker); exporters.add(exporter); }}

看到这里就比较明白dubbo的工作原理了doExportUrlsFor1Protocol方法,先创建两个URL,分别如下

  • dubbo://192.168.xx.63:20888/com.gupaoedu.IGHello;
  • registry://192.168.xx ;

是不是觉得这个URL很眼熟,没错在注册中心看到的services的providers信息就是这个

在上面这段代码中可以看到Dubbo的比较核心的抽象:Invoker, Invoker是一个代理类,从ProxyFactory中生成。

这个地方可以做一个小结:

  1. Invoker - 执行具体的远程调用(这块后续单独讲
  2. Protocol – 服务地址的发布和订阅
  3. Exporter – 暴露服务或取消暴露

protocol.export(invoker)

  • protocol这个地方,其实并不是直接调用DubboProtocol协议的export, 大家跟我看看protocol这个属性是在哪里实例化的?以及实例化的代码是什么?
private static final Protocol protocol = ExtensionLoader.        getExtensionLoader(Protocol.class).        getAdaptiveExtension(); //Protocol$Adaptive
  • 实际上这个Protocol得到的应该是一个Protocol$Adaptive。一个自适应的适配器。
  • 这个时候,通过protocol.export(invoker),实际上调用的应该是Protocol$Adaptive这个动态类的export方法。我们看看这段代码
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {    public void destroy() {        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");    }    public int getDefaultPort() {        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");    }    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {        if (arg1 == null) throw new IllegalArgumentException("url == null");        com.alibaba.dubbo.common.URL url = arg1;        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());        if (extName == null)            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);        return extension.refer(arg0, arg1);    }    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");        if (arg0.getUrl() == null)            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");        com.alibaba.dubbo.common.URL url = arg0.getUrl();        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());        if (extName == null)            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);        return extension.export(arg0);    }}

上面这段代码做两个事情

  1. 从url中获得protocol的协议地址,如果protocol为空,表示dubbo协议发布服务,否则根据配置的协议类型来发布服务。
  2. 调用ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);

ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);

  • 这段代码做了什么事情呢?前面这段代码我们已经理解了,通过工厂模式获得一个ExtensionLoader实例,我们来分析下下getExtension这个方法。

getExtension

这个方法的主要作用是用来获取ExtensionLoader实例代表的扩展的指定实现。已扩展实现的名字作为参数,结合前面学习getAdaptiveExtension的代码.

@SuppressWarnings("unchecked")public T getExtension(String name) {   if (name == null || name.length() == 0)       throw new IllegalArgumentException("Extension name == null");   if ("true".equals(name)) {       return getDefaultExtension();   }//判断是否已经缓存过该扩展点   Holder holder = cachedInstances.get(name);   if (holder == null) {       cachedInstances.putIfAbsent(name, new Holder());       holder = cachedInstances.get(name);   }   Object instance = holder.get();   if (instance == null) {       synchronized (holder) {            instance = holder.get();            if (instance == null) {//createExtension ,创建扩展点                instance = createExtension(name);                holder.set(instance);            }        }   }   return (T) instance;}

createExtension

这个方法主要做4个事情

  1. 根据name获取对应的class
  2. 根据class创建一个实例
  3. 对获取的实例进行依赖注入
  4. 对实例进行包装,分别调用带Protocol参数的构造函数创建实例,然后进行依赖注入。
    1. 在dubbo-rpc-api的resources路径下,找到com.alibaba.dubbo.rcp.Protocol文件中有存在filter/listener
    2. 遍历cachedWrapperClass对DubboProtocol 进行包装,会通过ProtocolFilterWrapper、ProtocolListenerWrapper包装
@SuppressWarnings("unchecked")private T createExtension(String name) {      Class
clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } injectExtension(instance);//对获取的的和实例进行依赖注入 Set
> wrapperClasses = cachedWrapperClasses;//cachedWrapperClasses是在loadFile中进行赋值的 if (wrapperClasses != null && wrapperClasses.size() > 0) { for (Class
wrapperClass : wrapperClasses) {// 对实例进行包装,分别调用带Protocol参数的构造函数创建实例,然后进行依赖注入。 instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type + ") could not be instantiated: " + t.getMessage(), t); }}

getExtensionClasses 

这个方法之前在讲自适应扩展点的时候讲过了,其实就是加载扩展点实现类了。然后调用loadExtensionClasses,去对应文件下去加载指定的扩展点:

private Map
> getExtensionClasses() { Map
> classes = cachedClasses.get(); if (classes == null) { synchronized (cachedClasses) { classes = cachedClasses.get(); if (classes == null) { classes = loadExtensionClasses(); cachedClasses.set(classes); } } } return classes;}

总结

  • ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
url = registry://192.168.48.133:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-server&dubbo=2.6.0&export=dubbo%3A%2F%2F172.31.225.23%3A20880%2Fcom.gupaoedu.dubbo.IGpHello%3Fanyhost%3Dtrue%26application%3Ddubbo-server%26bind.ip%3D172.31.225.23%26bind.port%3D20880%26default.service.filter%3DtraceFilter%26dubbo%3D2.6.0%26generic%3Dfalse%26interface%3Dcom.gupaoedu.dubbo.IGpHello%26methods%3DsayHello%26owner%3Dmic%26pid%3D98424%26side%3Dprovider%26timestamp%3D1543485230853&owner=mic&pid=98424&registry=zookeeper&timestamp=1543485230834
  • 这段代码中,ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
  • 当extName为registry的时候,我们不需要再次去阅读这块代码了,
  • 直接可以在扩展点中找到相应的实现扩展点
    • [/dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol]
  • 配置如下
    • registry=com.alibaba.dubbo.registry.integration.RegistryProtocol

所以,我们可以定位到RegistryProtocol好这个类中的export方法

public 
Exporter
export(final Invoker
originInvoker) throws RpcException { //export invoker , 本地发布服务(启动netty) final ExporterChangeableWrapper
exporter = doLocalExport(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); registry.register(registedProviderUrl); // 订阅override数据 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保证每次export都返回一个新的exporter实例 return new Exporter
() { public Invoker
getInvoker() { return exporter.getInvoker(); } public void unexport() { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } };}

doLocalExport

  • 本地先启动监听服务
private 
ExporterChangeableWrapper
doLocalExport(final Invoker
originInvoker){ String key = getCacheKey(originInvoker); ExporterChangeableWrapper
exporter = (ExporterChangeableWrapper
) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper
) bounds.get(key); if (exporter == null) { final Invoker
invokerDelegete = new InvokerDelegete
(originInvoker, getProviderUrl(originInvoker)); exporter = new ExporterChangeableWrapper
((Exporter
)protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return (ExporterChangeableWrapper
) exporter;}

上面代码中,protocol代码是怎么赋值的呢?我们看看代码,熟悉吗?是一个依赖注入的扩展点。不熟悉的话,我们再回想一下,在加载扩展点的时候,

  • 有一个injectExtension方法,针对已经加载的扩展点中的扩展点属性进行依赖注入。(牛逼的代码)
private Protocol protocol;public void setProtocol(Protocol protocol) {    this.protocol = protocol;}
  • 因此我们知道protocol是一个自适应扩展点,Protocol$Adaptive,然后调用这个自适应扩展点中的export方法,这个时候传入的协议地址应该是:
    • dubbo://127.0.0.1/xxxx… 因此在Protocol$Adaptive.export方法中,ExtensionLoader.getExtension(Protocol.class).getExtension。应该就是基于DubboProtocol协议去发布服务了吗?如果是这样,那你们太单纯了。

      (获得的DubboProtocol,在获取过程中createExtension 中就已经对其进行了依赖注入和装饰器一层一层外套)

      这里并不是获得一个单纯的DubboProtocol扩展点,而是会通过Wrapper对Protocol进行装饰,装饰器分别为: ProtocolFilterWrapper/ ProtocolListenerWrapper; 至于MockProtocol为什么不在装饰器里面呢?大家再回想一下我们在看ExtensionLoader.loadFile这段代码的时候,有一个判断,装饰器必须要具备一个带有Protocol的构造方法,如下

public ProtocolFilterWrapper(Protocol protocol){    if (protocol == null) {        throw new IllegalArgumentException("protocol == null");    }    this.protocol = protocol;}
  • 截止到这里,我们已经知道,Protocol$Adaptive里面的export方法,会调用ProtocolFilterWrapper以及ProtocolListenerWrapper类的方法

这两个装饰器是用来干嘛的呢?我们来分析下

ProtocolFilterWrapper

这个类非常重要,dubbo机制里面日志记录、超时等等功能都是在这一部分实现的

这个类有3个特点,

  • 第一,它有一个参数为Protocol protocol的构造函数;
  • 第二,它实现了Protocol接口;
  • 第三,它使用责任链模式,对export和refer函数进行了封装;部分代码如下
public 
Exporter
export(Invoker
invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));}public void destroy() { protocol.destroy();}//buildInvokerChain函数:它读取所有的filter类,利用这些类封装invokerprivate static
Invoker
buildInvokerChain(final Invoker
invoker, String key, String group) { Invoker
last = invoker; List
filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);//自动激活扩展点,根据条件获取当前扩展可自动激活的实现 if (filters.size() > 0) { for (int i = filters.size() - 1; i >= 0; i --) { final Filter filter = filters.get(i); final Invoker
next = last; last = new Invoker
() { public Class
getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last;}

我们看如下文件: /dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter

其实就是对Invoker,通过如下的Filter组装成一个责任链:

echo=com.alibaba.dubbo.rpc.filter.EchoFiltergeneric=com.alibaba.dubbo.rpc.filter.GenericFiltergenericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFiltertoken=com.alibaba.dubbo.rpc.filter.TokenFilteraccesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilteractivelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilterclassloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFiltercontext=com.alibaba.dubbo.rpc.filter.ContextFilterconsumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilterexception=com.alibaba.dubbo.rpc.filter.ExceptionFilterexecutelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilterdeprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFiltercompatible=com.alibaba.dubbo.rpc.filter.CompatibleFiltertimeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
  • 这其中涉及到很多功能,包括权限验证、异常、超时等等,当然可以预计计算调用时间等等应该也是在这其中的某个类实现的;
  • 这里我们可以看到export和refer过程都会被filter过滤
  • (在getActivateExtension 方法里面,根据 传入的key ,group 来进行匹配,返回需要激活的类 list)

ProtocolListenerWrapper

在这里我们可以看到export和refer分别对应了不同的Wrapper;export是对应的ListenerExporterWrapper。

这块暂时先不去分析,因为这个地方并没有提供实现类。

public 
Exporter
export(Invoker
invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return new ListenerExporterWrapper
(protocol.export(invoker), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));}public
Invoker
refer(Class
type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return new ListenerInvokerWrapper
(protocol.refer(type, url), Collections.unmodifiableList( ExtensionLoader.getExtensionLoader(InvokerListener.class) .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));}

DubboProtocol.export

通过上面的代码分析完以后,最终我们能够定位到DubboProtocol.export方法。我们看一下dubboProtocol的export方法:openServer(url)

export

public 
Exporter
export(Invoker
invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter
exporter = new DubboExporter
(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispaching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice){ String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){ if (logger.isWarnEnabled()){ logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } //暴露服务 openServer(url); return exporter;}

openServer

  • 开启服务
private void openServer(URL url) {    // find server.    String key = url.getAddress();//192.168.11.156:20880    //client 也可以暴露一个只有server可以调用的服务。    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);    if (isServer) {       ExchangeServer server = serverMap.get(key);       if (server == null) {//没有的话就是创建服务          serverMap.put(key, createServer(url));       } else {          //server支持reset,配合override功能使用          server.reset(url);       }    }}

createServer

  • 创建服务,开启心跳检测,默认使用netty。组装url
private ExchangeServer createServer(URL url) {    //默认开启server关闭时发送readonly事件    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());    //默认开启heartbeat    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);    if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))        throw new RpcException("Unsupported server type: " + str + ", url: " + url);    url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);    ExchangeServer server;    try {        server = Exchangers.bind(url, requestHandler);    } catch (RemotingException e) {        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);    }    str = url.getParameter(Constants.CLIENT_KEY);    if (str != null && str.length() > 0) {        Set
supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server;}

Exchangers.bind

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {    if (url == null) {        throw new IllegalArgumentException("url == null");    }    if (handler == null) {        throw new IllegalArgumentException("handler == null");    }    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");    return getExchanger(url).bind(url, handler);}

getExchanger

  • 通过ExtensionLoader获得指定的扩展点,type默认为header
public static Exchanger getExchanger(URL url) {  //url中获得exchanger, 默认为header    String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);    return getExchanger(type);}public static Exchanger getExchanger(String type) {    return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);}

HeaderExchanger.bind

  • 调用headerExchanger的bind方法
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));}

Transporters.bind

通过transporter.bind来进行绑定。

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {    if (url == null) {        throw new IllegalArgumentException("url == null");    }    if (handlers == null || handlers.length == 0) {        throw new IllegalArgumentException("handlers == null");    }    ChannelHandler handler;    if (handlers.length == 1) {        handler = handlers[0];    } else {        handler = new ChannelHandlerDispatcher(handlers);    }    return getTransporter().bind(url, handler);}

NettyTransport.bind

通过NettyTranport创建基于Netty的server服务

public Server bind(URL url, ChannelHandler listener) throws RemotingException {    return new NettyServer(url, listener);}

new HeaderExchangeServer

在调用HeaderExchanger.bind方法的时候,是先new一个HeaderExchangeServer. 这个server是干嘛呢? 是对当前这个连接去建立心跳机制

public class HeaderExchangeServer implements ExchangeServer {  private final ScheduledExecutorService scheduled = Executors.   newScheduledThreadPool(1,new NamedThreadFactory(   "dubbo-remoting-server-heartbeat", true));  // 心跳定时器  private ScheduledFuture
heatbeatTimer; // 心跳超时,毫秒。缺省0,不会执行心跳。 private int heartbeat; private int heartbeatTimeout; private final Server server; private volatile boolean closed = false; public HeaderExchangeServer(Server server) { //..属性赋值 //心跳 startHeatbeatTimer(); } private void startHeatbeatTimer() { //关闭心跳定时 stopHeartbeatTimer(); if (heartbeat > 0) { //每隔heartbeat时间执行一次 heatbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask( new HeartBeatTask.ChannelProvider() { //获取channels public Collection
getChannels() { return Collections.unmodifiableCollection( HeaderExchangeServer.this.getChannels() ); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat,TimeUnit.MILLISECONDS); } } //关闭心跳定时 private void stopHeartbeatTimer() { try { ScheduledFuture
timer = heatbeatTimer; if (timer != null && ! timer.isCancelled()) { timer.cancel(true); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } finally { heatbeatTimer =null; } }

心跳线程HeartBeatTask

  • 在超时时间之内,发送数据
  • 在超时时间在外,是客户端的话,重连;是服务端,那么关闭

服务发布总结

直接从官方网站上扒了一个图过来,,好这个图显示的很清楚了。

04942d3f86aea85750ed0db0f1b88f1e425.jpg

转载于:https://my.oschina.net/u/3847203/blog/2966843

你可能感兴趣的文章
JAVA帮助文档全系列 JDK1.5 JDK1.6 JDK1.7 官方中英完整版下载
查看>>
android 1.6中LinearLayout getBaseline函数的一个bug
查看>>
shell3
查看>>
分享几个好用的工具,有效提升工作效率
查看>>
论北京北漂的家人们
查看>>
delphi 检查用户输入必须是汉字串
查看>>
思科交换机和路由器设备实现DHCP功能
查看>>
MongoDB安装与操作大全
查看>>
人我的是好有是的好sula
查看>>
编译工程时报java:[1,0] illegal character: \65279问题排查与解决过
查看>>
RHEL6子接口及双网卡绑定配置
查看>>
常见系统故障排查
查看>>
正则验证手机号是否合法
查看>>
《Git权威指南》读书笔记 第四章 git初始化
查看>>
《Head first HTML与CSS 第二版》读书笔记 第九章 盒模型
查看>>
《Python面向对象……》之目录
查看>>
集群入门简析及LB下LVS详解
查看>>
Linux与GPT
查看>>
管理或技术
查看>>
分配到弱属性;对象将在赋值之后释放
查看>>