Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
Fix possible deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Mar 4, 2021
1 parent 830da70 commit 868e6aa
Show file tree
Hide file tree
Showing 10 changed files with 454 additions and 298 deletions.
Expand Up @@ -141,16 +141,16 @@ public BatcherImpl(
// to avoid deadlocking
if (flowController.getLimitExceededBehavior() != LimitExceededBehavior.Ignore) {
Preconditions.checkArgument(
flowController.getMaxOutstandingElementCount() == null
flowController.getMaxElementCountLimit() == null
|| batchingSettings.getElementCountThreshold() == null
|| flowController.getMaxOutstandingElementCount()
|| flowController.getMaxElementCountLimit()
>= batchingSettings.getElementCountThreshold(),
"If throttling and batching on element count are enabled, FlowController"
+ "#maxOutstandingElementCount must be greater or equal to elementCountThreshold");
Preconditions.checkArgument(
flowController.getMaxOutstandingRequestBytes() == null
flowController.getMaxRequestBytesLimit() == null
|| batchingSettings.getRequestByteThreshold() == null
|| flowController.getMaxOutstandingRequestBytes()
|| flowController.getMaxRequestBytesLimit()
>= batchingSettings.getRequestByteThreshold(),
"If throttling and batching on request bytes are enabled, FlowController"
+ "#maxOutstandingRequestBytes must be greater or equal to requestByteThreshold");
Expand Down
Expand Up @@ -35,6 +35,9 @@
/** A {@link Semaphore64} that blocks until permits become available. */
class BlockingSemaphore implements Semaphore64 {
private long currentPermits;
private long limit;
private Object updateLock = new Object();
private Object waitLock = new Object();

private static void checkNotNegative(long l) {
Preconditions.checkArgument(l >= 0, "negative permits not allowed: %s", l);
Expand All @@ -43,37 +46,100 @@ private static void checkNotNegative(long l) {
BlockingSemaphore(long permits) {
checkNotNegative(permits);
this.currentPermits = permits;
this.limit = permits;
}

public synchronized void release(long permits) {
public void release(long permits) {
checkNotNegative(permits);

currentPermits += permits;
notifyAll();
synchronized (updateLock) {
// If more permits are returned then what was originally set, we need to add these extra
// permits to the limit too
currentPermits += permits;
if (currentPermits > limit) {
limit = currentPermits;
}
}
synchronized (waitLock) {
waitLock.notifyAll();
}
}

public synchronized boolean acquire(long permits) {
public boolean acquire(long permits) {
checkNotNegative(permits);

boolean interrupted = false;
while (currentPermits < permits) {
try {
wait();
} catch (InterruptedException e) {
interrupted = true;
while (!interrupted) {
synchronized (waitLock) {
if (currentPermits < permits) {
try {
waitLock.wait();
} catch (InterruptedException e) {
interrupted = true;
}
}
}
synchronized (updateLock) {
if (currentPermits < permits) {
continue;
}
currentPermits -= permits;
break;
}
}
currentPermits -= permits;

if (interrupted) {
Thread.currentThread().interrupt();
}
return true;
}

public synchronized void reducePermits(long reduction) {
checkNotNegative(reduction);
public boolean laxAcquire(long permits) {
checkNotNegative(permits);

boolean interrupted = false;
while (!interrupted) {
long toAcquire;
synchronized (updateLock) {
if (permits > limit) {
toAcquire = limit;
} else {
toAcquire = permits;
}
}

synchronized (waitLock) {
if (currentPermits < toAcquire) {
try {
waitLock.wait();
} catch (InterruptedException e) {
interrupted = true;
}
}
}

currentPermits -= reduction;
// Give out permits as long as there are more currentPermits than the max of (limit, permits).
// currentPermits could be negative after the permits are given out, which marks how many
// permits are owed.
synchronized (updateLock) {
if (currentPermits < toAcquire) {
continue;
}
currentPermits -= permits;
break;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
return true;
}

public void reducePermits(long reduction) {
checkNotNegative(reduction);
synchronized (updateLock) {
checkNotNegative(limit - reduction);
currentPermits -= reduction;
limit -= reduction;
}
}
}
Expand Up @@ -107,17 +107,48 @@ public abstract static class Builder {

public DynamicFlowControlSettings build() {
DynamicFlowControlSettings settings = autoBuild();
Preconditions.checkArgument(
(settings.getInitialOutstandingElementCount() != null
&& settings.getMinOutstandingElementCount() != null
&& settings.getMaxOutstandingElementCount() != null)
|| (settings.getInitialOutstandingElementCount() == null
&& settings.getMinOutstandingElementCount() == null
&& settings.getMaxOutstandingElementCount() == null),

verifyElementCountSettings(settings);
verifyRequestBytesSettings(settings);

return settings;
}

private void verifyElementCountSettings(DynamicFlowControlSettings settings) {
boolean isEnabled =
settings.getInitialOutstandingElementCount() != null
|| settings.getMinOutstandingElementCount() != null
|| settings.getMaxOutstandingElementCount() != null;
if (!isEnabled) {
return;
}
Preconditions.checkState(
settings.getInitialOutstandingElementCount() != null
&& settings.getMinOutstandingElementCount() != null
&& settings.getMaxOutstandingElementCount() != null,
"Throttling on element count is disabled by default. To enable this setting,"
+ " minOutstandingElementCount, initialOutstandingElementCount, and "
+ "maxOutstandingElementCount must all be set.");
Preconditions.checkArgument(
Preconditions.checkState(
settings.getMinOutstandingElementCount() > 0
&& settings.getInitialOutstandingElementCount()
<= settings.getMaxOutstandingElementCount()
&& settings.getInitialOutstandingElementCount()
>= settings.getMinOutstandingElementCount(),
"If throttling on element count is set, minOutstandingElementCount must be"
+ " greater than 0, and minOutstandingElementCount <= "
+ "initialOutstandingElementCount <= maxOutstandingElementCount");
}

private void verifyRequestBytesSettings(DynamicFlowControlSettings settings) {
boolean isEnabled =
settings.getInitialOutstandingRequestBytes() != null
|| settings.getMinOutstandingRequestBytes() != null
|| settings.getMaxOutstandingRequestBytes() != null;
if (!isEnabled) {
return;
}
Preconditions.checkState(
(settings.getInitialOutstandingRequestBytes() != null
&& settings.getMinOutstandingRequestBytes() != null
&& settings.getMaxOutstandingRequestBytes() != null)
Expand All @@ -127,29 +158,15 @@ public DynamicFlowControlSettings build() {
"Throttling on number of bytes is disabled by default. To enable this "
+ "setting, minOutstandingRequestBytes, initialOutstandingRequestBytes, and "
+ "maxOutstandingRequestBytes must all be set");
if (settings.getInitialOutstandingElementCount() != null) {
Preconditions.checkArgument(
settings.getMinOutstandingElementCount() > 0
&& settings.getInitialOutstandingElementCount()
<= settings.getMaxOutstandingElementCount()
&& settings.getInitialOutstandingElementCount()
>= settings.getMinOutstandingElementCount(),
"If throttling on element count is set, minOutstandingElementCount must be"
+ " greater than 0, and minOutstandingElementCount <= "
+ "initialOutstandingElementCount <= maxOutstandingElementCount");
}
if (settings.getInitialOutstandingRequestBytes() != null) {
Preconditions.checkArgument(
settings.getMinOutstandingRequestBytes() > 0
&& settings.getInitialOutstandingRequestBytes()
<= settings.getMaxOutstandingRequestBytes()
&& settings.getInitialOutstandingRequestBytes()
>= settings.getMinOutstandingRequestBytes(),
"If throttling on number of bytes is set, minOutstandingRequestBytes must "
+ "be greater than 0, and minOutstandingRequestBytes <= "
+ "initialOutstandingRequestBytes <= maxOutstandingRequestBytes");
}
return settings;
Preconditions.checkState(
settings.getMinOutstandingRequestBytes() > 0
&& settings.getInitialOutstandingRequestBytes()
<= settings.getMaxOutstandingRequestBytes()
&& settings.getInitialOutstandingRequestBytes()
>= settings.getMinOutstandingRequestBytes(),
"If throttling on number of bytes is set, minOutstandingRequestBytes must "
+ "be greater than 0, and minOutstandingRequestBytes <= "
+ "initialOutstandingRequestBytes <= maxOutstandingRequestBytes");
}
}
}

0 comments on commit 868e6aa

Please sign in to comment.