From 2ae69b640adc48b79f0aab71c215eb3ef055a34c Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Thu, 22 Oct 2020 12:04:59 -0700 Subject: [PATCH] fix: remove stream ttl in client library, since there is no very clear TTL defined. (#627) * fix: Due to backend issues resolved, we no longer need to wait for 5 seconds between reconnection * fix: Remove the unnecessary GetWriteStream call to check stream TTL, since the stream TTL definition becomes complicate. * add ignore method diff --- .../clirr-ignored-differences.xml | 12 ++- .../storage/v1alpha2/JsonStreamWriter.java | 5 -- .../storage/v1alpha2/JsonWriterCache.java | 6 +- .../storage/v1alpha2/StreamWriter.java | 22 ------ .../storage/v1alpha2/WriterCache.java | 6 +- .../storage/v1alpha2/DirectWriterTest.java | 78 ++++--------------- .../storage/v1alpha2/JsonWriterCacheTest.java | 59 +------------- .../storage/v1alpha2/WriterCacheTest.java | 63 ++------------- 8 files changed, 36 insertions(+), 215 deletions(-) diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml index 2e9eb51582..309241d8a9 100644 --- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml +++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml @@ -33,4 +33,14 @@ com/google/cloud/bigquery/storage/v1alpha2/BQTableSchemaToProtoDescriptor com.google.protobuf.Descriptors$Descriptor ConvertBQTableSchemaToProtoDescriptor(com.google.cloud.bigquery.storage.v1alpha2.Table$TableSchema) - + + com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriter + 7002 + java.lang.Boolean expired() + + + com/google/cloud/bigquery/storage/v1alpha2/StreamWriter + 7002 + java.lang.Boolean expired() + + \ No newline at end of file diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriter.java index f0c63dd583..ed8ee0f9fe 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriter.java @@ -260,11 +260,6 @@ public void close() { this.streamWriter.close(); } - /** Returns if a stream has expired. */ - public Boolean expired() { - return this.streamWriter.expired(); - } - private class JsonStreamWriterOnSchemaUpdateRunnable extends OnSchemaUpdateRunnable { private JsonStreamWriter jsonStreamWriter; /** diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonWriterCache.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonWriterCache.java index d9d22ac75a..f623076252 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonWriterCache.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonWriterCache.java @@ -113,11 +113,7 @@ public JsonStreamWriter getTableWriter(String tableName) synchronized (this) { writer = jsonWriterCache.getIfPresent(tableName); if (writer != null) { - if (!writer.expired()) { - return writer; - } else { - writer.close(); - } + return writer; } writeStream = CreateNewWriteStream(tableName); writer = CreateNewWriter(writeStream); diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java index e146169cb1..83fa66cefa 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java @@ -57,7 +57,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.threeten.bp.Duration; -import org.threeten.bp.Instant; /** * A BigQuery Stream Writer that can be used to write data into BigQuery Table. @@ -118,9 +117,6 @@ public class StreamWriter implements AutoCloseable { private final AtomicBoolean activeAlarm; private ScheduledFuture currentAlarmFuture; - private Instant createTime; - private Duration streamTTL = Duration.ofDays(1); - private Integer currentRetries = 0; // Used for schema updates @@ -182,19 +178,6 @@ private StreamWriter(Builder builder) } refreshAppend(); - Stream.WriteStream stream = - stub.getWriteStream(Storage.GetWriteStreamRequest.newBuilder().setName(streamName).build()); - createTime = - Instant.ofEpochSecond( - stream.getCreateTime().getSeconds(), stream.getCreateTime().getNanos()); - if (stream.getType() == Stream.WriteStream.Type.PENDING && stream.hasCommitTime()) { - throw new IllegalStateException( - "Cannot write to a stream that is already committed: " + streamName); - } - if (createTime.plus(streamTTL).compareTo(Instant.now()) < 0) { - throw new IllegalStateException( - "Cannot write to a stream that is already expired: " + streamName); - } } /** Stream name we are writing to. */ @@ -212,11 +195,6 @@ OnSchemaUpdateRunnable getOnSchemaUpdateRunnable() { return this.onSchemaUpdateRunnable; } - /** Returns if a stream has expired. */ - public Boolean expired() { - return createTime.plus(streamTTL).compareTo(Instant.now()) < 0; - } - private void setException(Throwable t) { exceptionLock.lock(); if (this.streamException == null) { diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java index 7f9019bafe..ad74900f76 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java @@ -135,11 +135,7 @@ public StreamWriter getTableWriter(String tableName, Descriptor userSchema) if (tableEntry != null) { writer = tableEntry.getIfPresent(userSchema); if (writer != null) { - if (!writer.expired()) { - return writer; - } else { - writer.close(); - } + return writer; } compat.check(tableName, userSchema); streamName = CreateNewStream(tableName); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java index 1e358f26ed..e0550196ba 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java @@ -28,7 +28,6 @@ import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsRequest; import com.google.common.collect.Sets; import com.google.protobuf.AbstractMessage; -import com.google.protobuf.Timestamp; import java.io.IOException; import java.util.Arrays; import java.util.List; @@ -50,7 +49,6 @@ import org.junit.runners.JUnit4; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.threeten.bp.Instant; @RunWith(JUnit4.class) public class DirectWriterTest { @@ -115,18 +113,6 @@ void WriterCreationResponseMock(String testStreamName, Set responseOffsets Stream.WriteStream.newBuilder().setName(testStreamName).build(); mockBigQueryWrite.addResponse(expectedResponse); - // Response from GetWriteStream - Instant time = Instant.now(); - Timestamp timestamp = - Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build(); - Stream.WriteStream expectedResponse2 = - Stream.WriteStream.newBuilder() - .setName(testStreamName) - .setType(Stream.WriteStream.Type.COMMITTED) - .setCreateTime(timestamp) - .build(); - mockBigQueryWrite.addResponse(expectedResponse2); - for (Long offset : responseOffsets) { Storage.AppendRowsResponse response = Storage.AppendRowsResponse.newBuilder().setOffset(offset).build(); @@ -144,18 +130,6 @@ void JsonWriterCreationResponseMock(String testStreamName, Set responseOff .build(); mockBigQueryWrite.addResponse(expectedResponse); - // Response from GetWriteStream - Instant time = Instant.now(); - Timestamp timestamp = - Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build(); - Stream.WriteStream expectedResponse2 = - Stream.WriteStream.newBuilder() - .setName(testStreamName) - .setType(Stream.WriteStream.Type.COMMITTED) - .setCreateTime(timestamp) - .build(); - mockBigQueryWrite.addResponse(expectedResponse2); - for (Long offset : responseOffsets) { Storage.AppendRowsResponse response = Storage.AppendRowsResponse.newBuilder().setOffset(offset).build(); @@ -183,19 +157,15 @@ public void testJsonWriteSuccess() throws Exception { ApiFuture ret = DirectWriter.append(TEST_TABLE, jsonArr); assertEquals(Long.valueOf(0L), ret.get()); List actualRequests = mockBigQueryWrite.getRequests(); - Assert.assertEquals(3, actualRequests.size()); + Assert.assertEquals(2, actualRequests.size()); assertEquals( TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent()); - assertEquals( - Stream.WriteStream.Type.COMMITTED, - ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType()); - assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName()); assertEquals( m1.toByteString(), - ((AppendRowsRequest) actualRequests.get(2)).getProtoRows().getRows().getSerializedRows(0)); + ((AppendRowsRequest) actualRequests.get(1)).getProtoRows().getRows().getSerializedRows(0)); assertEquals( m2.toByteString(), - ((AppendRowsRequest) actualRequests.get(2)).getProtoRows().getRows().getSerializedRows(1)); + ((AppendRowsRequest) actualRequests.get(1)).getProtoRows().getRows().getSerializedRows(1)); Storage.AppendRowsResponse response = Storage.AppendRowsResponse.newBuilder().setOffset(2).build(); @@ -205,7 +175,7 @@ public void testJsonWriteSuccess() throws Exception { assertEquals(Long.valueOf(2L), ret.get()); assertEquals( m1.toByteString(), - ((AppendRowsRequest) actualRequests.get(3)).getProtoRows().getRows().getSerializedRows(0)); + ((AppendRowsRequest) actualRequests.get(2)).getProtoRows().getRows().getSerializedRows(0)); DirectWriter.clearCache(); } @@ -220,13 +190,9 @@ public void testProtobufWriteSuccess() throws Exception { verify(schemaCheck).check(TEST_TABLE, FooType.getDescriptor()); assertEquals(Long.valueOf(0L), ret.get()); List actualRequests = mockBigQueryWrite.getRequests(); - Assert.assertEquals(3, actualRequests.size()); + Assert.assertEquals(2, actualRequests.size()); assertEquals( TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent()); - assertEquals( - Stream.WriteStream.Type.COMMITTED, - ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType()); - assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName()); Storage.AppendRowsRequest.ProtoData.Builder dataBuilder = Storage.AppendRowsRequest.ProtoData.newBuilder(); @@ -241,7 +207,7 @@ public void testProtobufWriteSuccess() throws Exception { .setWriteStream(TEST_STREAM) .setProtoRows(dataBuilder.build()) .build(); - assertEquals(expectRequest.toString(), actualRequests.get(2).toString()); + assertEquals(expectRequest.toString(), actualRequests.get(1).toString()); Storage.AppendRowsResponse response = Storage.AppendRowsResponse.newBuilder().setOffset(2).build(); @@ -254,7 +220,7 @@ public void testProtobufWriteSuccess() throws Exception { ProtoBufProto.ProtoRows.newBuilder().addSerializedRows(m1.toByteString()).build()); expectRequest = Storage.AppendRowsRequest.newBuilder().setProtoRows(dataBuilder.build()).build(); - assertEquals(expectRequest.toString(), actualRequests.get(3).toString()); + assertEquals(expectRequest.toString(), actualRequests.get(2).toString()); // Write with a different schema. WriterCreationResponseMock(TEST_STREAM_2, Sets.newHashSet(Long.valueOf(0L))); @@ -271,14 +237,10 @@ public void testProtobufWriteSuccess() throws Exception { .setWriteStream(TEST_STREAM_2) .setProtoRows(dataBuilder.build()) .build(); - Assert.assertEquals(7, actualRequests.size()); + Assert.assertEquals(5, actualRequests.size()); assertEquals( - TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(4)).getParent()); - assertEquals( - Stream.WriteStream.Type.COMMITTED, - ((Storage.CreateWriteStreamRequest) actualRequests.get(4)).getWriteStream().getType()); - assertEquals(TEST_STREAM_2, ((Storage.GetWriteStreamRequest) actualRequests.get(5)).getName()); - assertEquals(expectRequest.toString(), actualRequests.get(6).toString()); + TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(3)).getParent()); + assertEquals(expectRequest.toString(), actualRequests.get(4).toString()); DirectWriter.clearCache(); } @@ -433,13 +395,9 @@ public void testJsonProtobufWrite() throws Exception { verify(schemaCheck).check(TEST_TABLE, FooType.getDescriptor()); assertEquals(Long.valueOf(0L), ret.get()); List actualRequests = mockBigQueryWrite.getRequests(); - Assert.assertEquals(3, actualRequests.size()); + Assert.assertEquals(2, actualRequests.size()); assertEquals( TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent()); - assertEquals( - Stream.WriteStream.Type.COMMITTED, - ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType()); - assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName()); Storage.AppendRowsRequest.ProtoData.Builder dataBuilder = Storage.AppendRowsRequest.ProtoData.newBuilder(); @@ -454,25 +412,19 @@ public void testJsonProtobufWrite() throws Exception { .setWriteStream(TEST_STREAM) .setProtoRows(dataBuilder.build()) .build(); - assertEquals(expectRequest.toString(), actualRequests.get(2).toString()); + assertEquals(expectRequest.toString(), actualRequests.get(1).toString()); JsonWriterCreationResponseMock(TEST_STREAM, Sets.newHashSet(Long.valueOf(0L))); ret = DirectWriter.append(TEST_TABLE, jsonArr); assertEquals(Long.valueOf(0L), ret.get()); actualRequests = mockBigQueryWrite.getRequests(); - Assert.assertEquals(6, actualRequests.size()); - assertEquals( - TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(3)).getParent()); - assertEquals( - Stream.WriteStream.Type.COMMITTED, - ((Storage.CreateWriteStreamRequest) actualRequests.get(3)).getWriteStream().getType()); - assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(4)).getName()); + Assert.assertEquals(4, actualRequests.size()); assertEquals( m1.toByteString(), - ((AppendRowsRequest) actualRequests.get(5)).getProtoRows().getRows().getSerializedRows(0)); + ((AppendRowsRequest) actualRequests.get(3)).getProtoRows().getRows().getSerializedRows(0)); assertEquals( m2.toByteString(), - ((AppendRowsRequest) actualRequests.get(5)).getProtoRows().getRows().getSerializedRows(1)); + ((AppendRowsRequest) actualRequests.get(3)).getProtoRows().getRows().getSerializedRows(1)); DirectWriter.clearCache(); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonWriterCacheTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonWriterCacheTest.java index 5dd4ce820d..c95229e59f 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonWriterCacheTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonWriterCacheTest.java @@ -26,7 +26,6 @@ import com.google.api.gax.grpc.testing.MockServiceHelper; import com.google.cloud.bigquery.storage.test.Test.*; import com.google.protobuf.AbstractMessage; -import com.google.protobuf.Timestamp; import java.io.IOException; import java.util.Arrays; import java.util.List; @@ -40,8 +39,6 @@ import org.junit.runners.JUnit4; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.threeten.bp.Instant; -import org.threeten.bp.temporal.ChronoUnit; @RunWith(JUnit4.class) public class JsonWriterCacheTest { @@ -108,18 +105,6 @@ void WriterCreationResponseMock(String testStreamName) { .setTableSchema(TABLE_SCHEMA) .build(); mockBigQueryWrite.addResponse(expectedResponse); - - // Response from GetWriteStream - Instant time = Instant.now(); - Timestamp timestamp = - Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build(); - Stream.WriteStream expectedResponse2 = - Stream.WriteStream.newBuilder() - .setName(testStreamName) - .setType(Stream.WriteStream.Type.COMMITTED) - .setCreateTime(timestamp) - .build(); - mockBigQueryWrite.addResponse(expectedResponse2); } @After @@ -144,50 +129,15 @@ public void testCreateNewWriter() throws Exception { WriterCreationResponseMock(TEST_STREAM); JsonStreamWriter writer = cache.getTableWriter(TEST_TABLE); List actualRequests = mockBigQueryWrite.getRequests(); - assertEquals(2, actualRequests.size()); + assertEquals(1, actualRequests.size()); assertEquals( TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent()); - assertEquals( - Stream.WriteStream.Type.COMMITTED, - ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType()); - assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName()); assertEquals(TEST_STREAM, writer.getStreamName()); assertEquals(1, cache.cachedTableCount()); cache.clear(); } - @Test - public void testWriterExpired() throws Exception { - JsonWriterCache cache = JsonWriterCache.getTestInstance(client, 10); - // Response from CreateWriteStream - Stream.WriteStream expectedResponse = - Stream.WriteStream.newBuilder().setName(TEST_STREAM).build(); - mockBigQueryWrite.addResponse(expectedResponse); - - // Response from GetWriteStream - Instant time = Instant.now().minus(2, ChronoUnit.DAYS); - Timestamp timestamp = - Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build(); - Stream.WriteStream expectedResponse2 = - Stream.WriteStream.newBuilder() - .setName(TEST_STREAM) - .setType(Stream.WriteStream.Type.COMMITTED) - .setCreateTime(timestamp) - .build(); - mockBigQueryWrite.addResponse(expectedResponse2); - - try { - JsonStreamWriter writer = cache.getTableWriter(TEST_TABLE); - fail("Should fail"); - } catch (IllegalStateException e) { - assertEquals( - "Cannot write to a stream that is already expired: projects/p/datasets/d/tables/t/streams/s", - e.getMessage()); - } - cache.clear(); - } - @Test public void testWriterWithDifferentTable() throws Exception { JsonWriterCache cache = JsonWriterCache.getTestInstance(client, 2); @@ -197,14 +147,11 @@ public void testWriterWithDifferentTable() throws Exception { JsonStreamWriter writer2 = cache.getTableWriter(TEST_TABLE_2); List actualRequests = mockBigQueryWrite.getRequests(); - assertEquals(4, actualRequests.size()); + assertEquals(2, actualRequests.size()); assertEquals( TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent()); - assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName()); assertEquals( - TEST_TABLE_2, ((Storage.CreateWriteStreamRequest) actualRequests.get(2)).getParent()); - Assert.assertEquals( - TEST_STREAM_21, ((Storage.GetWriteStreamRequest) actualRequests.get(3)).getName()); + TEST_TABLE_2, ((Storage.CreateWriteStreamRequest) actualRequests.get(1)).getParent()); assertEquals(TEST_STREAM, writer1.getStreamName()); assertEquals(TEST_STREAM_21, writer2.getStreamName()); assertEquals(2, cache.cachedTableCount()); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java index a427a5bbc3..cc62b4ee96 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java @@ -26,7 +26,6 @@ import com.google.api.gax.grpc.testing.MockServiceHelper; import com.google.cloud.bigquery.storage.test.Test.*; import com.google.protobuf.AbstractMessage; -import com.google.protobuf.Timestamp; import java.io.IOException; import java.util.Arrays; import java.util.List; @@ -40,8 +39,6 @@ import org.junit.runners.JUnit4; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.threeten.bp.Instant; -import org.threeten.bp.temporal.ChronoUnit; @RunWith(JUnit4.class) public class WriterCacheTest { @@ -96,18 +93,6 @@ void WriterCreationResponseMock(String testStreamName) { Stream.WriteStream expectedResponse = Stream.WriteStream.newBuilder().setName(testStreamName).build(); mockBigQueryWrite.addResponse(expectedResponse); - - // Response from GetWriteStream - Instant time = Instant.now(); - Timestamp timestamp = - Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build(); - Stream.WriteStream expectedResponse2 = - Stream.WriteStream.newBuilder() - .setName(testStreamName) - .setType(Stream.WriteStream.Type.COMMITTED) - .setCreateTime(timestamp) - .build(); - mockBigQueryWrite.addResponse(expectedResponse2); } @After @@ -133,51 +118,18 @@ public void testCreateNewWriter() throws Exception { StreamWriter writer = cache.getTableWriter(TEST_TABLE, FooType.getDescriptor()); verify(mockSchemaCheck, times(1)).check(TEST_TABLE, FooType.getDescriptor()); List actualRequests = mockBigQueryWrite.getRequests(); - assertEquals(2, actualRequests.size()); + assertEquals(1, actualRequests.size()); assertEquals( TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent()); assertEquals( Stream.WriteStream.Type.COMMITTED, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType()); - assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName()); - assertEquals(TEST_TABLE, writer.getTableNameString()); assertEquals(TEST_STREAM, writer.getStreamNameString()); assertEquals(1, cache.cachedTableCount()); cache.clear(); } - @Test - public void testWriterExpired() throws Exception { - WriterCache cache = WriterCache.getTestInstance(client, 10, mockSchemaCheck); - // Response from CreateWriteStream - Stream.WriteStream expectedResponse = - Stream.WriteStream.newBuilder().setName(TEST_STREAM).build(); - mockBigQueryWrite.addResponse(expectedResponse); - - // Response from GetWriteStream - Instant time = Instant.now().minus(2, ChronoUnit.DAYS); - Timestamp timestamp = - Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build(); - Stream.WriteStream expectedResponse2 = - Stream.WriteStream.newBuilder() - .setName(TEST_STREAM) - .setType(Stream.WriteStream.Type.COMMITTED) - .setCreateTime(timestamp) - .build(); - mockBigQueryWrite.addResponse(expectedResponse2); - - try { - StreamWriter writer = cache.getTableWriter(TEST_TABLE, FooType.getDescriptor()); - fail("Should fail"); - } catch (IllegalStateException e) { - assertEquals( - "Cannot write to a stream that is already expired: projects/p/datasets/d/tables/t/streams/s", - e.getMessage()); - } - cache.clear(); - } - @Test public void testWriterWithNewSchema() throws Exception { WriterCache cache = WriterCache.getTestInstance(client, 10, mockSchemaCheck); @@ -190,13 +142,11 @@ public void testWriterWithNewSchema() throws Exception { verify(mockSchemaCheck, times(1)).check(TEST_TABLE, AllSupportedTypes.getDescriptor()); List actualRequests = mockBigQueryWrite.getRequests(); - assertEquals(4, actualRequests.size()); + assertEquals(2, actualRequests.size()); assertEquals( TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent()); - assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName()); assertEquals( - TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(2)).getParent()); - assertEquals(TEST_STREAM_2, ((Storage.GetWriteStreamRequest) actualRequests.get(3)).getName()); + TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(1)).getParent()); assertEquals(TEST_STREAM, writer1.getStreamNameString()); assertEquals(TEST_STREAM_2, writer2.getStreamNameString()); assertEquals(1, cache.cachedTableCount()); @@ -233,14 +183,11 @@ public void testWriterWithDifferentTable() throws Exception { verify(mockSchemaCheck, times(1)).check(TEST_TABLE_2, FooType.getDescriptor()); List actualRequests = mockBigQueryWrite.getRequests(); - assertEquals(4, actualRequests.size()); + assertEquals(2, actualRequests.size()); assertEquals( TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent()); - assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName()); assertEquals( - TEST_TABLE_2, ((Storage.CreateWriteStreamRequest) actualRequests.get(2)).getParent()); - Assert.assertEquals( - TEST_STREAM_21, ((Storage.GetWriteStreamRequest) actualRequests.get(3)).getName()); + TEST_TABLE_2, ((Storage.CreateWriteStreamRequest) actualRequests.get(1)).getParent()); assertEquals(TEST_STREAM, writer1.getStreamNameString()); assertEquals(TEST_STREAM_21, writer2.getStreamNameString()); assertEquals(2, cache.cachedTableCount());