Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix: remove stream ttl in client library, since there is no very clea…
…r TTL defined. (#627)

* fix: Due to backend issues resolved, we no longer need to wait for 5 seconds between reconnection

* fix: Remove the unnecessary GetWriteStream call to check stream TTL, since the stream TTL definition becomes complicate.

* add ignore method diff
  • Loading branch information
yirutang committed Oct 22, 2020
1 parent 6347c13 commit 2ae69b6
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 215 deletions.
12 changes: 11 additions & 1 deletion google-cloud-bigquerystorage/clirr-ignored-differences.xml
Expand Up @@ -33,4 +33,14 @@
<className>com/google/cloud/bigquery/storage/v1alpha2/BQTableSchemaToProtoDescriptor</className>
<method>com.google.protobuf.Descriptors$Descriptor ConvertBQTableSchemaToProtoDescriptor(com.google.cloud.bigquery.storage.v1alpha2.Table$TableSchema)</method>
</difference>
</differences>
<difference>
<className>com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriter</className>
<differenceType>7002</differenceType>
<method>java.lang.Boolean expired()</method>
</difference>
<difference>
<className>com/google/cloud/bigquery/storage/v1alpha2/StreamWriter</className>
<differenceType>7002</differenceType>
<method>java.lang.Boolean expired()</method>
</difference>
</differences>
Expand Up @@ -260,11 +260,6 @@ public void close() {
this.streamWriter.close();
}

/** Returns if a stream has expired. */
public Boolean expired() {
return this.streamWriter.expired();
}

private class JsonStreamWriterOnSchemaUpdateRunnable extends OnSchemaUpdateRunnable {
private JsonStreamWriter jsonStreamWriter;
/**
Expand Down
Expand Up @@ -113,11 +113,7 @@ public JsonStreamWriter getTableWriter(String tableName)
synchronized (this) {
writer = jsonWriterCache.getIfPresent(tableName);
if (writer != null) {
if (!writer.expired()) {
return writer;
} else {
writer.close();
}
return writer;
}
writeStream = CreateNewWriteStream(tableName);
writer = CreateNewWriter(writeStream);
Expand Down
Expand Up @@ -57,7 +57,6 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;

/**
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
Expand Down Expand Up @@ -118,9 +117,6 @@ public class StreamWriter implements AutoCloseable {
private final AtomicBoolean activeAlarm;
private ScheduledFuture<?> currentAlarmFuture;

private Instant createTime;
private Duration streamTTL = Duration.ofDays(1);

private Integer currentRetries = 0;

// Used for schema updates
Expand Down Expand Up @@ -182,19 +178,6 @@ private StreamWriter(Builder builder)
}

refreshAppend();
Stream.WriteStream stream =
stub.getWriteStream(Storage.GetWriteStreamRequest.newBuilder().setName(streamName).build());
createTime =
Instant.ofEpochSecond(
stream.getCreateTime().getSeconds(), stream.getCreateTime().getNanos());
if (stream.getType() == Stream.WriteStream.Type.PENDING && stream.hasCommitTime()) {
throw new IllegalStateException(
"Cannot write to a stream that is already committed: " + streamName);
}
if (createTime.plus(streamTTL).compareTo(Instant.now()) < 0) {
throw new IllegalStateException(
"Cannot write to a stream that is already expired: " + streamName);
}
}

/** Stream name we are writing to. */
Expand All @@ -212,11 +195,6 @@ OnSchemaUpdateRunnable getOnSchemaUpdateRunnable() {
return this.onSchemaUpdateRunnable;
}

/** Returns if a stream has expired. */
public Boolean expired() {
return createTime.plus(streamTTL).compareTo(Instant.now()) < 0;
}

private void setException(Throwable t) {
exceptionLock.lock();
if (this.streamException == null) {
Expand Down
Expand Up @@ -135,11 +135,7 @@ public StreamWriter getTableWriter(String tableName, Descriptor userSchema)
if (tableEntry != null) {
writer = tableEntry.getIfPresent(userSchema);
if (writer != null) {
if (!writer.expired()) {
return writer;
} else {
writer.close();
}
return writer;
}
compat.check(tableName, userSchema);
streamName = CreateNewStream(tableName);
Expand Down
Expand Up @@ -28,7 +28,6 @@
import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsRequest;
import com.google.common.collect.Sets;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
Expand All @@ -50,7 +49,6 @@
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.threeten.bp.Instant;

@RunWith(JUnit4.class)
public class DirectWriterTest {
Expand Down Expand Up @@ -115,18 +113,6 @@ void WriterCreationResponseMock(String testStreamName, Set<Long> responseOffsets
Stream.WriteStream.newBuilder().setName(testStreamName).build();
mockBigQueryWrite.addResponse(expectedResponse);

// Response from GetWriteStream
Instant time = Instant.now();
Timestamp timestamp =
Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
Stream.WriteStream expectedResponse2 =
Stream.WriteStream.newBuilder()
.setName(testStreamName)
.setType(Stream.WriteStream.Type.COMMITTED)
.setCreateTime(timestamp)
.build();
mockBigQueryWrite.addResponse(expectedResponse2);

for (Long offset : responseOffsets) {
Storage.AppendRowsResponse response =
Storage.AppendRowsResponse.newBuilder().setOffset(offset).build();
Expand All @@ -144,18 +130,6 @@ void JsonWriterCreationResponseMock(String testStreamName, Set<Long> responseOff
.build();
mockBigQueryWrite.addResponse(expectedResponse);

// Response from GetWriteStream
Instant time = Instant.now();
Timestamp timestamp =
Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
Stream.WriteStream expectedResponse2 =
Stream.WriteStream.newBuilder()
.setName(testStreamName)
.setType(Stream.WriteStream.Type.COMMITTED)
.setCreateTime(timestamp)
.build();
mockBigQueryWrite.addResponse(expectedResponse2);

for (Long offset : responseOffsets) {
Storage.AppendRowsResponse response =
Storage.AppendRowsResponse.newBuilder().setOffset(offset).build();
Expand Down Expand Up @@ -183,19 +157,15 @@ public void testJsonWriteSuccess() throws Exception {
ApiFuture<Long> ret = DirectWriter.append(TEST_TABLE, jsonArr);
assertEquals(Long.valueOf(0L), ret.get());
List<AbstractMessage> actualRequests = mockBigQueryWrite.getRequests();
Assert.assertEquals(3, actualRequests.size());
Assert.assertEquals(2, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
assertEquals(
Stream.WriteStream.Type.COMMITTED,
((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType());
assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());
assertEquals(
m1.toByteString(),
((AppendRowsRequest) actualRequests.get(2)).getProtoRows().getRows().getSerializedRows(0));
((AppendRowsRequest) actualRequests.get(1)).getProtoRows().getRows().getSerializedRows(0));
assertEquals(
m2.toByteString(),
((AppendRowsRequest) actualRequests.get(2)).getProtoRows().getRows().getSerializedRows(1));
((AppendRowsRequest) actualRequests.get(1)).getProtoRows().getRows().getSerializedRows(1));

Storage.AppendRowsResponse response =
Storage.AppendRowsResponse.newBuilder().setOffset(2).build();
Expand All @@ -205,7 +175,7 @@ public void testJsonWriteSuccess() throws Exception {
assertEquals(Long.valueOf(2L), ret.get());
assertEquals(
m1.toByteString(),
((AppendRowsRequest) actualRequests.get(3)).getProtoRows().getRows().getSerializedRows(0));
((AppendRowsRequest) actualRequests.get(2)).getProtoRows().getRows().getSerializedRows(0));
DirectWriter.clearCache();
}

Expand All @@ -220,13 +190,9 @@ public void testProtobufWriteSuccess() throws Exception {
verify(schemaCheck).check(TEST_TABLE, FooType.getDescriptor());
assertEquals(Long.valueOf(0L), ret.get());
List<AbstractMessage> actualRequests = mockBigQueryWrite.getRequests();
Assert.assertEquals(3, actualRequests.size());
Assert.assertEquals(2, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
assertEquals(
Stream.WriteStream.Type.COMMITTED,
((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType());
assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());

Storage.AppendRowsRequest.ProtoData.Builder dataBuilder =
Storage.AppendRowsRequest.ProtoData.newBuilder();
Expand All @@ -241,7 +207,7 @@ public void testProtobufWriteSuccess() throws Exception {
.setWriteStream(TEST_STREAM)
.setProtoRows(dataBuilder.build())
.build();
assertEquals(expectRequest.toString(), actualRequests.get(2).toString());
assertEquals(expectRequest.toString(), actualRequests.get(1).toString());

Storage.AppendRowsResponse response =
Storage.AppendRowsResponse.newBuilder().setOffset(2).build();
Expand All @@ -254,7 +220,7 @@ public void testProtobufWriteSuccess() throws Exception {
ProtoBufProto.ProtoRows.newBuilder().addSerializedRows(m1.toByteString()).build());
expectRequest =
Storage.AppendRowsRequest.newBuilder().setProtoRows(dataBuilder.build()).build();
assertEquals(expectRequest.toString(), actualRequests.get(3).toString());
assertEquals(expectRequest.toString(), actualRequests.get(2).toString());

// Write with a different schema.
WriterCreationResponseMock(TEST_STREAM_2, Sets.newHashSet(Long.valueOf(0L)));
Expand All @@ -271,14 +237,10 @@ public void testProtobufWriteSuccess() throws Exception {
.setWriteStream(TEST_STREAM_2)
.setProtoRows(dataBuilder.build())
.build();
Assert.assertEquals(7, actualRequests.size());
Assert.assertEquals(5, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(4)).getParent());
assertEquals(
Stream.WriteStream.Type.COMMITTED,
((Storage.CreateWriteStreamRequest) actualRequests.get(4)).getWriteStream().getType());
assertEquals(TEST_STREAM_2, ((Storage.GetWriteStreamRequest) actualRequests.get(5)).getName());
assertEquals(expectRequest.toString(), actualRequests.get(6).toString());
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(3)).getParent());
assertEquals(expectRequest.toString(), actualRequests.get(4).toString());

DirectWriter.clearCache();
}
Expand Down Expand Up @@ -433,13 +395,9 @@ public void testJsonProtobufWrite() throws Exception {
verify(schemaCheck).check(TEST_TABLE, FooType.getDescriptor());
assertEquals(Long.valueOf(0L), ret.get());
List<AbstractMessage> actualRequests = mockBigQueryWrite.getRequests();
Assert.assertEquals(3, actualRequests.size());
Assert.assertEquals(2, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
assertEquals(
Stream.WriteStream.Type.COMMITTED,
((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType());
assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());

Storage.AppendRowsRequest.ProtoData.Builder dataBuilder =
Storage.AppendRowsRequest.ProtoData.newBuilder();
Expand All @@ -454,25 +412,19 @@ public void testJsonProtobufWrite() throws Exception {
.setWriteStream(TEST_STREAM)
.setProtoRows(dataBuilder.build())
.build();
assertEquals(expectRequest.toString(), actualRequests.get(2).toString());
assertEquals(expectRequest.toString(), actualRequests.get(1).toString());

JsonWriterCreationResponseMock(TEST_STREAM, Sets.newHashSet(Long.valueOf(0L)));
ret = DirectWriter.append(TEST_TABLE, jsonArr);
assertEquals(Long.valueOf(0L), ret.get());
actualRequests = mockBigQueryWrite.getRequests();
Assert.assertEquals(6, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(3)).getParent());
assertEquals(
Stream.WriteStream.Type.COMMITTED,
((Storage.CreateWriteStreamRequest) actualRequests.get(3)).getWriteStream().getType());
assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(4)).getName());
Assert.assertEquals(4, actualRequests.size());
assertEquals(
m1.toByteString(),
((AppendRowsRequest) actualRequests.get(5)).getProtoRows().getRows().getSerializedRows(0));
((AppendRowsRequest) actualRequests.get(3)).getProtoRows().getRows().getSerializedRows(0));
assertEquals(
m2.toByteString(),
((AppendRowsRequest) actualRequests.get(5)).getProtoRows().getRows().getSerializedRows(1));
((AppendRowsRequest) actualRequests.get(3)).getProtoRows().getRows().getSerializedRows(1));

DirectWriter.clearCache();
}
Expand Down
Expand Up @@ -26,7 +26,6 @@
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.cloud.bigquery.storage.test.Test.*;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
Expand All @@ -40,8 +39,6 @@
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.threeten.bp.Instant;
import org.threeten.bp.temporal.ChronoUnit;

@RunWith(JUnit4.class)
public class JsonWriterCacheTest {
Expand Down Expand Up @@ -108,18 +105,6 @@ void WriterCreationResponseMock(String testStreamName) {
.setTableSchema(TABLE_SCHEMA)
.build();
mockBigQueryWrite.addResponse(expectedResponse);

// Response from GetWriteStream
Instant time = Instant.now();
Timestamp timestamp =
Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
Stream.WriteStream expectedResponse2 =
Stream.WriteStream.newBuilder()
.setName(testStreamName)
.setType(Stream.WriteStream.Type.COMMITTED)
.setCreateTime(timestamp)
.build();
mockBigQueryWrite.addResponse(expectedResponse2);
}

@After
Expand All @@ -144,50 +129,15 @@ public void testCreateNewWriter() throws Exception {
WriterCreationResponseMock(TEST_STREAM);
JsonStreamWriter writer = cache.getTableWriter(TEST_TABLE);
List<AbstractMessage> actualRequests = mockBigQueryWrite.getRequests();
assertEquals(2, actualRequests.size());
assertEquals(1, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
assertEquals(
Stream.WriteStream.Type.COMMITTED,
((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType());
assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());

assertEquals(TEST_STREAM, writer.getStreamName());
assertEquals(1, cache.cachedTableCount());
cache.clear();
}

@Test
public void testWriterExpired() throws Exception {
JsonWriterCache cache = JsonWriterCache.getTestInstance(client, 10);
// Response from CreateWriteStream
Stream.WriteStream expectedResponse =
Stream.WriteStream.newBuilder().setName(TEST_STREAM).build();
mockBigQueryWrite.addResponse(expectedResponse);

// Response from GetWriteStream
Instant time = Instant.now().minus(2, ChronoUnit.DAYS);
Timestamp timestamp =
Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
Stream.WriteStream expectedResponse2 =
Stream.WriteStream.newBuilder()
.setName(TEST_STREAM)
.setType(Stream.WriteStream.Type.COMMITTED)
.setCreateTime(timestamp)
.build();
mockBigQueryWrite.addResponse(expectedResponse2);

try {
JsonStreamWriter writer = cache.getTableWriter(TEST_TABLE);
fail("Should fail");
} catch (IllegalStateException e) {
assertEquals(
"Cannot write to a stream that is already expired: projects/p/datasets/d/tables/t/streams/s",
e.getMessage());
}
cache.clear();
}

@Test
public void testWriterWithDifferentTable() throws Exception {
JsonWriterCache cache = JsonWriterCache.getTestInstance(client, 2);
Expand All @@ -197,14 +147,11 @@ public void testWriterWithDifferentTable() throws Exception {
JsonStreamWriter writer2 = cache.getTableWriter(TEST_TABLE_2);

List<AbstractMessage> actualRequests = mockBigQueryWrite.getRequests();
assertEquals(4, actualRequests.size());
assertEquals(2, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());
assertEquals(
TEST_TABLE_2, ((Storage.CreateWriteStreamRequest) actualRequests.get(2)).getParent());
Assert.assertEquals(
TEST_STREAM_21, ((Storage.GetWriteStreamRequest) actualRequests.get(3)).getName());
TEST_TABLE_2, ((Storage.CreateWriteStreamRequest) actualRequests.get(1)).getParent());
assertEquals(TEST_STREAM, writer1.getStreamName());
assertEquals(TEST_STREAM_21, writer2.getStreamName());
assertEquals(2, cache.cachedTableCount());
Expand Down

0 comments on commit 2ae69b6

Please sign in to comment.