挽回爱情时,你还在用这些【低效策略】吗?

小说:挽回爱情时,你还在用这些【低效策略】吗?作者:平开更新时间:2019-05-22字数:66586

前面讲过Dubbo SPI拓展机制,通过ExtensionLoader实现可插拔加载拓展,本节将接着分析Dubbo的服务发布过程。

以源码中dubbo-demo模块作为切入口一步步走进Dubbo源码。在 dubbo-demo-provider模块下配置文件 dubbo-demo-provider.xml中定义了服务提供方、注册中心、协议及端口、服务接口等信息,如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
    http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">

    <!-- 提供方应用信息,用于计算依赖关系 -->
    <dubbo:application name="demo-provider"/>

    <!-- 使用multicast广播注册中心暴露服务地址 -->
    <!--<dubbo:registry address="multicast://224.5.6.7:1234"/>-->
    <dubbo:registry address="zookeeper://192.168.1.197:2181"/>

    <!-- 用dubbo协议在20880端口暴露服务 -->
    <dubbo:protocol name="dubbo" port="20880"/>

    <!-- 和本地bean一样实现服务 -->
    <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>

    <!-- 声明需要暴露的服务接口 -->
    <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>

</beans>

本人搭建了一个zookeeper服务器,将注册中心由multicast更改为zookeeper。可是,在Spring中并没有定义这些配置节点,配置文件中的内容如何自动加载到内存中、赋值给相应的对象呢?

dubbo-demo-provider模块com.alibaba.dubbo.demo.provider.Provider类中 main 方法作为入口,进行断点跟踪。可发现,Dubbo配置文件加载是基于Spring可拓展Schema自定义实现的。Spring可拓展Schema需要实现如下几步:

  1. 定义配置文件相应Java Bean
  2. 编写XSD文件
  3. 实现BeanDefinitionParser接口和继承NamespaceHandlerSupport抽象类
  4. 编写handlers和schemas文件,串联各部分

关于Spring可拓展Schema机制,请自行Google了解。

dubbo-config子模块 dubbo-config-spring中定义了 spring.handlersspring.schemasdubbo.xsdDubboBeanDefinitionParserDubboNamespaceHandler文件,配置的Java Bean是在 dubbo-config-api模块中定义的。spring.handlers配置定义了 Dubbo名空间的处理类,spring.schemas 定义了XSD文件的位置,DubboBeanDefinitionParser实现了BeanDefinitionParser接口,DubboNamespaceHandler继承了NamespaceHandlerSupport抽象类。

spring.handlers文件内容如下:

http://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler

spring.schemas 文件内容如下:

http://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd

通过运用Spring Schema机制,实现对自定义配置文件参数注入后,会继续执行InitializingBean接口的afterPropertiesSet() throws Exception方法。实例对象初始化完成后会执行事件监听器ApplicationListener接口的void onApplicationEvent(E event)方法。在Dubbo中,ServiceBean类实现了ApplicationListener接口方法。ServiceBean类如下所示:

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware {
    ......
    public ServiceBean() {
        super();
        this.service = null;
    }

    public ServiceBean(Service service) {
        super(service);
        this.service = service;
    }
    ......

    public void onApplicationEvent(ApplicationEvent event) {
        if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
            if (isDelay() && !isExported() && !isUnexported()) {
                if (logger.isInfoEnabled()) {
                    logger.info("The service ready on spring started. service: " + getInterface());
                }
                export();
            }
        }
    }

    ......

    @SuppressWarnings({"unchecked", "deprecation"})
    public void afterPropertiesSet() throws Exception {
        
        ......

        if (!isDelay()) {
            export();
        }
    }
}

实际源码篇幅过长,这里只截取关键的核心部分。通过上述源码片段,可以看到ServiceBean实现了ApplicationListener接口、InitializingBean接口,同时也继承了ServiceConfig类。在构造器方法中均调用了父类ServiceConfig的构造方法。

在加载完自定义配置文件属性后,会执行afterPropertiesSet方法,根据配置文件中delay属性判断是否立即执行export方法。delay属性是用于标识是否延迟暴露服务,Dubbo中默认延迟暴露。若服务延迟暴露,则继续初始化示例对象,待对象初始化完成后,执行onApplicationEvent方法,此时会执行export()

export()是在父类ServiceConfig中定义的,是一个同步方法。进入export()才是真正的开启服务发布之旅。export()源码如下:

    public synchronized void export() {
        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
        if (export != null && !export) {
            return;
        }

        if (delay != null && delay > 0) {
            delayExportExecutor.schedule(new Runnable() {
                public void run() {
                    doExport();
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }

export()方法通过关键字synchronized实现同步处理。并且实现了立即发布和延迟发布,并通过定时器来实现延迟发布,延迟发布时间单位是分钟。

具体的服务发布过程交给doExport()处理,doExport()也是一个同步方法,通过synchronized关键字实现。追踪源码,发现doExport()仅仅是做了服务发布的前期准备工作,实际的发布工作交给doExportUrls()方法来完成。doExportUrls()方法源码如下:

    @SuppressWarnings({"unchecked", "rawtypes"})
    private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

源码中,通过loadRegistries(boolean provider)方法将所有服务URL封装为List,得到服务提供者集合registryURLs。然后,通过for方法遍历,调用doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs)方法来实现对服务的发布。doExportUrlsFor1Protocol方法源码如下:

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        if (name == null || name.length() == 0) {
            name = "dubbo";
        }

        Map<String, String> map = new HashMap<String, String>();
        map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
        map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        if (ConfigUtils.getPid() > 0) {
            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
        }
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, provider, Constants.DEFAULT_KEY);
        appendParameters(map, protocolConfig);
        appendParameters(map, this);

        ......

        if (ProtocolUtils.isGeneric(generic)) {
            map.put("generic", generic);
            map.put("methods", Constants.ANY_VALUE);
        } else {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }

            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("NO method found in service interface " + interfaceClass.getName());
                map.put("methods", Constants.ANY_VALUE);
            } else {
                map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
        if (!ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                map.put("token", UUID.randomUUID().toString());
            } else {
                map.put("token", token);
            }
        }
        if ("injvm".equals(protocolConfig.getName())) {
            protocolConfig.setRegister(false);
            map.put("notify", "false");
        }
        // 导出服务
        String contextPath = protocolConfig.getContextpath();
        if ((contextPath == null || contextPath.length() == 0) && provider != null) {
            contextPath = provider.getContextpath();
        }

        //获取注册监听地址和端口
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = this.findConfigedPorts(protocolConfig, name, map);

        //根据之前收集的map数据和地址端口,组装URL
        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

        String scope = url.getParameter(Constants.SCOPE_KEY);
        //配置为none不暴露
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

            //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && registryURLs.size() > 0
                        && url.getParameter("register", true)) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

                        Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
                    }
                } else {
                    //生成代理对象,invoker可看作服务的代理或封装
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

                    Exporter<?> exporter = protocol.export(invoker);    //此时加载的protocol为DubboProtocol对象
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    private void exportLocal(URL url) {
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
            URL local = URL.valueOf(url.toFullString())
                    .setProtocol(Constants.LOCAL_PROTOCOL)
                    .setHost(LOCALHOST)
                    .setPort(0);
            Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
            logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
        }
    }

方法大体流程如下:

  1. map装配参数
  2. 利用map中的参数构建URL,为暴露服务做准备。此URL为Dubbo自定义类型,是final类,实现了Serializable接口
  3. 根据范围选择是暴露本地服务,还是暴露远程服务
  4. 根据代理工厂生成服务代理invoker
  5. 根据配置的协议,暴露服务代理

值得注意的是:在判断是否暴露本地服务和远程服务时,有个简单的逻辑值得学习,用得很漂亮,使得代码变得很简洁。提取核心部分简化如下:

            //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                ......
                //exportRemote(url)
                ......
            }

默认情况下,当scope为null的时候,会同时暴露本地服务和远程服务。这个小巧的逻辑技巧值得学习!

经过以上分析:可以大体了解Dubbo RPC服务的发布过程,但是在整个流程中具体是如何产生服务代理的呢?请听下回分解:Dubbo RPC服务的发布之服务代理

---

如果对您有帮助,不妨点个赞、关注一波 :)

当前文章:http://scycxh.com/8n0zszfnuu.html

发布时间:2019-05-22 10:16:26

怎样看待校园暴力事件? 大过年的双方家长互相挑剔做儿女的夹在中间za办? 修持戒定慧来开悟明智和去除烦恼 罗李华:狮子座2016年运势 何谓教养? 韩国春季多条经典线路游 老公看到我不再冲动怎么办? 孩子入园不适怎么办? 【情感问答】借男友的钱要还吗 24个细节深度解读《美人鱼》

17952 60820 97002 74677 24451 90849 95288 71409 54404 88721 14619 12476 91968 13400 44608 26139 56828 55662 85944 70990 93557 57715 30286

我要说两句: (0人参与)

发布