From 996870937a1c8a1876f42ccbc9c155c05042e174 Mon Sep 17 00:00:00 2001 From: Jaehong Kim Date: Tue, 6 Oct 2015 17:38:23 +0900 Subject: [PATCH] add ver2 table & apply distributed key for statistics self table. --- .../dao/hbase/HbaseMapResponseTimeDao.java | 17 +++- .../hbase/HbaseMapStatisticsCalleeDao.java | 4 +- .../hbase/HbaseMapStatisticsCallerDao.java | 4 +- .../applicationContext-collector.xml | 6 +- .../resources/applicationContext-hbase.xml | 10 +++ .../pinpoint/common/hbase/HBaseTables.java | 9 ++ scripts/hbase-create.hbase | 3 + .../dao/hbase/HbaseMapResponseTimeDao.java | 66 +++++++++++--- .../hbase/HbaseMapStatisticsCalleeDao.java | 65 ++++++++++---- .../hbase/HbaseMapStatisticsCallerDao.java | 66 ++++++++++---- .../web/mapper/MapStatisticsCalleeMapper.java | 1 - .../web/mapper/MapStatisticsCallerMapper.java | 31 ------- .../web/mapper/ResponseTimeMapper.java | 16 +++- ...sponseTimeMapperBackwardCompatibility.java | 86 +++++++++++++++++++ .../resources/applicationContext-hbase.xml | 10 +++ 15 files changed, 299 insertions(+), 95 deletions(-) create mode 100644 web/src/main/java/com/navercorp/pinpoint/web/mapper/ResponseTimeMapperBackwardCompatibility.java diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapResponseTimeDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapResponseTimeDao.java index 9dcd9667c349..d8888a4cb389 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapResponseTimeDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapResponseTimeDao.java @@ -25,6 +25,7 @@ import com.navercorp.pinpoint.common.util.ApplicationMapStatisticsUtils; import com.navercorp.pinpoint.common.util.TimeSlot; +import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix; import org.apache.hadoop.hbase.client.Increment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +62,10 @@ public class HbaseMapResponseTimeDao implements MapResponseTimeDao { @Qualifier("selfMerge") private RowKeyMerge rowKeyMerge; + @Autowired + @Qualifier("statisticsSelfRowKeyDistributor") + private RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix; + private final boolean useBulk; private final ConcurrentCounterMap counter = new ConcurrentCounterMap(); @@ -99,7 +104,7 @@ public void received(String applicationName, short applicationServiceType, Strin RowInfo rowInfo = new DefaultRowInfo(selfRowKey, selfColumnName); this.counter.increment(rowInfo, 1L); } else { - final byte[] rowKey = selfRowKey.getRowKey(); + final byte[] rowKey = getDistributedKey(selfRowKey.getRowKey()); // column name is the name of caller app. byte[] columnName = selfColumnName.getColumnName(); increment(rowKey, columnName, 1L); @@ -113,7 +118,7 @@ private void increment(byte[] rowKey, byte[] columnName, long increment) { if (columnName == null) { throw new NullPointerException("columnName must not be null"); } - hbaseTemplate.incrementColumnValue(MAP_STATISTICS_SELF, rowKey, MAP_STATISTICS_SELF_CF_COUNTER, columnName, increment); + hbaseTemplate.incrementColumnValue(MAP_STATISTICS_SELF_VER2, rowKey, MAP_STATISTICS_SELF_VER2_CF_COUNTER, columnName, increment); } @@ -125,13 +130,17 @@ public void flushAll() { // update statistics by rowkey and column for now. need to update it by rowkey later. Map remove = this.counter.remove(); - List merge = rowKeyMerge.createBulkIncrement(remove, null); + List merge = rowKeyMerge.createBulkIncrement(remove, rowKeyDistributorByHashPrefix); if (!merge.isEmpty()) { if (logger.isDebugEnabled()) { logger.debug("flush {} Increment:{}", this.getClass().getSimpleName(), merge.size()); } - hbaseTemplate.increment(MAP_STATISTICS_SELF, merge); + hbaseTemplate.increment(MAP_STATISTICS_SELF_VER2, merge); } } + + private byte[] getDistributedKey(byte[] rowKey) { + return rowKeyDistributorByHashPrefix.getDistributedKey(rowKey); + } } diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsCalleeDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsCalleeDao.java index 68369416db0c..9024e64906c6 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsCalleeDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsCalleeDao.java @@ -128,7 +128,7 @@ private void increment(byte[] rowKey, byte[] columnName, long increment) { if (columnName == null) { throw new NullPointerException("columnName must not be null"); } - hbaseTemplate.incrementColumnValue(MAP_STATISTICS_CALLER, rowKey, MAP_STATISTICS_CALLER_CF_COUNTER, columnName, increment); + hbaseTemplate.incrementColumnValue(MAP_STATISTICS_CALLER_VER2, rowKey, MAP_STATISTICS_CALLER_VER2_CF_COUNTER, columnName, increment); } @Override @@ -143,7 +143,7 @@ public void flushAll() { if (logger.isDebugEnabled()) { logger.debug("flush {} Increment:{}", this.getClass().getSimpleName(), merge.size()); } - hbaseTemplate.increment(MAP_STATISTICS_CALLER, merge); + hbaseTemplate.increment(MAP_STATISTICS_CALLER_VER2, merge); } } diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsCallerDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsCallerDao.java index a59459f4458d..7526805456da 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsCallerDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsCallerDao.java @@ -122,7 +122,7 @@ private void increment(byte[] rowKey, byte[] columnName, long increment) { if (columnName == null) { throw new NullPointerException("columnName must not be null"); } - hbaseTemplate.incrementColumnValue(MAP_STATISTICS_CALLEE, rowKey, MAP_STATISTICS_CALLEE_CF_VER2_COUNTER, columnName, increment); + hbaseTemplate.incrementColumnValue(MAP_STATISTICS_CALLEE_VER2, rowKey, MAP_STATISTICS_CALLEE_VER2_CF_COUNTER, columnName, increment); } @@ -138,7 +138,7 @@ public void flushAll() { if (logger.isDebugEnabled()) { logger.debug("flush {} Increment:{}", this.getClass().getSimpleName(), merge.size()); } - hbaseTemplate.increment(MAP_STATISTICS_CALLEE, merge); + hbaseTemplate.increment(MAP_STATISTICS_CALLEE_VER2, merge); } } diff --git a/collector/src/main/resources/applicationContext-collector.xml b/collector/src/main/resources/applicationContext-collector.xml index 11eae5960b81..1605e89a9bbf 100644 --- a/collector/src/main/resources/applicationContext-collector.xml +++ b/collector/src/main/resources/applicationContext-collector.xml @@ -114,15 +114,15 @@ - + - + - + diff --git a/collector/src/main/resources/applicationContext-hbase.xml b/collector/src/main/resources/applicationContext-hbase.xml index 720599560191..240c3ea27da3 100644 --- a/collector/src/main/resources/applicationContext-hbase.xml +++ b/collector/src/main/resources/applicationContext-hbase.xml @@ -132,4 +132,14 @@ + + + + + + + + + + \ No newline at end of file diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HBaseTables.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HBaseTables.java index 74776724b9a5..78696d81917e 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HBaseTables.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HBaseTables.java @@ -77,14 +77,23 @@ public final class HBaseTables { public static final String MAP_STATISTICS_CALLER = "ApplicationMapStatisticsCaller"; public static final byte[] MAP_STATISTICS_CALLER_CF_COUNTER = Bytes.toBytes("C"); + public static final String MAP_STATISTICS_CALLER_VER2 = "ApplicationMapStatisticsCaller_Ver2"; + public static final byte[] MAP_STATISTICS_CALLER_VER2_CF_COUNTER = Bytes.toBytes("C"); + public static final String MAP_STATISTICS_CALLEE = "ApplicationMapStatisticsCallee"; // to be removed - use ver2 instead. remove relevant code as well. public static final byte[] MAP_STATISTICS_CALLEE_CF_COUNTER = Bytes.toBytes("C"); public static final byte[] MAP_STATISTICS_CALLEE_CF_VER2_COUNTER = Bytes.toBytes("D"); + public static final String MAP_STATISTICS_CALLEE_VER2 = "ApplicationMapStatisticsCallee_Ver2"; + public static final byte[] MAP_STATISTICS_CALLEE_VER2_CF_COUNTER = Bytes.toBytes("C"); + public static final String MAP_STATISTICS_SELF = "ApplicationMapStatisticsSelf"; public static final byte[] MAP_STATISTICS_SELF_CF_COUNTER = Bytes.toBytes("C"); + public static final String MAP_STATISTICS_SELF_VER2 = "ApplicationMapStatisticsSelf_Ver2"; + public static final byte[] MAP_STATISTICS_SELF_VER2_CF_COUNTER = Bytes.toBytes("C"); + public static final String HOST_APPLICATION_MAP = "HostApplicationMap"; public static final byte[] HOST_APPLICATION_MAP_CF_MAP = Bytes.toBytes("M"); diff --git a/scripts/hbase-create.hbase b/scripts/hbase-create.hbase index 69c76d39fed4..cf1936dbfc80 100644 --- a/scripts/hbase-create.hbase +++ b/scripts/hbase-create.hbase @@ -12,8 +12,11 @@ create 'Traces', { NAME => 'S', TTL => 5184000 }, { NAME => 'A', TTL => 5184000 create 'ApplicationTraceIndex', { NAME => 'I', TTL => 5184000 }, {SPLITS=>["\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x12\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x16\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]} create 'ApplicationMapStatisticsCaller', { NAME => 'C', TTL => 5184000, VERSION => 1 } +create 'ApplicationMapStatisticsCaller_Ver2', { NAME => 'C', TTL => 5184000, VERSION => 1 }, {SPLITS=>["\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x12\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x16\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]} create 'ApplicationMapStatisticsCallee', { NAME => 'C', TTL => 5184000, VERSION => 1 }, { NAME => 'D', TTL => 5184000, VERSION => 1 } +create 'ApplicationMapStatisticsCallee_Ver2', { NAME => 'C', TTL => 5184000, VERSION => 1 }, {SPLITS=>["\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x12\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x16\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]} create 'ApplicationMapStatisticsSelf', { NAME => 'C', TTL => 5184000, VERSION => 1 } +create 'ApplicationMapStatisticsSelf_Ver2', { NAME => 'C', TTL => 5184000, VERSION => 1 }, {SPLITS=>["\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]} create 'ApplicationStatistics', { NAME => 'C', TTL => 5184000, VERSION => 1 } diff --git a/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapResponseTimeDao.java b/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapResponseTimeDao.java index bc95f8b3fffe..a0bd87b6cffa 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapResponseTimeDao.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapResponseTimeDao.java @@ -16,6 +16,7 @@ package com.navercorp.pinpoint.web.dao.hbase; +import com.navercorp.pinpoint.common.hbase.HBaseAdminTemplate; import com.navercorp.pinpoint.common.hbase.HBaseTables; import com.navercorp.pinpoint.common.hbase.HbaseOperations2; import com.navercorp.pinpoint.common.util.ApplicationMapStatisticsUtils; @@ -25,13 +26,16 @@ import com.navercorp.pinpoint.web.vo.RangeFactory; import com.navercorp.pinpoint.web.vo.ResponseTime; +import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix; import org.apache.hadoop.hbase.client.Scan; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.hadoop.hbase.RowMapper; import org.springframework.stereotype.Repository; +import javax.annotation.PostConstruct; import java.util.*; /** @@ -46,16 +50,46 @@ public class HbaseMapResponseTimeDao implements MapResponseDao { private final String tableName = HBaseTables.MAP_STATISTICS_SELF; private int scanCacheSize = 40; + private boolean backwardCompatibility = false; + private boolean tableExists = false; @Autowired + @Qualifier("responseTimeMapperBackwardCompatibility") + private RowMapper responseTimeMapperBackwardCompatibility; + + @Autowired + @Qualifier("responseTimeMapper") private RowMapper responseTimeMapper; @Autowired private HbaseOperations2 hbaseOperations2; + @Autowired + HBaseAdminTemplate hBaseAdminTemplate; + @Autowired private RangeFactory rangeFactory; + @Autowired + @Qualifier("statisticsSelfRowKeyDistributor") + private RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix; + + @PostConstruct + public void init() { + tableExists = hBaseAdminTemplate.tableExists(HBaseTables.MAP_STATISTICS_SELF_VER2); + if (!tableExists) { + logger.warn("Please create '{}' table.", HBaseTables.MAP_STATISTICS_SELF_VER2); + } + + backwardCompatibility = hBaseAdminTemplate.tableExists(HBaseTables.MAP_STATISTICS_SELF); + if (backwardCompatibility) { + logger.warn("'{}' table exists. Recommend that only use '{}' table.", HBaseTables.MAP_STATISTICS_SELF, HBaseTables.MAP_STATISTICS_SELF_VER2); + } + + if (!tableExists && !backwardCompatibility) { + throw new RuntimeException("Please check for '" + HBaseTables.MAP_STATISTICS_SELF_VER2 + "' table in HBase. Need to create '" + HBaseTables.MAP_STATISTICS_SELF_VER2 + "' table."); + } + } @Override public List selectResponseTime(Application application, Range range) { @@ -65,22 +99,34 @@ public List selectResponseTime(Application application, Range rang if (logger.isDebugEnabled()) { logger.debug("selectResponseTime applicationName:{}, {}", application, range); } - Scan scan = createScan(application, range); - List responseTimeList = hbaseOperations2.find(tableName, scan, responseTimeMapper); - if (logger.isDebugEnabled()) { - logger.debug("row:{}", responseTimeList.size()); - for (ResponseTime responseTime : responseTimeList) { - logger.trace("responseTime:{}", responseTime); + if (tableExists) { + Scan scan = createScan(application, range, HBaseTables.MAP_STATISTICS_SELF_VER2_CF_COUNTER); + + List responseTimeList = hbaseOperations2.find(HBaseTables.MAP_STATISTICS_SELF_VER2, scan, rowKeyDistributorByHashPrefix, responseTimeMapper); + if (logger.isDebugEnabled()) { + logger.debug("Self data {}", responseTimeList); + } + + if (responseTimeList.size() > 0) { + return responseTimeList; } } - return responseTimeList; + if (backwardCompatibility) { + Scan scan = createScan(application, range, HBaseTables.MAP_STATISTICS_SELF_CF_COUNTER); + List responseTimeList = hbaseOperations2.find(HBaseTables.MAP_STATISTICS_SELF, scan, responseTimeMapperBackwardCompatibility); + if (logger.isDebugEnabled()) { + logger.debug("Self data {}", responseTimeList); + } + return responseTimeList; + } else { + return new ArrayList<>(); + } } - private Scan createScan(Application application, Range range) { + private Scan createScan(Application application, Range range, byte[] family) { range = rangeFactory.createStatisticsRange(range); - if (logger.isDebugEnabled()) { logger.debug("scan time:{} ", range.prettyToString()); } @@ -93,7 +139,7 @@ private Scan createScan(Application application, Range range) { scan.setCaching(this.scanCacheSize); scan.setStartRow(startKey); scan.setStopRow(endKey); - scan.addFamily(HBaseTables.MAP_STATISTICS_SELF_CF_COUNTER); + scan.addFamily(family); scan.setId("ApplicationSelfScan"); return scan; diff --git a/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapStatisticsCalleeDao.java b/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapStatisticsCalleeDao.java index a2717c9a53eb..3e4eca9943a5 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapStatisticsCalleeDao.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapStatisticsCalleeDao.java @@ -18,6 +18,7 @@ import java.util.*; +import com.navercorp.pinpoint.common.hbase.HBaseAdminTemplate; import com.navercorp.pinpoint.common.hbase.HBaseTables; import com.navercorp.pinpoint.common.hbase.HbaseOperations2; import com.navercorp.pinpoint.common.util.ApplicationMapStatisticsUtils; @@ -40,6 +41,8 @@ import org.springframework.data.hadoop.hbase.RowMapper; import org.springframework.stereotype.Repository; +import javax.annotation.PostConstruct; + /** * * @author netspider @@ -51,10 +54,15 @@ public class HbaseMapStatisticsCalleeDao implements MapStatisticsCalleeDao { private Logger logger = LoggerFactory.getLogger(this.getClass()); private int scanCacheSize = 40; + private boolean backwardCompatibility = false; + private boolean tableExists = false; @Autowired private HbaseOperations2 hbaseOperations2; + @Autowired + HBaseAdminTemplate hBaseAdminTemplate; + @Autowired @Qualifier("mapStatisticsCalleeMapperBackwardCompatibility") private RowMapper mapStatisticsCalleeMapperBackwardCompatibility; @@ -70,6 +78,23 @@ public class HbaseMapStatisticsCalleeDao implements MapStatisticsCalleeDao { @Qualifier("statisticsCalleeRowKeyDistributor") private RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix; + @PostConstruct + public void init() { + tableExists = hBaseAdminTemplate.tableExists(HBaseTables.MAP_STATISTICS_CALLER_VER2); + if (!tableExists) { + logger.warn("Please create '{}' table.", HBaseTables.MAP_STATISTICS_CALLER_VER2); + } + + backwardCompatibility = hBaseAdminTemplate.tableExists(HBaseTables.MAP_STATISTICS_CALLER); + if (backwardCompatibility) { + logger.warn("'{}' table exists. Recommend that only use '{}' table.", HBaseTables.MAP_STATISTICS_CALLER, HBaseTables.MAP_STATISTICS_CALLER_VER2); + } + + if(!tableExists && !backwardCompatibility) { + throw new RuntimeException("Please check for '" + HBaseTables.MAP_STATISTICS_CALLER_VER2 + "' table in HBase. Need to create '" + HBaseTables.MAP_STATISTICS_CALLER_VER2 + "' table."); + } + } + @Override public LinkDataMap selectCallee(Application calleeApplication, Range range) { if (calleeApplication == null) { @@ -78,25 +103,29 @@ public LinkDataMap selectCallee(Application calleeApplication, Range range) { if (range == null) { throw new NullPointerException("range must not be null"); } - final Scan scan = createScan(calleeApplication, range); final TimeWindow timeWindow = new TimeWindow(range, TimeWindowDownSampler.SAMPLER); - // find distributed key. - ResultsExtractor resultExtractor = new RowMapReduceResultExtractor(mapStatisticsCalleeMapper, new MapStatisticsTimeWindowReducer(timeWindow)); - LinkDataMap linkDataMap = hbaseOperations2.find(HBaseTables.MAP_STATISTICS_CALLER, scan, rowKeyDistributorByHashPrefix, resultExtractor); - logger.debug("Callee data. {}, {}", linkDataMap, range); - - if (linkDataMap == null || linkDataMap.size() ==0) { - logger.debug("There's no callee data. {}, {}", calleeApplication, range); - // backward compatibility - non distributed. - resultExtractor = new RowMapReduceResultExtractor(mapStatisticsCalleeMapperBackwardCompatibility, new MapStatisticsTimeWindowReducer(timeWindow)); - linkDataMap = hbaseOperations2.find(HBaseTables.MAP_STATISTICS_CALLER, scan, resultExtractor); + + if(tableExists) { + // find distributed key - ver2. + final Scan scan = createScan(calleeApplication, range, HBaseTables.MAP_STATISTICS_CALLER_VER2_CF_COUNTER); + ResultsExtractor resultExtractor = new RowMapReduceResultExtractor(mapStatisticsCalleeMapper, new MapStatisticsTimeWindowReducer(timeWindow)); + LinkDataMap linkDataMap = hbaseOperations2.find(HBaseTables.MAP_STATISTICS_CALLER_VER2, scan, rowKeyDistributorByHashPrefix, resultExtractor); logger.debug("Callee data. {}, {}", linkDataMap, range); - if(linkDataMap == null) { - return new LinkDataMap(); + if (linkDataMap != null && linkDataMap.size() > 0) { + return linkDataMap; } } - return linkDataMap; + if(backwardCompatibility) { + // backward compatibility - non distributed - ver1. + final Scan scan = createScan(calleeApplication, range, HBaseTables.MAP_STATISTICS_CALLER_CF_COUNTER); + ResultsExtractor resultExtractor = new RowMapReduceResultExtractor(mapStatisticsCalleeMapperBackwardCompatibility, new MapStatisticsTimeWindowReducer(timeWindow)); + LinkDataMap linkDataMap = hbaseOperations2.find(HBaseTables.MAP_STATISTICS_CALLER, scan, resultExtractor); + logger.debug("Callee data. {}, {}", linkDataMap, range); + return linkDataMap != null ? linkDataMap : new LinkDataMap(); + } else { + return new LinkDataMap(); + } } /** @@ -119,15 +148,13 @@ public List selectCalleeStatistics(Application callerApplication, A if (logger.isDebugEnabled()) { logger.debug("selectCalleeStatistics. {}, {}, {}", callerApplication, calleeApplication, range); } - Scan scan = createScan(calleeApplication, range); - - + Scan scan = createScan(calleeApplication, range, HBaseTables.MAP_STATISTICS_CALLER_CF_COUNTER); final LinkFilter filter = new DefaultLinkFilter(callerApplication, calleeApplication); RowMapper mapper = new MapStatisticsCalleeMapperBackwardCompatibility(filter); return hbaseOperations2.find(HBaseTables.MAP_STATISTICS_CALLER, scan, mapper); } - private Scan createScan(Application application, Range range) { + private Scan createScan(Application application, Range range, byte[] family) { range = rangeFactory.createStatisticsRange(range); if (logger.isDebugEnabled()) { @@ -142,7 +169,7 @@ private Scan createScan(Application application, Range range) { scan.setCaching(this.scanCacheSize); scan.setStartRow(startKey); scan.setStopRow(endKey); - scan.addFamily(HBaseTables.MAP_STATISTICS_CALLEE_CF_COUNTER); + scan.addFamily(family); scan.setId("ApplicationStatisticsScan"); return scan; diff --git a/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapStatisticsCallerDao.java b/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapStatisticsCallerDao.java index 6a75ea539433..cc3dfbf2dbc4 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapStatisticsCallerDao.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapStatisticsCallerDao.java @@ -18,6 +18,7 @@ import java.util.*; +import com.navercorp.pinpoint.common.hbase.HBaseAdminTemplate; import com.navercorp.pinpoint.common.hbase.HBaseTables; import com.navercorp.pinpoint.common.hbase.HbaseOperations2; import com.navercorp.pinpoint.common.util.ApplicationMapStatisticsUtils; @@ -40,6 +41,8 @@ import org.springframework.data.hadoop.hbase.RowMapper; import org.springframework.stereotype.Repository; +import javax.annotation.PostConstruct; + /** * * @author netspider @@ -51,10 +54,15 @@ public class HbaseMapStatisticsCallerDao implements MapStatisticsCallerDao { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private int scanCacheSize = 40; + private boolean backwardCompatibility = false; + private boolean tableExists = false; @Autowired private HbaseOperations2 hbaseOperations2; + @Autowired + HBaseAdminTemplate hBaseAdminTemplate; + @Autowired @Qualifier("mapStatisticsCallerMapperBackwardCompatibility") private RowMapper mapStatisticsCallerMapperBackwardCompatibility; @@ -70,6 +78,23 @@ public class HbaseMapStatisticsCallerDao implements MapStatisticsCallerDao { @Qualifier("statisticsCallerRowKeyDistributor") private RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix; + @PostConstruct + public void init() { + tableExists = hBaseAdminTemplate.tableExists(HBaseTables.MAP_STATISTICS_CALLEE_VER2); + if (!tableExists) { + logger.warn("Please create '{}' table.", HBaseTables.MAP_STATISTICS_CALLEE_VER2); + } + + backwardCompatibility = hBaseAdminTemplate.tableExists(HBaseTables.MAP_STATISTICS_CALLEE); + if (backwardCompatibility) { + logger.warn("'{}' table exists. Recommend that only use '{}' table.", HBaseTables.MAP_STATISTICS_CALLEE, HBaseTables.MAP_STATISTICS_CALLEE_VER2); + } + + if (!tableExists && !backwardCompatibility) { + throw new RuntimeException("Please check for '" + HBaseTables.MAP_STATISTICS_CALLEE_VER2 + "' table in HBase. Need to create '" + HBaseTables.MAP_STATISTICS_CALLEE_VER2 + "' table."); + } + } + @Override public LinkDataMap selectCaller(Application callerApplication, Range range) { if (callerApplication == null) { @@ -79,26 +104,28 @@ public LinkDataMap selectCaller(Application callerApplication, Range range) { throw new NullPointerException("range must not be null"); } - final Scan scan = createScan(callerApplication, range); final TimeWindow timeWindow = new TimeWindow(range, TimeWindowDownSampler.SAMPLER); - // find distributed key. - ResultsExtractor resultExtractor = new RowMapReduceResultExtractor(mapStatisticsCallerMapper, new MapStatisticsTimeWindowReducer(timeWindow)); - LinkDataMap linkDataMap = hbaseOperations2.find(HBaseTables.MAP_STATISTICS_CALLEE, scan, rowKeyDistributorByHashPrefix, resultExtractor); - logger.debug("Caller data. {}, {}", linkDataMap, range); - - if (linkDataMap == null || linkDataMap.size() == 0) { - logger.debug("There's no caller data. {}, {}", callerApplication, range); - - // backward compatibility - non distributed. - resultExtractor = new RowMapReduceResultExtractor(mapStatisticsCallerMapperBackwardCompatibility, new MapStatisticsTimeWindowReducer(timeWindow)); - linkDataMap = hbaseOperations2.find(HBaseTables.MAP_STATISTICS_CALLEE, scan, resultExtractor); + if(tableExists) { + // find distributed key. + final Scan scan = createScan(callerApplication, range, HBaseTables.MAP_STATISTICS_CALLEE_VER2_CF_COUNTER); + ResultsExtractor resultExtractor = new RowMapReduceResultExtractor(mapStatisticsCallerMapper, new MapStatisticsTimeWindowReducer(timeWindow)); + LinkDataMap linkDataMap = hbaseOperations2.find(HBaseTables.MAP_STATISTICS_CALLEE_VER2, scan, rowKeyDistributorByHashPrefix, resultExtractor); logger.debug("Caller data. {}, {}", linkDataMap, range); - if(linkDataMap == null) { - return new LinkDataMap(); + if(linkDataMap != null && linkDataMap.size() > 0) { + return linkDataMap; } } - return linkDataMap; + if (backwardCompatibility) { + // backward compatibility - non distributed. + final Scan scan = createScan(callerApplication, range, HBaseTables.MAP_STATISTICS_CALLEE_CF_COUNTER, HBaseTables.MAP_STATISTICS_CALLEE_CF_VER2_COUNTER); + ResultsExtractor resultExtractor = new RowMapReduceResultExtractor(mapStatisticsCallerMapperBackwardCompatibility, new MapStatisticsTimeWindowReducer(timeWindow)); + LinkDataMap linkDataMap = hbaseOperations2.find(HBaseTables.MAP_STATISTICS_CALLEE, scan, resultExtractor); + logger.debug("Caller data. {}, {}", linkDataMap, range); + return linkDataMap != null ? linkDataMap : new LinkDataMap(); + } else { + return new LinkDataMap(); + } } /** @@ -122,14 +149,14 @@ public List selectCallerStatistics(Application callerApplication, A if (logger.isDebugEnabled()) { logger.debug("selectCallerStatistics. {}, {}, {}", callerApplication, calleeApplication, range); } - Scan scan = createScan(callerApplication, range); + Scan scan = createScan(callerApplication, range, HBaseTables.MAP_STATISTICS_CALLEE_CF_COUNTER, HBaseTables.MAP_STATISTICS_CALLEE_CF_VER2_COUNTER); final LinkFilter filter = new DefaultLinkFilter(callerApplication, calleeApplication); RowMapper mapper = new MapStatisticsCallerMapperBackwardCompatibility(filter); return hbaseOperations2.find(HBaseTables.MAP_STATISTICS_CALLEE, scan, mapper); } - private Scan createScan(Application application, Range range) { + private Scan createScan(Application application, Range range, byte[]... familyArgs) { range = rangeFactory.createStatisticsRange(range); if (logger.isDebugEnabled()) { @@ -144,8 +171,9 @@ private Scan createScan(Application application, Range range) { scan.setCaching(this.scanCacheSize); scan.setStartRow(startKey); scan.setStopRow(endKey); - scan.addFamily(HBaseTables.MAP_STATISTICS_CALLEE_CF_COUNTER); - scan.addFamily(HBaseTables.MAP_STATISTICS_CALLEE_CF_VER2_COUNTER); + for(byte[] family : familyArgs) { + scan.addFamily(family); + } scan.setId("ApplicationStatisticsScan"); return scan; diff --git a/web/src/main/java/com/navercorp/pinpoint/web/mapper/MapStatisticsCalleeMapper.java b/web/src/main/java/com/navercorp/pinpoint/web/mapper/MapStatisticsCalleeMapper.java index 662f77d3a2c0..0f610a55c272 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/mapper/MapStatisticsCalleeMapper.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/mapper/MapStatisticsCalleeMapper.java @@ -93,7 +93,6 @@ public LinkDataMap mapRow(Result result, int rowNum) throws Exception { logger.debug(" Fetched Callee. {} callerHost:{} -> {} (slot:{}/{}), ", callerApplication, callerHost, calleeApplication, histogramSlot, requestCount); } - final short slotTime = (isError) ? (short) -1 : histogramSlot; linkDataMap.addLinkData(callerApplication, callerApplication.getName(), calleeApplication, callerHost, timestamp, slotTime, requestCount); diff --git a/web/src/main/java/com/navercorp/pinpoint/web/mapper/MapStatisticsCallerMapper.java b/web/src/main/java/com/navercorp/pinpoint/web/mapper/MapStatisticsCallerMapper.java index 033018b88aec..1831bd3e49d9 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/mapper/MapStatisticsCallerMapper.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/mapper/MapStatisticsCallerMapper.java @@ -83,33 +83,6 @@ public LinkDataMap mapRow(Result result, int rowNum) throws Exception { // key is destApplicationName. final LinkDataMap linkDataMap = new LinkDataMap(); for (Cell cell : result.rawCells()) { - if (CellUtil.matchingFamily(cell, HBaseTables.MAP_STATISTICS_CALLEE_CF_COUNTER)) { - final byte[] qualifier = CellUtil.cloneQualifier(cell); - final Application callee = readCalleeApplication(qualifier); - if (filter.filter(callee)) { - continue; - } - - long requestCount = getValueToLong(cell); - - short histogramSlot = ApplicationMapStatisticsUtils.getHistogramSlotFromColumnName(qualifier); - boolean isError = histogramSlot == (short) -1; - - String calleeHost = ApplicationMapStatisticsUtils.getHost(qualifier); - - if (logger.isDebugEnabled()) { - logger.debug(" Fetched Caller. {} -> {} (slot:{}/{}) calleeHost:{}", caller, callee, histogramSlot, requestCount, calleeHost); - } - - final short slotTime = (isError) ? (short) -1 : histogramSlot; - if (StringUtils.isEmpty(calleeHost)) { - calleeHost = callee.getName(); - } - linkDataMap.addLinkData(caller, caller.getName(), callee, calleeHost, timestamp, slotTime, requestCount); - - - } else if (CellUtil.matchingFamily(cell, HBaseTables.MAP_STATISTICS_CALLEE_CF_VER2_COUNTER)) { - final Buffer buffer = new OffsetFixedBuffer(cell.getQualifierArray(), cell.getQualifierOffset()); final Application callee = readCalleeApplication(buffer); if (filter.filter(callee)) { @@ -133,10 +106,6 @@ public LinkDataMap mapRow(Result result, int rowNum) throws Exception { calleeHost = callee.getName(); } linkDataMap.addLinkData(caller, callerAgentId, callee, calleeHost, timestamp, slotTime, requestCount); - } else { - throw new IllegalArgumentException("unknown ColumnFamily :" + Arrays.toString(CellUtil.cloneFamily(cell))); - } - } return linkDataMap; diff --git a/web/src/main/java/com/navercorp/pinpoint/web/mapper/ResponseTimeMapper.java b/web/src/main/java/com/navercorp/pinpoint/web/mapper/ResponseTimeMapper.java index 0f806adbc929..267ed7d5c99a 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/mapper/ResponseTimeMapper.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/mapper/ResponseTimeMapper.java @@ -23,12 +23,15 @@ import com.navercorp.pinpoint.common.util.TimeUtils; import com.navercorp.pinpoint.web.vo.ResponseTime; +import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.hadoop.hbase.RowMapper; import org.springframework.stereotype.Component; @@ -42,17 +45,21 @@ public class ResponseTimeMapper implements RowMapper { private final Logger logger = LoggerFactory.getLogger(this.getClass()); + @Autowired + @Qualifier("statisticsSelfRowKeyDistributor") + private RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix; + @Override public ResponseTime mapRow(Result result, int rowNum) throws Exception { if (result.isEmpty()) { return null; } - final byte[] rowKey = result.getRow(); + final byte[] rowKey = getOriginalKey(result.getRow()); ResponseTime responseTime = createResponseTime(rowKey); for (Cell cell : result.rawCells()) { - if (CellUtil.matchingFamily(cell, HBaseTables.MAP_STATISTICS_SELF_CF_COUNTER)) { + if (CellUtil.matchingFamily(cell, HBaseTables.MAP_STATISTICS_SELF_VER2_CF_COUNTER)) { recordColumn(responseTime, cell); } @@ -63,8 +70,6 @@ public ResponseTime mapRow(Result result, int rowNum) throws Exception { return responseTime; } - - void recordColumn(ResponseTime responseTime, Cell cell) { final byte[] qArray = cell.getQualifierArray(); final int qOffset = cell.getQualifierOffset(); @@ -84,4 +89,7 @@ private ResponseTime createResponseTime(byte[] rowKey) { return new ResponseTime(applicationName, serviceType, timestamp); } + private byte[] getOriginalKey(byte[] rowKey) { + return rowKeyDistributorByHashPrefix.getOriginalKey(rowKey); + } } diff --git a/web/src/main/java/com/navercorp/pinpoint/web/mapper/ResponseTimeMapperBackwardCompatibility.java b/web/src/main/java/com/navercorp/pinpoint/web/mapper/ResponseTimeMapperBackwardCompatibility.java new file mode 100644 index 000000000000..be6b19941e3a --- /dev/null +++ b/web/src/main/java/com/navercorp/pinpoint/web/mapper/ResponseTimeMapperBackwardCompatibility.java @@ -0,0 +1,86 @@ +/* + * Copyright 2014 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.web.mapper; + +import com.navercorp.pinpoint.common.buffer.Buffer; +import com.navercorp.pinpoint.common.buffer.FixedBuffer; +import com.navercorp.pinpoint.common.hbase.HBaseTables; +import com.navercorp.pinpoint.common.util.BytesUtils; +import com.navercorp.pinpoint.common.util.TimeUtils; +import com.navercorp.pinpoint.web.vo.ResponseTime; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.hadoop.hbase.RowMapper; +import org.springframework.stereotype.Component; + +import java.util.Arrays; + +/** + * @author emeroad + */ +@Component +public class ResponseTimeMapperBackwardCompatibility implements RowMapper { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Override + public ResponseTime mapRow(Result result, int rowNum) throws Exception { + if (result.isEmpty()) { + return null; + } + + final byte[] rowKey = result.getRow(); + ResponseTime responseTime = createResponseTime(rowKey); + + for (Cell cell : result.rawCells()) { + if (CellUtil.matchingFamily(cell, HBaseTables.MAP_STATISTICS_SELF_CF_COUNTER)) { + recordColumn(responseTime, cell); + } + + if (logger.isDebugEnabled()) { + logger.debug("unknown column family:{}", Arrays.toString(CellUtil.cloneFamily(cell))); + } + } + return responseTime; + } + + + + void recordColumn(ResponseTime responseTime, Cell cell) { + final byte[] qArray = cell.getQualifierArray(); + final int qOffset = cell.getQualifierOffset(); + short slotNumber = Bytes.toShort(qArray, qOffset); + + // agentId should be added as data. + String agentId = Bytes.toString(qArray, qOffset + BytesUtils.SHORT_BYTE_LENGTH, cell.getQualifierLength() - BytesUtils.SHORT_BYTE_LENGTH); + long count = Bytes.toLong(cell.getValueArray(), cell.getValueOffset()); + responseTime.addResponseTime(agentId, slotNumber, count); + } + + private ResponseTime createResponseTime(byte[] rowKey) { + final Buffer row = new FixedBuffer(rowKey); + String applicationName = row.read2PrefixedString(); + short serviceType = row.readShort(); + final long timestamp = TimeUtils.recoveryTimeMillis(row.readLong()); + return new ResponseTime(applicationName, serviceType, timestamp); + } + +} diff --git a/web/src/main/resources/applicationContext-hbase.xml b/web/src/main/resources/applicationContext-hbase.xml index 93f7faec016e..961a6f0106b8 100644 --- a/web/src/main/resources/applicationContext-hbase.xml +++ b/web/src/main/resources/applicationContext-hbase.xml @@ -135,4 +135,14 @@ + + + + + + + + + + \ No newline at end of file