diff --git a/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/RetryMessage.java b/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/RetryMessage.java index 40ea43709aab..13ef7c3ac506 100644 --- a/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/RetryMessage.java +++ b/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/RetryMessage.java @@ -16,22 +16,60 @@ package com.navercorp.pinpoint.profiler.sender; +import org.apache.commons.lang.StringUtils; + /** * @author emeroad */ public class RetryMessage { - private int retryCount; - private byte[] bytes; - public RetryMessage(int retryCount, byte[] bytes) { + private int retryCount = 0; + private final int maxRetryCount; + + private final byte[] bytes; + private final String messageDescription; + + public RetryMessage(int maxRetryCount, byte[] bytes) { + this(0, maxRetryCount, bytes, ""); + } + + public RetryMessage(int retryCount, int maxRetryCount, byte[] bytes) { + this(retryCount, maxRetryCount, bytes, ""); + } + + public RetryMessage(int maxRetryCount, byte[] bytes, String messageDescription) { + this(0, maxRetryCount, bytes, messageDescription); + } + + public RetryMessage(int retryCount, int maxRetryCount, byte[] bytes, String messageDescription) { + if (retryCount < 0) { + throw new IllegalArgumentException("retryCount:" + retryCount + " must be positive number"); + } + if (maxRetryCount < 0) { + throw new IllegalArgumentException("maxRetryCount:" + maxRetryCount + " must be positive number"); + } + if (retryCount > maxRetryCount) { + throw new IllegalArgumentException("maxRetryCount(" + maxRetryCount + ") must be greater than retryCount(" + retryCount + ")"); + } + this.retryCount = retryCount; + this.maxRetryCount = maxRetryCount; this.bytes = bytes; + this.messageDescription = messageDescription; } public int getRetryCount() { return retryCount; } + public int getMaxRetryCount() { + return maxRetryCount; + } + + public boolean isRetryAvailable() { + return retryCount < maxRetryCount; + } + public byte[] getBytes() { return bytes; } @@ -39,4 +77,19 @@ public byte[] getBytes() { public int fail() { return ++retryCount; } + + @Override + public String toString() { + StringBuffer toString = new StringBuffer(); + toString.append("RetryMessage{"); + if (!StringUtils.isEmpty(messageDescription)) { + toString.append("message:" + messageDescription + ", "); + } + toString.append("size=" + bytes.length + ", "); + toString.append("retry=" + retryCount + "/" + maxRetryCount); + toString.append("}"); + + return toString.toString(); + } + } diff --git a/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/RetryQueue.java b/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/RetryQueue.java index 30b203628850..8fbeaf7d5874 100644 --- a/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/RetryQueue.java +++ b/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/RetryQueue.java @@ -31,15 +31,15 @@ public class RetryQueue { // But PriorityQueue of JDK has no size limit, so let's do it without priority for now. private final BlockingQueue queue; private final int capacity; - private final int maxRetry; + private final int maxRetryCount; private final int halfCapacity; - public RetryQueue(int capacity, int maxRetry) { + public RetryQueue(int capacity, int maxRetryCount) { this.queue = new LinkedBlockingQueue(); this.capacity = capacity; this.halfCapacity = capacity / 2; - this.maxRetry = maxRetry; + this.maxRetryCount = maxRetryCount; } public RetryQueue() { @@ -51,9 +51,13 @@ public void add(RetryMessage retryMessage) { throw new NullPointerException("retryMessage must not be null"); } - final int retryCount = retryMessage.getRetryCount(); - if (retryCount >= this.maxRetry) { - logger.warn("discard retry message. retryCount:{}", retryCount); + if (!retryMessage.isRetryAvailable()) { + logger.warn("discard retry message({}).", retryMessage); + return; + } + int retryCount = retryMessage.getRetryCount(); + if (retryCount >= this.maxRetryCount) { + logger.warn("discard retry message({}). queue-maxRetryCount:{}", retryMessage, maxRetryCount); return; } final int queueSize = queue.size(); diff --git a/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/TcpDataSender.java b/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/TcpDataSender.java index 3f565e639cb1..2e0e8479db0d 100644 --- a/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/TcpDataSender.java +++ b/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/TcpDataSender.java @@ -17,6 +17,19 @@ package com.navercorp.pinpoint.profiler.sender; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.thrift.TBase; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.Timer; +import org.jboss.netty.util.TimerTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.navercorp.pinpoint.rpc.Future; import com.navercorp.pinpoint.rpc.FutureListener; import com.navercorp.pinpoint.rpc.ResponseMessage; @@ -28,18 +41,6 @@ import com.navercorp.pinpoint.thrift.io.HeaderTBaseDeserializerFactory; import com.navercorp.pinpoint.thrift.io.HeaderTBaseSerializer; import com.navercorp.pinpoint.thrift.io.HeaderTBaseSerializerFactory; -import org.apache.thrift.TBase; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.Timer; -import org.jboss.netty.util.TimerTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; /** * @author emeroad @@ -142,7 +143,7 @@ protected void sendPacket(Object message) { if (copy == null) { return; } - + if (futureListener != null) { doRequest(copy, futureListener); } else { @@ -162,7 +163,8 @@ private void doSend(byte[] copy) { write.setListener(writeFailFutureListener); } - private void doRequest(final byte[] requestPacket, final int retryCount, final Object targetClass) { + // Separate doRequest method to avoid creating unnecessary objects. (Generally, sending message is successed when firt attempt.) + private void doRequest(final byte[] requestPacket, final int maxRetryCount, final Object targetClass) { FutureListener futureListener = (new FutureListener() { @Override public void onComplete(Future future) { @@ -175,18 +177,20 @@ public void onComplete(Future future) { if (result.isSuccess()) { logger.debug("result success"); } else { - logger.warn("request fail. clazz:{} Caused:{}", targetClass, result.getMessage()); - retryRequest(requestPacket, retryCount, targetClass.getClass().getSimpleName()); + logger.info("request fail. request:{} Caused:{}", targetClass, result.getMessage()); + RetryMessage retryMessage = new RetryMessage(1, maxRetryCount, requestPacket, targetClass.getClass().getSimpleName()); + retryRequest(retryMessage); } } else { - logger.warn("Invalid ResponseMessage. {}", response); + logger.warn("Invalid respose:{}", response); // This is not retransmission. need to log for debugging // it could be null // retryRequest(requestPacket); } } else { - logger.warn("request fail. clazz:{} Caused:{}", targetClass, future.getCause().getMessage(), future.getCause()); - retryRequest(requestPacket, retryCount, targetClass.getClass().getSimpleName()); + logger.info("request fail. request:{} Caused:{}", targetClass, future.getCause().getMessage(), future.getCause()); + RetryMessage retryMessage = new RetryMessage(1, maxRetryCount, requestPacket, targetClass.getClass().getSimpleName()); + retryRequest(retryMessage); } } }); @@ -194,8 +198,40 @@ public void onComplete(Future future) { doRequest(requestPacket, futureListener); } - private void retryRequest(byte[] requestPacket, int retryCount, final String className) { - RetryMessage retryMessage = new RetryMessage(retryCount, requestPacket); + // Separate doRequest method to avoid creating unnecessary objects. (Generally, sending message is successed when firt attempt.) + private void doRequest(final RetryMessage retryMessage) { + FutureListener futureListener = (new FutureListener() { + @Override + public void onComplete(Future future) { + if (future.isSuccess()) { + // Should cache? + HeaderTBaseDeserializer deserializer = HeaderTBaseDeserializerFactory.DEFAULT_FACTORY.createDeserializer(); + TBase response = deserialize(deserializer, future.getResult()); + if (response instanceof TResult) { + TResult result = (TResult) response; + if (result.isSuccess()) { + logger.debug("result success"); + } else { + logger.info("request fail. request:{}, Caused:{}", retryMessage, result.getMessage()); + retryRequest(retryMessage); + } + } else { + logger.warn("Invalid response:{}", response); + // This is not retransmission. need to log for debugging + // it could be null +// retryRequest(requestPacket); + } + } else { + logger.info("request fail. request:{}, caused:{}", retryMessage, future.getCause().getMessage(), future.getCause()); + retryRequest(retryMessage); + } + } + }); + + doRequest(retryMessage.getBytes(), futureListener); + } + + private void retryRequest(RetryMessage retryMessage) { retryQueue.add(retryMessage); if (fireTimeout()) { timer.newTimeout(new TimerTask() { @@ -209,7 +245,7 @@ public void run(Timeout timeout) throws Exception { return; } int fail = retryMessage.fail(); - doRequest(retryMessage.getBytes(), fail, className); + doRequest(retryMessage); } } }, 1000 * 10, TimeUnit.MILLISECONDS); diff --git a/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/RetryMessageTest.java b/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/RetryMessageTest.java new file mode 100644 index 000000000000..ebe5e2d124fb --- /dev/null +++ b/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/RetryMessageTest.java @@ -0,0 +1,67 @@ +/* + * Copyright 2016 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.profiler.sender; + +import org.junit.Assert; +import org.junit.Test; + +/** + * @author Taejin Koo + */ +public class RetryMessageTest { + + @Test + public void availableTest1() throws Exception { + RetryMessage retryMessage = new RetryMessage(1, new byte[0]); + Assert.assertTrue(retryMessage.isRetryAvailable()); + + retryMessage.fail(); + Assert.assertFalse(retryMessage.isRetryAvailable()); + } + + @Test + public void availableTest2() throws Exception { + RetryMessage retryMessage = new RetryMessage(1, 2, new byte[0]); + Assert.assertTrue(retryMessage.isRetryAvailable()); + + retryMessage.fail(); + Assert.assertFalse(retryMessage.isRetryAvailable()); + } + + @Test + public void availableTest3() throws Exception { + RetryMessage retryMessage = new RetryMessage(2, 2, new byte[0]); + Assert.assertFalse(retryMessage.isRetryAvailable()); + } + + @Test(expected = IllegalArgumentException.class) + public void illegalArgumentTest1() { + RetryMessage retryMessage = new RetryMessage(-1, new byte[0]); + } + + @Test(expected = IllegalArgumentException.class) + public void illegalArgumentTest2() { + RetryMessage retryMessage = new RetryMessage(-1, 5, new byte[0]); + } + + @Test(expected = IllegalArgumentException.class) + public void illegalArgumentTest3() { + RetryMessage retryMessage = new RetryMessage(10, 9, new byte[0]); + } + + +} diff --git a/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/RetryQueueTest.java b/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/RetryQueueTest.java index 0a6ac48e7090..77b97cb6c503 100644 --- a/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/RetryQueueTest.java +++ b/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/RetryQueueTest.java @@ -17,25 +17,19 @@ package com.navercorp.pinpoint.profiler.sender; import org.junit.Assert; - import org.junit.Test; -import com.navercorp.pinpoint.profiler.sender.RetryMessage; -import com.navercorp.pinpoint.profiler.sender.RetryQueue; - /** * @author emeroad */ public class RetryQueueTest { @Test public void size() { - RetryQueue retryQueue = new RetryQueue(1, 1); - retryQueue.add(new RetryMessage(0, new byte[0])); - retryQueue.add(new RetryMessage(0, new byte[0])); + retryQueue.add(new RetryMessage(1, new byte[0])); + retryQueue.add(new RetryMessage(1, new byte[0])); Assert.assertEquals(1, retryQueue.size()); - } @Test @@ -48,7 +42,6 @@ public void size2() { @Test public void maxRetryTest() { - RetryQueue retryQueue = new RetryQueue(3, 2); RetryMessage retryMessage = new RetryMessage(0, new byte[0]); retryMessage.fail(); @@ -62,12 +55,25 @@ public void maxRetryTest() { } @Test - public void add() { + public void maxRetryTest2() { + RetryQueue retryQueue = new RetryQueue(3, 1); + RetryMessage retryMessage = new RetryMessage(5, new byte[0]); + retryMessage.fail(); + retryMessage.fail(); + + retryQueue.add(retryMessage); + retryQueue.add(retryMessage); + + Assert.assertEquals(retryQueue.size(), 0); + } + + @Test + public void add() { RetryQueue retryQueue = new RetryQueue(3, 2); - retryQueue.add(new RetryMessage(0, new byte[0])); + retryQueue.add(new RetryMessage(1, new byte[0])); // If we add a failed message and it makes the queue filled more than half, the queue must discard it. - RetryMessage retryMessage = new RetryMessage(0, new byte[0]); + RetryMessage retryMessage = new RetryMessage(1, new byte[0]); retryMessage.fail(); retryQueue.add(retryMessage); diff --git a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientFactory.java b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientFactory.java index eec6ac3e743c..683c4d8dbe63 100644 --- a/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientFactory.java +++ b/rpc/src/main/java/com/navercorp/pinpoint/rpc/client/PinpointClientFactory.java @@ -17,6 +17,35 @@ package com.navercorp.pinpoint.rpc.client; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineException; +import org.jboss.netty.channel.socket.nio.NioClientBossPool; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; +import org.jboss.netty.channel.socket.nio.NioWorkerPool; +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.ThreadNameDeterminer; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.Timer; +import org.jboss.netty.util.TimerTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.navercorp.pinpoint.common.util.PinpointThreadFactory; import com.navercorp.pinpoint.rpc.MessageListener; import com.navercorp.pinpoint.rpc.PinpointSocketException; @@ -28,24 +57,6 @@ import com.navercorp.pinpoint.rpc.util.AssertUtils; import com.navercorp.pinpoint.rpc.util.LoggerFactorySetup; import com.navercorp.pinpoint.rpc.util.TimerFactory; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.*; -import org.jboss.netty.channel.socket.nio.NioClientBossPool; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioWorkerPool; -import org.jboss.netty.util.*; -import org.jboss.netty.util.Timer; -import org.jboss.netty.util.TimerTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /** * @author emeroad @@ -352,7 +363,7 @@ public void run(Timeout timeout) { public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { Channel channel = future.getChannel(); - logger.warn("reconnect success {}, {}", socketAddress, channel); + logger.info("reconnect success {}, {}", socketAddress, channel); pinpointClient.reconnectSocketHandler(pinpointClientHandler); } else { if (!pinpointClient.isClosed()) {