From dd392e54953e0b75e780532a4dab2d143b8d8665 Mon Sep 17 00:00:00 2001
From: Yiru Tang
Date: Tue, 9 Mar 2021 08:15:07 -0800
Subject: [PATCH] fix: Revive schema update e2e test and adjust some test names (#921)

* fix: Revive schema update e2e test and adjust some test names

* .
---
 .../it/ITBigQueryWriteManualClientTest.java   | 265 +++++++++---------
 1 file changed, 131 insertions(+), 134 deletions(-)

diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java
index 34e2223151..2e1fd95ed8 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java
@@ -205,7 +205,7 @@ public void testBatchWriteWithCommittedStream()
   }
 
   @Test
-  public void testJsonStreamWriterWriteWithCommittedStream()
+  public void testJsonStreamWriterCommittedStream()
       throws IOException, InterruptedException, ExecutionException,
           Descriptors.DescriptorValidationException {
     String tableName = "JsonTable";
@@ -283,7 +283,7 @@ public void testJsonStreamWriterWriteWithCommittedStream()
   }
 
   @Test
-  public void testJsonStreamWriterBatchWriteWithDefaultStream()
+  public void testJsonStreamWriterWithDefaultStream()
       throws IOException, InterruptedException, ExecutionException,
           Descriptors.DescriptorValidationException {
     String tableName = "JsonTableDefaultStream";
@@ -355,141 +355,137 @@ public void testJsonStreamWriterBatchWriteWithDefaultStream()
     }
   }
 
-  //  @Test
-  //  public void testJsonStreamWriterSchemaUpdate()
-  //      throws IOException, InterruptedException, ExecutionException,
-  //          Descriptors.DescriptorValidationException {
-  //    String tableName = "SchemaUpdateTable";
-  //    TableInfo tableInfo =
-  //        TableInfo.newBuilder(
-  //                TableId.of(DATASET, tableName),
-  //                StandardTableDefinition.of(
-  //                    Schema.of(
-  //                        com.google.cloud.bigquery.Field.newBuilder("foo",
-  // LegacySQLTypeName.STRING)
-  //                            .build())))
-  //            .build();
-  //
-  //    bigquery.create(tableInfo);
-  //    TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
-  //    WriteStream writeStream =
-  //        client.createWriteStream(
-  //            CreateWriteStreamRequest.newBuilder()
-  //                .setParent(parent.toString())
-  //                .setWriteStream(
-  //                    WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
-  //                .build());
-  //
-  //    try (JsonStreamWriter jsonStreamWriter =
-  //        JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
-  //            .setBatchingSettings(
-  //                StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
-  //                    .toBuilder()
-  //                    .setElementCountThreshold(1L)
-  //                    .build())
-  //            .build()) {
-  //      // 1). Send 1 row
-  //      JSONObject foo = new JSONObject();
-  //      foo.put("foo", "aaa");
-  //      JSONArray jsonArr = new JSONArray();
-  //      jsonArr.put(foo);
-  //
-  //      ApiFuture<AppendRowsResponse> response = jsonStreamWriter.append(jsonArr, -1);
-  //      assertEquals(0, response.get().getAppendResult().getOffset().getValue());
-  //      // 2). Schema update and wait until querying it returns a new schema.
-  //      try {
-  //        com.google.cloud.bigquery.Table table = bigquery.getTable(DATASET, tableName);
-  //        Schema schema = table.getDefinition().getSchema();
-  //        FieldList fields = schema.getFields();
-  //        Field newField =
-  //            Field.newBuilder("bar",
-  // LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build();
-  //
-  //        List<Field> fieldList = new ArrayList<Field>();
-  //        fieldList.add(fields.get(0));
-  //        fieldList.add(newField);
-  //        Schema newSchema = Schema.of(fieldList);
-  //        // Update the table with the new schema
-  //        com.google.cloud.bigquery.Table updatedTable =
-  //            table.toBuilder().setDefinition(StandardTableDefinition.of(newSchema)).build();
-  //        updatedTable.update();
-  //        int millis = 0;
-  //        while (millis <= 10000) {
-  //          if (newSchema.equals(table.reload().getDefinition().getSchema())) {
-  //            break;
-  //          }
-  //          Thread.sleep(1000);
-  //          millis += 1000;
-  //        }
-  //        newSchema = schema;
-  //        LOG.info(
-  //            "bar column successfully added to table in "
-  //                + millis
-  //                + " millis: "
-  //                + bigquery.getTable(DATASET, tableName).getDefinition().getSchema());
-  //      } catch (BigQueryException e) {
-  //        LOG.severe("bar column was not added. \n" + e.toString());
-  //      }
-  //      // 3). Send rows to wait for updatedSchema to be returned.
-  //      JSONObject foo2 = new JSONObject();
-  //      foo2.put("foo", "bbb");
-  //      JSONArray jsonArr2 = new JSONArray();
-  //      jsonArr2.put(foo2);
-  //
-  //      int next = 0;
-  //      for (int i = 1; i < 100; i++) {
-  //        ApiFuture<AppendRowsResponse> response2 = jsonStreamWriter.append(jsonArr2, -1);
-  //        assertEquals(i, response2.get().getAppendResult().getOffset().getValue());
-  //        if (response2.get().hasUpdatedSchema()) {
-  //          next = i;
-  //          break;
-  //        } else {
-  //          Thread.sleep(1000);
-  //        }
-  //      }
-  //
-  //      int millis = 0;
-  //      while (millis <= 10000) {
-  //        if (jsonStreamWriter.getDescriptor().getFields().size() == 2) {
-  //          LOG.info("JsonStreamWriter successfully updated internal descriptor!");
-  //          break;
-  //        }
-  //        Thread.sleep(100);
-  //        millis += 100;
-  //      }
-  //      assertTrue(jsonStreamWriter.getDescriptor().getFields().size() == 2);
-  //      // 4). Send rows with updated schema.
-  //      JSONObject updatedFoo = new JSONObject();
-  //      updatedFoo.put("foo", "ccc");
-  //      updatedFoo.put("bar", "ddd");
-  //      JSONArray updatedJsonArr = new JSONArray();
-  //      updatedJsonArr.put(updatedFoo);
-  //      for (int i = 0; i < 10; i++) {
-  //        ApiFuture<AppendRowsResponse> response3 = jsonStreamWriter.append(updatedJsonArr, -1);
-  //        assertEquals(next + 1 + i, response3.get().getAppendResult().getOffset().getValue());
-  //        response3.get();
-  //      }
-  //
-  //      TableResult result3 =
-  //          bigquery.listTableData(
-  //              tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
-  //      Iterator<FieldValueList> iter3 = result3.getValues().iterator();
-  //      assertEquals("aaa", iter3.next().get(0).getStringValue());
-  //      for (int j = 1; j <= next; j++) {
-  //        assertEquals("bbb", iter3.next().get(0).getStringValue());
-  //      }
-  //      for (int j = next + 1; j < next + 1 + 10; j++) {
-  //        FieldValueList temp = iter3.next();
-  //        assertEquals("ccc", temp.get(0).getStringValue());
-  //        assertEquals("ddd", temp.get(1).getStringValue());
-  //      }
-  //      assertEquals(false, iter3.hasNext());
-  //    }
-  //  }
+  @Test
+  public void testJsonStreamWriterSchemaUpdate()
+      throws IOException, InterruptedException, ExecutionException,
+          Descriptors.DescriptorValidationException {
+    String tableName = "SchemaUpdateTable";
+    TableInfo tableInfo =
+        TableInfo.newBuilder(
+                TableId.of(DATASET, tableName),
+                StandardTableDefinition.of(
+                    Schema.of(
+                        com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING)
+                            .build())))
+            .build();
+
+    // Create a Bigquery table.
+    bigquery.create(tableInfo);
+    TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
+    // Create a Write Api stream.
+    WriteStream writeStream =
+        client.createWriteStream(
+            CreateWriteStreamRequest.newBuilder()
+                .setParent(parent.toString())
+                .setWriteStream(
+                    WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
+                .build());
+
+    try (JsonStreamWriter jsonStreamWriter =
+        JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) {
+      // 1). Send 1 row
+      JSONObject foo = new JSONObject();
+      foo.put("foo", "aaa");
+      JSONArray jsonArr = new JSONArray();
+      jsonArr.put(foo);
+
+      ApiFuture<AppendRowsResponse> response = jsonStreamWriter.append(jsonArr, 0);
+      assertEquals(0, response.get().getAppendResult().getOffset().getValue());
+      // 2). Schema update and wait until querying it returns a new schema.
+      try {
+        com.google.cloud.bigquery.Table table = bigquery.getTable(DATASET, tableName);
+        Schema schema = table.getDefinition().getSchema();
+        FieldList fields = schema.getFields();
+        Field newField =
+            Field.newBuilder("bar", LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build();
+
+        List<Field> fieldList = new ArrayList<Field>();
+        fieldList.add(fields.get(0));
+        fieldList.add(newField);
+        Schema newSchema = Schema.of(fieldList);
+        // Update the table with the new schema
+        com.google.cloud.bigquery.Table updatedTable =
+            table.toBuilder().setDefinition(StandardTableDefinition.of(newSchema)).build();
+        updatedTable.update();
+        int millis = 0;
+        while (millis <= 10000) {
+          if (newSchema.equals(table.reload().getDefinition().getSchema())) {
+            break;
+          }
+          Thread.sleep(1000);
+          millis += 1000;
+        }
+        newSchema = schema;
+        LOG.info(
+            "bar column successfully added to table in "
+                + millis
+                + " millis: "
+                + bigquery.getTable(DATASET, tableName).getDefinition().getSchema());
+      } catch (BigQueryException e) {
+        LOG.severe("bar column was not added. \n" + e.toString());
+      }
+      // 3). Send rows to wait for updatedSchema to be returned.
+      JSONObject foo2 = new JSONObject();
+      foo2.put("foo", "bbb");
+      JSONArray jsonArr2 = new JSONArray();
+      jsonArr2.put(foo2);
+
+      int next = 0;
+      for (int i = 1; i < 100; i++) {
+        ApiFuture<AppendRowsResponse> response2 = jsonStreamWriter.append(jsonArr2, i);
+        assertEquals(i, response2.get().getAppendResult().getOffset().getValue());
+        if (response2.get().hasUpdatedSchema()) {
+          next = i;
+          break;
+        } else {
+          Thread.sleep(1000);
+        }
+      }
+
+      int millis = 0;
+      while (millis <= 10000) {
+        if (jsonStreamWriter.getDescriptor().getFields().size() == 2) {
+          LOG.info("JsonStreamWriter successfully updated internal descriptor!");
+          break;
+        }
+        Thread.sleep(100);
+        millis += 100;
+      }
+      assertTrue(jsonStreamWriter.getDescriptor().getFields().size() == 2);
+      // 4). Send rows with updated schema.
+      JSONObject updatedFoo = new JSONObject();
+      updatedFoo.put("foo", "ccc");
+      updatedFoo.put("bar", "ddd");
+      JSONArray updatedJsonArr = new JSONArray();
+      updatedJsonArr.put(updatedFoo);
+      for (int i = 0; i < 10; i++) {
+        ApiFuture<AppendRowsResponse> response3 =
+            jsonStreamWriter.append(updatedJsonArr, next + 1 + i);
+        assertEquals(next + 1 + i, response3.get().getAppendResult().getOffset().getValue());
+        response3.get();
+      }
+
+      TableResult result3 =
+          bigquery.listTableData(
+              tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
+      Iterator<FieldValueList> iter3 = result3.getValues().iterator();
+      assertEquals("aaa", iter3.next().get(0).getStringValue());
+      for (int j = 1; j <= next; j++) {
+        assertEquals("bbb", iter3.next().get(0).getStringValue());
+      }
+      for (int j = next + 1; j < next + 1 + 10; j++) {
+        FieldValueList temp = iter3.next();
+        assertEquals("ccc", temp.get(0).getStringValue());
+        assertEquals("ddd", temp.get(1).getStringValue());
+      }
+      assertEquals(false, iter3.hasNext());
+    }
+  }
 
   @Test
   public void testComplicateSchemaWithPendingStream()
       throws IOException, InterruptedException, ExecutionException {
+    LOG.info("Create a write stream");
     WriteStream writeStream =
         client.createWriteStream(
             CreateWriteStreamRequest.newBuilder()
@@ -520,6 +516,7 @@ public void testComplicateSchemaWithPendingStream()
     Iterator<FieldValueList> iter = result.getValues().iterator();
     assertEquals(false, iter.hasNext());
 
+    LOG.info("Finalize a write stream");
     finalizeResponse =
         client.finalizeWriteStream(
             FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build());
@@ -536,8 +533,8 @@ public void testComplicateSchemaWithPendingStream()
         LOG.info("Got exception: " + expected.toString());
       }
     }
-    // Finalize row count is not populated.
     assertEquals(2, finalizeResponse.getRowCount());
+    LOG.info("Commit a write stream");
     BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse =
         client.batchCommitWriteStreams(
             BatchCommitWriteStreamsRequest.newBuilder()