feat: Add reconnect support to v1 client lib. #1446

Merged
10 commits merged on Dec 20, 2021
README.md: 6 changes (3 additions, 3 deletions)
@@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies

```Groovy
implementation platform('com.google.cloud:libraries-bom:24.0.0')
implementation platform('com.google.cloud:libraries-bom:24.1.0')

implementation 'com.google.cloud:google-cloud-bigquerystorage'
```
If you are using Gradle without BOM, add this to your dependencies

```Groovy
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.6.3'
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.7.0'
```

If you are using SBT, add this to your dependencies

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.6.3"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.7.0"
```

## Authentication
@@ -20,6 +20,7 @@
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.util.Errors;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData;
import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback;
import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback;
@@ -90,6 +91,26 @@ public class StreamWriter implements AutoCloseable {
@GuardedBy("lock")
private long inflightBytes = 0;

/*
 * Tracks how many times in a row the stream was closed due to a retriable error. Streaming
 * halts once this count reaches the retry threshold, i.e. when it does not appear possible to
 * re-establish a connection. The count is reset whenever a row is successfully called back.
 */
@GuardedBy("lock")
private long conectionRetryCountWithoutCallback = 0;

/*
* If false, streamConnection needs to be reset.
*/
@GuardedBy("lock")
private boolean streamConnectionIsConnected = false;

/*
 * Retry threshold: the maximum number of consecutive reconnect attempts before processing halts.
 */
private static final long RETRY_THRESHOLD = 3;

/*
* Indicates whether user has called Close() or not.
*/
@@ -173,6 +194,18 @@ private StreamWriter(Builder builder) throws IOException {
this.ownsBigQueryWriteClient = false;
}

this.appendThread =
new Thread(
new Runnable() {
@Override
public void run() {
appendLoop();
}
});
this.appendThread.start();
}

private void resetConnection() {
this.streamConnection =
new StreamConnection(
this.client,
@@ -188,15 +221,6 @@ public void run(Throwable finalStatus) {
doneCallback(finalStatus);
}
});
this.appendThread =
new Thread(
new Runnable() {
@Override
public void run() {
appendLoop();
}
});
this.appendThread.start();
}

/**
@@ -331,12 +355,27 @@ public void close() {
* It takes requests from waiting queue and sends them to server.
*/
private void appendLoop() {
boolean isFirstRequestInConnection = true;
Deque<AppendRequestAndResponse> localQueue = new LinkedList<AppendRequestAndResponse>();
boolean streamNeedsConnecting = false;
// Set firstRequestInConnection to true immediately after connecting the stream; it
// indicates that the next row sent needs the schema and other metadata.
boolean isFirstRequestInConnection = true;
while (!waitingQueueDrained()) {
this.lock.lock();
try {
hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS);
// Copy streamConnectionIsConnected (guarded by lock) to a local variable, and only
// reconnect if no terminal connection error has been recorded.
streamNeedsConnecting = !streamConnectionIsConnected && connectionFinalStatus == null;
if (streamNeedsConnecting) {
// If the stream connection is broken, any requests on inflightRequestQueue will need
// to be resent, as the new connection has no knowledge of them. Copy the requests from
// inflightRequestQueue and prepend them onto the waitingRequestQueue; they must be
// prepended so that they are sent before new requests.
while (!inflightRequestQueue.isEmpty()) {
waitingRequestQueue.addFirst(inflightRequestQueue.pollLast());
}
}
while (!this.waitingRequestQueue.isEmpty()) {
AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst();
this.inflightRequestQueue.addLast(requestWrapper);
@@ -355,12 +394,34 @@ private void appendLoop() {
if (localQueue.isEmpty()) {
continue;
}

// TODO: Add reconnection here.
if (streamNeedsConnecting) {
// Set streamConnectionIsConnected to true to indicate the stream has been connected. This
// must happen before the call to resetConnection, because the connection could be closed,
// and the doneCallback invoked to clear the flag, at any time afterwards.
lock.lock();
try {
this.streamConnectionIsConnected = true;
} finally {
lock.unlock();
}
resetConnection();
// Set firstRequestInConnection to indicate that the next request to be sent should include
// metadata.
isFirstRequestInConnection = true;
}
while (!localQueue.isEmpty()) {
AppendRowsRequest preparedRequest =
prepareRequestBasedOnPosition(
localQueue.pollFirst().message, isFirstRequestInConnection);
// Send should only throw an exception if there is a problem with the request itself. The
// catch block will handle this case and return the exception with the result.
// Otherwise send will return:
//   SUCCESS: The message was sent; wait for the callback.
//   STREAM_CLOSED: The stream was closed, either normally or due to an error.
//   NOT_ENOUGH_QUOTA: The message was not sent because there was not enough quota.
// TODO: Handle NOT_ENOUGH_QUOTA.
// In the STREAM_CLOSED case, the request is still in the inflight queue and will either be
// returned to the user with an error or be resent.
this.streamConnection.send(preparedRequest);
isFirstRequestInConnection = false;
}
@@ -369,8 +430,10 @@ private void appendLoop() {
log.fine("Cleanup starts. Stream: " + streamName);
// At this point, the waiting queue is drained, so no more requests.
// We can close the stream connection and handle the remaining inflight requests.
this.streamConnection.close();
waitForDoneCallback();
if (streamConnection != null) {
this.streamConnection.close();
waitForDoneCallback();
}

// At this point, there cannot be more callback. It is safe to clean up all inflight requests.
log.fine(
@@ -455,6 +518,12 @@ private void requestCallback(AppendRowsResponse response) {
AppendRequestAndResponse requestWrapper;
this.lock.lock();
try {
// We had a successful connection with at least one result; reset the retry count.
// conectionRetryCountWithoutCallback is reset so that only consecutive retries without any
// successfully returned rows will cause the stream to fail.
if (conectionRetryCountWithoutCallback != 0) {
conectionRetryCountWithoutCallback = 0;
}
requestWrapper = pollInflightRequestQueue();
} finally {
this.lock.unlock();
@@ -476,6 +545,14 @@ private void requestCallback(AppendRowsResponse response) {
}
}

private boolean isRetriableError(Throwable t) {
Status status = Status.fromThrowable(t);
if (Errors.isRetryableInternalStatus(status)) {
return true;
}
return status.getCode() == Status.Code.ABORTED || status.getCode() == Status.Code.UNAVAILABLE;
}

private void doneCallback(Throwable finalStatus) {
log.fine(
"Received done callback. Stream: "
@@ -484,7 +561,26 @@ private void doneCallback(Throwable finalStatus) {
+ finalStatus.toString());
this.lock.lock();
try {
this.connectionFinalStatus = finalStatus;
this.streamConnectionIsConnected = false;
if (connectionFinalStatus == null) {
// If the error is retriable, do not set the final status here; let the stream retry.
if (isRetriableError(finalStatus)
&& conectionRetryCountWithoutCallback < RETRY_THRESHOLD
&& !userClosed) {
this.conectionRetryCountWithoutCallback++;
log.fine(
"Retriable error "
+ finalStatus.toString()
+ " received, retry count "
+ conectionRetryCountWithoutCallback
+ " for stream "
+ streamName);
} else {
this.connectionFinalStatus = finalStatus;
log.info(
"Stream finished with error " + finalStatus.toString() + " for stream " + streamName);
}
}
} finally {
this.lock.unlock();
}
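
For callers, the reconnect support is transparent: appends still go through `StreamWriter.append`, and a stream dropped with ABORTED, UNAVAILABLE, or a retryable INTERNAL status is now reconnected up to RETRY_THRESHOLD (3) consecutive times, with in-flight requests resent, before a final error is surfaced. Below is a minimal caller-side sketch, assuming the v1 StreamWriter builder API; the stream name, the hand-built row descriptor, and the empty ProtoRows payload are placeholders, not part of this PR.

```Java
import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;

public class ReconnectSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder stream name; a real one comes from CreateWriteStream or the table's _default stream.
    String streamName = "projects/my-project/datasets/my_dataset/tables/my_table/streams/_default";

    // Hand-built single-field row schema, standing in for a generated proto descriptor.
    DescriptorProto rowDescriptor =
        DescriptorProto.newBuilder()
            .setName("Row")
            .addField(
                FieldDescriptorProto.newBuilder()
                    .setName("message")
                    .setNumber(1)
                    .setType(FieldDescriptorProto.Type.TYPE_STRING))
            .build();
    ProtoSchema schema = ProtoSchema.newBuilder().setProtoDescriptor(rowDescriptor).build();

    try (StreamWriter writer = StreamWriter.newBuilder(streamName).setWriterSchema(schema).build()) {
      ProtoRows rows = ProtoRows.newBuilder().build(); // serialized row bytes would be added here
      // If the connection drops with a retriable status mid-stream, the writer reconnects
      // (up to RETRY_THRESHOLD consecutive times) and resends the in-flight requests before
      // completing this future.
      ApiFuture<AppendRowsResponse> response = writer.append(rows, /* offset= */ -1);
      response.get();
    }
  }
}
```

The try-with-resources block relies on StreamWriter implementing AutoCloseable, as shown earlier in this diff; with offset -1 the writer does not set an explicit offset, which is the usual pattern for a default stream.
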
@@ -79,14 +79,22 @@ public void reset() {
serviceImpl.reset();
}

public void setResponseDelay(Duration delay) {
serviceImpl.setResponseDelay(delay);
}

public void setResponseSleep(Duration sleep) {
serviceImpl.setResponseSleep(sleep);
}

public void setCloseEveryNAppends(long closeAfter) {
serviceImpl.setCloseEveryNAppends(closeAfter);
}

public void setTimesToClose(long numberTimesToClose) {
serviceImpl.setTimesToClose(numberTimesToClose);
}

public long getConnectionCount() {
return serviceImpl.getConnectionCount();
}

public void setExecutor(ScheduledExecutorService executor) {
serviceImpl.setExecutor(executor);
}
@@ -17,6 +17,7 @@

import com.google.common.base.Optional;
import com.google.common.util.concurrent.Uninterruptibles;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
@@ -45,11 +46,16 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase {
private final AtomicInteger nextMessageId = new AtomicInteger(1);
private boolean autoPublishResponse;
private ScheduledExecutorService executor = null;
private Duration responseDelay = Duration.ZERO;

private Duration responseSleep = Duration.ZERO;
private Semaphore responseSemaphore = new Semaphore(0, true);

private long numberTimesToClose = 0;
private long closeAfter = 0;
private long recordCount = 0;
private long connectionCount = 0;
private boolean firstRecord = false;

/** Class used to save the state of a possible response. */
private static class Response {
Optional<AppendRowsResponse> appendResponse;
@@ -120,38 +126,51 @@ public void waitForResponseScheduled() throws InterruptedException {
responseSemaphore.acquire();
}

/* Return the number of times the stream was connected. */
public long getConnectionCount() {
return connectionCount;
}

@Override
public StreamObserver<AppendRowsRequest> appendRows(
final StreamObserver<AppendRowsResponse> responseObserver) {
this.connectionCount++;
this.firstRecord = true;
StreamObserver<AppendRowsRequest> requestObserver =
new StreamObserver<AppendRowsRequest>() {
@Override
public void onNext(AppendRowsRequest value) {
LOG.fine("Get request:" + value.toString());
final Response response = responses.remove();
requests.add(value);
recordCount++;
if (responseSleep.compareTo(Duration.ZERO) > 0) {
LOG.info("Sleeping before response for " + responseSleep.toString());
LOG.fine("Sleeping before response for " + responseSleep.toString());
Uninterruptibles.sleepUninterruptibly(
responseSleep.toMillis(), TimeUnit.MILLISECONDS);
}
if (responseDelay == Duration.ZERO) {
sendResponse(response, responseObserver);
if (firstRecord) {
if (!value.getProtoRows().hasWriterSchema() || value.getWriteStream().isEmpty()) {
LOG.info(
String.valueOf(
!value.getProtoRows().hasWriterSchema()
|| value.getWriteStream().isEmpty()));
responseObserver.onError(
Status.INVALID_ARGUMENT
.withDescription("Unexpected first request: " + value.toString())
.asException());
return;
}
}
firstRecord = false;
if (closeAfter > 0
&& recordCount % closeAfter == 0
&& (numberTimesToClose == 0 || connectionCount <= numberTimesToClose)) {
LOG.info("Shutting down connection from test...");
responseObserver.onError(Status.ABORTED.asException());
} else {
final Response responseToSend = response;
// TODO(yirutang): This is very wrong because it messes up response/complete ordering.
LOG.fine("Schedule a response to be sent at delay");
executor.schedule(
new Runnable() {
@Override
public void run() {
sendResponse(responseToSend, responseObserver);
}
},
responseDelay.toMillis(),
TimeUnit.MILLISECONDS);
final Response response = responses.remove();
sendResponse(response, responseObserver);
}
responseSemaphore.release();
}

@Override
@@ -183,12 +202,6 @@ public FakeBigQueryWriteImpl setExecutor(ScheduledExecutorService executor) {
return this;
}

/** Set an amount of time by which to delay publish responses. */
public FakeBigQueryWriteImpl setResponseDelay(Duration responseDelay) {
this.responseDelay = responseDelay;
return this;
}

/** Set an amount of time by which to sleep before publishing responses. */
public FakeBigQueryWriteImpl setResponseSleep(Duration responseSleep) {
this.responseSleep = responseSleep;
@@ -231,4 +244,29 @@ public void reset() {
requests.clear();
responses.clear();
}

/* Abort the stream after every N records. The primary use case is to test the retry logic. After
 * N records are sent, the stream is aborted with Code.ABORTED, a retriable error. The abort
 * invokes the onDone callback immediately, potentially losing messages that have already been
 * sent. If the value of closeAfter is too small, the client might not get a chance to process
 * any records before the next abort arrives, which means multiple retries in a row on the client
 * side; after 3 retries in a row the write will fail. closeAfter should therefore be large enough
 * to give the client a chance to receive some of the messages.
 **/
public void setCloseEveryNAppends(long closeAfter) {
this.closeAfter = closeAfter;
}
/* If setCloseEveryNAppends is greater than 0, the stream is aborted every N appends.
 * setTimesToClose limits how many times the abort is performed; if it is set to 0, the stream
 * aborts on every Nth append.
 * The primary use case is to send a couple of records and then abort. But if there are only a
 * couple of records, it is possible that they are sent and the abort happens before the client
 * has processed them, forcing them to be sent again and creating a potential infinite loop.
 * Setting the times to close to 1 sends the records, forces a single abort and retry, and then
 * lets the records be reprocessed to completion.
 **/
public void setTimesToClose(long numberTimesToClose) {
this.numberTimesToClose = numberTimesToClose;
}
}
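
As a rough sketch of how these hooks might be combined in a test of the new reconnect path: force a single mid-stream abort and then assert that the writer opened a second connection. The fixture names below (testBigQueryWrite, addResponse, createAppendResponse, createProtoRows, getTestStreamWriter) are assumptions modeled on the existing StreamWriter test helpers and are not part of this diff.

```Java
// Hypothetical test body; the helper names are assumptions, not APIs added by this PR.
testBigQueryWrite.setCloseEveryNAppends(2); // abort the fake stream after every 2nd append...
testBigQueryWrite.setTimesToClose(1);       // ...but only force a single abort in total
for (int i = 0; i < 4; i++) {
  testBigQueryWrite.addResponse(createAppendResponse(i)); // queue one response per expected append
}
try (StreamWriter writer = getTestStreamWriter()) {
  for (int i = 0; i < 4; i++) {
    // The second append hits the forced ABORT; the writer reconnects and resends it.
    writer.append(createProtoRows(new String[] {String.valueOf(i)}), i).get();
  }
}
// One forced abort means the fake should have seen exactly two connections.
assertEquals(2, testBigQueryWrite.getConnectionCount());
```

With setCloseEveryNAppends(2) and setTimesToClose(1), the fake aborts once after the second append; the writer re-prepends the in-flight request onto the waiting queue, reconnects, and completes all four appends, so getConnectionCount() should report 2.
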