RpcClient#shutdown can hang in extreme cases #65

Open
iSenninha opened this issue May 12, 2020 · 2 comments

Comments

@iSenninha

While establishing a connection, RpcClient blocks synchronously here:

 at io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:286)
        - locked <0x00000006506433d0> (a io.netty.channel.DefaultChannelPromise)
        at io.netty.channel.DefaultChannelPromise.awaitUninterruptibly(DefaultChannelPromise.java:135)
        at io.netty.channel.DefaultChannelPromise.awaitUninterruptibly(DefaultChannelPromise.java:28)
        at com.baidu.jprotobuf.pbrpc.transport.ChannelPoolObjectFactory.wrap(ChannelPoolObjectFactory.java:75)
        at com.baidu.jprotobuf.pbrpc.transport.ChannelPoolObjectFactory.wrap(ChannelPoolObjectFactory.java:35)
        at org.apache.commons.pool2.BasePooledObjectFactory.makeObject(BasePooledObjectFactory.java:60)
        at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:868)
        at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435)
        at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363)
        at com.baidu.jprotobuf.pbrpc.transport.ChannelPool.getChannel(ChannelPool.java:61)
        at com.baidu.jprotobuf.pbrpc.transport.RpcChannel.getConnection(RpcChannel.java:64)
        at com.baidu.jprotobuf.pbrpc.client.ProtobufRpcProxy.invoke(ProtobufRpcProxy.java:366)

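If that connection has not finished being established (or has failed), and the RpcClient was exposed too early and another thread calls shutdown on it, the thread above can never be woken up.

The specific reason it cannot be woken is that the Netty version the project depends on, 4.0.27.Final, does not handle the DefaultChannelPromise that a channel with an unfinished connection is waiting on when the event loop shuts down. The latest Netty release has fixed this bug.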
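
Until the dependency is upgraded, one caller-side mitigation is to wait with a deadline instead of waiting forever. Netty's future type offers `awaitUninterruptibly(long, TimeUnit)`, which returns false when the deadline passes. The sketch below only illustrates the pattern with `java.util.concurrent.CompletableFuture` as a stand-in for the connect promise; the class name `BoundedWait` and the helper `awaitBounded` are hypothetical and are not part of jprotobuf-rpc-socket or Netty.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: a stdlib stand-in for waiting on a connect promise.
public class BoundedWait {

    /**
     * Waits for the future for at most timeoutMillis.
     * Returns true if the future completed (normally, exceptionally, or by
     * cancellation), false if the deadline passed or the thread was interrupted.
     */
    public static boolean awaitBounded(CompletableFuture<?> future, long timeoutMillis) {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Nobody completed the future: give up instead of hanging.
            return false;
        } catch (ExecutionException | CancellationException e) {
            // The future did complete, just not successfully.
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Simulates a connect promise that is never completed after shutdown.
        CompletableFuture<Void> abandoned = new CompletableFuture<>();
        System.out.println("completed: " + awaitBounded(abandoned, 200));

        // Simulates a connect promise that failed normally.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("connect failed"));
        System.out.println("completed: " + awaitBounded(failed, 200));
    }
}
```

A false return tells the caller that the promise was abandoned, so it can report a connection error rather than block the calling thread indefinitely.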

Reproducing the bug:

Environment

  • Jprotobuf-rpc-socket: 4.1.0
  • OS version: Debian 4.19.98-1
  • JDK version: Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.5+10-LTS, mixed mode)

import com.baidu.jprotobuf.pbrpc.EchoService;
import com.baidu.jprotobuf.pbrpc.client.ProtobufRpcProxy;
import com.baidu.jprotobuf.pbrpc.transport.RpcClient;
import org.junit.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Coded by senninha on 2020/5/12
 */
public class TestRpcHang {
    @Test
    public void testRpcHang() throws InterruptedException {
        while (true) {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            final RpcClient rpcClient = new RpcClient();
            // Create the EchoService proxy
            ProtobufRpcProxy<EchoService> pbrpcProxy = new ProtobufRpcProxy<EchoService>(rpcClient, EchoService.class);
            pbrpcProxy.setPort(1080);
            pbrpcProxy.setLookupStubOnStartup(true);
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    rpcClient.shutdown();
                }
            });
            try {
                // Dynamically generate the proxy instance
                EchoService echoService = pbrpcProxy.proxy();
            } catch (Exception e) {
                // ignore
            }
            System.err.println("Not hang!");
            Thread.sleep(1000);
        }
    }
}

Pure Netty reproduction (class declaration and imports omitted):

    @Test
    public void testNetty() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        while (true) {
            final Bootstrap bootstrap = new Bootstrap();
            final NioEventLoopGroup group = new NioEventLoopGroup(1);
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            ChannelFuture connect = bootstrap.connect("localhost", 1080);
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    group.shutdownGracefully();
                }
            });
            try {
                connect.awaitUninterruptibly();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private static class TimeClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ByteBuf m = (ByteBuf) msg;
            try {
                long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
                System.out.println(new Date(currentTimeMillis));
                ctx.close();
            } finally {
                m.release();
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
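
The race in both reproductions is a shutdown that leaves an in-flight promise uncompleted. A minimal sketch of a shutdown that explicitly fails every pending promise, so that waiters always wake up, could look like the following. It uses only `java.util.concurrent`; the class `PendingConnects` and its methods are hypothetical illustrations and are not how Netty or jprotobuf-rpc-socket implement this.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry of in-flight connect attempts.
public class PendingConnects {
    private final Set<CompletableFuture<Void>> pending = ConcurrentHashMap.newKeySet();
    private volatile boolean shutdown;

    /** Registers a new connect attempt and returns the future a caller may wait on. */
    public CompletableFuture<Void> register() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.add(future);
        // Drop the future from the registry once it completes in any way.
        future.whenComplete((value, error) -> pending.remove(future));
        // Checked after add(): a concurrent shutdown() either sees this future
        // while iterating, or this check sees the flag it set beforehand.
        if (shutdown) {
            future.completeExceptionally(new IllegalStateException("client is shut down"));
        }
        return future;
    }

    /** Fails every pending attempt so that no waiter stays blocked. */
    public void shutdown() {
        shutdown = true;
        for (CompletableFuture<Void> future : pending) {
            future.completeExceptionally(new IllegalStateException("client is shut down"));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PendingConnects connects = new PendingConnects();
        CompletableFuture<Void> connect = connects.register();

        // Another thread shuts the client down while the connect is in flight.
        Thread closer = new Thread(connects::shutdown);
        closer.start();
        closer.join();

        System.out.println("failed by shutdown: " + connect.isCompletedExceptionally());
    }
}
```

The flag is set before the registry is iterated, and `register()` checks the flag only after adding its future, so a promise registered concurrently with `shutdown()` is failed by one side or the other.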

@jhunters
Collaborator

Thanks for the feedback. We will review this case.

@jhunters
Collaborator

Confirmed. We need to upgrade Netty to the latest version.
Thread [main] (Suspended)
owns: ProtobufRpcProxy (id=83)
waiting for: DefaultChannelPromise (id=99)
Object.wait(long) line: not available [native method] [local variables unavailable]
DefaultChannelPromise(Object).wait() line: 502 [local variables unavailable]
DefaultChannelPromise(DefaultPromise).awaitUninterruptibly() line: 286
DefaultChannelPromise.awaitUninterruptibly() line: 135
DefaultChannelPromise.awaitUninterruptibly() line: 28
ChannelPoolObjectFactory.wrap(Connection) line: 88
ChannelPoolObjectFactory.wrap(Object) line: 1
ChannelPoolObjectFactory(BasePooledObjectFactory).makeObject() line: 60
GenericObjectPool.create() line: 861
GenericObjectPool.borrowObject(long) line: 435
GenericObjectPool.borrowObject() line: 363
ChannelPool.getChannel() line: 79
RpcChannel.testChannlConnect() line: 49
ProtobufRpcProxy.proxy() line: 326
TestRpcHang.testRpcHang() line: 30
NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]
NativeMethodAccessorImpl.invoke(Object, Object[]) line: 62
DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 43
Method.invoke(Object, Object...) line: 497
FrameworkMethod$1.runReflectiveCall() line: 50
FrameworkMethod$1(ReflectiveCallable).run() line: 12
FrameworkMethod.invokeExplosively(Object, Object...) line: 47
InvokeMethod.evaluate() line: 17
BlockJUnit4ClassRunner(ParentRunner).runLeaf(Statement, Description, RunNotifier) line: 325
BlockJUnit4ClassRunner.runChild(FrameworkMethod, RunNotifier) line: 78
BlockJUnit4ClassRunner.runChild(Object, RunNotifier) line: 57
ParentRunner$3.run() line: 290
ParentRunner$1.schedule(Runnable) line: 71
BlockJUnit4ClassRunner(ParentRunner).runChildren(RunNotifier) line: 288
ParentRunner.access$000(ParentRunner, RunNotifier) line: 58
ParentRunner$2.evaluate() line: 268
BlockJUnit4ClassRunner(ParentRunner).run(RunNotifier) line: 363
JUnit4TestReference.run(TestExecution) line: 89
TestExecution.run(ITestReference[]) line: 41
RemoteTestRunner.runTests(String[], String, TestExecution) line: 541
RemoteTestRunner.runTests(TestExecution) line: 763
RemoteTestRunner.run() line: 463
RemoteTestRunner.main(String[]) line: 209
