diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java index 295638f74f..4b3032b615 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java @@ -102,4 +102,9 @@ public static void testSetStub( BigQueryWriteClient stub, int maxTableEntry, SchemaCompact schemaCheck) { cache = WriterCache.getTestInstance(stub, maxTableEntry, schemaCheck); } + + /** Clears the underlying cache and all the transport connections. */ + public static void clearCache() { + cache.clear(); + } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java index 9b7cb1fd18..00c9a69050 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java @@ -20,6 +20,7 @@ import com.google.common.cache.CacheBuilder; import com.google.protobuf.Descriptors.Descriptor; import java.io.IOException; +import java.util.concurrent.ConcurrentMap; import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -144,6 +145,22 @@ public StreamWriter getTableWriter(String tableName, Descriptor userSchema) return writer; } + /** Clear the cache and close all the writers in the cache. */ + public void clear() { + synchronized (this) { + ConcurrentMap> map = writerCache.asMap(); + for (String key : map.keySet()) { + Cache entry = writerCache.getIfPresent(key); + ConcurrentMap entryMap = entry.asMap(); + for (Descriptor descriptor : entryMap.keySet()) { + StreamWriter writer = entry.getIfPresent(descriptor); + writer.close(); + } + } + writerCache.cleanUp(); + } + } + @VisibleForTesting public long cachedTableCount() { synchronized (writerCache) { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java index f6a0948802..c3b059c777 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java @@ -24,15 +24,15 @@ import com.google.api.gax.grpc.testing.MockGrpcService; import com.google.api.gax.grpc.testing.MockServiceHelper; import com.google.cloud.bigquery.storage.test.Test.*; +import com.google.common.collect.Sets; import com.google.protobuf.AbstractMessage; import com.google.protobuf.Timestamp; import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.ExecutionException; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; import org.junit.*; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -42,6 +42,8 @@ @RunWith(JUnit4.class) public class DirectWriterTest { + private static final Logger LOG = Logger.getLogger(DirectWriterTest.class.getName()); + private static final String TEST_TABLE = "projects/p/datasets/d/tables/t"; private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s"; private static final String TEST_STREAM_2 = "projects/p/datasets/d/tables/t/streams/s2"; @@ -86,7 +88,7 @@ public void tearDown() throws Exception { } /** Response mocks for create a new writer */ - void WriterCreationResponseMock(String testStreamName, List responseOffsets) { + void WriterCreationResponseMock(String testStreamName, Set responseOffsets) { // Response from CreateWriteStream Stream.WriteStream expectedResponse = Stream.WriteStream.newBuilder().setName(testStreamName).build(); @@ -117,7 +119,7 @@ public void testWriteSuccess() throws Exception { FooType m1 = FooType.newBuilder().setFoo("m1").build(); FooType m2 = FooType.newBuilder().setFoo("m2").build(); - WriterCreationResponseMock(TEST_STREAM, Arrays.asList(Long.valueOf(0L))); + WriterCreationResponseMock(TEST_STREAM, Sets.newHashSet(Long.valueOf(0L))); ApiFuture ret = DirectWriter.append(TEST_TABLE, Arrays.asList(m1, m2)); verify(schemaCheck).check(TEST_TABLE, FooType.getDescriptor()); assertEquals(Long.valueOf(0L), ret.get()); @@ -159,7 +161,7 @@ public void testWriteSuccess() throws Exception { assertEquals(expectRequest.toString(), actualRequests.get(3).toString()); // Write with a different schema. - WriterCreationResponseMock(TEST_STREAM_2, Arrays.asList(Long.valueOf(0L))); + WriterCreationResponseMock(TEST_STREAM_2, Sets.newHashSet(Long.valueOf(0L))); AllSupportedTypes m3 = AllSupportedTypes.newBuilder().setStringValue("s").build(); ret = DirectWriter.append(TEST_TABLE, Arrays.asList(m3)); verify(schemaCheck).check(TEST_TABLE, AllSupportedTypes.getDescriptor()); @@ -181,6 +183,8 @@ public void testWriteSuccess() throws Exception { ((Storage.CreateWriteStreamRequest) actualRequests.get(4)).getWriteStream().getType()); assertEquals(TEST_STREAM_2, ((Storage.GetWriteStreamRequest) actualRequests.get(5)).getName()); assertEquals(expectRequest.toString(), actualRequests.get(6).toString()); + + DirectWriter.clearCache(); } @Test @@ -195,15 +199,17 @@ public void testWriteBadTableName() throws Exception { } catch (IllegalArgumentException expected) { assertEquals("Invalid table name: abc", expected.getMessage()); } + + DirectWriter.clearCache(); } @Test public void testConcurrentAccess() throws Exception { - WriterCache cache = WriterCache.getTestInstance(client, 2, schemaCheck); + DirectWriter.testSetStub(client, 2, schemaCheck); final FooType m1 = FooType.newBuilder().setFoo("m1").build(); final FooType m2 = FooType.newBuilder().setFoo("m2").build(); - final List expectedOffset = - Arrays.asList( + final Set expectedOffset = + Sets.newHashSet( Long.valueOf(0L), Long.valueOf(2L), Long.valueOf(4L), @@ -221,12 +227,21 @@ public void run() { try { ApiFuture result = DirectWriter.append(TEST_TABLE, Arrays.asList(m1, m2)); - assertTrue(expectedOffset.remove(result.get())); - } catch (IOException | InterruptedException | ExecutionException e) { - fail(e.getMessage()); + synchronized (expectedOffset) { + assertTrue(expectedOffset.remove(result.get())); + } + } catch (Exception e) { + fail(e.toString()); } } }); } + executor.shutdown(); + try { + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + LOG.info(e.toString()); + } + DirectWriter.clearCache(); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java index 950419fdc9..f558503914 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java @@ -135,8 +135,9 @@ private ApiFuture sendTestMessage(StreamWriter writer, Strin @Test public void testTableName() throws Exception { - StreamWriter writer = getTestStreamWriterBuilder().build(); - assertEquals("projects/p/datasets/d/tables/t", writer.getTableNameString()); + try (StreamWriter writer = getTestStreamWriterBuilder().build()) { + assertEquals("projects/p/datasets/d/tables/t", writer.getTableNameString()); + } } @Test @@ -175,7 +176,7 @@ public void testAppendByDuration() throws Exception { .getSerializedRowsCount()); assertEquals( true, testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); - writer.shutdown(); + writer.close(); } @Test @@ -228,7 +229,7 @@ public void testAppendByNumBatchedMessages() throws Exception { .getSerializedRowsCount()); assertEquals( false, testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema()); - writer.shutdown(); + writer.close(); } @Test @@ -264,7 +265,7 @@ public void testAppendByNumBytes() throws Exception { assertEquals(3, testBigQueryWrite.getAppendRequests().size()); - writer.shutdown(); + writer.close(); } @Test @@ -429,7 +430,8 @@ public void testFlowControlBehaviorException() throws Exception { try { appendFuture2.get(); Assert.fail("This should fail"); - } catch (ExecutionException e) { + } catch (Exception e) { + LOG.info("ControlFlow test exception: " + e.toString()); assertEquals("The maximum number of batch elements: 1 have been reached.", e.getMessage()); } assertEquals(1L, appendFuture1.get().getOffset()); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java index 47ad647e66..8f6f7a4bd0 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java @@ -143,6 +143,7 @@ public void testCreateNewWriter() throws Exception { assertEquals(TEST_TABLE, writer.getTableNameString()); assertEquals(TEST_STREAM, writer.getStreamNameString()); assertEquals(1, cache.cachedTableCount()); + cache.clear(); } @Test @@ -173,6 +174,7 @@ public void testWriterExpired() throws Exception { "Cannot write to a stream that is already expired: projects/p/datasets/d/tables/t/streams/s", e.getMessage()); } + cache.clear(); } @Test @@ -216,6 +218,7 @@ public void testWriterWithNewSchema() throws Exception { assertEquals(TEST_STREAM_3, writer4.getStreamNameString()); assertEquals(TEST_STREAM_4, writer5.getStreamNameString()); assertEquals(1, cache.cachedTableCount()); + cache.clear(); } @Test @@ -259,6 +262,7 @@ public void testWriterWithDifferentTable() throws Exception { assertEquals(TEST_STREAM_31, writer4.getStreamNameString()); assertEquals(TEST_STREAM, writer5.getStreamNameString()); assertEquals(2, cache.cachedTableCount()); + cache.clear(); } @Test @@ -275,7 +279,7 @@ public void testConcurrentAccess() throws Exception { public void run() { try { assertTrue(cache.getTableWriter(TEST_TABLE, FooType.getDescriptor()) != null); - } catch (IOException | InterruptedException e) { + } catch (Exception e) { fail(e.getMessage()); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java index 04c831ccc6..da6e144323 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java @@ -391,5 +391,12 @@ public Long call() throws IOException, InterruptedException, ExecutionException assertTrue(expectedOffset.remove(response.get())); } assertTrue(expectedOffset.isEmpty()); + executor.shutdown(); + try { + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + LOG.info(e.toString()); + } + DirectWriter.clearCache(); } }