Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Fix reconnect of bolt/rest/h2c. #180

Merged
merged 3 commits into from
Jun 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void init(ServerConfig serverConfig) {
// 启动线程池
this.bizThreadPool = initThreadPool(serverConfig);
// 服务端处理器
this.serverHandler = new HttpServerHandler(bizThreadPool);
this.serverHandler = new HttpServerHandler();

// set default transport config
this.serverTransportConfig.setContainer(container);
Expand Down Expand Up @@ -118,6 +118,9 @@ public void start() {
return;
}
try {
// 启动线程池
this.bizThreadPool = initThreadPool(serverConfig);
this.serverHandler.setBizThreadPool(bizThreadPool);
serverTransport = ServerTransportFactory.getServerTransport(serverTransportConfig);
started = serverTransport.start();

Expand Down Expand Up @@ -156,6 +159,14 @@ public void stop() {
// 关闭端口,不关闭线程池
serverTransport.stop();
serverTransport = null;

// 关闭线程池
if (bizThreadPool != null) {
bizThreadPool.shutdown();
bizThreadPool = null;
serverHandler.setBizThreadPool(null);
}

started = false;

if (EventBus.isEnable(ServerStoppedEvent.class)) {
Expand Down Expand Up @@ -216,12 +227,6 @@ public void destroy() {
}

stop();

// 关闭线程池
if (bizThreadPool != null) {
bizThreadPool.shutdown();
}

serverHandler = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ public AtomicInteger getProcessingCount() {
return processingCount;
}

public HttpServerHandler(ThreadPoolExecutor bizThreadPool) {
this.bizThreadPool = bizThreadPool;
}

@Override
public void registerChannel(AbstractChannel nettyChannel) {

Expand Down Expand Up @@ -139,4 +135,9 @@ public boolean checkService(String serviceName, String methodName) {
public ThreadPoolExecutor getBizThreadPool() {
return bizThreadPool;
}

public HttpServerHandler setBizThreadPool(ThreadPoolExecutor bizThreadPool) {
this.bizThreadPool = bizThreadPool;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,14 @@ public AbstractHttp2ClientTransport(ClientTransportConfig transportConfig) {
* 服务端提供者信息
*/
protected final ProviderInfo providerInfo;
/**
* Start from 3 (because 1 is setting stream)
*/
private final static int START_STREAM_ID = 3;
/**
* StreamId, start from 3 (because 1 is setting stream)
*/
protected final AtomicInteger streamId = new AtomicInteger(3);
protected final AtomicInteger streamId = new AtomicInteger();
/**
* 正在发送的调用数量
*/
Expand All @@ -131,6 +135,9 @@ public AbstractHttp2ClientTransport(ClientTransportConfig transportConfig) {

@Override
public void connect() {
if (isAvailable()) {
return;
}
EventLoopGroup workerGroup = NettyHelper.getClientIOEventLoopGroup();
Http2ClientInitializer initializer = new Http2ClientInitializer(transportConfig);
try {
Expand All @@ -152,6 +159,8 @@ public void connect() {
http2SettingsHandler.awaitSettings(transportConfig.getConnectTimeout(), TimeUnit.MILLISECONDS);

responseChannelHandler = initializer.responseHandler();
// RESET streamId
streamId.set(START_STREAM_ID);
} catch (Exception e) {
throw new SofaRpcException(RpcErrorType.CLIENT_NETWORK, e);
}
Expand Down Expand Up @@ -295,8 +304,8 @@ protected void doSend(final SofaRequest request, AbstractHttpClientHandler callb
TIMEOUT_TIMER.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
Map.Entry<ChannelFuture, AbstractHttpClientHandler> entry =
responseChannelHandler.removePromise(requestId);
Map.Entry<ChannelFuture, AbstractHttpClientHandler> entry = responseChannelHandler
.removePromise(requestId);
if (entry != null) {
ClientHandler handler = entry.getValue();
Exception e = timeoutException(request, timeoutMills, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class RestServer implements Server {
@Override
public void init(ServerConfig serverConfig) {
this.serverConfig = serverConfig;
httpServer = buildServer();
}

private SofaNettyJaxrsServer buildServer() {
Expand Down Expand Up @@ -136,7 +137,6 @@ public void start() {
}
// 绑定到端口
try {
httpServer = buildServer();
httpServer.start();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Start the http rest server at port {}", serverConfig.getPort());
Expand Down Expand Up @@ -170,7 +170,6 @@ public void stop() {
LOGGER.info("Stop the http rest server at port {}", serverConfig.getPort());
}
httpServer.stop();
httpServer = null;
} catch (Exception e) {
LOGGER.error("Stop the http rest server at port " + serverConfig.getPort() + " error !", e);
}
Expand Down Expand Up @@ -222,6 +221,7 @@ public void unRegisterProcessor(ProviderConfig providerConfig, boolean closeIfNo
@Override
public void destroy() {
stop();
httpServer = null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.alipay.sofa.rpc.common.struct.NamedThreadFactory;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.ServerConfig;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
Expand Down Expand Up @@ -64,7 +63,7 @@
public class SofaNettyJaxrsServer implements EmbeddedJaxrsServer {

private final ServerConfig serverConfig;
protected ServerBootstrap bootstrap = new ServerBootstrap();
protected ServerBootstrap bootstrap = null;
protected String hostname = null;
protected int port = 8080;
protected ResteasyDeployment deployment = new SofaResteasyDeployment(); // CHANGE: 使用sofa的类
Expand Down Expand Up @@ -224,7 +223,7 @@ public void start() {
serverConfig.isDaemon()));
}
// Configure the server.
bootstrap
bootstrap = new ServerBootstrap()
.group(eventLoopGroup)
.channel(
(serverConfig != null && serverConfig.isEpoll()) ? EpollServerSocketChannel.class
Expand Down Expand Up @@ -293,5 +292,6 @@ public void stop() {
eventExecutor.shutdownGracefully().sync();
} catch (Exception ignore) { // NOPMD
}
bootstrap = null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.test;

import java.util.concurrent.Callable;

/**
* @author <a href="mailto:zhanggeng.zg@antfin.com">GengZhang</a>
*/
public class TestUtils {

public static <T> T delayGet(Callable<T> callable, T expect, int period, int times) {
T result = null;
int i = 0;
while (i++ < times) {
try {
Thread.sleep(period);
result = callable.call();
if (result != null && result.equals(expect)) {
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.client.bolt;

import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.config.ApplicationConfig;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.test.ActivelyDestroyTest;
import com.alipay.sofa.rpc.test.HelloService;
import com.alipay.sofa.rpc.test.HelloServiceImpl;
import com.alipay.sofa.rpc.test.TestUtils;
import org.junit.Assert;

import java.util.concurrent.Callable;

/**
* @author <a href="mailto:zhanggeng.zg@antfin.com">GengZhang</a>
*/
public class BoltDirectUrlTest extends ActivelyDestroyTest {

// @Test
// FIXME 目前bolt的IO线程关闭时未释放,暂不支持本测试用例
public void testAll() {
// 只有2个线程 执行
ServerConfig serverConfig = new ServerConfig()
.setPort(12300)
.setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT)
.setDaemon(true);

// 发布一个服务,每个请求要执行1秒
ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
.setRef(new HelloServiceImpl())
.setBootstrap("bolt")
.setApplication(new ApplicationConfig().setAppName("serverApp"))
.setServer(serverConfig)
.setRegister(false);
providerConfig.export();

final ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
.setDirectUrl("bolt://127.0.0.1:12300")
.setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT)
.setBootstrap("bolt")
.setApplication(new ApplicationConfig().setAppName("clientApp"))
.setReconnectPeriod(1000);

HelloService helloService = consumerConfig.refer();

Assert.assertNotNull(helloService.sayHello("xx", 22));

serverConfig.getServer().stop();

// 关闭后再调用一个抛异常
try {
helloService.sayHello("xx", 22);
} catch (Exception e) {
// 应该抛出异常
Assert.assertTrue(e instanceof SofaRpcException);
}

Assert.assertTrue(TestUtils.delayGet(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return CommonUtils.isEmpty(consumerConfig.getConsumerBootstrap()
.getCluster().getConnectionHolder().getAvailableConnections());
}
}, true, 50, 40));

serverConfig.getServer().start();
// 等待客户端重连服务端
Assert.assertTrue(TestUtils.delayGet(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return CommonUtils.isNotEmpty(consumerConfig.getConsumerBootstrap()
.getCluster().getConnectionHolder().getAvailableConnections());
}
}, true, 50, 60));

Assert.assertNotNull(helloService.sayHello("xx", 22));
}
}
Loading