diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
index 2e9eb51582..309241d8a9 100644
--- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml
+++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
@@ -33,4 +33,14 @@
com/google/cloud/bigquery/storage/v1alpha2/BQTableSchemaToProtoDescriptor
com.google.protobuf.Descriptors$Descriptor ConvertBQTableSchemaToProtoDescriptor(com.google.cloud.bigquery.storage.v1alpha2.Table$TableSchema)
-
+
+ com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriter
+ 7002
+ java.lang.Boolean expired()
+
+
+ com/google/cloud/bigquery/storage/v1alpha2/StreamWriter
+ 7002
+ java.lang.Boolean expired()
+
+
\ No newline at end of file
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriter.java
index f0c63dd583..ed8ee0f9fe 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriter.java
@@ -260,11 +260,6 @@ public void close() {
this.streamWriter.close();
}
- /** Returns if a stream has expired. */
- public Boolean expired() {
- return this.streamWriter.expired();
- }
-
private class JsonStreamWriterOnSchemaUpdateRunnable extends OnSchemaUpdateRunnable {
private JsonStreamWriter jsonStreamWriter;
/**
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonWriterCache.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonWriterCache.java
index d9d22ac75a..f623076252 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonWriterCache.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonWriterCache.java
@@ -113,11 +113,7 @@ public JsonStreamWriter getTableWriter(String tableName)
synchronized (this) {
writer = jsonWriterCache.getIfPresent(tableName);
if (writer != null) {
- if (!writer.expired()) {
- return writer;
- } else {
- writer.close();
- }
+ return writer;
}
writeStream = CreateNewWriteStream(tableName);
writer = CreateNewWriter(writeStream);
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java
index e146169cb1..83fa66cefa 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java
@@ -57,7 +57,6 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.threeten.bp.Duration;
-import org.threeten.bp.Instant;
/**
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
@@ -118,9 +117,6 @@ public class StreamWriter implements AutoCloseable {
private final AtomicBoolean activeAlarm;
private ScheduledFuture> currentAlarmFuture;
- private Instant createTime;
- private Duration streamTTL = Duration.ofDays(1);
-
private Integer currentRetries = 0;
// Used for schema updates
@@ -182,19 +178,6 @@ private StreamWriter(Builder builder)
}
refreshAppend();
- Stream.WriteStream stream =
- stub.getWriteStream(Storage.GetWriteStreamRequest.newBuilder().setName(streamName).build());
- createTime =
- Instant.ofEpochSecond(
- stream.getCreateTime().getSeconds(), stream.getCreateTime().getNanos());
- if (stream.getType() == Stream.WriteStream.Type.PENDING && stream.hasCommitTime()) {
- throw new IllegalStateException(
- "Cannot write to a stream that is already committed: " + streamName);
- }
- if (createTime.plus(streamTTL).compareTo(Instant.now()) < 0) {
- throw new IllegalStateException(
- "Cannot write to a stream that is already expired: " + streamName);
- }
}
/** Stream name we are writing to. */
@@ -212,11 +195,6 @@ OnSchemaUpdateRunnable getOnSchemaUpdateRunnable() {
return this.onSchemaUpdateRunnable;
}
- /** Returns if a stream has expired. */
- public Boolean expired() {
- return createTime.plus(streamTTL).compareTo(Instant.now()) < 0;
- }
-
private void setException(Throwable t) {
exceptionLock.lock();
if (this.streamException == null) {
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java
index 7f9019bafe..ad74900f76 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java
@@ -135,11 +135,7 @@ public StreamWriter getTableWriter(String tableName, Descriptor userSchema)
if (tableEntry != null) {
writer = tableEntry.getIfPresent(userSchema);
if (writer != null) {
- if (!writer.expired()) {
- return writer;
- } else {
- writer.close();
- }
+ return writer;
}
compat.check(tableName, userSchema);
streamName = CreateNewStream(tableName);
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java
index 1e358f26ed..e0550196ba 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java
@@ -28,7 +28,6 @@
import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsRequest;
import com.google.common.collect.Sets;
import com.google.protobuf.AbstractMessage;
-import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@@ -50,7 +49,6 @@
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.threeten.bp.Instant;
@RunWith(JUnit4.class)
public class DirectWriterTest {
@@ -115,18 +113,6 @@ void WriterCreationResponseMock(String testStreamName, Set responseOffsets
Stream.WriteStream.newBuilder().setName(testStreamName).build();
mockBigQueryWrite.addResponse(expectedResponse);
- // Response from GetWriteStream
- Instant time = Instant.now();
- Timestamp timestamp =
- Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
- Stream.WriteStream expectedResponse2 =
- Stream.WriteStream.newBuilder()
- .setName(testStreamName)
- .setType(Stream.WriteStream.Type.COMMITTED)
- .setCreateTime(timestamp)
- .build();
- mockBigQueryWrite.addResponse(expectedResponse2);
-
for (Long offset : responseOffsets) {
Storage.AppendRowsResponse response =
Storage.AppendRowsResponse.newBuilder().setOffset(offset).build();
@@ -144,18 +130,6 @@ void JsonWriterCreationResponseMock(String testStreamName, Set responseOff
.build();
mockBigQueryWrite.addResponse(expectedResponse);
- // Response from GetWriteStream
- Instant time = Instant.now();
- Timestamp timestamp =
- Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
- Stream.WriteStream expectedResponse2 =
- Stream.WriteStream.newBuilder()
- .setName(testStreamName)
- .setType(Stream.WriteStream.Type.COMMITTED)
- .setCreateTime(timestamp)
- .build();
- mockBigQueryWrite.addResponse(expectedResponse2);
-
for (Long offset : responseOffsets) {
Storage.AppendRowsResponse response =
Storage.AppendRowsResponse.newBuilder().setOffset(offset).build();
@@ -183,19 +157,15 @@ public void testJsonWriteSuccess() throws Exception {
ApiFuture ret = DirectWriter.append(TEST_TABLE, jsonArr);
assertEquals(Long.valueOf(0L), ret.get());
List actualRequests = mockBigQueryWrite.getRequests();
- Assert.assertEquals(3, actualRequests.size());
+ Assert.assertEquals(2, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
- assertEquals(
- Stream.WriteStream.Type.COMMITTED,
- ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType());
- assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());
assertEquals(
m1.toByteString(),
- ((AppendRowsRequest) actualRequests.get(2)).getProtoRows().getRows().getSerializedRows(0));
+ ((AppendRowsRequest) actualRequests.get(1)).getProtoRows().getRows().getSerializedRows(0));
assertEquals(
m2.toByteString(),
- ((AppendRowsRequest) actualRequests.get(2)).getProtoRows().getRows().getSerializedRows(1));
+ ((AppendRowsRequest) actualRequests.get(1)).getProtoRows().getRows().getSerializedRows(1));
Storage.AppendRowsResponse response =
Storage.AppendRowsResponse.newBuilder().setOffset(2).build();
@@ -205,7 +175,7 @@ public void testJsonWriteSuccess() throws Exception {
assertEquals(Long.valueOf(2L), ret.get());
assertEquals(
m1.toByteString(),
- ((AppendRowsRequest) actualRequests.get(3)).getProtoRows().getRows().getSerializedRows(0));
+ ((AppendRowsRequest) actualRequests.get(2)).getProtoRows().getRows().getSerializedRows(0));
DirectWriter.clearCache();
}
@@ -220,13 +190,9 @@ public void testProtobufWriteSuccess() throws Exception {
verify(schemaCheck).check(TEST_TABLE, FooType.getDescriptor());
assertEquals(Long.valueOf(0L), ret.get());
List actualRequests = mockBigQueryWrite.getRequests();
- Assert.assertEquals(3, actualRequests.size());
+ Assert.assertEquals(2, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
- assertEquals(
- Stream.WriteStream.Type.COMMITTED,
- ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType());
- assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());
Storage.AppendRowsRequest.ProtoData.Builder dataBuilder =
Storage.AppendRowsRequest.ProtoData.newBuilder();
@@ -241,7 +207,7 @@ public void testProtobufWriteSuccess() throws Exception {
.setWriteStream(TEST_STREAM)
.setProtoRows(dataBuilder.build())
.build();
- assertEquals(expectRequest.toString(), actualRequests.get(2).toString());
+ assertEquals(expectRequest.toString(), actualRequests.get(1).toString());
Storage.AppendRowsResponse response =
Storage.AppendRowsResponse.newBuilder().setOffset(2).build();
@@ -254,7 +220,7 @@ public void testProtobufWriteSuccess() throws Exception {
ProtoBufProto.ProtoRows.newBuilder().addSerializedRows(m1.toByteString()).build());
expectRequest =
Storage.AppendRowsRequest.newBuilder().setProtoRows(dataBuilder.build()).build();
- assertEquals(expectRequest.toString(), actualRequests.get(3).toString());
+ assertEquals(expectRequest.toString(), actualRequests.get(2).toString());
// Write with a different schema.
WriterCreationResponseMock(TEST_STREAM_2, Sets.newHashSet(Long.valueOf(0L)));
@@ -271,14 +237,10 @@ public void testProtobufWriteSuccess() throws Exception {
.setWriteStream(TEST_STREAM_2)
.setProtoRows(dataBuilder.build())
.build();
- Assert.assertEquals(7, actualRequests.size());
+ Assert.assertEquals(5, actualRequests.size());
assertEquals(
- TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(4)).getParent());
- assertEquals(
- Stream.WriteStream.Type.COMMITTED,
- ((Storage.CreateWriteStreamRequest) actualRequests.get(4)).getWriteStream().getType());
- assertEquals(TEST_STREAM_2, ((Storage.GetWriteStreamRequest) actualRequests.get(5)).getName());
- assertEquals(expectRequest.toString(), actualRequests.get(6).toString());
+ TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(3)).getParent());
+ assertEquals(expectRequest.toString(), actualRequests.get(4).toString());
DirectWriter.clearCache();
}
@@ -433,13 +395,9 @@ public void testJsonProtobufWrite() throws Exception {
verify(schemaCheck).check(TEST_TABLE, FooType.getDescriptor());
assertEquals(Long.valueOf(0L), ret.get());
List actualRequests = mockBigQueryWrite.getRequests();
- Assert.assertEquals(3, actualRequests.size());
+ Assert.assertEquals(2, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
- assertEquals(
- Stream.WriteStream.Type.COMMITTED,
- ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType());
- assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());
Storage.AppendRowsRequest.ProtoData.Builder dataBuilder =
Storage.AppendRowsRequest.ProtoData.newBuilder();
@@ -454,25 +412,19 @@ public void testJsonProtobufWrite() throws Exception {
.setWriteStream(TEST_STREAM)
.setProtoRows(dataBuilder.build())
.build();
- assertEquals(expectRequest.toString(), actualRequests.get(2).toString());
+ assertEquals(expectRequest.toString(), actualRequests.get(1).toString());
JsonWriterCreationResponseMock(TEST_STREAM, Sets.newHashSet(Long.valueOf(0L)));
ret = DirectWriter.append(TEST_TABLE, jsonArr);
assertEquals(Long.valueOf(0L), ret.get());
actualRequests = mockBigQueryWrite.getRequests();
- Assert.assertEquals(6, actualRequests.size());
- assertEquals(
- TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(3)).getParent());
- assertEquals(
- Stream.WriteStream.Type.COMMITTED,
- ((Storage.CreateWriteStreamRequest) actualRequests.get(3)).getWriteStream().getType());
- assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(4)).getName());
+ Assert.assertEquals(4, actualRequests.size());
assertEquals(
m1.toByteString(),
- ((AppendRowsRequest) actualRequests.get(5)).getProtoRows().getRows().getSerializedRows(0));
+ ((AppendRowsRequest) actualRequests.get(3)).getProtoRows().getRows().getSerializedRows(0));
assertEquals(
m2.toByteString(),
- ((AppendRowsRequest) actualRequests.get(5)).getProtoRows().getRows().getSerializedRows(1));
+ ((AppendRowsRequest) actualRequests.get(3)).getProtoRows().getRows().getSerializedRows(1));
DirectWriter.clearCache();
}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonWriterCacheTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonWriterCacheTest.java
index 5dd4ce820d..c95229e59f 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonWriterCacheTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonWriterCacheTest.java
@@ -26,7 +26,6 @@
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.cloud.bigquery.storage.test.Test.*;
import com.google.protobuf.AbstractMessage;
-import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@@ -40,8 +39,6 @@
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.threeten.bp.Instant;
-import org.threeten.bp.temporal.ChronoUnit;
@RunWith(JUnit4.class)
public class JsonWriterCacheTest {
@@ -108,18 +105,6 @@ void WriterCreationResponseMock(String testStreamName) {
.setTableSchema(TABLE_SCHEMA)
.build();
mockBigQueryWrite.addResponse(expectedResponse);
-
- // Response from GetWriteStream
- Instant time = Instant.now();
- Timestamp timestamp =
- Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
- Stream.WriteStream expectedResponse2 =
- Stream.WriteStream.newBuilder()
- .setName(testStreamName)
- .setType(Stream.WriteStream.Type.COMMITTED)
- .setCreateTime(timestamp)
- .build();
- mockBigQueryWrite.addResponse(expectedResponse2);
}
@After
@@ -144,50 +129,15 @@ public void testCreateNewWriter() throws Exception {
WriterCreationResponseMock(TEST_STREAM);
JsonStreamWriter writer = cache.getTableWriter(TEST_TABLE);
List actualRequests = mockBigQueryWrite.getRequests();
- assertEquals(2, actualRequests.size());
+ assertEquals(1, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
- assertEquals(
- Stream.WriteStream.Type.COMMITTED,
- ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType());
- assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());
assertEquals(TEST_STREAM, writer.getStreamName());
assertEquals(1, cache.cachedTableCount());
cache.clear();
}
- @Test
- public void testWriterExpired() throws Exception {
- JsonWriterCache cache = JsonWriterCache.getTestInstance(client, 10);
- // Response from CreateWriteStream
- Stream.WriteStream expectedResponse =
- Stream.WriteStream.newBuilder().setName(TEST_STREAM).build();
- mockBigQueryWrite.addResponse(expectedResponse);
-
- // Response from GetWriteStream
- Instant time = Instant.now().minus(2, ChronoUnit.DAYS);
- Timestamp timestamp =
- Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
- Stream.WriteStream expectedResponse2 =
- Stream.WriteStream.newBuilder()
- .setName(TEST_STREAM)
- .setType(Stream.WriteStream.Type.COMMITTED)
- .setCreateTime(timestamp)
- .build();
- mockBigQueryWrite.addResponse(expectedResponse2);
-
- try {
- JsonStreamWriter writer = cache.getTableWriter(TEST_TABLE);
- fail("Should fail");
- } catch (IllegalStateException e) {
- assertEquals(
- "Cannot write to a stream that is already expired: projects/p/datasets/d/tables/t/streams/s",
- e.getMessage());
- }
- cache.clear();
- }
-
@Test
public void testWriterWithDifferentTable() throws Exception {
JsonWriterCache cache = JsonWriterCache.getTestInstance(client, 2);
@@ -197,14 +147,11 @@ public void testWriterWithDifferentTable() throws Exception {
JsonStreamWriter writer2 = cache.getTableWriter(TEST_TABLE_2);
List actualRequests = mockBigQueryWrite.getRequests();
- assertEquals(4, actualRequests.size());
+ assertEquals(2, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
- assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());
assertEquals(
- TEST_TABLE_2, ((Storage.CreateWriteStreamRequest) actualRequests.get(2)).getParent());
- Assert.assertEquals(
- TEST_STREAM_21, ((Storage.GetWriteStreamRequest) actualRequests.get(3)).getName());
+ TEST_TABLE_2, ((Storage.CreateWriteStreamRequest) actualRequests.get(1)).getParent());
assertEquals(TEST_STREAM, writer1.getStreamName());
assertEquals(TEST_STREAM_21, writer2.getStreamName());
assertEquals(2, cache.cachedTableCount());
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java
index a427a5bbc3..cc62b4ee96 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java
@@ -26,7 +26,6 @@
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.cloud.bigquery.storage.test.Test.*;
import com.google.protobuf.AbstractMessage;
-import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@@ -40,8 +39,6 @@
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.threeten.bp.Instant;
-import org.threeten.bp.temporal.ChronoUnit;
@RunWith(JUnit4.class)
public class WriterCacheTest {
@@ -96,18 +93,6 @@ void WriterCreationResponseMock(String testStreamName) {
Stream.WriteStream expectedResponse =
Stream.WriteStream.newBuilder().setName(testStreamName).build();
mockBigQueryWrite.addResponse(expectedResponse);
-
- // Response from GetWriteStream
- Instant time = Instant.now();
- Timestamp timestamp =
- Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
- Stream.WriteStream expectedResponse2 =
- Stream.WriteStream.newBuilder()
- .setName(testStreamName)
- .setType(Stream.WriteStream.Type.COMMITTED)
- .setCreateTime(timestamp)
- .build();
- mockBigQueryWrite.addResponse(expectedResponse2);
}
@After
@@ -133,51 +118,18 @@ public void testCreateNewWriter() throws Exception {
StreamWriter writer = cache.getTableWriter(TEST_TABLE, FooType.getDescriptor());
verify(mockSchemaCheck, times(1)).check(TEST_TABLE, FooType.getDescriptor());
List actualRequests = mockBigQueryWrite.getRequests();
- assertEquals(2, actualRequests.size());
+ assertEquals(1, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
assertEquals(
Stream.WriteStream.Type.COMMITTED,
((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType());
- assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());
-
assertEquals(TEST_TABLE, writer.getTableNameString());
assertEquals(TEST_STREAM, writer.getStreamNameString());
assertEquals(1, cache.cachedTableCount());
cache.clear();
}
- @Test
- public void testWriterExpired() throws Exception {
- WriterCache cache = WriterCache.getTestInstance(client, 10, mockSchemaCheck);
- // Response from CreateWriteStream
- Stream.WriteStream expectedResponse =
- Stream.WriteStream.newBuilder().setName(TEST_STREAM).build();
- mockBigQueryWrite.addResponse(expectedResponse);
-
- // Response from GetWriteStream
- Instant time = Instant.now().minus(2, ChronoUnit.DAYS);
- Timestamp timestamp =
- Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
- Stream.WriteStream expectedResponse2 =
- Stream.WriteStream.newBuilder()
- .setName(TEST_STREAM)
- .setType(Stream.WriteStream.Type.COMMITTED)
- .setCreateTime(timestamp)
- .build();
- mockBigQueryWrite.addResponse(expectedResponse2);
-
- try {
- StreamWriter writer = cache.getTableWriter(TEST_TABLE, FooType.getDescriptor());
- fail("Should fail");
- } catch (IllegalStateException e) {
- assertEquals(
- "Cannot write to a stream that is already expired: projects/p/datasets/d/tables/t/streams/s",
- e.getMessage());
- }
- cache.clear();
- }
-
@Test
public void testWriterWithNewSchema() throws Exception {
WriterCache cache = WriterCache.getTestInstance(client, 10, mockSchemaCheck);
@@ -190,13 +142,11 @@ public void testWriterWithNewSchema() throws Exception {
verify(mockSchemaCheck, times(1)).check(TEST_TABLE, AllSupportedTypes.getDescriptor());
List actualRequests = mockBigQueryWrite.getRequests();
- assertEquals(4, actualRequests.size());
+ assertEquals(2, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
- assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());
assertEquals(
- TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(2)).getParent());
- assertEquals(TEST_STREAM_2, ((Storage.GetWriteStreamRequest) actualRequests.get(3)).getName());
+ TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(1)).getParent());
assertEquals(TEST_STREAM, writer1.getStreamNameString());
assertEquals(TEST_STREAM_2, writer2.getStreamNameString());
assertEquals(1, cache.cachedTableCount());
@@ -233,14 +183,11 @@ public void testWriterWithDifferentTable() throws Exception {
verify(mockSchemaCheck, times(1)).check(TEST_TABLE_2, FooType.getDescriptor());
List actualRequests = mockBigQueryWrite.getRequests();
- assertEquals(4, actualRequests.size());
+ assertEquals(2, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
- assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());
assertEquals(
- TEST_TABLE_2, ((Storage.CreateWriteStreamRequest) actualRequests.get(2)).getParent());
- Assert.assertEquals(
- TEST_STREAM_21, ((Storage.GetWriteStreamRequest) actualRequests.get(3)).getName());
+ TEST_TABLE_2, ((Storage.CreateWriteStreamRequest) actualRequests.get(1)).getParent());
assertEquals(TEST_STREAM, writer1.getStreamNameString());
assertEquals(TEST_STREAM_21, writer2.getStreamNameString());
assertEquals(2, cache.cachedTableCount());