Skip to content

Commit

Permalink
pinpoint-apm#1362 Allow select tables to use parallel scanner when sc…
Browse files Browse the repository at this point in the history
…anning for data
  • Loading branch information
Xylus committed Dec 23, 2015
1 parent 0c68949 commit 67862a0
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
@Repository
public class HbaseApplicationTraceIndexDao implements ApplicationTraceIndexDao {

private static final int APPLICATION_TRACE_INDEX_NUM_PARTITIONS = 32;

private final Logger logger = LoggerFactory.getLogger(this.getClass());

@Autowired
Expand Down Expand Up @@ -104,8 +106,8 @@ public LimitedScanResult<List<TransactionId>> scanTraceIndex(final String applic

final LimitedScanResult<List<TransactionId>> limitedScanResult = new LimitedScanResult<>();
LastRowAccessor lastRowAccessor = new LastRowAccessor();
List<List<TransactionId>> traceIndexList = hbaseOperations2.find(HBaseTables.APPLICATION_TRACE_INDEX,
scan, traceIdRowKeyDistributor, limit, traceIndexMapper, lastRowAccessor);
List<List<TransactionId>> traceIndexList = hbaseOperations2.findParallel(HBaseTables.APPLICATION_TRACE_INDEX,
scan, traceIdRowKeyDistributor, limit, traceIndexMapper, lastRowAccessor, APPLICATION_TRACE_INDEX_NUM_PARTITIONS);

List<TransactionId> transactionIdSum = new ArrayList<>(128);
for(List<TransactionId> transactionId: traceIndexList) {
Expand Down Expand Up @@ -145,8 +147,8 @@ public LimitedScanResult<List<TransactionId>> scanTraceIndex(final String applic

final LimitedScanResult<List<TransactionId>> limitedScanResult = new LimitedScanResult<>();
LastRowAccessor lastRowAccessor = new LastRowAccessor();
List<List<TransactionId>> traceIndexList = hbaseOperations2.find(HBaseTables.APPLICATION_TRACE_INDEX,
scan, traceIdRowKeyDistributor, limit, traceIndexMapper, lastRowAccessor);
List<List<TransactionId>> traceIndexList = hbaseOperations2.findParallel(HBaseTables.APPLICATION_TRACE_INDEX,
scan, traceIdRowKeyDistributor, limit, traceIndexMapper, lastRowAccessor, APPLICATION_TRACE_INDEX_NUM_PARTITIONS);

List<TransactionId> transactionIdSum = new ArrayList<>(128);
for(List<TransactionId> transactionId: traceIndexList) {
Expand Down Expand Up @@ -244,7 +246,7 @@ public List<Dot> scanTraceScatter(String applicationName, Range range, final int
logger.debug("scanTraceScatter");
Scan scan = createScan(applicationName, range);

List<List<Dot>> dotListList = hbaseOperations2.find(HBaseTables.APPLICATION_TRACE_INDEX, scan, traceIdRowKeyDistributor, limit, traceIndexScatterMapper);
List<List<Dot>> dotListList = hbaseOperations2.findParallel(HBaseTables.APPLICATION_TRACE_INDEX, scan, traceIdRowKeyDistributor, limit, traceIndexScatterMapper, APPLICATION_TRACE_INDEX_NUM_PARTITIONS);
List<Dot> mergeList = new ArrayList<>(limit + 10);
for(List<Dot> dotList : dotListList) {
mergeList.addAll(dotList);
Expand Down Expand Up @@ -277,7 +279,7 @@ public List<Dot> scanTraceScatter(String applicationName, SelectedScatterArea ar
ResponseTimeRange responseTimeRange = area.getResponseTimeRange();
TraceIndexScatterMapper2 mapper = new TraceIndexScatterMapper2(responseTimeRange.getFrom(), responseTimeRange.getTo());

List<List<Dot>> dotListList = hbaseOperations2.find(HBaseTables.APPLICATION_TRACE_INDEX, scan, traceIdRowKeyDistributor, limit, mapper);
List<List<Dot>> dotListList = hbaseOperations2.findParallel(HBaseTables.APPLICATION_TRACE_INDEX, scan, traceIdRowKeyDistributor, limit, mapper, APPLICATION_TRACE_INDEX_NUM_PARTITIONS);

List<Dot> result = new ArrayList<>();
for(List<Dot> dotList : dotListList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
*/
@Repository
public class HbaseMapResponseTimeDao implements MapResponseDao {

private static final int MAP_STATISTICS_SELF_VER2_NUM_PARTITIONS = 8;

private final Logger logger = LoggerFactory.getLogger(this.getClass());

private int scanCacheSize = 40;
Expand Down Expand Up @@ -101,7 +104,7 @@ public List<ResponseTime> selectResponseTime(Application application, Range rang
if (tableExists) {
Scan scan = createScan(application, range, HBaseTables.MAP_STATISTICS_SELF_VER2_CF_COUNTER);

List<ResponseTime> responseTimeList = hbaseOperations2.find(HBaseTables.MAP_STATISTICS_SELF_VER2, scan, rowKeyDistributorByHashPrefix, responseTimeMapper);
List<ResponseTime> responseTimeList = hbaseOperations2.findParallel(HBaseTables.MAP_STATISTICS_SELF_VER2, scan, rowKeyDistributorByHashPrefix, responseTimeMapper, MAP_STATISTICS_SELF_VER2_NUM_PARTITIONS);
if (logger.isDebugEnabled()) {
logger.debug("Self data {}", responseTimeList);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
@Repository
public class HbaseMapStatisticsCalleeDao implements MapStatisticsCalleeDao {

private static final int MAP_STATISTICS_CALLER_VER2_NUM_PARTITIONS = 32;

private Logger logger = LoggerFactory.getLogger(this.getClass());
private int scanCacheSize = 40;
private boolean backwardCompatibility = false;
Expand Down Expand Up @@ -106,7 +108,7 @@ public LinkDataMap selectCallee(Application calleeApplication, Range range) {
// find distributed key - ver2.
final Scan scan = createScan(calleeApplication, range, HBaseTables.MAP_STATISTICS_CALLER_VER2_CF_COUNTER);
ResultsExtractor<LinkDataMap> resultExtractor = new RowMapReduceResultExtractor<>(mapStatisticsCalleeMapper, new MapStatisticsTimeWindowReducer(timeWindow));
LinkDataMap linkDataMap = hbaseOperations2.find(HBaseTables.MAP_STATISTICS_CALLER_VER2, scan, rowKeyDistributorByHashPrefix, resultExtractor);
LinkDataMap linkDataMap = hbaseOperations2.findParallel(HBaseTables.MAP_STATISTICS_CALLER_VER2, scan, rowKeyDistributorByHashPrefix, resultExtractor, MAP_STATISTICS_CALLER_VER2_NUM_PARTITIONS);
logger.debug("Callee data. {}, {}", linkDataMap, range);
if (linkDataMap != null && linkDataMap.size() > 0) {
return linkDataMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
@Repository
public class HbaseMapStatisticsCallerDao implements MapStatisticsCallerDao {

private static final int MAP_STATISTICS_CALLEE_VER2_NUM_PARTITIONS = 32;

private final Logger logger = LoggerFactory.getLogger(this.getClass());
private int scanCacheSize = 40;
private boolean backwardCompatibility = false;
Expand Down Expand Up @@ -107,7 +109,7 @@ public LinkDataMap selectCaller(Application callerApplication, Range range) {
// find distributed key.
final Scan scan = createScan(callerApplication, range, HBaseTables.MAP_STATISTICS_CALLEE_VER2_CF_COUNTER);
ResultsExtractor<LinkDataMap> resultExtractor = new RowMapReduceResultExtractor<>(mapStatisticsCallerMapper, new MapStatisticsTimeWindowReducer(timeWindow));
LinkDataMap linkDataMap = hbaseOperations2.find(HBaseTables.MAP_STATISTICS_CALLEE_VER2, scan, rowKeyDistributorByHashPrefix, resultExtractor);
LinkDataMap linkDataMap = hbaseOperations2.findParallel(HBaseTables.MAP_STATISTICS_CALLEE_VER2, scan, rowKeyDistributorByHashPrefix, resultExtractor, MAP_STATISTICS_CALLEE_VER2_NUM_PARTITIONS);
logger.debug("Caller data. {}, {}", linkDataMap, range);
if(linkDataMap != null && linkDataMap.size() > 0) {
return linkDataMap;
Expand Down

0 comments on commit 67862a0

Please # to comment.