From bd7d85c489cd260feeabbdc9ecbb8dcdc8d9ae77 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 13 Nov 2020 14:40:07 -0500 Subject: [PATCH] fix: make sure to fall back to old query path when query job is incomplete (#941) * fix: make sure to fall back to old query path when query job is incomplete (takes more than 10s) * nit * address comments * add comment * nit update --- .../google/cloud/bigquery/BigQueryImpl.java | 16 +++--- .../cloud/bigquery/BigQueryImplTest.java | 57 +++++++++++++++++++ .../cloud/bigquery/it/ITBigQueryTest.java | 30 ++++++++++ 3 files changed, 96 insertions(+), 7 deletions(-) diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java index a19c65e63..0ed1ca68c 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java @@ -1268,13 +1268,8 @@ public com.google.api.services.bigquery.model.QueryResponse call() { long numRows; Schema schema; - if (results.getSchema() == null && results.getJobComplete()) { - JobId jobId = JobId.fromPb(results.getJobReference()); - Job job = getJob(jobId, options); - TableResult tableResult = job.getQueryResults(); - return tableResult; - } else { - schema = results.getSchema() == null ? null : Schema.fromPb(results.getSchema()); + if (results.getJobComplete() && results.getSchema() != null) { + schema = Schema.fromPb(results.getSchema()); if (results.getNumDmlAffectedRows() == null && results.getTotalRows() == null) { numRows = 0L; } else if (results.getNumDmlAffectedRows() != null) { @@ -1282,6 +1277,13 @@ public com.google.api.services.bigquery.model.QueryResponse call() { } else { numRows = results.getTotalRows().longValue(); } + } else { + // Query is long running (> 10s) and hasn't completed yet, or query completed but didn't + // return the schema, fallback. Some operations don't return the schema and can be optimized + // here, but this is left as future work. + JobId jobId = JobId.fromPb(results.getJobReference()); + Job job = getJob(jobId, options); + return job.getQueryResults(); } if (results.getPageToken() != null) { diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java index a9b2293fa..e1a48f620 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java @@ -1955,6 +1955,63 @@ public void testFastQueryMultiplePages() throws InterruptedException { verify(bigqueryRpcMock).queryRpc(PROJECT, requestInfo.toPb()); } + @Test + public void testFastQuerySlowDdl() throws InterruptedException { + // mock new fast query path response when running a query that takes more than 10s + JobId queryJob = JobId.of(PROJECT, JOB); + com.google.api.services.bigquery.model.QueryResponse queryResponsePb = + new com.google.api.services.bigquery.model.QueryResponse() + .setJobComplete(false) // false when query does not complete in 10s + .setJobReference(queryJob.toPb()) // backend sends back a jobReference + .setRows(ImmutableList.of(TABLE_ROW)) + .setSchema(TABLE_SCHEMA.toPb()); + + // mock job response from backend + com.google.api.services.bigquery.model.Job responseJob = + new com.google.api.services.bigquery.model.Job() + .setConfiguration(QUERY_JOB_CONFIGURATION_FOR_QUERY.toPb()) + .setJobReference(queryJob.toPb()) + .setId(JOB) + .setStatus(new com.google.api.services.bigquery.model.JobStatus().setState("DONE")); + + // mock old query path response when falling back + GetQueryResultsResponse queryResultsResponsePb = + new GetQueryResultsResponse() + .setJobReference(responseJob.getJobReference()) + .setRows(ImmutableList.of(TABLE_ROW)) + .setJobComplete(true) + .setTotalRows(BigInteger.valueOf(1L)) + .setSchema(TABLE_SCHEMA.toPb()); + + QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_QUERY); + QueryRequest requestPb = requestInfo.toPb(); + + when(bigqueryRpcMock.queryRpc(PROJECT, requestPb)).thenReturn(queryResponsePb); + responseJob.getConfiguration().getQuery().setDestinationTable(TABLE_ID.toPb()); + when(bigqueryRpcMock.getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS)).thenReturn(responseJob); + when(bigqueryRpcMock.getQueryResults( + PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS))) + .thenReturn(queryResultsResponsePb); + when(bigqueryRpcMock.listTableData(PROJECT, DATASET, TABLE, EMPTY_RPC_OPTIONS)) + .thenReturn(new TableDataList().setRows(ImmutableList.of(TABLE_ROW)).setTotalRows(1L)); + + bigquery = options.getService(); + TableResult result = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY); + assertThat(result.getSchema()).isEqualTo(TABLE_SCHEMA); + assertThat(result.getTotalRows()).isEqualTo(1); + for (FieldValueList row : result.getValues()) { + assertThat(row.get(0).getBooleanValue()).isFalse(); + assertThat(row.get(1).getLongValue()).isEqualTo(1); + } + + verify(bigqueryRpcMock).queryRpc(PROJECT, requestInfo.toPb()); + verify(bigqueryRpcMock).getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS); + verify(bigqueryRpcMock) + .getQueryResults( + PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)); + verify(bigqueryRpcMock).listTableData(PROJECT, DATASET, TABLE, EMPTY_RPC_OPTIONS); + } + @Test public void testQueryRequestCompletedOptions() throws InterruptedException { JobId queryJob = JobId.of(PROJECT, JOB); diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java index 29f19f3f4..ab25534a7 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java @@ -1813,6 +1813,36 @@ public void testFastDDLQuery() throws InterruptedException { } } + @Test + public void testFastQuerySlowDDL() throws InterruptedException { + String tableName = + "test_table_fast_query_ddl_slow_" + UUID.randomUUID().toString().substring(0, 8); + // This query take more than 10s to run and should fall back on the old query path + String slowDdlQuery = + String.format( + "CREATE OR REPLACE TABLE %s AS SELECT unique_key, agency, complaint_type, descriptor, street_name, city, landmark FROM `bigquery-public-data.new_york.311_service_requests`", + tableName); + QueryJobConfiguration ddlConfig = + QueryJobConfiguration.newBuilder(slowDdlQuery) + .setDefaultDataset(DatasetId.of(DATASET)) + .build(); + TableResult result = bigquery.query(ddlConfig); + assertEquals(0, result.getTotalRows()); + assertNotNull(result.getSchema()); + // Verify correctness of table content + String sqlQuery = String.format("SELECT * FROM %s.%s", DATASET, tableName); + QueryJobConfiguration sqlConfig = QueryJobConfiguration.newBuilder(sqlQuery).build(); + TableResult resultAfterDDL = bigquery.query(sqlConfig); + for (FieldValueList row : resultAfterDDL.getValues()) { + FieldValue unique_key = row.get(0); + assertEquals(unique_key, row.get("unique_key")); + FieldValue agency = row.get(1); + assertEquals(agency, row.get("agency")); + FieldValue complaint_type = row.get(2); + assertEquals(complaint_type, row.get("complaint_type")); + } + } + @Test public void testFastQueryHTTPException() throws InterruptedException { String queryInvalid =