Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-16843: Replace timeNs by epochTimeNs in most of autoscaling #2679

Open
wants to merge 1 commit into
base: branch_8_11
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -413,7 +413,7 @@ private AutoScalingConfig handleSuspendTrigger(SolrQueryRequest req, SolrQueryRe
if (timeout != null) {
try {
int timeoutSeconds = parseHumanTime(timeout);
resumeTime = new Date(TimeUnit.MILLISECONDS.convert(timeSource.getTimeNs(), TimeUnit.NANOSECONDS)
resumeTime = new Date(TimeUnit.MILLISECONDS.convert(timeSource.getEpochTimeNs(), TimeUnit.NANOSECONDS)
+ TimeUnit.MILLISECONDS.convert(timeoutSeconds, TimeUnit.SECONDS));
} catch (IllegalArgumentException e) {
op.addError("Invalid 'timeout' value for suspend trigger: " + triggerName);
Expand Down
Expand Up @@ -353,7 +353,7 @@ public void run() {
return;
}

long now = cloudManager.getTimeSource().getTimeNs();
long now = cloudManager.getTimeSource().getEpochTimeNs();

// now check thresholds

Expand Down
Expand Up @@ -159,7 +159,7 @@ public void run() {
values.forEach((tag, rate) -> rates.computeIfAbsent(node, s -> (Number) rate));
}

long now = cloudManager.getTimeSource().getTimeNs();
long now = cloudManager.getTimeSource().getEpochTimeNs();
// check for exceeded rates and filter out those with less than waitFor from previous events
Map<String, Number> hotNodes = rates.entrySet().stream()
.filter(entry -> waitForElapsed(entry.getKey(), now, lastNodeEvent))
Expand Down
Expand Up @@ -93,7 +93,7 @@ public void init() throws Exception {
if (lastLiveNodes.contains(n) && !nodeNameVsTimeAdded.containsKey(n)) {
// since {@code #restoreState(AutoScaling.Trigger)} is called first, the timeAdded for a node may also be restored
log.debug("Adding node from marker path: {}", n);
nodeNameVsTimeAdded.put(n, cloudManager.getTimeSource().getTimeNs());
nodeNameVsTimeAdded.put(n, cloudManager.getTimeSource().getEpochTimeNs());
}
});
} catch (NoSuchElementException e) {
Expand Down Expand Up @@ -192,7 +192,7 @@ public void run() {
Set<String> copyOfNew = new HashSet<>(newLiveNodes);
copyOfNew.removeAll(lastLiveNodes);
copyOfNew.forEach(n -> {
long eventTime = cloudManager.getTimeSource().getTimeNs();
long eventTime = cloudManager.getTimeSource().getEpochTimeNs();
log.debug("Tracking new node: {} at time {}", n, eventTime);
nodeNameVsTimeAdded.put(n, eventTime);
});
Expand All @@ -204,7 +204,7 @@ public void run() {
Map.Entry<String, Long> entry = it.next();
String nodeName = entry.getKey();
Long timeAdded = entry.getValue();
long now = cloudManager.getTimeSource().getTimeNs();
long now = cloudManager.getTimeSource().getEpochTimeNs();
if (TimeUnit.SECONDS.convert(now - timeAdded, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
nodeNames.add(nodeName);
times.add(timeAdded);
Expand All @@ -215,7 +215,7 @@ public void run() {
if (processor != null) {
if (log.isDebugEnabled()) {
log.debug("NodeAddedTrigger {} firing registered processor for nodes: {} added at times {}, now={}", name,
nodeNames, times, cloudManager.getTimeSource().getTimeNs());
nodeNames, times, cloudManager.getTimeSource().getEpochTimeNs());
}
if (processor.process(new NodeAddedEvent(getEventType(), getName(), times, nodeNames, preferredOp, replicaType))) {
// remove from tracking set only if the fire was accepted
Expand Down
Expand Up @@ -90,7 +90,7 @@ public void init() throws Exception {
if (!lastLiveNodes.contains(n) && !nodeNameVsTimeRemoved.containsKey(n)) {
// since {@code #restoreState(AutoScaling.Trigger)} is called first, the timeRemoved for a node may also be restored
log.debug("Adding lost node from marker path: {}", n);
nodeNameVsTimeRemoved.put(n, cloudManager.getTimeSource().getTimeNs());
nodeNameVsTimeRemoved.put(n, cloudManager.getTimeSource().getEpochTimeNs());
}
});
} catch (NoSuchElementException e) {
Expand Down Expand Up @@ -184,7 +184,7 @@ public void run() {
copyOfLastLiveNodes.removeAll(newLiveNodes);
copyOfLastLiveNodes.forEach(n -> {
log.debug("Tracking lost node: {}", n);
nodeNameVsTimeRemoved.put(n, cloudManager.getTimeSource().getTimeNs());
nodeNameVsTimeRemoved.put(n, cloudManager.getTimeSource().getEpochTimeNs());
});

// has enough time expired to trigger events for a node?
Expand All @@ -194,7 +194,7 @@ public void run() {
Map.Entry<String, Long> entry = it.next();
String nodeName = entry.getKey();
Long timeRemoved = entry.getValue();
long now = cloudManager.getTimeSource().getTimeNs();
long now = cloudManager.getTimeSource().getEpochTimeNs();
long te = TimeUnit.SECONDS.convert(now - timeRemoved, TimeUnit.NANOSECONDS);
if (te >= getWaitForSecond()) {
nodeNames.add(nodeName);
Expand Down
Expand Up @@ -192,7 +192,7 @@ public void run() {
}
// Even though we are skipping the event, we need to notify any listeners of the IGNORED stage
// so we create a dummy event with the ignored=true flag and ScheduledTriggers will do the rest
if (processor != null && processor.process(new ScheduledEvent(getEventType(), getName(), timeSource.getTimeNs(),
if (processor != null && processor.process(new ScheduledEvent(getEventType(), getName(), timeSource.getEpochTimeNs(),
preferredOp, now.toEpochMilli(), true))) {
lastRunAt = nextRunTime;
return;
Expand All @@ -204,7 +204,7 @@ public void run() {
log.debug("ScheduledTrigger {} firing registered processor for scheduled time {}, now={}", name,
nextRunTime, now);
}
if (processor.process(new ScheduledEvent(getEventType(), getName(), timeSource.getTimeNs(),
if (processor.process(new ScheduledEvent(getEventType(), getName(), timeSource.getEpochTimeNs(),
preferredOp, now.toEpochMilli()))) {
lastRunAt = nextRunTime; // set to nextRunTime instead of now to avoid drift
}
Expand Down
Expand Up @@ -405,7 +405,7 @@ public void run() {
});
});
}
long now = cloudManager.getTimeSource().getTimeNs();
long now = cloudManager.getTimeSource().getEpochTimeNs();
Map<String, Double> hotNodes = new HashMap<>();
Map<String, Double> coldNodes = new HashMap<>();

Expand Down
Expand Up @@ -53,7 +53,7 @@ public TriggerEventQueue(SolrCloudManager cloudManager, String triggerName, Stat
}

public boolean offerEvent(TriggerEvent event) {
event.getProperties().put(ENQUEUE_TIME, timeSource.getTimeNs());
event.getProperties().put(ENQUEUE_TIME, timeSource.getEpochTimeNs());
try {
byte[] data = Utils.toJSON(event);
delegate.offer(data);
Expand Down Expand Up @@ -116,7 +116,7 @@ public TriggerEvent pollEvent() {

private TriggerEvent fromMap(Map<String, Object> map) {
TriggerEvent res = TriggerEvent.fromMap(map);
res.getProperties().put(DEQUEUE_TIME, timeSource.getTimeNs());
res.getProperties().put(DEQUEUE_TIME, timeSource.getEpochTimeNs());
return res;
}
}
Expand Up @@ -183,7 +183,7 @@ public void setAsyncId(String asyncId) {
List<CollectionAdminRequest.AsyncCollectionAdminRequest> operations = Lists.asList(moveReplica, new CollectionAdminRequest.AsyncCollectionAdminRequest[]{mockRequest});
NodeLostTrigger.NodeLostEvent nodeLostEvent = new NodeLostTrigger.NodeLostEvent
(TriggerEventType.NODELOST, "mock_trigger_name",
Collections.singletonList(cloudManager.getTimeSource().getTimeNs()),
Collections.singletonList(cloudManager.getTimeSource().getEpochTimeNs()),
Collections.singletonList(sourceNodeName),
CollectionParams.CollectionAction.MOVEREPLICA.toLower());
ActionContext actionContext = new ActionContext(survivor.getCoreContainer().getZkController().getSolrCloudManager(), null,
Expand Down
Expand Up @@ -117,7 +117,7 @@ public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager,
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
CapturedEvent ev = new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message);
CapturedEvent ev = new CapturedEvent(timeSource.getEpochTimeNs(), context, config, stage, actionName, event, message);
log.info("=======> {}", ev);
lst.add(ev);
}
Expand Down
Expand Up @@ -119,7 +119,7 @@ public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager,
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
CapturedEvent ev = new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message);
CapturedEvent ev = new CapturedEvent(timeSource.getEpochTimeNs(), context, config, stage, actionName, event, message);
log.info("=======> {}", ev);
lst.add(ev);
}
Expand Down
Expand Up @@ -179,7 +179,7 @@ public void testTrigger() throws Exception {
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = timeSource.getTimeNs();
long currentTimeNanos = timeSource.getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down Expand Up @@ -244,7 +244,7 @@ public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager,
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
CapturedEvent ev = new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message);
CapturedEvent ev = new CapturedEvent(timeSource.getEpochTimeNs(), context, config, stage, actionName, event, message);
log.info("=======> {}", ev);
lst.add(ev);
}
Expand Down Expand Up @@ -678,7 +678,7 @@ public void testSplitConfig() throws Exception {
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = timeSource.getTimeNs();
long currentTimeNanos = timeSource.getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down
Expand Up @@ -154,7 +154,7 @@ public void testMetricTrigger() throws Exception {
Thread.sleep(2000);
assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
CapturedEvent ev = listenerEvents.get("srt").get(0);
long now = timeSource.getTimeNs();
long now = timeSource.getEpochTimeNs();
// verify waitFor
assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
assertEquals(collectionName, ev.event.getProperties().get("collection"));
Expand Down Expand Up @@ -197,7 +197,7 @@ public void testMetricTrigger() throws Exception {
Thread.sleep(2000);
assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
ev = listenerEvents.get("srt").get(0);
now = timeSource.getTimeNs();
now = timeSource.getEpochTimeNs();
// verify waitFor
assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
assertEquals(collectionName, ev.event.getProperties().get("collection"));
Expand All @@ -210,7 +210,7 @@ public static class MetricAction extends TriggerActionBase {
@Override
public void process(TriggerEvent event, ActionContext context) throws Exception {
try {
long currentTimeNanos = context.getCloudManager().getTimeSource().getTimeNs();
long currentTimeNanos = context.getCloudManager().getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand All @@ -236,7 +236,7 @@ public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager,
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
lst.add(new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message));
lst.add(new CapturedEvent(timeSource.getEpochTimeNs(), context, config, stage, actionName, event, message));

}
}
Expand Down
Expand Up @@ -164,7 +164,7 @@ public void testNodeAddedTriggerRestoreState() throws Exception {
// since we know the nodeAdded event has been detected, we can recored the current timestamp
// (relative to the cluster's time source) and later assert that (restored state) correctly
// tracked that the event happened prior to "now"
final long maxEventTimeNs = cloudManager.getTimeSource().getTimeNs();
final long maxEventTimeNs = cloudManager.getTimeSource().getEpochTimeNs();

//
// now replace the trigger with a new instance to test that the state gets copied over correctly
Expand Down Expand Up @@ -282,7 +282,7 @@ public void process(TriggerEvent event, ActionContext actionContext) {
try {
if (triggerFired.compareAndSet(false, true)) {
events.add(event);
long currentTimeNanos = actionContext.getCloudManager().getTimeSource().getTimeNs();
long currentTimeNanos = actionContext.getCloudManager().getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down
Expand Up @@ -97,7 +97,7 @@ public void testTrigger() throws Exception {
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
long currentTimeNanos = cloudManager.getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down Expand Up @@ -144,7 +144,7 @@ public void testTrigger() throws Exception {
AtomicBoolean fired = new AtomicBoolean(false);
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
long currentTimeNanos = cloudManager.getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down Expand Up @@ -295,7 +295,7 @@ public void testRestoreState() throws Exception {
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
newTrigger.setProcessor(event -> {
//the processor may get called 2 times, for newly added node and initial nodes
long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
long currentTimeNanos = cloudManager.getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down
Expand Up @@ -180,7 +180,7 @@ public void testNodeLostTriggerRestoreState() throws Exception {
// since we know the nodeLost event has been detected, we can recored the current timestamp
// (relative to the cluster's time source) and later assert that (restored state) correctly
// tracked that the event happened prior to "now"
final long maxEventTimeNs = cloudManager.getTimeSource().getTimeNs();
final long maxEventTimeNs = cloudManager.getTimeSource().getEpochTimeNs();

// even though our trigger has detected a lost node, the *action* we registered should not have
// been run yet, due to the large waitFor configuration...
Expand Down Expand Up @@ -317,7 +317,7 @@ public void process(TriggerEvent event, ActionContext actionContext) {
try {
if (triggerFired.compareAndSet(false, true)) {
events.add(event);
long currentTimeNanos = actionContext.getCloudManager().getTimeSource().getTimeNs();
long currentTimeNanos = actionContext.getCloudManager().getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down
Expand Up @@ -99,7 +99,7 @@ public void testTrigger() throws Exception {
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
long currentTimeNanos = cloudManager.getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down Expand Up @@ -151,7 +151,7 @@ public void testTrigger() throws Exception {
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
long currentTimeNanos = cloudManager.getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down Expand Up @@ -346,7 +346,7 @@ public void testRestoreState() throws Exception {
newTrigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
long currentTimeNanos = cloudManager.getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down
Expand Up @@ -143,7 +143,7 @@ public void process(TriggerEvent event, ActionContext actionContext) {
try {
if (triggerFired.compareAndSet(false, true)) {
events.add(event);
long currentTimeNanos = actionContext.getCloudManager().getTimeSource().getTimeNs();
long currentTimeNanos = actionContext.getCloudManager().getTimeSource().getEpochTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
Expand Down