Skip to content

Commit

Permalink
SOLR-16843: Replace timeNs by epochTimeNs in most of autoscaling
Browse files Browse the repository at this point in the history
  • Loading branch information
psalagnac committed Oct 4, 2023
1 parent 43739e2 commit 72bf807
Show file tree
Hide file tree
Showing 28 changed files with 59 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ private AutoScalingConfig handleSuspendTrigger(SolrQueryRequest req, SolrQueryRe
if (timeout != null) {
try {
int timeoutSeconds = parseHumanTime(timeout);
resumeTime = new Date(TimeUnit.MILLISECONDS.convert(timeSource.getTimeNs(), TimeUnit.NANOSECONDS)
resumeTime = new Date(TimeUnit.MILLISECONDS.convert(timeSource.getEpochTimeNs(), TimeUnit.NANOSECONDS)
+ TimeUnit.MILLISECONDS.convert(timeoutSeconds, TimeUnit.SECONDS));
} catch (IllegalArgumentException e) {
op.addError("Invalid 'timeout' value for suspend trigger: " + triggerName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ public void run() {
return;
}

long now = cloudManager.getTimeSource().getTimeNs();
long now = cloudManager.getTimeSource().getEpochTimeNs();

// now check thresholds

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void run() {
values.forEach((tag, rate) -> rates.computeIfAbsent(node, s -> (Number) rate));
}

long now = cloudManager.getTimeSource().getTimeNs();
long now = cloudManager.getTimeSource().getEpochTimeNs();
// check for exceeded rates and filter out those with less than waitFor from previous events
Map<String, Number> hotNodes = rates.entrySet().stream()
.filter(entry -> waitForElapsed(entry.getKey(), now, lastNodeEvent))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void init() throws Exception {
if (lastLiveNodes.contains(n) && !nodeNameVsTimeAdded.containsKey(n)) {
// since {@code #restoreState(AutoScaling.Trigger)} is called first, the timeAdded for a node may also be restored
log.debug("Adding node from marker path: {}", n);
nodeNameVsTimeAdded.put(n, cloudManager.getTimeSource().getTimeNs());
nodeNameVsTimeAdded.put(n, cloudManager.getTimeSource().getEpochTimeNs());
}
});
} catch (NoSuchElementException e) {
Expand Down Expand Up @@ -192,7 +192,7 @@ public void run() {
Set<String> copyOfNew = new HashSet<>(newLiveNodes);
copyOfNew.removeAll(lastLiveNodes);
copyOfNew.forEach(n -> {
long eventTime = cloudManager.getTimeSource().getTimeNs();
long eventTime = cloudManager.getTimeSource().getEpochTimeNs();
log.debug("Tracking new node: {} at time {}", n, eventTime);
nodeNameVsTimeAdded.put(n, eventTime);
});
Expand All @@ -204,7 +204,7 @@ public void run() {
Map.Entry<String, Long> entry = it.next();
String nodeName = entry.getKey();
Long timeAdded = entry.getValue();
long now = cloudManager.getTimeSource().getTimeNs();
long now = cloudManager.getTimeSource().getEpochTimeNs();
if (TimeUnit.SECONDS.convert(now - timeAdded, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
nodeNames.add(nodeName);
times.add(timeAdded);
Expand All @@ -215,7 +215,7 @@ public void run() {
if (processor != null) {
if (log.isDebugEnabled()) {
log.debug("NodeAddedTrigger {} firing registered processor for nodes: {} added at times {}, now={}", name,
nodeNames, times, cloudManager.getTimeSource().getTimeNs());
nodeNames, times, cloudManager.getTimeSource().getEpochTimeNs());
}
if (processor.process(new NodeAddedEvent(getEventType(), getName(), times, nodeNames, preferredOp, replicaType))) {
// remove from tracking set only if the fire was accepted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void init() throws Exception {
if (!lastLiveNodes.contains(n) && !nodeNameVsTimeRemoved.containsKey(n)) {
// since {@code #restoreState(AutoScaling.Trigger)} is called first, the timeRemoved for a node may also be restored
log.debug("Adding lost node from marker path: {}", n);
nodeNameVsTimeRemoved.put(n, cloudManager.getTimeSource().getTimeNs());
nodeNameVsTimeRemoved.put(n, cloudManager.getTimeSource().getEpochTimeNs());
}
});
} catch (NoSuchElementException e) {
Expand Down Expand Up @@ -184,7 +184,7 @@ public void run() {
copyOfLastLiveNodes.removeAll(newLiveNodes);
copyOfLastLiveNodes.forEach(n -> {
log.debug("Tracking lost node: {}", n);
nodeNameVsTimeRemoved.put(n, cloudManager.getTimeSource().getTimeNs());
nodeNameVsTimeRemoved.put(n, cloudManager.getTimeSource().getEpochTimeNs());
});

// has enough time expired to trigger events for a node?
Expand All @@ -194,7 +194,7 @@ public void run() {
Map.Entry<String, Long> entry = it.next();
String nodeName = entry.getKey();
Long timeRemoved = entry.getValue();
long now = cloudManager.getTimeSource().getTimeNs();
long now = cloudManager.getTimeSource().getEpochTimeNs();
long te = TimeUnit.SECONDS.convert(now - timeRemoved, TimeUnit.NANOSECONDS);
if (te >= getWaitForSecond()) {
nodeNames.add(nodeName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void run() {
}
// Even though we are skipping the event, we need to notify any listeners of the IGNORED stage
// so we create a dummy event with the ignored=true flag and ScheduledTriggers will do the rest
if (processor != null && processor.process(new ScheduledEvent(getEventType(), getName(), timeSource.getTimeNs(),
if (processor != null && processor.process(new ScheduledEvent(getEventType(), getName(), timeSource.getEpochTimeNs(),
preferredOp, now.toEpochMilli(), true))) {
lastRunAt = nextRunTime;
return;
Expand All @@ -204,7 +204,7 @@ public void run() {
log.debug("ScheduledTrigger {} firing registered processor for scheduled time {}, now={}", name,
nextRunTime, now);
}
if (processor.process(new ScheduledEvent(getEventType(), getName(), timeSource.getTimeNs(),
if (processor.process(new ScheduledEvent(getEventType(), getName(), timeSource.getEpochTimeNs(),
preferredOp, now.toEpochMilli()))) {
lastRunAt = nextRunTime; // set to nextRunTime instead of now to avoid drift
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ public void run() {
});
});
}
long now = cloudManager.getTimeSource().getTimeNs();
long now = cloudManager.getTimeSource().getEpochTimeNs();
Map<String, Double> hotNodes = new HashMap<>();
Map<String, Double> coldNodes = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public TriggerEventQueue(SolrCloudManager cloudManager, String triggerName, Stat
}

public boolean offerEvent(TriggerEvent event) {
event.getProperties().put(ENQUEUE_TIME, timeSource.getTimeNs());
event.getProperties().put(ENQUEUE_TIME, timeSource.getEpochTimeNs());
try {
byte[] data = Utils.toJSON(event);
delegate.offer(data);
Expand Down Expand Up @@ -116,7 +116,7 @@ public TriggerEvent pollEvent() {

private TriggerEvent fromMap(Map<String, Object> map) {
TriggerEvent res = TriggerEvent.fromMap(map);
res.getProperties().put(DEQUEUE_TIME, timeSource.getTimeNs());
res.getProperties().put(DEQUEUE_TIME, timeSource.getEpochTimeNs());
return res;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void setAsyncId(String asyncId) {
List<CollectionAdminRequest.AsyncCollectionAdminRequest> operations = Lists.asList(moveReplica, new CollectionAdminRequest.AsyncCollectionAdminRequest[]{mockRequest});
NodeLostTrigger.NodeLostEvent nodeLostEvent = new NodeLostTrigger.NodeLostEvent
(TriggerEventType.NODELOST, "mock_trigger_name",
Collections.singletonList(cloudManager.getTimeSource().getTimeNs()),
Collections.singletonList(cloudManager.getTimeSource().getEpochTimeNs()),
Collections.singletonList(sourceNodeName),
CollectionParams.CollectionAction.MOVEREPLICA.toLower());
ActionContext actionContext = new ActionContext(survivor.getCoreContainer().getZkController().getSolrCloudManager(), null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager,
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
CapturedEvent ev = new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message);
CapturedEvent ev = new CapturedEvent(timeSource.getEpochTimeNs(), context, config, stage, actionName, event, message);
log.info("=======> {}", ev);
lst.add(ev);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager,
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
CapturedEvent ev = new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message);
CapturedEvent ev = new CapturedEvent(timeSource.getEpochTimeNs(), context, config, stage, actionName, event, message);
log.info("=======> {}", ev);
lst.add(ev);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void testTrigger() throws Exception {
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = timeSource.getTimeNs();
long currentTimeNanos = timeSource.getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down Expand Up @@ -244,7 +244,7 @@ public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager,
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
CapturedEvent ev = new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message);
CapturedEvent ev = new CapturedEvent(timeSource.getEpochTimeNs(), context, config, stage, actionName, event, message);
log.info("=======> {}", ev);
lst.add(ev);
}
Expand Down Expand Up @@ -678,7 +678,7 @@ public void testSplitConfig() throws Exception {
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = timeSource.getTimeNs();
long currentTimeNanos = timeSource.getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void testMetricTrigger() throws Exception {
Thread.sleep(2000);
assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
CapturedEvent ev = listenerEvents.get("srt").get(0);
long now = timeSource.getTimeNs();
long now = timeSource.getEpochTimeNs();
// verify waitFor
assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
assertEquals(collectionName, ev.event.getProperties().get("collection"));
Expand Down Expand Up @@ -197,7 +197,7 @@ public void testMetricTrigger() throws Exception {
Thread.sleep(2000);
assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
ev = listenerEvents.get("srt").get(0);
now = timeSource.getTimeNs();
now = timeSource.getEpochTimeNs();
// verify waitFor
assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
assertEquals(collectionName, ev.event.getProperties().get("collection"));
Expand All @@ -210,7 +210,7 @@ public static class MetricAction extends TriggerActionBase {
@Override
public void process(TriggerEvent event, ActionContext context) throws Exception {
try {
long currentTimeNanos = context.getCloudManager().getTimeSource().getTimeNs();
long currentTimeNanos = context.getCloudManager().getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand All @@ -236,7 +236,7 @@ public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager,
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
lst.add(new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message));
lst.add(new CapturedEvent(timeSource.getEpochTimeNs(), context, config, stage, actionName, event, message));

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void testNodeAddedTriggerRestoreState() throws Exception {
// since we know the nodeAdded event has been detected, we can recored the current timestamp
// (relative to the cluster's time source) and later assert that (restored state) correctly
// tracked that the event happened prior to "now"
final long maxEventTimeNs = cloudManager.getTimeSource().getTimeNs();
final long maxEventTimeNs = cloudManager.getTimeSource().getEpochTimeNs();

//
// now replace the trigger with a new instance to test that the state gets copied over correctly
Expand Down Expand Up @@ -282,7 +282,7 @@ public void process(TriggerEvent event, ActionContext actionContext) {
try {
if (triggerFired.compareAndSet(false, true)) {
events.add(event);
long currentTimeNanos = actionContext.getCloudManager().getTimeSource().getTimeNs();
long currentTimeNanos = actionContext.getCloudManager().getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void testTrigger() throws Exception {
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
long currentTimeNanos = cloudManager.getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down Expand Up @@ -144,7 +144,7 @@ public void testTrigger() throws Exception {
AtomicBoolean fired = new AtomicBoolean(false);
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
long currentTimeNanos = cloudManager.getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down Expand Up @@ -295,7 +295,7 @@ public void testRestoreState() throws Exception {
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
newTrigger.setProcessor(event -> {
//the processor may get called 2 times, for newly added node and initial nodes
long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
long currentTimeNanos = cloudManager.getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public void testNodeLostTriggerRestoreState() throws Exception {
// since we know the nodeLost event has been detected, we can recored the current timestamp
// (relative to the cluster's time source) and later assert that (restored state) correctly
// tracked that the event happened prior to "now"
final long maxEventTimeNs = cloudManager.getTimeSource().getTimeNs();
final long maxEventTimeNs = cloudManager.getTimeSource().getEpochTimeNs();

// even though our trigger has detected a lost node, the *action* we registered should not have
// been run yet, due to the large waitFor configuration...
Expand Down Expand Up @@ -317,7 +317,7 @@ public void process(TriggerEvent event, ActionContext actionContext) {
try {
if (triggerFired.compareAndSet(false, true)) {
events.add(event);
long currentTimeNanos = actionContext.getCloudManager().getTimeSource().getTimeNs();
long currentTimeNanos = actionContext.getCloudManager().getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void testTrigger() throws Exception {
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
long currentTimeNanos = cloudManager.getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down Expand Up @@ -151,7 +151,7 @@ public void testTrigger() throws Exception {
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
long currentTimeNanos = cloudManager.getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down Expand Up @@ -346,7 +346,7 @@ public void testRestoreState() throws Exception {
newTrigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
long currentTimeNanos = cloudManager.getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void process(TriggerEvent event, ActionContext actionContext) {
try {
if (triggerFired.compareAndSet(false, true)) {
events.add(event);
long currentTimeNanos = actionContext.getCloudManager().getTimeSource().getTimeNs();
long currentTimeNanos = actionContext.getCloudManager().getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down
Loading

0 comments on commit 72bf807

Please # to comment.