Skip to content

Commit

Permalink
Callback mode supports default rejection policy adjustment (#1132)
Browse files Browse the repository at this point in the history
* Callback mode supports default rejection policy adjustment when the thread pool is full
  • Loading branch information
JervyShi authored Nov 26, 2021
1 parent de7a124 commit c687eeb
Show file tree
Hide file tree
Showing 7 changed files with 360 additions and 44 deletions.
2 changes: 1 addition & 1 deletion all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
<netty.version>4.1.44.Final</netty.version>
<hessian.version>3.3.13</hessian.version>
<resteasy.version>3.6.3.Final</resteasy.version>
<bolt.version>1.5.9</bolt.version>
<bolt.version>1.5.10</bolt.version>
<tracer.version>3.0.8</tracer.version>
<lookout.version>1.4.1</lookout.version>
<bytebuddy.version>1.9.8</bytebuddy.version>
Expand Down
2 changes: 1 addition & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
<!-- Test libs -->
<junit.version>4.13.1</junit.version>
<!-- alipay libs -->
<bolt.version>1.5.9</bolt.version>
<bolt.version>1.5.10</bolt.version>
<sofa.common.tools.version>1.3.2</sofa.common.tools.version>
<tracer.version>3.0.8</tracer.version>
<lookout.version>1.4.1</lookout.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,12 @@ public class RpcOptions {
* @since 5.5.0
*/
public static final String CONCUMER_CONNECT_ELASTIC_SIZE = "consumer.connect.elastic.size";
/**
* 默认回调线程池满时的默认拒绝策略
*
* @since 5.8.1
*/
public static final String CONSUMER_REJECTED_EXECUTION_POLICY = "consumer.rejected.execution.policy";

/**
* 默认回调线程池最小
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static com.alipay.sofa.rpc.common.RpcConfigs.getBooleanValue;
import static com.alipay.sofa.rpc.common.RpcConfigs.getIntValue;
import static com.alipay.sofa.rpc.common.RpcConfigs.getStringValue;
import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_REJECTED_EXECUTION_POLICY;
import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_ADDRESS_HOLDER;
import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_ADDRESS_WAIT;
import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_CHECK;
Expand Down Expand Up @@ -69,12 +70,12 @@ public class ConsumerConfig<T> extends AbstractInterfaceConfig<T, ConsumerConfig
/**
* The constant serialVersionUID.
*/
private static final long serialVersionUID = 4244077707655448146L;
private static final long serialVersionUID = 4244077707655448146L;

/**
* 调用的协议
*/
protected String protocol = getStringValue(DEFAULT_PROTOCOL);
protected String protocol = getStringValue(DEFAULT_PROTOCOL);

/**
* 直连调用地址
Expand All @@ -89,77 +90,85 @@ public class ConsumerConfig<T> extends AbstractInterfaceConfig<T, ConsumerConfig
/**
* 是否异步调用
*/
protected String invokeType = getStringValue(CONSUMER_INVOKE_TYPE);
protected String invokeType = getStringValue(CONSUMER_INVOKE_TYPE);

/**
* 连接超时时间
*/
protected int connectTimeout = getIntValue(CONSUMER_CONNECT_TIMEOUT);
protected int connectTimeout = getIntValue(CONSUMER_CONNECT_TIMEOUT);

/**
* 关闭超时时间(如果还有请求,会等待请求结束或者超时)
*/
protected int disconnectTimeout = getIntValue(CONSUMER_DISCONNECT_TIMEOUT);
protected int disconnectTimeout = getIntValue(CONSUMER_DISCONNECT_TIMEOUT);

/**
* 集群处理,默认是failover
*/
protected String cluster = getStringValue(CONSUMER_CLUSTER);
protected String cluster = getStringValue(CONSUMER_CLUSTER);

/**
* The ConnectionHolder 连接管理器
*/
protected String connectionHolder = getStringValue(CONSUMER_CONNECTION_HOLDER);
protected String connectionHolder = getStringValue(CONSUMER_CONNECTION_HOLDER);

/**
* 地址管理器
*/
protected String addressHolder = getStringValue(CONSUMER_ADDRESS_HOLDER);
protected String addressHolder = getStringValue(CONSUMER_ADDRESS_HOLDER);

/**
* 负载均衡
*/
protected String loadBalancer = getStringValue(CONSUMER_LOAD_BALANCER);
protected String loadBalancer = getStringValue(CONSUMER_LOAD_BALANCER);

/**
* 是否延迟建立长连接(第一次调用时新建,注意此参数可能和check冲突,开启check后lazy自动失效)
*
* @see ConsumerConfig#check
*/
protected boolean lazy = getBooleanValue(CONSUMER_LAZY);
protected boolean lazy = getBooleanValue(CONSUMER_LAZY);

/**
* 粘滞连接,一个断开才选下一个
* change transport when current is disconnected
*/
protected boolean sticky = getBooleanValue(CONSUMER_STICKY);
protected boolean sticky = getBooleanValue(CONSUMER_STICKY);

/**
* 是否jvm内部调用(provider和consumer配置在同一个jvm内,则走本地jvm内部,不走远程)
*/
protected boolean inJVM = getBooleanValue(CONSUMER_INJVM);
protected boolean inJVM = getBooleanValue(CONSUMER_INJVM);

/**
* 是否强依赖(即没有服务节点就启动失败,注意此参数可能和lazy冲突,开启check后lazy自动失效)
*
* @see ConsumerConfig#lazy
*/
protected boolean check = getBooleanValue(CONSUMER_CHECK);
protected boolean check = getBooleanValue(CONSUMER_CHECK);

/**
* 长连接个数,不是所有的框架都支持一个地址多个长连接
*/
protected int connectionNum = getIntValue(CONSUMER_CONNECTION_NUM);
protected int connectionNum = getIntValue(CONSUMER_CONNECTION_NUM);

/**
* Consumer给Provider发心跳的间隔
*/
protected int heartbeatPeriod = getIntValue(CONSUMER_HEARTBEAT_PERIOD);
protected int heartbeatPeriod = getIntValue(CONSUMER_HEARTBEAT_PERIOD);

/**
* Consumer给Provider重连的间隔
*/
protected int reconnectPeriod = getIntValue(CONSUMER_RECONNECT_PERIOD);
protected int reconnectPeriod = getIntValue(CONSUMER_RECONNECT_PERIOD);

/**
* 默认回调线程池满时的拒绝策略,可用值:
* DISCARD:默认丢弃
* CALLER_RUNS:IO 线程继续执行任务
* CALLER_HANDLE_EXCEPTION:IO 线程执行异常回调任务
*/
protected String rejectedExecutionPolicy = getStringValue(CONSUMER_REJECTED_EXECUTION_POLICY);

/**
* 路由配置别名
Expand Down Expand Up @@ -196,30 +205,30 @@ public class ConsumerConfig<T> extends AbstractInterfaceConfig<T, ConsumerConfig
/**
* 等待地址获取时间(毫秒),-1表示等到拿到地址位置
*/
protected int addressWait = getIntValue(CONSUMER_ADDRESS_WAIT);
protected int addressWait = getIntValue(CONSUMER_ADDRESS_WAIT);

/**
* 同一个服务(接口协议uniqueId相同)的最大引用次数,防止由于代码bug导致重复引用,每次引用都会生成一个代理类对象,-1表示不检查
*
* @since 5.2.0
*/
protected int repeatedReferLimit = getIntValue(CONSUMER_REPEATED_REFERENCE_LIMIT);
protected int repeatedReferLimit = getIntValue(CONSUMER_REPEATED_REFERENCE_LIMIT);

/*-------- 下面是方法级可覆盖配置 --------*/
/**
* 客户端调用超时时间(毫秒)
*/
protected int timeout = -1;
protected int timeout = -1;

/**
* The Retries. 失败后重试次数
*/
protected int retries = getIntValue(CONSUMER_RETRIES);
protected int retries = getIntValue(CONSUMER_RETRIES);

/**
* 接口下每方法的最大可并行执行请求数,配置-1关闭并发过滤器,等于0表示开启过滤但是不限制
*/
protected int concurrents = getIntValue(CONSUMER_CONCURRENTS);
protected int concurrents = getIntValue(CONSUMER_CONCURRENTS);

/*---------- 参数配置项结束 ------------*/

Expand Down Expand Up @@ -633,6 +642,26 @@ public ConsumerConfig<T> setReconnectPeriod(int reconnectPeriod) {
return this;
}

/**
* Gets rejected execution policy.
*
* @return the rejected execution policy
*/
public String getRejectedExecutionPolicy() {
return rejectedExecutionPolicy;
}

/**
* Sets rejected execution policy.
*
* @param rejectedExecutionPolicy the rejected execution policy
* @return the rejected execution policy
*/
public ConsumerConfig<T> setRejectedExecutionPolicy(String rejectedExecutionPolicy) {
this.rejectedExecutionPolicy = rejectedExecutionPolicy;
return this;
}

/**
* Gets router.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ PS:大家也看到了,本JSON文档是支持注释的,而标准JSON是不支
"consumer.connect.elastic.size": 5,
// 是否允许通过RpcInvokeContext.getTargetUrl创建tcp连接,默认允许
"consumer.connect.create.when.absent": true,
// 默认回调线程池满时的拒绝策略,可用值:DISCARD, CALLER_RUNS, CALLER_HANDLE_EXCEPTION
"consumer.rejected.execution.policy": "DISCARD",
/*-------------Consumer相关配置结束-------------*/


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
*/
package com.alipay.sofa.rpc.message.bolt;

import com.alipay.remoting.InvokeCallback;
import com.alipay.remoting.RejectedExecutionPolicy;
import com.alipay.remoting.RejectionProcessableInvokeCallback;
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.config.ConsumerConfig;
Expand All @@ -31,27 +32,31 @@
* @author <a href="mailto:zhanggeng.zg@antfin.com">GengZhang</a>
* @since 5.4.0
*/
public abstract class AbstractInvokeCallback implements InvokeCallback {
public abstract class AbstractInvokeCallback implements RejectionProcessableInvokeCallback {
/**
* 服务消费者配置
*/
protected final ConsumerConfig consumerConfig;
protected final ConsumerConfig consumerConfig;
/**
* 服务提供者信息
*/
protected final ProviderInfo providerInfo;
protected final ProviderInfo providerInfo;
/**
* 请求
*/
protected final SofaRequest request;
protected final SofaRequest request;
/**
* 请求运行时的ClassLoader
*/
protected ClassLoader classLoader;
protected ClassLoader classLoader;
/**
* 线程上下文
*/
protected RpcInternalContext context;
protected RpcInternalContext context;
/**
* 线程繁忙时的拒绝策略
*/
protected RejectedExecutionPolicy rejectedExecutionPolicy;

protected AbstractInvokeCallback(ConsumerConfig consumerConfig, ProviderInfo providerInfo, SofaRequest request,
RpcInternalContext context, ClassLoader classLoader) {
Expand All @@ -60,6 +65,17 @@ protected AbstractInvokeCallback(ConsumerConfig consumerConfig, ProviderInfo pro
this.request = request;
this.context = context;
this.classLoader = classLoader;
this.setRejectedExecutionPolicy(consumerConfig);
}

private void setRejectedExecutionPolicy(ConsumerConfig consumerConfig) {
if (null == consumerConfig || consumerConfig.getRejectedExecutionPolicy() == null) {
this.rejectedExecutionPolicy = RejectedExecutionPolicy.DISCARD;
return;
}

String policy = consumerConfig.getRejectedExecutionPolicy();
this.rejectedExecutionPolicy = RejectedExecutionPolicy.valueOf(policy);
}

protected void recordClientElapseTime() {
Expand Down Expand Up @@ -93,4 +109,12 @@ protected void pickupBaggage(SofaResponse response) {

}
}

/**
* @see RejectionProcessableInvokeCallback#rejectedExecutionPolicy()
*/
@Override
public RejectedExecutionPolicy rejectedExecutionPolicy() {
return rejectedExecutionPolicy;
}
}
Loading

0 comments on commit c687eeb

Please # to comment.