Skip to content

Commit

Permalink
Fix rest & h2c too.
Browse files Browse the repository at this point in the history
  • Loading branch information
ujjboy committed Jun 14, 2018
1 parent 3f57d88 commit 69ff3d7
Show file tree
Hide file tree
Showing 9 changed files with 440 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
*/
package com.alipay.sofa.rpc.server.http;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;

import com.alipay.sofa.rpc.common.cache.ReflectCache;
import com.alipay.sofa.rpc.common.struct.NamedThreadFactory;
import com.alipay.sofa.rpc.common.utils.ClassTypeUtils;
Expand All @@ -35,11 +40,6 @@
import com.alipay.sofa.rpc.transport.ServerTransportConfig;
import com.alipay.sofa.rpc.transport.ServerTransportFactory;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;

/**
* HttpServer for HTTP/1.1 and HTTP/2
*
Expand Down Expand Up @@ -90,7 +90,7 @@ public void init(ServerConfig serverConfig) {
// 启动线程池
this.bizThreadPool = initThreadPool(serverConfig);
// 服务端处理器
this.serverHandler = new HttpServerHandler(bizThreadPool);
this.serverHandler = new HttpServerHandler();

// set default transport config
this.serverTransportConfig.setContainer(container);
Expand Down Expand Up @@ -118,6 +118,9 @@ public void start() {
return;
}
try {
// 启动线程池
this.bizThreadPool = initThreadPool(serverConfig);
this.serverHandler.setBizThreadPool(bizThreadPool);
serverTransport = ServerTransportFactory.getServerTransport(serverTransportConfig);
started = serverTransport.start();

Expand Down Expand Up @@ -156,6 +159,14 @@ public void stop() {
// 关闭端口,不关闭线程池
serverTransport.stop();
serverTransport = null;

// 关闭线程池
if (bizThreadPool != null) {
bizThreadPool.shutdown();
bizThreadPool = null;
serverHandler.setBizThreadPool(null);
}

started = false;

if (EventBus.isEnable(ServerStoppedEvent.class)) {
Expand Down Expand Up @@ -216,12 +227,6 @@ public void destroy() {
}

stop();

// 关闭线程池
if (bizThreadPool != null) {
bizThreadPool.shutdown();
}

serverHandler = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,23 @@
*/
package com.alipay.sofa.rpc.server.http;

import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

import com.alipay.sofa.rpc.common.cache.ReflectCache;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.invoke.Invoker;
import com.alipay.sofa.rpc.server.ProviderProxyInvoker;
import com.alipay.sofa.rpc.server.ServerHandler;
import com.alipay.sofa.rpc.transport.AbstractChannel;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;

import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author <a href="mailto:zhanggeng.zg@antfin.com">GengZhang</a>
* @since 5.4.0
Expand Down Expand Up @@ -61,10 +62,6 @@ public AtomicInteger getProcessingCount() {
return processingCount;
}

public HttpServerHandler(ThreadPoolExecutor bizThreadPool) {
this.bizThreadPool = bizThreadPool;
}

@Override
public void registerChannel(AbstractChannel nettyChannel) {

Expand Down Expand Up @@ -139,4 +136,9 @@ public boolean checkService(String serviceName, String methodName) {
public ThreadPoolExecutor getBizThreadPool() {
return bizThreadPool;
}

public HttpServerHandler setBizThreadPool(ThreadPoolExecutor bizThreadPool) {
this.bizThreadPool = bizThreadPool;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@
*/
package com.alipay.sofa.rpc.transport.http;

import static io.netty.buffer.Unpooled.wrappedBuffer;
import static io.netty.handler.codec.http.HttpMethod.POST;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.codec.Serializer;
import com.alipay.sofa.rpc.codec.SerializerFactory;
Expand Down Expand Up @@ -46,6 +57,7 @@
import com.alipay.sofa.rpc.transport.ClientTransportConfig;
import com.alipay.sofa.rpc.transport.netty.NettyChannel;
import com.alipay.sofa.rpc.transport.netty.NettyHelper;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
Expand All @@ -66,17 +78,6 @@
import io.netty.util.Timer;
import io.netty.util.TimerTask;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import static io.netty.buffer.Unpooled.wrappedBuffer;
import static io.netty.handler.codec.http.HttpMethod.POST;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

/**
* h2和h2c通用的客户端传输层
*
Expand Down Expand Up @@ -104,10 +105,14 @@ public AbstractHttp2ClientTransport(ClientTransportConfig transportConfig) {
* 服务端提供者信息
*/
protected final ProviderInfo providerInfo;
/**
* Start from 3 (because 1 is setting stream)
*/
private final static int START_STREAM_ID = 3;
/**
* StreamId, start from 3 (because 1 is setting stream)
*/
protected final AtomicInteger streamId = new AtomicInteger(3);
protected final AtomicInteger streamId = new AtomicInteger();
/**
* 正在发送的调用数量
*/
Expand All @@ -131,6 +136,9 @@ public AbstractHttp2ClientTransport(ClientTransportConfig transportConfig) {

@Override
public void connect() {
if (isAvailable()) {
return;
}
EventLoopGroup workerGroup = NettyHelper.getClientIOEventLoopGroup();
Http2ClientInitializer initializer = new Http2ClientInitializer(transportConfig);
try {
Expand All @@ -152,6 +160,8 @@ public void connect() {
http2SettingsHandler.awaitSettings(transportConfig.getConnectTimeout(), TimeUnit.MILLISECONDS);

responseChannelHandler = initializer.responseHandler();
// RESET streamId
streamId.set(START_STREAM_ID);
} catch (Exception e) {
throw new SofaRpcException(RpcErrorType.CLIENT_NETWORK, e);
}
Expand Down Expand Up @@ -295,8 +305,8 @@ protected void doSend(final SofaRequest request, AbstractHttpClientHandler callb
TIMEOUT_TIMER.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
Map.Entry<ChannelFuture, AbstractHttpClientHandler> entry =
responseChannelHandler.removePromise(requestId);
Map.Entry<ChannelFuture, AbstractHttpClientHandler> entry = responseChannelHandler
.removePromise(requestId);
if (entry != null) {
ClientHandler handler = entry.getValue();
Exception e = timeoutException(request, timeoutMills, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@
*/
package com.alipay.sofa.rpc.server.rest;

import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.jboss.resteasy.spi.PropertyInjector;
import org.jboss.resteasy.spi.ResteasyDeployment;
import org.jboss.resteasy.spi.ResteasyProviderFactory;

import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.config.JAXRSProviderManager;
import com.alipay.sofa.rpc.config.ProviderConfig;
Expand All @@ -26,12 +33,6 @@
import com.alipay.sofa.rpc.log.Logger;
import com.alipay.sofa.rpc.log.LoggerFactory;
import com.alipay.sofa.rpc.server.Server;
import org.jboss.resteasy.spi.PropertyInjector;
import org.jboss.resteasy.spi.ResteasyDeployment;
import org.jboss.resteasy.spi.ResteasyProviderFactory;

import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Rest server base on resteasy.
Expand Down Expand Up @@ -69,6 +70,7 @@ public class RestServer implements Server {
@Override
public void init(ServerConfig serverConfig) {
this.serverConfig = serverConfig;
httpServer = buildServer();
}

private SofaNettyJaxrsServer buildServer() {
Expand Down Expand Up @@ -136,7 +138,6 @@ public void start() {
}
// 绑定到端口
try {
httpServer = buildServer();
httpServer.start();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Start the http rest server at port {}", serverConfig.getPort());
Expand Down Expand Up @@ -170,7 +171,6 @@ public void stop() {
LOGGER.info("Stop the http rest server at port {}", serverConfig.getPort());
}
httpServer.stop();
httpServer = null;
} catch (Exception e) {
LOGGER.error("Stop the http rest server at port " + serverConfig.getPort() + " error !", e);
}
Expand Down Expand Up @@ -222,6 +222,7 @@ public void unRegisterProcessor(ProviderConfig providerConfig, boolean closeIfNo
@Override
public void destroy() {
stop();
httpServer = null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,25 @@
*/
package com.alipay.sofa.rpc.server.rest;

import static org.jboss.resteasy.plugins.server.netty.RestEasyHttpRequestDecoder.Protocol.HTTP;
import static org.jboss.resteasy.plugins.server.netty.RestEasyHttpRequestDecoder.Protocol.HTTPS;

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

import org.jboss.resteasy.core.SynchronousDispatcher;
import org.jboss.resteasy.plugins.server.embedded.EmbeddedJaxrsServer;
import org.jboss.resteasy.plugins.server.embedded.SecurityDomain;
import org.jboss.resteasy.plugins.server.netty.RequestDispatcher;
import org.jboss.resteasy.plugins.server.netty.RestEasyHttpRequestDecoder;
import org.jboss.resteasy.plugins.server.netty.RestEasyHttpResponseEncoder;
import org.jboss.resteasy.spi.ResteasyDeployment;

import com.alipay.sofa.rpc.common.SystemInfo;
import com.alipay.sofa.rpc.common.struct.NamedThreadFactory;
import com.alipay.sofa.rpc.common.utils.StringUtils;
Expand All @@ -37,23 +56,6 @@
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.EventExecutor;
import org.jboss.resteasy.core.SynchronousDispatcher;
import org.jboss.resteasy.plugins.server.embedded.EmbeddedJaxrsServer;
import org.jboss.resteasy.plugins.server.embedded.SecurityDomain;
import org.jboss.resteasy.plugins.server.netty.RequestDispatcher;
import org.jboss.resteasy.plugins.server.netty.RestEasyHttpRequestDecoder;
import org.jboss.resteasy.plugins.server.netty.RestEasyHttpResponseEncoder;
import org.jboss.resteasy.spi.ResteasyDeployment;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.jboss.resteasy.plugins.server.netty.RestEasyHttpRequestDecoder.Protocol.HTTP;
import static org.jboss.resteasy.plugins.server.netty.RestEasyHttpRequestDecoder.Protocol.HTTPS;

/**
* 参考NettyJaxrsServer的实现,增加了自定义功能,区别搜索 CHANGE<br>
Expand All @@ -64,7 +66,7 @@
public class SofaNettyJaxrsServer implements EmbeddedJaxrsServer {

private final ServerConfig serverConfig;
protected ServerBootstrap bootstrap = new ServerBootstrap();
protected ServerBootstrap bootstrap = null;
protected String hostname = null;
protected int port = 8080;
protected ResteasyDeployment deployment = new SofaResteasyDeployment(); // CHANGE: 使用sofa的类
Expand Down Expand Up @@ -224,7 +226,7 @@ public void start() {
serverConfig.isDaemon()));
}
// Configure the server.
bootstrap
bootstrap = new ServerBootstrap()
.group(eventLoopGroup)
.channel(
(serverConfig != null && serverConfig.isEpoll()) ? EpollServerSocketChannel.class
Expand Down Expand Up @@ -293,5 +295,6 @@ public void stop() {
eventExecutor.shutdownGracefully().sync();
} catch (Exception ignore) { // NOPMD
}
bootstrap = null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.test;

import java.util.concurrent.Callable;

/**
* @author <a href="mailto:zhanggeng.zg@antfin.com">GengZhang</a>
*/
public class TestUtils {

public static <T> T delayGet(Callable<T> callable, T expect, int period, int times) {
T result = null;
int i = 0;
while (i++ < times) {
try {
Thread.sleep(period);
result = callable.call();
if (result != null && result.equals(expect)) {
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
return result;
}
}
Loading

0 comments on commit 69ff3d7

Please # to comment.