Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: StreamWriter hangs when the inflight limit is reached while a retry is in progress #799

Merged
merged 6 commits into from Jan 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -187,7 +187,14 @@ private StreamWriter(Builder builder)
this.onSchemaUpdateRunnable.setStreamWriter(this);
}

refreshAppend();
bidiStreamingCallable = stub.appendRowsCallable();
clientStream = bidiStreamingCallable.splitCall(responseObserver);
try {
while (!clientStream.isSendReady()) {
Thread.sleep(10);
}
} catch (InterruptedException e) {
}
}

/** Stream name we are writing to. */
Expand Down Expand Up @@ -296,9 +303,9 @@ public void flushAll(long timeoutMillis) throws Exception {
/**
* Re-establishes a stream connection.
*
* @throws IOException
* @throws InterruptedException
*/
public void refreshAppend() throws IOException, InterruptedException {
public void refreshAppend() throws InterruptedException {
appendAndRefreshAppendLock.lock();
if (shutdown.get()) {
LOG.warning("Cannot refresh on a already shutdown writer.");
Expand All @@ -313,11 +320,8 @@ public void refreshAppend() throws IOException, InterruptedException {
messagesBatch.resetAttachSchema();
bidiStreamingCallable = stub.appendRowsCallable();
clientStream = bidiStreamingCallable.splitCall(responseObserver);
try {
while (!clientStream.isSendReady()) {
Thread.sleep(10);
}
} catch (InterruptedException expected) {
while (!clientStream.isSendReady()) {
Thread.sleep(10);
}
Thread.sleep(this.retrySettings.getInitialRetryDelay().toMillis());
// Can only unlock here, since we need to sleep for the full 7 seconds before the stream can allow appends.
Expand Down Expand Up @@ -922,41 +926,43 @@ public void onError(Throwable t) {
}
inflightBatch = this.inflightBatches.poll();
}
try {
if (isRecoverableError(t)) {
try {
if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts()
&& !streamWriter.shutdown.get()) {
streamWriter.refreshAppend();
LOG.info("Resending requests on transient error:" + streamWriter.currentRetries);
streamWriter.writeBatch(inflightBatch);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries++;
}
} else {
inflightBatch.onFailure(t);
abortInflightRequests(t);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
if (isRecoverableError(t)) {
try {
if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts()
&& !streamWriter.shutdown.get()) {
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries++;
}
} catch (IOException | InterruptedException e) {
LOG.info("Got exception while retrying.");
inflightBatch.onFailure(e);
abortInflightRequests(e);
LOG.info(
"Try to reestablish connection due to transient error: "
+ t.toString()
+ " retry times: "
+ streamWriter.currentRetries);
streamWriter.refreshAppend();
LOG.info("Resending requests on after connection established");
streamWriter.writeBatch(inflightBatch);
} else {
inflightBatch.onFailure(t);
abortInflightRequests(t);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
}
} else {
inflightBatch.onFailure(t);
abortInflightRequests(t);
} catch (InterruptedException e) {
LOG.info("Got exception while retrying: " + e.toString());
inflightBatch.onFailure(new StatusRuntimeException(Status.ABORTED));
abortInflightRequests(new StatusRuntimeException(Status.ABORTED));
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
}
} finally {
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
} else {
inflightBatch.onFailure(t);
abortInflightRequests(t);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
}
}
};
Expand Down
Expand Up @@ -855,7 +855,6 @@ public void testFlushAll() throws Exception {
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});

assertFalse(appendFuture3.isDone());
writer.flushAll(100000);

assertTrue(appendFuture3.isDone());
Expand Down
Expand Up @@ -519,21 +519,30 @@ public void testStreamReconnectionTransient() throws Exception {
.toBuilder()
.setDelayThreshold(Duration.ofSeconds(100000))
.setElementCountThreshold(1L)
.setFlowControlSettings(
StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS
.toBuilder()
.setMaxOutstandingElementCount(1L)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
.build())
.build())
.build();

StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE);
testBigQueryWrite.addException(transientError);
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());
testBigQueryWrite.addException(new StatusRuntimeException(Status.UNAVAILABLE));
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build())
.build());
ApiFuture<AppendRowsResponse> future1 = sendTestMessage(writer, new String[] {"m1"});
assertEquals(false, future1.isDone());
// Retry is scheduled to be 7 seconds later.
ApiFuture<AppendRowsResponse> future2 = sendTestMessage(writer, new String[] {"m1"});
assertEquals(0L, future1.get().getAppendResult().getOffset().getValue());
future1.get();
assertEquals(1L, future2.get().getAppendResult().getOffset().getValue());
writer.close();
}

Expand Down