diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml index 9acc5b4039..bc3eebffbf 100644 --- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml +++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml @@ -28,6 +28,11 @@ 7002 void flushAll(long) + + com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2 + 7002 + com.google.api.core.ApiFuture append(com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest) + com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter$Builder 7002 diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 33abf53bf4..95696a4574 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -143,6 +143,11 @@ private StreamWriterV2(Builder builder) throws IOException { this.hasMessageInWaitingQueue = lock.newCondition(); this.inflightReduced = lock.newCondition(); this.streamName = builder.streamName; + if (builder.writerSchema == null) { + throw new StatusRuntimeException( + Status.fromCode(Code.INVALID_ARGUMENT) + .withDescription("Writer schema must be provided when building this writer.")); + } this.writerSchema = builder.writerSchema; this.maxInflightRequests = builder.maxInflightRequest; this.maxInflightBytes = builder.maxInflightBytes; @@ -216,48 +221,15 @@ public void run() { * @return the append response wrapped in a future. */ public ApiFuture append(ProtoRows rows, long offset) { - // TODO: Move this check to builder after the other append is removed. - if (this.writerSchema == null) { - throw new StatusRuntimeException( - Status.fromCode(Code.INVALID_ARGUMENT) - .withDescription("Writer schema must be provided when building this writer.")); - } AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); requestBuilder.setProtoRows(ProtoData.newBuilder().setRows(rows).build()); if (offset >= 0) { requestBuilder.setOffset(Int64Value.of(offset)); } - return append(requestBuilder.build()); + return appendInternal(requestBuilder.build()); } - /** - * Schedules the writing of a message. - * - *

Example of writing a message. - * - *

{@code
-   * AppendRowsRequest message;
-   * ApiFuture messageIdFuture = writer.append(message);
-   * ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback() {
-   *   public void onSuccess(AppendRowsResponse response) {
-   *     if (!response.hasError()) {
-   *       System.out.println("written with offset: " + response.getAppendResult().getOffset());
-   *     } else {
-   *       System.out.println("received an in stream error: " + response.getError().toString());
-   *     }
-   *   }
-   *
-   *   public void onFailure(Throwable t) {
-   *     System.out.println("failed to write: " + t);
-   *   }
-   * }, MoreExecutors.directExecutor());
-   * }
- * - * @param message the message in serialized format to write to BigQuery. - * @return the append response wrapped in a future. - */ - @Deprecated - public ApiFuture append(AppendRowsRequest message) { + private ApiFuture appendInternal(AppendRowsRequest message) { AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message); if (requestWrapper.messageSize > getApiMaxRequestBytes()) { requestWrapper.appendResult.setException( diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index 69aa4341a0..6f2d3767cb 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -85,7 +85,10 @@ public void tearDown() throws Exception { } private StreamWriterV2 getTestStreamWriterV2() throws IOException { - return StreamWriterV2.newBuilder(TEST_STREAM, client).setTraceId(TEST_TRACE_ID).build(); + return StreamWriterV2.newBuilder(TEST_STREAM, client) + .setWriterSchema(createProtoSchema()) + .setTraceId(TEST_TRACE_ID) + .build(); } private ProtoSchema createProtoSchema() { @@ -112,19 +115,6 @@ private ProtoRows createProtoRows(String[] messages) { return rowsBuilder.build(); } - private AppendRowsRequest createAppendRequest(String[] messages, long offset) { - AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); - AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder(); - dataBuilder.setWriterSchema(createProtoSchema()); - if (offset > 0) { - requestBuilder.setOffset(Int64Value.of(offset)); - } - return requestBuilder - .setProtoRows(dataBuilder.setRows(createProtoRows(messages)).build()) - .setWriteStream(TEST_STREAM) - .build(); - } - private AppendRowsResponse createAppendResponse(long offset) { return AppendRowsResponse.newBuilder() .setAppendResult( @@ -139,7 +129,7 @@ private AppendRowsResponse createAppendResponseWithError(Status.Code code, Strin } private ApiFuture sendTestMessage(StreamWriterV2 writer, String[] messages) { - return writer.append(createAppendRequest(messages, -1)); + return writer.append(createProtoRows(messages), -1); } private static T assertFutureException( @@ -201,6 +191,7 @@ public void testBuildBigQueryWriteClientInWriter() throws Exception { StreamWriterV2.newBuilder(TEST_STREAM) .setCredentialsProvider(NoCredentialsProvider.create()) .setChannelProvider(serviceHelper.createChannelProvider()) + .setWriterSchema(createProtoSchema()) .build(); testBigQueryWrite.addResponse(createAppendResponse(0)); @@ -210,12 +201,8 @@ public void testBuildBigQueryWriteClientInWriter() throws Exception { } @Test - public void testAppendWithRowsSuccess() throws Exception { - StreamWriterV2 writer = - StreamWriterV2.newBuilder(TEST_STREAM, client) - .setWriterSchema(createProtoSchema()) - .setTraceId(TEST_TRACE_ID) - .build(); + public void testAppendSuccess() throws Exception { + StreamWriterV2 writer = getTestStreamWriterV2(); long appendCount = 100; for (int i = 0; i < appendCount; i++) { @@ -237,38 +224,14 @@ public void testAppendWithRowsSuccess() throws Exception { } @Test - public void testAppendWithMessageSuccess() throws Exception { - StreamWriterV2 writer = getTestStreamWriterV2(); - - long appendCount = 1000; - for (int i = 0; i < appendCount; i++) { - testBigQueryWrite.addResponse(createAppendResponse(i)); - } - - List> futures = new ArrayList<>(); - for (int i = 0; i < appendCount; i++) { - futures.add(writer.append(createAppendRequest(new String[] {String.valueOf(i)}, i))); - } - - for (int i = 0; i < appendCount; i++) { - assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue()); - } - - verifyAppendRequests(appendCount); - - writer.close(); - } - - @Test - public void testAppendWithRowsNoSchema() throws Exception { - final StreamWriterV2 writer = getTestStreamWriterV2(); + public void testNoSchema() throws Exception { StatusRuntimeException ex = assertThrows( StatusRuntimeException.class, new ThrowingRunnable() { @Override public void run() throws Throwable { - writer.append(createProtoRows(new String[] {"A"}), -1); + StreamWriterV2.newBuilder(TEST_STREAM, client).build(); } }); assertEquals(ex.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode()); @@ -455,7 +418,10 @@ public void serverCloseWhileRequestsInflight() throws Exception { @Test public void testZeroMaxInflightRequests() throws Exception { StreamWriterV2 writer = - StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightRequests(0).build(); + StreamWriterV2.newBuilder(TEST_STREAM, client) + .setWriterSchema(createProtoSchema()) + .setMaxInflightRequests(0) + .build(); testBigQueryWrite.addResponse(createAppendResponse(0)); verifyAppendIsBlocked(writer); writer.close(); @@ -464,7 +430,10 @@ public void testZeroMaxInflightRequests() throws Exception { @Test public void testZeroMaxInflightBytes() throws Exception { StreamWriterV2 writer = - StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightBytes(0).build(); + StreamWriterV2.newBuilder(TEST_STREAM, client) + .setWriterSchema(createProtoSchema()) + .setMaxInflightBytes(0) + .build(); testBigQueryWrite.addResponse(createAppendResponse(0)); verifyAppendIsBlocked(writer); writer.close(); @@ -473,7 +442,10 @@ public void testZeroMaxInflightBytes() throws Exception { @Test public void testOneMaxInflightRequests() throws Exception { StreamWriterV2 writer = - StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightRequests(1).build(); + StreamWriterV2.newBuilder(TEST_STREAM, client) + .setWriterSchema(createProtoSchema()) + .setMaxInflightRequests(1) + .build(); // Server will sleep 1 second before every response. testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1)); testBigQueryWrite.addResponse(createAppendResponse(0)); @@ -489,7 +461,10 @@ public void testOneMaxInflightRequests() throws Exception { @Test public void testAppendsWithTinyMaxInflightBytes() throws Exception { StreamWriterV2 writer = - StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightBytes(1).build(); + StreamWriterV2.newBuilder(TEST_STREAM, client) + .setWriterSchema(createProtoSchema()) + .setMaxInflightBytes(1) + .build(); // Server will sleep 100ms before every response. testBigQueryWrite.setResponseSleep(Duration.ofMillis(100)); long appendCount = 10; @@ -500,7 +475,7 @@ public void testAppendsWithTinyMaxInflightBytes() throws Exception { List> futures = new ArrayList<>(); long appendStartTimeMs = System.currentTimeMillis(); for (int i = 0; i < appendCount; i++) { - futures.add(writer.append(createAppendRequest(new String[] {String.valueOf(i)}, i))); + futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i)); } long appendElapsedMs = System.currentTimeMillis() - appendStartTimeMs; assertTrue(appendElapsedMs >= 1000); diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java index 0de3302063..ac6cafc966 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java @@ -20,7 +20,6 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; -import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest; import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1beta2.BQTableSchemaToProtoDescriptor; import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient; @@ -35,7 +34,6 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.DescriptorValidationException; -import com.google.protobuf.Int64Value; import com.google.protobuf.Message; import java.io.IOException; import java.time.Duration; @@ -163,7 +161,11 @@ private void writeToStream( BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor( writeStream.getTableSchema()); ProtoSchema protoSchema = ProtoSchemaConverter.convert(descriptor); - try (StreamWriterV2 writer = StreamWriterV2.newBuilder(writeStream.getName()).build()) { + try (StreamWriterV2 writer = + StreamWriterV2.newBuilder(writeStream.getName()) + .setWriterSchema(protoSchema) + .setTraceId("SAMPLE:parallel_append") + .build()) { while (System.currentTimeMillis() < deadlineMillis) { synchronized (this) { if (error != null) { @@ -171,8 +173,7 @@ private void writeToStream( throw error; } } - ApiFuture future = - writer.append(createAppendRequest(writeStream.getName(), descriptor, protoSchema, -1)); + ApiFuture future = writer.append(createAppendRows(descriptor), -1); synchronized (this) { inflightCount++; } @@ -197,8 +198,7 @@ private void waitForInflightToReachZero(Duration timeout) { throw new RuntimeException("Timeout waiting for inflight count to reach 0"); } - private AppendRowsRequest createAppendRequest( - String streamName, Descriptor descriptor, ProtoSchema protoSchema, long offset) { + private ProtoRows createAppendRows(Descriptor descriptor) { ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder(); for (int i = 0; i < BATCH_SIZE; i++) { byte[] payload = new byte[ROW_SIZE]; @@ -208,15 +208,7 @@ private AppendRowsRequest createAppendRequest( Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(descriptor, record); rowsBuilder.addSerializedRows(protoMessage.toByteString()); } - AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder(); - data.setWriterSchema(protoSchema); - data.setRows(rowsBuilder.build()); - AppendRowsRequest.Builder request = AppendRowsRequest.newBuilder().setProtoRows(data.build()); - request.setWriteStream(streamName); - if (offset >= 0) { - request.setOffset(Int64Value.of(offset)); - } - return request.build(); + return rowsBuilder.build(); } private void sleepIgnoringInterruption(Duration duration) {