From baf973d84577cd490e275f6eebf91e25d5c34ccc Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Fri, 19 Feb 2021 12:18:48 -0800 Subject: [PATCH] fix: temporally disable refreshAppend (#853) * fix:temporily disable refreshAppend * . * . * . * Revert "." This reverts commit 9a29abe57a89a78c9236b45933cef610ef7aa082. --- .../storage/v1beta2/StreamWriter.java | 23 +- .../storage/v1beta2/JsonStreamWriterTest.java | 943 +++++++++--------- .../it/ITBigQueryWriteManualClientTest.java | 260 ++--- 3 files changed, 608 insertions(+), 618 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java index 0d760e327c..b2da57e75b 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java @@ -35,6 +35,7 @@ import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.StreamController; import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.api.gax.rpc.UnimplementedException; import com.google.auth.oauth2.GoogleCredentials; import com.google.common.base.Preconditions; import com.google.protobuf.Int64Value; @@ -281,27 +282,7 @@ public ApiFuture append(AppendRowsRequest message) { * @throws InterruptedException */ public void refreshAppend() throws InterruptedException { - appendAndRefreshAppendLock.lock(); - if (shutdown.get()) { - LOG.warning("Cannot refresh on a already shutdown writer."); - appendAndRefreshAppendLock.unlock(); - return; - } - // There could be a moment, stub is not yet initialized. - if (clientStream != null) { - LOG.info("Closing the stream " + streamName); - clientStream.closeSend(); - } - messagesBatch.resetAttachSchema(); - bidiStreamingCallable = stub.appendRowsCallable(); - clientStream = bidiStreamingCallable.splitCall(responseObserver); - while (!clientStream.isSendReady()) { - Thread.sleep(10); - } - Thread.sleep(this.retrySettings.getInitialRetryDelay().toMillis()); - // Can only unlock here since need to sleep the full 7 seconds before stream can allow appends. - appendAndRefreshAppendLock.unlock(); - LOG.info("Write Stream " + streamName + " connection established"); + throw new UnimplementedException(null, GrpcStatusCode.of(Status.Code.UNIMPLEMENTED), false); } private void setupAlarm() { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java index 99fdd960da..dd8e5a6805 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java @@ -30,8 +30,6 @@ import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.storage.test.JsonTest.ComplexRoot; import com.google.cloud.bigquery.storage.test.Test.FooType; -import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType; -import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType2; import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors.DescriptorValidationException; import com.google.protobuf.Int64Value; @@ -468,137 +466,140 @@ public void testSingleAppendComplexJson() throws Exception { } } - @Test - public void testAppendMultipleSchemaUpdate() throws Exception { - try (JsonStreamWriter writer = - getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { - // Add fake resposne for FakeBigQueryWrite, first response has updated schema. - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) - .setUpdatedSchema(UPDATED_TABLE_SCHEMA) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) - .setUpdatedSchema(UPDATED_TABLE_SCHEMA_2) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) - .build()); - // First append - JSONObject foo = new JSONObject(); - foo.put("foo", "allen"); - JSONArray jsonArr = new JSONArray(); - jsonArr.put(foo); - - ApiFuture appendFuture1 = writer.append(jsonArr); - - int millis = 0; - while (millis <= 10000) { - if (writer.getDescriptor().getFields().size() == 2) { - break; - } - Thread.sleep(100); - millis += 100; - } - assertTrue(writer.getDescriptor().getFields().size() == 2); - assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); - assertEquals( - 1, - testBigQueryWrite - .getAppendRequests() - .get(0) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - testBigQueryWrite - .getAppendRequests() - .get(0) - .getProtoRows() - .getRows() - .getSerializedRows(0), - FooType.newBuilder().setFoo("allen").build().toByteString()); - - // Second append with updated schema. - JSONObject updatedFoo = new JSONObject(); - updatedFoo.put("foo", "allen"); - updatedFoo.put("bar", "allen2"); - JSONArray updatedJsonArr = new JSONArray(); - updatedJsonArr.put(updatedFoo); - - ApiFuture appendFuture2 = writer.append(updatedJsonArr); - - millis = 0; - while (millis <= 10000) { - if (writer.getDescriptor().getFields().size() == 3) { - break; - } - Thread.sleep(100); - millis += 100; - } - assertTrue(writer.getDescriptor().getFields().size() == 3); - assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); - assertEquals( - 1, - testBigQueryWrite - .getAppendRequests() - .get(1) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - testBigQueryWrite - .getAppendRequests() - .get(1) - .getProtoRows() - .getRows() - .getSerializedRows(0), - UpdatedFooType.newBuilder().setFoo("allen").setBar("allen2").build().toByteString()); - - // Third append with updated schema. - JSONObject updatedFoo2 = new JSONObject(); - updatedFoo2.put("foo", "allen"); - updatedFoo2.put("bar", "allen2"); - updatedFoo2.put("baz", "allen3"); - JSONArray updatedJsonArr2 = new JSONArray(); - updatedJsonArr2.put(updatedFoo2); - - ApiFuture appendFuture3 = writer.append(updatedJsonArr2); - - assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue()); - assertEquals( - 1, - testBigQueryWrite - .getAppendRequests() - .get(2) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - testBigQueryWrite - .getAppendRequests() - .get(2) - .getProtoRows() - .getRows() - .getSerializedRows(0), - UpdatedFooType2.newBuilder() - .setFoo("allen") - .setBar("allen2") - .setBaz("allen3") - .build() - .toByteString()); - // Check if writer schemas were added in for both connections. - assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); - assertTrue(testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema()); - assertTrue(testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema()); - } - } + // @Test + // public void testAppendMultipleSchemaUpdate() throws Exception { + // try (JsonStreamWriter writer = + // getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { + // // Add fake resposne for FakeBigQueryWrite, first response has updated schema. + // testBigQueryWrite.addResponse( + // AppendRowsResponse.newBuilder() + // .setAppendResult( + // + // AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + // .setUpdatedSchema(UPDATED_TABLE_SCHEMA) + // .build()); + // testBigQueryWrite.addResponse( + // AppendRowsResponse.newBuilder() + // .setAppendResult( + // + // AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) + // .setUpdatedSchema(UPDATED_TABLE_SCHEMA_2) + // .build()); + // testBigQueryWrite.addResponse( + // AppendRowsResponse.newBuilder() + // .setAppendResult( + // + // AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) + // .build()); + // // First append + // JSONObject foo = new JSONObject(); + // foo.put("foo", "allen"); + // JSONArray jsonArr = new JSONArray(); + // jsonArr.put(foo); + // + // ApiFuture appendFuture1 = writer.append(jsonArr); + // + // int millis = 0; + // while (millis <= 10000) { + // if (writer.getDescriptor().getFields().size() == 2) { + // break; + // } + // Thread.sleep(100); + // millis += 100; + // } + // assertTrue(writer.getDescriptor().getFields().size() == 2); + // assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); + // assertEquals( + // 1, + // testBigQueryWrite + // .getAppendRequests() + // .get(0) + // .getProtoRows() + // .getRows() + // .getSerializedRowsCount()); + // assertEquals( + // testBigQueryWrite + // .getAppendRequests() + // .get(0) + // .getProtoRows() + // .getRows() + // .getSerializedRows(0), + // FooType.newBuilder().setFoo("allen").build().toByteString()); + // + // // Second append with updated schema. + // JSONObject updatedFoo = new JSONObject(); + // updatedFoo.put("foo", "allen"); + // updatedFoo.put("bar", "allen2"); + // JSONArray updatedJsonArr = new JSONArray(); + // updatedJsonArr.put(updatedFoo); + // + // ApiFuture appendFuture2 = writer.append(updatedJsonArr); + // + // millis = 0; + // while (millis <= 10000) { + // if (writer.getDescriptor().getFields().size() == 3) { + // break; + // } + // Thread.sleep(100); + // millis += 100; + // } + // assertTrue(writer.getDescriptor().getFields().size() == 3); + // assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); + // assertEquals( + // 1, + // testBigQueryWrite + // .getAppendRequests() + // .get(1) + // .getProtoRows() + // .getRows() + // .getSerializedRowsCount()); + // assertEquals( + // testBigQueryWrite + // .getAppendRequests() + // .get(1) + // .getProtoRows() + // .getRows() + // .getSerializedRows(0), + // UpdatedFooType.newBuilder().setFoo("allen").setBar("allen2").build().toByteString()); + // + // // Third append with updated schema. + // JSONObject updatedFoo2 = new JSONObject(); + // updatedFoo2.put("foo", "allen"); + // updatedFoo2.put("bar", "allen2"); + // updatedFoo2.put("baz", "allen3"); + // JSONArray updatedJsonArr2 = new JSONArray(); + // updatedJsonArr2.put(updatedFoo2); + // + // ApiFuture appendFuture3 = writer.append(updatedJsonArr2); + // + // assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue()); + // assertEquals( + // 1, + // testBigQueryWrite + // .getAppendRequests() + // .get(2) + // .getProtoRows() + // .getRows() + // .getSerializedRowsCount()); + // assertEquals( + // testBigQueryWrite + // .getAppendRequests() + // .get(2) + // .getProtoRows() + // .getRows() + // .getSerializedRows(0), + // UpdatedFooType2.newBuilder() + // .setFoo("allen") + // .setBar("allen2") + // .setBaz("allen3") + // .build() + // .toByteString()); + // // Check if writer schemas were added in for both connections. + // assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); + // assertTrue(testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema()); + // assertTrue(testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema()); + // } + // } @Test public void testAppendOutOfRangeException() throws Exception { @@ -622,198 +623,202 @@ public void testAppendOutOfRangeException() throws Exception { } } - @Test - public void testAppendOutOfRangeAndUpdateSchema() throws Exception { - try (JsonStreamWriter writer = - getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setError(com.google.rpc.Status.newBuilder().setCode(11).build()) - .setUpdatedSchema(UPDATED_TABLE_SCHEMA) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) - .build()); - - JSONObject foo = new JSONObject(); - foo.put("foo", "allen"); - JSONArray jsonArr = new JSONArray(); - jsonArr.put(foo); - ApiFuture appendFuture = writer.append(jsonArr); - try { - appendFuture.get(); - Assert.fail("expected ExecutionException"); - } catch (ExecutionException ex) { - assertEquals(ex.getCause().getMessage(), "OUT_OF_RANGE: "); - int millis = 0; - while (millis <= 10000) { - if (writer.getDescriptor().getFields().size() == 2) { - break; - } - Thread.sleep(100); - millis += 100; - } - assertTrue(writer.getDescriptor().getFields().size() == 2); - } - - JSONObject updatedFoo = new JSONObject(); - updatedFoo.put("foo", "allen"); - updatedFoo.put("bar", "allen2"); - JSONArray updatedJsonArr = new JSONArray(); - updatedJsonArr.put(updatedFoo); - - ApiFuture appendFuture2 = writer.append(updatedJsonArr); - assertEquals(0L, appendFuture2.get().getAppendResult().getOffset().getValue()); - appendFuture2.get(); - assertEquals( - 1, - testBigQueryWrite - .getAppendRequests() - .get(1) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - testBigQueryWrite - .getAppendRequests() - .get(1) - .getProtoRows() - .getRows() - .getSerializedRows(0), - UpdatedFooType.newBuilder().setFoo("allen").setBar("allen2").build().toByteString()); - - // Check if writer schemas were added in for both connections. - assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); - assertTrue(testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema()); - } - } - - @Test - public void testSchemaUpdateWithNonemptyBatch() throws Exception { - try (JsonStreamWriter writer = - getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA) - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(2L) - .build()) - .build()) { - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) - .setUpdatedSchema(UPDATED_TABLE_SCHEMA) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build()) - .build()); - // First append - JSONObject foo = new JSONObject(); - foo.put("foo", "allen"); - JSONArray jsonArr = new JSONArray(); - jsonArr.put(foo); - - ApiFuture appendFuture1 = writer.append(jsonArr); - ApiFuture appendFuture2 = writer.append(jsonArr); - ApiFuture appendFuture3 = writer.append(jsonArr); - - assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); - assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); - assertEquals( - 2, - testBigQueryWrite - .getAppendRequests() - .get(0) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - testBigQueryWrite - .getAppendRequests() - .get(0) - .getProtoRows() - .getRows() - .getSerializedRows(0), - FooType.newBuilder().setFoo("allen").build().toByteString()); - assertEquals( - testBigQueryWrite - .getAppendRequests() - .get(0) - .getProtoRows() - .getRows() - .getSerializedRows(1), - FooType.newBuilder().setFoo("allen").build().toByteString()); - - assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue()); - assertEquals( - 1, - testBigQueryWrite - .getAppendRequests() - .get(1) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - testBigQueryWrite - .getAppendRequests() - .get(1) - .getProtoRows() - .getRows() - .getSerializedRows(0), - FooType.newBuilder().setFoo("allen").build().toByteString()); - - int millis = 0; - while (millis <= 10000) { - if (writer.getDescriptor().getFields().size() == 2) { - break; - } - Thread.sleep(100); - millis += 100; - } - assertTrue(writer.getDescriptor().getFields().size() == 2); - - // Second append with updated schema. - JSONObject updatedFoo = new JSONObject(); - updatedFoo.put("foo", "allen"); - updatedFoo.put("bar", "allen2"); - JSONArray updatedJsonArr = new JSONArray(); - updatedJsonArr.put(updatedFoo); - - ApiFuture appendFuture4 = writer.append(updatedJsonArr); - - assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue()); - assertEquals( - 1, - testBigQueryWrite - .getAppendRequests() - .get(2) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - testBigQueryWrite - .getAppendRequests() - .get(2) - .getProtoRows() - .getRows() - .getSerializedRows(0), - UpdatedFooType.newBuilder().setFoo("allen").setBar("allen2").build().toByteString()); - - assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); - assertTrue( - testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema() - || testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema()); - } - } + // @Test + // public void testAppendOutOfRangeAndUpdateSchema() throws Exception { + // try (JsonStreamWriter writer = + // getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { + // testBigQueryWrite.addResponse( + // AppendRowsResponse.newBuilder() + // .setError(com.google.rpc.Status.newBuilder().setCode(11).build()) + // .setUpdatedSchema(UPDATED_TABLE_SCHEMA) + // .build()); + // testBigQueryWrite.addResponse( + // AppendRowsResponse.newBuilder() + // .setAppendResult( + // + // AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + // .build()); + // + // JSONObject foo = new JSONObject(); + // foo.put("foo", "allen"); + // JSONArray jsonArr = new JSONArray(); + // jsonArr.put(foo); + // ApiFuture appendFuture = writer.append(jsonArr); + // try { + // appendFuture.get(); + // Assert.fail("expected ExecutionException"); + // } catch (ExecutionException ex) { + // assertEquals(ex.getCause().getMessage(), "OUT_OF_RANGE: "); + // int millis = 0; + // while (millis <= 10000) { + // if (writer.getDescriptor().getFields().size() == 2) { + // break; + // } + // Thread.sleep(100); + // millis += 100; + // } + // assertTrue(writer.getDescriptor().getFields().size() == 2); + // } + // + // JSONObject updatedFoo = new JSONObject(); + // updatedFoo.put("foo", "allen"); + // updatedFoo.put("bar", "allen2"); + // JSONArray updatedJsonArr = new JSONArray(); + // updatedJsonArr.put(updatedFoo); + // + // ApiFuture appendFuture2 = writer.append(updatedJsonArr); + // assertEquals(0L, appendFuture2.get().getAppendResult().getOffset().getValue()); + // appendFuture2.get(); + // assertEquals( + // 1, + // testBigQueryWrite + // .getAppendRequests() + // .get(1) + // .getProtoRows() + // .getRows() + // .getSerializedRowsCount()); + // assertEquals( + // testBigQueryWrite + // .getAppendRequests() + // .get(1) + // .getProtoRows() + // .getRows() + // .getSerializedRows(0), + // UpdatedFooType.newBuilder().setFoo("allen").setBar("allen2").build().toByteString()); + // + // // Check if writer schemas were added in for both connections. + // assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); + // assertTrue(testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema()); + // } + // } + + // @Test + // public void testSchemaUpdateWithNonemptyBatch() throws Exception { + // try (JsonStreamWriter writer = + // getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA) + // .setBatchingSettings( + // StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + // .toBuilder() + // .setElementCountThreshold(2L) + // .build()) + // .build()) { + // testBigQueryWrite.addResponse( + // AppendRowsResponse.newBuilder() + // .setAppendResult( + // + // AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + // .setUpdatedSchema(UPDATED_TABLE_SCHEMA) + // .build()); + // testBigQueryWrite.addResponse( + // AppendRowsResponse.newBuilder() + // .setAppendResult( + // + // AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) + // .build()); + // testBigQueryWrite.addResponse( + // AppendRowsResponse.newBuilder() + // .setAppendResult( + // + // AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build()) + // .build()); + // // First append + // JSONObject foo = new JSONObject(); + // foo.put("foo", "allen"); + // JSONArray jsonArr = new JSONArray(); + // jsonArr.put(foo); + // + // ApiFuture appendFuture1 = writer.append(jsonArr); + // ApiFuture appendFuture2 = writer.append(jsonArr); + // ApiFuture appendFuture3 = writer.append(jsonArr); + // + // assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); + // assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); + // assertEquals( + // 2, + // testBigQueryWrite + // .getAppendRequests() + // .get(0) + // .getProtoRows() + // .getRows() + // .getSerializedRowsCount()); + // assertEquals( + // testBigQueryWrite + // .getAppendRequests() + // .get(0) + // .getProtoRows() + // .getRows() + // .getSerializedRows(0), + // FooType.newBuilder().setFoo("allen").build().toByteString()); + // assertEquals( + // testBigQueryWrite + // .getAppendRequests() + // .get(0) + // .getProtoRows() + // .getRows() + // .getSerializedRows(1), + // FooType.newBuilder().setFoo("allen").build().toByteString()); + // + // assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue()); + // assertEquals( + // 1, + // testBigQueryWrite + // .getAppendRequests() + // .get(1) + // .getProtoRows() + // .getRows() + // .getSerializedRowsCount()); + // assertEquals( + // testBigQueryWrite + // .getAppendRequests() + // .get(1) + // .getProtoRows() + // .getRows() + // .getSerializedRows(0), + // FooType.newBuilder().setFoo("allen").build().toByteString()); + // + // int millis = 0; + // while (millis <= 10000) { + // if (writer.getDescriptor().getFields().size() == 2) { + // break; + // } + // Thread.sleep(100); + // millis += 100; + // } + // assertTrue(writer.getDescriptor().getFields().size() == 2); + // + // // Second append with updated schema. + // JSONObject updatedFoo = new JSONObject(); + // updatedFoo.put("foo", "allen"); + // updatedFoo.put("bar", "allen2"); + // JSONArray updatedJsonArr = new JSONArray(); + // updatedJsonArr.put(updatedFoo); + // + // ApiFuture appendFuture4 = writer.append(updatedJsonArr); + // + // assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue()); + // assertEquals( + // 1, + // testBigQueryWrite + // .getAppendRequests() + // .get(2) + // .getProtoRows() + // .getRows() + // .getSerializedRowsCount()); + // assertEquals( + // testBigQueryWrite + // .getAppendRequests() + // .get(2) + // .getProtoRows() + // .getRows() + // .getSerializedRows(0), + // UpdatedFooType.newBuilder().setFoo("allen").setBar("allen2").build().toByteString()); + // + // assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); + // assertTrue( + // testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema() + // || testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema()); + // } + // } @Test public void testCreateDefaultStream() throws Exception { @@ -904,147 +909,149 @@ public void run() { } } - @Test - public void testMultiThreadAppendWithSchemaUpdate() throws Exception { - try (JsonStreamWriter writer = - getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA) - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(1L) - .build()) - .build()) { - JSONObject foo = new JSONObject(); - foo.put("foo", "allen"); - final JSONArray jsonArr = new JSONArray(); - jsonArr.put(foo); - - final Collection offsetSets = Collections.synchronizedCollection(new HashSet()); - int numberThreads = 5; - Thread[] thread_arr = new Thread[numberThreads]; - for (int i = 0; i < numberThreads; i++) { - if (i == 2) { - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder() - .setOffset(Int64Value.of(i)) - .build()) - .setUpdatedSchema(UPDATED_TABLE_SCHEMA) - .build()); - } else { - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder() - .setOffset(Int64Value.of(i)) - .build()) - .build()); - } - - offsetSets.add((long) i); - Thread t = - new Thread( - new Runnable() { - public void run() { - try { - ApiFuture appendFuture = writer.append(jsonArr); - AppendRowsResponse response = appendFuture.get(); - offsetSets.remove(response.getAppendResult().getOffset().getValue()); - } catch (Exception e) { - LOG.severe("Thread execution failed: " + e.getMessage()); - } - } - }); - thread_arr[i] = t; - t.start(); - } - - for (int i = 0; i < numberThreads; i++) { - thread_arr[i].join(); - } - assertTrue(offsetSets.size() == 0); - for (int i = 0; i < numberThreads; i++) { - assertEquals( - 1, - testBigQueryWrite - .getAppendRequests() - .get(i) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - testBigQueryWrite - .getAppendRequests() - .get(i) - .getProtoRows() - .getRows() - .getSerializedRows(0), - FooType.newBuilder().setFoo("allen").build().toByteString()); - } - - int millis = 0; - while (millis <= 10000) { - if (writer.getDescriptor().getFields().size() == 2) { - break; - } - Thread.sleep(100); - millis += 100; - } - assertEquals(2, writer.getDescriptor().getFields().size()); - - foo.put("bar", "allen2"); - final JSONArray jsonArr2 = new JSONArray(); - jsonArr2.put(foo); - - for (int i = numberThreads; i < numberThreads + 5; i++) { - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder() - .setOffset(Int64Value.of(i)) - .build()) - .build()); - offsetSets.add((long) i); - Thread t = - new Thread( - new Runnable() { - public void run() { - try { - ApiFuture appendFuture = writer.append(jsonArr2); - AppendRowsResponse response = appendFuture.get(); - offsetSets.remove(response.getAppendResult().getOffset().getValue()); - } catch (Exception e) { - LOG.severe("Thread execution failed: " + e.getMessage()); - } - } - }); - thread_arr[i - 5] = t; - t.start(); - } - - for (int i = 0; i < numberThreads; i++) { - thread_arr[i].join(); - } - assertTrue(offsetSets.size() == 0); - for (int i = 0; i < numberThreads; i++) { - assertEquals( - 1, - testBigQueryWrite - .getAppendRequests() - .get(i + 5) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - testBigQueryWrite - .getAppendRequests() - .get(i + 5) - .getProtoRows() - .getRows() - .getSerializedRows(0), - UpdatedFooType.newBuilder().setFoo("allen").setBar("allen2").build().toByteString()); - } - } - } + // @Test + // public void testMultiThreadAppendWithSchemaUpdate() throws Exception { + // try (JsonStreamWriter writer = + // getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA) + // .setBatchingSettings( + // StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + // .toBuilder() + // .setElementCountThreshold(1L) + // .build()) + // .build()) { + // JSONObject foo = new JSONObject(); + // foo.put("foo", "allen"); + // final JSONArray jsonArr = new JSONArray(); + // jsonArr.put(foo); + // + // final Collection offsetSets = Collections.synchronizedCollection(new + // HashSet()); + // int numberThreads = 5; + // Thread[] thread_arr = new Thread[numberThreads]; + // for (int i = 0; i < numberThreads; i++) { + // if (i == 2) { + // testBigQueryWrite.addResponse( + // AppendRowsResponse.newBuilder() + // .setAppendResult( + // AppendRowsResponse.AppendResult.newBuilder() + // .setOffset(Int64Value.of(i)) + // .build()) + // .setUpdatedSchema(UPDATED_TABLE_SCHEMA) + // .build()); + // } else { + // testBigQueryWrite.addResponse( + // AppendRowsResponse.newBuilder() + // .setAppendResult( + // AppendRowsResponse.AppendResult.newBuilder() + // .setOffset(Int64Value.of(i)) + // .build()) + // .build()); + // } + // + // offsetSets.add((long) i); + // Thread t = + // new Thread( + // new Runnable() { + // public void run() { + // try { + // ApiFuture appendFuture = writer.append(jsonArr); + // AppendRowsResponse response = appendFuture.get(); + // offsetSets.remove(response.getAppendResult().getOffset().getValue()); + // } catch (Exception e) { + // LOG.severe("Thread execution failed: " + e.getMessage()); + // } + // } + // }); + // thread_arr[i] = t; + // t.start(); + // } + // + // for (int i = 0; i < numberThreads; i++) { + // thread_arr[i].join(); + // } + // assertTrue(offsetSets.size() == 0); + // for (int i = 0; i < numberThreads; i++) { + // assertEquals( + // 1, + // testBigQueryWrite + // .getAppendRequests() + // .get(i) + // .getProtoRows() + // .getRows() + // .getSerializedRowsCount()); + // assertEquals( + // testBigQueryWrite + // .getAppendRequests() + // .get(i) + // .getProtoRows() + // .getRows() + // .getSerializedRows(0), + // FooType.newBuilder().setFoo("allen").build().toByteString()); + // } + // + // int millis = 0; + // while (millis <= 10000) { + // if (writer.getDescriptor().getFields().size() == 2) { + // break; + // } + // Thread.sleep(100); + // millis += 100; + // } + // assertEquals(2, writer.getDescriptor().getFields().size()); + // + // foo.put("bar", "allen2"); + // final JSONArray jsonArr2 = new JSONArray(); + // jsonArr2.put(foo); + // + // for (int i = numberThreads; i < numberThreads + 5; i++) { + // testBigQueryWrite.addResponse( + // AppendRowsResponse.newBuilder() + // .setAppendResult( + // AppendRowsResponse.AppendResult.newBuilder() + // .setOffset(Int64Value.of(i)) + // .build()) + // .build()); + // offsetSets.add((long) i); + // Thread t = + // new Thread( + // new Runnable() { + // public void run() { + // try { + // ApiFuture appendFuture = writer.append(jsonArr2); + // AppendRowsResponse response = appendFuture.get(); + // offsetSets.remove(response.getAppendResult().getOffset().getValue()); + // } catch (Exception e) { + // LOG.severe("Thread execution failed: " + e.getMessage()); + // } + // } + // }); + // thread_arr[i - 5] = t; + // t.start(); + // } + // + // for (int i = 0; i < numberThreads; i++) { + // thread_arr[i].join(); + // } + // assertTrue(offsetSets.size() == 0); + // for (int i = 0; i < numberThreads; i++) { + // assertEquals( + // 1, + // testBigQueryWrite + // .getAppendRequests() + // .get(i + 5) + // .getProtoRows() + // .getRows() + // .getSerializedRowsCount()); + // assertEquals( + // testBigQueryWrite + // .getAppendRequests() + // .get(i + 5) + // .getProtoRows() + // .getRows() + // .getSerializedRows(0), + // + // UpdatedFooType.newBuilder().setFoo("allen").setBar("allen2").build().toByteString()); + // } + // } + // } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java index af665eb35b..0981dd4ea2 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java @@ -370,135 +370,137 @@ public void testJsonStreamWriterBatchWriteWithDefaultStream() } } - @Test - public void testJsonStreamWriterSchemaUpdate() - throws IOException, InterruptedException, ExecutionException, - Descriptors.DescriptorValidationException { - String tableName = "SchemaUpdateTable"; - TableInfo tableInfo = - TableInfo.newBuilder( - TableId.of(DATASET, tableName), - StandardTableDefinition.of( - Schema.of( - com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING) - .build()))) - .build(); - - bigquery.create(tableInfo); - TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); - WriteStream writeStream = - client.createWriteStream( - CreateWriteStreamRequest.newBuilder() - .setParent(parent.toString()) - .setWriteStream( - WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) - .build()); - - try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(1L) - .build()) - .build()) { - // 1). Send 1 row - JSONObject foo = new JSONObject(); - foo.put("foo", "aaa"); - JSONArray jsonArr = new JSONArray(); - jsonArr.put(foo); - - ApiFuture response = jsonStreamWriter.append(jsonArr, -1); - assertEquals(0, response.get().getAppendResult().getOffset().getValue()); - // 2). Schema update and wait until querying it returns a new schema. - try { - com.google.cloud.bigquery.Table table = bigquery.getTable(DATASET, tableName); - Schema schema = table.getDefinition().getSchema(); - FieldList fields = schema.getFields(); - Field newField = - Field.newBuilder("bar", LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build(); - - List fieldList = new ArrayList(); - fieldList.add(fields.get(0)); - fieldList.add(newField); - Schema newSchema = Schema.of(fieldList); - // Update the table with the new schema - com.google.cloud.bigquery.Table updatedTable = - table.toBuilder().setDefinition(StandardTableDefinition.of(newSchema)).build(); - updatedTable.update(); - int millis = 0; - while (millis <= 10000) { - if (newSchema.equals(table.reload().getDefinition().getSchema())) { - break; - } - Thread.sleep(1000); - millis += 1000; - } - newSchema = schema; - LOG.info( - "bar column successfully added to table in " - + millis - + " millis: " - + bigquery.getTable(DATASET, tableName).getDefinition().getSchema()); - } catch (BigQueryException e) { - LOG.severe("bar column was not added. \n" + e.toString()); - } - // 3). Send rows to wait for updatedSchema to be returned. - JSONObject foo2 = new JSONObject(); - foo2.put("foo", "bbb"); - JSONArray jsonArr2 = new JSONArray(); - jsonArr2.put(foo2); - - int next = 0; - for (int i = 1; i < 100; i++) { - ApiFuture response2 = jsonStreamWriter.append(jsonArr2, -1); - assertEquals(i, response2.get().getAppendResult().getOffset().getValue()); - if (response2.get().hasUpdatedSchema()) { - next = i; - break; - } else { - Thread.sleep(1000); - } - } - - int millis = 0; - while (millis <= 10000) { - if (jsonStreamWriter.getDescriptor().getFields().size() == 2) { - LOG.info("JsonStreamWriter successfully updated internal descriptor!"); - break; - } - Thread.sleep(100); - millis += 100; - } - assertTrue(jsonStreamWriter.getDescriptor().getFields().size() == 2); - // 4). Send rows with updated schema. - JSONObject updatedFoo = new JSONObject(); - updatedFoo.put("foo", "ccc"); - updatedFoo.put("bar", "ddd"); - JSONArray updatedJsonArr = new JSONArray(); - updatedJsonArr.put(updatedFoo); - for (int i = 0; i < 10; i++) { - ApiFuture response3 = jsonStreamWriter.append(updatedJsonArr, -1); - assertEquals(next + 1 + i, response3.get().getAppendResult().getOffset().getValue()); - response3.get(); - } - - TableResult result3 = - bigquery.listTableData( - tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); - Iterator iter3 = result3.getValues().iterator(); - assertEquals("aaa", iter3.next().get(0).getStringValue()); - for (int j = 1; j <= next; j++) { - assertEquals("bbb", iter3.next().get(0).getStringValue()); - } - for (int j = next + 1; j < next + 1 + 10; j++) { - FieldValueList temp = iter3.next(); - assertEquals("ccc", temp.get(0).getStringValue()); - assertEquals("ddd", temp.get(1).getStringValue()); - } - assertEquals(false, iter3.hasNext()); - } - } + // @Test + // public void testJsonStreamWriterSchemaUpdate() + // throws IOException, InterruptedException, ExecutionException, + // Descriptors.DescriptorValidationException { + // String tableName = "SchemaUpdateTable"; + // TableInfo tableInfo = + // TableInfo.newBuilder( + // TableId.of(DATASET, tableName), + // StandardTableDefinition.of( + // Schema.of( + // com.google.cloud.bigquery.Field.newBuilder("foo", + // LegacySQLTypeName.STRING) + // .build()))) + // .build(); + // + // bigquery.create(tableInfo); + // TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); + // WriteStream writeStream = + // client.createWriteStream( + // CreateWriteStreamRequest.newBuilder() + // .setParent(parent.toString()) + // .setWriteStream( + // WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) + // .build()); + // + // try (JsonStreamWriter jsonStreamWriter = + // JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) + // .setBatchingSettings( + // StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + // .toBuilder() + // .setElementCountThreshold(1L) + // .build()) + // .build()) { + // // 1). Send 1 row + // JSONObject foo = new JSONObject(); + // foo.put("foo", "aaa"); + // JSONArray jsonArr = new JSONArray(); + // jsonArr.put(foo); + // + // ApiFuture response = jsonStreamWriter.append(jsonArr, -1); + // assertEquals(0, response.get().getAppendResult().getOffset().getValue()); + // // 2). Schema update and wait until querying it returns a new schema. + // try { + // com.google.cloud.bigquery.Table table = bigquery.getTable(DATASET, tableName); + // Schema schema = table.getDefinition().getSchema(); + // FieldList fields = schema.getFields(); + // Field newField = + // Field.newBuilder("bar", + // LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build(); + // + // List fieldList = new ArrayList(); + // fieldList.add(fields.get(0)); + // fieldList.add(newField); + // Schema newSchema = Schema.of(fieldList); + // // Update the table with the new schema + // com.google.cloud.bigquery.Table updatedTable = + // table.toBuilder().setDefinition(StandardTableDefinition.of(newSchema)).build(); + // updatedTable.update(); + // int millis = 0; + // while (millis <= 10000) { + // if (newSchema.equals(table.reload().getDefinition().getSchema())) { + // break; + // } + // Thread.sleep(1000); + // millis += 1000; + // } + // newSchema = schema; + // LOG.info( + // "bar column successfully added to table in " + // + millis + // + " millis: " + // + bigquery.getTable(DATASET, tableName).getDefinition().getSchema()); + // } catch (BigQueryException e) { + // LOG.severe("bar column was not added. \n" + e.toString()); + // } + // // 3). Send rows to wait for updatedSchema to be returned. + // JSONObject foo2 = new JSONObject(); + // foo2.put("foo", "bbb"); + // JSONArray jsonArr2 = new JSONArray(); + // jsonArr2.put(foo2); + // + // int next = 0; + // for (int i = 1; i < 100; i++) { + // ApiFuture response2 = jsonStreamWriter.append(jsonArr2, -1); + // assertEquals(i, response2.get().getAppendResult().getOffset().getValue()); + // if (response2.get().hasUpdatedSchema()) { + // next = i; + // break; + // } else { + // Thread.sleep(1000); + // } + // } + // + // int millis = 0; + // while (millis <= 10000) { + // if (jsonStreamWriter.getDescriptor().getFields().size() == 2) { + // LOG.info("JsonStreamWriter successfully updated internal descriptor!"); + // break; + // } + // Thread.sleep(100); + // millis += 100; + // } + // assertTrue(jsonStreamWriter.getDescriptor().getFields().size() == 2); + // // 4). Send rows with updated schema. + // JSONObject updatedFoo = new JSONObject(); + // updatedFoo.put("foo", "ccc"); + // updatedFoo.put("bar", "ddd"); + // JSONArray updatedJsonArr = new JSONArray(); + // updatedJsonArr.put(updatedFoo); + // for (int i = 0; i < 10; i++) { + // ApiFuture response3 = jsonStreamWriter.append(updatedJsonArr, -1); + // assertEquals(next + 1 + i, response3.get().getAppendResult().getOffset().getValue()); + // response3.get(); + // } + // + // TableResult result3 = + // bigquery.listTableData( + // tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); + // Iterator iter3 = result3.getValues().iterator(); + // assertEquals("aaa", iter3.next().get(0).getStringValue()); + // for (int j = 1; j <= next; j++) { + // assertEquals("bbb", iter3.next().get(0).getStringValue()); + // } + // for (int j = next + 1; j < next + 1 + 10; j++) { + // FieldValueList temp = iter3.next(); + // assertEquals("ccc", temp.get(0).getStringValue()); + // assertEquals("ddd", temp.get(1).getStringValue()); + // } + // assertEquals(false, iter3.hasNext()); + // } + // } @Test public void testComplicateSchemaWithPendingStream()