update ITs to check table content correctness, update fastquery logic
nit
stephaniewang526 committed Jul 10, 2020
1 parent d8ab960 commit 67e30a9
Showing 3 changed files with 313,528 additions and 30 deletions.
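
To illustrate the caller-facing effect of the fastquery change below: a query whose results fit in a single page now comes back directly as a TableResult, without the client polling a Job first. The following is an illustrative sketch only (not code from this commit), assuming default application credentials:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;

public class FastQuerySketch {
  public static void main(String[] args) throws InterruptedException {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    QueryJobConfiguration config =
        QueryJobConfiguration.newBuilder("SELECT 17 AS answer").build();
    // Single-page results are returned as a TableResult directly; larger
    // results fall back to the job-based path shown in the diff below.
    TableResult result = bigquery.query(config);
    for (FieldValueList row : result.getValues()) {
      System.out.println(row.get("answer").getLongValue());
    }
  }
}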
@@ -51,9 +51,13 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

final class BigQueryImpl extends BaseService<BigQueryOptions> implements BigQuery {

private static final Logger LOGGER = Logger.getLogger(BigQueryImpl.class.getName());

private static class DatasetPageFetcher implements NextPageFetcher<Dataset> {

private static final long serialVersionUID = -3057564042439021278L;
@@ -1203,33 +1207,48 @@ public com.google.api.services.bigquery.model.QueryResponse call() {
} catch (RetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
Long numRows;
if (results.getNumDmlAffectedRows() == null && results.getTotalRows() == null) {
// DDL queries
numRows = 0L;
} else if (results.getNumDmlAffectedRows() != null) {
// DML queries
numRows = results.getNumDmlAffectedRows();
} else {
// SQL queries
numRows = results.getTotalRows().longValue();
}

// Return result if there is only 1 page, otherwise use jobId returned from backend to return
// full results
if (results.getPageToken() == null) {
// If fast query completed and has only one page in results
if (results.getJobComplete() && results.getPageToken() == null) {
// Check for errors
ImmutableList.Builder<BigQueryError> errors = ImmutableList.builder();
if (results.getErrors() != null) {
for (ErrorProto error : results.getErrors()) {
errors.add(BigQueryError.fromPb(error));
}
// Since there is no setError method in TableResult, we log the errors
LOGGER.log(Level.WARNING, errors.toString());
}

// If there is no error, we construct TableResult
TableSchema schemaPb = results.getSchema();

Long numRows;
if (results.getNumDmlAffectedRows() == null && results.getTotalRows() == null) {
// DDL queries
numRows = 0L;
} else if (results.getNumDmlAffectedRows() != null) {
// DML queries
numRows = results.getNumDmlAffectedRows();
} else {
// SQL queries
numRows = results.getTotalRows().longValue();
}

return new TableResult(
Schema.fromPb(results.getSchema()),
schemaPb == null ? null : Schema.fromPb(schemaPb),
numRows,
new PageImpl<>(
new TableDataPageFetcher(null, getOptions(), null, optionMap(options)),
null,
transformTableData(results.getRows())));
} else {
// Use jobId returned from backend to return full TableResult
String jobId = results.getJobReference().getJobId();
Job job = getJob(JobId.of(jobId));
job.waitFor();
return job.getQueryResults();
TableResult result = job.getQueryResults();
return result;
}
}
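
For reference, the jobId fallback above can also be driven explicitly from caller code. This is a minimal sketch, not part of the commit; it assumes an already-built bigquery client and a jobId string taken from the query response, and it reuses the same com.google.cloud.bigquery types imported in this file:

Job job = bigquery.getJob(JobId.of(jobId));
job = job.waitFor(); // blocks until the query job finishes
TableResult full = job.getQueryResults();
for (FieldValueList row : full.iterateAll()) {
  // iterateAll() lazily fetches every page of the result
}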

@@ -81,7 +81,6 @@
import com.google.cloud.bigquery.RoutineInfo;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLDataType;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDataWriteChannel;
@@ -108,6 +107,7 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -223,6 +223,37 @@ public class ITBigQueryTest {
FLOAT_FIELD_SCHEMA,
GEOGRAPHY_FIELD_SCHEMA,
NUMERIC_FIELD_SCHEMA);

private static final Field DDL_TIMESTAMP_FIELD_SCHEMA =
Field.newBuilder("TimestampField", LegacySQLTypeName.TIMESTAMP)
.setDescription("TimestampDescription")
.build();
private static final Field DDL_STRING_FIELD_SCHEMA =
Field.newBuilder("StringField", LegacySQLTypeName.STRING)
.setDescription("StringDescription")
.build();
private static final Field DDL_BOOLEAN_FIELD_SCHEMA =
Field.newBuilder("BooleanField", LegacySQLTypeName.BOOLEAN)
.setDescription("BooleanDescription")
.build();
private static final Schema DDL_TABLE_SCHEMA =
Schema.of(DDL_TIMESTAMP_FIELD_SCHEMA, DDL_STRING_FIELD_SCHEMA, DDL_BOOLEAN_FIELD_SCHEMA);
private static final Schema LARGE_TABLE_SCHEMA =
Schema.of(
Field.newBuilder("date", LegacySQLTypeName.DATE).setMode(Field.Mode.NULLABLE).build(),
Field.newBuilder("county", LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build(),
Field.newBuilder("state_name", LegacySQLTypeName.STRING)
.setMode(Field.Mode.NULLABLE)
.build(),
Field.newBuilder("county_fips_code", LegacySQLTypeName.STRING)
.setMode(Field.Mode.NULLABLE)
.build(),
Field.newBuilder("confirmed_cases", LegacySQLTypeName.INTEGER)
.setMode(Field.Mode.NULLABLE)
.build(),
Field.newBuilder("deaths", LegacySQLTypeName.INTEGER)
.setMode(Field.Mode.NULLABLE)
.build());
private static final Schema SIMPLE_SCHEMA = Schema.of(STRING_FIELD_SCHEMA);
private static final Schema POLICY_SCHEMA =
Schema.of(STRING_FIELD_SCHEMA, STRING_FIELD_SCHEMA_WITH_POLICY, INTEGER_FIELD_SCHEMA);
@@ -253,12 +284,16 @@ public class ITBigQueryTest {
private static final RangePartitioning RANGE_PARTITIONING =
RangePartitioning.newBuilder().setField("IntegerField").setRange(RANGE).build();
private static final String LOAD_FILE = "load.csv";
private static final String LOAD_FILE_LARGE = "load_large.csv";
private static final String JSON_LOAD_FILE = "load.json";
private static final String JSON_LOAD_FILE_SIMPLE = "load_simple.json";
private static final String EXTRACT_FILE = "extract.csv";
private static final String EXTRACT_MODEL_FILE = "extract_model.csv";
private static final String BUCKET = RemoteStorageHelper.generateBucketName();
private static final TableId TABLE_ID = TableId.of(DATASET, "testing_table");
private static final TableId TABLE_ID_DDL = TableId.of(DATASET, "ddl_testing_table");
private static final TableId TABLE_ID_FASTQUERY = TableId.of(DATASET, "fastquery_testing_table");
private static final TableId TABLE_ID_LARGE = TableId.of(DATASET, "large_data_testing_table");
private static final String CSV_CONTENT = "StringValue1\nStringValue2\n";
private static final String JSON_CONTENT =
"{"
@@ -305,6 +340,17 @@ public class ITBigQueryTest {
+ " \"GeographyField\": \"POINT(-122.35022 47.649154)\","
+ " \"NumericField\": \"123456.789012345\""
+ "}";
private static final String JSON_CONTENT_SIMPLE =
"{"
+ " \"TimestampField\": \"2014-08-19 07:41:35.220 -05:00\","
+ " \"StringField\": \"stringValue\","
+ " \"BooleanField\": \"false\""
+ "}\n"
+ "{"
+ " \"TimestampField\": \"2014-08-19 07:41:35.220 -05:00\","
+ " \"StringField\": \"stringValue\","
+ " \"BooleanField\": \"false\""
+ "}";
private static final String KEY = "time_zone";
private static final String VALUE = "US/Eastern";
private static final ConnectionProperty CONNECTION_PROPERTY =
@@ -321,7 +367,7 @@
@Rule public Timeout globalTimeout = Timeout.seconds(300);

@BeforeClass
public static void beforeClass() throws InterruptedException, TimeoutException {
public static void beforeClass() throws InterruptedException, IOException {
RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
RemoteStorageHelper storageHelper = RemoteStorageHelper.create();
Map<String, String> labels = ImmutableMap.of("test-job-name", "test-load-job");
@@ -334,6 +380,14 @@ public static void beforeClass() throws InterruptedException, TimeoutException {
storage.create(
BlobInfo.newBuilder(BUCKET, JSON_LOAD_FILE).setContentType("application/json").build(),
JSON_CONTENT.getBytes(StandardCharsets.UTF_8));
storage.create(
BlobInfo.newBuilder(BUCKET, JSON_LOAD_FILE_SIMPLE)
.setContentType("application/json")
.build(),
JSON_CONTENT_SIMPLE.getBytes(StandardCharsets.UTF_8));
storage.createFrom(
BlobInfo.newBuilder(BUCKET, LOAD_FILE_LARGE).setContentType("text/plain").build(),
FileSystems.getDefault().getPath("src/test/resources", "QueryTestData.csv"));
DatasetInfo info =
DatasetInfo.newBuilder(DATASET).setDescription(DESCRIPTION).setLabels(LABELS).build();
bigquery.create(info);
@@ -366,6 +420,28 @@ public static void beforeClass() throws InterruptedException, TimeoutException {
Job jobFastQuery = bigquery.create(JobInfo.of(configurationFastQuery));
jobFastQuery = jobFastQuery.waitFor();
assertNull(jobFastQuery.getStatus().getError());

LoadJobConfiguration configurationDDL =
LoadJobConfiguration.newBuilder(
TABLE_ID_DDL, "gs://" + BUCKET + "/" + JSON_LOAD_FILE_SIMPLE, FormatOptions.json())
.setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
.setSchema(DDL_TABLE_SCHEMA)
.setLabels(labels)
.build();
Job jobDDL = bigquery.create(JobInfo.of(configurationDDL));
jobDDL = jobDDL.waitFor();
assertNull(jobDDL.getStatus().getError());

LoadJobConfiguration configurationLargeTable =
LoadJobConfiguration.newBuilder(
TABLE_ID_LARGE, "gs://" + BUCKET + "/" + LOAD_FILE_LARGE, FormatOptions.csv())
.setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
.setSchema(LARGE_TABLE_SCHEMA)
.setLabels(labels)
.build();
Job jobLargeTable = bigquery.create(JobInfo.of(configurationLargeTable));
jobLargeTable = jobLargeTable.waitFor();
assertNull(jobLargeTable.getStatus().getError());
}
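
A possible extra sanity check on the large-table load (illustrative only, not part of this commit): the loaded row count should match the 313348 total asserted later in testFastSQLQueryMultiPage. A minimal sketch using the constants defined above:

TableResult countResult =
    bigquery.query(
        QueryJobConfiguration.of(
            String.format(
                "SELECT COUNT(*) AS cnt FROM %s.%s", DATASET, TABLE_ID_LARGE.getTable())));
long loadedRows = countResult.getValues().iterator().next().get("cnt").getLongValue();
assertEquals(313348, loadedRows);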

@AfterClass
@@ -1394,7 +1470,7 @@ public void testQuery() throws InterruptedException {
}

@Test
public void testFastQuery() throws InterruptedException {
public void testFastSQLQuery() throws InterruptedException {
String query = "SELECT TimestampField, StringField, BooleanField FROM " + TABLE_ID.getTable();
QueryJobConfiguration config =
QueryJobConfiguration.newBuilder(query).setDefaultDataset(DatasetId.of(DATASET)).build();
@@ -1423,12 +1499,15 @@ public void testFastQuery() throws InterruptedException {
}

@Test
public void testFastQueryMultiPages() throws InterruptedException {
public void testFastSQLQueryMultiPage() throws InterruptedException {
String query =
"SELECT date, state_name, confirmed_cases FROM `bigquery-public-data.covid19_nyt.us_counties`";
"SELECT date, county, state_name, county_fips_code, confirmed_cases, deaths FROM "
+ TABLE_ID_LARGE.getTable();
QueryJobConfiguration config =
QueryJobConfiguration.newBuilder(query).setDefaultDataset(DatasetId.of(DATASET)).build();
TableResult result = bigquery.query(config);
assertEquals(LARGE_TABLE_SCHEMA, result.getSchema());
assertEquals(313348, result.getTotalRows());
assertNotNull(result.getNextPage());
assertNotNull(result.getNextPageToken());
assertTrue(result.hasNextPage());
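
As an aside on multi-page results like the one above, pages can also be walked manually with getNextPage(); a short sketch (not part of this test) assuming the result variable from the query just executed:

TableResult page = result;
long rowsSeen = 0;
while (page != null) {
  for (FieldValueList row : page.getValues()) {
    rowsSeen++; // getValues() covers only the current page
  }
  page = page.hasNextPage() ? page.getNextPage() : null;
}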
@@ -1439,25 +1518,77 @@ public void testFastDMLQuery() throws InterruptedException {
String tableName = TABLE_ID_FASTQUERY.getTable();
String dmlQuery =
String.format("UPDATE %s.%s SET StringField = 'hello' WHERE TRUE", DATASET, tableName);
QueryJobConfiguration config =
QueryJobConfiguration.newBuilder(dmlQuery).setDefaultDataset(DatasetId.of(DATASET)).build();
TableResult result = bigquery.query(config);
QueryJobConfiguration dmlConfig = QueryJobConfiguration.newBuilder(dmlQuery).build();
TableResult result = bigquery.query(dmlConfig);
assertEquals(TABLE_SCHEMA, result.getSchema());
assertEquals(2, result.getTotalRows());
// Verify correctness of table content
String sqlQuery = String.format("SELECT * FROM %s.%s", DATASET, tableName);
QueryJobConfiguration sqlConfig = QueryJobConfiguration.newBuilder(sqlQuery).build();
TableResult resultAfterDML = bigquery.query(sqlConfig);
for (FieldValueList row : resultAfterDML.getValues()) {
FieldValue timestampCell = row.get(0);
assertEquals(timestampCell, row.get("TimestampField"));
FieldValue stringCell = row.get(1);
assertEquals(stringCell, row.get("StringField"));
FieldValue booleanCell = row.get(3);
assertEquals(booleanCell, row.get("BooleanField"));
assertEquals(FieldValue.Attribute.PRIMITIVE, timestampCell.getAttribute());
assertEquals(FieldValue.Attribute.PRIMITIVE, stringCell.getAttribute());
assertEquals(FieldValue.Attribute.PRIMITIVE, booleanCell.getAttribute());
assertEquals(1408452095220000L, timestampCell.getTimestampValue());
assertEquals("hello", stringCell.getStringValue());
assertEquals(false, booleanCell.getBooleanValue());
}
}

@Test
public void testFastDDLQuery() throws InterruptedException {
String tableName = "test_table_fast_query_ddl";
String tableNameFastQuery = TABLE_ID_DDL.getTable();
String ddlQuery =
String.format("CREATE OR REPLACE TABLE %s.%s ( StringField STRING )", DATASET, tableName);
QueryJobConfiguration config =
String.format(
"CREATE OR REPLACE TABLE %s ("
+ "TimestampField TIMESTAMP OPTIONS(description='TimestampDescription'), "
+ "StringField STRING OPTIONS(description='StringDescription'), "
+ "BooleanField BOOLEAN OPTIONS(description='BooleanDescription') "
+ ") AS SELECT * FROM %s",
tableName, tableNameFastQuery);
QueryJobConfiguration ddlConfig =
QueryJobConfiguration.newBuilder(ddlQuery).setDefaultDataset(DatasetId.of(DATASET)).build();
TableResult result = bigquery.query(config);
assertEquals(
Schema.of(Field.newBuilder("StringField", StandardSQLTypeName.STRING).build()),
result.getSchema());
TableResult result = bigquery.query(ddlConfig);
assertEquals(DDL_TABLE_SCHEMA, result.getSchema());
assertEquals(0, result.getTotalRows());
// Verify correctness of table content
String sqlQuery = String.format("SELECT * FROM %s.%s", DATASET, tableName);
QueryJobConfiguration sqlConfig = QueryJobConfiguration.newBuilder(sqlQuery).build();
TableResult resultAfterDDL = bigquery.query(sqlConfig);
for (FieldValueList row : resultAfterDDL.getValues()) {
FieldValue timestampCell = row.get(0);
assertEquals(timestampCell, row.get("TimestampField"));
FieldValue stringCell = row.get(1);
assertEquals(stringCell, row.get("StringField"));
FieldValue booleanCell = row.get(2);
assertEquals(booleanCell, row.get("BooleanField"));
assertEquals(FieldValue.Attribute.PRIMITIVE, timestampCell.getAttribute());
assertEquals(FieldValue.Attribute.PRIMITIVE, stringCell.getAttribute());
assertEquals(FieldValue.Attribute.PRIMITIVE, booleanCell.getAttribute());
assertEquals(1408452095220000L, timestampCell.getTimestampValue());
assertEquals("stringValue", stringCell.getStringValue());
assertEquals(false, booleanCell.getBooleanValue());
}
}

@Test
public void testBadSQLQuery() throws InterruptedException {
String query = "SELECT &*$$%Q#^^&W%&ETY FROM " + TABLE_ID.getTable();
QueryJobConfiguration config =
QueryJobConfiguration.newBuilder(query).setDefaultDataset(DatasetId.of(DATASET)).build();
TableResult result = bigquery.query(config);
assertEquals(QUERY_RESULT_SCHEMA, result.getSchema());
assertNull(result.getNextPage());
assertNull(result.getNextPageToken());
assertFalse(result.hasNextPage());
}

@Test
