Skip to content

Commit

Permalink
add logic for DML and DDL queries
Browse files Browse the repository at this point in the history
enable requestId
add integration tests for fast path multipages query, DML, and DDL queries

fix requestId logic

update QueryRequestInfo and add mock test

add mock test cases for SQL, DML, and DDL
clean up code

fix IT
  • Loading branch information
stephaniewang526 committed Jul 9, 2020
1 parent f6e93cc commit e73936b
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 48 deletions.
Expand Up @@ -1187,8 +1187,9 @@ public TableResult query(QueryJobConfiguration configuration, JobOption... optio
private TableResult fastQuery(
final String projectId, final QueryRequest content, JobOption... options)
throws InterruptedException {
com.google.api.services.bigquery.model.QueryResponse results;
try {
com.google.api.services.bigquery.model.QueryResponse queryResponse =
results =
runWithRetries(
new Callable<com.google.api.services.bigquery.model.QueryResponse>() {
@Override
Expand All @@ -1199,26 +1200,37 @@ public com.google.api.services.bigquery.model.QueryResponse call() {
getOptions().getRetrySettings(),
EXCEPTION_HANDLER,
getOptions().getClock());

// Return result if there is only 1 page, otherwise use jobId returned from backend to return
// full results
if (queryResponse.getPageToken() == null) {
return new TableResult(
Schema.fromPb(queryResponse.getSchema()),
queryResponse.getTotalRows().longValue(),
new PageImpl<>(
new TableDataPageFetcher(null, getOptions(), null, optionMap(options)),
null,
transformTableData(queryResponse.getRows())));
} else {
String jobId = queryResponse.getJobReference().getJobId();
Job job = getJob(JobId.of(jobId));
job.waitFor();
return job.getQueryResults();
}
} catch (RetryHelper.RetryHelperException e) {
} catch (RetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
Long numRows;
if (results.getNumDmlAffectedRows() == null && results.getTotalRows() == null) {
// DDL queries
numRows = 0L;
} else if (results.getNumDmlAffectedRows() != null) {
// DML queries
numRows = results.getNumDmlAffectedRows();
} else {
// SQL queries
numRows = results.getTotalRows().longValue();
}

// Return result if there is only 1 page, otherwise use jobId returned from backend to return
// full results
if (results.getPageToken() == null) {
return new TableResult(
Schema.fromPb(results.getSchema()),
numRows,
new PageImpl<>(
new TableDataPageFetcher(null, getOptions(), null, optionMap(options)),
null,
transformTableData(results.getRows())));
} else {
String jobId = results.getJobReference().getJobId();
Job job = getJob(JobId.of(jobId));
job.waitFor();
return job.getQueryResults();
}
}

@Override
Expand Down
Expand Up @@ -851,7 +851,7 @@ ToStringHelper toStringHelper() {
.add("flattenResults", flattenResults)
.add("priority", priority)
.add("tableDefinitions", tableDefinitions)
.add("userQueryCache", useQueryCache)
.add("useQueryCache", useQueryCache)
.add("userDefinedFunctions", userDefinedFunctions)
.add("createDisposition", createDisposition)
.add("writeDisposition", writeDisposition)
Expand Down
Expand Up @@ -16,17 +16,40 @@

package com.google.cloud.bigquery;

import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.QueryParameter;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import java.util.UUID;

final class QueryRequestInfo {

private static final String REQUEST_ID = UUID.randomUUID().toString();
private QueryJobConfiguration config;
private final List<ConnectionProperty> connectionProperties;
private final DatasetId defaultDataset;
private final Boolean dryRun;
private final Map<String, String> labels;
private final Long maximumBytesBilled;
private final String query;
private final List<QueryParameter> queryParameters;
private final Boolean useQueryCache;
private final Boolean useLegacySql;

QueryRequestInfo(QueryJobConfiguration config) {
this.config = config;
this.connectionProperties = config.getConnectionProperties();
this.defaultDataset = config.getDefaultDataset();
this.dryRun = config.dryRun();
this.labels = config.getLabels();
this.maximumBytesBilled = config.getMaximumBytesBilled();
this.query = config.getQuery();
this.queryParameters = config.toPb().getQuery().getQueryParameters();
this.useLegacySql = config.useLegacySql();
this.useQueryCache = config.useQueryCache();
}

boolean isFastQuerySupported() {
Expand All @@ -45,37 +68,73 @@ boolean isFastQuerySupported() {
}

QueryRequest toPb() {
QueryRequest query = new QueryRequest();
if (config.getConnectionProperties() != null) {
query.setConnectionProperties(
Lists.transform(config.getConnectionProperties(), ConnectionProperty.TO_PB_FUNCTION));
QueryRequest request = new QueryRequest();
if (connectionProperties != null) {
request.setConnectionProperties(
Lists.transform(connectionProperties, ConnectionProperty.TO_PB_FUNCTION));
}
if (config.getDefaultDataset() != null) {
query.setDefaultDataset(config.getDefaultDataset().toPb());
if (defaultDataset != null) {
request.setDefaultDataset(defaultDataset.toPb());
}
if (config.dryRun() != null) {
query.setDryRun(config.dryRun());
if (dryRun != null) {
request.setDryRun(dryRun);
}
if (config.getLabels() != null) {
query.setLabels(config.getLabels());
if (labels != null) {
request.setLabels(labels);
}
if (config.getMaximumBytesBilled() != null) {
query.setMaximumBytesBilled(config.getMaximumBytesBilled());
if (maximumBytesBilled != null) {
request.setMaximumBytesBilled(maximumBytesBilled);
}
query.setQuery(config.getQuery());
// TODO: add back when supported
// query.setRequestId(UUID.randomUUID().toString());
JobConfiguration jobConfiguration = config.toPb();
JobConfigurationQuery configurationQuery = jobConfiguration.getQuery();
if (configurationQuery.getQueryParameters() != null) {
query.setQueryParameters(configurationQuery.getQueryParameters());
request.setQuery(query);
request.setRequestId(REQUEST_ID);
if (queryParameters != null) {
request.setQueryParameters(queryParameters);
}
if (config.useLegacySql() != null) {
query.setUseLegacySql(config.useLegacySql());
if (useLegacySql != null) {
request.setUseLegacySql(useLegacySql);
}
if (config.useQueryCache() != null) {
query.setUseQueryCache(config.useQueryCache());
if (useQueryCache != null) {
request.setUseQueryCache(useQueryCache);
}
return query;
return request;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("connectionProperties", connectionProperties)
.add("defaultDataset", defaultDataset)
.add("dryRun", dryRun)
.add("labels", labels)
.add("maximumBytesBilled", maximumBytesBilled)
.add("query", query)
.add("requestId", REQUEST_ID)
.add("queryParameters", queryParameters)
.add("useQueryCache", useQueryCache)
.add("useLegacySql", useLegacySql)
.toString();
}

@Override
public int hashCode() {
return Objects.hashCode(
connectionProperties,
defaultDataset,
dryRun,
labels,
maximumBytesBilled,
query,
queryParameters,
REQUEST_ID,
useQueryCache,
useLegacySql);
}

@Override
public boolean equals(Object obj) {
return obj == this
|| obj != null
&& obj.getClass().equals(QueryRequestInfo.class)
&& java.util.Objects.equals(toPb(), ((QueryRequestInfo) obj).toPb());
}
}
Expand Up @@ -36,6 +36,7 @@
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
Expand Down Expand Up @@ -212,6 +213,16 @@ public class BigQueryImplTest {
.setDefaultDataset(DatasetId.of(PROJECT, DATASET))
.setUseQueryCache(false)
.build();
private static final QueryJobConfiguration QUERY_JOB_CONFIGURATION_FOR_DMLQUERY =
QueryJobConfiguration.newBuilder("DML")
.setDefaultDataset(DatasetId.of(PROJECT, DATASET))
.setUseQueryCache(false)
.build();
private static final QueryJobConfiguration QUERY_JOB_CONFIGURATION_FOR_DDLQUERY =
QueryJobConfiguration.newBuilder("DDL")
.setDefaultDataset(DatasetId.of(PROJECT, DATASET))
.setUseQueryCache(false)
.build();
private static final JobInfo JOB_INFO =
JobInfo.newBuilder(QUERY_JOB_CONFIGURATION_FOR_QUERY)
.setJobId(JobId.of(PROJECT, JOB))
Expand Down Expand Up @@ -1815,12 +1826,10 @@ public void testQueryRequestCompleted() throws InterruptedException {

@Test
public void testFastQueryRequestCompleted() throws InterruptedException {
JobId queryJob = JobId.of(PROJECT, JOB);
com.google.api.services.bigquery.model.QueryResponse queryResponsePb =
new com.google.api.services.bigquery.model.QueryResponse()
.setCacheHit(false)
.setJobComplete(true)
.setJobReference(queryJob.toPb())
.setKind("bigquery#queryResponse")
.setPageToken(null)
.setRows(ImmutableList.of(TABLE_ROW))
Expand Down Expand Up @@ -2109,6 +2118,110 @@ public void testQueryDryRun() throws Exception {
}
}

@Test
public void testFastQuerySQLShouldRetry() throws Exception {
com.google.api.services.bigquery.model.QueryResponse responsePb =
new com.google.api.services.bigquery.model.QueryResponse()
.setCacheHit(false)
.setJobComplete(true)
.setRows(ImmutableList.of(TABLE_ROW))
.setPageToken(null)
.setTotalBytesProcessed(42L)
.setTotalRows(BigInteger.valueOf(1L))
.setSchema(TABLE_SCHEMA.toPb())
.setErrors(ImmutableList.of(new ErrorProto().setMessage("ErrorMessage")));

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_QUERY);
QueryRequest requestPb = requestInfo.toPb();

when(bigqueryRpcMock.fastQuery(PROJECT, requestPb))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenThrow(new BigQueryException(502, "Bad Gateway"))
.thenThrow(new BigQueryException(503, "Service Unavailable"))
.thenThrow(new BigQueryException(504, "Gateway Timeout"))
.thenReturn(responsePb);

bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

TableResult response = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY);
assertEquals(1, response.getTotalRows());
verify(bigqueryRpcMock, times(5)).fastQuery(PROJECT, requestPb);
}

@Test
public void testFastQueryDMLShouldRetry() throws Exception {
com.google.api.services.bigquery.model.QueryResponse responsePb =
new com.google.api.services.bigquery.model.QueryResponse()
.setCacheHit(false)
.setJobComplete(true)
.setRows(ImmutableList.of(TABLE_ROW))
.setPageToken(null)
.setTotalBytesProcessed(42L)
.setNumDmlAffectedRows(1L)
.setSchema(TABLE_SCHEMA.toPb())
.setErrors(ImmutableList.of(new ErrorProto().setMessage("ErrorMessage")));

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY);
QueryRequest requestPb = requestInfo.toPb();

when(bigqueryRpcMock.fastQuery(PROJECT, requestPb))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenThrow(new BigQueryException(502, "Bad Gateway"))
.thenThrow(new BigQueryException(503, "Service Unavailable"))
.thenThrow(new BigQueryException(504, "Gateway Timeout"))
.thenReturn(responsePb);

bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

TableResult response = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY);
assertEquals(1, response.getTotalRows());
verify(bigqueryRpcMock, times(5)).fastQuery(PROJECT, requestPb);
}

@Test
public void testFastQueryDDLShouldRetry() throws Exception {
com.google.api.services.bigquery.model.QueryResponse responsePb =
new com.google.api.services.bigquery.model.QueryResponse()
.setCacheHit(false)
.setJobComplete(true)
.setRows(ImmutableList.of(TABLE_ROW))
.setPageToken(null)
.setTotalBytesProcessed(42L)
.setSchema(TABLE_SCHEMA.toPb())
.setErrors(ImmutableList.of(new ErrorProto().setMessage("ErrorMessage")));

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_DDLQUERY);
QueryRequest requestPb = requestInfo.toPb();

when(bigqueryRpcMock.fastQuery(PROJECT, requestPb))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenThrow(new BigQueryException(502, "Bad Gateway"))
.thenThrow(new BigQueryException(503, "Service Unavailable"))
.thenThrow(new BigQueryException(504, "Gateway Timeout"))
.thenReturn(responsePb);

bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

TableResult response = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_DDLQUERY);
assertEquals(0, response.getTotalRows());
verify(bigqueryRpcMock, times(5)).fastQuery(PROJECT, requestPb);
}

@Test
public void testCreateRoutine() {
RoutineInfo routineInfo = ROUTINE_INFO.setProjectId(OTHER_PROJECT);
Expand Down

0 comments on commit e73936b

Please sign in to comment.