From bdb8e0f6b0b8ab9b1e2e92d6e41ea3298964dd3e Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Tue, 7 Apr 2020 09:18:42 -0700 Subject: [PATCH] fix: waitForTermination in the manual client #140 (#141) * fix: waitForTermination * . * . * fix: waitForTermination * reword * . --- .../storage/v1alpha2/StreamWriter.java | 83 +++++---- .../storage/v1alpha2/StreamWriterTest.java | 161 +++++++++--------- .../it/ITBigQueryWriteManualClientTest.java | 7 - 3 files changed, 120 insertions(+), 131 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java index 921fb2be83..5b8a07177b 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java @@ -282,9 +282,10 @@ private void writeBatch(final InflightBatch inflightBatch) { @Override public void close() { shutdown(); - // There is some problem waiting for resource to shutdown. So comment this statement out since - // it will cause a minute hang. - // awaitTermination(1, TimeUnit.MINUTES); + try { + awaitTermination(1, TimeUnit.MINUTES); + } catch (InterruptedException ignored) { + } } // The batch of messages that is being sent/processed. @@ -423,6 +424,9 @@ public void shutdown() { } writeAllOutstanding(); messagesWaiter.waitComplete(); + if (clientStream.isSendReady()) { + clientStream.closeSend(); + } backgroundResources.shutdown(); } @@ -735,7 +739,7 @@ public void onComplete() { @Override public void onError(Throwable t) { - LOG.info("OnError called"); + LOG.fine("OnError called"); if (streamWriter.shutdown.get()) { return; } @@ -747,39 +751,33 @@ public void onError(Throwable t) { } inflightBatch = this.inflightBatches.poll(); } - if (isRecoverableError(t)) { - try { - if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts() - && !streamWriter.shutdown.get()) { - streamWriter.refreshAppend(); - // Currently there is a bug that it took reconnected stream 5 seconds to pick up - // stream count. So wait at least 5 seconds before sending a new request. - Thread.sleep( - Math.min( - streamWriter.getRetrySettings().getInitialRetryDelay().toMillis(), - Duration.ofSeconds(5).toMillis())); - streamWriter.writeBatch(inflightBatch); - synchronized (streamWriter.currentRetries) { - streamWriter.currentRetries++; - } - } else { - synchronized (streamWriter.currentRetries) { - streamWriter.currentRetries = 0; + try { + if (isRecoverableError(t)) { + try { + if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts() + && !streamWriter.shutdown.get()) { + streamWriter.refreshAppend(); + // Currently there is a bug that it took reconnected stream 5 seconds to pick up + // stream count. So wait at least 5 seconds before sending a new request. + Thread.sleep( + Math.min( + streamWriter.getRetrySettings().getInitialRetryDelay().toMillis(), + Duration.ofSeconds(5).toMillis())); + streamWriter.writeBatch(inflightBatch); + synchronized (streamWriter.currentRetries) { + streamWriter.currentRetries++; + } + } else { + synchronized (streamWriter.currentRetries) { + streamWriter.currentRetries = 0; + } + inflightBatch.onFailure(t); } - inflightBatch.onFailure(t); - } - } catch (IOException | InterruptedException e) { - streamWriter.currentRetries = 0; - inflightBatch.onFailure(e); - synchronized (streamWriter.messagesWaiter) { - streamWriter.messagesWaiter.incrementPendingCount(-1); - streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize()); - streamWriter.messagesWaiter.notifyAll(); + } catch (IOException | InterruptedException e) { + streamWriter.currentRetries = 0; + inflightBatch.onFailure(e); } - } - } else { - LOG.info("Set error response"); - try { + } else { synchronized (streamWriter.currentRetries) { streamWriter.currentRetries = 0; } @@ -788,16 +786,15 @@ public void onError(Throwable t) { // Establish a new connection. streamWriter.refreshAppend(); } catch (IOException e) { - LOG.info("Failed to establish a new connection, shutdown writer"); - streamWriter.shutdown(); - } - } finally { - synchronized (streamWriter.messagesWaiter) { - streamWriter.messagesWaiter.incrementPendingCount(-1); - streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize()); - streamWriter.messagesWaiter.notifyAll(); + LOG.info("Failed to establish a new connection"); } } + } finally { + synchronized (streamWriter.messagesWaiter) { + streamWriter.messagesWaiter.incrementPendingCount(-1); + streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize()); + streamWriter.messagesWaiter.notifyAll(); + } } } }; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java index 324599a05a..11bbd66010 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java @@ -273,7 +273,6 @@ public void testWriteByShutdown() throws Exception { // still get written by call to shutdown writer.shutdown(); - LOG.info("Wait for termination"); writer.awaitTermination(10, TimeUnit.SECONDS); // Verify the appends completed @@ -285,7 +284,7 @@ public void testWriteByShutdown() throws Exception { @Test public void testWriteMixedSizeAndDuration() throws Exception { - StreamWriter writer = + try (StreamWriter writer = getTestStreamWriterBuilder() .setBatchingSettings( StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS @@ -293,52 +292,53 @@ public void testWriteMixedSizeAndDuration() throws Exception { .setElementCountThreshold(2L) .setDelayThreshold(Duration.ofSeconds(5)) .build()) - .build(); + .build()) { - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0L).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2L).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0L).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2L).build()); - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); - fakeExecutor.advanceTime(Duration.ofSeconds(2)); - assertFalse(appendFuture1.isDone()); + fakeExecutor.advanceTime(Duration.ofSeconds(2)); + assertFalse(appendFuture1.isDone()); - ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B", "C"}); + ApiFuture appendFuture2 = + sendTestMessage(writer, new String[] {"B", "C"}); - // Write triggered by batch size - assertEquals(0L, appendFuture1.get().getOffset()); - assertEquals(1L, appendFuture2.get().getOffset()); + // Write triggered by batch size + assertEquals(0L, appendFuture1.get().getOffset()); + assertEquals(1L, appendFuture2.get().getOffset()); - ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"D"}); + ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"D"}); - assertFalse(appendFuture3.isDone()); + assertFalse(appendFuture3.isDone()); - // Write triggered by time - fakeExecutor.advanceTime(Duration.ofSeconds(5)); + // Write triggered by time + fakeExecutor.advanceTime(Duration.ofSeconds(5)); - assertEquals(2L, appendFuture3.get().getOffset()); + assertEquals(2L, appendFuture3.get().getOffset()); - assertEquals( - 3, - testBigQueryWrite - .getAppendRequests() - .get(0) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - true, testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); - assertEquals( - 1, - testBigQueryWrite - .getAppendRequests() - .get(1) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - false, testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema()); - writer.shutdown(); + assertEquals( + 3, + testBigQueryWrite + .getAppendRequests() + .get(0) + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + assertEquals( + true, testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); + assertEquals( + 1, + testBigQueryWrite + .getAppendRequests() + .get(1) + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + assertEquals( + false, testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema()); + } } @Test @@ -386,11 +386,12 @@ public void run() { fakeExecutor.advanceTime(Duration.ofSeconds(10)); t.join(); writer.shutdown(); + writer.awaitTermination(1, TimeUnit.MINUTES); } @Test public void testFlowControlBehaviorException() throws Exception { - StreamWriter writer = + try (StreamWriter writer = getTestStreamWriterBuilder() .setBatchingSettings( StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS @@ -404,18 +405,18 @@ public void testFlowControlBehaviorException() throws Exception { FlowController.LimitExceededBehavior.ThrowException) .build()) .build()) - .build(); - - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build()); - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); - try { - ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); - Assert.fail("This should fail"); - } catch (IllegalStateException e) { - assertEquals("FlowControl limit exceeded: Element count", e.getMessage()); + .build()) { + + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build()); + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + try { + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + Assert.fail("This should fail"); + } catch (IllegalStateException e) { + assertEquals("FlowControl limit exceeded: Element count", e.getMessage()); + } + assertEquals(1L, appendFuture1.get().getOffset()); } - assertEquals(1L, appendFuture1.get().getOffset()); - writer.shutdown(); } @Test @@ -474,51 +475,52 @@ public void testStreamReconnection() throws Exception { } catch (ExecutionException e) { assertEquals(transientError.toString(), e.getCause().getCause().toString()); } + writer.shutdown(); + assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES)); } @Test public void testOffset() throws Exception { - StreamWriter writer = + try (StreamWriter writer = getTestStreamWriterBuilder() .setBatchingSettings( StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(2L) .build()) - .build(); - - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(10L).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(13L).build()); - AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L); - ApiFuture appendFuture1 = writer.append(request1); - AppendRowsRequest request2 = createAppendRequest(new String[] {"B", "C"}, 11L); - ApiFuture appendFuture2 = writer.append(request2); - AppendRowsRequest request3 = createAppendRequest(new String[] {"E", "F"}, 13L); - ApiFuture appendFuture3 = writer.append(request3); - AppendRowsRequest request4 = createAppendRequest(new String[] {"G"}, 15L); - ApiFuture appendFuture4 = writer.append(request4); - assertEquals(10L, appendFuture1.get().getOffset()); - assertEquals(11L, appendFuture2.get().getOffset()); - assertEquals(13L, appendFuture3.get().getOffset()); - assertEquals(15L, appendFuture4.get().getOffset()); - writer.shutdown(); + .build()) { + + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(10L).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(13L).build()); + AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L); + ApiFuture appendFuture1 = writer.append(request1); + AppendRowsRequest request2 = createAppendRequest(new String[] {"B", "C"}, 11L); + ApiFuture appendFuture2 = writer.append(request2); + AppendRowsRequest request3 = createAppendRequest(new String[] {"E", "F"}, 13L); + ApiFuture appendFuture3 = writer.append(request3); + AppendRowsRequest request4 = createAppendRequest(new String[] {"G"}, 15L); + ApiFuture appendFuture4 = writer.append(request4); + assertEquals(10L, appendFuture1.get().getOffset()); + assertEquals(11L, appendFuture2.get().getOffset()); + assertEquals(13L, appendFuture3.get().getOffset()); + assertEquals(15L, appendFuture4.get().getOffset()); + } } @Test public void testOffsetMismatch() throws Exception { - StreamWriter writer = + try (StreamWriter writer = getTestStreamWriterBuilder() .setBatchingSettings( StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(1L) .build()) - .build(); + .build()) { + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(11L).build()); + AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L); + ApiFuture appendFuture1 = writer.append(request1); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(11L).build()); - AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L); - ApiFuture appendFuture1 = writer.append(request1); - try { appendFuture1.get(); fail("Should throw exception"); } catch (Exception e) { @@ -526,12 +528,11 @@ public void testOffsetMismatch() throws Exception { "java.lang.IllegalStateException: The append result offset 11 does not match the expected offset 10.", e.getCause().toString()); } - writer.shutdown(); } @Test public void testErrorPropagation() throws Exception { - StreamWriter writer = + try (StreamWriter writer = getTestStreamWriterBuilder() .setExecutorProvider(SINGLE_THREAD_EXECUTOR) .setBatchingSettings( @@ -540,9 +541,8 @@ public void testErrorPropagation() throws Exception { .setElementCountThreshold(1L) .setDelayThreshold(Duration.ofSeconds(5)) .build()) - .build(); - testBigQueryWrite.addException(Status.DATA_LOSS.asException()); - try { + .build()) { + testBigQueryWrite.addException(Status.DATA_LOSS.asException()); sendTestMessage(writer, new String[] {"A"}).get(); fail("should throw exception"); } catch (ExecutionException e) { @@ -783,8 +783,7 @@ public void testAwaitTermination() throws Exception { testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); writer.shutdown(); - // TODO: for some reason, await always returns false. - // assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES)); + assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES)); } @Test diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java index 4b5976f45b..ba07e2b5b9 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java @@ -199,10 +199,6 @@ public void testBatchWriteWithCommittedStream() assertEquals("ccc", iter.next().get(0).getStringValue()); assertEquals("ddd", iter.next().get(0).getStringValue()); assertEquals(false, iter.hasNext()); - - LOG.info("Waiting for termination"); - // The awaitTermination always returns false. - // assertEquals(true, streamWriter.awaitTermination(10, TimeUnit.SECONDS)); } @Test @@ -229,7 +225,6 @@ public void testComplicateSchemaWithPendingStream() .setOffset(Int64Value.of(1L)) .build()); assertEquals(1, response2.get().getOffset()); - } finally { } // Nothing showed up since rows are not committed. @@ -333,7 +328,6 @@ public void testStreamReconnect() throws IOException, InterruptedException, Exec .setOffset(Int64Value.of(0L)) .build()); assertEquals(0L, response.get().getOffset()); - } finally { } try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) { @@ -345,7 +339,6 @@ public void testStreamReconnect() throws IOException, InterruptedException, Exec .setOffset(Int64Value.of(1L)) .build()); assertEquals(1L, response.get().getOffset()); - } finally { } } }