diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java index dba04b593f2..e8584504178 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java @@ -17,6 +17,9 @@ package org.apache.rocketmq.client.trace.hook; import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.hook.SendMessageHook; import org.apache.rocketmq.client.producer.SendStatus; @@ -26,6 +29,9 @@ import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.client.trace.TraceType; import org.apache.rocketmq.remoting.protocol.NamespaceUtil; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageBatch; +import org.apache.rocketmq.common.message.MessageType; public class SendMessageTraceHookImpl implements SendMessageHook { @@ -48,19 +54,35 @@ public void sendMessageBefore(SendMessageContext context) { } //build the context content of TraceContext TraceContext traceContext = new TraceContext(); - traceContext.setTraceBeans(new ArrayList<>(1)); + Message message = context.getMessage(); + List traceBeans; + if (message instanceof MessageBatch) { + MessageBatch messageBatch = (MessageBatch) message; + traceBeans = new ArrayList<>(messageBatch.getBatchCount()); + for (Message batch : messageBatch) { + traceBeans.add(buildBeforeTraceBean(batch, context.getMsgType(), context.getBrokerAddr())); + } + } else { + traceBeans = new ArrayList<>(1); + traceBeans.add(buildBeforeTraceBean(message, context.getMsgType(), context.getBrokerAddr())); + } + + traceContext.setTraceBeans(traceBeans); context.setMqTraceContext(traceContext); traceContext.setTraceType(TraceType.Pub); traceContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup())); + } + + public TraceBean buildBeforeTraceBean(Message message, MessageType msgType, String brokerAddr) { //build the data bean object of message trace TraceBean traceBean = new TraceBean(); - traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic())); - traceBean.setTags(context.getMessage().getTags()); - traceBean.setKeys(context.getMessage().getKeys()); - traceBean.setStoreHost(context.getBrokerAddr()); - traceBean.setBodyLength(context.getMessage().getBody().length); - traceBean.setMsgType(context.getMsgType()); - traceContext.getTraceBeans().add(traceBean); + traceBean.setTopic(NamespaceUtil.withoutNamespace(message.getTopic())); + traceBean.setTags(message.getTags()); + traceBean.setKeys(message.getKeys()); + traceBean.setStoreHost(brokerAddr); + traceBean.setBodyLength(message.getBody().length); + traceBean.setMsgType(msgType); + return traceBean; } @Override @@ -75,24 +97,38 @@ public void sendMessageAfter(SendMessageContext context) { } if (context.getSendResult().getRegionId() == null - || !context.getSendResult().isTraceOn()) { + || !context.getSendResult().isTraceOn() + || StringUtils.isEmpty(context.getSendResult().getMsgId()) + || StringUtils.isEmpty(context.getSendResult().getOffsetMsgId())) { // if switch is false,skip it return; } TraceContext traceContext = (TraceContext) context.getMqTraceContext(); - TraceBean traceBean = traceContext.getTraceBeans().get(0); - int costTime = (int) ((System.currentTimeMillis() - traceContext.getTimeStamp()) / traceContext.getTraceBeans().size()); - traceContext.setCostTime(costTime); - if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) { - traceContext.setSuccess(true); - } else { - traceContext.setSuccess(false); + String[] uniqMsgIds = context.getSendResult().getMsgId().split(","); + String[] offsetMsgIds = context.getSendResult().getOffsetMsgId().split(","); + if (uniqMsgIds.length != traceContext.getTraceBeans().size() || offsetMsgIds.length != traceContext.getTraceBeans().size()) { + return; + } + int costTime = (int) (System.currentTimeMillis() - traceContext.getTimeStamp()); + for (int i = 0; i < traceContext.getTraceBeans().size(); i++) { + // build traceBean + TraceBean traceBean = traceContext.getTraceBeans().get(i); + traceBean.setMsgId(uniqMsgIds[i]); + traceBean.setOffsetMsgId(offsetMsgIds[i]); + traceBean.setStoreTime(traceContext.getTimeStamp() + costTime / 2); + + // build traceContext + TraceContext tmpContext = new TraceContext(); + tmpContext.setTraceType(traceContext.getTraceType()); + tmpContext.setRegionId(context.getSendResult().getRegionId()); + tmpContext.setGroupName(traceContext.getGroupName()); + tmpContext.setCostTime(costTime); + tmpContext.setSuccess(context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)); + tmpContext.setContextCode(traceContext.getContextCode()); + tmpContext.setTraceBeans(new ArrayList<>(1)); + tmpContext.getTraceBeans().add(traceBean); + localDispatcher.append(tmpContext); } - traceContext.setRegionId(context.getSendResult().getRegionId()); - traceBean.setMsgId(context.getSendResult().getMsgId()); - traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId()); - traceBean.setStoreTime(traceContext.getTimeStamp() + costTime / 2); - localDispatcher.append(traceContext); } } diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImplTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImplTest.java new file mode 100644 index 00000000000..23b7a317fa8 --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImplTest.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.trace.hook; + +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; +import org.apache.rocketmq.client.trace.TraceBean; +import org.apache.rocketmq.client.trace.TraceContext; +import org.apache.rocketmq.client.trace.TraceDispatcher; +import org.apache.rocketmq.client.trace.TraceType; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageBatch; +import org.apache.rocketmq.common.message.MessageType; +import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.remoting.RPCHook; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; + +public class SendMessageTraceHookImplTest { + + private static final String TOPIC = "TopicTest"; + private static final String TAGS = "tags"; + private static final String KEYS = "keys"; + private static final String BODY = "bodyLength"; + private static final String BROKER = "127.0.0.1:10911"; + private static final String GROUP = "producer"; + + private static final String MSG_ID = "msgId"; + private static final String OFFSET_MSG_ID = "offsetMsgId"; + private static final String REGION_ID = "testRegion"; + + private SendMessageTraceHookImpl sendMessageTraceHook; + private TestAsyncTraceDispatcher traceDispatcher; + + private static class TestAsyncTraceDispatcher extends AsyncTraceDispatcher { + + private BlockingDeque stashQueue = new LinkedBlockingDeque<>(1024); + + public TestAsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) { + super(group, type, traceTopicName, rpcHook); + } + + @Override + public boolean append(Object ctx) { + return stashQueue.offer((TraceContext) ctx); + } + + public BlockingDeque getStashQueue() { + return stashQueue; + } + } + + @Before + public void init() { + traceDispatcher = new TestAsyncTraceDispatcher(null, TraceDispatcher.Type.PRODUCE, TopicValidator.RMQ_SYS_TRACE_TOPIC, null); + sendMessageTraceHook = new SendMessageTraceHookImpl(traceDispatcher); + } + + @Test + public void testBuildBeforeTraceBean() { + TraceBean traceBean = sendMessageTraceHook.buildBeforeTraceBean(buildTestMessage(), MessageType.Normal_Msg, BROKER); + Assert.assertEquals(traceBean.getTopic(), TOPIC); + Assert.assertEquals(traceBean.getTags(), TAGS); + Assert.assertEquals(traceBean.getKeys(), KEYS); + Assert.assertEquals(traceBean.getBodyLength(), BODY.length()); + Assert.assertEquals(traceBean.getMsgType(), MessageType.Normal_Msg); + Assert.assertEquals(traceBean.getStoreHost(), BROKER); + } + + @Test + public void testSendSingleMessageTrace() throws InterruptedException { + SendMessageContext context = new SendMessageContext(); + context.setMessage(buildTestMessage()); + context.setMsgType(MessageType.Normal_Msg); + context.setBrokerAddr(BROKER); + context.setProducerGroup(GROUP); + + sendMessageTraceHook.sendMessageBefore(context); + Thread.sleep(100); + context.setSendResult(buildTestResult()); + sendMessageTraceHook.sendMessageAfter(context); + + Assert.assertEquals(traceDispatcher.getStashQueue().size(), 1); + TraceContext traceContext = traceDispatcher.getStashQueue().poll(); + Assert.assertNotNull(traceContext); + Assert.assertNotNull(traceContext.getTraceBeans()); + Assert.assertEquals(traceContext.getTraceBeans().size(), 1); + Assert.assertEquals(traceContext.getTraceType(), TraceType.Pub); + Assert.assertEquals(traceContext.getRegionId(), REGION_ID); + Assert.assertEquals(traceContext.getGroupName(), GROUP); + Assert.assertTrue(traceContext.getCostTime() >= 100 && traceContext.getCostTime() < 200); + Assert.assertTrue(traceContext.isSuccess()); + Assert.assertEquals(traceContext.getTraceBeans().size(), 1); + + TraceBean traceBean = traceContext.getTraceBeans().get(0); + Assert.assertEquals(traceBean.getTopic(), TOPIC); + Assert.assertEquals(traceBean.getTags(), TAGS); + Assert.assertEquals(traceBean.getKeys(), KEYS); + Assert.assertEquals(traceBean.getBodyLength(), BODY.length()); + Assert.assertEquals(traceBean.getMsgType(), MessageType.Normal_Msg); + Assert.assertEquals(traceBean.getStoreHost(), BROKER); + } + + @Test + public void testSendBatchMessageTrace() throws InterruptedException { + int batchSize = 10; + SendMessageContext context = new SendMessageContext(); + context.setMessage(buildTestBatchMessage(batchSize)); + context.setMsgType(MessageType.Normal_Msg); + context.setBrokerAddr(BROKER); + context.setProducerGroup(GROUP); + + sendMessageTraceHook.sendMessageBefore(context); + Thread.sleep(100); + context.setSendResult(buildTestBatchResult(batchSize)); + sendMessageTraceHook.sendMessageAfter(context); + + Assert.assertEquals(traceDispatcher.getStashQueue().size(), batchSize); + for (int i = 0; i < batchSize; i++) { + TraceContext traceContext = traceDispatcher.getStashQueue().poll(); + Assert.assertNotNull(traceContext); + Assert.assertNotNull(traceContext.getTraceBeans()); + Assert.assertEquals(traceContext.getTraceBeans().size(), 1); + Assert.assertEquals(traceContext.getTraceType(), TraceType.Pub); + Assert.assertEquals(traceContext.getRegionId(), REGION_ID); + Assert.assertEquals(traceContext.getGroupName(), GROUP); + Assert.assertTrue(traceContext.getCostTime() >= 100 && traceContext.getCostTime() < 200); + Assert.assertTrue(traceContext.isSuccess()); + Assert.assertEquals(traceContext.getTraceBeans().size(), 1); + + TraceBean traceBean = traceContext.getTraceBeans().get(0); + Assert.assertEquals(traceBean.getTopic(), TOPIC); + Assert.assertEquals(traceBean.getTags(), TAGS + i); + Assert.assertEquals(traceBean.getKeys(), KEYS + i); + Assert.assertEquals(traceBean.getBodyLength(), (BODY + i).length()); + Assert.assertEquals(traceBean.getMsgType(), MessageType.Normal_Msg); + Assert.assertEquals(traceBean.getStoreHost(), BROKER); + } + } + + private Message buildTestMessage() { + Message message = new Message(); + message.setTopic(TOPIC); + message.setTags(TAGS); + message.setKeys(KEYS); + message.setBody(BODY.getBytes(StandardCharsets.UTF_8)); + return message; + } + + private SendResult buildTestResult() { + SendResult sendResult = new SendResult(); + sendResult.setMsgId(MSG_ID); + sendResult.setOffsetMsgId(OFFSET_MSG_ID); + sendResult.setSendStatus(SendStatus.SEND_OK); + sendResult.setRegionId(REGION_ID); + return sendResult; + } + + private Message buildTestBatchMessage(int batchSize) { + List messages = new ArrayList<>(batchSize); + for (int i = 0; i < batchSize; i++) { + Message message = new Message(); + message.setTopic(TOPIC); + message.setTags(TAGS + i); + message.setKeys(KEYS + i); + message.setBody((BODY + i).getBytes(StandardCharsets.UTF_8)); + messages.add(message); + } + return MessageBatch.generateFromList(messages); + } + + private SendResult buildTestBatchResult(int batchSize) { + StringBuilder batchMsgId = new StringBuilder(); + StringBuilder batchOffsetMsgId = new StringBuilder(); + for (int i = 0; i < batchSize; i++) { + batchMsgId.append(MSG_ID); + batchOffsetMsgId.append(OFFSET_MSG_ID); + if (i < batchSize - 1) { + batchMsgId.append(","); + batchOffsetMsgId.append(","); + } + } + + SendResult sendResult = new SendResult(); + sendResult.setMsgId(batchMsgId.toString()); + sendResult.setOffsetMsgId(batchOffsetMsgId.toString()); + sendResult.setSendStatus(SendStatus.SEND_OK); + sendResult.setRegionId(REGION_ID); + return sendResult; + } + +} diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java index 30369b8f372..374e5914bce 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java @@ -70,4 +70,8 @@ public static MessageBatch generateFromList(Collection messag return messageBatch; } + public int getBatchCount() { + return this.messages.size(); + } + }