【1】註解@EnableDubbo
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented @EnableDubboConfig // @EnableDubboConfig註解用來將properties檔案中的設定項轉化為對應的Bean @DubboComponentScan // @DubboComponentScan註解用來掃描服務提供者和參照者(@Service與@Reference) public @interface EnableDubbo { @AliasFor(annotation = DubboComponentScan.class, attribute = "basePackages") String[] scanBasePackages() default {}; @AliasFor(annotation = DubboComponentScan.class, attribute = "basePackageClasses") Class<?>[] scanBasePackageClasses() default {}; @AliasFor(annotation = EnableDubboConfig.class, attribute = "multiple") boolean multipleConfig() default true; }
【2】註解@EnableDubboConfig
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented @Import(DubboConfigConfigurationRegistrar.class) public @interface EnableDubboConfig { boolean multiple() default true; }
1)DubboConfigConfigurationRegistrar類的作用
//因為實現了ImportBeanDefinitionRegistrar介面,spring容器就會範例化該類,並且呼叫其registerBeanDefinitions方法; public class DubboConfigConfigurationRegistrar implements ImportBeanDefinitionRegistrar { @Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { //執行DubboConfigConfigurationRegistrar; AnnotationAttributes attributes = AnnotationAttributes.fromMap( importingClassMetadata.getAnnotationAttributes(EnableDubboConfig.class.getName())); boolean multiple = attributes.getBoolean("multiple"); //預設值是true // Single Config Bindings registerBeans(registry, DubboConfigConfiguration.Single.class); if (multiple) { // Since 2.6.6 https://github.com/apache/dubbo/issues/3193 registerBeans(registry, DubboConfigConfiguration.Multiple.class); } } }
2)registerBeans做了什麼
public static void registerBeans(BeanDefinitionRegistry registry, Class<?>... annotatedClasses) { if (ObjectUtils.isEmpty(annotatedClasses)) { return; } ... AnnotatedBeanDefinitionReader reader = new AnnotatedBeanDefinitionReader(registry); ... // 利用Spring中的AnnotatedBeanDefinitionReader來解析annotatedClasses // 會解析該類上的註解,然後進行處理 reader.register(annotatedClasses); }
3)DubboConfigConfiguration類展示
public class DubboConfigConfiguration { /** * Single Dubbo {@link AbstractConfig Config} Bean Binding */ @EnableDubboConfigBindings({ @EnableDubboConfigBinding(prefix = "dubbo.application", type = ApplicationConfig.class), @EnableDubboConfigBinding(prefix = "dubbo.module", type = ModuleConfig.class), @EnableDubboConfigBinding(prefix = "dubbo.registry", type = RegistryConfig.class), @EnableDubboConfigBinding(prefix = "dubbo.protocol", type = ProtocolConfig.class), @EnableDubboConfigBinding(prefix = "dubbo.monitor", type = MonitorConfig.class), @EnableDubboConfigBinding(prefix = "dubbo.provider", type = ProviderConfig.class), @EnableDubboConfigBinding(prefix = "dubbo.consumer", type = ConsumerConfig.class), @EnableDubboConfigBinding(prefix = "dubbo.config-center", type = ConfigCenterBean.class), @EnableDubboConfigBinding(prefix = "dubbo.metadata-report", type = MetadataReportConfig.class), @EnableDubboConfigBinding(prefix = "dubbo.metrics", type = MetricsConfig.class) }) public static class Single {} /** * Multiple Dubbo {@link AbstractConfig Config} Bean Binding */ @EnableDubboConfigBindings({ @EnableDubboConfigBinding(prefix = "dubbo.applications", type = ApplicationConfig.class, multiple = true), @EnableDubboConfigBinding(prefix = "dubbo.modules", type = ModuleConfig.class, multiple = true), @EnableDubboConfigBinding(prefix = "dubbo.registries", type = RegistryConfig.class, multiple = true), @EnableDubboConfigBinding(prefix = "dubbo.protocols", type = ProtocolConfig.class, multiple = true), @EnableDubboConfigBinding(prefix = "dubbo.monitors", type = MonitorConfig.class, multiple = true), @EnableDubboConfigBinding(prefix = "dubbo.providers", type = ProviderConfig.class, multiple = true), @EnableDubboConfigBinding(prefix = "dubbo.consumers", type = ConsumerConfig.class, multiple = true), @EnableDubboConfigBinding(prefix = "dubbo.config-centers", type = ConfigCenterBean.class, multiple = true), @EnableDubboConfigBinding(prefix = "dubbo.metadata-reports", type = MetadataReportConfig.class, multiple = true), @EnableDubboConfigBinding(prefix = "dubbo.metricses", type = MetricsConfig.class, multiple = true) }) public static class Multiple {} }
4)那麼必然又會解析到@EnableDubboConfigBindings註解
//又是利用了實現了ImportBeanDefinitionRegistrar介面,在範例化該類會呼叫其registerBeanDefinitions方法; public class DubboConfigBindingsRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware { private ConfigurableEnvironment environment; @Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { //執行DubboConfigBindingsRegistrar AnnotationAttributes attributes = AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes(EnableDubboConfigBindings.class.getName())); // 拿到多個@EnableDubboConfigBinding註解 AnnotationAttributes[] annotationAttributes = attributes.getAnnotationArray("value"); DubboConfigBindingRegistrar registrar = new DubboConfigBindingRegistrar(); //將環境變數注入 registrar.setEnvironment(environment); for (AnnotationAttributes element : annotationAttributes) { // 逐個解析@EnableDubboConfigBinding註解,比如@EnableDubboConfigBinding(prefix = "dubbo.application", type = ApplicationConfig.class) registrar.registerBeanDefinitions(element, registry); } } @Override public void setEnvironment(Environment environment) { Assert.isInstanceOf(ConfigurableEnvironment.class, environment); this.environment = (ConfigurableEnvironment) environment; } }
5)registrar.registerBeanDefinitions方法的呼叫情況
public class DubboConfigBindingRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware { private final Log log = LogFactory.getLog(getClass()); private ConfigurableEnvironment environment; @Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { //執行DubboConfigBindingRegistrar AnnotationAttributes attributes = AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes(EnableDubboConfigBinding.class.getName())); registerBeanDefinitions(attributes, registry); } protected void registerBeanDefinitions(AnnotationAttributes attributes, BeanDefinitionRegistry registry) { // prefix = "dubbo.application" String prefix = environment.resolvePlaceholders(attributes.getString("prefix")); // type = ApplicationConfig.class Class<? extends AbstractConfig> configClass = attributes.getClass("type"); boolean multiple = attributes.getBoolean("multiple"); //針對設定分別進行註冊成Bean物件,方法1 registerDubboConfigBeans(prefix, configClass, multiple, registry); } //方法1,因為Single和Multiple都是走同一套邏輯,採用引數boolean multiple區分 private void registerDubboConfigBeans(String prefix, Class<? extends AbstractConfig> configClass, boolean multiple, BeanDefinitionRegistry registry) { // 從properties檔案中根據字首拿對應的設定項,比如根據dubbo.application字首, // 就可以拿到如下設定: // dubbo.application.name=dubbo-demo-provider-application // dubbo.application.logger=log4j Map<String, Object> properties = getSubProperties(environment.getPropertySources(), prefix); // 如果沒有相關的設定項,則不需要註冊BeanDefinition if (CollectionUtils.isEmpty(properties)) { if (log.isDebugEnabled()) { log.debug(...); } return; } // 根據設定項生成beanNames,為什麼會有多個? // 普通情況一個dubbo.application字首對應一個ApplicationConfig型別的Bean // 特殊情況下(設定兩種協定),比如dubbo.protocols對應了: // dubbo.protocols.p1.name=dubbo // dubbo.protocols.p1.port=20880 // dubbo.protocols.p1.host=0.0.0.0 // dubbo.protocols.p2.name=http // dubbo.protocols.p2.port=8082 // dubbo.protocols.p2.host=0.0.0.0 // 那麼就需要對應兩個ProtocolConfig型別的Bean,那麼就需要兩個beanName:p1和p2 // 這裡就是multiple為true或false的區別,名字的區別,根據multiple用來判斷是否從設定項中獲取beanName // 如果multiple為false,則看有沒有設定id屬性,如果沒有設定則自動生成一個beanName. Set<String> beanNames = multiple ? resolveMultipleBeanNames(properties) : Collections.singleton(resolveSingleBeanName(properties, configClass, registry)); for (String beanName : beanNames) { // 為每個beanName,註冊一個空的BeanDefinition,方法2 registerDubboConfigBean(beanName, configClass, registry); // 為每個bean註冊一個DubboConfigBindingBeanPostProcessor的Bean後置處理器,方法3 //這裡存在的問題就是對應每一種設定都會產生對應的BeanPostProcessor,最多好像也就是10種左右 //但其實一個就可以做的任務,拓展成多個貌似不太合理,結合處理邏輯都是同一套就很尷尬 registerDubboConfigBindingBeanPostProcessor(prefix, beanName, multiple, registry); } // 註冊一個NamePropertyDefaultValueDubboConfigBeanCustomizer的bean registerDubboConfigBeanCustomizers(registry); } //方法2,為對應的設定生成一個beanDefinition,並注入到容器 private void registerDubboConfigBean(String beanName, Class<? extends AbstractConfig> configClass,BeanDefinitionRegistry registry) { BeanDefinitionBuilder builder = rootBeanDefinition(configClass); AbstractBeanDefinition beanDefinition = builder.getBeanDefinition(); registry.registerBeanDefinition(beanName, beanDefinition); // ApplicatinoConfig物件 if (log.isInfoEnabled()) { log.info("...); //紀錄檔記錄 } } //方法3 private void registerDubboConfigBindingBeanPostProcessor(String prefix, String beanName, boolean multiple,BeanDefinitionRegistry registry) { // 註冊一個DubboConfigBindingBeanPostProcessor的Bean // 每個XxConfig的Bean對應一個DubboConfigBindingBeanPostProcessor的Bean // 比如,一個ApplicationConfig對應一個DubboConfigBindingBeanPostProcessor, // 一個ProtocolConfig也會對應一個DubboConfigBindingBeanPostProcessor // 在構造DubboConfigBindingBeanPostProcessor的時候會指定構造方法的值,這樣就可以區別開來了 Class<?> processorClass = DubboConfigBindingBeanPostProcessor.class; BeanDefinitionBuilder builder = rootBeanDefinition(processorClass); // 真實的字首,比如dubbo.registries.r2 String actualPrefix = multiple ? normalizePrefix(prefix) + beanName : prefix; // 新增兩個構造方法引數值,所以會呼叫DubboConfigBindingBeanPostProcessor的兩個引數的構造方法 builder.addConstructorArgValue(actualPrefix).addConstructorArgValue(beanName); AbstractBeanDefinition beanDefinition = builder.getBeanDefinition(); beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); registerWithGeneratedName(beanDefinition, registry); if (log.isInfoEnabled()) { log.info(...); } } private void registerDubboConfigBeanCustomizers(BeanDefinitionRegistry registry) { registerInfrastructureBean(registry, BEAN_NAME, NamePropertyDefaultValueDubboConfigBeanCustomizer.class); } @Override public void setEnvironment(Environment environment) { Assert.isInstanceOf(ConfigurableEnvironment.class, environment); this.environment = (ConfigurableEnvironment) environment; } private Set<String> resolveMultipleBeanNames(Map<String, Object> properties) { Set<String> beanNames = new LinkedHashSet<String>(); // 比如dubbo.protocols.p1.name=dubbo的propertyName為p1.name for (String propertyName : properties.keySet()) { // propertyName為p1.name int index = propertyName.indexOf("."); if (index > 0) { // 擷取beanName名字為p1 String beanName = propertyName.substring(0, index); beanNames.add(beanName); } } return beanNames; } private String resolveSingleBeanName(Map<String, Object> properties, Class<? extends AbstractConfig> configClass,BeanDefinitionRegistry registry) { // 設定了dubbo.application.id=appl,那麼appl就是beanName String beanName = (String) properties.get("id"); // 如果beanName為null,則會進入if分支,由spring自動生成一個beanName,比如org.apache.dubbo.config.ApplicationConfig#0 if (!StringUtils.hasText(beanName)) { BeanDefinitionBuilder builder = rootBeanDefinition(configClass); beanName = BeanDefinitionReaderUtils.generateBeanName(builder.getRawBeanDefinition(), registry); } return beanName; } }
6)單個DubboConfigBindingBeanPostProcessor的展示(刪減掉部分不怎麼用到的)
public class DubboConfigBindingBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware, InitializingBean, BeanDefinitionRegistryPostProcessor { private final String prefix; private final String beanName; private DubboConfigBinder dubboConfigBinder; .... private List<DubboConfigBeanCustomizer> configBeanCustomizers = Collections.emptyList(); .... @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { // 每個XxConfig對應一個BeanPostProcessor,所以每個DubboConfigBindingBeanPostProcessor只處理對應的beanName if (this.beanName.equals(beanName) && bean instanceof AbstractConfig) { AbstractConfig dubboConfig = (AbstractConfig) bean; // 從properties檔案中獲取值,並設定到dubboConfig物件中 bind(prefix, dubboConfig); // 設定dubboConfig物件的name屬性,設定為beanName customize(beanName, dubboConfig); } return bean; } private void bind(String prefix, AbstractConfig dubboConfig) { dubboConfigBinder.bind(prefix, dubboConfig); if (log.isInfoEnabled()) { log.info(...); } } private void customize(String beanName, AbstractConfig dubboConfig) { for (DubboConfigBeanCustomizer customizer : configBeanCustomizers) { customizer.customize(beanName, dubboConfig); } } ... @Override public void afterPropertiesSet() throws Exception { initDubboConfigBinder(); // 建立DefaultDubboConfigBinder initConfigBeanCustomizers(); } private void initDubboConfigBinder() { if (dubboConfigBinder == null) { try { // 先從Spring容器中獲取DubboConfigBinder,預設獲取不到 dubboConfigBinder = applicationContext.getBean(DubboConfigBinder.class); } catch (BeansException ignored) { if (log.isDebugEnabled()) { log.debug("DubboConfigBinder Bean can't be found in ApplicationContext."); } // Use Default implementation // 生成一個預設的 dubboConfigBinder = createDubboConfigBinder(applicationContext.getEnvironment()); } } dubboConfigBinder.setIgnoreUnknownFields(ignoreUnknownFields); dubboConfigBinder.setIgnoreInvalidFields(ignoreInvalidFields); } private void initConfigBeanCustomizers() { // 得到之前建立了的NamePropertyDefaultValueDubboConfigBeanCustomizer Collection<DubboConfigBeanCustomizer> configBeanCustomizers = beansOfTypeIncludingAncestors(applicationContext, DubboConfigBeanCustomizer.class).values(); this.configBeanCustomizers = new ArrayList<>(configBeanCustomizers); AnnotationAwareOrderComparator.sort(this.configBeanCustomizers); } ... }
【3】註解@DubboComponentScan
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(DubboComponentScanRegistrar.class) public @interface DubboComponentScan { String[] value() default {}; String[] basePackages() default {}; Class<?>[] basePackageClasses() default {}; }
1)匯入的DubboComponentScanRegistrar類做了什麼
/又是利用了實現了ImportBeanDefinitionRegistrar介面,在範例化該類會呼叫其registerBeanDefinitions方法; public class DubboComponentScanRegistrar implements ImportBeanDefinitionRegistrar { @Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { //執行DubboComponentScanRegistrar // 拿到DubboComponentScan註解所定義的包路徑,掃描該package下的類,識別這些類上 Set<String> packagesToScan = getPackagesToScan(importingClassMetadata); // 註冊ServiceAnnotationBeanPostProcessor一個Bean // 實現了BeanDefinitionRegistryPostProcessor介面,所以在Spring啟動時會呼叫postProcessBeanDefinitionRegistry方法 // 該方法會進行掃描,掃描@Service註解了的類,然後生成BeanDefinition(會生成兩個,一個普通的bean,一個ServiceBean),後續的Spring週期中會生成Bean // 在ServiceBean中會監聽ContextRefreshedEvent事件,一旦Spring啟動完後,就會進行服務匯出 registerServiceAnnotationBeanPostProcessor(packagesToScan, registry); // 註冊ReferenceAnnotationBeanPostProcessor // 實現了AnnotationInjectedBeanPostProcessor介面,繼而實現了InstantiationAwareBeanPostProcessorAdapter介面 // 所以Spring在啟動時,在對屬性進行注入時會呼叫AnnotationInjectedBeanPostProcessor介面中的postProcessPropertyValues方法 // 在這個過程中會按照@Refrence註解的資訊去生成一個RefrenceBean物件 registerReferenceAnnotationBeanPostProcessor(registry); } //核心方法1,註冊一個對@Service註解處理的 BeanDefinitionRegistryPostProcessor private void registerServiceAnnotationBeanPostProcessor(Set<String> packagesToScan, BeanDefinitionRegistry registry) { // 生成一個RootBeanDefinition,對應的beanClass為ServiceAnnotationBeanPostProcessor.class BeanDefinitionBuilder builder = rootBeanDefinition(ServiceAnnotationBeanPostProcessor.class); // 將包路徑作為在構造ServiceAnnotationBeanPostProcessor時呼叫構造方法時的傳入引數 builder.addConstructorArgValue(packagesToScan); builder.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); AbstractBeanDefinition beanDefinition = builder.getBeanDefinition(); BeanDefinitionReaderUtils.registerWithGeneratedName(beanDefinition, registry); } //核心方法2,註冊一個對屬性賦值處理的AnnotationInjectedBeanPostProcessor且帶有ApplicationListener事件監聽功能 private void registerReferenceAnnotationBeanPostProcessor(BeanDefinitionRegistry registry) { // Register @Reference Annotation Bean Processor // 註冊一個ReferenceAnnotationBeanPostProcessor做為bean,ReferenceAnnotationBeanPostProcessor是一個BeanPostProcessor BeanRegistrar.registerInfrastructureBean(registry,ReferenceAnnotationBeanPostProcessor.BEAN_NAME, ReferenceAnnotationBeanPostProcessor.class); } private Set<String> getPackagesToScan(AnnotationMetadata metadata) { AnnotationAttributes attributes = AnnotationAttributes.fromMap( metadata.getAnnotationAttributes(DubboComponentScan.class.getName())); String[] basePackages = attributes.getStringArray("basePackages"); Class<?>[] basePackageClasses = attributes.getClassArray("basePackageClasses"); String[] value = attributes.getStringArray("value"); // Appends value array attributes Set<String> packagesToScan = new LinkedHashSet<String>(Arrays.asList(value)); packagesToScan.addAll(Arrays.asList(basePackages)); for (Class<?> basePackageClass : basePackageClasses) { packagesToScan.add(ClassUtils.getPackageName(basePackageClass)); } if (packagesToScan.isEmpty()) { return Collections.singleton(ClassUtils.getPackageName(metadata.getClassName())); } return packagesToScan; } }
【4】掃描@Service註解,並且進行處理
彙總說明:實際上便是通過處理器掃描@Service註解的類,生成兩個Bean【類對應的普通Bean,與Dubbo中要用到的ServiceBean】
其中ServiceBean,是先根據註解上的資訊填充對應的屬性,後採用環境變數中獲取設定的屬性,來完成屬性填充。
public class ServiceAnnotationBeanPostProcessor implements BeanDefinitionRegistryPostProcessor, EnvironmentAware,ResourceLoaderAware, BeanClassLoaderAware { ... //核心方法1 @Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException { Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan); if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) { // 掃描包,進行Bean註冊,核心方法2呼叫 registerServiceBeans(resolvedPackagesToScan, registry); } else { if (logger.isWarnEnabled()) { logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!"); } } } //核心方法2 private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) { DubboClassPathBeanDefinitionScanner scanner = new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader); BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry); scanner.setBeanNameGenerator(beanNameGenerator); // 掃描被Service註解標註的類 scanner.addIncludeFilter(new AnnotationTypeFilter(Service.class)); scanner.addIncludeFilter(new AnnotationTypeFilter(com.alibaba.dubbo.config.annotation.Service.class)); for (String packageToScan : packagesToScan) { // Registers @Service Bean first // 掃描Dubbo自定義的@Service註解 scanner.scan(packageToScan); // 查詢被@Service註解的類的BeanDefinition(無論這個類有沒有被@ComponentScan註解標註了) Set<BeanDefinitionHolder> beanDefinitionHolders = findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator); if (!CollectionUtils.isEmpty(beanDefinitionHolders)) { // 掃描到BeanDefinition開始處理它,核心方法3的呼叫 for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) { registerServiceBean(beanDefinitionHolder, registry, scanner); } if (logger.isInfoEnabled()) { logger.info(b...); } } else { if (logger.isWarnEnabled()) { logger.warn(...); } } } } //核心方法3 private void registerServiceBean(BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry, DubboClassPathBeanDefinitionScanner scanner) { // 處理掃描到的每一個BeanDefinition // 1. 得到@Service註解上所設定的引數 // 2. 根據每一個BeanDefinition會再額外的生成一個ServiceBean // 3. 對於每一個被@Service註解的類(服務的實現類),會生成兩個Bean,一個服務實現類對應的Bean(普通Bean,和@Component一樣),一個ServiceBean(Dubbo中要用到的Bean,因為在ServiceBean中包括了很的Config) // 具體的服務實現類 Class<?> beanClass = resolveClass(beanDefinitionHolder); // @Service可以對服務進行各種設定 Annotation service = findServiceAnnotation(beanClass); AnnotationAttributes serviceAnnotationAttributes = getAnnotationAttributes(service, false, false); // 服務實現類對應的介面 Class<?> interfaceClass = resolveServiceInterfaceClass(serviceAnnotationAttributes, beanClass); // 服務實現類對應的bean的名字,比如:demoServiceImpl String annotatedServiceBeanName = beanDefinitionHolder.getBeanName(); // 生成一個ServiceBean,核心方法4的呼叫 AbstractBeanDefinition serviceBeanDefinition = buildServiceBeanDefinition(service, serviceAnnotationAttributes, interfaceClass, annotatedServiceBeanName); // ServiceBean Bean name ServiceBean表示服務,我們要使用一個服務應該拿ServiceBean String beanName = generateServiceBeanName(serviceAnnotationAttributes, interfaceClass); if (scanner.checkCandidate(beanName, serviceBeanDefinition)) { // check duplicated candidate bean // 把ServiceBean註冊進去,對應的beanName為ServiceBean:org.apache.dubbo.demo.DemoService registry.registerBeanDefinition(beanName, serviceBeanDefinition); if (logger.isInfoEnabled()) { logger.info(..); } } else { if (logger.isWarnEnabled()) { logger.warn(...); } } } ... //核心方法4 private AbstractBeanDefinition buildServiceBeanDefinition(Annotation serviceAnnotation,AnnotationAttributes serviceAnnotationAttributes,Class<?> interfaceClass,String annotatedServiceBeanName) { // 生成一個ServiceBean對應的BeanDefinition BeanDefinitionBuilder builder = rootBeanDefinition(ServiceBean.class); AbstractBeanDefinition beanDefinition = builder.getBeanDefinition(); MutablePropertyValues propertyValues = beanDefinition.getPropertyValues(); String[] ignoreAttributeNames = of("provider", "monitor", "application", "module", "registry", "protocol", "interface", "interfaceName", "parameters"); // 把serviceAnnotation中的引數值賦值給ServiceBean的屬性 // 如:@Service(test = "test") propertyValues.addPropertyValues(new AnnotationPropertyValuesAdapter(serviceAnnotation, environment, ignoreAttributeNames)); // References "ref" property to annotated-@Service Bean // 如:@Service(protocol = "P1"),這種就是要根據對應的值找到對應的P1的config物件裡面的值 // ref屬性賦值為另外一個bean, 對應的就是被@Service註解的服務實現類對應的bean addPropertyReference(builder, "ref", annotatedServiceBeanName); // Set interface builder.addPropertyValue("interface", interfaceClass.getName()); // Convert parameters into map builder.addPropertyValue("parameters", convertParameters(serviceAnnotationAttributes.getStringArray("parameters"))); // 設定了methods屬性,則給ServiceBean對應的methods屬性賦值 // Add methods parameters List<MethodConfig> methodConfigs = convertMethodConfigs(serviceAnnotationAttributes.get("methods")); if (!methodConfigs.isEmpty()) { builder.addPropertyValue("methods", methodConfigs); } /** * Add {@link org.apache.dubbo.config.ProviderConfig} Bean reference */ String providerConfigBeanName = serviceAnnotationAttributes.getString("provider"); if (StringUtils.hasText(providerConfigBeanName)) { addPropertyReference(builder, "provider", providerConfigBeanName); } /** * Add {@link org.apache.dubbo.config.MonitorConfig} Bean reference */ String monitorConfigBeanName = serviceAnnotationAttributes.getString("monitor"); if (StringUtils.hasText(monitorConfigBeanName)) { addPropertyReference(builder, "monitor", monitorConfigBeanName); } /** * Add {@link org.apache.dubbo.config.ApplicationConfig} Bean reference */ String applicationConfigBeanName = serviceAnnotationAttributes.getString("application"); if (StringUtils.hasText(applicationConfigBeanName)) { addPropertyReference(builder, "application", applicationConfigBeanName); } /** * Add {@link org.apache.dubbo.config.ModuleConfig} Bean reference */ String moduleConfigBeanName = serviceAnnotationAttributes.getString("module"); if (StringUtils.hasText(moduleConfigBeanName)) { addPropertyReference(builder, "module", moduleConfigBeanName); } /** * Add {@link org.apache.dubbo.config.RegistryConfig} Bean reference * 獲取註解上設定的註冊中心的beanName */ String[] registryConfigBeanNames = serviceAnnotationAttributes.getStringArray("registry"); List<RuntimeBeanReference> registryRuntimeBeanReferences = toRuntimeBeanReferences(registryConfigBeanNames); if (!registryRuntimeBeanReferences.isEmpty()) { builder.addPropertyValue("registries", registryRuntimeBeanReferences); } /** * Add {@link org.apache.dubbo.config.ProtocolConfig} Bean reference */ String[] protocolConfigBeanNames = serviceAnnotationAttributes.getStringArray("protocol"); List<RuntimeBeanReference> protocolRuntimeBeanReferences = toRuntimeBeanReferences(protocolConfigBeanNames); if (!protocolRuntimeBeanReferences.isEmpty()) { builder.addPropertyValue("protocols", protocolRuntimeBeanReferences); } return builder.getBeanDefinition(); } .... }
【5】掃描@Reference註解,並且進行處理
1)ReferenceAnnotationBeanPostProcessor類會被呼叫是基於繼承關係
//class ReferenceAnnotationBeanPostProcessor extends AnnotationInjectedBeanPostProcessor //abstract class AnnotationInjectedBeanPostProcessor extends InstantiationAwareBeanPostProcessorAdapter //InstantiationAwareBeanPostProcessorAdapter類便是屬性注入時候會呼叫的 //呼叫AnnotationInjectedBeanPostProcessor抽象類的postProcessPropertyValues方法 @Override public PropertyValues postProcessPropertyValues( PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeanCreationException { // 尋找需要注入的屬性(被@Reference標註的Field) InjectionMetadata metadata = findInjectionMetadata(beanName, bean.getClass(), pvs); try { metadata.inject(bean, beanName, pvs); } catch (BeanCreationException ex) { throw ex; } catch (Throwable ex) { throw new BeanCreationException(beanName, "Injection of @" + getAnnotationType().getSimpleName() + " dependencies is failed", ex); } return pvs; } //最終走回到ReferenceAnnotationBeanPostProcessor類的doGetInjectedBean方法
2)ReferenceAnnotationBeanPostProcessor中的方法
public class ReferenceAnnotationBeanPostProcessor extends AnnotationInjectedBeanPostProcessor implements ApplicationContextAware, ApplicationListener { ... // 該方法得到的物件會賦值給@ReferenceBean註解的屬性 @Override protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,InjectionMetadata.InjectedElement injectedElement) throws Exception { // 得到引入服務的beanName // attributes裡存的是@Reference註解中的所設定的屬性與值 // injectedType表示引入的是哪個服務介面 // referencedBeanName的值為 ServiceBean:org.apache.dubbo.demo.DemoService 表示得到該服務Bean的beanName // referencedBeanName表示 我現在要參照的這個服務,它匯出時對應的ServiceBean的beanName是什麼,可以用來判斷現在我參照的這個服務是不是我自己匯出的 String referencedBeanName = buildReferencedBeanName(attributes, injectedType); // @Reference(methods=[Lorg.apache.dubbo.config.annotation.Method;@39b43d60) org.apache.dubbo.demo.DemoService // 我要生成一個RefrenceBean,對應的beanName, 根據@Reference註解來標識不同 String referenceBeanName = getReferenceBeanName(attributes, injectedType); // 生成一個ReferenceBean物件,方法1 ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType); // 把referenceBean新增到Spring容器中去,方法2 registerReferenceBean(referencedBeanName, referenceBean, attributes, injectedType); cacheInjectedReferenceBean(referenceBean, injectedElement); // 建立一個代理物件,Service中的屬性被注入的就是這個代理物件 // 內部會呼叫referenceBean.get(); ,核心方法1 return getOrCreateProxy(referencedBeanName, referenceBeanName, referenceBean, injectedType); } //方法1 private ReferenceBean buildReferenceBeanIfAbsent(String referenceBeanName, AnnotationAttributes attributes, Class<?> referencedType) throws Exception { ReferenceBean<?> referenceBean = referenceBeanCache.get(referenceBeanName); if (referenceBean == null) { // 生成了一個ReferenceBean物件,attributes是@Reference註解的引數值 ReferenceBeanBuilder beanBuilder = ReferenceBeanBuilder .create(attributes, applicationContext) .interfaceClass(referencedType); referenceBean = beanBuilder.build(); referenceBeanCache.put(referenceBeanName, referenceBean); } else if (!referencedType.isAssignableFrom(referenceBean.getInterfaceClass())) { throw new IllegalArgumentException(...); } return referenceBean; } //方法2 private void registerReferenceBean(String referencedBeanName, ReferenceBean referenceBean, AnnotationAttributes attributes, Class<?> interfaceClass) { ConfigurableListableBeanFactory beanFactory = getBeanFactory(); // @Reference(parameters=[Ljava.lang.String;@72ef8d15) org.apache.dubbo.demo.DemoService // ReferenceBean的beanName,注意這個beanName,它是直接取的@Reference的全資訊 // 所以,就算參照的是同一個服務,如果@Reference註解上的資訊不同,那麼就會生成不同的ReferenceBean String beanName = getReferenceBeanName(attributes, interfaceClass); // 要引入的服務就是本地提供的一個服務 if (existsServiceBean(referencedBeanName)) { // If @Service bean is local one /** * Get the @Service's BeanDefinition from {@link BeanFactory} * Refer to {@link ServiceAnnotationBeanPostProcessor#buildServiceBeanDefinition} */ AbstractBeanDefinition beanDefinition = (AbstractBeanDefinition) beanFactory.getBeanDefinition(referencedBeanName); RuntimeBeanReference runtimeBeanReference = (RuntimeBeanReference) beanDefinition.getPropertyValues().get("ref"); // ServiceBean --- ref // The name of bean annotated @Service String serviceBeanName = runtimeBeanReference.getBeanName(); // register Alias rather than a new bean name, in order to reduce duplicated beans // 如果是本地提供的一個服務,那麼就@Reference(parameters=[Ljava.lang.String;@72ef8d15) org.apache.dubbo.demo.DemoService // 的別名是demoService,不需要是ServiceBean的名字 beanFactory.registerAlias(serviceBeanName, beanName); } else { // Remote @Service Bean if (!beanFactory.containsBean(beanName)) { beanFactory.registerSingleton(beanName, referenceBean); } } } //核心方法1 //這裡面其實有點繞,因為@Reference其實也相當於做了@Autowired的工作 //能在本地找到,如果不代理的話其實相當於@Autowired注入屬性(不會走Dubbo的邏輯),所以包裝成代理,讓它也走Dubbo的邏輯 private Object getOrCreateProxy(String referencedBeanName, String referenceBeanName, ReferenceBean referenceBean, Class<?> serviceInterfaceType) { //這個其實是判斷本地有沒有 if (existsServiceBean(referencedBeanName)) { // If the local @Service Bean exists, build a proxy of ReferenceBean //進行代理,讓它走Dubbo的邏輯 return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType}, wrapInvocationHandler(referenceBeanName, referenceBean)); } else { // ReferenceBean should be initialized and get immediately // 重點,服務引入的地方 return referenceBean.get(); } } ... }
【6】圖示:
【0】服務匯出要做的幾件事情:
1. 確定服務的引數 2. 確定服務支援的協定 3. 構造服務最終的URL 4. 將服務URL註冊到註冊中心去 5. 根據服務支援的不同協定,啟動不同的Server,用來接收和處理請求 6. 因為Dubbo支援動態設定服務引數,所以服務匯出時還需要繫結一個監聽器Listener來監聽服務的引數是否有修改,如果發現有修改,則需要重新進行匯出
【1】核心點記錄
ServiceBean的兩種暴露服務的方法 1.利用InitializingBean介面,呼叫export()方法(沒有監聽器的情況下才行) 2.利用監聽ContextRefreshedEvent事件達到服務暴露
動態代理生成 Invoker 包裝成 wrapperInvoker
RegistryProtocol進行註冊
DubboProtocol對 Invoker 進行匯出,返回一個Exporter
ExchangeServer
【2】ServiceBean是怎麼程序服務匯出的
//1是利用InitializingBean介面,呼叫export()方法【主要是呼叫父類別的export()方法和釋出ServiceBeanExportedEvent事件】 //2是利用監聽ContextRefreshedEvent事件達到服務暴露 public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware, ApplicationEventPublisherAware { .... @Override public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; // 如果某一個Service是通過Spring暴露的, // 那麼當需要獲取該服務時就要從Spring容器中進行獲取, // 也就是從applicationContext中獲取,所以需要把applicationContext新增到SpringExtensionFactory中去 SpringExtensionFactory.addApplicationContext(applicationContext); // 一定要有這一步,不然ServiceBean將接收不到ContextRefreshedEvent事件 supportedApplicationListener = addApplicationListener(applicationContext, this); } //當接收到監聽ContextRefreshedEvent事件時候 @Override public void onApplicationEvent(ContextRefreshedEvent event) { // 當前服務沒有被匯出並且沒有解除安裝,才匯出服務 if (!isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } // 服務匯出(服務註冊) export(); } } @Override @SuppressWarnings({"unchecked", "deprecation"}) public void afterPropertiesSet() throws Exception { // 如果@Service中沒有設定provider if (getProvider() == null) { // 就從Spring容器中找ProviderConfig型別的Bean Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false); if (providerConfigMap != null && providerConfigMap.size() > 0) { // 從Spring容器中找ProtocolConfig型別的Bean Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false); // 如果存在ProtocolConfig存在,並且存在多個ProviderConfig if (CollectionUtils.isEmptyMap(protocolConfigMap) && providerConfigMap.size() > 1) { // backward compatibility // 如果找到多個,取第一個default等於true的ProviderConfig List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>(); for (ProviderConfig config : providerConfigMap.values()) { if (config.isDefault() != null && config.isDefault()) { providerConfigs.add(config); } } if (!providerConfigs.isEmpty()) { setProviders(providerConfigs); } } else { ProviderConfig providerConfig = null; for (ProviderConfig config : providerConfigMap.values()) { if (config.isDefault() == null || config.isDefault()) { if (providerConfig != null) { throw new IllegalStateException(...); } providerConfig = config; } } if (providerConfig != null) { setProvider(providerConfig); } } } } if (getApplication() == null && (getProvider() == null || getProvider().getApplication() == null)) { Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false); if (applicationConfigMap != null && applicationConfigMap.size() > 0) { ApplicationConfig applicationConfig = null; for (ApplicationConfig config : applicationConfigMap.values()) { if (applicationConfig != null) { throw new IllegalStateException(...); } applicationConfig = config; } if (applicationConfig != null) { setApplication(applicationConfig); } } } if (getModule() == null && (getProvider() == null || getProvider().getModule() == null)) { Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false); if (moduleConfigMap != null && moduleConfigMap.size() > 0) { ModuleConfig moduleConfig = null; for (ModuleConfig config : moduleConfigMap.values()) { if (config.isDefault() == null || config.isDefault()) { if (moduleConfig != null) { throw new IllegalStateException(...); } moduleConfig = config; } } if (moduleConfig != null) { setModule(moduleConfig); } } } // registryIds程式碼能看到,但是沒找到在哪裡能設定 if (StringUtils.isEmpty(getRegistryIds())) { if (getApplication() != null && StringUtils.isNotEmpty(getApplication().getRegistryIds())) { setRegistryIds(getApplication().getRegistryIds()); } if (getProvider() != null && StringUtils.isNotEmpty(getProvider().getRegistryIds())) { setRegistryIds(getProvider().getRegistryIds()); } } if ((CollectionUtils.isEmpty(getRegistries())) && (getProvider() == null || CollectionUtils.isEmpty(getProvider().getRegistries())) && (getApplication() == null || CollectionUtils.isEmpty(getApplication().getRegistries()))) { Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false); if (CollectionUtils.isNotEmptyMap(registryConfigMap)) { List<RegistryConfig> registryConfigs = new ArrayList<>(); if (StringUtils.isNotEmpty(registryIds)) { Arrays.stream(COMMA_SPLIT_PATTERN.split(registryIds)).forEach(id -> { if (registryConfigMap.containsKey(id)) { registryConfigs.add(registryConfigMap.get(id)); } }); } if (registryConfigs.isEmpty()) { for (RegistryConfig config : registryConfigMap.values()) { if (StringUtils.isEmpty(registryIds) && (config.isDefault() == null || config.isDefault().booleanValue())) { registryConfigs.add(config); } } } if (!registryConfigs.isEmpty()) { super.setRegistries(registryConfigs); } } } if (getMetadataReportConfig() == null) { Map<String, MetadataReportConfig> metadataReportConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MetadataReportConfig.class, false, false); if (metadataReportConfigMap != null && metadataReportConfigMap.size() == 1) { super.setMetadataReportConfig(metadataReportConfigMap.values().iterator().next()); } else if (metadataReportConfigMap != null && metadataReportConfigMap.size() > 1) { throw new IllegalStateException("Multiple MetadataReport configs: " + metadataReportConfigMap); } } if (getConfigCenter() == null) { Map<String, ConfigCenterConfig> configenterMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConfigCenterConfig.class, false, false); if (configenterMap != null && configenterMap.size() == 1) { super.setConfigCenter(configenterMap.values().iterator().next()); } else if (configenterMap != null && configenterMap.size() > 1) { throw new IllegalStateException("Multiple ConfigCenter found:" + configenterMap); } } if (getMonitor() == null && (getProvider() == null || getProvider().getMonitor() == null) && (getApplication() == null || getApplication().getMonitor() == null)) { Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false); if (monitorConfigMap != null && monitorConfigMap.size() > 0) { MonitorConfig monitorConfig = null; for (MonitorConfig config : monitorConfigMap.values()) { if (config.isDefault() == null || config.isDefault()) { if (monitorConfig != null) { throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config); } monitorConfig = config; } } if (monitorConfig != null) { setMonitor(monitorConfig); } } } if (getMetrics() == null) { Map<String, MetricsConfig> metricsConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MetricsConfig.class, false, false); if (metricsConfigMap != null && metricsConfigMap.size() > 0) { MetricsConfig metricsConfig = null; for (MetricsConfig config : metricsConfigMap.values()) { if (metricsConfig != null) { throw new IllegalStateException("Duplicate metrics configs: " + metricsConfig + " and " + config); } metricsConfig = config; } if (metricsConfig != null) { setMetrics(metricsConfig); } } } // protocolIds也沒看到在哪裡設定 if (StringUtils.isEmpty(getProtocolIds())) { if (getProvider() != null && StringUtils.isNotEmpty(getProvider().getProtocolIds())) { setProtocolIds(getProvider().getProtocolIds()); } } if (CollectionUtils.isEmpty(getProtocols()) && (getProvider() == null || CollectionUtils.isEmpty(getProvider().getProtocols()))) { Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false); if (protocolConfigMap != null && protocolConfigMap.size() > 0) { List<ProtocolConfig> protocolConfigs = new ArrayList<ProtocolConfig>(); if (StringUtils.isNotEmpty(getProtocolIds())) { Arrays.stream(COMMA_SPLIT_PATTERN.split(getProtocolIds())) .forEach(id -> { if (protocolConfigMap.containsKey(id)) { protocolConfigs.add(protocolConfigMap.get(id)); } }); } if (protocolConfigs.isEmpty()) { for (ProtocolConfig config : protocolConfigMap.values()) { if (StringUtils.isEmpty(protocolIds)) { protocolConfigs.add(config); } } } if (!protocolConfigs.isEmpty()) { super.setProtocols(protocolConfigs); } } } if (StringUtils.isEmpty(getPath())) { if (StringUtils.isNotEmpty(beanName) && StringUtils.isNotEmpty(getInterface()) && beanName.startsWith(getInterface())) { setPath(beanName); } } //沒有監聽事件才做暴露服務 if (!supportedApplicationListener) { export(); } } //服務暴露的核心方法 @Override public void export() { super.export(); // Publish ServiceBeanExportedEvent publishExportEvent(); } private void publishExportEvent() { ServiceBeanExportedEvent exportEvent = new ServiceBeanExportedEvent(this); applicationEventPublisher.publishEvent(exportEvent); } @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.applicationEventPublisher = applicationEventPublisher; } }
1)ServiceConfig類#export()方法
public synchronized void export() { //讀取設定並補全(最新最全的設定),方法1 checkAndUpdateSubConfigs(); // 檢查服務是否需要匯出 if (!shouldExport()) { return; } // 檢查是否需要延遲釋出 if (shouldDelay()) { DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS); } else { // 匯出服務,方法2 doExport(); } }
2)方法1:ServiceConfig類#checkAndUpdateSubConfigs()方法
/** * 1. ServiceConfig中的某些屬性如果是空的,那麼就從ProviderConfig、ModuleConfig、ApplicationConfig中獲取 * 2. 從設定中心獲取設定,包括應用設定和全域性設定 * 3. 從設定中心獲取Provider設定 * 4. 從設定中心獲取Protocol設定 * 5. 如果ApplicationConfig為空,則構造一個ApplicationConfig * 6. 從設定中心獲取Registry設定 * 7. 更新ServiceConfig中的屬性為優先順序最高的設定 * 8. 更新MetadataReportConfig中的屬性為優先順序最高的設定 * 9. 檢查當前服務是不是一個泛化服務 * 10.檢查Stub和Local * 11.檢查Mock */ public void checkAndUpdateSubConfigs() { // ServiceConfig中的某些屬性如果是空的,那麼就從ProviderConfig、ModuleConfig、ApplicationConfig中獲取(之前生成的設定Bean) completeCompoundConfigs(); // 方法1.1 // 從設定中心獲取設定,包括應用設定和全域性設定 // 把獲取到的設定放入到Environment中的externalConfigurationMap和appExternalConfigurationMap中 // 並重新整理所有的Config屬性 startConfigCenter(); // 如果沒有ProviderConfig物件,則建立一個 checkDefault(); // 如果沒有單獨的設定protocols,那麼就從provider獲取設定的協定,新增到的ServiceConfig中去 // 假如程式設計師在組態檔中配了一個dubbo協定,設定中心的全域性設定或應用設定中也設定了一個協定,那麼就會被新增到ServiceConfig中 checkProtocol(); checkApplication(); // if protocol is not injvm checkRegistry // 如果protocol不是隻有injvm協定,表示服務呼叫不是隻在本機jvm裡面呼叫,那就需要用到註冊中心 // 如果protocol是injvm,表示本地呼叫 if (!isOnlyInJvm()) { checkRegistry(); } // 重新整理ServiceConfig,方法1.2 this.refresh(); // 如果配了metadataReportConfig,那麼就重新整理設定 checkMetadataReport(); if (StringUtils.isEmpty(interfaceName)) { throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!"); } // 當前服務對應的實現類是一個GenericService,表示沒有特定的介面 if (ref instanceof GenericService) { interfaceClass = GenericService.class; if (StringUtils.isEmpty(generic)) { generic = Boolean.TRUE.toString(); } } else { // 載入介面 try { interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } // 重新整理MethodConfig,並判斷MethodConfig中對應的方法在介面中是否存在 checkInterfaceAndMethods(interfaceClass, methods); // 實現類是不是該介面型別 checkRef(); generic = Boolean.FALSE.toString(); } // local和stub一樣,不建議使用了 if (local != null) { // 如果本地存根為true,則存根類為interfaceName + "Local" if (Boolean.TRUE.toString().equals(local)) { local = interfaceName + "Local"; } // 載入本地存根類 Class<?> localClass; try { localClass = ClassUtils.forNameWithThreadContextClassLoader(local); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } if (!interfaceClass.isAssignableFrom(localClass)) { throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName); } } // 本地存根 if (stub != null) { // 如果本地存根為true,則存根類為interfaceName + "Stub" if (Boolean.TRUE.toString().equals(stub)) { stub = interfaceName + "Stub"; } Class<?> stubClass; try { stubClass = ClassUtils.forNameWithThreadContextClassLoader(stub); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } if (!interfaceClass.isAssignableFrom(stubClass)) { throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName); } } // 檢查local和stub checkStubAndLocal(interfaceClass); // 檢查mock checkMock(interfaceClass); }
3)方法1.1,AbstractInterfaceConfig類#startConfigCenter()方法
void startConfigCenter() { if (configCenter == null) { ConfigManager.getInstance().getConfigCenter().ifPresent(cc -> this.configCenter = cc); } // 如果設定了ConfigCenter if (this.configCenter != null) { // 從其他位置獲取設定中心的相關屬性資訊,比如設定中心地址 // TODO there may have duplicate refresh this.configCenter.refresh(); // 屬性更新後,從遠端設定中心獲取資料(應用設定,全域性設定) prepareEnvironment(); } // 從設定中心取到設定資料後,重新整理所有的XxConfig中的屬性 ConfigManager.getInstance().refreshAll(); } private void prepareEnvironment() { if (configCenter.isValid()) { if (!configCenter.checkOrUpdateInited()) { return; } // 動態設定中心,管理臺上的設定中心 DynamicConfiguration dynamicConfiguration = getDynamicConfiguration(configCenter.toUrl()); // 如果是zookeeper,獲取的就是/dubbo/config/dubbo/dubbo.properties節點中的內容 String configContent = dynamicConfiguration.getProperties(configCenter.getConfigFile(), configCenter.getGroup()); String appGroup = application != null ? application.getName() : null; String appConfigContent = null; if (StringUtils.isNotEmpty(appGroup)) { // 獲取的就是/dubbo/config/dubbo-demo-consumer-application/dubbo.properties節點中的內容 // 這裡有bug appConfigContent = dynamicConfiguration.getProperties (StringUtils.isNotEmpty(configCenter.getAppConfigFile()) ? configCenter.getAppConfigFile() : configCenter.getConfigFile(), appGroup ); } try { Environment.getInstance().setConfigCenterFirst(configCenter.isHighestPriority()); Environment.getInstance().updateExternalConfigurationMap(parseProperties(configContent)); Environment.getInstance().updateAppExternalConfigurationMap(parseProperties(appConfigContent)); } catch (IOException e) { throw new IllegalStateException(...); } } }
4)方法1.2,AbstractInterfaceConfig類#refresh()方法
// 重新整理XxConfig // 一個XxConfig物件的屬性可能是有值的,也可能是沒有值的,這時需要從其他位置獲取屬性值,來進行屬性的覆蓋 // 覆蓋的優先順序,從大到小為系統變數->設定中心應用設定->設定中心全域性設定->註解或xml中定義->dubbo.properties檔案 // 以ServiceConfig為例,ServiceConfig中包括很多屬性,比如timeout // 但是在定義一個Service時,如果在註解上沒有設定timeout,那麼就會其他地方獲取timeout的設定 // 比如可以從系統變數->設定中心應用設定->設定中心全域性設定->註解或xml中定義->dubbo.properties檔案 // refresh是重新整理,將當前ServiceConfig上的set方法所對應的屬性更新為優先順序最高的值 public void refresh() { try { CompositeConfiguration compositeConfiguration = Environment.getInstance().getConfiguration(getPrefix(), getId()); // 表示XxConfig物件本身- AbstractConfig Configuration config = new ConfigConfigurationAdapter(this); //設定順序, if (Environment.getInstance().isConfigCenterFirst()) { // The sequence would be: SystemConfiguration -> AppExternalConfiguration -> ExternalConfiguration -> AbstractConfig -> PropertiesConfiguration compositeConfiguration.addConfiguration(4, config); } else { // The sequence would be: SystemConfiguration -> AbstractConfig -> AppExternalConfiguration -> ExternalConfiguration -> PropertiesConfiguration compositeConfiguration.addConfiguration(2, config); } // loop methods, get override value and set the new value back to method // Method[] methods = getClass().getMethods(); for (Method method : methods) { // 是不是setXX()方法 if (MethodUtils.isSetter(method)) { // 獲取xx設定項的value String value = StringUtils.trim(compositeConfiguration.getString(extractPropertyName(getClass(), method))); // isTypeMatch() is called to avoid duplicate and incorrect update, for example, we have two 'setGeneric' methods in ReferenceConfig. if (StringUtils.isNotEmpty(value) && ClassUtils.isTypeMatch(method.getParameterTypes()[0], value)) { method.invoke(this, ClassUtils.convertPrimitive(method.getParameterTypes()[0], value)); } // 是不是setParameters()方法 } else if (isParametersSetter(method)) { // 獲取parameter設定項的value String value = StringUtils.trim(compositeConfiguration.getString(extractPropertyName(getClass(), method))); if (StringUtils.isNotEmpty(value)) { Map<String, String> map = invokeGetParameters(getClass(), this); map = map == null ? new HashMap<>() : map; map.putAll(convert(StringUtils.parseParameters(value), "")); invokeSetParameters(getClass(), this, map); } } } } catch (Exception e) { logger.error("Failed to override ", e); } } public CompositeConfiguration getConfiguration(String prefix, String id) { CompositeConfiguration compositeConfiguration = new CompositeConfiguration(); // Config center has the highest priority // JVM環境變數 compositeConfiguration.addConfiguration(this.getSystemConfig(prefix, id)); // 作業系統環境變數 compositeConfiguration.addConfiguration(this.getEnvironmentConfig(prefix, id)); // 設定中心APP設定 compositeConfiguration.addConfiguration(this.getAppExternalConfig(prefix, id)); // 設定中心Global設定 compositeConfiguration.addConfiguration(this.getExternalConfig(prefix, id)); // dubbo.properties中的設定 compositeConfiguration.addConfiguration(this.getPropertiesConfig(prefix, id)); return compositeConfiguration; }
5)方法2,ServiceConfig類#doExport()方法
protected synchronized void doExport() { // 當前服務已經被取消了,就不能再匯出了 if (unexported) { throw new IllegalStateException(...); } // 已經匯出了,就不再匯出了 if (exported) { return; } exported = true; if (StringUtils.isEmpty(path)) { path = interfaceName; } doExportUrls(); } @SuppressWarnings({"unchecked", "rawtypes"}) private void doExportUrls() { // 得到url,註冊服務也是一個服務,所以也會有對應的url,通過呼叫該url完成服務註冊 List<URL> registryURLs = loadRegistries(true); // // 遍歷每個協定 // 一個協定一個服務 for (ProtocolConfig protocolConfig : protocols) { // path表示服務名 // contextPath表示應用名(可設定) // pathKey = group/contextpath/path:version // 例子:myGroup/user/org.apache.dubbo.demo.DemoService:1.0.1 String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version); // ProviderModel中存在服務提供者存取路徑,實現類,介面,以及介面中的各個方法對應的ProviderMethodModel // ProviderMethodModel表示某一個方法,方法名,所屬的服務的, ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass); // ApplicationModel表示應用中有哪些服務提供者和參照了哪些服務 ApplicationModel.initProviderModel(pathKey, providerModel); // 每種協定匯出一個單獨的服務,註冊到各個註冊中心 doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
6)doExportUrlsFor1Protocol方法
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { // protocolConfig表示某個協定,registryURLs表示所有的註冊中心 // 如果設定的某個協定,沒有設定name,那麼預設為dubbo String name = protocolConfig.getName(); if (StringUtils.isEmpty(name)) { name = DUBBO; } // 這個map表示服務url的引數 Map<String, String> map = new HashMap<String, String>(); map.put(SIDE_KEY, PROVIDER_SIDE); appendRuntimeParameters(map); // 監控中心引數 appendParameters(map, metrics); // 應用相關引數 appendParameters(map, application); // 模組相關引數 appendParameters(map, module); // 提供者相關引數 appendParameters(map, provider); // 協定相關引數 appendParameters(map, protocolConfig); // 服務本身相關引數 appendParameters(map, this); // 服務中某些方法引數 if (CollectionUtils.isNotEmpty(methods)) { for (MethodConfig method : methods) { // 某個方法的設定引數,注意有prefix appendParameters(map, method, method.getName()); String retryKey = method.getName() + ".retry"; // 如果某個方法設定存在xx.retry=false,則改成xx.retry=0 if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if (Boolean.FALSE.toString().equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } List<ArgumentConfig> arguments = method.getArguments(); if (CollectionUtils.isNotEmpty(arguments)) { // 遍歷當前方法設定中的引數設定 for (ArgumentConfig argument : arguments) { // 如果設定了type,則遍歷當前介面的所有方法,然後找到方法名和當前方法名相等的方法,可能存在多個 // 如果設定了index,則看index對應位置的引數型別是否等於type,如果相等,則向map中存入argument物件中的引數 // 如果沒有設定index,那麼則遍歷方法所有的引數型別,等於type則向map中存入argument物件中的引數 // 如果沒有設定type,但設定了index,則把對應位置的argument放入map // convert argument type if (argument.getType() != null && argument.getType().length() > 0) { Method[] methods = interfaceClass.getMethods(); // visit all methods if (methods != null && methods.length > 0) { for (int i = 0; i < methods.length; i++) { String methodName = methods[i].getName(); // target the method, and get its signature if (methodName.equals(method.getName())) { Class<?>[] argtypes = methods[i].getParameterTypes(); // one callback in the method if (argument.getIndex() != -1) { if (argtypes[argument.getIndex()].getName().equals(argument.getType())) { appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException(...); } } else { // multiple callbacks in the method for (int j = 0; j < argtypes.length; j++) { Class<?> argclazz = argtypes[j]; if (argclazz.getName().equals(argument.getType())) { appendParameters(map, argument, method.getName() + "." + j); if (argument.getIndex() != -1 && argument.getIndex() != j) { throw new IllegalArgumentException(...); } } } } } } } } else if (argument.getIndex() != -1) { appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException(...); } } } } // end of methods for } if (ProtocolUtils.isGeneric(generic)) { map.put(GENERIC_KEY, generic); map.put(METHODS_KEY, ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put(REVISION_KEY, revision); } // 通過介面對應的Wrapper,拿到介面中所有的方法名字 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("No method found in service interface " + interfaceClass.getName()); map.put(METHODS_KEY, ANY_VALUE); } else { map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } // Token是為了防止服務被消費者直接呼叫(偽造http請求) // 主要是存於註冊中心,呼叫時Token匹配成功即算通過(所以要求呼叫者是通過註冊中心獲取提供方的資訊) if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(TOKEN_KEY, token); } } // export service // 通過該host和port存取該服務 String host = this.findConfigedHosts(protocolConfig, registryURLs, map); Integer port = this.findConfigedPorts(protocolConfig, name, map); // 服務url URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map); // url:http://192.168.40.17:80/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService&bind.ip=192.168.40.17&bind.port=80&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=285072&release=&side=provider×tamp=1585206500409 // 可以通過ConfiguratorFactory,在服務匯出時候進行統一設定 if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol()).getConfigurator(url).configure(url); } String scope = url.getParameter(SCOPE_KEY); // scope可能為null,remote, local,none // don't export when none is configured if (!SCOPE_NONE.equalsIgnoreCase(scope)) { // 如果scope為none,則不會進行任何的服務匯出,既不會遠端,也不會本地 // export to local if the config is not remote (export to remote only when config is remote) if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) { // 如果scope不是remote,則會進行本地匯出,會把當前url的protocol改為injvm,然後進行匯出 exportLocal(url); } // export to remote if the config is not local (export to local only when config is local) if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) { // 如果scope不是local,則會進行遠端匯出 if (CollectionUtils.isNotEmpty(registryURLs)) { // 如果有註冊中心,則將服務註冊到註冊中心 for (URL registryURL : registryURLs) { //if protocol is only injvm ,not register // 如果是injvm,則不需要進行註冊中心註冊 if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { continue; } // 該服務是否是動態,對應zookeeper上表示是否是臨時節點,對應dubbo中的功能就是靜態服務 url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY)); // 基於註冊中心地址的到監控中心地址,為什麼是基於註冊中心地址? URL monitorUrl = loadMonitor(registryURL); // 把監控中心地址新增到服務url中 if (monitorUrl != null) { url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString()); } // 服務的register引數,如果為true,則表示要註冊到註冊中心 if (logger.isInfoEnabled()) { if (url.getParameter(REGISTER_KEY, true)) { logger.info(...); } else { logger.info(...); } } // For providers, this is used to enable custom proxy to generate invoker // 服務使用的動態代理機制,如果為空則使用javassit String proxy = url.getParameter(PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(PROXY_KEY, proxy); } // 生成一個當前服務介面的代理物件 // 使用代理生成一個Invoker,Invoker表示服務提供者的代理,可以使用Invoker的invoke方法執行服務 // 對應的url為 registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider&dubbo=2.0.2&export=http%3A%2F%2F192.168.40.17%3A80%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-annotation-provider%26bean.name%3DServiceBean%3Aorg.apache.dubbo.demo.DemoService%26bind.ip%3D192.168.40.17%26bind.port%3D80%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D19472%26release%3D%26side%3Dprovider%26timestamp%3D1585207994860&pid=19472®istry=zookeeper×tamp=1585207994828 // 這個Invoker中包括了服務的實現者、服務介面類、服務的註冊地址(針對當前服務的,引數export指定了當前服務) // 此invoker表示一個可執行的服務,呼叫invoker的invoke()方法即可執行服務,同時此invoker也可用來匯出 Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); // DelegateProviderMetaDataInvoker也表示服務提供者,包括了Invoker和服務的設定 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // 使用特定的協定來對服務進行匯出,這裡的協定為RegistryProtocol,匯出成功後得到一個Exporter // 1. 先使用RegistryProtocol進行服務註冊 // 2. 註冊完了之後,使用DubboProtocol進行匯出 Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } else { // 沒有設定註冊中心時,也會匯出服務 Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } // 根據服務url,講服務的元資訊存入後設資料中心 MetadataReportService metadataReportService = null; if ((metadataReportService = getMetadataReportService()) != null) { metadataReportService.publishProvider(url); } } } this.urls.add(url); }
7)protocol.export的呼叫
@Override public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // 匯出服務 // registry:// ---> RegistryProtocol // zookeeper:// ---> ZookeeperRegistry // dubbo:// ---> DubboProtocol // provider:// ---> // 將registry://xxx?xx=xx®istry=zookeeper 轉為---> zookeeper://xxx?xx=xx URL registryUrl = getRegistryUrl(originInvoker); // zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-provider-application&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.40.17%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-provider-application%26bean.name%3DServiceBean%3Aorg.apache.dubbo.demo.DemoService%26bind.ip%3D192.168.40.17%26bind.port%3D20880%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26logger%3Dlog4j%26methods%3DsayHello%26pid%3D27656%26release%3D2.7.0%26side%3Dprovider%26timeout%3D3000%26timestamp%3D1590735956489&logger=log4j&pid=27656&release=2.7.0×tamp=1590735956479 // 得到服務提供者url URL providerUrl = getProviderUrl(originInvoker); // dubbo://192.168.40.17:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-provider-application&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService&bind.ip=192.168.40.17&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&logger=log4j&methods=sayHello&pid=27656&release=2.7.0&side=provider&timeout=3000×tamp=1590735956489 // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call // the same service. Because the subscribed is cached key with the name of the service, it causes the // subscription information to cover. // overrideSubscribeUrl是老版本的動態設定監聽url,表示了需要監聽的服務以及監聽的型別(configurators, 這是老版本上的動態設定) // 在服務提供者url的基礎上,生成一個overrideSubscribeUrl,協定為provider://,增加引數category=configurators&check=false final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); // 一個overrideSubscribeUrl對應一個OverrideListener,用來監聽變化事件,監聽到overrideSubscribeUrl的變化後, // OverrideListener就會根據變化進行相應處理,具體處理邏輯看OverrideListener的實現 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); // 在這個方法裡會利用providerConfigurationListener和serviceConfigurationListener去重寫providerUrl // providerConfigurationListener表示應用級別的動態設定監聽器,providerConfigurationListener是RegistyProtocol的一個屬性 // serviceConfigurationListener表示服務級別的動態設定監聽器,serviceConfigurationListener是在每暴露一個服務時就會生成一個 // 這兩個監聽器都是新版本中的監聽器 // 新版本監聽的zk路徑是: // 服務: /dubbo/config/dubbo/org.apache.dubbo.demo.DemoService.configurators節點的內容 // 應用: /dubbo/config/dubbo/dubbo-demo-provider-application.configurators節點的內容 // 注意,要喝設定中心的路徑區分開來,設定中心的路徑是: // 應用:/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService/dubbo.properties節點的內容 // 全域性:/dubbo/config/dubbo/dubbo.properties節點的內容 providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); // export invoker // 根據動態設定重寫了providerUrl之後,就會呼叫DubboProtocol或HttpProtocol去進行匯出服務了 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // url to registry // 得到註冊中心-ZookeeperRegistry final Registry registry = getRegistry(originInvoker); // 得到存入到註冊中心去的providerUrl,會對服務提供者url中的引數進行簡化 final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); // 將當前服務提供者Invoker,以及該服務對應的註冊中心地址,以及簡化後的服務url存入ProviderConsumerRegTable ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); //to judge if we need to delay publish //是否需要註冊到註冊中心 boolean register = providerUrl.getParameter(REGISTER_KEY, true); if (register) { // 註冊服務,把簡化後的服務提供者url註冊到registryUrl中去 register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true); } // 針對老版本的動態設定,需要把overrideSubscribeListener繫結到overrideSubscribeUrl上去進行監聽 // 相容老版本的設定修改,利用overrideSubscribeListener去監聽舊版本的動態設定變化 // 監聽overrideSubscribeUrl provider://192.168.40.17:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService&bind.ip=192.168.40.17&bind.port=20880&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=416332&release=&side=provider×tamp=1585318241955 // 那麼新版本的providerConfigurationListener和serviceConfigurationListener是在什麼時候進行訂閱的呢?在這兩個類構造的時候 // Deprecated! Subscribe to override rules in 2.6.x or before. // 老版本監聽的zk路徑是:/dubbo/org.apache.dubbo.demo.DemoService/configurators/override://0.0.0.0/org.apache.dubbo.demo.DemoService?category=configurators&compatible_config=true&dynamic=false&enabled=true&timeout=6000 // 監聽的是路徑的內容,不是節點的內容 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<>(exporter); } public void register(URL registryUrl, URL registeredProviderUrl) { Registry registry = registryFactory.getRegistry(registryUrl); // 呼叫FailbackRegistry類的方法再轉到ZookeeperRegistry的register方法 registry.register(registeredProviderUrl); } //FailbackRegistry類(進行失敗重試) @Override public void register(URL url) { super.register(url); removeFailedRegistered(url); removeFailedUnregistered(url); try { // 這裡才會呼叫ZookeeperRegistry類的方法 doRegister(url); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException(...); } else { logger.error(...); } addFailedRegistered(url); } } //ZookeeperRegistry的真正註冊的地方 @Override public void doRegister(URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
8)doLocalExport方法
@SuppressWarnings("unchecked") private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) { String key = getCacheKey(originInvoker); return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> { Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl); // protocol屬性的值是哪來的,是在SPI中注入進來的,是一個代理類 // 這裡實際利用的就是DubboProtocol或HttpProtocol去export NettyServer // 為什麼需要ExporterChangeableWrapper?方便登出已經被匯出的服務 return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker); }); } @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); // 構造一個Exporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(...); } } else { // 服務的stub方法 stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } // 開啟NettyServer openServer(url); optimizeSerialization(url); return exporter; } private void openServer(URL url) { // find server. String key = url.getAddress(); // 獲得ip地址和port, 192.168.40.17:20880 // NettyClient, NettyServer //client can export a service which's only for server to invoke boolean isServer = url.getParameter(IS_SERVER_KEY, true); if (isServer) { // 快取Server物件 ExchangeServer server = serverMap.get(key); // DCL,Double Check Lock if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { // 建立Server,並進行快取 serverMap.put(key, createServer(url)); } } } else { // server supports reset, use together with override // 服務重新匯出時,就會走這裡 server.reset(url); } } } private ExchangeServer createServer(URL url) { url = URLBuilder.from(url) // send readonly event when server closes, it's enabled by default .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()) // enable heartbeat by default .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)) .addParameter(CODEC_KEY, DubboCodec.NAME) .build(); // 協定的伺服器端實現型別,比如:dubbo協定的mina,netty等,http協定的jetty,servlet等,預設為netty String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported server type: " + str + ", url: " + url); } // 通過url繫結埠,和對應的請求處理器 ExchangeServer server; try { // requestHandler是請求處理器,型別為ExchangeHandler // 表示從url的埠接收到請求後,requestHandler來進行處理 server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } // 協定的使用者端實現型別,比如:dubbo協定的mina,netty等 str = url.getParameter(CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
9)當資料發生改變時 OverrideListener 監聽者的處理
@Override public synchronized void notify(List<URL> urls) { List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl.addParameter(CATEGORY_KEY, CONFIGURATORS_CATEGORY)); // No matching results if (matchedUrls.isEmpty()) { return; } // 對發生了變化的url進行過濾,只取url是override協定,或者引數category等於configurators的url this.configurators = Configurator.toConfigurators(classifyUrls(matchedUrls, UrlUtils::isConfigurator)).orElse(configurators); // 根據Override協定修改 doOverrideIfNecessary(); } public synchronized void doOverrideIfNecessary() { final Invoker<?> invoker; if (originInvoker instanceof InvokerDelegate) { invoker = ((InvokerDelegate<?>) originInvoker).getInvoker(); } else { invoker = originInvoker; } //The origin invoker 當前服務的原始服務提供者url URL originUrl = RegistryProtocol.this.getProviderUrl(invoker); String key = getCacheKey(originInvoker); ExporterChangeableWrapper<?> exporter = bounds.get(key); if (exporter == null) { logger.warn(new IllegalStateException("error state, exporter should not be null")); return; } //The current, may have been merged many times,當前服務被匯出的url URL currentUrl = exporter.getInvoker().getUrl(); //根據configurators修改url,configurators是全量的,並不是某個新增的或刪除的,所以是基於原始的url進行修改,並不是基於currentUrl //Merged with this configuration URL newUrl = getConfigedInvokerUrl(configurators, originUrl); newUrl = getConfigedInvokerUrl(providerConfigurationListener.getConfigurators(), newUrl); newUrl = getConfigedInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey()).getConfigurators(), newUrl); // 修改過的url如果和目前的url不相同,則重新按newUrl匯出 if (!currentUrl.equals(newUrl)) { RegistryProtocol.this.reExport(originInvoker, newUrl); logger.info(...); } } public <T> void reExport(final Invoker<T> originInvoker, URL newInvokerUrl) { // 根據newInvokerUrl進行匯出 // update local exporter ExporterChangeableWrapper exporter = doChangeLocalExport(originInvoker, newInvokerUrl); // 獲取準確的ProviderUrl // update registry URL registryUrl = getRegistryUrl(originInvoker); // 對於一個服務提供者url,在註冊到註冊中心時,會先進行簡化,所以如果 final URL registeredProviderUrl = getRegisteredProviderUrl(newInvokerUrl, registryUrl); //decide if we need to re-publish // 根據getServiceKey獲取ProviderInvokerWrapper ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.getProviderWrapper(registeredProviderUrl, originInvoker); // 生成一個新的ProviderInvokerWrapper ProviderInvokerWrapper<T> newProviderInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); /** * Only if the new url going to Registry is different with the previous one should we do unregister and register. * 如果新的服務提供者url簡化後的url和這個服務之前的服務提供者url簡化後的url不相等,則需要把新的簡化後的服務提供者url註冊到註冊中心去 */ if (providerInvokerWrapper.isReg() && !registeredProviderUrl.equals(providerInvokerWrapper.getProviderUrl())) { unregister(registryUrl, providerInvokerWrapper.getProviderUrl()); register(registryUrl, registeredProviderUrl); newProviderInvokerWrapper.setReg(true); } exporter.setRegisterUrl(registeredProviderUrl); } @SuppressWarnings("unchecked") private <T> ExporterChangeableWrapper doChangeLocalExport(final Invoker<T> originInvoker, URL newInvokerUrl) { String key = getCacheKey(originInvoker); final ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { logger.warn(new IllegalStateException("error state, exporter should not be null")); } else { // 到這裡才能真正明白,為什麼需要InvokerDelegate // InvokerDelegate表示一個呼叫者,由invoker+url構成,invoker不變,url可變 final Invoker<T> invokerDelegate = new InvokerDelegate<T>(originInvoker, newInvokerUrl); //這次openServer會走HeaderExchangeServer的reset方法 exporter.setExporter(protocol.export(invokerDelegate)); } return exporter; } //這裡面存在重新匯出,關閉舊的延遲任務(舊的心跳任務),開啟新的心跳任務,但是Netty不會關閉,也不會重啟 @Override public void reset(URL url) { server.reset(url); try { int currHeartbeat = getHeartbeat(getUrl()); int currIdleTimeout = getIdleTimeout(getUrl()); int heartbeat = getHeartbeat(url); int idleTimeout = getIdleTimeout(url); if (currHeartbeat != heartbeat || currIdleTimeout != idleTimeout) { cancelCloseTask(); startIdleCheckTask(url); } } catch (Throwable t) { logger.error(t.getMessage(), t); } }
【3】彙總
服務匯出的入口為ServiceBean中的export()方法,當Spring啟動完之後,通過接收Spring的ContextRefreshedEvent事件來觸發export()方法的執行。 一個ServiceBean物件就表示一個Dubbo服務,ServiceBean物件中的引數就表示服務的引數,比如timeout,該物件的引數值來至@Service註解中所定義的。 服務匯出主要得做兩件事情: 1. 根據服務的引數資訊,啟動對應的網路伺服器(netty、tomcat、jetty等),用來接收網路請求 2. 將服務的資訊註冊到註冊中心 但是在做這兩件事情之前得先把服務的引數確定好,因為一個Dubbo服務的引數,除開可以在@Service註解中去設定,還會繼承Dubbo服務所屬應用(Application)上的設定,
還可以在設定中心或JVM環境變數中去設定某個服務的引數,所以首先要做的是確定好當前服務最終的(優先順序最高)的引數值。 確定好服務引數之後,就根據所設定的協定啟動對應的網路伺服器。在啟動網路伺服器時,並且在網路伺服器接收請求的過程中,都可以從服務引數中獲取資訊,比如最大連線數,執行緒數,socket超時時間等等。 啟動完網路伺服器之後,就將服務資訊註冊到註冊中心。同時還有向註冊中心註冊監聽器,監聽Dubbo的中的動態設定資訊變更。
【0】核心點記錄
生成代理物件(代理物件應該包含的功能){ 1.獲取服務提供者列表 2.Mock--------MockClusterInvoker 3.路由篩選 4.負載均衡 5.叢集容錯------FailoverClusterInvoker 6.構造NettyClient 7.傳送資料(Invocation) } 代理物件的Invoker MockClusterInvoker Invoker屬性塞入FailoverClusterInvoker FailoverClusterInvoker Invoker屬性塞入DubboInvoker
【1】服務要怎麼引入(ReferenceConfig類#get()方法)
//服務引入的入口方法 public synchronized T get() { //讀取設定並補全(最新最全的設定) checkAndUpdateSubConfigs(); if (destroyed) { throw new IllegalStateException(...); } if (ref == null) { // 入口 init(); } return ref; // Invoke代理 }
【2】檢查並拿到最新設定(ReferenceConfig類#checkAndUpdateSubConfigs()方法)
public void checkAndUpdateSubConfigs() { if (StringUtils.isEmpty(interfaceName)) { throw new IllegalStateException(...); } // 填充ReferenceConfig物件中的屬性 completeCompoundConfigs(); // 開啟設定中心 startConfigCenter(); // get consumer's global configuration checkDefault(); // 重新整理ReferenceConfig物件的屬性值 this.refresh(); // 設定泛化 if (getGeneric() == null && getConsumer() != null) { setGeneric(getConsumer().getGeneric()); } if (ProtocolUtils.isGeneric(getGeneric())) { interfaceClass = GenericService.class; } else { try { interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(...); } checkInterfaceAndMethods(interfaceClass, methods); } resolveFile(); checkApplication(); checkMetadataReport(); }
【3】初始化生成代理物件(ReferenceConfig類#init()方法)
private void init() { if (initialized) { return; } //準備引數,進行引數設定 checkStubAndLocal(interfaceClass); checkMock(interfaceClass); Map<String, String> map = new HashMap<String, String>(); map.put(SIDE_KEY, CONSUMER_SIDE); appendRuntimeParameters(map); if (!ProtocolUtils.isGeneric(getGeneric())) { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put(REVISION_KEY, revision); } String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("No method found in service interface " + interfaceClass.getName()); map.put(METHODS_KEY, ANY_VALUE); } else { map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR)); } } map.put(INTERFACE_KEY, interfaceName); appendParameters(map, metrics); appendParameters(map, application); appendParameters(map, module); // remove 'default.' prefix for configs from ConsumerConfig // appendParameters(map, consumer, Constants.DEFAULT_KEY); appendParameters(map, consumer); appendParameters(map, this); Map<String, Object> attributes = null; if (CollectionUtils.isNotEmpty(methods)) { attributes = new HashMap<String, Object>(); for (MethodConfig methodConfig : methods) { appendParameters(map, methodConfig, methodConfig.getName()); String retryKey = methodConfig.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if ("false".equals(retryValue)) { map.put(methodConfig.getName() + ".retries", "0"); } } attributes.put(methodConfig.getName(), convertMethodConfig2AsyncInfo(methodConfig)); } } String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY); if (StringUtils.isEmpty(hostToRegistry)) { hostToRegistry = NetUtils.getLocalHost(); } else if (isInvalidLocalHost(hostToRegistry)) { throw new IllegalArgumentException(...); } map.put(REGISTER_IP_KEY, hostToRegistry); //引數設定完成後去生成代理物件 ref = createProxy(map); String serviceKey = URL.buildKey(interfaceName, group, version); ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes)); initialized = true; }
【4】生成代理物件(ReferenceConfig類#createProxy()方法)
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) private T createProxy(Map<String, String> map) { if (shouldJvmRefer(map)) { // injvm:// URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map); invoker = REF_PROTOCOL.refer(interfaceClass, url); } else { // 為什麼會有urls,因為可以在@Reference的url屬性中設定多個url,可以是對等的服務地址,也可以是註冊中心的地址 urls.clear(); // reference retry init will add url to urls, lead to OOM // @Reference中指定了url屬性 if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address. String[] us = SEMICOLON_SPLIT_PATTERN.split(url); // 用;號切分 if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (StringUtils.isEmpty(url.getPath())) { url = url.setPath(interfaceName); } // 如果是註冊中心地址,則在url中新增一個refer引數 if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { // map表示消費者端設定的引數 urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map))); } else { // 如果是服務地址 // 有可能url中設定了引數,map中表示的服務消費者消費服務時的引數,所以需要合併 urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // assemble URL from register center's configuration // @Reference中的protocol屬性表示使用哪個協定呼叫服務,如果不是本地呼叫協定injvm://,則把註冊中心地址找出來 // 對於injvm://協定已經在之前的邏輯中就已經生成invoke了 // if protocols not injvm checkRegistry if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){ checkRegistry(); // 載入註冊中心地址 List<URL> us = loadRegistries(false); if (CollectionUtils.isNotEmpty(us)) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } // 對於註冊中心地址都新增REFER_KEY urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map))); } } if (urls.isEmpty()) { throw new IllegalStateException(...); } } } // 如果只有一個url則直接refer得到一個invoker if (urls.size() == 1) { // RegistryProtocol.refer() 或者 DubboProtocol.refer() invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0)); } else { // 如果有多個url // 1. 根據每個url,refer得到對應的invoker // 2. 如果這多個urls中存在註冊中心url,則把所有invoker整合為RegistryAwareClusterInvoker,該Invoker在呼叫時,會檢視所有Invoker中是否有預設的,如果有則使用預設的Invoker,如果沒有,則使用第一個Invoker // 2. 如果這多個urls中不存在註冊中心url,則把所有invoker整合為FailoverCluster List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; // 用來記錄urls中最後一個註冊中心url for (URL url : urls) { invokers.add(REF_PROTOCOL.refer(interfaceClass, url)); if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; // use last registry url } } // 如果存在註冊中心地址 if (registryURL != null) { // registry url is available // use RegistryAwareCluster only when register's CLUSTER is available URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME); // StaticDirectory表示靜態服務目錄,裡面的invokers是不會變的, 生成一個RegistryAwareCluster // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker invoker = CLUSTER.join(new StaticDirectory(u, invokers)); } else { // not a registry url, must be direct invoke. // 如果不存在註冊中心地址, 生成一個FailoverClusterInvoker invoker = CLUSTER.join(new StaticDirectory(invokers)); } } } if (shouldCheck() && !invoker.isAvailable()) { throw new IllegalStateException(...); } MetadataReportService metadataReportService = null; if ((metadataReportService = getMetadataReportService()) != null) { URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map); metadataReportService.publishConsumer(consumerURL); } // create service proxy return (T) PROXY_FACTORY.getProxy(invoker); }
【5】PROXY_FACTORY.getProxy等過程
//預設採用JavassistProxyFactory來產生代理物件 public class JavassistProxyFactory extends AbstractProxyFactory { @Override @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); } @Override public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' // 如果現在被代理的物件proxy本身就是一個已經被代理過的物件,那麼則取代理類的Wrapper,否則取type(介面)的Wrapper // Wrapper是針對某個類或某個介面的包裝類,通過wrapper物件可以更方便的去執行某個類或某個介面的方法 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); // proxy是服務實現類 // type是服務介面 // url是一個註冊中心url,但同時也記錄了 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { // 執行proxy的method方法 // 執行的proxy範例的方法 // 如果沒有wrapper,則要通過原生的反射技術去獲取Method物件,然後執行 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } } public class InvokerInvocationHandler implements InvocationHandler { private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class); private final Invoker<?> invoker; public InvokerInvocationHandler(Invoker<?> handler) { this.invoker = handler; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } // 這裡的recreate方法很重要,他會呼叫AppResponse的recreate方法, // 如果AppResponse物件中存在exception資訊,則此方法中會throw這個異常 return invoker.invoke(new RpcInvocation(method, args)).recreate(); } }
【6】對URL的處理過程
1)從@Reference的url屬性中設定多個url,然後採用字串分割的形式拿出來,包裝後塞入urls列表中
2)載入註冊中心地址
protected List<URL> loadRegistries(boolean provider) { // check && override if necessary List<URL> registryList = new ArrayList<URL>(); if (CollectionUtils.isNotEmpty(registries)) { for (RegistryConfig config : registries) { String address = config.getAddress(); // 如果註冊中心沒有配地址,則地址為0.0.0.0 if (StringUtils.isEmpty(address)) { address = ANYHOST_VALUE; } // 如果註冊中心的地址不是"N/A" if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) { Map<String, String> map = new HashMap<String, String>(); // 把application中的引數放入map中,注意,map中的key是沒有prefix的 appendParameters(map, application); // 把config中的引數放入map中,注意,map中的key是沒有prefix的 // config是RegistryConfig,表示註冊中心 appendParameters(map, config); // 此處path值固定為RegistryService.class.getName(),因為現在是在載入註冊中心 map.put(PATH_KEY, RegistryService.class.getName()); // 把dubbo的版本資訊和pid放入map中 appendRuntimeParameters(map); // 如果map中如果沒有protocol,那麼預設為dubbo if (!map.containsKey(PROTOCOL_KEY)) { map.put(PROTOCOL_KEY, DUBBO_PROTOCOL); } // 構造註冊中心url,地址+引數 List<URL> urls = UrlUtils.parseURLs(address, map); for (URL url : urls) { url = URLBuilder.from(url) .addParameter(REGISTRY_KEY, url.getProtocol()) .setProtocol(REGISTRY_PROTOCOL) .build(); // 到此為止,url的內容大概為: // registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider&dubbo=2.0.2&pid=269936®istry=zookeeper×tamp=1584886077813 // 該url表示:使用registry協定呼叫org.apache.dubbo.registry.RegistryService服務 // 引數為application=dubbo-demo-annotation-provider&dubbo=2.0.2&pid=269936®istry=zookeeper×tamp=1584886077813 // 這裡是服務提供者和服務消費者區別的邏輯 // 如果是服務提供者,獲取register的值,如果為false,表示該服務不註冊到註冊中心 // 如果是服務消費者,獲取subscribe的值,如果為false,表示該引入的服務不訂閱註冊中心中的資料 if ((provider && url.getParameter(REGISTER_KEY, true)) || (!provider && url.getParameter(SUBSCRIBE_KEY, true))) { registryList.add(url); } } } } } return registryList; }
【7】invoker的包裝過程
1)前置說明
//RegistryProtocol實際上會被兩個包裝類包裝ProtocolListenerWrapper與ProtocolFilterWrapper //如ProtocolListenerWrapper的protocol屬性存放ProtocolFilterWrapper //ProtocolFilterWrapper的protocol屬性存放RegistryProtocol //然後是Cluster,通過介面可知預設是FailoverCluster,但實際上還會有一個包裝類MockClusterWrapper //MockClusterWrapper的cluster屬性存放FailoverCluster或者RegistryAwareCluster
2)先是呼叫REF_PROTOCOL.refer
//ProtocolListenerWrapper的處理 @Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return new ListenerInvokerWrapper<T>(protocol.refer(type, url), Collections.unmodifiableList( ExtensionLoader.getExtensionLoader(InvokerListener.class) .getActivateExtension(url, INVOKER_LISTENER_KEY))); } //ProtocolFilterWrapper的處理 @Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER); } private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; // 根據url獲取filter,根據url中的parameters取key為key的value所對應的filter,但是還會匹配group List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); // ConsumerContextFilter--->FutureFilter--->MonitorFilter // ConsumerContextFilter用來設定RpcContext if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { @Override public Class<T> getInterface() { return invoker.getInterface(); } @Override public URL getUrl() { return invoker.getUrl(); } @Override public boolean isAvailable() { return invoker.isAvailable(); } @Override public Result invoke(Invocation invocation) throws RpcException { Result asyncResult; try { // 得到一個非同步結果 asyncResult = filter.invoke(next, invocation); } catch (Exception e) { // onError callback if (filter instanceof ListenableFilter) { Filter.Listener listener = ((ListenableFilter) filter).listener(); if (listener != null) { listener.onError(e, invoker, invocation); } } throw e; } return asyncResult; } @Override public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return new CallbackRegistrationInvoker<>(last, filters); } //RegistryProtocol的處理 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { // 從registry://的url中獲取對應的註冊中心,比如zookeeper, 預設為dubbo,dubbo提供了自帶的註冊中心實現 // url由 registry:// 改變為---> zookeeper:// url = URLBuilder.from(url) .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY)) .removeParameter(REGISTRY_KEY) .build(); // 拿到註冊中心實現,ZookeeperRegistry Registry registry = registryFactory.getRegistry(url); // 下面這個程式碼,通過過git歷史提交記錄是用來解決SimpleRegistry不可用的問題 if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // qs表示 queryString, 表示url中的引數,表示消費者引入服務時所設定的引數 Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)); // group="a,b" or group="*" String group = qs.get(GROUP_KEY); if (group != null && group.length() > 0) { if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { // group有多個值,這裡的cluster為MergeableCluster return doRefer(getMergeableCluster(), registry, type, url); } } // 這裡的cluster是cluster的Adaptive物件 return doRefer(cluster, registry, type, url); } private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { // RegistryDirectory表示動態服務目錄,會和註冊中心的資料保持同步 // type表示一個服務對應一個RegistryDirectory,url表示註冊中心地址 // 在消費端,最核心的就是RegistryDirectory RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY // 引入服務所設定的引數 Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); // 消費者url URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) { directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url)); // 註冊簡化後的消費url registry.register(directory.getRegisteredConsumerUrl()); } // 構造路由鏈,路由鏈會在引入服務時按路由條件進行過濾 // 路由鏈是動態服務目錄中的一個屬性,通過路由鏈可以過濾某些服務提供者 directory.buildRouterChain(subscribeUrl); // 服務目錄需要訂閱的幾個路徑 // 當前所引入的服務的消費應用目錄:/dubbo/config/dubbo/dubbo-demo-consumer-application.configurators // 當前所引入的服務的動態設定目錄:/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService:1.1.1:g1.configurators // 當前所引入的服務的提供者目錄:/dubbo/org.apache.dubbo.demo.DemoService/providers // 當前所引入的服務的老版本動態設定目錄:/dubbo/org.apache.dubbo.demo.DemoService/configurators // 當前所引入的服務的老版本路由器目錄:/dubbo/org.apache.dubbo.demo.DemoService/routers directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); // 利用傳進來的cluster,join得到invoker, Invoker invoker = cluster.join(directory); ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }
3)再是呼叫CLUSTER.join
//MockClusterWrapper的處理 public class MockClusterWrapper implements Cluster { private Cluster cluster; public MockClusterWrapper(Cluster cluster) { this.cluster = cluster; } @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new MockClusterInvoker<T>(directory, this.cluster.join(directory)); } } //有註冊中心,RegistryAwareCluster的處理 public class RegistryAwareCluster implements Cluster { public final static String NAME = "registryaware"; @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new RegistryAwareClusterInvoker<T>(directory); } } //沒有註冊中心,FailoverCluster的處理 public class FailoverCluster implements Cluster { public final static String NAME = "failover"; @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new FailoverClusterInvoker<T>(directory); } }
【8】RegistryProtocol裡面的監聽過程
//directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); //RegistryDirectory類#subscribe方法 public void subscribe(URL url) { setConsumerUrl(url); CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this); // 監聽consumer應用 serviceConfigurationListener = new ReferenceConfigurationListener(this, url); // 監聽所引入的服務的動態設定 registry.subscribe(url, this); //老版本的監聽 } //FailbackRegistry類#subscribe方法 @Override public void subscribe(URL url, NotifyListener listener) { super.subscribe(url, listener); removeFailedSubscribed(url, listener); try { // 核心方法 doSubscribe(url, listener); } catch (Exception e) { Throwable t = e; List<URL> urls = getCacheUrls(url); if (CollectionUtils.isNotEmpty(urls)) { notify(url, listener, urls); logger.error(...); } else { // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); } } // Record a failed registration request to a failed list, retry regularly // 新增listener,向zk新增監聽器時如果報錯了,那麼會把這個listener新增到failedSubscribed中,並會定時重試(重新註冊listener) addFailedSubscribed(url, listener); } } // 父類別AbstractRegistry#subscribe方法 // 把listener新增到subscribed中,subscribed是一個map, 存的是URL:Set<NotifyListener> @Override public void subscribe(URL url, NotifyListener listener) { if (url == null) { throw new IllegalArgumentException("subscribe url == null"); } if (listener == null) { throw new IllegalArgumentException("subscribe listener == null"); } if (logger.isInfoEnabled()) { logger.info("Subscribe: " + url); } Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>()); listeners.add(listener); } // 進行訂閱,先看父類別的subscribe方法 @Override public void doSubscribe(final URL url, final NotifyListener listener) { try { if (ANY_VALUE.equals(url.getServiceInterface())) { // 訂閱所有服務 String root = toRootPath(); ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, (parentPath, currentChilds) -> { for (String child : currentChilds) { child = URL.decode(child); if (!anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), listener); } } }); zkListener = listeners.get(listener); } zkClient.create(root, false); List<String> services = zkClient.addChildListener(root, zkListener); if (CollectionUtils.isNotEmpty(services)) { for (String service : services) { service = URL.decode(service); anyServices.add(service); subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } else { // 單獨訂閱某一個服務 List<URL> urls = new ArrayList<>(); // 得到真正要監聽的zk上的路徑, for (String path : toCategoriesPath(url)) { // 根據監聽地址去拿listeners,如果沒有則生成 ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<>()); listeners = zkListeners.get(url); } // 一個NotifyListener對應一個ChildListener ChildListener zkListener = listeners.get(listener); if (zkListener == null) { // lambda表示式就是監聽邏輯, parentPath表示父path,currentChilds表示當前擁有的child, 會呼叫notify方法進行實際的處理 listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds))); zkListener = listeners.get(listener); } // 建立zk上路徑 zkClient.create(path, false); // 新增真正跟zk相關的ChildListener,ChildListener中的邏輯就是監聽到zk上資料發生了變化後會觸發的邏輯 List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } // 這裡的urls就是從現在所引入的服務的目錄下查到的url,比如下面這個三個目錄下的路徑 // "/dubbo/org.apache.dubbo.demo.DemoService/providers" // "/dubbo/org.apache.dubbo.demo.DemoService/configurators" // "/dubbo/org.apache.dubbo.demo.DemoService/routers" notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
【9】監聽器觸發時
//RegistryDirectory類#notify方法 public synchronized void notify(List<URL> urls) { Map<String, List<URL>> categoryUrls = urls.stream() .filter(Objects::nonNull) .filter(this::isValidCategory) .filter(this::isNotCompatibleFor26x) .collect(Collectors.groupingBy(url -> { if (UrlUtils.isConfigurator(url)) { return CONFIGURATORS_CATEGORY; } else if (UrlUtils.isRoute(url)) { return ROUTERS_CATEGORY; } else if (UrlUtils.isProvider(url)) { return PROVIDERS_CATEGORY; } return ""; })); // 獲取動態設定URL,生成configurators List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList()); this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators); // 獲取老版本路由URL,生成Router,並新增到路由鏈中 List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList()); toRouters(routerURLs).ifPresent(this::addRouters); // 獲取服務提供者URL List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList()); refreshOverrideAndInvoker(providerURLs); } private void refreshOverrideAndInvoker(List<URL> urls) { // mock zookeeper://xxx?mock=return null overrideDirectoryUrl(); refreshInvoker(urls); } // 利用動態設定重寫服務目錄地址 private void overrideDirectoryUrl() { // merge override parameters this.overrideDirectoryUrl = directoryUrl; List<Configurator> localConfigurators = this.configurators; // local reference doOverrideUrl(localConfigurators); List<Configurator> localAppDynamicConfigurators = CONSUMER_CONFIGURATION_LISTENER.getConfigurators(); // local reference doOverrideUrl(localAppDynamicConfigurators); if (serviceConfigurationListener != null) { List<Configurator> localDynamicConfigurators = serviceConfigurationListener.getConfigurators(); // local reference doOverrideUrl(localDynamicConfigurators); } } private void refreshInvoker(List<URL> invokerUrls) { Assert.notNull(invokerUrls, "invokerUrls should not be null"); if (invokerUrls.size() == 1 && invokerUrls.get(0) != null && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { this.forbidden = true; // Forbid to access this.invokers = Collections.emptyList(); routerChain.setInvokers(this.invokers); destroyAllInvokers(); // Close all invokers } else { this.forbidden = false; // Allow to access Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference if (invokerUrls == Collections.<URL>emptyList()) { invokerUrls = new ArrayList<>(); } if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { invokerUrls.addAll(this.cachedInvokerUrls); } else { this.cachedInvokerUrls = new HashSet<>(); this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison } if (invokerUrls.isEmpty()) { return; } // 這裡會先按Protocol進行過濾,並且呼叫DubboProtocol.refer方法得到DubboInvoker Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map /** * If the calculation is wrong, it is not processed. * * 1. The protocol configured by the client is inconsistent with the protocol of the server. * eg: consumer protocol = dubbo, provider only has other protocol services(rest). * 2. The registration center is not robust and pushes illegal specification data. * */ if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) { logger.error(...); return; } List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values())); // pre-route and build cache, notice that route cache should build on original Invoker list. // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed. // 得到了所引入的服務Invoker之後,把它們設定到路由鏈中去,在呼叫時使用,並且會呼叫TagRouter的notify方法 routerChain.setInvokers(newInvokers); this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers; this.urlInvokerMap = newUrlInvokerMap; try { destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } } private Map<String, Invoker<T>> toInvokers(List<URL> urls) { Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>(); if (urls == null || urls.isEmpty()) { return newUrlInvokerMap; } Set<String> keys = new HashSet<>(); String queryProtocols = this.queryMap.get(PROTOCOL_KEY); // 遍歷當前服務所有的服務提供者URL for (URL providerUrl : urls) { // If protocol is configured at the reference side, only the matching protocol is selected if (queryProtocols != null && queryProtocols.length() > 0) { boolean accept = false; String[] acceptProtocols = queryProtocols.split(","); // 當前消費者如果手動設定了Protocol,那麼則進行匹配 for (String acceptProtocol : acceptProtocols) { if (providerUrl.getProtocol().equals(acceptProtocol)) { accept = true; break; } } if (!accept) { continue; } } if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) { continue; } // 當前Protocol是否在應用中存在對應的擴充套件點 if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) { logger.error(...); continue; } URL url = mergeUrl(providerUrl); String key = url.toFullString(); // The parameter urls are sorted if (keys.contains(key)) { // Repeated url continue; } keys.add(key); // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key); // 如果當前服務提供者URL沒有生產過Invoker if (invoker == null) { // Not in the cache, refer again try { boolean enabled = true; if (url.hasParameter(DISABLED_KEY)) { enabled = !url.getParameter(DISABLED_KEY, false); } else { enabled = url.getParameter(ENABLED_KEY, true); } if (enabled) { // 呼叫Protocol的refer方法得到一個Invoker invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl); } } catch (Throwable t) { logger.error(...); } if (invoker != null) { // Put new invoker in cache newUrlInvokerMap.put(key, invoker); } } else { newUrlInvokerMap.put(key, invoker); } } keys.clear(); return newUrlInvokerMap; }