Skip to content

Commit

Permalink
SOLR-16843: Replace timeNs by epochTimeNs in most of autoscaling
Browse files Browse the repository at this point in the history
  • Loading branch information
Pierre Salagnac committed Oct 4, 2023
1 parent 43739e2 commit 72bf807
Show file tree
Hide file tree
Showing 28 changed files with 59 additions and 59 deletions.
Expand Up @@ -413,7 +413,7 @@ private AutoScalingConfig handleSuspendTrigger(SolrQueryRequest req, SolrQueryRe
if (timeout != null) {
try {
int timeoutSeconds = parseHumanTime(timeout);
resumeTime = new Date(TimeUnit.MILLISECONDS.convert(timeSource.getTimeNs(), TimeUnit.NANOSECONDS)
resumeTime = new Date(TimeUnit.MILLISECONDS.convert(timeSource.getEpochTimeNs(), TimeUnit.NANOSECONDS)
+ TimeUnit.MILLISECONDS.convert(timeoutSeconds, TimeUnit.SECONDS));
} catch (IllegalArgumentException e) {
op.addError("Invalid 'timeout' value for suspend trigger: " + triggerName);
Expand Down
Expand Up @@ -353,7 +353,7 @@ public void run() {
return;
}

long now = cloudManager.getTimeSource().getTimeNs();
long now = cloudManager.getTimeSource().getEpochTimeNs();

// now check thresholds

Expand Down
Expand Up @@ -159,7 +159,7 @@ public void run() {
values.forEach((tag, rate) -> rates.computeIfAbsent(node, s -> (Number) rate));
}

long now = cloudManager.getTimeSource().getTimeNs();
long now = cloudManager.getTimeSource().getEpochTimeNs();
// check for exceeded rates and filter out those with less than waitFor from previous events
Map<String, Number> hotNodes = rates.entrySet().stream()
.filter(entry -> waitForElapsed(entry.getKey(), now, lastNodeEvent))
Expand Down
Expand Up @@ -93,7 +93,7 @@ public void init() throws Exception {
if (lastLiveNodes.contains(n) && !nodeNameVsTimeAdded.containsKey(n)) {
// since {@code #restoreState(AutoScaling.Trigger)} is called first, the timeAdded for a node may also be restored
log.debug("Adding node from marker path: {}", n);
nodeNameVsTimeAdded.put(n, cloudManager.getTimeSource().getTimeNs());
nodeNameVsTimeAdded.put(n, cloudManager.getTimeSource().getEpochTimeNs());
}
});
} catch (NoSuchElementException e) {
Expand Down Expand Up @@ -192,7 +192,7 @@ public void run() {
Set<String> copyOfNew = new HashSet<>(newLiveNodes);
copyOfNew.removeAll(lastLiveNodes);
copyOfNew.forEach(n -> {
long eventTime = cloudManager.getTimeSource().getTimeNs();
long eventTime = cloudManager.getTimeSource().getEpochTimeNs();
log.debug("Tracking new node: {} at time {}", n, eventTime);
nodeNameVsTimeAdded.put(n, eventTime);
});
Expand All @@ -204,7 +204,7 @@ public void run() {
Map.Entry<String, Long> entry = it.next();
String nodeName = entry.getKey();
Long timeAdded = entry.getValue();
long now = cloudManager.getTimeSource().getTimeNs();
long now = cloudManager.getTimeSource().getEpochTimeNs();
if (TimeUnit.SECONDS.convert(now - timeAdded, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
nodeNames.add(nodeName);
times.add(timeAdded);
Expand All @@ -215,7 +215,7 @@ public void run() {
if (processor != null) {
if (log.isDebugEnabled()) {
log.debug("NodeAddedTrigger {} firing registered processor for nodes: {} added at times {}, now={}", name,
nodeNames, times, cloudManager.getTimeSource().getTimeNs());
nodeNames, times, cloudManager.getTimeSource().getEpochTimeNs());
}
if (processor.process(new NodeAddedEvent(getEventType(), getName(), times, nodeNames, preferredOp, replicaType))) {
// remove from tracking set only if the fire was accepted
Expand Down
Expand Up @@ -90,7 +90,7 @@ public void init() throws Exception {
if (!lastLiveNodes.contains(n) && !nodeNameVsTimeRemoved.containsKey(n)) {
// since {@code #restoreState(AutoScaling.Trigger)} is called first, the timeRemoved for a node may also be restored
log.debug("Adding lost node from marker path: {}", n);
nodeNameVsTimeRemoved.put(n, cloudManager.getTimeSource().getTimeNs());
nodeNameVsTimeRemoved.put(n, cloudManager.getTimeSource().getEpochTimeNs());
}
});
} catch (NoSuchElementException e) {
Expand Down Expand Up @@ -184,7 +184,7 @@ public void run() {
copyOfLastLiveNodes.removeAll(newLiveNodes);
copyOfLastLiveNodes.forEach(n -> {
log.debug("Tracking lost node: {}", n);
nodeNameVsTimeRemoved.put(n, cloudManager.getTimeSource().getTimeNs());
nodeNameVsTimeRemoved.put(n, cloudManager.getTimeSource().getEpochTimeNs());
});

// has enough time expired to trigger events for a node?
Expand All @@ -194,7 +194,7 @@ public void run() {
Map.Entry<String, Long> entry = it.next();
String nodeName = entry.getKey();
Long timeRemoved = entry.getValue();
long now = cloudManager.getTimeSource().getTimeNs();
long now = cloudManager.getTimeSource().getEpochTimeNs();
long te = TimeUnit.SECONDS.convert(now - timeRemoved, TimeUnit.NANOSECONDS);
if (te >= getWaitForSecond()) {
nodeNames.add(nodeName);
Expand Down
Expand Up @@ -192,7 +192,7 @@ public void run() {
}
// Even though we are skipping the event, we need to notify any listeners of the IGNORED stage
// so we create a dummy event with the ignored=true flag and ScheduledTriggers will do the rest
if (processor != null && processor.process(new ScheduledEvent(getEventType(), getName(), timeSource.getTimeNs(),
if (processor != null && processor.process(new ScheduledEvent(getEventType(), getName(), timeSource.getEpochTimeNs(),
preferredOp, now.toEpochMilli(), true))) {
lastRunAt = nextRunTime;
return;
Expand All @@ -204,7 +204,7 @@ public void run() {
log.debug("ScheduledTrigger {} firing registered processor for scheduled time {}, now={}", name,
nextRunTime, now);
}
if (processor.process(new ScheduledEvent(getEventType(), getName(), timeSource.getTimeNs(),
if (processor.process(new ScheduledEvent(getEventType(), getName(), timeSource.getEpochTimeNs(),
preferredOp, now.toEpochMilli()))) {
lastRunAt = nextRunTime; // set to nextRunTime instead of now to avoid drift
}
Expand Down
Expand Up @@ -405,7 +405,7 @@ public void run() {
});
});
}
long now = cloudManager.getTimeSource().getTimeNs();
long now = cloudManager.getTimeSource().getEpochTimeNs();
Map<String, Double> hotNodes = new HashMap<>();
Map<String, Double> coldNodes = new HashMap<>();

Expand Down
Expand Up @@ -53,7 +53,7 @@ public TriggerEventQueue(SolrCloudManager cloudManager, String triggerName, Stat
}

public boolean offerEvent(TriggerEvent event) {
event.getProperties().put(ENQUEUE_TIME, timeSource.getTimeNs());
event.getProperties().put(ENQUEUE_TIME, timeSource.getEpochTimeNs());
try {
byte[] data = Utils.toJSON(event);
delegate.offer(data);
Expand Down Expand Up @@ -116,7 +116,7 @@ public TriggerEvent pollEvent() {

private TriggerEvent fromMap(Map<String, Object> map) {
TriggerEvent res = TriggerEvent.fromMap(map);
res.getProperties().put(DEQUEUE_TIME, timeSource.getTimeNs());
res.getProperties().put(DEQUEUE_TIME, timeSource.getEpochTimeNs());
return res;
}
}
Expand Up @@ -183,7 +183,7 @@ public void setAsyncId(String asyncId) {
List<CollectionAdminRequest.AsyncCollectionAdminRequest> operations = Lists.asList(moveReplica, new CollectionAdminRequest.AsyncCollectionAdminRequest[]{mockRequest});
NodeLostTrigger.NodeLostEvent nodeLostEvent = new NodeLostTrigger.NodeLostEvent
(TriggerEventType.NODELOST, "mock_trigger_name",
Collections.singletonList(cloudManager.getTimeSource().getTimeNs()),
Collections.singletonList(cloudManager.getTimeSource().getEpochTimeNs()),
Collections.singletonList(sourceNodeName),
CollectionParams.CollectionAction.MOVEREPLICA.toLower());
ActionContext actionContext = new ActionContext(survivor.getCoreContainer().getZkController().getSolrCloudManager(), null,
Expand Down
Expand Up @@ -117,7 +117,7 @@ public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager,
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
CapturedEvent ev = new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message);
CapturedEvent ev = new CapturedEvent(timeSource.getEpochTimeNs(), context, config, stage, actionName, event, message);
log.info("=======> {}", ev);
lst.add(ev);
}
Expand Down
Expand Up @@ -119,7 +119,7 @@ public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager,
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
CapturedEvent ev = new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message);
CapturedEvent ev = new CapturedEvent(timeSource.getEpochTimeNs(), context, config, stage, actionName, event, message);
log.info("=======> {}", ev);
lst.add(ev);
}
Expand Down
Expand Up @@ -179,7 +179,7 @@ public void testTrigger() throws Exception {
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = timeSource.getTimeNs();
long currentTimeNanos = timeSource.getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down Expand Up @@ -244,7 +244,7 @@ public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager,
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
CapturedEvent ev = new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message);
CapturedEvent ev = new CapturedEvent(timeSource.getEpochTimeNs(), context, config, stage, actionName, event, message);
log.info("=======> {}", ev);
lst.add(ev);
}
Expand Down Expand Up @@ -678,7 +678,7 @@ public void testSplitConfig() throws Exception {
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = timeSource.getTimeNs();
long currentTimeNanos = timeSource.getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down
Expand Up @@ -154,7 +154,7 @@ public void testMetricTrigger() throws Exception {
Thread.sleep(2000);
assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
CapturedEvent ev = listenerEvents.get("srt").get(0);
long now = timeSource.getTimeNs();
long now = timeSource.getEpochTimeNs();
// verify waitFor
assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
assertEquals(collectionName, ev.event.getProperties().get("collection"));
Expand Down Expand Up @@ -197,7 +197,7 @@ public void testMetricTrigger() throws Exception {
Thread.sleep(2000);
assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
ev = listenerEvents.get("srt").get(0);
now = timeSource.getTimeNs();
now = timeSource.getEpochTimeNs();
// verify waitFor
assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
assertEquals(collectionName, ev.event.getProperties().get("collection"));
Expand All @@ -210,7 +210,7 @@ public static class MetricAction extends TriggerActionBase {
@Override
public void process(TriggerEvent event, ActionContext context) throws Exception {
try {
long currentTimeNanos = context.getCloudManager().getTimeSource().getTimeNs();
long currentTimeNanos = context.getCloudManager().getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand All @@ -236,7 +236,7 @@ public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager,
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
lst.add(new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message));
lst.add(new CapturedEvent(timeSource.getEpochTimeNs(), context, config, stage, actionName, event, message));

}
}
Expand Down
Expand Up @@ -164,7 +164,7 @@ public void testNodeAddedTriggerRestoreState() throws Exception {
// since we know the nodeAdded event has been detected, we can record the current timestamp
// (relative to the cluster's time source) and later assert that (restored state) correctly
// tracked that the event happened prior to "now"
final long maxEventTimeNs = cloudManager.getTimeSource().getTimeNs();
final long maxEventTimeNs = cloudManager.getTimeSource().getEpochTimeNs();

//
// now replace the trigger with a new instance to test that the state gets copied over correctly
Expand Down Expand Up @@ -282,7 +282,7 @@ public void process(TriggerEvent event, ActionContext actionContext) {
try {
if (triggerFired.compareAndSet(false, true)) {
events.add(event);
long currentTimeNanos = actionContext.getCloudManager().getTimeSource().getTimeNs();
long currentTimeNanos = actionContext.getCloudManager().getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down
Expand Up @@ -97,7 +97,7 @@ public void testTrigger() throws Exception {
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
long currentTimeNanos = cloudManager.getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down Expand Up @@ -144,7 +144,7 @@ public void testTrigger() throws Exception {
AtomicBoolean fired = new AtomicBoolean(false);
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
long currentTimeNanos = cloudManager.getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down Expand Up @@ -295,7 +295,7 @@ public void testRestoreState() throws Exception {
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
newTrigger.setProcessor(event -> {
//the processor may get called 2 times, for newly added node and initial nodes
long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
long currentTimeNanos = cloudManager.getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down
Expand Up @@ -180,7 +180,7 @@ public void testNodeLostTriggerRestoreState() throws Exception {
// since we know the nodeLost event has been detected, we can record the current timestamp
// (relative to the cluster's time source) and later assert that (restored state) correctly
// tracked that the event happened prior to "now"
final long maxEventTimeNs = cloudManager.getTimeSource().getTimeNs();
final long maxEventTimeNs = cloudManager.getTimeSource().getEpochTimeNs();

// even though our trigger has detected a lost node, the *action* we registered should not have
// been run yet, due to the large waitFor configuration...
Expand Down Expand Up @@ -317,7 +317,7 @@ public void process(TriggerEvent event, ActionContext actionContext) {
try {
if (triggerFired.compareAndSet(false, true)) {
events.add(event);
long currentTimeNanos = actionContext.getCloudManager().getTimeSource().getTimeNs();
long currentTimeNanos = actionContext.getCloudManager().getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down
Expand Up @@ -99,7 +99,7 @@ public void testTrigger() throws Exception {
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
long currentTimeNanos = cloudManager.getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down Expand Up @@ -151,7 +151,7 @@ public void testTrigger() throws Exception {
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
long currentTimeNanos = cloudManager.getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down Expand Up @@ -346,7 +346,7 @@ public void testRestoreState() throws Exception {
newTrigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
long currentTimeNanos = cloudManager.getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down
Expand Up @@ -143,7 +143,7 @@ public void process(TriggerEvent event, ActionContext actionContext) {
try {
if (triggerFired.compareAndSet(false, true)) {
events.add(event);
long currentTimeNanos = actionContext.getCloudManager().getTimeSource().getTimeNs();
long currentTimeNanos = actionContext.getCloudManager().getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down

0 comments on commit 72bf807

Please sign in to comment.