Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

#1819 trace format v2 #1938

Merged
merged 1 commit into from
Jul 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.navercorp.pinpoint.collector.dao;

import com.navercorp.pinpoint.thrift.dto.TSpan;
import com.navercorp.pinpoint.thrift.dto.TSpanChunk;
import com.navercorp.pinpoint.common.server.bo.SpanBo;
import com.navercorp.pinpoint.common.server.bo.SpanChunkBo;

/**
* @author Woonduk Kang(emeroad)
*/
public interface TraceDao {
void insert(TSpan span);
void insert(SpanBo span);

void insertSpanChunk(TSpanChunk spanChunk);
void insertSpanChunk(SpanChunkBo spanChunk);
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.navercorp.pinpoint.collector.dao.hbase;

import com.navercorp.pinpoint.collector.dao.TraceDao;
import com.navercorp.pinpoint.thrift.dto.TSpan;
import com.navercorp.pinpoint.thrift.dto.TSpanChunk;
import com.navercorp.pinpoint.common.server.bo.SpanBo;
import com.navercorp.pinpoint.common.server.bo.SpanChunkBo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,7 +29,7 @@ public DualWriteHbaseTraceDao(TraceDao master, TraceDao slave) {
}

@Override
public void insert(TSpan span) {
public void insert(SpanBo span) {
Throwable masterException = null;
try {
master.insert(span);
Expand All @@ -45,15 +45,15 @@ public void insert(TSpan span) {
}

@Override
public void insertSpanChunk(TSpanChunk spanChunk) {
public void insertSpanChunk(SpanChunkBo spanChunkBo) {
Throwable masterException = null;
try {
master.insertSpanChunk(spanChunk);
master.insertSpanChunk(spanChunkBo);
} catch (Throwable e) {
masterException = e;
}
try {
slave.insertSpanChunk(spanChunk);
slave.insertSpanChunk(spanChunkBo);
} catch (Throwable e) {
logger.warn("slave insertSpanChunk(TSpanChunk) Error:{}", e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.navercorp.pinpoint.common.buffer.AutomaticBuffer;
import com.navercorp.pinpoint.common.buffer.Buffer;
import com.navercorp.pinpoint.common.hbase.HbaseOperations2;
import com.navercorp.pinpoint.common.util.SpanUtils;
import com.navercorp.pinpoint.common.server.util.SpanUtils;
import com.navercorp.pinpoint.thrift.dto.TSpan;
import com.sematext.hbase.wd.AbstractRowKeyDistributor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@
package com.navercorp.pinpoint.collector.dao.hbase;

import com.navercorp.pinpoint.collector.dao.TracesDao;
import com.navercorp.pinpoint.collector.dao.hbase.filter.SpanEventFilter;

import com.navercorp.pinpoint.common.server.bo.BasicSpan;
import com.navercorp.pinpoint.common.server.bo.SpanChunkBo;
import com.navercorp.pinpoint.common.server.bo.filter.SpanEventFilter;
import com.navercorp.pinpoint.common.server.bo.serializer.trace.v1.AnnotationSerializer;
import com.navercorp.pinpoint.common.server.bo.serializer.trace.v1.SpanEventEncodingContext;
import com.navercorp.pinpoint.common.server.bo.serializer.trace.v1.SpanEventSerializer;
import com.navercorp.pinpoint.common.server.bo.serializer.trace.v1.SpanSerializer;
import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
import com.navercorp.pinpoint.common.server.bo.SpanBo;
import com.navercorp.pinpoint.common.server.bo.SpanEventBo;
import static com.navercorp.pinpoint.common.hbase.HBaseTables.*;
import com.navercorp.pinpoint.common.hbase.HbaseOperations2;
import com.navercorp.pinpoint.common.util.SpanUtils;
import com.navercorp.pinpoint.thrift.dto.TSpan;
import com.navercorp.pinpoint.thrift.dto.TSpanChunk;
import com.navercorp.pinpoint.thrift.dto.TSpanEvent;
import com.navercorp.pinpoint.common.server.util.SpanUtils;
import com.sematext.hbase.wd.AbstractRowKeyDistributor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hbase.client.Put;
Expand Down Expand Up @@ -72,24 +73,22 @@ public class HbaseTraceDao implements TracesDao {
private AbstractRowKeyDistributor rowKeyDistributor;

@Override
public void insert(final TSpan span) {
if (span == null) {
public void insert(final SpanBo spanBo) {
if (spanBo == null) {
throw new NullPointerException("span must not be null");
}

final SpanBo spanBo = new SpanBo(span);

long acceptedTime = acceptedTimeService.getAcceptedTime();
spanBo.setCollectorAcceptTime(acceptedTime);
long acceptedTime = spanBo.getCollectorAcceptTime();

final byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(span));
final byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(spanBo));
final Put put = new Put(rowKey, acceptedTime);

this.spanSerializer.serialize(spanBo, put, null);
this.annotationSerializer.serialize(spanBo, put, null);


addNestedSpanEvent(put, span);
addNestedSpanEvent(put, spanBo);

boolean success = hbaseTemplate.asyncPut(TRACES, put);
if (!success) {
Expand All @@ -101,34 +100,32 @@ private byte[] getDistributeRowKey(byte[] transactionId) {
return rowKeyDistributor.getDistributedKey(transactionId);
}

private void addNestedSpanEvent(Put put, TSpan span) {
final List<TSpanEvent> spanEventBoList = span.getSpanEventList();
private void addNestedSpanEvent(Put put, SpanBo span) {
final List<SpanEventBo> spanEventBoList = span.getSpanEventBoList();
if (CollectionUtils.isEmpty(spanEventBoList)) {
return;
}


for (TSpanEvent spanEvent : spanEventBoList) {
final SpanEventBo spanEventBo = new SpanEventBo(span, spanEvent);
addColumn(put, spanEventBo);
for (SpanEventBo spanEvent : spanEventBoList) {
addColumn(put, span, spanEvent);
}
}

@Override
public void insertSpanChunk(TSpanChunk spanChunk) {
final byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(spanChunk));
public void insertSpanChunk(SpanChunkBo spanChunkBo) {
final byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(spanChunkBo));
final long acceptedTime = acceptedTimeService.getAcceptedTime();
final Put put = new Put(rowKey, acceptedTime);

final List<TSpanEvent> spanEventBoList = spanChunk.getSpanEventList();
final List<SpanEventBo> spanEventBoList = spanChunkBo.getSpanEventBoList();
if (CollectionUtils.isEmpty(spanEventBoList)) {
return;
}


for (TSpanEvent spanEvent : spanEventBoList) {
final SpanEventBo spanEventBo = new SpanEventBo(spanChunk, spanEvent);
addColumn(put, spanEventBo);
for (SpanEventBo spanEventBo : spanEventBoList) {
addColumn(put, spanChunkBo, spanEventBo);
}

if (!put.isEmpty()) {
Expand All @@ -139,11 +136,12 @@ public void insertSpanChunk(TSpanChunk spanChunk) {
}
}

private void addColumn(Put put, SpanEventBo spanEventBo) {
private void addColumn(Put put, BasicSpan basicSpan, SpanEventBo spanEventBo) {
if (!spanEventFilter.filter(spanEventBo)) {
return;
}
this.spanEventSerializer.serialize(spanEventBo, put, null);
SpanEventEncodingContext spanEventEncodingContext = new SpanEventEncodingContext(basicSpan.getSpanId(), spanEventBo);
this.spanEventSerializer.serialize(spanEventEncodingContext, put, null);
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
package com.navercorp.pinpoint.collector.dao.hbase;

import com.navercorp.pinpoint.collector.dao.TraceDao;
import com.navercorp.pinpoint.collector.dao.hbase.filter.SpanEventFilter;
import com.navercorp.pinpoint.common.hbase.HbaseOperations2;
import com.navercorp.pinpoint.common.server.bo.SpanBo;
import com.navercorp.pinpoint.common.server.bo.SpanChunkBo;
import com.navercorp.pinpoint.common.server.bo.SpanEventBo;
import com.navercorp.pinpoint.common.server.bo.serializer.trace.v2.SpanChunkSerializerV2;
import com.navercorp.pinpoint.common.server.bo.serializer.trace.v2.SpanSerializerV2;
import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
import com.navercorp.pinpoint.common.util.SpanUtils;
import com.navercorp.pinpoint.common.util.TransactionId;
import com.navercorp.pinpoint.common.util.TransactionIdUtils;
import com.navercorp.pinpoint.thrift.dto.TSpan;
import com.navercorp.pinpoint.thrift.dto.TSpanChunk;
import com.navercorp.pinpoint.thrift.dto.TSpanEvent;
import com.navercorp.pinpoint.common.server.util.SpanUtils;
import com.sematext.hbase.wd.AbstractRowKeyDistributor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hbase.client.Put;
Expand All @@ -24,8 +17,6 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static com.navercorp.pinpoint.common.hbase.HBaseTables.TRACE_V2;
Expand All @@ -41,11 +32,6 @@ public class HbaseTraceDaoV2 implements TraceDao {
@Autowired
private HbaseOperations2 hbaseTemplate;

@Autowired
private AcceptedTimeService acceptedTimeService;

@Autowired
private SpanEventFilter spanEventFilter;

@Autowired
private SpanSerializerV2 spanSerializer;
Expand All @@ -59,19 +45,15 @@ public class HbaseTraceDaoV2 implements TraceDao {


@Override
public void insert(final TSpan span) {
if (span == null) {
throw new NullPointerException("span must not be null");
public void insert(final SpanBo spanBo) {
if (spanBo == null) {
throw new NullPointerException("spanBo must not be null");
}

final SpanBo spanBo = new SpanBo(span);
List<SpanEventBo> spanEventBoList = buildSpanEventList(span);
spanBo.addSpanEventBoList(spanEventBoList);

long acceptedTime = acceptedTimeService.getAcceptedTime();
spanBo.setCollectorAcceptTime(acceptedTime);
long acceptedTime = spanBo.getCollectorAcceptTime();

final byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(span));
final byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(spanBo));
final Put put = new Put(rowKey, acceptedTime);

this.spanSerializer.serialize(spanBo, put, null);
Expand All @@ -84,41 +66,7 @@ public void insert(final TSpan span) {

}

private List<SpanEventBo> buildSpanEventList(TSpan span) {
final List<TSpanEvent> spanEventList = span.getSpanEventList();
if (CollectionUtils.isEmpty(spanEventList)) {
return Collections.emptyList();
}

List<SpanEventBo> spanEventBoList = new ArrayList<>(spanEventList.size());
for (TSpanEvent spanEvent : spanEventList) {
final SpanEventBo spanEventBo = new SpanEventBo(span, spanEvent);
if (!spanEventFilter.filter(spanEventBo)) {
continue;
}
spanEventBoList.add(spanEventBo);
}


return spanEventBoList;
}

private List<SpanEventBo> buildSpanEventBoList(TSpanChunk tSpanChunk) {
List<TSpanEvent> spanEventList = tSpanChunk.getSpanEventList();
if (CollectionUtils.isEmpty(spanEventList)) {
return new ArrayList<>();
}
List<SpanEventBo> spanEventBoList = new ArrayList<>(spanEventList.size());
for (TSpanEvent tSpanEvent : spanEventList) {
SpanEventBo spanEventBo = new SpanEventBo(tSpanChunk, tSpanEvent);
if (!spanEventFilter.filter(spanEventBo)) {
continue;
}
spanEventBoList.add(spanEventBo);
}

return spanEventBoList;
}

private byte[] getDistributeRowKey(byte[] transactionId) {
byte[] distributedKey = rowKeyDistributor.getDistributedKey(transactionId);
Expand All @@ -128,14 +76,14 @@ private byte[] getDistributeRowKey(byte[] transactionId) {


@Override
public void insertSpanChunk(TSpanChunk spanChunk) {
SpanChunkBo spanChunkBo = buildSpanChunkBo(spanChunk);
public void insertSpanChunk(SpanChunkBo spanChunkBo) {

final byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(spanChunk));
final long acceptedTime = acceptedTimeService.getAcceptedTime();
final byte[] rowKey = getDistributeRowKey(SpanUtils.getTransactionId(spanChunkBo));

final long acceptedTime = spanChunkBo.getCollectorAcceptTime();
final Put put = new Put(rowKey, acceptedTime);

final List<TSpanEvent> spanEventBoList = spanChunk.getSpanEventList();
final List<SpanEventBo> spanEventBoList = spanChunkBo.getSpanEventBoList();
if (CollectionUtils.isEmpty(spanEventBoList)) {
return;
}
Expand All @@ -150,28 +98,6 @@ public void insertSpanChunk(TSpanChunk spanChunk) {
}
}

public SpanChunkBo buildSpanChunkBo(TSpanChunk tSpanChunk) {
SpanChunkBo spanChunkBo = new SpanChunkBo();
spanChunkBo.setAgentId(tSpanChunk.getAgentId());
spanChunkBo.setApplicationId(tSpanChunk.getApplicationName());
spanChunkBo.setAgentStartTime(tSpanChunk.getAgentStartTime());

final TransactionId transactionId = TransactionIdUtils.parseTransactionId(tSpanChunk.getTransactionId());
final String traceAgentId = transactionId.getAgentId();
if (traceAgentId == null) {
spanChunkBo.setTraceAgentId(spanChunkBo.getAgentId());
} else {
spanChunkBo.setTraceAgentId(traceAgentId);
}
spanChunkBo.setTraceAgentStartTime(transactionId.getAgentStartTime());
spanChunkBo.setTraceTransactionSequence(transactionId.getTransactionSequence());

spanChunkBo.setSpanId(tSpanChunk.getSpanId());

List<SpanEventBo> spanEventBoList = buildSpanEventBoList(tSpanChunk);
spanChunkBo.addSpanEventBoList(spanEventBoList);
return spanChunkBo;
}



Expand Down
Loading