Navigation Menu

Skip to content

Commit

Permalink
feat: sql fast path impl (#509)
Browse files Browse the repository at this point in the history
* feat: sql fast path impl

add QueryJobConfig to QueryRequest logic

high level mode

reset private methods

refactor: modified code

update logic
add test

refactor: update code and test case

add integration tests

code format

add clirr ignore and remove pom file

feat: add more assert

nit update

* add logic for DML and DDL queries
enable requestId
add integration tests for fast path multipages query, DML, and DDL queries

fix requestId logic

update QueryRequestInfo and add mock test

add mock test cases for SQL, DML, and DDL
clean up code

fix IT

add schema test

* update ITs to check table content correctness, update fastquery logic

nit

nit

* add test for bogus query

* add check for idempotent requestId

* update QueryRequestInfo and error handling logic

* add mock test for query JobException

* update mock test

* fix unit tests, nit update

* update exception handling from JobException to BigQueryException

* update based on comments

* nit

* update based on comments

* add maxResult support
optimization changes

* update code

* add test coverage

to address:
google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryException.java#L69-L71
Added lines #L69 - L71 were not covered by tests

* lint fix

* feat: add more code cov

* set method back

* feat: code cov

* add codecov

Co-authored-by: Praful Makani <praful@qlogic.io>
  • Loading branch information
stephaniewang526 and Praful Makani committed Sep 22, 2020
1 parent f2ecf15 commit 64a7d65
Show file tree
Hide file tree
Showing 12 changed files with 314,367 additions and 12 deletions.
5 changes: 5 additions & 0 deletions google-cloud-bigquery/clirr-ignored-differences.xml
Expand Up @@ -32,4 +32,9 @@
<className>com/google/cloud/bigquery/spi/v2/BigQueryRpc</className>
<method>com.google.api.services.bigquery.model.TestIamPermissionsResponse testIamPermissions(java.lang.String, java.util.List, java.util.Map)</method>
</difference>
<difference>
<className>com/google/cloud/bigquery/spi/v2/BigQueryRpc</className>
<method>com.google.api.services.bigquery.model.QueryResponse queryRpc(java.lang.String, com.google.api.services.bigquery.model.QueryRequest)</method>
<differenceType>7012</differenceType>
</difference>
</differences>
Expand Up @@ -84,7 +84,7 @@ public String getLocation() {
return location;
}

String getDebugInfo() {
public String getDebugInfo() {
return debugInfo;
}

Expand Down
Expand Up @@ -21,6 +21,8 @@
import com.google.cloud.http.BaseHttpServiceException;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
Expand All @@ -39,37 +41,52 @@ public final class BigQueryException extends BaseHttpServiceException {
new Error(500, null), new Error(502, null), new Error(503, null), new Error(504, null));
private static final long serialVersionUID = -5006625989225438209L;

private final BigQueryError error;
private final List<BigQueryError> errors;

public BigQueryException(int code, String message) {
this(code, message, (Throwable) null);
}

public BigQueryException(int code, String message, Throwable cause) {
super(code, message, null, true, RETRYABLE_ERRORS, cause);
this.error = null;
this.errors = null;
}

public BigQueryException(int code, String message, BigQueryError error) {
super(code, message, error != null ? error.getReason() : null, true, RETRYABLE_ERRORS);
this.error = error;
this.errors = Arrays.asList(error);
}

/**
 * Creates an exception that carries the given list of errors.
 *
 * <p>The superclass is initialized with code {@code 0} and {@code null} for the message, reason
 * and cause. The boolean argument is {@code false} here, whereas the other constructors of this
 * class pass {@code true} (presumably the idempotency flag; confirm against {@code
 * BaseHttpServiceException}).
 *
 * @param errors the errors made available through {@link #getErrors()}; the reference is stored
 *     as-is, without copying
 */
public BigQueryException(List<BigQueryError> errors) {
super(0, null, null, false, RETRYABLE_ERRORS, null);
this.errors = errors;
}

public BigQueryException(IOException exception) {
super(exception, true, RETRYABLE_ERRORS);
BigQueryError error = null;
List<BigQueryError> errors = null;
if (getReason() != null) {
error = new BigQueryError(getReason(), getLocation(), getMessage(), getDebugInfo());
errors =
Arrays.asList(
new BigQueryError(getReason(), getLocation(), getMessage(), getDebugInfo()));
}
this.error = error;
this.errors = errors;
}

/**
* Returns the {@link BigQueryError} that caused this exception. Returns {@code null} if none
* exists.
*/
public BigQueryError getError() {
return error;
return errors == null || errors.isEmpty() || errors.size() == 0 ? null : errors.get(0);
}

/**
 * Returns a list of {@link BigQueryError}s that caused this exception. Returns {@code null} if
 * none exists.
 *
 * <p>The list held by this exception is returned directly; no defensive copy is made.
 *
 * @return the errors attached to this exception, or {@code null}
 */
public List<BigQueryError> getErrors() {
return errors;
}

@Override
Expand All @@ -81,12 +98,12 @@ public boolean equals(Object obj) {
return false;
}
BigQueryException other = (BigQueryException) obj;
return super.equals(other) && Objects.equals(error, other.error);
return super.equals(other) && Objects.equals(errors, other.errors);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), error);
return Objects.hash(super.hashCode(), errors);
}

/**
Expand Down
Expand Up @@ -26,6 +26,7 @@
import com.google.api.gax.paging.Page;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest.Rows;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
Expand All @@ -48,6 +49,7 @@
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -198,6 +200,43 @@ public Page<FieldValueList> getNextPage() {
}
}

/**
 * Fetches the pages that follow the first page of a query answered through the fast query path.
 *
 * <p>On construction the job identified by {@code jobId} is looked up with {@code getJob} and the
 * destination table is read from its {@link QueryJobConfiguration}. {@link #getNextPage()} waits
 * until that job reports {@link JobStatus.State#DONE} and then lists rows of the destination
 * table, starting at the page token supplied as {@code cursor}.
 */
private class QueryPageFetcher extends Thread implements NextPageFetcher<FieldValueList> {

  private static final long serialVersionUID = -8501991114794410114L;

  /** Pause between two consecutive job status checks, in milliseconds. */
  private static final long JOB_POLL_INTERVAL_MILLIS = 5000L;

  private final Map<BigQueryRpc.Option, ?> requestOptions;
  private final BigQueryOptions serviceOptions;
  // Not final: replaced by the reloaded job while polling in getNextPage().
  private Job job;
  private final TableId table;
  private final Schema schema;

  /**
   * @param jobId identifies the query job whose destination table holds the results
   * @param schema schema passed on to {@code listTableData} when reading rows
   * @param serviceOptions service options passed on to {@code listTableData}
   * @param cursor page token of the page to fetch
   * @param optionMap caller options combined with the page token to form the request options
   */
  QueryPageFetcher(
      JobId jobId,
      Schema schema,
      BigQueryOptions serviceOptions,
      String cursor,
      Map<BigQueryRpc.Option, ?> optionMap) {
    this.requestOptions =
        PageImpl.nextRequestOptions(BigQueryRpc.Option.PAGE_TOKEN, cursor, optionMap);
    this.serviceOptions = serviceOptions;
    this.job = getJob(jobId);
    this.table = ((QueryJobConfiguration) job.getConfiguration()).getDestinationTable();
    this.schema = schema;
  }

  /**
   * Blocks until the job is done, re-checking every {@link #JOB_POLL_INTERVAL_MILLIS}
   * milliseconds, then returns the requested page of the destination table.
   *
   * @throws RuntimeException if the calling thread is interrupted while waiting; the interrupt
   *     flag is restored and the {@link InterruptedException} is attached as the cause
   */
  @Override
  public Page<FieldValueList> getNextPage() {
    while (!JobStatus.State.DONE.equals(job.getStatus().getState())) {
      try {
        Thread.sleep(JOB_POLL_INTERVAL_MILLIS);
      } catch (InterruptedException ex) {
        // Restore the flag so callers further up the stack can still observe the interruption,
        // and keep the original exception as the cause instead of only copying its message.
        Thread.currentThread().interrupt();
        throw new RuntimeException(ex.getMessage(), ex);
      }
      job = job.reload();
    }
    // x() selects the first component of the value returned by listTableData
    // (presumably the page itself; confirm against listTableData's return type).
    return listTableData(table, schema, serviceOptions, requestOptions).x();
  }
}

private final BigQueryRpc bigQueryRpc;

BigQueryImpl(BigQueryOptions options) {
Expand Down Expand Up @@ -1184,9 +1223,79 @@ public Boolean call() {
public TableResult query(QueryJobConfiguration configuration, JobOption... options)
    throws InterruptedException, JobException {
  Job.checkNotDryRun(configuration, "query");

  QueryRequestInfo fastPathInfo = new QueryRequestInfo(configuration);
  if (!fastPathInfo.isFastQuerySupported()) {
    // At least one configuration parameter is not supported by the backend query() method:
    // use the existing create-query-job logic.
    return create(JobInfo.of(configuration), options).getQueryResults();
  }
  // Every parameter in the configuration is supported by the backend query() method, so the
  // request goes through the fast path.
  return queryRpc(getOptions().getProjectId(), fastPathInfo.toPb(), options);
}

/**
 * Sends {@code content} through {@code bigQueryRpc.queryRpc}, with retries, and converts the
 * response into a {@link TableResult}.
 *
 * <p>The rows contained in the response become the first page of the result. When the response
 * carries a page token, later pages are fetched through a {@code QueryPageFetcher} built from
 * the response's job reference.
 *
 * @param projectId project the request is issued against
 * @param content the query request to send
 * @param options job options, converted with {@code optionMap} for the page fetchers
 * @return the query result; its row count is the number of DML-affected rows when present,
 *     otherwise the total row count, otherwise {@code 0}
 * @throws BigQueryException if the call fails after retries or the response contains errors
 */
private TableResult queryRpc(
    final String projectId, final QueryRequest content, JobOption... options) {
  com.google.api.services.bigquery.model.QueryResponse results;
  try {
    results =
        runWithRetries(
            new Callable<com.google.api.services.bigquery.model.QueryResponse>() {
              @Override
              public com.google.api.services.bigquery.model.QueryResponse call() {
                return bigQueryRpc.queryRpc(projectId, content);
              }
            },
            getOptions().getRetrySettings(),
            EXCEPTION_HANDLER,
            getOptions().getClock());
  } catch (RetryHelperException e) {
    throw BigQueryException.translateAndThrow(e);
  }

  if (results.getErrors() != null) {
    // Lists.transform only returns a lazy view backed by the response's own list. Copy it so
    // the exception owns a fixed, already-converted list of errors.
    List<BigQueryError> bigQueryErrors =
        ImmutableList.copyOf(
            Lists.transform(results.getErrors(), BigQueryError.FROM_PB_FUNCTION));
    // Throwing BigQueryException since there may be no JobId and we want to stay consistent
    // with the case where there is an HTTP error
    throw new BigQueryException(bigQueryErrors);
  }

  Schema schema = results.getSchema() == null ? null : Schema.fromPb(results.getSchema());

  // Prefer the DML-affected row count, then the total row count; default to zero.
  final Long numRows;
  if (results.getNumDmlAffectedRows() != null) {
    numRows = results.getNumDmlAffectedRows();
  } else if (results.getTotalRows() != null) {
    numRows = results.getTotalRows().longValue();
  } else {
    numRows = 0L;
  }

  if (results.getPageToken() != null) {
    JobId jobId = JobId.fromPb(results.getJobReference());
    String cursor = results.getPageToken();
    return new TableResult(
        schema,
        numRows,
        new PageImpl<>(
            // fetch next pages of results
            new QueryPageFetcher(jobId, schema, getOptions(), cursor, optionMap(options)),
            cursor,
            // cache first page of result
            transformTableData(results.getRows(), schema)));
  }
  // only 1 page of result
  return new TableResult(
      schema,
      numRows,
      new PageImpl<>(
          new TableDataPageFetcher(null, schema, getOptions(), null, optionMap(options)),
          null,
          transformTableData(results.getRows(), schema)));
}

@Override
public TableResult query(QueryJobConfiguration configuration, JobId jobId, JobOption... options)
throws InterruptedException, JobException {
Expand Down
Expand Up @@ -69,6 +69,8 @@ public final class QueryJobConfiguration extends JobConfiguration {
private final Map<String, String> labels;
private final RangePartitioning rangePartitioning;
private final List<ConnectionProperty> connectionProperties;
// maxResults is only used for fast query path
private final Long maxResults;

/**
* Priority levels for a query. If not specified the priority is assumed to be {@link
Expand Down Expand Up @@ -118,6 +120,7 @@ public static final class Builder
private Map<String, String> labels;
private RangePartitioning rangePartitioning;
private List<ConnectionProperty> connectionProperties;
private Long maxResults;

private Builder() {
super(Type.QUERY);
Expand Down Expand Up @@ -150,6 +153,7 @@ private Builder(QueryJobConfiguration jobConfiguration) {
this.labels = jobConfiguration.labels;
this.rangePartitioning = jobConfiguration.rangePartitioning;
this.connectionProperties = jobConfiguration.connectionProperties;
this.maxResults = jobConfiguration.maxResults;
}

private Builder(com.google.api.services.bigquery.model.JobConfiguration configurationPb) {
Expand Down Expand Up @@ -603,6 +607,20 @@ public Builder setConnectionProperties(List<ConnectionProperty> connectionProper
return this;
}

/**
 * [Optional] Sets the maximum number of rows of data to return per page of results. This is
 * only supported in the fast query path.
 *
 * <p>Setting this flag to a small value such as 1000 and then paging through results might
 * improve reliability when the query result set is large. In addition to this limit, responses
 * are also limited to 10 MB. By default, there is no maximum row count, and only the byte limit
 * applies.
 *
 * @param maxResults maxResults or {@code null} for none
 * @return this builder
 */
public Builder setMaxResults(Long maxResults) {
this.maxResults = maxResults;
return this;
}

public QueryJobConfiguration build() {
return new QueryJobConfiguration(this);
}
Expand Down Expand Up @@ -644,6 +662,7 @@ private QueryJobConfiguration(Builder builder) {
this.labels = builder.labels;
this.rangePartitioning = builder.rangePartitioning;
this.connectionProperties = builder.connectionProperties;
this.maxResults = builder.maxResults;
}

/**
Expand Down Expand Up @@ -833,6 +852,19 @@ public List<ConnectionProperty> getConnectionProperties() {
return connectionProperties;
}

/**
 * [Optional] Returns the maximum number of rows of data to return per page of results. This is
 * only supported in the fast query path.
 *
 * <p>Setting this flag to a small value such as 1000 and then paging through results might
 * improve reliability when the query result set is large. In addition to this limit, responses
 * are also limited to 10 MB. By default, there is no maximum row count, and only the byte limit
 * applies.
 *
 * @return value or {@code null} for none
 */
public Long getMaxResults() {
return maxResults;
}

@Override
public Builder toBuilder() {
return new Builder(this);
Expand All @@ -851,7 +883,7 @@ ToStringHelper toStringHelper() {
.add("flattenResults", flattenResults)
.add("priority", priority)
.add("tableDefinitions", tableDefinitions)
.add("userQueryCache", useQueryCache)
.add("useQueryCache", useQueryCache)
.add("userDefinedFunctions", userDefinedFunctions)
.add("createDisposition", createDisposition)
.add("writeDisposition", writeDisposition)
Expand Down

0 comments on commit 64a7d65

Please sign in to comment.