Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add flow control support to publisher #119

Merged
merged 2 commits into from Mar 25, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -25,6 +25,8 @@
import com.google.api.core.BetaApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.BackgroundResourceAggregation;
import com.google.api.gax.core.CredentialsProvider;
Expand Down Expand Up @@ -55,6 +57,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -108,6 +111,8 @@ public class Publisher {
private ScheduledFuture<?> currentAlarmFuture;
private final ApiFunction<PubsubMessage, PubsubMessage> messageTransform;

private MessageFlowController flowController = null;

/** The maximum number of messages in one request. Defined by the API. */
public static long getApiMaxRequestElementCount() {
return 1000L;
Expand All @@ -122,6 +127,16 @@ private Publisher(Builder builder) throws IOException {
topicName = builder.topicName;

this.batchingSettings = builder.batchingSettings;
FlowControlSettings flowControl = this.batchingSettings.getFlowControlSettings();
if (flowControl != null
&& flowControl.getLimitExceededBehavior() != FlowController.LimitExceededBehavior.Ignore) {
this.flowController =
new MessageFlowController(
flowControl.getMaxOutstandingElementCount(),
flowControl.getMaxOutstandingRequestBytes(),
flowControl.getLimitExceededBehavior());
}

this.enableMessageOrdering = builder.enableMessageOrdering;
this.messageTransform = builder.messageTransform;

Expand Down Expand Up @@ -221,6 +236,19 @@ public ApiFuture<String> publish(PubsubMessage message) {

final OutstandingPublish outstandingPublish =
new OutstandingPublish(messageTransform.apply(message));

if (flowController != null) {
try {
flowController.acquire(outstandingPublish.messageSize);
} catch (Exception e) {
kamalaboulhosn marked this conversation as resolved.
Show resolved Hide resolved
if (!orderingKey.isEmpty()) {
sequentialExecutor.stopPublish(orderingKey);
}
outstandingPublish.publishResult.setException(e);
return outstandingPublish.publishResult;
}
}

List<OutstandingBatch> batchesToSend;
messagesBatchLock.lock();
try {
Expand Down Expand Up @@ -454,7 +482,7 @@ public ApiFuture<PublishResponse> call() {
ApiFutures.addCallback(future, futureCallback, directExecutor());
}

private static final class OutstandingBatch {
private final class OutstandingBatch {
final List<OutstandingPublish> outstandingPublishes;
final long creationTime;
int attempt;
Expand Down Expand Up @@ -484,14 +512,21 @@ private List<PubsubMessage> getMessages() {

private void onFailure(Throwable t) {
for (OutstandingPublish outstandingPublish : outstandingPublishes) {
if (flowController != null) {
flowController.release(outstandingPublish.messageSize);
}
outstandingPublish.publishResult.setException(t);
}
}

private void onSuccess(Iterable<String> results) {
Iterator<OutstandingPublish> messagesResultsIt = outstandingPublishes.iterator();
for (String messageId : results) {
messagesResultsIt.next().publishResult.set(messageId);
OutstandingPublish nextPublish = messagesResultsIt.next();
if (flowController != null) {
flowController.release(nextPublish.messageSize);
}
nextPublish.publishResult.set(messageId);
}
}
}
Expand Down Expand Up @@ -602,6 +637,10 @@ public static final class Builder {
.setDelayThreshold(DEFAULT_DELAY_THRESHOLD)
.setRequestByteThreshold(DEFAULT_REQUEST_BYTES_THRESHOLD)
.setElementCountThreshold(DEFAULT_ELEMENT_COUNT_THRESHOLD)
.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore)
.build())
.build();
static final RetrySettings DEFAULT_RETRY_SETTINGS =
RetrySettings.newBuilder()
Expand Down Expand Up @@ -759,7 +798,137 @@ public Publisher build() throws IOException {
}
}

private static class MessagesBatch {
private static class MessageFlowController {
private final Lock lock;
private final Long messageLimit;
private final Long byteLimit;
private final FlowController.LimitExceededBehavior limitBehavior;

private Long outstandingMessages;
private Long outstandingBytes;
private LinkedList<CountDownLatch> awaitingMessageAcquires;
private LinkedList<CountDownLatch> awaitingBytesAcquires;

MessageFlowController(
Long messageLimit, Long byteLimit, FlowController.LimitExceededBehavior limitBehavior) {
this.messageLimit = messageLimit;
this.byteLimit = byteLimit;
this.limitBehavior = limitBehavior;
this.lock = new ReentrantLock();

this.outstandingMessages = 0L;
this.outstandingBytes = 0L;

this.awaitingMessageAcquires = new LinkedList<CountDownLatch>();
this.awaitingBytesAcquires = new LinkedList<CountDownLatch>();
}

void acquire(long messageSize) throws FlowController.FlowControlException {
lock.lock();
try {
if (outstandingMessages >= messageLimit
&& limitBehavior == FlowController.LimitExceededBehavior.ThrowException) {
throw new FlowController.MaxOutstandingElementCountReachedException(messageLimit);
}
if (outstandingBytes + messageSize >= byteLimit
&& limitBehavior == FlowController.LimitExceededBehavior.ThrowException) {
throw new FlowController.MaxOutstandingRequestBytesReachedException(byteLimit);
}

// We can acquire or we should wait until we can acquire.
// Start by acquiring a slot for a message.
CountDownLatch messageWaiter = null;
while (outstandingMessages >= messageLimit) {
if (messageWaiter == null) {
// This message gets added to the back of the line.
messageWaiter = new CountDownLatch(1);
awaitingMessageAcquires.addLast(messageWaiter);
} else {
// This message already in line stays at the head of the line.
messageWaiter = new CountDownLatch(1);
awaitingMessageAcquires.removeFirst();
kamalaboulhosn marked this conversation as resolved.
Show resolved Hide resolved
awaitingMessageAcquires.addFirst(messageWaiter);
}
lock.unlock();
try {
messageWaiter.await();
} catch (InterruptedException e) {
logger.log(Level.WARNING, "Interrupted while waiting to acquire flow control tokens");
}
lock.lock();
}
++outstandingMessages;
if (messageWaiter != null) {
awaitingMessageAcquires.removeFirst();
}

// There may be some surplus messages left; let the next message waiting for a token have
// one.
if (!awaitingMessageAcquires.isEmpty() && outstandingMessages < messageLimit) {
awaitingMessageAcquires.getFirst().countDown();
}

// Now acquire space for bytes.
CountDownLatch bytesWaiter = null;
Long bytesRemaining = messageSize;
while (outstandingBytes + bytesRemaining >= byteLimit) {
// Take what is available.
Long available = byteLimit - outstandingBytes;
bytesRemaining -= available;
outstandingBytes = byteLimit;
if (bytesWaiter == null) {
// This message gets added to the back of the line.
bytesWaiter = new CountDownLatch(1);
awaitingBytesAcquires.addLast(bytesWaiter);
} else {
// This message already in line stays at the head of the line.
bytesWaiter = new CountDownLatch(1);
awaitingBytesAcquires.removeFirst();
awaitingBytesAcquires.addFirst(bytesWaiter);
}
lock.unlock();
try {
bytesWaiter.await();
} catch (InterruptedException e) {
logger.log(Level.WARNING, "Interrupted while waiting to acquire flow control tokens");
}
lock.lock();
}

outstandingBytes += bytesRemaining;
if (bytesWaiter != null) {
awaitingBytesAcquires.removeFirst();
}
// There may be some surplus bytes left; let the next message waiting for bytes have some.
if (!awaitingBytesAcquires.isEmpty() && outstandingBytes < byteLimit) {
awaitingBytesAcquires.getFirst().countDown();
}
} finally {
lock.unlock();
}
}

private void notifyNextAcquires() {
if (!awaitingMessageAcquires.isEmpty()) {
CountDownLatch awaitingAcquire = awaitingMessageAcquires.getFirst();
awaitingAcquire.countDown();
}
if (!awaitingBytesAcquires.isEmpty()) {
CountDownLatch awaitingAcquire = awaitingBytesAcquires.getFirst();
awaitingAcquire.countDown();
}
}

void release(long messageSize) {
lock.lock();
--outstandingMessages;
outstandingBytes -= messageSize;
notifyNextAcquires();
lock.unlock();
}
}

private class MessagesBatch {
private List<OutstandingPublish> messages;
private int batchedBytes;
private String orderingKey;
Expand Down
Expand Up @@ -247,6 +247,10 @@ void resumePublish(String key) {
keysWithErrors.remove(key);
}

void stopPublish(String key) {
keysWithErrors.add(key);
}

/** Cancels every task in the queue associated with {@code key}. */
private void cancelQueuedTasks(final String key, Throwable e) {
keysWithErrors.add(key);
Expand Down