Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Revive schema update e2e test and adjust some test names #921

Merged
merged 2 commits into from Mar 9, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -205,7 +205,7 @@ public void testBatchWriteWithCommittedStream()
}

@Test
public void testJsonStreamWriterWriteWithCommittedStream()
public void testJsonStreamWriterCommittedStream()
throws IOException, InterruptedException, ExecutionException,
Descriptors.DescriptorValidationException {
String tableName = "JsonTable";
Expand Down Expand Up @@ -283,7 +283,7 @@ public void testJsonStreamWriterWriteWithCommittedStream()
}

@Test
public void testJsonStreamWriterBatchWriteWithDefaultStream()
public void testJsonStreamWriterWithDefaultStream()
throws IOException, InterruptedException, ExecutionException,
Descriptors.DescriptorValidationException {
String tableName = "JsonTableDefaultStream";
Expand Down Expand Up @@ -355,141 +355,137 @@ public void testJsonStreamWriterBatchWriteWithDefaultStream()
}
}

// @Test
// public void testJsonStreamWriterSchemaUpdate()
// throws IOException, InterruptedException, ExecutionException,
// Descriptors.DescriptorValidationException {
// String tableName = "SchemaUpdateTable";
// TableInfo tableInfo =
// TableInfo.newBuilder(
// TableId.of(DATASET, tableName),
// StandardTableDefinition.of(
// Schema.of(
// com.google.cloud.bigquery.Field.newBuilder("foo",
// LegacySQLTypeName.STRING)
// .build())))
// .build();
//
// bigquery.create(tableInfo);
// TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
// WriteStream writeStream =
// client.createWriteStream(
// CreateWriteStreamRequest.newBuilder()
// .setParent(parent.toString())
// .setWriteStream(
// WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
// .build());
//
// try (JsonStreamWriter jsonStreamWriter =
// JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
// .setBatchingSettings(
// StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
// .toBuilder()
// .setElementCountThreshold(1L)
// .build())
// .build()) {
// // 1). Send 1 row
// JSONObject foo = new JSONObject();
// foo.put("foo", "aaa");
// JSONArray jsonArr = new JSONArray();
// jsonArr.put(foo);
//
// ApiFuture<AppendRowsResponse> response = jsonStreamWriter.append(jsonArr, -1);
// assertEquals(0, response.get().getAppendResult().getOffset().getValue());
// // 2). Schema update and wait until querying it returns a new schema.
// try {
// com.google.cloud.bigquery.Table table = bigquery.getTable(DATASET, tableName);
// Schema schema = table.getDefinition().getSchema();
// FieldList fields = schema.getFields();
// Field newField =
// Field.newBuilder("bar",
// LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build();
//
// List<Field> fieldList = new ArrayList<Field>();
// fieldList.add(fields.get(0));
// fieldList.add(newField);
// Schema newSchema = Schema.of(fieldList);
// // Update the table with the new schema
// com.google.cloud.bigquery.Table updatedTable =
// table.toBuilder().setDefinition(StandardTableDefinition.of(newSchema)).build();
// updatedTable.update();
// int millis = 0;
// while (millis <= 10000) {
// if (newSchema.equals(table.reload().getDefinition().getSchema())) {
// break;
// }
// Thread.sleep(1000);
// millis += 1000;
// }
// newSchema = schema;
// LOG.info(
// "bar column successfully added to table in "
// + millis
// + " millis: "
// + bigquery.getTable(DATASET, tableName).getDefinition().getSchema());
// } catch (BigQueryException e) {
// LOG.severe("bar column was not added. \n" + e.toString());
// }
// // 3). Send rows to wait for updatedSchema to be returned.
// JSONObject foo2 = new JSONObject();
// foo2.put("foo", "bbb");
// JSONArray jsonArr2 = new JSONArray();
// jsonArr2.put(foo2);
//
// int next = 0;
// for (int i = 1; i < 100; i++) {
// ApiFuture<AppendRowsResponse> response2 = jsonStreamWriter.append(jsonArr2, -1);
// assertEquals(i, response2.get().getAppendResult().getOffset().getValue());
// if (response2.get().hasUpdatedSchema()) {
// next = i;
// break;
// } else {
// Thread.sleep(1000);
// }
// }
//
// int millis = 0;
// while (millis <= 10000) {
// if (jsonStreamWriter.getDescriptor().getFields().size() == 2) {
// LOG.info("JsonStreamWriter successfully updated internal descriptor!");
// break;
// }
// Thread.sleep(100);
// millis += 100;
// }
// assertTrue(jsonStreamWriter.getDescriptor().getFields().size() == 2);
// // 4). Send rows with updated schema.
// JSONObject updatedFoo = new JSONObject();
// updatedFoo.put("foo", "ccc");
// updatedFoo.put("bar", "ddd");
// JSONArray updatedJsonArr = new JSONArray();
// updatedJsonArr.put(updatedFoo);
// for (int i = 0; i < 10; i++) {
// ApiFuture<AppendRowsResponse> response3 = jsonStreamWriter.append(updatedJsonArr, -1);
// assertEquals(next + 1 + i, response3.get().getAppendResult().getOffset().getValue());
// response3.get();
// }
//
// TableResult result3 =
// bigquery.listTableData(
// tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
// Iterator<FieldValueList> iter3 = result3.getValues().iterator();
// assertEquals("aaa", iter3.next().get(0).getStringValue());
// for (int j = 1; j <= next; j++) {
// assertEquals("bbb", iter3.next().get(0).getStringValue());
// }
// for (int j = next + 1; j < next + 1 + 10; j++) {
// FieldValueList temp = iter3.next();
// assertEquals("ccc", temp.get(0).getStringValue());
// assertEquals("ddd", temp.get(1).getStringValue());
// }
// assertEquals(false, iter3.hasNext());
// }
// }
@Test
public void testJsonStreamWriterSchemaUpdate()
    throws IOException, InterruptedException, ExecutionException,
        Descriptors.DescriptorValidationException {
  // End-to-end test: append rows to a COMMITTED stream, widen the table schema
  // (add a nullable "bar" column), wait for the server to report the updated
  // schema back through AppendRowsResponse, then append rows using the new
  // column and verify all rows via listTableData.
  String tableName = "SchemaUpdateTable";
  TableInfo tableInfo =
      TableInfo.newBuilder(
              TableId.of(DATASET, tableName),
              StandardTableDefinition.of(
                  Schema.of(
                      com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING)
                          .build())))
          .build();

  // Create a Bigquery table.
  bigquery.create(tableInfo);
  TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
  // Create a Write Api stream.
  WriteStream writeStream =
      client.createWriteStream(
          CreateWriteStreamRequest.newBuilder()
              .setParent(parent.toString())
              .setWriteStream(
                  WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
              .build());

  try (JsonStreamWriter jsonStreamWriter =
      JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) {
    // 1). Send 1 row
    JSONObject foo = new JSONObject();
    foo.put("foo", "aaa");
    JSONArray jsonArr = new JSONArray();
    jsonArr.put(foo);

    ApiFuture<AppendRowsResponse> response = jsonStreamWriter.append(jsonArr, 0);
    assertEquals(0, response.get().getAppendResult().getOffset().getValue());
    // 2). Schema update and wait until querying it returns a new schema.
    try {
      com.google.cloud.bigquery.Table table = bigquery.getTable(DATASET, tableName);
      Schema schema = table.getDefinition().getSchema();
      FieldList fields = schema.getFields();
      Field newField =
          Field.newBuilder("bar", LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build();

      List<Field> fieldList = new ArrayList<Field>();
      fieldList.add(fields.get(0));
      fieldList.add(newField);
      Schema newSchema = Schema.of(fieldList);
      // Update the table with the new schema
      com.google.cloud.bigquery.Table updatedTable =
          table.toBuilder().setDefinition(StandardTableDefinition.of(newSchema)).build();
      updatedTable.update();
      // Poll (up to 10s) until a reload of the table reflects the new schema;
      // table metadata updates are eventually consistent.
      int millis = 0;
      while (millis <= 10000) {
        if (newSchema.equals(table.reload().getDefinition().getSchema())) {
          break;
        }
        Thread.sleep(1000);
        millis += 1000;
      }
      // NOTE(review): removed the dead store `newSchema = schema;` that was here —
      // it assigned the stale pre-update schema to a local never read again.
      LOG.info(
          "bar column successfully added to table in "
              + millis
              + " millis: "
              + bigquery.getTable(DATASET, tableName).getDefinition().getSchema());
    } catch (BigQueryException e) {
      // Best-effort: log and continue so the polling below surfaces the failure.
      LOG.severe("bar column was not added. \n" + e.toString());
    }
    // 3). Send rows to wait for updatedSchema to be returned.
    JSONObject foo2 = new JSONObject();
    foo2.put("foo", "bbb");
    JSONArray jsonArr2 = new JSONArray();
    jsonArr2.put(foo2);

    // Append until the server piggybacks the updated schema on a response
    // (up to ~99 appends, one per second). `next` records the offset of the
    // append that first observed the schema change.
    int next = 0;
    for (int i = 1; i < 100; i++) {
      ApiFuture<AppendRowsResponse> response2 = jsonStreamWriter.append(jsonArr2, i);
      assertEquals(i, response2.get().getAppendResult().getOffset().getValue());
      if (response2.get().hasUpdatedSchema()) {
        next = i;
        break;
      } else {
        Thread.sleep(1000);
      }
    }

    // Wait (up to 10s) for the writer to rebuild its internal descriptor
    // with the second ("bar") field.
    int millis = 0;
    while (millis <= 10000) {
      if (jsonStreamWriter.getDescriptor().getFields().size() == 2) {
        LOG.info("JsonStreamWriter successfully updated internal descriptor!");
        break;
      }
      Thread.sleep(100);
      millis += 100;
    }
    assertTrue(jsonStreamWriter.getDescriptor().getFields().size() == 2);
    // 4). Send rows with updated schema.
    JSONObject updatedFoo = new JSONObject();
    updatedFoo.put("foo", "ccc");
    updatedFoo.put("bar", "ddd");
    JSONArray updatedJsonArr = new JSONArray();
    updatedJsonArr.put(updatedFoo);
    for (int i = 0; i < 10; i++) {
      ApiFuture<AppendRowsResponse> response3 =
          jsonStreamWriter.append(updatedJsonArr, next + 1 + i);
      assertEquals(next + 1 + i, response3.get().getAppendResult().getOffset().getValue());
      response3.get();
    }

    // Verify row contents and ordering: one "aaa", `next` copies of "bbb",
    // then 10 two-column ("ccc", "ddd") rows, and nothing else.
    TableResult result3 =
        bigquery.listTableData(
            tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
    Iterator<FieldValueList> iter3 = result3.getValues().iterator();
    assertEquals("aaa", iter3.next().get(0).getStringValue());
    for (int j = 1; j <= next; j++) {
      assertEquals("bbb", iter3.next().get(0).getStringValue());
    }
    for (int j = next + 1; j < next + 1 + 10; j++) {
      FieldValueList temp = iter3.next();
      assertEquals("ccc", temp.get(0).getStringValue());
      assertEquals("ddd", temp.get(1).getStringValue());
    }
    assertEquals(false, iter3.hasNext());
  }
}

@Test
public void testComplicateSchemaWithPendingStream()
throws IOException, InterruptedException, ExecutionException {
LOG.info("Create a write stream");
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
Expand Down Expand Up @@ -520,6 +516,7 @@ public void testComplicateSchemaWithPendingStream()
Iterator<FieldValueList> iter = result.getValues().iterator();
assertEquals(false, iter.hasNext());

LOG.info("Finalize a write stream");
finalizeResponse =
client.finalizeWriteStream(
FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build());
Expand All @@ -536,8 +533,8 @@ public void testComplicateSchemaWithPendingStream()
LOG.info("Got exception: " + expected.toString());
}
}
// Finalize row count is not populated.
assertEquals(2, finalizeResponse.getRowCount());
LOG.info("Commit a write stream");
BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse =
client.batchCommitWriteStreams(
BatchCommitWriteStreamsRequest.newBuilder()
Expand Down