Skip to content

Commit

Permalink
Merge pull request #1037 from jaehong-kim/1.1.2
Browse files Browse the repository at this point in the history
add ver2 table & apply distributed key for statistics self table.
  • Loading branch information
jaehong-kim committed Oct 6, 2015
2 parents 2a7449b + 9968709 commit a8ab6b9
Show file tree
Hide file tree
Showing 15 changed files with 299 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.navercorp.pinpoint.common.util.ApplicationMapStatisticsUtils;
import com.navercorp.pinpoint.common.util.TimeSlot;

import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix;
import org.apache.hadoop.hbase.client.Increment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -61,6 +62,10 @@ public class HbaseMapResponseTimeDao implements MapResponseTimeDao {
@Qualifier("selfMerge")
private RowKeyMerge rowKeyMerge;

@Autowired
@Qualifier("statisticsSelfRowKeyDistributor")
private RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix;

private final boolean useBulk;

private final ConcurrentCounterMap<RowInfo> counter = new ConcurrentCounterMap<RowInfo>();
Expand Down Expand Up @@ -99,7 +104,7 @@ public void received(String applicationName, short applicationServiceType, Strin
RowInfo rowInfo = new DefaultRowInfo(selfRowKey, selfColumnName);
this.counter.increment(rowInfo, 1L);
} else {
final byte[] rowKey = selfRowKey.getRowKey();
final byte[] rowKey = getDistributedKey(selfRowKey.getRowKey());
// column name is the name of caller app.
byte[] columnName = selfColumnName.getColumnName();
increment(rowKey, columnName, 1L);
Expand All @@ -113,7 +118,7 @@ private void increment(byte[] rowKey, byte[] columnName, long increment) {
if (columnName == null) {
throw new NullPointerException("columnName must not be null");
}
hbaseTemplate.incrementColumnValue(MAP_STATISTICS_SELF, rowKey, MAP_STATISTICS_SELF_CF_COUNTER, columnName, increment);
hbaseTemplate.incrementColumnValue(MAP_STATISTICS_SELF_VER2, rowKey, MAP_STATISTICS_SELF_VER2_CF_COUNTER, columnName, increment);
}


Expand All @@ -125,13 +130,17 @@ public void flushAll() {

// update statistics by rowkey and column for now. need to update it by rowkey later.
Map<RowInfo,ConcurrentCounterMap.LongAdder> remove = this.counter.remove();
List<Increment> merge = rowKeyMerge.createBulkIncrement(remove, null);
List<Increment> merge = rowKeyMerge.createBulkIncrement(remove, rowKeyDistributorByHashPrefix);
if (!merge.isEmpty()) {
if (logger.isDebugEnabled()) {
logger.debug("flush {} Increment:{}", this.getClass().getSimpleName(), merge.size());
}
hbaseTemplate.increment(MAP_STATISTICS_SELF, merge);
hbaseTemplate.increment(MAP_STATISTICS_SELF_VER2, merge);
}

}

private byte[] getDistributedKey(byte[] rowKey) {
return rowKeyDistributorByHashPrefix.getDistributedKey(rowKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private void increment(byte[] rowKey, byte[] columnName, long increment) {
if (columnName == null) {
throw new NullPointerException("columnName must not be null");
}
hbaseTemplate.incrementColumnValue(MAP_STATISTICS_CALLER, rowKey, MAP_STATISTICS_CALLER_CF_COUNTER, columnName, increment);
hbaseTemplate.incrementColumnValue(MAP_STATISTICS_CALLER_VER2, rowKey, MAP_STATISTICS_CALLER_VER2_CF_COUNTER, columnName, increment);
}

@Override
Expand All @@ -143,7 +143,7 @@ public void flushAll() {
if (logger.isDebugEnabled()) {
logger.debug("flush {} Increment:{}", this.getClass().getSimpleName(), merge.size());
}
hbaseTemplate.increment(MAP_STATISTICS_CALLER, merge);
hbaseTemplate.increment(MAP_STATISTICS_CALLER_VER2, merge);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private void increment(byte[] rowKey, byte[] columnName, long increment) {
if (columnName == null) {
throw new NullPointerException("columnName must not be null");
}
hbaseTemplate.incrementColumnValue(MAP_STATISTICS_CALLEE, rowKey, MAP_STATISTICS_CALLEE_CF_VER2_COUNTER, columnName, increment);
hbaseTemplate.incrementColumnValue(MAP_STATISTICS_CALLEE_VER2, rowKey, MAP_STATISTICS_CALLEE_VER2_CF_COUNTER, columnName, increment);
}


Expand All @@ -138,7 +138,7 @@ public void flushAll() {
if (logger.isDebugEnabled()) {
logger.debug("flush {} Increment:{}", this.getClass().getSimpleName(), merge.size());
}
hbaseTemplate.increment(MAP_STATISTICS_CALLEE, merge);
hbaseTemplate.increment(MAP_STATISTICS_CALLEE_VER2, merge);
}

}
Expand Down
6 changes: 3 additions & 3 deletions collector/src/main/resources/applicationContext-collector.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,15 @@
</bean>

<bean id="callerMerge" class="com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKeyMerge">
<constructor-arg value="#{hTable.MAP_STATISTICS_CALLEE_CF_VER2_COUNTER}"/>
<constructor-arg value="#{hTable.MAP_STATISTICS_CALLEE_VER2_CF_COUNTER}"/>
</bean>

<bean id="calleeMerge" class="com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKeyMerge">
<constructor-arg value="#{hTable.MAP_STATISTICS_CALLER_CF_COUNTER}"/>
<constructor-arg value="#{hTable.MAP_STATISTICS_CALLER_VER2_CF_COUNTER}"/>
</bean>

<bean id="selfMerge" class="com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKeyMerge">
<constructor-arg value="#{hTable.MAP_STATISTICS_SELF_CF_COUNTER}"/>
<constructor-arg value="#{hTable.MAP_STATISTICS_SELF_VER2_CF_COUNTER}"/>
</bean>

<bean id="timeSlot" class="com.navercorp.pinpoint.common.util.DefaultTimeSlot">
Expand Down
10 changes: 10 additions & 0 deletions collector/src/main/resources/applicationContext-hbase.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,14 @@
<constructor-arg type="int" value="36"/>
<constructor-arg type="int" value="32"/>
</bean>

<bean id="statisticsSelfRowKeyDistributor" class="com.sematext.hbase.wd.RowKeyDistributorByHashPrefix">
<constructor-arg ref="statisticsSelfHasher"/>
</bean>

<bean id="statisticsSelfHasher" class="com.navercorp.pinpoint.common.hbase.distributor.RangeOneByteSimpleHash">
<constructor-arg type="int" value="0"/>
<constructor-arg type="int" value="32"/>
<constructor-arg type="int" value="8"/>
</bean>
</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,23 @@ public final class HBaseTables {
public static final String MAP_STATISTICS_CALLER = "ApplicationMapStatisticsCaller";
public static final byte[] MAP_STATISTICS_CALLER_CF_COUNTER = Bytes.toBytes("C");

public static final String MAP_STATISTICS_CALLER_VER2 = "ApplicationMapStatisticsCaller_Ver2";
public static final byte[] MAP_STATISTICS_CALLER_VER2_CF_COUNTER = Bytes.toBytes("C");

public static final String MAP_STATISTICS_CALLEE = "ApplicationMapStatisticsCallee";
// to be removed - use ver2 instead. remove relevant code as well.
public static final byte[] MAP_STATISTICS_CALLEE_CF_COUNTER = Bytes.toBytes("C");
public static final byte[] MAP_STATISTICS_CALLEE_CF_VER2_COUNTER = Bytes.toBytes("D");

public static final String MAP_STATISTICS_CALLEE_VER2 = "ApplicationMapStatisticsCallee_Ver2";
public static final byte[] MAP_STATISTICS_CALLEE_VER2_CF_COUNTER = Bytes.toBytes("C");

public static final String MAP_STATISTICS_SELF = "ApplicationMapStatisticsSelf";
public static final byte[] MAP_STATISTICS_SELF_CF_COUNTER = Bytes.toBytes("C");

public static final String MAP_STATISTICS_SELF_VER2 = "ApplicationMapStatisticsSelf_Ver2";
public static final byte[] MAP_STATISTICS_SELF_VER2_CF_COUNTER = Bytes.toBytes("C");

public static final String HOST_APPLICATION_MAP = "HostApplicationMap";
public static final byte[] HOST_APPLICATION_MAP_CF_MAP = Bytes.toBytes("M");

Expand Down
3 changes: 3 additions & 0 deletions scripts/hbase-create.hbase
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ create 'Traces', { NAME => 'S', TTL => 5184000 }, { NAME => 'A', TTL => 5184000
create 'ApplicationTraceIndex', { NAME => 'I', TTL => 5184000 }, {SPLITS=>["\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x12\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x16\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]}

create 'ApplicationMapStatisticsCaller', { NAME => 'C', TTL => 5184000, VERSION => 1 }
create 'ApplicationMapStatisticsCaller_Ver2', { NAME => 'C', TTL => 5184000, VERSION => 1 }, {SPLITS=>["\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x12\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x16\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]}
create 'ApplicationMapStatisticsCallee', { NAME => 'C', TTL => 5184000, VERSION => 1 }, { NAME => 'D', TTL => 5184000, VERSION => 1 }
create 'ApplicationMapStatisticsCallee_Ver2', { NAME => 'C', TTL => 5184000, VERSION => 1 }, {SPLITS=>["\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x12\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x16\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]}
create 'ApplicationMapStatisticsSelf', { NAME => 'C', TTL => 5184000, VERSION => 1 }
create 'ApplicationMapStatisticsSelf_Ver2', { NAME => 'C', TTL => 5184000, VERSION => 1 }, {SPLITS=>["\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]}

create 'ApplicationStatistics', { NAME => 'C', TTL => 5184000, VERSION => 1 }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.navercorp.pinpoint.web.dao.hbase;

import com.navercorp.pinpoint.common.hbase.HBaseAdminTemplate;
import com.navercorp.pinpoint.common.hbase.HBaseTables;
import com.navercorp.pinpoint.common.hbase.HbaseOperations2;
import com.navercorp.pinpoint.common.util.ApplicationMapStatisticsUtils;
Expand All @@ -25,13 +26,16 @@
import com.navercorp.pinpoint.web.vo.RangeFactory;
import com.navercorp.pinpoint.web.vo.ResponseTime;

import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix;
import org.apache.hadoop.hbase.client.Scan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.hadoop.hbase.RowMapper;
import org.springframework.stereotype.Repository;

import javax.annotation.PostConstruct;
import java.util.*;

/**
Expand All @@ -46,16 +50,46 @@ public class HbaseMapResponseTimeDao implements MapResponseDao {
private final String tableName = HBaseTables.MAP_STATISTICS_SELF;

private int scanCacheSize = 40;
private boolean backwardCompatibility = false;
private boolean tableExists = false;

@Autowired
@Qualifier("responseTimeMapperBackwardCompatibility")
private RowMapper<ResponseTime> responseTimeMapperBackwardCompatibility;

@Autowired
@Qualifier("responseTimeMapper")
private RowMapper<ResponseTime> responseTimeMapper;

@Autowired
private HbaseOperations2 hbaseOperations2;

@Autowired
HBaseAdminTemplate hBaseAdminTemplate;

@Autowired
private RangeFactory rangeFactory;

@Autowired
@Qualifier("statisticsSelfRowKeyDistributor")
private RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix;

@PostConstruct
public void init() {
tableExists = hBaseAdminTemplate.tableExists(HBaseTables.MAP_STATISTICS_SELF_VER2);
if (!tableExists) {
logger.warn("Please create '{}' table.", HBaseTables.MAP_STATISTICS_SELF_VER2);
}

backwardCompatibility = hBaseAdminTemplate.tableExists(HBaseTables.MAP_STATISTICS_SELF);
if (backwardCompatibility) {
logger.warn("'{}' table exists. Recommend that only use '{}' table.", HBaseTables.MAP_STATISTICS_SELF, HBaseTables.MAP_STATISTICS_SELF_VER2);
}

if (!tableExists && !backwardCompatibility) {
throw new RuntimeException("Please check for '" + HBaseTables.MAP_STATISTICS_SELF_VER2 + "' table in HBase. Need to create '" + HBaseTables.MAP_STATISTICS_SELF_VER2 + "' table.");
}
}

@Override
public List<ResponseTime> selectResponseTime(Application application, Range range) {
Expand All @@ -65,22 +99,34 @@ public List<ResponseTime> selectResponseTime(Application application, Range rang
if (logger.isDebugEnabled()) {
logger.debug("selectResponseTime applicationName:{}, {}", application, range);
}
Scan scan = createScan(application, range);
List<ResponseTime> responseTimeList = hbaseOperations2.find(tableName, scan, responseTimeMapper);
if (logger.isDebugEnabled()) {
logger.debug("row:{}", responseTimeList.size());
for (ResponseTime responseTime : responseTimeList) {
logger.trace("responseTime:{}", responseTime);
if (tableExists) {
Scan scan = createScan(application, range, HBaseTables.MAP_STATISTICS_SELF_VER2_CF_COUNTER);

List<ResponseTime> responseTimeList = hbaseOperations2.find(HBaseTables.MAP_STATISTICS_SELF_VER2, scan, rowKeyDistributorByHashPrefix, responseTimeMapper);
if (logger.isDebugEnabled()) {
logger.debug("Self data {}", responseTimeList);
}

if (responseTimeList.size() > 0) {
return responseTimeList;
}
}

return responseTimeList;
if (backwardCompatibility) {
Scan scan = createScan(application, range, HBaseTables.MAP_STATISTICS_SELF_CF_COUNTER);
List<ResponseTime> responseTimeList = hbaseOperations2.find(HBaseTables.MAP_STATISTICS_SELF, scan, responseTimeMapperBackwardCompatibility);
if (logger.isDebugEnabled()) {
logger.debug("Self data {}", responseTimeList);
}
return responseTimeList;
} else {
return new ArrayList<>();
}
}

private Scan createScan(Application application, Range range) {
private Scan createScan(Application application, Range range, byte[] family) {
range = rangeFactory.createStatisticsRange(range);


if (logger.isDebugEnabled()) {
logger.debug("scan time:{} ", range.prettyToString());
}
Expand All @@ -93,7 +139,7 @@ private Scan createScan(Application application, Range range) {
scan.setCaching(this.scanCacheSize);
scan.setStartRow(startKey);
scan.setStopRow(endKey);
scan.addFamily(HBaseTables.MAP_STATISTICS_SELF_CF_COUNTER);
scan.addFamily(family);
scan.setId("ApplicationSelfScan");

return scan;
Expand Down
Loading

0 comments on commit a8ab6b9

Please # to comment.