Skip to content

Commit

Permalink
fix: Avoid setting error on response future twice (#261)
Browse files Browse the repository at this point in the history
* fix:Avoid setting error to future twice

	modified:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java

* test fix
  • Loading branch information
yirutang committed May 8, 2020
1 parent 5b99ddd commit 35ef0ed
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 15 deletions.
Expand Up @@ -250,10 +250,15 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
* @throws IOException
*/
public void refreshAppend() throws IOException, InterruptedException {
LOG.info("Establish a write connection.");
synchronized (this) {
Preconditions.checkState(!shutdown.get(), "Cannot shut down on a shut-down writer.");
if (shutdown.get()) {
LOG.warning("Cannot refresh on a already shutdown writer.");
return;
}
// There could be a moment, stub is not yet initialized.
if (clientStream != null) {
LOG.info("Closing the stream");
clientStream.closeSend();
}
messagesBatch.resetAttachSchema();
Expand Down Expand Up @@ -348,13 +353,15 @@ private static final class InflightBatch {
final List<AppendRequestAndFutureResponse> inflightRequests;
// A list tracks expected offset for each AppendRequest. Used to reconstruct the Response
// future.
final ArrayList<Long> offsetList;
final long creationTime;
int attempt;
long batchSizeBytes;
long expectedOffset;
Boolean attachSchema;
String streamName;
private final ArrayList<Long> offsetList;
private final long creationTime;
private int attempt;
private long batchSizeBytes;
private long expectedOffset;
private Boolean attachSchema;
private String streamName;

private final AtomicBoolean failed;

InflightBatch(
List<AppendRequestAndFutureResponse> inflightRequests,
Expand All @@ -376,6 +383,7 @@ private static final class InflightBatch {
this.batchSizeBytes = batchSizeBytes;
this.attachSchema = attachSchema;
this.streamName = streamName;
this.failed = new AtomicBoolean(false);
}

int count() {
Expand Down Expand Up @@ -417,6 +425,13 @@ private AppendRowsRequest getMergedRequest() throws IllegalStateException {
}

private void onFailure(Throwable t) {
if (failed.getAndSet(true)) {
// Error has been set already.
LOG.warning("Ignore " + t.toString() + " since error has already been set");
return;
} else {
LOG.fine("Setting " + t.toString() + " on response");
}
for (AppendRequestAndFutureResponse request : inflightRequests) {
request.appendResult.setException(t);
}
Expand Down Expand Up @@ -838,8 +853,10 @@ public void onError(Throwable t) {
}
inflightBatch.onFailure(t);
try {
// Establish a new connection.
streamWriter.refreshAppend();
if (!streamWriter.shutdown.get()) {
// Establish a new connection.
streamWriter.refreshAppend();
}
} catch (IOException | InterruptedException e) {
LOG.info("Failed to establish a new connection");
}
Expand Down
Expand Up @@ -424,18 +424,25 @@ public void testFlowControlBehaviorException() throws Exception {
.build())
.build())
.build()) {
assertEquals(
1L,
writer
.getBatchingSettings()
.getFlowControlSettings()
.getMaxOutstandingElementCount()
.longValue());

testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build());
testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10));
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
// Wait is necessary for response to be scheduled before timer is advanced.
Thread.sleep(5000L);
fakeExecutor.advanceTime(Duration.ofSeconds(10));
try {
appendFuture2.get();
Assert.fail("This should fail");
} catch (Exception e) {
if (!e.getMessage().equals("The maximum number of batch elements: 1 have been reached.")) {
LOG.info("More error info:");
e.printStackTrace();
}
assertEquals(
"java.util.concurrent.ExecutionException: The maximum number of batch elements: 1 have been reached.",
e.toString());
Expand Down Expand Up @@ -505,6 +512,7 @@ public void testStreamReconnectionExceedRetry() throws Exception {
.setMaxAttempts(1)
.build())
.build();
assertEquals(1, writer.getRetrySettings().getMaxAttempts());
StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE);
testBigQueryWrite.addException(transientError);
testBigQueryWrite.addException(transientError);
Expand Down Expand Up @@ -818,7 +826,7 @@ public void testAwaitTermination() throws Exception {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
writer.shutdown();
assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES));
assertTrue(writer.awaitTermination(2, TimeUnit.MINUTES));
}

@Test
Expand Down

0 comments on commit 35ef0ed

Please sign in to comment.