From c54bcfec1706eef58eaf9dad8b49dc79fc8da133 Mon Sep 17 00:00:00 2001 From: yayi-google <75696801+yayi-google@users.noreply.github.com> Date: Thu, 25 Feb 2021 15:05:09 -0800 Subject: [PATCH] feat: StreamWriterV2 will handle schema/streamName attachment (#877) * feat: StreamWriterV2 will attach stream name in the first request in the connection and remove stream name and schema in the following ones. * feat: StreamWriterV2 will attach stream name in the first request in the connection and remove stream name and schema in the following ones --- .../storage/v1beta2/StreamWriterV2.java | 19 ++++++++++++++++++- .../storage/v1beta2/StreamWriterV2Test.java | 14 ++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 8775669893..6990f13173 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -296,6 +296,7 @@ public void close() { * It takes requests from waiting queue and sends them to server. */ private void appendLoop() { + boolean isFirstRequestInConnection = true; Deque localQueue = new LinkedList(); while (!waitingQueueDrained()) { this.lock.lock(); @@ -322,7 +323,11 @@ private void appendLoop() { // TODO: Add reconnection here. while (!localQueue.isEmpty()) { - this.streamConnection.send(localQueue.pollFirst().message); + AppendRowsRequest preparedRequest = + prepareRequestBasedOnPosition( + localQueue.pollFirst().message, isFirstRequestInConnection); + this.streamConnection.send(preparedRequest); + isFirstRequestInConnection = false; } } @@ -371,6 +376,18 @@ private void waitForDoneCallback() { } } + private AppendRowsRequest prepareRequestBasedOnPosition( + AppendRowsRequest original, boolean isFirstRequest) { + AppendRowsRequest.Builder requestBuilder = original.toBuilder(); + if (isFirstRequest) { + requestBuilder.setWriteStream(this.streamName); + } else { + requestBuilder.clearWriteStream(); + requestBuilder.getProtoRowsBuilder().clearWriterSchema(); + } + return requestBuilder.build(); + } + private void cleanupInflightRequests() { Throwable finalStatus; Deque localQueue = new LinkedList(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index 88d6fcd1ac..f32487afb1 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -16,6 +16,7 @@ package com.google.cloud.bigquery.storage.v1beta2; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -198,6 +199,19 @@ public void testAppendSuccess() throws Exception { } assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size()); + for (int i = 0; i < appendCount; i++) { + AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i); + if (i == 0) { + // First request received by server should have schema and stream name. + assertTrue(serverRequest.getProtoRows().hasWriterSchema()); + assertEquals(serverRequest.getWriteStream(), TEST_STREAM); + } else { + // Following request should not have schema and stream name. + assertFalse(serverRequest.getProtoRows().hasWriterSchema()); + assertEquals(serverRequest.getWriteStream(), ""); + } + } + writer.close(); }