From 7e3f40199f93ed5a0aaef61e311df00f99052c14 Mon Sep 17 00:00:00 2001 From: yirutang Date: Thu, 30 Apr 2020 14:55:41 -0700 Subject: [PATCH 1/2] fix: some test warnings about writers are not closed properly. modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java --- .../storage/v1alpha2/DirectWriter.java | 7 +++ .../storage/v1alpha2/WriterCache.java | 16 ++++++ .../storage/v1alpha2/DirectWriterTest.java | 50 ++++++++++++------- .../storage/v1alpha2/StreamWriterTest.java | 11 ++-- .../storage/v1alpha2/WriterCacheTest.java | 6 ++- .../it/ITBigQueryWriteManualClientTest.java | 7 +++ 6 files changed, 73 insertions(+), 24 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java index 295638f74f..5e4aad469a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java @@ -102,4 +102,11 @@ public static void testSetStub( BigQueryWriteClient stub, int maxTableEntry, SchemaCompact schemaCheck) { cache = WriterCache.getTestInstance(stub, maxTableEntry, schemaCheck); } + + /** + * Clears the underlying cache and all the transport connections. + */ + public static void clearCache() { + cache.clear(); + } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java index 9b7cb1fd18..52d408189c 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java @@ -20,6 +20,7 @@ import com.google.common.cache.CacheBuilder; import com.google.protobuf.Descriptors.Descriptor; import java.io.IOException; +import java.util.concurrent.ConcurrentMap; import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -144,6 +145,21 @@ public StreamWriter getTableWriter(String tableName, Descriptor userSchema) return writer; } + public void clear() { + synchronized (this) { + ConcurrentMap> map = writerCache.asMap(); + for (String key : map.keySet()) { + Cache entry = writerCache.getIfPresent(key); + ConcurrentMap entryMap = entry.asMap(); + for (Descriptor descriptor : entryMap.keySet()) { + StreamWriter writer = entry.getIfPresent(descriptor); + writer.close(); + } + } + writerCache.cleanUp(); + } + } + @VisibleForTesting public long cachedTableCount() { synchronized (writerCache) { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java index f6a0948802..251dba141d 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java @@ -24,15 +24,16 @@ import com.google.api.gax.grpc.testing.MockGrpcService; import com.google.api.gax.grpc.testing.MockServiceHelper; import com.google.cloud.bigquery.storage.test.Test.*; +import com.google.common.collect.Sets; import com.google.protobuf.AbstractMessage; import com.google.protobuf.Timestamp; import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.ExecutionException; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + import org.junit.*; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -42,6 +43,8 @@ @RunWith(JUnit4.class) public class DirectWriterTest { + private static final Logger LOG = Logger.getLogger(DirectWriterTest.class.getName()); + private static final String TEST_TABLE = "projects/p/datasets/d/tables/t"; private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s"; private static final String TEST_STREAM_2 = "projects/p/datasets/d/tables/t/streams/s2"; @@ -86,7 +89,7 @@ public void tearDown() throws Exception { } /** Response mocks for create a new writer */ - void WriterCreationResponseMock(String testStreamName, List responseOffsets) { + void WriterCreationResponseMock(String testStreamName, Set responseOffsets) { // Response from CreateWriteStream Stream.WriteStream expectedResponse = Stream.WriteStream.newBuilder().setName(testStreamName).build(); @@ -117,7 +120,7 @@ public void testWriteSuccess() throws Exception { FooType m1 = FooType.newBuilder().setFoo("m1").build(); FooType m2 = FooType.newBuilder().setFoo("m2").build(); - WriterCreationResponseMock(TEST_STREAM, Arrays.asList(Long.valueOf(0L))); + WriterCreationResponseMock(TEST_STREAM, Sets.newHashSet(Long.valueOf(0L))); ApiFuture ret = DirectWriter.append(TEST_TABLE, Arrays.asList(m1, m2)); verify(schemaCheck).check(TEST_TABLE, FooType.getDescriptor()); assertEquals(Long.valueOf(0L), ret.get()); @@ -159,7 +162,7 @@ public void testWriteSuccess() throws Exception { assertEquals(expectRequest.toString(), actualRequests.get(3).toString()); // Write with a different schema. - WriterCreationResponseMock(TEST_STREAM_2, Arrays.asList(Long.valueOf(0L))); + WriterCreationResponseMock(TEST_STREAM_2, Sets.newHashSet(Long.valueOf(0L))); AllSupportedTypes m3 = AllSupportedTypes.newBuilder().setStringValue("s").build(); ret = DirectWriter.append(TEST_TABLE, Arrays.asList(m3)); verify(schemaCheck).check(TEST_TABLE, AllSupportedTypes.getDescriptor()); @@ -181,6 +184,8 @@ public void testWriteSuccess() throws Exception { ((Storage.CreateWriteStreamRequest) actualRequests.get(4)).getWriteStream().getType()); assertEquals(TEST_STREAM_2, ((Storage.GetWriteStreamRequest) actualRequests.get(5)).getName()); assertEquals(expectRequest.toString(), actualRequests.get(6).toString()); + + DirectWriter.clearCache(); } @Test @@ -195,20 +200,20 @@ public void testWriteBadTableName() throws Exception { } catch (IllegalArgumentException expected) { assertEquals("Invalid table name: abc", expected.getMessage()); } + + DirectWriter.clearCache(); } @Test public void testConcurrentAccess() throws Exception { - WriterCache cache = WriterCache.getTestInstance(client, 2, schemaCheck); + DirectWriter.testSetStub(client, 10, schemaCheck); final FooType m1 = FooType.newBuilder().setFoo("m1").build(); final FooType m2 = FooType.newBuilder().setFoo("m2").build(); - final List expectedOffset = - Arrays.asList( - Long.valueOf(0L), - Long.valueOf(2L), - Long.valueOf(4L), - Long.valueOf(8L), - Long.valueOf(10L)); + final Set expectedOffset = Sets.newHashSet(Long.valueOf(0L), + Long.valueOf(2L), + Long.valueOf(4L), + Long.valueOf(8L), + Long.valueOf(10L)); // Make sure getting the same table writer in multiple thread only cause create to be called // once. WriterCreationResponseMock(TEST_STREAM, expectedOffset); @@ -221,12 +226,21 @@ public void run() { try { ApiFuture result = DirectWriter.append(TEST_TABLE, Arrays.asList(m1, m2)); - assertTrue(expectedOffset.remove(result.get())); - } catch (IOException | InterruptedException | ExecutionException e) { - fail(e.getMessage()); + synchronized (expectedOffset) { + assertTrue(expectedOffset.remove(result.get())); + } + } catch (Exception e) { + fail(e.toString()); } } }); } + executor.shutdown(); + try { + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + LOG.info(e.toString()); + } + DirectWriter.clearCache(); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java index 950419fdc9..14b24a9e25 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java @@ -135,8 +135,9 @@ private ApiFuture sendTestMessage(StreamWriter writer, Strin @Test public void testTableName() throws Exception { - StreamWriter writer = getTestStreamWriterBuilder().build(); - assertEquals("projects/p/datasets/d/tables/t", writer.getTableNameString()); + try (StreamWriter writer = getTestStreamWriterBuilder().build()) { + assertEquals("projects/p/datasets/d/tables/t", writer.getTableNameString()); + } } @Test @@ -175,7 +176,7 @@ public void testAppendByDuration() throws Exception { .getSerializedRowsCount()); assertEquals( true, testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); - writer.shutdown(); + writer.close(); } @Test @@ -228,7 +229,7 @@ public void testAppendByNumBatchedMessages() throws Exception { .getSerializedRowsCount()); assertEquals( false, testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema()); - writer.shutdown(); + writer.close(); } @Test @@ -264,7 +265,7 @@ public void testAppendByNumBytes() throws Exception { assertEquals(3, testBigQueryWrite.getAppendRequests().size()); - writer.shutdown(); + writer.close(); } @Test diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java index 47ad647e66..8f6f7a4bd0 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java @@ -143,6 +143,7 @@ public void testCreateNewWriter() throws Exception { assertEquals(TEST_TABLE, writer.getTableNameString()); assertEquals(TEST_STREAM, writer.getStreamNameString()); assertEquals(1, cache.cachedTableCount()); + cache.clear(); } @Test @@ -173,6 +174,7 @@ public void testWriterExpired() throws Exception { "Cannot write to a stream that is already expired: projects/p/datasets/d/tables/t/streams/s", e.getMessage()); } + cache.clear(); } @Test @@ -216,6 +218,7 @@ public void testWriterWithNewSchema() throws Exception { assertEquals(TEST_STREAM_3, writer4.getStreamNameString()); assertEquals(TEST_STREAM_4, writer5.getStreamNameString()); assertEquals(1, cache.cachedTableCount()); + cache.clear(); } @Test @@ -259,6 +262,7 @@ public void testWriterWithDifferentTable() throws Exception { assertEquals(TEST_STREAM_31, writer4.getStreamNameString()); assertEquals(TEST_STREAM, writer5.getStreamNameString()); assertEquals(2, cache.cachedTableCount()); + cache.clear(); } @Test @@ -275,7 +279,7 @@ public void testConcurrentAccess() throws Exception { public void run() { try { assertTrue(cache.getTableWriter(TEST_TABLE, FooType.getDescriptor()) != null); - } catch (IOException | InterruptedException e) { + } catch (Exception e) { fail(e.getMessage()); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java index 04c831ccc6..da6e144323 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java @@ -391,5 +391,12 @@ public Long call() throws IOException, InterruptedException, ExecutionException assertTrue(expectedOffset.remove(response.get())); } assertTrue(expectedOffset.isEmpty()); + executor.shutdown(); + try { + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + LOG.info(e.toString()); + } + DirectWriter.clearCache(); } } From 9181269ea4ed03009224663e4846cc7e7c3234d9 Mon Sep 17 00:00:00 2001 From: yirutang Date: Fri, 1 May 2020 10:09:39 -0700 Subject: [PATCH 2/2] format and doc modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java --- .../bigquery/storage/v1alpha2/DirectWriter.java | 4 +--- .../bigquery/storage/v1alpha2/WriterCache.java | 1 + .../storage/v1alpha2/DirectWriterTest.java | 15 ++++++++------- .../storage/v1alpha2/StreamWriterTest.java | 3 ++- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java index 5e4aad469a..4b3032b615 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java @@ -103,9 +103,7 @@ public static void testSetStub( cache = WriterCache.getTestInstance(stub, maxTableEntry, schemaCheck); } - /** - * Clears the underlying cache and all the transport connections. - */ + /** Clears the underlying cache and all the transport connections. */ public static void clearCache() { cache.clear(); } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java index 52d408189c..00c9a69050 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java @@ -145,6 +145,7 @@ public StreamWriter getTableWriter(String tableName, Descriptor userSchema) return writer; } + /** Clear the cache and close all the writers in the cache. */ public void clear() { synchronized (this) { ConcurrentMap> map = writerCache.asMap(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java index 251dba141d..c3b059c777 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java @@ -33,7 +33,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; - import org.junit.*; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -206,14 +205,16 @@ public void testWriteBadTableName() throws Exception { @Test public void testConcurrentAccess() throws Exception { - DirectWriter.testSetStub(client, 10, schemaCheck); + DirectWriter.testSetStub(client, 2, schemaCheck); final FooType m1 = FooType.newBuilder().setFoo("m1").build(); final FooType m2 = FooType.newBuilder().setFoo("m2").build(); - final Set expectedOffset = Sets.newHashSet(Long.valueOf(0L), - Long.valueOf(2L), - Long.valueOf(4L), - Long.valueOf(8L), - Long.valueOf(10L)); + final Set expectedOffset = + Sets.newHashSet( + Long.valueOf(0L), + Long.valueOf(2L), + Long.valueOf(4L), + Long.valueOf(8L), + Long.valueOf(10L)); // Make sure getting the same table writer in multiple thread only cause create to be called // once. WriterCreationResponseMock(TEST_STREAM, expectedOffset); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java index 14b24a9e25..f558503914 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java @@ -430,7 +430,8 @@ public void testFlowControlBehaviorException() throws Exception { try { appendFuture2.get(); Assert.fail("This should fail"); - } catch (ExecutionException e) { + } catch (Exception e) { + LOG.info("ControlFlow test exception: " + e.toString()); assertEquals("The maximum number of batch elements: 1 have been reached.", e.getMessage()); } assertEquals(1L, appendFuture1.get().getOffset());