diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 8775669893..6990f13173 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -296,6 +296,7 @@ public void close() { * It takes requests from waiting queue and sends them to server. */ private void appendLoop() { + boolean isFirstRequestInConnection = true; Deque localQueue = new LinkedList(); while (!waitingQueueDrained()) { this.lock.lock(); @@ -322,7 +323,11 @@ private void appendLoop() { // TODO: Add reconnection here. while (!localQueue.isEmpty()) { - this.streamConnection.send(localQueue.pollFirst().message); + AppendRowsRequest preparedRequest = + prepareRequestBasedOnPosition( + localQueue.pollFirst().message, isFirstRequestInConnection); + this.streamConnection.send(preparedRequest); + isFirstRequestInConnection = false; } } @@ -371,6 +376,18 @@ private void waitForDoneCallback() { } } + private AppendRowsRequest prepareRequestBasedOnPosition( + AppendRowsRequest original, boolean isFirstRequest) { + AppendRowsRequest.Builder requestBuilder = original.toBuilder(); + if (isFirstRequest) { + requestBuilder.setWriteStream(this.streamName); + } else { + requestBuilder.clearWriteStream(); + requestBuilder.getProtoRowsBuilder().clearWriterSchema(); + } + return requestBuilder.build(); + } + private void cleanupInflightRequests() { Throwable finalStatus; Deque localQueue = new LinkedList(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index 88d6fcd1ac..f32487afb1 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -16,6 +16,7 @@ package com.google.cloud.bigquery.storage.v1beta2; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -198,6 +199,19 @@ public void testAppendSuccess() throws Exception { } assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size()); + for (int i = 0; i < appendCount; i++) { + AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i); + if (i == 0) { + // First request received by server should have schema and stream name. + assertTrue(serverRequest.getProtoRows().hasWriterSchema()); + assertEquals(serverRequest.getWriteStream(), TEST_STREAM); + } else { + // Following request should not have schema and stream name. + assertFalse(serverRequest.getProtoRows().hasWriterSchema()); + assertEquals(serverRequest.getWriteStream(), ""); + } + } + writer.close(); }