From 67862a0e9d20001bff0933c26755c32bc7aabda0 Mon Sep 17 00:00:00 2001 From: HyunGil Jeong Date: Wed, 23 Dec 2015 17:19:17 +0900 Subject: [PATCH] #1362 Allow select tables to use parallel scanner when scanning for data --- .../dao/hbase/HbaseApplicationTraceIndexDao.java | 14 ++++++++------ .../web/dao/hbase/HbaseMapResponseTimeDao.java | 5 ++++- .../web/dao/hbase/HbaseMapStatisticsCalleeDao.java | 4 +++- .../web/dao/hbase/HbaseMapStatisticsCallerDao.java | 4 +++- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseApplicationTraceIndexDao.java b/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseApplicationTraceIndexDao.java index 4468b4893cbd..3be7a820d5fa 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseApplicationTraceIndexDao.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseApplicationTraceIndexDao.java @@ -65,6 +65,8 @@ @Repository public class HbaseApplicationTraceIndexDao implements ApplicationTraceIndexDao { + private static final int APPLICATION_TRACE_INDEX_NUM_PARTITIONS = 32; + private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired @@ -104,8 +106,8 @@ public LimitedScanResult> scanTraceIndex(final String applic final LimitedScanResult> limitedScanResult = new LimitedScanResult<>(); LastRowAccessor lastRowAccessor = new LastRowAccessor(); - List> traceIndexList = hbaseOperations2.find(HBaseTables.APPLICATION_TRACE_INDEX, - scan, traceIdRowKeyDistributor, limit, traceIndexMapper, lastRowAccessor); + List> traceIndexList = hbaseOperations2.findParallel(HBaseTables.APPLICATION_TRACE_INDEX, + scan, traceIdRowKeyDistributor, limit, traceIndexMapper, lastRowAccessor, APPLICATION_TRACE_INDEX_NUM_PARTITIONS); List transactionIdSum = new ArrayList<>(128); for(List transactionId: traceIndexList) { @@ -145,8 +147,8 @@ public LimitedScanResult> scanTraceIndex(final String applic final LimitedScanResult> limitedScanResult = new LimitedScanResult<>(); LastRowAccessor lastRowAccessor = new LastRowAccessor(); - List> traceIndexList = hbaseOperations2.find(HBaseTables.APPLICATION_TRACE_INDEX, - scan, traceIdRowKeyDistributor, limit, traceIndexMapper, lastRowAccessor); + List> traceIndexList = hbaseOperations2.findParallel(HBaseTables.APPLICATION_TRACE_INDEX, + scan, traceIdRowKeyDistributor, limit, traceIndexMapper, lastRowAccessor, APPLICATION_TRACE_INDEX_NUM_PARTITIONS); List transactionIdSum = new ArrayList<>(128); for(List transactionId: traceIndexList) { @@ -244,7 +246,7 @@ public List scanTraceScatter(String applicationName, Range range, final int logger.debug("scanTraceScatter"); Scan scan = createScan(applicationName, range); - List> dotListList = hbaseOperations2.find(HBaseTables.APPLICATION_TRACE_INDEX, scan, traceIdRowKeyDistributor, limit, traceIndexScatterMapper); + List> dotListList = hbaseOperations2.findParallel(HBaseTables.APPLICATION_TRACE_INDEX, scan, traceIdRowKeyDistributor, limit, traceIndexScatterMapper, APPLICATION_TRACE_INDEX_NUM_PARTITIONS); List mergeList = new ArrayList<>(limit + 10); for(List dotList : dotListList) { mergeList.addAll(dotList); @@ -277,7 +279,7 @@ public List scanTraceScatter(String applicationName, SelectedScatterArea ar ResponseTimeRange responseTimeRange = area.getResponseTimeRange(); TraceIndexScatterMapper2 mapper = new TraceIndexScatterMapper2(responseTimeRange.getFrom(), responseTimeRange.getTo()); - List> dotListList = hbaseOperations2.find(HBaseTables.APPLICATION_TRACE_INDEX, scan, traceIdRowKeyDistributor, limit, mapper); + List> dotListList = hbaseOperations2.findParallel(HBaseTables.APPLICATION_TRACE_INDEX, scan, traceIdRowKeyDistributor, limit, mapper, APPLICATION_TRACE_INDEX_NUM_PARTITIONS); List result = new ArrayList<>(); for(List dotList : dotListList) { diff --git a/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapResponseTimeDao.java b/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapResponseTimeDao.java index eefb8c82a858..653df73429e8 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapResponseTimeDao.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapResponseTimeDao.java @@ -44,6 +44,9 @@ */ @Repository public class HbaseMapResponseTimeDao implements MapResponseDao { + + private static final int MAP_STATISTICS_SELF_VER2_NUM_PARTITIONS = 8; + private final Logger logger = LoggerFactory.getLogger(this.getClass()); private int scanCacheSize = 40; @@ -101,7 +104,7 @@ public List selectResponseTime(Application application, Range rang if (tableExists) { Scan scan = createScan(application, range, HBaseTables.MAP_STATISTICS_SELF_VER2_CF_COUNTER); - List responseTimeList = hbaseOperations2.find(HBaseTables.MAP_STATISTICS_SELF_VER2, scan, rowKeyDistributorByHashPrefix, responseTimeMapper); + List responseTimeList = hbaseOperations2.findParallel(HBaseTables.MAP_STATISTICS_SELF_VER2, scan, rowKeyDistributorByHashPrefix, responseTimeMapper, MAP_STATISTICS_SELF_VER2_NUM_PARTITIONS); if (logger.isDebugEnabled()) { logger.debug("Self data {}", responseTimeList); } diff --git a/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapStatisticsCalleeDao.java b/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapStatisticsCalleeDao.java index 43753002cc27..c0310ee92087 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapStatisticsCalleeDao.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapStatisticsCalleeDao.java @@ -50,6 +50,8 @@ @Repository public class HbaseMapStatisticsCalleeDao implements MapStatisticsCalleeDao { + private static final int MAP_STATISTICS_CALLER_VER2_NUM_PARTITIONS = 32; + private Logger logger = LoggerFactory.getLogger(this.getClass()); private int scanCacheSize = 40; private boolean backwardCompatibility = false; @@ -106,7 +108,7 @@ public LinkDataMap selectCallee(Application calleeApplication, Range range) { // find distributed key - ver2. final Scan scan = createScan(calleeApplication, range, HBaseTables.MAP_STATISTICS_CALLER_VER2_CF_COUNTER); ResultsExtractor resultExtractor = new RowMapReduceResultExtractor<>(mapStatisticsCalleeMapper, new MapStatisticsTimeWindowReducer(timeWindow)); - LinkDataMap linkDataMap = hbaseOperations2.find(HBaseTables.MAP_STATISTICS_CALLER_VER2, scan, rowKeyDistributorByHashPrefix, resultExtractor); + LinkDataMap linkDataMap = hbaseOperations2.findParallel(HBaseTables.MAP_STATISTICS_CALLER_VER2, scan, rowKeyDistributorByHashPrefix, resultExtractor, MAP_STATISTICS_CALLER_VER2_NUM_PARTITIONS); logger.debug("Callee data. {}, {}", linkDataMap, range); if (linkDataMap != null && linkDataMap.size() > 0) { return linkDataMap; diff --git a/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapStatisticsCallerDao.java b/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapStatisticsCallerDao.java index 1308c939cff5..f51a2f9b947c 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapStatisticsCallerDao.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/dao/hbase/HbaseMapStatisticsCallerDao.java @@ -50,6 +50,8 @@ @Repository public class HbaseMapStatisticsCallerDao implements MapStatisticsCallerDao { + private static final int MAP_STATISTICS_CALLEE_VER2_NUM_PARTITIONS = 32; + private final Logger logger = LoggerFactory.getLogger(this.getClass()); private int scanCacheSize = 40; private boolean backwardCompatibility = false; @@ -107,7 +109,7 @@ public LinkDataMap selectCaller(Application callerApplication, Range range) { // find distributed key. final Scan scan = createScan(callerApplication, range, HBaseTables.MAP_STATISTICS_CALLEE_VER2_CF_COUNTER); ResultsExtractor resultExtractor = new RowMapReduceResultExtractor<>(mapStatisticsCallerMapper, new MapStatisticsTimeWindowReducer(timeWindow)); - LinkDataMap linkDataMap = hbaseOperations2.find(HBaseTables.MAP_STATISTICS_CALLEE_VER2, scan, rowKeyDistributorByHashPrefix, resultExtractor); + LinkDataMap linkDataMap = hbaseOperations2.findParallel(HBaseTables.MAP_STATISTICS_CALLEE_VER2, scan, rowKeyDistributorByHashPrefix, resultExtractor, MAP_STATISTICS_CALLEE_VER2_NUM_PARTITIONS); logger.debug("Caller data. {}, {}", linkDataMap, range); if(linkDataMap != null && linkDataMap.size() > 0) { return linkDataMap;