Skip to content

Commit

Permalink
fix: get recordvalue by field name (#718)
Browse files Browse the repository at this point in the history
* fix: get recordvalue by field name

* fix: remove duplicate code
  • Loading branch information
Praful Makani committed Sep 2, 2020
1 parent 70417fc commit b3f59b1
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 10 deletions.
Expand Up @@ -177,21 +177,24 @@ private static class TableDataPageFetcher implements NextPageFetcher<FieldValueL
private final Map<BigQueryRpc.Option, ?> requestOptions;
private final BigQueryOptions serviceOptions;
private final TableId table;
private final Schema schema;

TableDataPageFetcher(
TableId table,
Schema schema,
BigQueryOptions serviceOptions,
String cursor,
Map<BigQueryRpc.Option, ?> optionMap) {
this.requestOptions =
PageImpl.nextRequestOptions(BigQueryRpc.Option.PAGE_TOKEN, cursor, optionMap);
this.serviceOptions = serviceOptions;
this.table = table;
this.schema = schema;
}

@Override
public Page<FieldValueList> getNextPage() {
return listTableData(table, serviceOptions, requestOptions).x();
return listTableData(table, schema, serviceOptions, requestOptions).x();
}
}

Expand Down Expand Up @@ -1014,12 +1017,13 @@ public TableResult listTableData(
@Override
public TableResult listTableData(TableId tableId, Schema schema, TableDataListOption... options) {
Tuple<? extends Page<FieldValueList>, Long> data =
listTableData(tableId, getOptions(), optionMap(options));
listTableData(tableId, schema, getOptions(), optionMap(options));
return new TableResult(schema, data.y(), data.x());
}

private static Tuple<? extends Page<FieldValueList>, Long> listTableData(
final TableId tableId,
final Schema schema,
final BigQueryOptions serviceOptions,
final Map<BigQueryRpc.Option, ?> optionsMap) {
try {
Expand Down Expand Up @@ -1048,23 +1052,26 @@ public TableDataList call() {
String cursor = result.getPageToken();
return Tuple.of(
new PageImpl<>(
new TableDataPageFetcher(tableId, serviceOptions, cursor, optionsMap),
new TableDataPageFetcher(tableId, schema, serviceOptions, cursor, optionsMap),
cursor,
transformTableData(result.getRows())),
transformTableData(result.getRows(), schema)),
result.getTotalRows());
} catch (RetryHelper.RetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
}

private static Iterable<FieldValueList> transformTableData(Iterable<TableRow> tableDataPb) {
private static Iterable<FieldValueList> transformTableData(
Iterable<TableRow> tableDataPb, final Schema schema) {
return ImmutableList.copyOf(
Iterables.transform(
tableDataPb != null ? tableDataPb : ImmutableList.<TableRow>of(),
new Function<TableRow, FieldValueList>() {
FieldList fields = schema != null ? schema.getFields() : null;

@Override
public FieldValueList apply(TableRow rowPb) {
return FieldValueList.fromPb(rowPb.getF(), null);
return FieldValueList.fromPb(rowPb.getF(), fields);
}
}));
}
Expand Down
Expand Up @@ -85,6 +85,7 @@
import com.google.cloud.bigquery.RoutineInfo;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLDataType;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDataWriteChannel;
Expand Down Expand Up @@ -1547,15 +1548,69 @@ public void testStructNamedQueryParameters() throws InterruptedException {
for (FieldValueList values : result.iterateAll()) {
for (FieldValue value : values) {
for (FieldValue record : value.getRecordValue()) {
assertEquals(FieldValue.Attribute.RECORD, record.getAttribute());
assertEquals(true, record.getRecordValue().get(0).getBooleanValue());
assertEquals(10, record.getRecordValue().get(1).getLongValue());
assertEquals("test-stringField", record.getRecordValue().get(2).getStringValue());
assertsFieldValue(record);
}
}
}
}

@Test
public void testStructQuery() throws InterruptedException {
String tableName = "test_record_table_" + UUID.randomUUID().toString().substring(0, 8);
TableId tableId = TableId.of(DATASET, tableName);
try {
// create a table
Field booleanField = Field.of("booleanField", StandardSQLTypeName.BOOL);
Field integerField = Field.of("integerField", StandardSQLTypeName.INT64);
Field stringField = Field.of("stringField", StandardSQLTypeName.STRING);
Field recordField =
Field.newBuilder(
"recordField",
StandardSQLTypeName.STRUCT,
booleanField,
integerField,
stringField)
.setMode(Field.Mode.NULLABLE)
.build();
Schema schema = Schema.of(recordField);
StandardTableDefinition tableDefinition = StandardTableDefinition.of(schema);
assertNotNull(bigquery.create(TableInfo.of(tableId, tableDefinition)));
// inserting data
Map<String, Object> content = new HashMap<>();
content.put("booleanField", true);
content.put("integerField", 10);
content.put("stringField", "test-stringField");
Map<String, Object> recordContent = new HashMap<>();
recordContent.put("recordField", content);
InsertAllResponse response =
bigquery.insertAll(InsertAllRequest.newBuilder(tableId).addRow(recordContent).build());
assertFalse(response.hasErrors());
// query into a table
String query = String.format("SELECT * FROM %s.%s", DATASET, tableName);
QueryJobConfiguration config =
QueryJobConfiguration.newBuilder(query)
.setDefaultDataset(DATASET)
.setUseLegacySql(false)
.build();
TableResult result = bigquery.query(config);
assertEquals(1, Iterables.size(result.getValues()));
for (FieldValueList values : result.iterateAll()) {
for (FieldValue record : values) {
assertsFieldValue(record);
}
}
} finally {
assertTrue(bigquery.delete(tableId));
}
}

private static void assertsFieldValue(FieldValue record) {
assertEquals(FieldValue.Attribute.RECORD, record.getAttribute());
assertEquals(true, record.getRecordValue().get("booleanField").getBooleanValue());
assertEquals(10, record.getRecordValue().get("integerField").getLongValue());
assertEquals("test-stringField", record.getRecordValue().get("stringField").getStringValue());
}

@Test
public void testNestedStructNamedQueryParameters() throws InterruptedException {
QueryParameterValue booleanValue = QueryParameterValue.bool(true);
Expand Down

0 comments on commit b3f59b1

Please sign in to comment.