Skip to content

Commit

Permalink
pinpoint-apm#1819 extract deserializer
Browse files Browse the repository at this point in the history
 - reduce array copy
  • Loading branch information
emeroad committed Jun 22, 2016
1 parent ef5c171 commit 00d208f
Show file tree
Hide file tree
Showing 15 changed files with 305 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,11 @@ public void insert(final TSpan span) {

final SpanBo spanBo = new SpanBo(span);

long acceptedTime = acceptedTimeService.getAcceptedTime();
spanBo.setCollectorAcceptTime(acceptedTime);

final byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(span));
final Put put = new Put(rowKey);
final Put put = new Put(rowKey, acceptedTime);

this.spanSerializer.serialize(spanBo, put, null);
this.annotationSerializer.serialize(spanBo, put, null);
Expand Down Expand Up @@ -114,7 +117,8 @@ private void addNestedSpanEvent(Put put, TSpan span) {
@Override
public void insertSpanChunk(TSpanChunk spanChunk) {
final byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(spanChunk));
final Put put = new Put(rowKey);
final long acceptedTime = acceptedTimeService.getAcceptedTime();
final Put put = new Put(rowKey, acceptedTime);

final List<TSpanEvent> spanEventBoList = spanChunk.getSpanEventList();
if (CollectionUtils.isEmpty(spanEventBoList)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.navercorp.pinpoint.common.server.bo;

import com.navercorp.pinpoint.common.buffer.Buffer;
import com.navercorp.pinpoint.common.util.AnnotationTranscoder;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* @author Woonduk Kang(emeroad)
*/
public class AnnotationBoDecoder {

private final AnnotationTranscoder transcoder = new AnnotationTranscoder();


public List<AnnotationBo> decode(Buffer buffer) {

final int size = buffer.readVInt();
if (size == 0) {
// don' fix return Collections.emptyList();
// exist outer add method
return new ArrayList<>();
}

List<AnnotationBo> annotationBoList = new ArrayList<>(size);
for (int i = 0; i < size; i++) {

AnnotationBo annotation = decodeAnnotation(buffer);
annotationBoList.add(annotation);

}

return annotationBoList;
}


public AnnotationBo decodeAnnotation(Buffer buffer) {

final AnnotationBo annotation = new AnnotationBo();

annotation.setVersion(buffer.readByte());
annotation.setKey(buffer.readSVInt());

byte valueType = buffer.readByte();
annotation.setValueType(valueType);

byte[] byteValue = buffer.readPrefixedBytes();
annotation.setByteValue(byteValue);

Object decodeObject = transcoder.decode(valueType, byteValue);
annotation.setValue(decodeObject);

return annotation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
/**
* @author emeroad
*/
@Deprecated
public class AnnotationBoList {

private List<AnnotationBo> annotationBoList;
Expand All @@ -40,7 +41,7 @@ public AnnotationBoList(int annotationBoListSize) {

public AnnotationBoList(List<AnnotationBo> annotationBoList) {
if (annotationBoList == null) {
this.annotationBoList = Collections.emptyList();
this.annotationBoList = new ArrayList<>();
return;
}
this.annotationBoList = annotationBoList;
Expand All @@ -64,6 +65,7 @@ public void writeValue(Buffer writer) {
}
}

@Deprecated
public void readValue(Buffer reader) {
int size = reader.readVInt();
if (size == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,11 @@ public short getApplicationServiceType() {
return this.serviceType;
}
}


/**
* @see com.navercorp.pinpoint.common.trace.LoggingInfo
* @return loggingInfo key
*/
public byte getLoggingTransactionInfo() {
return loggingTransactionInfo;
}
Expand Down Expand Up @@ -483,9 +487,6 @@ public int readValue(byte[] bytes, int offset, int length) {

this.version = buffer.readByte();

// this.mostTraceID = buffer.readLong();
// this.leastTraceID = buffer.readLong();

this.agentId = buffer.readPrefixedString();
this.agentStartTime = buffer.readVLong();

Expand Down Expand Up @@ -566,4 +567,4 @@ public String toString() {
sb.append('}');
return sb.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,13 @@ public String getExceptionClass() {
return exceptionClass;
}

public void setExceptionInfo(int exceptionId, String exceptionMessage) {
this.hasException = true;
this.exceptionId = exceptionId;
this.exceptionMessage = exceptionMessage;
}


public void setExceptionClass(String exceptionClass) {
this.exceptionClass = exceptionClass;
}
Expand Down Expand Up @@ -461,6 +468,7 @@ public byte[] writeValue() {
return buffer.getBuffer();
}

@Deprecated
private void writeAnnotation(Buffer buffer) {
AnnotationBoList annotationBo = new AnnotationBoList(this.annotationBoList);
annotationBo.writeValue(buffer);
Expand All @@ -472,9 +480,6 @@ public int readValue(byte[] bytes, int offset, int length) {

this.version = buffer.readByte();

// this.mostTraceID = buffer.readLong();
// this.leastTraceID = buffer.readLong();

this.agentId = buffer.readPrefixedString();
this.applicationId = buffer.readPrefixedString();
this.agentStartTime = buffer.readVLong();
Expand Down Expand Up @@ -509,6 +514,7 @@ public int readValue(byte[] bytes, int offset, int length) {
return buffer.getOffset();
}

@Deprecated
private List<AnnotationBo> readAnnotation(Buffer buffer) {
AnnotationBoList annotationBoList = new AnnotationBoList();
annotationBoList.readValue(buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.stereotype.Component;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;

Expand All @@ -26,23 +27,24 @@ public void serialize(SpanBo spanBo, Put put, SerializationContext context) {

// TODO if we can identify whether the columnName is duplicated or not,
// we can also know whether the span id is duplicated or not.
final byte[] spanId = Bytes.toBytes(spanBo.getSpanId());
final ByteBuffer spanId = ByteBuffer.wrap(Bytes.toBytes(spanBo.getSpanId()));

final List<AnnotationBo> annotations = spanBo.getAnnotationBoList();
if (CollectionUtils.isNotEmpty(annotations)) {
byte[] bytes = writeAnnotationList(annotations);
put.addColumn(TRACES_CF_ANNOTATION, spanId, bytes);
ByteBuffer bytes = writeAnnotationList(annotations);
final long acceptedTime = put.getTimeStamp();
put.addColumn(TRACES_CF_ANNOTATION, spanId, acceptedTime, bytes);
}

}

private byte[] writeAnnotationList(List<AnnotationBo> annotationList) {
private ByteBuffer writeAnnotationList(List<AnnotationBo> annotationList) {
final Buffer buffer = new AutomaticBuffer(64);
return writeAnnotationList(annotationList, buffer);
}

// for test
public byte[] writeAnnotationList(List<AnnotationBo> annotationList, Buffer buffer) {
public ByteBuffer writeAnnotationList(List<AnnotationBo> annotationList, Buffer buffer) {

if (annotationList == null) {
annotationList = Collections.emptyList();
Expand All @@ -54,7 +56,7 @@ public byte[] writeAnnotationList(List<AnnotationBo> annotationList, Buffer buff
writeAnnotation(annotationBo, buffer);
}

return buffer.getBuffer();
return buffer.wrapByteBuffer();
}

// for test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import com.navercorp.pinpoint.common.server.bo.AnnotationBo;
import com.navercorp.pinpoint.common.server.bo.SpanEventBo;
import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
import com.navercorp.pinpoint.common.util.BytesUtils;
import org.apache.hadoop.hbase.client.Put;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.ByteBuffer;
import java.util.List;

import static com.navercorp.pinpoint.common.hbase.HBaseTables.TRACES_CF_TERMINALSPAN;
Expand All @@ -22,31 +22,34 @@ public class SpanEventSerializer implements HbaseSerializer<SpanEventBo, Put> {

private AnnotationSerializer annotationSerializer;

private AcceptedTimeService acceptedTimeService;

@Autowired
public void setAnnotationSerializer(AnnotationSerializer annotationSerializer) {
this.annotationSerializer = annotationSerializer;
}

@Autowired
public void setAcceptedTimeService(AcceptedTimeService acceptedTimeService) {
this.acceptedTimeService = acceptedTimeService;
}

@Override
public void serialize(SpanEventBo spanEventBo, Put put, SerializationContext context) {

byte[] rowId = BytesUtils.add(spanEventBo.getSpanId(), spanEventBo.getSequence(), spanEventBo.getAsyncId(), spanEventBo.getAsyncSequence());
ByteBuffer rowId = writeQualifier(spanEventBo);

final byte[] value = writeValue(spanEventBo);
final long acceptedTime = acceptedTimeService.getAcceptedTime();
final ByteBuffer value = writeValue(spanEventBo);

final long acceptedTime = put.getTimeStamp();

put.addColumn(TRACES_CF_TERMINALSPAN, rowId, acceptedTime, value);

}

public byte[] writeValue(SpanEventBo spanEventBo) {
private ByteBuffer writeQualifier(SpanEventBo spanEventBo) {
final Buffer rowId = new AutomaticBuffer();
rowId.putLong(spanEventBo.getSpanId());
rowId.putShort(spanEventBo.getSequence());
rowId.putInt(spanEventBo.getAsyncId());
rowId.putShort(spanEventBo.getAsyncSequence());
return rowId.wrapByteBuffer();
}

public ByteBuffer writeValue(SpanEventBo spanEventBo) {
final Buffer buffer = new AutomaticBuffer(512);

buffer.putByte(spanEventBo.getVersion());
Expand Down Expand Up @@ -82,7 +85,7 @@ public byte[] writeValue(SpanEventBo spanEventBo) {

buffer.putSVInt(spanEventBo.getNextAsyncId());

return buffer.getBuffer();
return buffer.wrapByteBuffer();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,35 @@
import com.navercorp.pinpoint.common.server.bo.SpanBo;
import com.navercorp.pinpoint.common.buffer.AutomaticBuffer;
import com.navercorp.pinpoint.common.buffer.Buffer;
import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.ByteBuffer;

import static com.navercorp.pinpoint.common.hbase.HBaseTables.TRACES_CF_SPAN;
/**
* @author Woonduk Kang(emeroad)
*/
@Component
public class SpanSerializer implements HbaseSerializer<SpanBo, Put> {

private AcceptedTimeService acceptedTimeService;

@Autowired
public void setAcceptedTimeService(AcceptedTimeService acceptedTimeService) {
this.acceptedTimeService = acceptedTimeService;
}

@Override
public void serialize(SpanBo spanBo, Put put, SerializationContext context) {

byte[] columnValue = writeColumnValue(spanBo);
ByteBuffer columnValue = writeColumnValue(spanBo);

// TODO if we can identify whether the columnName is duplicated or not,
// we can also know whether the span id is duplicated or not.
byte[] spanId = Bytes.toBytes(spanBo.getSpanId());
ByteBuffer spanId = ByteBuffer.wrap(Bytes.toBytes(spanBo.getSpanId()));

long acceptedTime = acceptedTimeService.getAcceptedTime();
long acceptedTime = put.getTimeStamp();
put.addColumn(TRACES_CF_SPAN, spanId, acceptedTime, columnValue);

}

// Variable encoding has been added in case of write io operation. The data size can be reduced by about 10%.
public byte[] writeColumnValue(SpanBo span) {
public ByteBuffer writeColumnValue(SpanBo span) {
/*
It is difficult to calculate the size of buffer. It's not impossible.
However just use automatic incremental buffer for convenience's sake.
Expand Down Expand Up @@ -92,6 +85,6 @@ public byte[] writeColumnValue(SpanBo span) {
buffer.putByte(span.getLoggingTransactionInfo());
buffer.putPrefixedString(span.getAcceptorHost());

return buffer.getBuffer();
return buffer.wrapByteBuffer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,10 @@ public class AnnotationBoTest {

private AnnotationSerializer serializer = new AnnotationSerializer();

private final Logger logger = LoggerFactory.getLogger(this.getClass());

@Test
public void testGetVersion() throws Exception {

}
private AnnotationBoDecoder annotationBoDecoder = new AnnotationBoDecoder();

@Test
public void testSetVersion() throws Exception {
private final Logger logger = LoggerFactory.getLogger(this.getClass());

}

@Test
public void testWriteValue() throws Exception {
Expand All @@ -75,6 +68,19 @@ public void testWriteValue() throws Exception {
Assert.assertEquals(annotation.getKey(), bo2.getKey());
Assert.assertEquals(annotation.getValueType(), bo2.getValueType());
Assert.assertArrayEquals(annotation.getByteValue(), bo2.getByteValue());

buffer.setOffset(0);
AnnotationBo decodedAnnotation = annotationBoDecoder.decodeAnnotation(buffer);
Assert.assertEquals(annotation.getKey(), decodedAnnotation.getKey());
Assert.assertEquals(annotation.getValueType(), decodedAnnotation.getValueType());
Assert.assertArrayEquals(annotation.getByteValue(), decodedAnnotation.getByteValue());

int i = 256<<1;
System.out.println(i);
i = i<<2;
System.out.println(i);
i = i<<2;
System.out.println(i);
}


Expand Down
Loading

0 comments on commit 00d208f

Please # to comment.