Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: StreamWriter hang when we reach the inflight limit control and is doing a retry #799

Merged
merged 6 commits into from Jan 17, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -922,41 +922,43 @@ public void onError(Throwable t) {
}
inflightBatch = this.inflightBatches.poll();
}
try {
if (isRecoverableError(t)) {
try {
if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts()
&& !streamWriter.shutdown.get()) {
streamWriter.refreshAppend();
LOG.info("Resending requests on transient error:" + streamWriter.currentRetries);
streamWriter.writeBatch(inflightBatch);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries++;
}
} else {
inflightBatch.onFailure(t);
abortInflightRequests(t);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
if (isRecoverableError(t)) {
try {
if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts()
&& !streamWriter.shutdown.get()) {
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries++;
}
} catch (IOException | InterruptedException e) {
LOG.info("Got exception while retrying.");
inflightBatch.onFailure(e);
abortInflightRequests(e);
LOG.info(
"Try to reestablish connection due to transient error: "
+ t.toString()
+ " retry times: "
+ streamWriter.currentRetries);
streamWriter.refreshAppend();
LOG.info("Resending requests on after connection established");
streamWriter.writeBatch(inflightBatch);
} else {
inflightBatch.onFailure(t);
abortInflightRequests(t);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
}
} else {
inflightBatch.onFailure(t);
abortInflightRequests(t);
} catch (IOException | InterruptedException e) {
LOG.info("Got exception while retrying: " + e.toString());
inflightBatch.onFailure(e);
abortInflightRequests(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add test coverage for catching these lines?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point. I tried this morning but couldn't figure out a good way to hit the code. I will play around a little more, but could we proceed with this first. Datalfow is waiting for a fix.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SG

synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
}
} finally {
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
} else {
inflightBatch.onFailure(t);
abortInflightRequests(t);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
}
}
};
Expand Down
Expand Up @@ -519,21 +519,31 @@ public void testStreamReconnectionTransient() throws Exception {
.toBuilder()
.setDelayThreshold(Duration.ofSeconds(100000))
.setElementCountThreshold(1L)
.setFlowControlSettings(
StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS
.toBuilder()
.setMaxOutstandingElementCount(1L)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
.build())
.build())
.build();

StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE);
testBigQueryWrite.addException(transientError);
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());
testBigQueryWrite.addException(transientError);
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build())
.build());
ApiFuture<AppendRowsResponse> future1 = sendTestMessage(writer, new String[] {"m1"});
assertEquals(false, future1.isDone());
// Retry is scheduled to be 7 seconds later.
ApiFuture<AppendRowsResponse> future2 = sendTestMessage(writer, new String[] {"m1"});
assertEquals(0L, future1.get().getAppendResult().getOffset().getValue());
future1.get();
assertEquals(1L, future2.get().getAppendResult().getOffset().getValue());
writer.close();
}

Expand Down