Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
呈铭 committed Feb 23, 2024
1 parent 3502092 commit f3ce43c
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.boot.actuator.extension;

import com.alipay.sofa.boot.actuator.health.ReadinessCheckListener;
import com.alipay.sofa.rpc.boot.extension.ProviderConfigDelayRegisterChecker;
import org.springframework.beans.BeansException;
import org.springframework.boot.availability.ReadinessState;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public class ProviderConfigDelayRegisterCheckerSupport implements ApplicationContextAware,
ProviderConfigDelayRegisterChecker {

private ApplicationContext applicationContext;

@Override
public boolean isDelayRegisterHealthCheck() {
ReadinessCheckListener readinessCheckListener = applicationContext
.getBean(ReadinessCheckListener.class);
return ReadinessState.ACCEPTING_TRAFFIC.equals(readinessCheckListener.getReadinessState());
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.alipay.sofa.boot.autoconfigure.rpc;

import com.alipay.sofa.boot.actuator.extension.ProviderConfigDelayRegisterCheckerSupport;
import com.alipay.sofa.boot.autoconfigure.condition.ConditionalOnSwitch;
import com.alipay.sofa.boot.autoconfigure.rpc.SofaRpcAutoConfiguration.RegistryConfigurationImportSelector;
import com.alipay.sofa.boot.autoconfigure.runtime.SofaRuntimeAutoConfiguration;
Expand Down Expand Up @@ -72,12 +73,21 @@ public class SofaRpcAutoConfiguration {

@Bean
@ConditionalOnMissingBean
public ProviderConfigContainer providerConfigContainer(SofaBootRpcProperties sofaBootRpcProperties) {
public ProviderConfigDelayRegisterCheckerSupport providerConfigDelayRegisterCheckerSupport() {
return new ProviderConfigDelayRegisterCheckerSupport();
}

@Bean
@ConditionalOnMissingBean
public ProviderConfigContainer providerConfigContainer(SofaBootRpcProperties sofaBootRpcProperties,
ProviderConfigDelayRegisterCheckerSupport providerConfigDelayRegisterCheckerSupport) {
ProviderConfigContainer providerConfigContainer = new ProviderConfigContainer();
providerConfigContainer.setProviderRegisterWhiteList(sofaBootRpcProperties
.getProviderRegisterWhiteList());
providerConfigContainer.setProviderRegisterBlackList(sofaBootRpcProperties
.getProviderRegisterBlackList());
providerConfigContainer
.setProviderConfigDelayRegister(providerConfigDelayRegisterCheckerSupport);
return providerConfigContainer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alipay.sofa.rpc.boot.container;

import com.alipay.sofa.rpc.boot.config.SofaBootRpcConfigConstants;
import com.alipay.sofa.rpc.boot.extension.ProviderConfigDelayRegisterChecker;
import com.alipay.sofa.rpc.boot.log.SofaBootRpcLoggerFactory;
import com.alipay.sofa.rpc.boot.runtime.binding.RpcBinding;
import com.alipay.sofa.rpc.config.ProviderConfig;
Expand All @@ -33,6 +34,9 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* ProviderConfig持有者.维护编程界面级别的RPC组件。
Expand All @@ -58,6 +62,20 @@ public class ProviderConfigContainer {
private final ConcurrentMap<String, ProviderConfig> RPC_SERVICE_CONTAINER = new ConcurrentHashMap<String, ProviderConfig>(
256);

/**
* 用来延时发布的线程池
*/
private ScheduledExecutorService scheduledExecutorService;

/**
* 延时发布时健康检查
*/
private ProviderConfigDelayRegisterChecker providerConfigDelayRegisterChecker;

public void setProviderConfigDelayRegister(ProviderConfigDelayRegisterChecker providerConfigDelayRegisterChecker) {
this.providerConfigDelayRegisterChecker = providerConfigDelayRegisterChecker;
}

/**
* 增加 ProviderConfig
*
Expand Down Expand Up @@ -127,32 +145,52 @@ public Collection<ProviderConfig> getAllProviderConfig() {
*/
public void publishAllProviderConfig() {
for (ProviderConfig providerConfig : getAllProviderConfig()) {

ServerConfig serverConfig = (ServerConfig) providerConfig.getServer().get(0);
if (!serverConfig.getProtocol().equalsIgnoreCase(
SofaBootRpcConfigConstants.RPC_PROTOCOL_DUBBO)) {
if (allowProviderRegister(providerConfig)) {
providerConfig.setRegister(true);
} else {
LOGGER.info("Provider will not register: [{}]", providerConfig.buildKey());
int delay = providerConfig.getDelay();
// 没有配置延时加载则直接去注册中心注册服务
if (delay <= 0) {
doPublishProviderConfig(providerConfig, false);
} else {
// 根据延时时间异步去注册中心注册服务
if (scheduledExecutorService == null) {
scheduledExecutorService = Executors.newScheduledThreadPool(16);
}
scheduledExecutorService.schedule(() -> doPublishProviderConfig(providerConfig, true), delay, TimeUnit.MILLISECONDS);
}
}
}

List<RegistryConfig> registrys = providerConfig.getRegistry();
for (RegistryConfig registryConfig : registrys) {
private void doPublishProviderConfig(ProviderConfig providerConfig, boolean needHealthCheck) {
if (needHealthCheck && !providerConfigDelayRegisterChecker.isDelayRegisterHealthCheck()) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("service publish failed, interfaceId["
+ providerConfig.getInterfaceId() + "], please check.");
}
return;
}

Registry registry = RegistryFactory.getRegistry(registryConfig);
registry.init();
registry.start();
ServerConfig serverConfig = (ServerConfig) providerConfig.getServer().get(0);
if (!serverConfig.getProtocol().equalsIgnoreCase(
SofaBootRpcConfigConstants.RPC_PROTOCOL_DUBBO)) {
if (allowProviderRegister(providerConfig)) {
providerConfig.setRegister(true);
} else {
LOGGER.info("Provider will not register: [{}]", providerConfig.buildKey());
}

registry.register(providerConfig);
List<RegistryConfig> registrys = providerConfig.getRegistry();
for (RegistryConfig registryConfig : registrys) {

if (LOGGER.isInfoEnabled()) {
LOGGER.info("service published. interfaceId["
+ providerConfig.getInterfaceId() + "]; protocol["
+ serverConfig.getProtocol() + "]");
}
}
Registry registry = RegistryFactory.getRegistry(registryConfig);
registry.init();
registry.start();

registry.register(providerConfig);

if (LOGGER.isInfoEnabled()) {
LOGGER.info("service published. interfaceId["
+ providerConfig.getInterfaceId() + "]; protocol["
+ serverConfig.getProtocol() + "]");
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.boot.extension;

public interface ProviderConfigDelayRegisterChecker {

boolean isDelayRegisterHealthCheck();
}

0 comments on commit f3ce43c

Please # to comment.