Skip to content

Commit

Permalink
Merge pull request #1225 from koo-taejin/#560
Browse files Browse the repository at this point in the history
Implement realtime span delivery service. #560
  • Loading branch information
koo-taejin committed Nov 19, 2015
2 parents 520eb76 + 8c1cb10 commit ff494fd
Showing 1 changed file with 27 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package com.navercorp.pinpoint.web.websocket;

import com.navercorp.pinpoint.common.util.PinpointThreadFactory;
import com.navercorp.pinpoint.rpc.util.ClassUtils;
import com.navercorp.pinpoint.rpc.util.MapUtils;
import com.navercorp.pinpoint.rpc.util.StringUtils;
Expand Down Expand Up @@ -64,16 +65,19 @@ public class ActiveThreadCountHandler extends TextWebSocketHandler implements Pi
private static final String DEFAULT_REQUEST_MAPPING = "/agent/activeThread";
private final String requestMapping;

private Timer timer;
private final AtomicBoolean onTimerTask = new AtomicBoolean(false);

private Timer flushTimer;
private static final long DEFAULT_FLUSH_DELAY = 1000;
private static final long DEFAULT_MIN_FLUSH_DELAY = 500;
private final long flushDelay;

private Timer healthCheckTimer;
private static final long DEFAULT_HEALTH_CHECk_DELAY = 60 * 1000;
private final long healthCheckDelay;

private Timer reactiveTimer;

public ActiveThreadCountHandler(AgentService agentSerivce) {
this(DEFAULT_REQUEST_MAPPING, agentSerivce);
}
Expand All @@ -95,7 +99,10 @@ public ActiveThreadCountHandler(String requestMapping, AgentService agentSerivce

@Override
public void start() {
this.timer = TimerFactory.createHashedWheelTimer(ClassUtils.simpleClassName(this) + "-Timer", 100, TimeUnit.MILLISECONDS, 512);
PinpointThreadFactory threadFactory = new PinpointThreadFactory(ClassUtils.simpleClassName(this) + "-Timer", true);
this.flushTimer = TimerFactory.createHashedWheelTimer(threadFactory, 100, TimeUnit.MILLISECONDS, 512);
this.healthCheckTimer = TimerFactory.createHashedWheelTimer(threadFactory, 100, TimeUnit.MILLISECONDS, 512);
this.reactiveTimer = TimerFactory.createHashedWheelTimer(threadFactory, 100, TimeUnit.MILLISECONDS, 512);
}

@Override
Expand All @@ -107,8 +114,16 @@ public void stop() {
}
aggregatorRepository.clear();

if (timer != null) {
timer.stop();
if (flushTimer != null) {
flushTimer.stop();
}

if (healthCheckTimer != null) {
healthCheckTimer.stop();
}

if (reactiveTimer != null) {
reactiveTimer.stop();
}
}

Expand All @@ -126,8 +141,8 @@ public void afterConnectionEstablished(WebSocketSession newSession) throws Excep
sessionRepository.add(newSession);
boolean turnOn = onTimerTask.compareAndSet(false, true);
if (turnOn) {
timer.newTimeout(new ActiveThreadTimerTask(), flushDelay, TimeUnit.MILLISECONDS);
timer.newTimeout(new HealthCheckTimerTask(), DEFAULT_HEALTH_CHECk_DELAY, TimeUnit.MILLISECONDS);
flushTimer.newTimeout(new ActiveThreadTimerTask(), flushDelay, TimeUnit.MILLISECONDS);
healthCheckTimer.newTimeout(new HealthCheckTimerTask(), DEFAULT_HEALTH_CHECk_DELAY, TimeUnit.MILLISECONDS);
}
}

Expand Down Expand Up @@ -210,7 +225,7 @@ private void bindingResponseAggregator(WebSocketSession webSocketSession, String

PinpointWebSocketResponseAggregator responseAggregator = aggregatorRepository.get(applicationName);
if (responseAggregator == null) {
responseAggregator = new ActiveThreadCountResponseAggregator(applicationName, agentSerivce, timer);
responseAggregator = new ActiveThreadCountResponseAggregator(applicationName, agentSerivce, reactiveTimer);
responseAggregator.start();
aggregatorRepository.put(applicationName, responseAggregator);
}
Expand Down Expand Up @@ -254,14 +269,14 @@ public void run(Timeout timeout) throws Exception {
}
}
} finally {
if (timer != null && onTimerTask.get()) {
if (flushTimer != null && onTimerTask.get()) {
long execTime = System.currentTimeMillis() - startTime;

long nextFlushDelay = flushDelay - execTime;
if (nextFlushDelay < DEFAULT_MIN_FLUSH_DELAY) {
timer.newTimeout(this, DEFAULT_MIN_FLUSH_DELAY, TimeUnit.MILLISECONDS);
flushTimer.newTimeout(this, DEFAULT_MIN_FLUSH_DELAY, TimeUnit.MILLISECONDS);
} else {
timer.newTimeout(this, nextFlushDelay, TimeUnit.MILLISECONDS);
flushTimer.newTimeout(this, nextFlushDelay, TimeUnit.MILLISECONDS);
}
}
}
Expand Down Expand Up @@ -313,8 +328,8 @@ public void run(Timeout timeout) throws Exception {
session.sendMessage(pingMessage);
}
} finally {
if (timer != null && onTimerTask.get()) {
timer.newTimeout(this, healthCheckDelay, TimeUnit.MILLISECONDS);
if (healthCheckTimer != null && onTimerTask.get()) {
healthCheckTimer.newTimeout(this, healthCheckDelay, TimeUnit.MILLISECONDS);
}
}
}
Expand Down

0 comments on commit ff494fd

Please # to comment.