diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml new file mode 100644 index 0000000000..c1328feea2 --- /dev/null +++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml @@ -0,0 +1,15 @@ + + + + + + 7009 + com/google/cloud/bigquery/storage/v1alpha2/StreamWriter + void shutdown() + + + 7009 + com/google/cloud/bigquery/storage/v1alpha2/StreamWriter + boolean awaitTermination(long, java.util.concurrent.TimeUnit) + + \ No newline at end of file diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java index 06f753b6c4..a82486276e 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java @@ -164,14 +164,10 @@ private StreamWriter(Builder builder) Instant.ofEpochSecond( stream.getCreateTime().getSeconds(), stream.getCreateTime().getNanos()); if (stream.getType() == Stream.WriteStream.Type.PENDING && stream.hasCommitTime()) { - backgroundResources.shutdown(); - backgroundResources.awaitTermination(1, TimeUnit.MINUTES); throw new IllegalStateException( "Cannot write to a stream that is already committed: " + streamName); } if (createTime.plus(streamTTL).compareTo(Instant.now()) < 0) { - backgroundResources.shutdown(); - backgroundResources.awaitTermination(1, TimeUnit.MINUTES); throw new IllegalStateException( "Cannot write to a stream that is already expired: " + streamName); } @@ -360,7 +356,7 @@ private void writeBatch(final InflightBatch inflightBatch) { /** Close the stream writer. Shut down all resources. */ @Override public void close() { - LOG.info("Closing stream writer"); + LOG.info("Closing stream writer:" + streamName); shutdown(); try { awaitTermination(1, TimeUnit.MINUTES); @@ -512,10 +508,12 @@ public RetrySettings getRetrySettings() { * should be invoked prior to deleting the {@link WriteStream} object in order to ensure that no * pending messages are lost. */ - public void shutdown() { - Preconditions.checkState( - !shutdown.getAndSet(true), "Cannot shut down a writer already shut-down."); - LOG.info("Shutdown called on writer"); + protected void shutdown() { + if (shutdown.getAndSet(true)) { + LOG.fine("Already shutdown."); + return; + } + LOG.fine("Shutdown called on writer"); if (currentAlarmFuture != null && activeAlarm.getAndSet(false)) { currentAlarmFuture.cancel(false); } @@ -535,7 +533,7 @@ public void shutdown() { * *

Call this method to make sure all resources are freed properly. */ - public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException { + protected boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException { return backgroundResources.awaitTermination(duration, unit); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java index 3bab929111..372b4ad97f 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java @@ -294,8 +294,7 @@ public void testWriteByShutdown() throws Exception { // Note we are not advancing time or reaching the count threshold but messages should // still get written by call to shutdown - writer.shutdown(); - writer.awaitTermination(10, TimeUnit.SECONDS); + writer.close(); // Verify the appends completed assertTrue(appendFuture1.isDone()); @@ -407,8 +406,7 @@ public void run() { // Wait is necessary for response to be scheduled before timer is advanced. fakeExecutor.advanceTime(Duration.ofSeconds(10)); t.join(); - writer.shutdown(); - writer.awaitTermination(1, TimeUnit.MINUTES); + writer.close(); } @Test @@ -527,8 +525,7 @@ public void testStreamReconnectionExceedRetry() throws Exception { } catch (ExecutionException e) { assertEquals(transientError.toString(), e.getCause().getCause().toString()); } - writer.shutdown(); - assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES)); + writer.close(); } @Test @@ -643,7 +640,7 @@ public void testWriterGetters() throws Exception { .getFlowControlSettings() .getMaxOutstandingRequestBytes() .longValue()); - writer.shutdown(); + writer.close(); } @Test @@ -823,29 +820,6 @@ public void testBuilderInvalidArguments() { } } - @Test - public void testAwaitTermination() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder("projects/p/datasets/d/tables/t/streams/await").build(); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"AWAIT"}); - writer.shutdown(); - assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES)); - } - - @Test - public void testClose() throws Exception { - StreamWriter writer = getTestStreamWriterBuilder().build(); - writer.close(); - try { - writer.shutdown(); - fail("Should throw"); - } catch (IllegalStateException e) { - LOG.info(e.toString()); - assertEquals("Cannot shut down a writer already shut-down.", e.getMessage()); - } - } - @Test public void testExistingClient() throws Exception { BigQueryWriteSettings settings =