From 8c1cb10d801b861102f48390a4047bee3eb0f0ed Mon Sep 17 00:00:00 2001 From: "koo.taejin" Date: Thu, 19 Nov 2015 11:17:22 +0900 Subject: [PATCH] Implement realtime span delivery service. #560 reduce timer delay to use more timer. --- .../websocket/ActiveThreadCountHandler.java | 39 +++++++++++++------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java b/web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java index a71f4699e230..749a8cc15438 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java @@ -19,6 +19,7 @@ package com.navercorp.pinpoint.web.websocket; +import com.navercorp.pinpoint.common.util.PinpointThreadFactory; import com.navercorp.pinpoint.rpc.util.ClassUtils; import com.navercorp.pinpoint.rpc.util.MapUtils; import com.navercorp.pinpoint.rpc.util.StringUtils; @@ -64,16 +65,19 @@ public class ActiveThreadCountHandler extends TextWebSocketHandler implements Pi private static final String DEFAULT_REQUEST_MAPPING = "/agent/activeThread"; private final String requestMapping; - private Timer timer; private final AtomicBoolean onTimerTask = new AtomicBoolean(false); + private Timer flushTimer; private static final long DEFAULT_FLUSH_DELAY = 1000; private static final long DEFAULT_MIN_FLUSH_DELAY = 500; private final long flushDelay; + private Timer healthCheckTimer; private static final long DEFAULT_HEALTH_CHECk_DELAY = 60 * 1000; private final long healthCheckDelay; + private Timer reactiveTimer; + public ActiveThreadCountHandler(AgentService agentSerivce) { this(DEFAULT_REQUEST_MAPPING, agentSerivce); } @@ -95,7 +99,10 @@ public ActiveThreadCountHandler(String requestMapping, AgentService agentSerivce @Override public void start() { - this.timer = TimerFactory.createHashedWheelTimer(ClassUtils.simpleClassName(this) + "-Timer", 100, TimeUnit.MILLISECONDS, 512); + PinpointThreadFactory threadFactory = new PinpointThreadFactory(ClassUtils.simpleClassName(this) + "-Timer", true); + this.flushTimer = TimerFactory.createHashedWheelTimer(threadFactory, 100, TimeUnit.MILLISECONDS, 512); + this.healthCheckTimer = TimerFactory.createHashedWheelTimer(threadFactory, 100, TimeUnit.MILLISECONDS, 512); + this.reactiveTimer = TimerFactory.createHashedWheelTimer(threadFactory, 100, TimeUnit.MILLISECONDS, 512); } @Override @@ -107,8 +114,16 @@ public void stop() { } aggregatorRepository.clear(); - if (timer != null) { - timer.stop(); + if (flushTimer != null) { + flushTimer.stop(); + } + + if (healthCheckTimer != null) { + healthCheckTimer.stop(); + } + + if (reactiveTimer != null) { + reactiveTimer.stop(); } } @@ -126,8 +141,8 @@ public void afterConnectionEstablished(WebSocketSession newSession) throws Excep sessionRepository.add(newSession); boolean turnOn = onTimerTask.compareAndSet(false, true); if (turnOn) { - timer.newTimeout(new ActiveThreadTimerTask(), flushDelay, TimeUnit.MILLISECONDS); - timer.newTimeout(new HealthCheckTimerTask(), DEFAULT_HEALTH_CHECk_DELAY, TimeUnit.MILLISECONDS); + flushTimer.newTimeout(new ActiveThreadTimerTask(), flushDelay, TimeUnit.MILLISECONDS); + healthCheckTimer.newTimeout(new HealthCheckTimerTask(), DEFAULT_HEALTH_CHECk_DELAY, TimeUnit.MILLISECONDS); } } @@ -210,7 +225,7 @@ private void bindingResponseAggregator(WebSocketSession webSocketSession, String PinpointWebSocketResponseAggregator responseAggregator = aggregatorRepository.get(applicationName); if (responseAggregator == null) { - responseAggregator = new ActiveThreadCountResponseAggregator(applicationName, agentSerivce, timer); + responseAggregator = new ActiveThreadCountResponseAggregator(applicationName, agentSerivce, reactiveTimer); responseAggregator.start(); aggregatorRepository.put(applicationName, responseAggregator); } @@ -254,14 +269,14 @@ public void run(Timeout timeout) throws Exception { } } } finally { - if (timer != null && onTimerTask.get()) { + if (flushTimer != null && onTimerTask.get()) { long execTime = System.currentTimeMillis() - startTime; long nextFlushDelay = flushDelay - execTime; if (nextFlushDelay < DEFAULT_MIN_FLUSH_DELAY) { - timer.newTimeout(this, DEFAULT_MIN_FLUSH_DELAY, TimeUnit.MILLISECONDS); + flushTimer.newTimeout(this, DEFAULT_MIN_FLUSH_DELAY, TimeUnit.MILLISECONDS); } else { - timer.newTimeout(this, nextFlushDelay, TimeUnit.MILLISECONDS); + flushTimer.newTimeout(this, nextFlushDelay, TimeUnit.MILLISECONDS); } } } @@ -313,8 +328,8 @@ public void run(Timeout timeout) throws Exception { session.sendMessage(pingMessage); } } finally { - if (timer != null && onTimerTask.get()) { - timer.newTimeout(this, healthCheckDelay, TimeUnit.MILLISECONDS); + if (healthCheckTimer != null && onTimerTask.get()) { + healthCheckTimer.newTimeout(this, healthCheckDelay, TimeUnit.MILLISECONDS); } } }