diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java index cd27c741e5..b9dd306935 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java @@ -165,10 +165,14 @@ private StreamWriter(Builder builder) Instant.ofEpochSecond( stream.getCreateTime().getSeconds(), stream.getCreateTime().getNanos()); if (stream.getType() == Stream.WriteStream.Type.PENDING && stream.hasCommitTime()) { + backgroundResources.shutdown(); + backgroundResources.awaitTermination(1, TimeUnit.MINUTES); throw new IllegalStateException( "Cannot write to a stream that is already committed: " + streamName); } if (createTime.plus(streamTTL).compareTo(Instant.now()) < 0) { + backgroundResources.shutdown(); + backgroundResources.awaitTermination(1, TimeUnit.MINUTES); throw new IllegalStateException( "Cannot write to a stream that is already expired: " + streamName); } @@ -247,7 +251,7 @@ public ApiFuture append(AppendRowsRequest message) { */ public void refreshAppend() throws IOException, InterruptedException { synchronized (this) { - Preconditions.checkState(!shutdown.get(), "Cannot append on a shut-down writer."); + Preconditions.checkState(!shutdown.get(), "Cannot shut down on a shut-down writer."); // There could be a moment, stub is not yet initialized. if (clientStream != null) { clientStream.closeSend(); @@ -475,6 +479,7 @@ public RetrySettings getRetrySettings() { public void shutdown() { Preconditions.checkState( !shutdown.getAndSet(true), "Cannot shut down a writer already shut-down."); + LOG.info("Shutdown called on writer"); if (currentAlarmFuture != null && activeAlarm.getAndSet(false)) { currentAlarmFuture.cancel(false); } @@ -684,10 +689,6 @@ > getApiMaxInflightRequests()) { */ public Builder setRetrySettings(RetrySettings retrySettings) { Preconditions.checkNotNull(retrySettings); - Preconditions.checkArgument( - retrySettings.getTotalTimeout().compareTo(MIN_TOTAL_TIMEOUT) >= 0); - Preconditions.checkArgument( - retrySettings.getInitialRpcTimeout().compareTo(MIN_RPC_TIMEOUT) >= 0); this.retrySettings = retrySettings; return this; } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java index 00c9a69050..68eb59d4af 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java @@ -18,6 +18,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import com.google.protobuf.Descriptors.Descriptor; import java.io.IOException; import java.util.concurrent.ConcurrentMap; @@ -53,6 +55,15 @@ private WriterCache(BigQueryWriteClient stub, int maxTableEntry, SchemaCompact c writerCache = CacheBuilder.newBuilder() .maximumSize(maxTableEntry) + .removalListener( + new RemovalListener>() { + @Override + public void onRemoval( + RemovalNotification> + removalNotification) { + removalNotification.getValue().invalidateAll(); + } + }) .>build(); } @@ -135,6 +146,14 @@ public StreamWriter getTableWriter(String tableName, Descriptor userSchema) tableEntry = CacheBuilder.newBuilder() .maximumSize(MAX_WRITERS_PER_TABLE) + .removalListener( + new RemovalListener() { + @Override + public void onRemoval( + RemovalNotification removalNotification) { + removalNotification.getValue().close(); + } + }) .build(); writer = CreateNewWriter(streamName); tableEntry.put(userSchema, writer); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java index f558503914..3dba7524d0 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java @@ -33,6 +33,7 @@ import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.api.gax.grpc.testing.MockGrpcService; import com.google.api.gax.grpc.testing.MockServiceHelper; +import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.DataLossException; import com.google.cloud.bigquery.storage.test.Test.FooType; import com.google.cloud.bigquery.storage.v1alpha2.Storage.*; @@ -439,7 +440,7 @@ public void testFlowControlBehaviorException() throws Exception { } @Test - public void testStreamReconnection() throws Exception { + public void testStreamReconnectionTransient() throws Exception { StreamWriter writer = getTestStreamWriterBuilder() .setBatchingSettings( @@ -450,7 +451,6 @@ public void testStreamReconnection() throws Exception { .build()) .build(); - // Case 1: Request succeeded after retry since the error is transient. StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE); testBigQueryWrite.addException(transientError); testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); @@ -458,9 +458,20 @@ public void testStreamReconnection() throws Exception { assertEquals(false, future1.isDone()); // Retry is scheduled to be 7 seconds later. assertEquals(0L, future1.get().getOffset()); + writer.close(); + } - LOG.info("======CASE II"); - // Case 2 : Request failed since the error is not transient. + @Test + public void testStreamReconnectionPermanant() throws Exception { + StreamWriter writer = + getTestStreamWriterBuilder() + .setBatchingSettings( + StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setDelayThreshold(Duration.ofSeconds(100000)) + .setElementCountThreshold(1L) + .build()) + .build(); StatusRuntimeException permanentError = new StatusRuntimeException(Status.INVALID_ARGUMENT); testBigQueryWrite.addException(permanentError); ApiFuture future2 = sendTestMessage(writer, new String[] {"m2"}); @@ -470,11 +481,26 @@ public void testStreamReconnection() throws Exception { } catch (ExecutionException e) { assertEquals(permanentError.toString(), e.getCause().getCause().toString()); } + writer.close(); + } - LOG.info("======CASE III"); - // Case 3: Failed after retried max retry times. - testBigQueryWrite.addException(transientError); - testBigQueryWrite.addException(transientError); + @Test + public void testStreamReconnectionExceedRetry() throws Exception { + StreamWriter writer = + getTestStreamWriterBuilder() + .setBatchingSettings( + StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setDelayThreshold(Duration.ofSeconds(100000)) + .setElementCountThreshold(1L) + .build()) + .setRetrySettings( + RetrySettings.newBuilder() + .setMaxRetryDelay(Duration.ofMillis(100)) + .setMaxAttempts(1) + .build()) + .build(); + StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE); testBigQueryWrite.addException(transientError); testBigQueryWrite.addException(transientError); ApiFuture future3 = sendTestMessage(writer, new String[] {"m3"}); @@ -814,5 +840,7 @@ public void testExistingClient() throws Exception { StreamWriter writer = StreamWriter.newBuilder(TEST_STREAM, client).build(); writer.close(); assertFalse(client.isShutdown()); + client.shutdown(); + client.awaitTermination(1, TimeUnit.MINUTES); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java index 8f6f7a4bd0..eb249ddd39 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java @@ -33,6 +33,7 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import org.junit.*; import org.junit.runner.RunWith; @@ -285,5 +286,11 @@ public void run() { } }); } + executor.shutdown(); + try { + executor.awaitTermination(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + LOG.info(e.toString()); + } } }