diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 6990f13173..afd492ae6b 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -19,10 +19,12 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest.ProtoData; import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback; import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.RequestCallback; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Uninterruptibles; +import com.google.protobuf.Int64Value; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; @@ -39,8 +41,6 @@ /** * A BigQuery Stream Writer that can be used to write data into BigQuery Table. * - *

TODO: Attach schema. - * *

TODO: Attach traceId. * *

TODO: Support batching. @@ -59,6 +59,11 @@ public class StreamWriterV2 implements AutoCloseable { */ private final String streamName; + /* + * The proto schema of rows to write. + */ + private final ProtoSchema writerSchema; + /* * Max allowed inflight requests in the stream. Method append is blocked at this. */ @@ -135,6 +140,7 @@ private StreamWriterV2(Builder builder) throws IOException { this.hasMessageInWaitingQueue = lock.newCondition(); this.inflightReduced = lock.newCondition(); this.streamName = builder.streamName; + this.writerSchema = builder.writerSchema; this.maxInflightRequests = builder.maxInflightRequest; this.maxInflightBytes = builder.maxInflightBytes; this.waitingRequestQueue = new LinkedList(); @@ -188,10 +194,52 @@ public void run() { * ApiFuture messageIdFuture = writer.append(message); * ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback() { * public void onSuccess(AppendRowsResponse response) { - * if (response.hasOffset()) { - * System.out.println("written with offset: " + response.getOffset()); + * if (!response.hasError()) { + * System.out.println("written with offset: " + response.getAppendResult().getOffset()); + * } else { + * System.out.println("received an in stream error: " + response.getError().toString()); + * } + * } + * + * public void onFailure(Throwable t) { + * System.out.println("failed to write: " + t); + * } + * }, MoreExecutors.directExecutor()); + * } + * + * @param rows the rows in serialized format to write to BigQuery. + * @param offset the offset of the first row. + * @return the append response wrapped in a future. + */ + public ApiFuture append(ProtoRows rows, long offset) { + // TODO: Move this check to builder after the other append is removed. + if (this.writerSchema == null) { + throw new StatusRuntimeException( + Status.fromCode(Code.INVALID_ARGUMENT) + .withDescription("Writer schema must be provided when building this writer.")); + } + AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); + requestBuilder.setProtoRows(ProtoData.newBuilder().setRows(rows).build()); + if (offset >= 0) { + requestBuilder.setOffset(Int64Value.of(offset)); + } + return append(requestBuilder.build()); + } + + /** + * Schedules the writing of a message. + * + *

Example of writing a message. + * + *

{@code
+   * AppendRowsRequest message;
+   * ApiFuture messageIdFuture = writer.append(message);
+   * ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback() {
+   *   public void onSuccess(AppendRowsResponse response) {
+   *     if (!response.hasError()) {
+   *       System.out.println("written with offset: " + response.getAppendResult().getOffset());
    *     } else {
-   *       System.out.println("received an in stream error: " + response.error().toString());
+   *       System.out.println("received an in stream error: " + response.getError().toString());
    *     }
    *   }
    *
@@ -202,8 +250,9 @@ public void run() {
    * }
* * @param message the message in serialized format to write to BigQuery. - * @return the message ID wrapped in a future. + * @return the append response wrapped in a future. */ + @Deprecated public ApiFuture append(AppendRowsRequest message) { AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message); if (requestWrapper.messageSize > getApiMaxRequestBytes()) { @@ -380,6 +429,9 @@ private AppendRowsRequest prepareRequestBasedOnPosition( AppendRowsRequest original, boolean isFirstRequest) { AppendRowsRequest.Builder requestBuilder = original.toBuilder(); if (isFirstRequest) { + if (this.writerSchema != null) { + requestBuilder.getProtoRowsBuilder().setWriterSchema(this.writerSchema); + } requestBuilder.setWriteStream(this.streamName); } else { requestBuilder.clearWriteStream(); @@ -473,6 +525,8 @@ public static final class Builder { private BigQueryWriteClient client; + private ProtoSchema writerSchema = null; + private long maxInflightRequest = DEFAULT_MAX_INFLIGHT_REQUESTS; private long maxInflightBytes = DEFAULT_MAX_INFLIGHT_BYTES; @@ -495,6 +549,12 @@ private Builder(String streamName, BigQueryWriteClient client) { this.client = Preconditions.checkNotNull(client); } + /** Sets the proto schema of the rows. */ + public Builder setWriterSchema(ProtoSchema writerSchema) { + this.writerSchema = writerSchema; + return this; + } + public Builder setMaxInflightRequests(long value) { this.maxInflightRequest = value; return this; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index f32487afb1..a6b4a1e790 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -87,31 +87,39 @@ private StreamWriterV2 getTestStreamWriterV2() throws IOException { return StreamWriterV2.newBuilder(TEST_STREAM, client).build(); } - private AppendRowsRequest createAppendRequest(String[] messages, long offset) { - AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); - AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder(); - dataBuilder.setWriterSchema( - ProtoSchema.newBuilder() - .setProtoDescriptor( - DescriptorProtos.DescriptorProto.newBuilder() - .setName("Message") - .addField( - DescriptorProtos.FieldDescriptorProto.newBuilder() - .setName("foo") - .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING) - .setNumber(1) - .build()) - .build())); - ProtoRows.Builder rows = ProtoRows.newBuilder(); + private ProtoSchema createProtoSchema() { + return ProtoSchema.newBuilder() + .setProtoDescriptor( + DescriptorProtos.DescriptorProto.newBuilder() + .setName("Message") + .addField( + DescriptorProtos.FieldDescriptorProto.newBuilder() + .setName("foo") + .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING) + .setNumber(1) + .build()) + .build()) + .build(); + } + + private ProtoRows createProtoRows(String[] messages) { + ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder(); for (String message : messages) { FooType foo = FooType.newBuilder().setFoo(message).build(); - rows.addSerializedRows(foo.toByteString()); + rowsBuilder.addSerializedRows(foo.toByteString()); } + return rowsBuilder.build(); + } + + private AppendRowsRequest createAppendRequest(String[] messages, long offset) { + AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); + AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder(); + dataBuilder.setWriterSchema(createProtoSchema()); if (offset > 0) { requestBuilder.setOffset(Int64Value.of(offset)); } return requestBuilder - .setProtoRows(dataBuilder.setRows(rows.build()).build()) + .setProtoRows(dataBuilder.setRows(createProtoRows(messages)).build()) .setWriteStream(TEST_STREAM) .build(); } @@ -166,6 +174,24 @@ public void run() { appendThread.interrupt(); } + private void verifyAppendRequests(long appendCount) { + assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size()); + for (int i = 0; i < appendCount; i++) { + AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i); + assertTrue(serverRequest.getProtoRows().getRows().getSerializedRowsCount() > 0); + assertEquals(i, serverRequest.getOffset().getValue()); + if (i == 0) { + // First request received by server should have schema and stream name. + assertTrue(serverRequest.getProtoRows().hasWriterSchema()); + assertEquals(serverRequest.getWriteStream(), TEST_STREAM); + } else { + // Following request should not have schema and stream name. + assertFalse(serverRequest.getProtoRows().hasWriterSchema()); + assertEquals(serverRequest.getWriteStream(), ""); + } + } + } + @Test public void testBuildBigQueryWriteClientInWriter() throws Exception { StreamWriterV2 writer = @@ -181,40 +207,68 @@ public void testBuildBigQueryWriteClientInWriter() throws Exception { } @Test - public void testAppendSuccess() throws Exception { - StreamWriterV2 writer = getTestStreamWriterV2(); + public void testAppendWithRowsSuccess() throws Exception { + StreamWriterV2 writer = + StreamWriterV2.newBuilder(TEST_STREAM, client).setWriterSchema(createProtoSchema()).build(); - long appendCount = 1000; + long appendCount = 100; for (int i = 0; i < appendCount; i++) { testBigQueryWrite.addResponse(createAppendResponse(i)); } List> futures = new ArrayList<>(); for (int i = 0; i < appendCount; i++) { - futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)})); + futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i)); } for (int i = 0; i < appendCount; i++) { assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue()); } - assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size()); + verifyAppendRequests(appendCount); + + writer.close(); + } + + @Test + public void testAppendWithMessageSuccess() throws Exception { + StreamWriterV2 writer = getTestStreamWriterV2(); + + long appendCount = 1000; for (int i = 0; i < appendCount; i++) { - AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i); - if (i == 0) { - // First request received by server should have schema and stream name. - assertTrue(serverRequest.getProtoRows().hasWriterSchema()); - assertEquals(serverRequest.getWriteStream(), TEST_STREAM); - } else { - // Following request should not have schema and stream name. - assertFalse(serverRequest.getProtoRows().hasWriterSchema()); - assertEquals(serverRequest.getWriteStream(), ""); - } + testBigQueryWrite.addResponse(createAppendResponse(i)); } + List> futures = new ArrayList<>(); + for (int i = 0; i < appendCount; i++) { + futures.add(writer.append(createAppendRequest(new String[] {String.valueOf(i)}, i))); + } + + for (int i = 0; i < appendCount; i++) { + assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue()); + } + + verifyAppendRequests(appendCount); + writer.close(); } + @Test + public void testAppendWithRowsNoSchema() throws Exception { + final StreamWriterV2 writer = getTestStreamWriterV2(); + StatusRuntimeException ex = + assertThrows( + StatusRuntimeException.class, + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + writer.append(createProtoRows(new String[] {"A"}), -1); + } + }); + assertEquals(ex.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode()); + assertTrue(ex.getStatus().getDescription().contains("Writer schema must be provided")); + } + @Test public void testAppendSuccessAndConnectionError() throws Exception { StreamWriterV2 writer = getTestStreamWriterV2();