From 21e399bf4fca9b3ef84443ae1d32a6bfe393b61b Mon Sep 17 00:00:00 2001
From: Yiru Tang
Date: Tue, 4 May 2021 16:09:06 -0700
Subject: [PATCH] fix: remove schema update capability from jsonwriter and delete related tests (#1047)

* fix: remove schema update capability from json writer and delete related tests

---
.../storage/v1beta2/JsonStreamWriter.java | 48 +-
.../v1beta2/OnSchemaUpdateRunnable.java | 2 +
.../storage/v1beta2/JsonStreamWriterTest.java | 528 ------------------
.../it/ITBigQueryWriteManualClientTest.java | 127 -----
4 files changed, 5 insertions(+), 700 deletions(-)

diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java
index 64aa9e45de..19fc29f9cf 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java
@@ -36,10 +36,7 @@
/**
* A StreamWriter that can write JSON data (JSONObjects) to BigQuery tables. The JsonStreamWriter is
* built on top of a StreamWriter, and it simply converts all JSON data to protobuf messages then
- * calls StreamWriter's append() method to write to BigQuery tables. It maintains all StreamWriter
- * functions, but also provides an additional feature: schema update support, where if the BigQuery
- * table schema is updated, users will be able to ingest data on the new schema after some time (in
- * order of minutes).
+ * calls StreamWriter's append() method to write to BigQuery tables.
*/
public class JsonStreamWriter implements AutoCloseable {
private static String streamPatternString =
@@ -83,8 +80,7 @@ private JsonStreamWriter(Builder builder)
/**
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
- * data to protobuf messages, then using StreamWriter's append() to write the data. If there is a
- * schema update, the OnSchemaUpdateRunnable will be used to determine what actions to perform.
+ * data to protobuf messages, then using StreamWriter's append() to write the data.
*
* @param jsonArr The JSON array that contains JSONObjects to be written
* @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an
@@ -96,8 +92,7 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr) {
/**
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
- * data to protobuf messages, then using StreamWriter's append() to write the data. If there is a
- * schema update, the OnSchemaUpdateRunnable will be used to determine what actions to perform.
+ * data to protobuf messages, then using StreamWriter's append() to write the data.
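+ *
+ * <p>Example usage (a minimal sketch, not part of this patch; the stream name and the
+ * {@code tableSchema} variable are placeholders, and imports of the org.json and v1beta2
+ * types are assumed):
+ *
+ * <pre>{@code
+ * try (JsonStreamWriter writer =
+ *     JsonStreamWriter.newBuilder("projects/p/datasets/d/tables/t/streams/s", tableSchema)
+ *         .build()) {
+ *   JSONObject row = new JSONObject();
+ *   row.put("foo", "bar");
+ *   JSONArray jsonArr = new JSONArray();
+ *   jsonArr.put(row);
+ *   // The offset is used for deduplication within the stream.
+ *   ApiFuture<AppendRowsResponse> future = writer.append(jsonArr, 0);
+ *   AppendRowsResponse response = future.get();
+ * }
+ * }</pre>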
*
* @param jsonArr The JSON array that contains JSONObjects to be written
* @param offset Offset for deduplication
@@ -193,10 +188,6 @@ private void setStreamWriterSettings(
if (createDefaultStream) {
streamWriterBuilder.createDefaultStream();
}
- JsonStreamWriterOnSchemaUpdateRunnable jsonStreamWriterOnSchemaUpdateRunnable =
- new JsonStreamWriterOnSchemaUpdateRunnable();
- jsonStreamWriterOnSchemaUpdateRunnable.setJsonStreamWriter(this);
- streamWriterBuilder.setOnSchemaUpdateRunnable(jsonStreamWriterOnSchemaUpdateRunnable);
}

/**
@@ -267,39 +258,6 @@ public void close() {
this.streamWriter.close();
}

- private class JsonStreamWriterOnSchemaUpdateRunnable extends OnSchemaUpdateRunnable {
- private JsonStreamWriter jsonStreamWriter;
- /**
- * Setter for the jsonStreamWriter
- *
- * @param jsonStreamWriter
- */
- public void setJsonStreamWriter(JsonStreamWriter jsonStreamWriter) {
- this.jsonStreamWriter = jsonStreamWriter;
- }
-
- /** Getter for the jsonStreamWriter */
- public JsonStreamWriter getJsonStreamWriter() {
- return this.jsonStreamWriter;
- }
-
- @Override
- public void run() {
- this.getJsonStreamWriter().setTableSchema(this.getUpdatedSchema());
- try {
- this.getJsonStreamWriter().refreshConnection();
- } catch (InterruptedException | IOException e) {
- LOG.severe("StreamWriter failed to refresh upon schema update." + e);
- return;
- } catch (Descriptors.DescriptorValidationException e) {
- LOG.severe(
- "Schema update fail: updated schema could not be converted to a valid descriptor.");
- return;
- }
- LOG.info("Successfully updated schema: " + this.getUpdatedSchema());
- }
- }
-
public static final class Builder {
private String streamOrTableName;
private BigQueryWriteClient client;
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/OnSchemaUpdateRunnable.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/OnSchemaUpdateRunnable.java
index 17c961cab7..dc2f855d0c 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/OnSchemaUpdateRunnable.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/OnSchemaUpdateRunnable.java
@@ -19,6 +19,8 @@
* An abstract class that implements the Runnable interface and provides access to the current
* StreamWriter and updatedSchema. This runnable will only be called when an updated schema has been
* passed back through the AppendRowsResponse. Users should only implement the run() function.
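+ *
+ * <p>Note: schema update support has been removed from JsonStreamWriter, which no longer
+ * registers an OnSchemaUpdateRunnable.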
+ * + * @deprecated */ public abstract class OnSchemaUpdateRunnable implements Runnable { private StreamWriter streamWriter; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java index 7e24d0ead4..7dae1cd5d3 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java @@ -16,7 +16,6 @@ package com.google.cloud.bigquery.storage.v1beta2; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import com.google.api.core.ApiFuture; import com.google.api.gax.core.ExecutorProvider; @@ -30,7 +29,6 @@ import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.storage.test.JsonTest.ComplexRoot; import com.google.cloud.bigquery.storage.test.Test.FooType; -import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType; import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors.DescriptorValidationException; import com.google.protobuf.Int64Value; @@ -38,9 +36,6 @@ import java.io.IOException; import java.math.BigDecimal; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.logging.Logger; @@ -503,141 +498,6 @@ public void testSingleAppendComplexJson() throws Exception { } } - // @Test - // public void testAppendMultipleSchemaUpdate() throws Exception { - // try (JsonStreamWriter writer = - // getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { - // // Add fake resposne for FakeBigQueryWrite, first response has updated schema. 
- // testBigQueryWrite.addResponse( - // AppendRowsResponse.newBuilder() - // .setAppendResult( - // - // AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) - // .setUpdatedSchema(UPDATED_TABLE_SCHEMA) - // .build()); - // testBigQueryWrite.addResponse( - // AppendRowsResponse.newBuilder() - // .setAppendResult( - // - // AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) - // .setUpdatedSchema(UPDATED_TABLE_SCHEMA_2) - // .build()); - // testBigQueryWrite.addResponse( - // AppendRowsResponse.newBuilder() - // .setAppendResult( - // - // AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) - // .build()); - // // First append - // JSONObject foo = new JSONObject(); - // foo.put("foo", "allen"); - // JSONArray jsonArr = new JSONArray(); - // jsonArr.put(foo); - // - // ApiFuture appendFuture1 = writer.append(jsonArr); - // - // int millis = 0; - // while (millis <= 10000) { - // if (writer.getDescriptor().getFields().size() == 2) { - // break; - // } - // Thread.sleep(100); - // millis += 100; - // } - // assertTrue(writer.getDescriptor().getFields().size() == 2); - // assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); - // assertEquals( - // 1, - // testBigQueryWrite - // .getAppendRequests() - // .get(0) - // .getProtoRows() - // .getRows() - // .getSerializedRowsCount()); - // assertEquals( - // testBigQueryWrite - // .getAppendRequests() - // .get(0) - // .getProtoRows() - // .getRows() - // .getSerializedRows(0), - // FooType.newBuilder().setFoo("allen").build().toByteString()); - // - // // Second append with updated schema. - // JSONObject updatedFoo = new JSONObject(); - // updatedFoo.put("foo", "allen"); - // updatedFoo.put("bar", "allen2"); - // JSONArray updatedJsonArr = new JSONArray(); - // updatedJsonArr.put(updatedFoo); - // - // ApiFuture appendFuture2 = writer.append(updatedJsonArr); - // - // millis = 0; - // while (millis <= 10000) { - // if (writer.getDescriptor().getFields().size() == 3) { - // break; - // } - // Thread.sleep(100); - // millis += 100; - // } - // assertTrue(writer.getDescriptor().getFields().size() == 3); - // assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); - // assertEquals( - // 1, - // testBigQueryWrite - // .getAppendRequests() - // .get(1) - // .getProtoRows() - // .getRows() - // .getSerializedRowsCount()); - // assertEquals( - // testBigQueryWrite - // .getAppendRequests() - // .get(1) - // .getProtoRows() - // .getRows() - // .getSerializedRows(0), - // UpdatedFooType.newBuilder().setFoo("allen").setBar("allen2").build().toByteString()); - // - // // Third append with updated schema. 
- // JSONObject updatedFoo2 = new JSONObject(); - // updatedFoo2.put("foo", "allen"); - // updatedFoo2.put("bar", "allen2"); - // updatedFoo2.put("baz", "allen3"); - // JSONArray updatedJsonArr2 = new JSONArray(); - // updatedJsonArr2.put(updatedFoo2); - // - // ApiFuture appendFuture3 = writer.append(updatedJsonArr2); - // - // assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue()); - // assertEquals( - // 1, - // testBigQueryWrite - // .getAppendRequests() - // .get(2) - // .getProtoRows() - // .getRows() - // .getSerializedRowsCount()); - // assertEquals( - // testBigQueryWrite - // .getAppendRequests() - // .get(2) - // .getProtoRows() - // .getRows() - // .getSerializedRows(0), - // UpdatedFooType2.newBuilder() - // .setFoo("allen") - // .setBar("allen2") - // .setBaz("allen3") - // .build() - // .toByteString()); - // // Check if writer schemas were added in for both connections. - // assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); - // assertTrue(testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema()); - // assertTrue(testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema()); - // } - // } - @Test public void testAppendOutOfRangeException() throws Exception { try (JsonStreamWriter writer = @@ -660,191 +520,6 @@ public void testAppendOutOfRangeException() throws Exception { } } - @Test - public void testAppendOutOfRangeAndUpdateSchema() throws Exception { - try (JsonStreamWriter writer = - getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setError(com.google.rpc.Status.newBuilder().setCode(11).build()) - .setUpdatedSchema(UPDATED_TABLE_SCHEMA) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) - .build()); - - JSONObject foo = new JSONObject(); - foo.put("foo", "allen"); - JSONArray jsonArr = new JSONArray(); - jsonArr.put(foo); - ApiFuture appendFuture = writer.append(jsonArr); - try { - appendFuture.get(); - Assert.fail("expected ExecutionException"); - } catch (ExecutionException ex) { - assertEquals(ex.getCause().getMessage(), "OUT_OF_RANGE: "); - int millis = 0; - while (millis <= 10000) { - if (writer.getDescriptor().getFields().size() == 2) { - break; - } - Thread.sleep(100); - millis += 100; - } - assertTrue(writer.getDescriptor().getFields().size() == 2); - } - - JSONObject updatedFoo = new JSONObject(); - updatedFoo.put("foo", "allen"); - updatedFoo.put("bar", "allen2"); - JSONArray updatedJsonArr = new JSONArray(); - updatedJsonArr.put(updatedFoo); - - ApiFuture appendFuture2 = writer.append(updatedJsonArr); - assertEquals(0L, appendFuture2.get().getAppendResult().getOffset().getValue()); - appendFuture2.get(); - assertEquals( - 1, - testBigQueryWrite - .getAppendRequests() - .get(1) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - testBigQueryWrite - .getAppendRequests() - .get(1) - .getProtoRows() - .getRows() - .getSerializedRows(0), - UpdatedFooType.newBuilder().setFoo("allen").setBar("allen2").build().toByteString()); - - // Check if writer schemas were added in for both connections. 
- assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); - assertTrue(testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema()); - } - } - - @Test - public void testSchemaUpdateSuccess() throws Exception { - try (JsonStreamWriter writer = - getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) - .setUpdatedSchema(UPDATED_TABLE_SCHEMA) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build()) - .build()); - // First append - JSONObject foo = new JSONObject(); - foo.put("foo", "allen"); - JSONArray jsonArr = new JSONArray(); - jsonArr.put(foo); - - ApiFuture appendFuture1 = writer.append(jsonArr); - ApiFuture appendFuture2 = writer.append(jsonArr); - ApiFuture appendFuture3 = writer.append(jsonArr); - - assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); - assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); - assertEquals( - 1, - testBigQueryWrite - .getAppendRequests() - .get(0) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - testBigQueryWrite - .getAppendRequests() - .get(0) - .getProtoRows() - .getRows() - .getSerializedRows(0), - FooType.newBuilder().setFoo("allen").build().toByteString()); - - assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue()); - assertEquals( - 1, - testBigQueryWrite - .getAppendRequests() - .get(1) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - testBigQueryWrite - .getAppendRequests() - .get(1) - .getProtoRows() - .getRows() - .getSerializedRows(0), - FooType.newBuilder().setFoo("allen").build().toByteString()); - - int millis = 0; - while (millis <= 10000) { - if (writer.getDescriptor().getFields().size() == 2) { - break; - } - Thread.sleep(100); - millis += 100; - } - assertTrue(writer.getDescriptor().getFields().size() == 2); - - // Second append with updated schema. 
- JSONObject updatedFoo = new JSONObject(); - updatedFoo.put("foo", "allen"); - updatedFoo.put("bar", "allen2"); - JSONArray updatedJsonArr = new JSONArray(); - updatedJsonArr.put(updatedFoo); - - ApiFuture appendFuture4 = writer.append(updatedJsonArr); - - assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue()); - assertEquals(4, testBigQueryWrite.getAppendRequests().size()); - assertEquals( - 1, - testBigQueryWrite - .getAppendRequests() - .get(3) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - testBigQueryWrite - .getAppendRequests() - .get(3) - .getProtoRows() - .getRows() - .getSerializedRows(0), - UpdatedFooType.newBuilder().setFoo("allen").setBar("allen2").build().toByteString()); - - assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); - assertTrue( - testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema() - || testBigQueryWrite.getAppendRequests().get(3).getProtoRows().hasWriterSchema()); - } - } - @Test public void testCreateDefaultStream() throws Exception { Schema v2Schema = @@ -861,207 +536,4 @@ public void testCreateDefaultStream() throws Exception { assertEquals("projects/p/datasets/d/tables/t/_default", writer.getStreamName()); } } - - @Test - public void testMultiThreadAppendNoSchemaUpdate() throws Exception { - try (JsonStreamWriter writer = - getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { - - JSONObject foo = new JSONObject(); - foo.put("foo", "allen"); - final JSONArray jsonArr = new JSONArray(); - jsonArr.put(foo); - - final Collection offsetSets = Collections.synchronizedCollection(new HashSet()); - int thread_nums = 5; - Thread[] thread_arr = new Thread[thread_nums]; - for (int i = 0; i < thread_nums; i++) { - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder() - .setOffset(Int64Value.of(i)) - .build()) - .build()); - offsetSets.add((long) i); - Thread t = - new Thread( - new Runnable() { - public void run() { - try { - ApiFuture appendFuture = writer.append(jsonArr); - AppendRowsResponse response = appendFuture.get(); - offsetSets.remove(response.getAppendResult().getOffset().getValue()); - } catch (Exception e) { - LOG.severe("Thread execution failed: " + e.getMessage()); - } - } - }); - thread_arr[i] = t; - t.start(); - } - - for (int i = 0; i < thread_nums; i++) { - thread_arr[i].join(); - } - assertTrue(offsetSets.size() == 0); - for (int i = 0; i < thread_nums; i++) { - LOG.info(testBigQueryWrite.getAppendRequests().get(i).toString()); - assertEquals( - 1, - testBigQueryWrite - .getAppendRequests() - .get(i) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - testBigQueryWrite - .getAppendRequests() - .get(i) - .getProtoRows() - .getRows() - .getSerializedRows(0), - FooType.newBuilder().setFoo("allen").build().toByteString()); - } - } - } - - @Test - public void testMultiThreadAppendWithSchemaUpdate() throws Exception { - try (JsonStreamWriter writer = - getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { - JSONObject foo = new JSONObject(); - foo.put("foo", "allen"); - final JSONArray jsonArr = new JSONArray(); - jsonArr.put(foo); - - final Collection offsetSets = Collections.synchronizedCollection(new HashSet()); - int numberThreads = 5; - Thread[] thread_arr = new Thread[numberThreads]; - for (int i = 0; i < numberThreads; i++) { - if (i == 2) { - testBigQueryWrite.addResponse( - 
AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder() - .setOffset(Int64Value.of(i)) - .build()) - .setUpdatedSchema(UPDATED_TABLE_SCHEMA) - .build()); - } else { - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder() - .setOffset(Int64Value.of(i)) - .build()) - .build()); - } - - offsetSets.add((long) i); - Thread t = - new Thread( - new Runnable() { - public void run() { - try { - ApiFuture appendFuture = writer.append(jsonArr); - AppendRowsResponse response = appendFuture.get(); - offsetSets.remove(response.getAppendResult().getOffset().getValue()); - } catch (Exception e) { - LOG.severe("Thread execution failed: " + e.getMessage()); - } - } - }); - thread_arr[i] = t; - t.start(); - } - - for (int i = 0; i < numberThreads; i++) { - thread_arr[i].join(); - } - assertTrue(offsetSets.size() == 0); - for (int i = 0; i < numberThreads; i++) { - assertEquals( - 1, - testBigQueryWrite - .getAppendRequests() - .get(i) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - testBigQueryWrite - .getAppendRequests() - .get(i) - .getProtoRows() - .getRows() - .getSerializedRows(0), - FooType.newBuilder().setFoo("allen").build().toByteString()); - } - - int millis = 0; - while (millis <= 10000) { - if (writer.getDescriptor().getFields().size() == 2) { - break; - } - Thread.sleep(100); - millis += 100; - } - assertEquals(2, writer.getDescriptor().getFields().size()); - - foo.put("bar", "allen2"); - final JSONArray jsonArr2 = new JSONArray(); - jsonArr2.put(foo); - - for (int i = numberThreads; i < numberThreads + 5; i++) { - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder() - .setOffset(Int64Value.of(i)) - .build()) - .build()); - offsetSets.add((long) i); - Thread t = - new Thread( - new Runnable() { - public void run() { - try { - ApiFuture appendFuture = writer.append(jsonArr2); - AppendRowsResponse response = appendFuture.get(); - offsetSets.remove(response.getAppendResult().getOffset().getValue()); - } catch (Exception e) { - LOG.severe("Thread execution failed: " + e.getMessage()); - } - } - }); - thread_arr[i - 5] = t; - t.start(); - } - - for (int i = 0; i < numberThreads; i++) { - thread_arr[i].join(); - } - assertTrue(offsetSets.size() == 0); - for (int i = 0; i < numberThreads; i++) { - assertEquals( - 1, - testBigQueryWrite - .getAppendRequests() - .get(i + 5) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - testBigQueryWrite - .getAppendRequests() - .get(i + 5) - .getProtoRows() - .getRows() - .getSerializedRows(0), - UpdatedFooType.newBuilder().setFoo("allen").setBar("allen2").build().toByteString()); - } - } - } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java index cfb5570b73..4c89dd55e5 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java @@ -447,133 +447,6 @@ public void testJsonStreamWriterWithDefaultStream() } } - @Test - public void testJsonStreamWriterSchemaUpdate() - throws IOException, 
InterruptedException, ExecutionException, - Descriptors.DescriptorValidationException { - String tableName = "SchemaUpdateTable"; - TableInfo tableInfo = - TableInfo.newBuilder( - TableId.of(DATASET, tableName), - StandardTableDefinition.of( - Schema.of( - com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING) - .build()))) - .build(); - - // Create a Bigquery table. - bigquery.create(tableInfo); - TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); - // Create a Write Api stream. - WriteStream writeStream = - client.createWriteStream( - CreateWriteStreamRequest.newBuilder() - .setParent(parent.toString()) - .setWriteStream( - WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) - .build()); - - try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) { - // 1). Send 1 row - JSONObject foo = new JSONObject(); - foo.put("foo", "aaa"); - JSONArray jsonArr = new JSONArray(); - jsonArr.put(foo); - - ApiFuture response = jsonStreamWriter.append(jsonArr, 0); - assertEquals(0, response.get().getAppendResult().getOffset().getValue()); - // 2). Schema update and wait until querying it returns a new schema. - try { - com.google.cloud.bigquery.Table table = bigquery.getTable(DATASET, tableName); - Schema schema = table.getDefinition().getSchema(); - FieldList fields = schema.getFields(); - Field newField = - Field.newBuilder("bar", LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build(); - - List fieldList = new ArrayList(); - fieldList.add(fields.get(0)); - fieldList.add(newField); - Schema newSchema = Schema.of(fieldList); - // Update the table with the new schema - com.google.cloud.bigquery.Table updatedTable = - table.toBuilder().setDefinition(StandardTableDefinition.of(newSchema)).build(); - updatedTable.update(); - int millis = 0; - while (millis <= 10000) { - if (newSchema.equals(table.reload().getDefinition().getSchema())) { - break; - } - Thread.sleep(1000); - millis += 1000; - } - newSchema = schema; - LOG.info( - "bar column successfully added to table in " - + millis - + " millis: " - + bigquery.getTable(DATASET, tableName).getDefinition().getSchema()); - } catch (BigQueryException e) { - LOG.severe("bar column was not added. \n" + e.toString()); - } - // 3). Send rows to wait for updatedSchema to be returned. - JSONObject foo2 = new JSONObject(); - foo2.put("foo", "bbb"); - JSONArray jsonArr2 = new JSONArray(); - jsonArr2.put(foo2); - - int next = 0; - for (int i = 1; i < 100; i++) { - ApiFuture response2 = jsonStreamWriter.append(jsonArr2, i); - assertEquals(i, response2.get().getAppendResult().getOffset().getValue()); - if (response2.get().hasUpdatedSchema()) { - next = i; - break; - } else { - Thread.sleep(1000); - } - } - - int millis = 0; - while (millis <= 10000) { - if (jsonStreamWriter.getDescriptor().getFields().size() == 2) { - LOG.info("JsonStreamWriter successfully updated internal descriptor!"); - break; - } - Thread.sleep(100); - millis += 100; - } - assertTrue(jsonStreamWriter.getDescriptor().getFields().size() == 2); - // 4). Send rows with updated schema. 
- JSONObject updatedFoo = new JSONObject(); - updatedFoo.put("foo", "ccc"); - updatedFoo.put("bar", "ddd"); - JSONArray updatedJsonArr = new JSONArray(); - updatedJsonArr.put(updatedFoo); - for (int i = 0; i < 10; i++) { - ApiFuture response3 = - jsonStreamWriter.append(updatedJsonArr, next + 1 + i); - assertEquals(next + 1 + i, response3.get().getAppendResult().getOffset().getValue()); - response3.get(); - } - - TableResult result3 = - bigquery.listTableData( - tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); - Iterator iter3 = result3.getValues().iterator(); - assertEquals("aaa", iter3.next().get(0).getStringValue()); - for (int j = 1; j <= next; j++) { - assertEquals("bbb", iter3.next().get(0).getStringValue()); - } - for (int j = next + 1; j < next + 1 + 10; j++) { - FieldValueList temp = iter3.next(); - assertEquals("ccc", temp.get(0).getStringValue()); - assertEquals("ddd", temp.get(1).getStringValue()); - } - assertEquals(false, iter3.hasNext()); - } - } - @Test public void testComplicateSchemaWithPendingStream() throws IOException, InterruptedException, ExecutionException {