Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exponential backoff implementation to avoid over-polling resources #847

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 6 additions & 2 deletions rts/src/main/java/eta/runtime/concurrent/Concurrent.java
Expand Up @@ -75,6 +75,7 @@ public static Closure takeMVar(StgContext context, MVar mvar) {
cap.blockedLoop();
val = mvar.tryTake();
} while (val == null);
cap.lastBlockCounter = 0;
} finally {
tso.whyBlocked = NotBlocked;
tso.blockInfo = null;
Expand All @@ -101,6 +102,7 @@ public static Closure readMVar(StgContext context, MVar mvar) {
cap.blockedLoop();
val = mvar.tryRead();
} while (val == null);
cap.lastBlockCounter = 0;
} finally {
tso.whyBlocked = NotBlocked;
tso.blockInfo = null;
Expand All @@ -127,6 +129,7 @@ public static void putMVar(StgContext context, MVar mvar, Closure val) {
cap.blockedLoop();
success = mvar.tryPut(val);
} while (!success);
cap.lastBlockCounter = 0;
} finally {
tso.blockInfo = null;
tso.whyBlocked = NotBlocked;
Expand Down Expand Up @@ -180,7 +183,7 @@ public static void yield(StgContext context) {
TSO tso = context.currentTSO;
tso.whyBlocked = BlockedOnYield;
tso.blockInfo = null;
cap.blockedLoop();
cap.blockedLoop(Runtime.getMaxTSOBlockTimeNanos());
}

/* In Eta, all the threads are bound, so this always returns true. */
Expand Down Expand Up @@ -288,6 +291,7 @@ public static Closure threadWaitFuture(StgContext context, Future future) {
}
cap.blockedLoop();
} while (!future.isDone());
cap.lastBlockCounter = 0;
Object exception = null;
Object result = null;
if (tso.blockInfo != null) {
Expand Down Expand Up @@ -357,7 +361,7 @@ public static void threadWaitIO(StgContext context, Channel channel, int ops) {
tso.whyBlocked = blocked;
tso.blockInfo = selectKey;
do {
cap.blockedLoop();
cap.blockedLoop(Runtime.getMaxTSOBlockTimeNanos());
} while (selectKey.isValid());
} catch (ClosedChannelException e) {
throw new RuntimeException("threadWaitIO: ClosedChannelException", e);
Expand Down
1 change: 1 addition & 0 deletions rts/src/main/java/eta/runtime/exception/Exception.java
Expand Up @@ -115,6 +115,7 @@ public static void killThread(StgContext context, TSO target, Closure exception)
do {
cap.blockedLoop();
} while (msg.isValid());
cap.lastBlockCounter = 0;
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions rts/src/main/java/eta/runtime/stg/Capability.java
Expand Up @@ -12,6 +12,7 @@
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ThreadLocalRandom;

import java.lang.ref.WeakReference;

Expand Down Expand Up @@ -101,6 +102,8 @@ public static void setNumCapabilities(int n) {
public Deque<TSO> runQueue = new LinkedList<TSO>();
public int lastWorkSize;
public long lastBlockCheck;
public int lastBlockCounter = 0;
private ThreadLocalRandom tlr = ThreadLocalRandom.current();
public Deque<Message> inbox = new ConcurrentLinkedDeque<Message>();

/* MemoryManager related stuff */
Expand Down Expand Up @@ -163,8 +166,9 @@ public final Closure schedule(TSO tso) throws java.lang.Exception {
}

do {
blockedLoop(Runtime.getMinWorkerCapabilityIdleTimeNanos());
blockedLoop();
} while (blockedCapabilities.contains(this));
lastBlockCounter = 0;
continue;
}
}
Expand Down Expand Up @@ -595,7 +599,8 @@ public final void detectSTMDeadlock(WhyBlocked whyBlocked, Object blockInfo) {

/* Blocked Loop */
public final void blockedLoop() {
blockedLoop(Runtime.getMaxTSOBlockTimeNanos());
lastBlockCounter = lastBlockCounter % 10 + 1; /* should be plenty */
blockedLoop(Runtime.getMaxTSOBlockTimeNanos() * tlr.nextInt(0, 1 << lastBlockCounter));
}

public final void blockedLoop(long nanos) {
Expand Down
1 change: 1 addition & 0 deletions rts/src/main/java/eta/runtime/stm/STM.java
Expand Up @@ -139,6 +139,7 @@ public static Closure atomically(StgContext context, Closure code) {
cap.blockedLoop();
valid = trec.reWait(tso);
} while (valid);
cap.lastBlockCounter = 0;
}
/* If the transaction is invalid, retry. */
trec = TransactionRecord.start(null);
Expand Down
2 changes: 1 addition & 1 deletion rts/src/main/java/eta/runtime/thunk/Thunk.java
Expand Up @@ -110,7 +110,7 @@ public final void handleBlackHole(StgContext context) {
tso.whyBlocked = BlockedOnBlackHole;
tso.blockInfo = this;
}
cap.blockedLoop();
cap.blockedLoop(Runtime.getMaxTSOBlockTimeNanos());
}
}

Expand Down