Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: remove deprecated append method in StreamWriterV2 #924

Merged
merged 3 commits into from Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Expand Up @@ -28,6 +28,11 @@
<differenceType>7002</differenceType>
<method>void flushAll(long)</method>
</difference>
<difference>
<className>com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2</className>
<differenceType>7002</differenceType>
<method>com.google.api.core.ApiFuture append(com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest)</method>
</difference>
<difference>
<className>com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter$Builder</className>
<differenceType>7002</differenceType>
Expand Down
Expand Up @@ -143,6 +143,11 @@ private StreamWriterV2(Builder builder) throws IOException {
this.hasMessageInWaitingQueue = lock.newCondition();
this.inflightReduced = lock.newCondition();
this.streamName = builder.streamName;
if (builder.writerSchema == null) {
throw new StatusRuntimeException(
Status.fromCode(Code.INVALID_ARGUMENT)
.withDescription("Writer schema must be provided when building this writer."));
}
this.writerSchema = builder.writerSchema;
this.maxInflightRequests = builder.maxInflightRequest;
this.maxInflightBytes = builder.maxInflightBytes;
Expand Down Expand Up @@ -216,48 +221,15 @@ public void run() {
* @return the append response wrapped in a future.
*/
public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
// TODO: Move this check to builder after the other append is removed.
if (this.writerSchema == null) {
throw new StatusRuntimeException(
Status.fromCode(Code.INVALID_ARGUMENT)
.withDescription("Writer schema must be provided when building this writer."));
}
AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
requestBuilder.setProtoRows(ProtoData.newBuilder().setRows(rows).build());
if (offset >= 0) {
requestBuilder.setOffset(Int64Value.of(offset));
}
return append(requestBuilder.build());
return appendInternal(requestBuilder.build());
}

/**
* Schedules the writing of a message.
*
* <p>Example of writing a message.
*
* <pre>{@code
* AppendRowsRequest message;
* ApiFuture<AppendRowsResponse> messageIdFuture = writer.append(message);
* ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<AppendRowsResponse>() {
* public void onSuccess(AppendRowsResponse response) {
* if (!response.hasError()) {
* System.out.println("written with offset: " + response.getAppendResult().getOffset());
* } else {
* System.out.println("received an in stream error: " + response.getError().toString());
* }
* }
*
* public void onFailure(Throwable t) {
* System.out.println("failed to write: " + t);
* }
* }, MoreExecutors.directExecutor());
* }</pre>
*
* @param message the message in serialized format to write to BigQuery.
* @return the append response wrapped in a future.
*/
@Deprecated
public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message) {
AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message);
if (requestWrapper.messageSize > getApiMaxRequestBytes()) {
requestWrapper.appendResult.setException(
Expand Down
Expand Up @@ -85,7 +85,10 @@ public void tearDown() throws Exception {
}

private StreamWriterV2 getTestStreamWriterV2() throws IOException {
return StreamWriterV2.newBuilder(TEST_STREAM, client).setTraceId(TEST_TRACE_ID).build();
return StreamWriterV2.newBuilder(TEST_STREAM, client)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.build();
}

private ProtoSchema createProtoSchema() {
Expand All @@ -112,19 +115,6 @@ private ProtoRows createProtoRows(String[] messages) {
return rowsBuilder.build();
}

private AppendRowsRequest createAppendRequest(String[] messages, long offset) {
AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
dataBuilder.setWriterSchema(createProtoSchema());
if (offset > 0) {
requestBuilder.setOffset(Int64Value.of(offset));
}
return requestBuilder
.setProtoRows(dataBuilder.setRows(createProtoRows(messages)).build())
.setWriteStream(TEST_STREAM)
.build();
}

private AppendRowsResponse createAppendResponse(long offset) {
return AppendRowsResponse.newBuilder()
.setAppendResult(
Expand All @@ -139,7 +129,7 @@ private AppendRowsResponse createAppendResponseWithError(Status.Code code, Strin
}

private ApiFuture<AppendRowsResponse> sendTestMessage(StreamWriterV2 writer, String[] messages) {
return writer.append(createAppendRequest(messages, -1));
return writer.append(createProtoRows(messages), -1);
}

private static <T extends Throwable> T assertFutureException(
Expand Down Expand Up @@ -201,6 +191,7 @@ public void testBuildBigQueryWriteClientInWriter() throws Exception {
StreamWriterV2.newBuilder(TEST_STREAM)
.setCredentialsProvider(NoCredentialsProvider.create())
.setChannelProvider(serviceHelper.createChannelProvider())
.setWriterSchema(createProtoSchema())
.build();

testBigQueryWrite.addResponse(createAppendResponse(0));
Expand All @@ -210,12 +201,8 @@ public void testBuildBigQueryWriteClientInWriter() throws Exception {
}

@Test
public void testAppendWithRowsSuccess() throws Exception {
StreamWriterV2 writer =
StreamWriterV2.newBuilder(TEST_STREAM, client)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.build();
public void testAppendSuccess() throws Exception {
StreamWriterV2 writer = getTestStreamWriterV2();

long appendCount = 100;
for (int i = 0; i < appendCount; i++) {
Expand All @@ -237,38 +224,14 @@ public void testAppendWithRowsSuccess() throws Exception {
}

@Test
public void testAppendWithMessageSuccess() throws Exception {
StreamWriterV2 writer = getTestStreamWriterV2();

long appendCount = 1000;
for (int i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}

List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
for (int i = 0; i < appendCount; i++) {
futures.add(writer.append(createAppendRequest(new String[] {String.valueOf(i)}, i)));
}

for (int i = 0; i < appendCount; i++) {
assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
}

verifyAppendRequests(appendCount);

writer.close();
}

@Test
public void testAppendWithRowsNoSchema() throws Exception {
final StreamWriterV2 writer = getTestStreamWriterV2();
public void testNoSchema() throws Exception {
StatusRuntimeException ex =
assertThrows(
StatusRuntimeException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
writer.append(createProtoRows(new String[] {"A"}), -1);
StreamWriterV2.newBuilder(TEST_STREAM, client).build();
}
});
assertEquals(ex.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode());
Expand Down Expand Up @@ -455,7 +418,10 @@ public void serverCloseWhileRequestsInflight() throws Exception {
@Test
public void testZeroMaxInflightRequests() throws Exception {
StreamWriterV2 writer =
StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightRequests(0).build();
StreamWriterV2.newBuilder(TEST_STREAM, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightRequests(0)
.build();
testBigQueryWrite.addResponse(createAppendResponse(0));
verifyAppendIsBlocked(writer);
writer.close();
Expand All @@ -464,7 +430,10 @@ public void testZeroMaxInflightRequests() throws Exception {
@Test
public void testZeroMaxInflightBytes() throws Exception {
StreamWriterV2 writer =
StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightBytes(0).build();
StreamWriterV2.newBuilder(TEST_STREAM, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightBytes(0)
.build();
testBigQueryWrite.addResponse(createAppendResponse(0));
verifyAppendIsBlocked(writer);
writer.close();
Expand All @@ -473,7 +442,10 @@ public void testZeroMaxInflightBytes() throws Exception {
@Test
public void testOneMaxInflightRequests() throws Exception {
StreamWriterV2 writer =
StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightRequests(1).build();
StreamWriterV2.newBuilder(TEST_STREAM, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightRequests(1)
.build();
// Server will sleep 1 second before every response.
testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1));
testBigQueryWrite.addResponse(createAppendResponse(0));
Expand All @@ -489,7 +461,10 @@ public void testOneMaxInflightRequests() throws Exception {
@Test
public void testAppendsWithTinyMaxInflightBytes() throws Exception {
StreamWriterV2 writer =
StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightBytes(1).build();
StreamWriterV2.newBuilder(TEST_STREAM, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightBytes(1)
.build();
// Server will sleep 100ms before every response.
testBigQueryWrite.setResponseSleep(Duration.ofMillis(100));
long appendCount = 10;
Expand All @@ -500,7 +475,7 @@ public void testAppendsWithTinyMaxInflightBytes() throws Exception {
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
long appendStartTimeMs = System.currentTimeMillis();
for (int i = 0; i < appendCount; i++) {
futures.add(writer.append(createAppendRequest(new String[] {String.valueOf(i)}, i)));
futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i));
}
long appendElapsedMs = System.currentTimeMillis() - appendStartTimeMs;
assertTrue(appendElapsedMs >= 1000);
Expand Down
Expand Up @@ -20,7 +20,6 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.BQTableSchemaToProtoDescriptor;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient;
Expand All @@ -35,7 +34,6 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Message;
import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -163,16 +161,19 @@ private void writeToStream(
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(
writeStream.getTableSchema());
ProtoSchema protoSchema = ProtoSchemaConverter.convert(descriptor);
try (StreamWriterV2 writer = StreamWriterV2.newBuilder(writeStream.getName()).build()) {
try (StreamWriterV2 writer =
StreamWriterV2.newBuilder(writeStream.getName())
.setWriterSchema(protoSchema)
.setTraceId("SAMPLE:parallel_append")
.build()) {
while (System.currentTimeMillis() < deadlineMillis) {
synchronized (this) {
if (error != null) {
// Stop writing once we get an error.
throw error;
}
}
ApiFuture<AppendRowsResponse> future =
writer.append(createAppendRequest(writeStream.getName(), descriptor, protoSchema, -1));
ApiFuture<AppendRowsResponse> future = writer.append(createAppendRows(descriptor), -1);
synchronized (this) {
inflightCount++;
}
Expand All @@ -197,8 +198,7 @@ private void waitForInflightToReachZero(Duration timeout) {
throw new RuntimeException("Timeout waiting for inflight count to reach 0");
}

private AppendRowsRequest createAppendRequest(
String streamName, Descriptor descriptor, ProtoSchema protoSchema, long offset) {
private ProtoRows createAppendRows(Descriptor descriptor) {
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
for (int i = 0; i < BATCH_SIZE; i++) {
byte[] payload = new byte[ROW_SIZE];
Expand All @@ -208,15 +208,7 @@ private AppendRowsRequest createAppendRequest(
Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(descriptor, record);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
}
AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder();
data.setWriterSchema(protoSchema);
data.setRows(rowsBuilder.build());
AppendRowsRequest.Builder request = AppendRowsRequest.newBuilder().setProtoRows(data.build());
request.setWriteStream(streamName);
if (offset >= 0) {
request.setOffset(Int64Value.of(offset));
}
return request.build();
return rowsBuilder.build();
}

private void sleepIgnoringInterruption(Duration duration) {
Expand Down