From 248ab73eebb9feda94e7e29de591a70c91a064c1 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Thu, 17 Dec 2020 12:11:42 -0800 Subject: [PATCH] feat: add default stream support for StreamWriter (#744) * fix: a race condition in test * . * . * fix: allow default stream name to StreamWriter * . --- .../cloud/bigquery/storage/v1beta2/StreamWriter.java | 2 +- .../bigquery/storage/v1beta2/StreamWriterTest.java | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java index fe13bf9af7..75506bab08 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java @@ -82,7 +82,7 @@ public class StreamWriter implements AutoCloseable { private static final Logger LOG = Logger.getLogger(StreamWriter.class.getName()); private static String streamPatternString = - "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+"; + "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/(streams/[^/]+|_default)"; private static Pattern streamPattern = Pattern.compile(streamPatternString); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java index b938e63365..64b4a1d77c 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java @@ -58,6 +58,7 @@ public class StreamWriterTest { private static final Logger LOG = Logger.getLogger(StreamWriterTest.class.getName()); private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s"; + private static final String TEST_DEFAULT_STREAM = "projects/p/datasets/d/tables/t/_default"; private static final ExecutorProvider SINGLE_THREAD_EXECUTOR = InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build(); private static LocalChannelProvider channelProvider; @@ -102,6 +103,10 @@ private StreamWriter.Builder getTestStreamWriterBuilder() { return getTestStreamWriterBuilder(TEST_STREAM); } + private StreamWriter.Builder getDefaultTestStreamWriterBuilder() { + return getTestStreamWriterBuilder(TEST_DEFAULT_STREAM); + } + private AppendRowsRequest createAppendRequest(String[] messages, long offset) { AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder(); @@ -143,6 +148,13 @@ public void testTableName() throws Exception { } } + @Test + public void testDefaultStream() throws Exception { + try (StreamWriter writer = getDefaultTestStreamWriterBuilder().build()) { + assertEquals("projects/p/datasets/d/tables/t", writer.getTableNameString()); + } + } + @Test public void testAppendByDuration() throws Exception { StreamWriter writer =