Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make awaitTermination and shutdown protected, since we already h… #330

Merged
merged 1 commit into from Jun 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 15 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!--TODO: To be removed-->
<difference>
<differenceType>7009</differenceType>
<className>com/google/cloud/bigquery/storage/v1alpha2/StreamWriter</className>
<method>void shutdown()</method>
</difference>
<difference>
<differenceType>7009</differenceType>
<className>com/google/cloud/bigquery/storage/v1alpha2/StreamWriter</className>
<method>boolean awaitTermination(long, java.util.concurrent.TimeUnit) </method>
</difference>
</differences>
Expand Up @@ -164,14 +164,10 @@ private StreamWriter(Builder builder)
Instant.ofEpochSecond(
stream.getCreateTime().getSeconds(), stream.getCreateTime().getNanos());
if (stream.getType() == Stream.WriteStream.Type.PENDING && stream.hasCommitTime()) {
backgroundResources.shutdown();
backgroundResources.awaitTermination(1, TimeUnit.MINUTES);
throw new IllegalStateException(
"Cannot write to a stream that is already committed: " + streamName);
}
if (createTime.plus(streamTTL).compareTo(Instant.now()) < 0) {
backgroundResources.shutdown();
backgroundResources.awaitTermination(1, TimeUnit.MINUTES);
throw new IllegalStateException(
"Cannot write to a stream that is already expired: " + streamName);
}
Expand Down Expand Up @@ -360,7 +356,7 @@ private void writeBatch(final InflightBatch inflightBatch) {
/** Close the stream writer. Shut down all resources. */
@Override
public void close() {
LOG.info("Closing stream writer");
LOG.info("Closing stream writer:" + streamName);
shutdown();
try {
awaitTermination(1, TimeUnit.MINUTES);
Expand Down Expand Up @@ -512,10 +508,12 @@ public RetrySettings getRetrySettings() {
* should be invoked prior to deleting the {@link WriteStream} object in order to ensure that no
* pending messages are lost.
*/
public void shutdown() {
Preconditions.checkState(
!shutdown.getAndSet(true), "Cannot shut down a writer already shut-down.");
LOG.info("Shutdown called on writer");
protected void shutdown() {
if (shutdown.getAndSet(true)) {
LOG.fine("Already shutdown.");
return;
}
LOG.fine("Shutdown called on writer");
if (currentAlarmFuture != null && activeAlarm.getAndSet(false)) {
currentAlarmFuture.cancel(false);
}
Expand All @@ -535,7 +533,7 @@ public void shutdown() {
*
* <p>Call this method to make sure all resources are freed properly.
*/
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
protected boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
return backgroundResources.awaitTermination(duration, unit);
}

Expand Down
Expand Up @@ -294,8 +294,7 @@ public void testWriteByShutdown() throws Exception {
// Note we are not advancing time or reaching the count threshold but messages should
// still get written by call to shutdown

writer.shutdown();
writer.awaitTermination(10, TimeUnit.SECONDS);
writer.close();

// Verify the appends completed
assertTrue(appendFuture1.isDone());
Expand Down Expand Up @@ -407,8 +406,7 @@ public void run() {
// Wait is necessary for response to be scheduled before timer is advanced.
fakeExecutor.advanceTime(Duration.ofSeconds(10));
t.join();
writer.shutdown();
writer.awaitTermination(1, TimeUnit.MINUTES);
writer.close();
}

@Test
Expand Down Expand Up @@ -527,8 +525,7 @@ public void testStreamReconnectionExceedRetry() throws Exception {
} catch (ExecutionException e) {
assertEquals(transientError.toString(), e.getCause().getCause().toString());
}
writer.shutdown();
assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES));
writer.close();
}

@Test
Expand Down Expand Up @@ -643,7 +640,7 @@ public void testWriterGetters() throws Exception {
.getFlowControlSettings()
.getMaxOutstandingRequestBytes()
.longValue());
writer.shutdown();
writer.close();
}

@Test
Expand Down Expand Up @@ -823,29 +820,6 @@ public void testBuilderInvalidArguments() {
}
}

@Test
public void testAwaitTermination() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder("projects/p/datasets/d/tables/t/streams/await").build();
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"AWAIT"});
writer.shutdown();
assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES));
}

@Test
public void testClose() throws Exception {
StreamWriter writer = getTestStreamWriterBuilder().build();
writer.close();
try {
writer.shutdown();
fail("Should throw");
} catch (IllegalStateException e) {
LOG.info(e.toString());
assertEquals("Cannot shut down a writer already shut-down.", e.getMessage());
}
}

@Test
public void testExistingClient() throws Exception {
BigQueryWriteSettings settings =
Expand Down