Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Fix get wrong ProviderInfo when use reuses client transport. #102

Merged
merged 1 commit into from
May 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,9 @@ protected void checkAlias(ProviderInfo providerInfo, SofaRequest message) {
* @throws SofaRpcException 请求RPC异常
*/
protected SofaResponse filterChain(ProviderInfo providerInfo, SofaRequest request) throws SofaRpcException {
RpcInternalContext.getContext().setProviderInfo(providerInfo);
RpcInternalContext context = RpcInternalContext.getContext();
context.setInterfaceConfig(consumerConfig);
context.setProviderInfo(providerInfo);
return filterChain.invoke(request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.SystemInfo;
import com.alipay.sofa.rpc.common.utils.NetUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.context.RpcInternalContext;
import com.alipay.sofa.rpc.core.exception.RpcErrorType;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;
import com.alipay.sofa.rpc.event.ClientBeforeSendEvent;
import com.alipay.sofa.rpc.event.ClientSyncReceiveEvent;
import com.alipay.sofa.rpc.event.EventBus;
import com.alipay.sofa.rpc.message.ResponseFuture;

import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -149,17 +153,31 @@ public ResponseFuture asyncSend(SofaRequest message, int timeout) throws SofaRpc
@Override
public SofaResponse syncSend(SofaRequest request, int timeout) throws SofaRpcException {
RpcInternalContext context = RpcInternalContext.getContext();
SofaResponse response = null;
SofaRpcException throwable = null;
try {
beforeSend(context, request);
return doInvokeSync(request, timeout);
if (EventBus.isEnable(ClientBeforeSendEvent.class)) {
EventBus.post(new ClientBeforeSendEvent(request));
}
response = doInvokeSync(request, timeout);
return response;
} catch (InvocationTargetException e) {
throw convertToRpcException(e);
throwable = convertToRpcException(e);
throw throwable;
} catch (SofaRpcException e) {
throwable = e;
throw e;
} catch (Exception e) {
throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, "Fail to send message to remote", e);
throwable = new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,
"Failed to send message to remote", e);
throw throwable;
} finally {
afterSend(context, request);
if (EventBus.isEnable(ClientSyncReceiveEvent.class)) {
EventBus.post(new ClientSyncReceiveEvent((ConsumerConfig) context.getInterfaceConfig(),
context.getProviderInfo(), request, response, throwable));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import com.alipay.sofa.rpc.common.RpcConfigs;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.common.RpcOptions;
import com.alipay.sofa.rpc.common.annotation.Unstable;
import com.alipay.sofa.rpc.common.struct.StopWatch;
import com.alipay.sofa.rpc.common.utils.NetUtils;
import com.alipay.sofa.rpc.config.AbstractInterfaceConfig;
import com.alipay.sofa.rpc.message.ResponseFuture;

import java.net.InetSocketAddress;
Expand Down Expand Up @@ -148,40 +150,48 @@ protected RpcInternalContext() {
/**
* The Future.
*/
private ResponseFuture<?> future;
private ResponseFuture<?> future;

/**
* The Local address.
*/
private InetSocketAddress localAddress;
private InetSocketAddress localAddress;

/**
* The Remote address.
*/
private InetSocketAddress remoteAddress;
private InetSocketAddress remoteAddress;

/**
* 附带属性功能,遵循谁使用谁清理的原则。Key必须为 "_" 和 "."开头<br>
* 如果关闭了 {@link #ATTACHMENT_ENABLE} 功能,"_" 开头的Key将不被保持和传递。
*
* @see #ATTACHMENT_ENABLE
*/
private Map<String, Object> attachments = new HashMap<String, Object>();
private Map<String, Object> attachments = new HashMap<String, Object>();

/**
* The Stopwatch
*/
private StopWatch stopWatch = new StopWatch();
private StopWatch stopWatch = new StopWatch();

/**
* The Provider side.
*/
private Boolean providerSide;
private Boolean providerSide;

/**
* 要调用的服务端信息
*/
private ProviderInfo providerInfo;
private ProviderInfo providerInfo;

/**
* 发起调用的客户端信息
*
* @since 5.3.3
*/
@Unstable
private AbstractInterfaceConfig interfaceConfig;

/**
* Is provider side.
Expand Down Expand Up @@ -422,8 +432,8 @@ public StopWatch getStopWatch() {
* Clear context for next user
*/
public void clear() {
this.setRemoteAddress(null).setLocalAddress(null).setFuture(null).setProviderSide(false)
.setProviderInfo(null);
this.setRemoteAddress(null).setLocalAddress(null).setFuture(null).setProviderSide(null)
.setProviderInfo(null).setInterfaceConfig(null);
this.attachments = new HashMap<String, Object>();
this.stopWatch.reset();
}
Expand All @@ -448,6 +458,30 @@ public ProviderInfo getProviderInfo() {
return providerInfo;
}

/**
* Gets interface config.
*
* @return the config
* @since 5.3.3
*/
@Unstable
public AbstractInterfaceConfig getInterfaceConfig() {
return interfaceConfig;
}

/**
* Sets interface config.
*
* @param interfaceConfig the interface config
* @return the config
* @since 5.3.3
*/
@Unstable
public RpcInternalContext setInterfaceConfig(AbstractInterfaceConfig interfaceConfig) {
this.interfaceConfig = interfaceConfig;
return this;
}

@Override
public String toString() {
return "RpcInternalContext{" +
Expand All @@ -458,6 +492,7 @@ public String toString() {
", stopWatch=" + stopWatch +
", providerSide=" + providerSide +
", providerInfo=" + providerInfo +
", interfaceConfig=" + interfaceConfig +
'}';
}

Expand All @@ -473,6 +508,7 @@ public RpcInternalContext clone() {
context.stopWatch = this.stopWatch.clone();
context.providerSide = this.providerSide;
context.providerInfo = this.providerInfo;
context.interfaceConfig = this.interfaceConfig;
context.attachments.putAll(this.attachments);
return context;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,18 @@
*/
package com.alipay.sofa.rpc.context;

import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.message.ResponseFuture;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
*
*
Expand Down Expand Up @@ -64,4 +73,63 @@ public void testPop() {
Assert.assertEquals(RpcInternalContext.getContext().getRemoteAddress().toString(), "127.0.0.1:12200");
}

@Test
public void testClear() {
RpcInternalContext context = RpcInternalContext.getContext();
context.setRemoteAddress("127.0.0.1", 1234);
context.setLocalAddress("127.0.0.1", 2345);
context.setFuture(new ResponseFuture<String>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}

@Override
public boolean isCancelled() {
return false;
}

@Override
public boolean isDone() {
return false;
}

@Override
public String get() throws InterruptedException, ExecutionException {
return null;
}

@Override
public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
return null;
}

@Override
public ResponseFuture addListeners(List<SofaResponseCallback> sofaResponseCallbacks) {
return null;
}

@Override
public ResponseFuture addListener(SofaResponseCallback sofaResponseCallback) {
return null;
}
});

context.setProviderInfo(ProviderInfo.valueOf("127.0.0.1:80"));
context.setInterfaceConfig(new ProviderConfig());
context.setAttachment("_xxxx", "yyyy");

context.clear();
Assert.assertNull(context.getRemoteAddress());
Assert.assertNull(context.getLocalAddress());
Assert.assertNull(context.getFuture());
Assert.assertFalse(context.isProviderSide());
Assert.assertFalse(context.isConsumerSide());
Assert.assertNull(context.getProviderInfo());
Assert.assertNull(context.getInterfaceConfig());
Assert.assertTrue(context.getAttachments().isEmpty());

RpcInternalContext.removeAllContext();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ public abstract class FaultBaseServiceTest extends FaultBaseTest {
public void beforeClass() throws Exception {
providerConfig.setRef(new HelloServiceTimeOutImpl());
providerConfig.export();
// test reuse client transport
consumerConfigNotUse.refer();
helloService = consumerConfig.refer();
}

@After
public void afterClass() {
providerConfig.unExport();
consumerConfigNotUse.unRefer();
consumerConfig.unRefer();
consumerConfig = null;
consumerConfig2 = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public abstract class FaultBaseTest {
public static final String APP_NAME2 = "testAnotherApp";

protected ServerConfig serverConfig;
protected ConsumerConfig<FaultHelloService> consumerConfigNotUse;
protected ConsumerConfig<FaultHelloService> consumerConfig;
protected ConsumerConfig<FaultHelloService2> consumerConfig2;
protected ConsumerConfig<FaultHelloService> consumerConfigAnotherApp;
Expand All @@ -72,6 +73,15 @@ public void init() {
.setRegister(false)
.setApplication(providerAconfig);

// just for test
consumerConfigNotUse = new ConsumerConfig<FaultHelloService>()
.setInterfaceId(FaultHelloService.class.getName())
.setTimeout(500)
.setDirectUrl("127.0.0.1:12299")
.setRegister(false)
.setUniqueId("xxx")
.setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT);

ApplicationConfig applicationConfig = new ApplicationConfig();
applicationConfig.setAppName(APP_NAME1);
consumerConfig = new ConsumerConfig<FaultHelloService>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public void handleRequest(BizContext bizCtx, AsyncContext asyncCtx, SofaRequest
}
if (invoker instanceof ProviderProxyInvoker) {
providerConfig = ((ProviderProxyInvoker) invoker).getProviderConfig();
context.setInterfaceConfig(providerConfig);
// 找到服务后,打印服务的appName
appName = providerConfig != null ? providerConfig.getAppName() : null;
}
Expand Down
Loading