Skip to content

Commit

Permalink
fix: waitForTermination in the manual client #140 (#141)
Browse files Browse the repository at this point in the history
* fix: waitForTermination

* .

* .

* fix: waitForTermination

* reword

* .
  • Loading branch information
yirutang committed Apr 7, 2020
1 parent dbe270f commit bdb8e0f
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 131 deletions.
Expand Up @@ -282,9 +282,10 @@ private void writeBatch(final InflightBatch inflightBatch) {
@Override
public void close() {
shutdown();
// There is some problem waiting for resource to shutdown. So comment this statement out since
// it will cause a minute hang.
// awaitTermination(1, TimeUnit.MINUTES);
try {
awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException ignored) {
}
}

// The batch of messages that is being sent/processed.
Expand Down Expand Up @@ -423,6 +424,9 @@ public void shutdown() {
}
writeAllOutstanding();
messagesWaiter.waitComplete();
if (clientStream.isSendReady()) {
clientStream.closeSend();
}
backgroundResources.shutdown();
}

Expand Down Expand Up @@ -735,7 +739,7 @@ public void onComplete() {

@Override
public void onError(Throwable t) {
LOG.info("OnError called");
LOG.fine("OnError called");
if (streamWriter.shutdown.get()) {
return;
}
Expand All @@ -747,39 +751,33 @@ public void onError(Throwable t) {
}
inflightBatch = this.inflightBatches.poll();
}
if (isRecoverableError(t)) {
try {
if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts()
&& !streamWriter.shutdown.get()) {
streamWriter.refreshAppend();
// Currently there is a bug that it took reconnected stream 5 seconds to pick up
// stream count. So wait at least 5 seconds before sending a new request.
Thread.sleep(
Math.min(
streamWriter.getRetrySettings().getInitialRetryDelay().toMillis(),
Duration.ofSeconds(5).toMillis()));
streamWriter.writeBatch(inflightBatch);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries++;
}
} else {
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
try {
if (isRecoverableError(t)) {
try {
if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts()
&& !streamWriter.shutdown.get()) {
streamWriter.refreshAppend();
// Currently there is a bug that it took reconnected stream 5 seconds to pick up
// stream count. So wait at least 5 seconds before sending a new request.
Thread.sleep(
Math.min(
streamWriter.getRetrySettings().getInitialRetryDelay().toMillis(),
Duration.ofSeconds(5).toMillis()));
streamWriter.writeBatch(inflightBatch);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries++;
}
} else {
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
inflightBatch.onFailure(t);
}
inflightBatch.onFailure(t);
}
} catch (IOException | InterruptedException e) {
streamWriter.currentRetries = 0;
inflightBatch.onFailure(e);
synchronized (streamWriter.messagesWaiter) {
streamWriter.messagesWaiter.incrementPendingCount(-1);
streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize());
streamWriter.messagesWaiter.notifyAll();
} catch (IOException | InterruptedException e) {
streamWriter.currentRetries = 0;
inflightBatch.onFailure(e);
}
}
} else {
LOG.info("Set error response");
try {
} else {
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
Expand All @@ -788,16 +786,15 @@ public void onError(Throwable t) {
// Establish a new connection.
streamWriter.refreshAppend();
} catch (IOException e) {
LOG.info("Failed to establish a new connection, shutdown writer");
streamWriter.shutdown();
}
} finally {
synchronized (streamWriter.messagesWaiter) {
streamWriter.messagesWaiter.incrementPendingCount(-1);
streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize());
streamWriter.messagesWaiter.notifyAll();
LOG.info("Failed to establish a new connection");
}
}
} finally {
synchronized (streamWriter.messagesWaiter) {
streamWriter.messagesWaiter.incrementPendingCount(-1);
streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize());
streamWriter.messagesWaiter.notifyAll();
}
}
}
};
Expand Down
Expand Up @@ -273,7 +273,6 @@ public void testWriteByShutdown() throws Exception {
// still get written by call to shutdown

writer.shutdown();
LOG.info("Wait for termination");
writer.awaitTermination(10, TimeUnit.SECONDS);

// Verify the appends completed
Expand All @@ -285,60 +284,61 @@ public void testWriteByShutdown() throws Exception {

@Test
public void testWriteMixedSizeAndDuration() throws Exception {
StreamWriter writer =
try (StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(2L)
.setDelayThreshold(Duration.ofSeconds(5))
.build())
.build();
.build()) {

testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0L).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2L).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0L).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2L).build());

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});

fakeExecutor.advanceTime(Duration.ofSeconds(2));
assertFalse(appendFuture1.isDone());
fakeExecutor.advanceTime(Duration.ofSeconds(2));
assertFalse(appendFuture1.isDone());

ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B", "C"});
ApiFuture<AppendRowsResponse> appendFuture2 =
sendTestMessage(writer, new String[] {"B", "C"});

// Write triggered by batch size
assertEquals(0L, appendFuture1.get().getOffset());
assertEquals(1L, appendFuture2.get().getOffset());
// Write triggered by batch size
assertEquals(0L, appendFuture1.get().getOffset());
assertEquals(1L, appendFuture2.get().getOffset());

ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"D"});
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"D"});

assertFalse(appendFuture3.isDone());
assertFalse(appendFuture3.isDone());

// Write triggered by time
fakeExecutor.advanceTime(Duration.ofSeconds(5));
// Write triggered by time
fakeExecutor.advanceTime(Duration.ofSeconds(5));

assertEquals(2L, appendFuture3.get().getOffset());
assertEquals(2L, appendFuture3.get().getOffset());

assertEquals(
3,
testBigQueryWrite
.getAppendRequests()
.get(0)
.getProtoRows()
.getRows()
.getSerializedRowsCount());
assertEquals(
true, testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
assertEquals(
1,
testBigQueryWrite
.getAppendRequests()
.get(1)
.getProtoRows()
.getRows()
.getSerializedRowsCount());
assertEquals(
false, testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema());
writer.shutdown();
assertEquals(
3,
testBigQueryWrite
.getAppendRequests()
.get(0)
.getProtoRows()
.getRows()
.getSerializedRowsCount());
assertEquals(
true, testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
assertEquals(
1,
testBigQueryWrite
.getAppendRequests()
.get(1)
.getProtoRows()
.getRows()
.getSerializedRowsCount());
assertEquals(
false, testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema());
}
}

@Test
Expand Down Expand Up @@ -386,11 +386,12 @@ public void run() {
fakeExecutor.advanceTime(Duration.ofSeconds(10));
t.join();
writer.shutdown();
writer.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
public void testFlowControlBehaviorException() throws Exception {
StreamWriter writer =
try (StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
Expand All @@ -404,18 +405,18 @@ public void testFlowControlBehaviorException() throws Exception {
FlowController.LimitExceededBehavior.ThrowException)
.build())
.build())
.build();

testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build());
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
try {
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
Assert.fail("This should fail");
} catch (IllegalStateException e) {
assertEquals("FlowControl limit exceeded: Element count", e.getMessage());
.build()) {

testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build());
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
try {
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
Assert.fail("This should fail");
} catch (IllegalStateException e) {
assertEquals("FlowControl limit exceeded: Element count", e.getMessage());
}
assertEquals(1L, appendFuture1.get().getOffset());
}
assertEquals(1L, appendFuture1.get().getOffset());
writer.shutdown();
}

@Test
Expand Down Expand Up @@ -474,64 +475,64 @@ public void testStreamReconnection() throws Exception {
} catch (ExecutionException e) {
assertEquals(transientError.toString(), e.getCause().getCause().toString());
}
writer.shutdown();
assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES));
}

@Test
public void testOffset() throws Exception {
StreamWriter writer =
try (StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(2L)
.build())
.build();

testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(10L).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(13L).build());
AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L);
ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(request1);
AppendRowsRequest request2 = createAppendRequest(new String[] {"B", "C"}, 11L);
ApiFuture<AppendRowsResponse> appendFuture2 = writer.append(request2);
AppendRowsRequest request3 = createAppendRequest(new String[] {"E", "F"}, 13L);
ApiFuture<AppendRowsResponse> appendFuture3 = writer.append(request3);
AppendRowsRequest request4 = createAppendRequest(new String[] {"G"}, 15L);
ApiFuture<AppendRowsResponse> appendFuture4 = writer.append(request4);
assertEquals(10L, appendFuture1.get().getOffset());
assertEquals(11L, appendFuture2.get().getOffset());
assertEquals(13L, appendFuture3.get().getOffset());
assertEquals(15L, appendFuture4.get().getOffset());
writer.shutdown();
.build()) {

testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(10L).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(13L).build());
AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L);
ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(request1);
AppendRowsRequest request2 = createAppendRequest(new String[] {"B", "C"}, 11L);
ApiFuture<AppendRowsResponse> appendFuture2 = writer.append(request2);
AppendRowsRequest request3 = createAppendRequest(new String[] {"E", "F"}, 13L);
ApiFuture<AppendRowsResponse> appendFuture3 = writer.append(request3);
AppendRowsRequest request4 = createAppendRequest(new String[] {"G"}, 15L);
ApiFuture<AppendRowsResponse> appendFuture4 = writer.append(request4);
assertEquals(10L, appendFuture1.get().getOffset());
assertEquals(11L, appendFuture2.get().getOffset());
assertEquals(13L, appendFuture3.get().getOffset());
assertEquals(15L, appendFuture4.get().getOffset());
}
}

@Test
public void testOffsetMismatch() throws Exception {
StreamWriter writer =
try (StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(1L)
.build())
.build();
.build()) {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(11L).build());
AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L);
ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(request1);

testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(11L).build());
AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L);
ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(request1);
try {
appendFuture1.get();
fail("Should throw exception");
} catch (Exception e) {
assertEquals(
"java.lang.IllegalStateException: The append result offset 11 does not match the expected offset 10.",
e.getCause().toString());
}
writer.shutdown();
}

@Test
public void testErrorPropagation() throws Exception {
StreamWriter writer =
try (StreamWriter writer =
getTestStreamWriterBuilder()
.setExecutorProvider(SINGLE_THREAD_EXECUTOR)
.setBatchingSettings(
Expand All @@ -540,9 +541,8 @@ public void testErrorPropagation() throws Exception {
.setElementCountThreshold(1L)
.setDelayThreshold(Duration.ofSeconds(5))
.build())
.build();
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
try {
.build()) {
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
sendTestMessage(writer, new String[] {"A"}).get();
fail("should throw exception");
} catch (ExecutionException e) {
Expand Down Expand Up @@ -783,8 +783,7 @@ public void testAwaitTermination() throws Exception {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
writer.shutdown();
// TODO: for some reason, await always returns false.
// assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES));
assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES));
}

@Test
Expand Down

0 comments on commit bdb8e0f

Please sign in to comment.