Skip to content

Commit

Permalink
fix: make awaitTermination and shutdown protected, since we already h…
Browse files Browse the repository at this point in the history
…ave close() method, it is confusing to have 3 shutdown methods (#330)
  • Loading branch information
yirutang committed Jun 9, 2020
1 parent f553253 commit 8856288
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 40 deletions.
15 changes: 15 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!--TODO: To be removed-->
<difference>
<differenceType>7009</differenceType>
<className>com/google/cloud/bigquery/storage/v1alpha2/StreamWriter</className>
<method>void shutdown()</method>
</difference>
<difference>
<differenceType>7009</differenceType>
<className>com/google/cloud/bigquery/storage/v1alpha2/StreamWriter</className>
<method>boolean awaitTermination(long, java.util.concurrent.TimeUnit) </method>
</difference>
</differences>
Expand Up @@ -164,14 +164,10 @@ private StreamWriter(Builder builder)
Instant.ofEpochSecond(
stream.getCreateTime().getSeconds(), stream.getCreateTime().getNanos());
if (stream.getType() == Stream.WriteStream.Type.PENDING && stream.hasCommitTime()) {
backgroundResources.shutdown();
backgroundResources.awaitTermination(1, TimeUnit.MINUTES);
throw new IllegalStateException(
"Cannot write to a stream that is already committed: " + streamName);
}
if (createTime.plus(streamTTL).compareTo(Instant.now()) < 0) {
backgroundResources.shutdown();
backgroundResources.awaitTermination(1, TimeUnit.MINUTES);
throw new IllegalStateException(
"Cannot write to a stream that is already expired: " + streamName);
}
Expand Down Expand Up @@ -360,7 +356,7 @@ private void writeBatch(final InflightBatch inflightBatch) {
/** Close the stream writer. Shut down all resources. */
@Override
public void close() {
LOG.info("Closing stream writer");
LOG.info("Closing stream writer:" + streamName);
shutdown();
try {
awaitTermination(1, TimeUnit.MINUTES);
Expand Down Expand Up @@ -512,10 +508,12 @@ public RetrySettings getRetrySettings() {
* should be invoked prior to deleting the {@link WriteStream} object in order to ensure that no
* pending messages are lost.
*/
public void shutdown() {
Preconditions.checkState(
!shutdown.getAndSet(true), "Cannot shut down a writer already shut-down.");
LOG.info("Shutdown called on writer");
protected void shutdown() {
if (shutdown.getAndSet(true)) {
LOG.fine("Already shutdown.");
return;
}
LOG.fine("Shutdown called on writer");
if (currentAlarmFuture != null && activeAlarm.getAndSet(false)) {
currentAlarmFuture.cancel(false);
}
Expand All @@ -535,7 +533,7 @@ public void shutdown() {
*
* <p>Call this method to make sure all resources are freed properly.
*/
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
protected boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
return backgroundResources.awaitTermination(duration, unit);
}

Expand Down
Expand Up @@ -294,8 +294,7 @@ public void testWriteByShutdown() throws Exception {
// Note we are not advancing time or reaching the count threshold but messages should
// still get written by call to shutdown

writer.shutdown();
writer.awaitTermination(10, TimeUnit.SECONDS);
writer.close();

// Verify the appends completed
assertTrue(appendFuture1.isDone());
Expand Down Expand Up @@ -407,8 +406,7 @@ public void run() {
// Wait is necessary for response to be scheduled before timer is advanced.
fakeExecutor.advanceTime(Duration.ofSeconds(10));
t.join();
writer.shutdown();
writer.awaitTermination(1, TimeUnit.MINUTES);
writer.close();
}

@Test
Expand Down Expand Up @@ -527,8 +525,7 @@ public void testStreamReconnectionExceedRetry() throws Exception {
} catch (ExecutionException e) {
assertEquals(transientError.toString(), e.getCause().getCause().toString());
}
writer.shutdown();
assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES));
writer.close();
}

@Test
Expand Down Expand Up @@ -643,7 +640,7 @@ public void testWriterGetters() throws Exception {
.getFlowControlSettings()
.getMaxOutstandingRequestBytes()
.longValue());
writer.shutdown();
writer.close();
}

@Test
Expand Down Expand Up @@ -823,29 +820,6 @@ public void testBuilderInvalidArguments() {
}
}

@Test
public void testAwaitTermination() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder("projects/p/datasets/d/tables/t/streams/await").build();
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"AWAIT"});
writer.shutdown();
assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES));
}

@Test
public void testClose() throws Exception {
StreamWriter writer = getTestStreamWriterBuilder().build();
writer.close();
try {
writer.shutdown();
fail("Should throw");
} catch (IllegalStateException e) {
LOG.info(e.toString());
assertEquals("Cannot shut down a writer already shut-down.", e.getMessage());
}
}

@Test
public void testExistingClient() throws Exception {
BigQueryWriteSettings settings =
Expand Down

0 comments on commit 8856288

Please sign in to comment.