Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sql fast path impl #509

Merged
merged 25 commits into from Sep 22, 2020
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f6e93cc
feat: sql fast path impl
stephaniewang526 Jun 26, 2020
d8ab960
add logic for DML and DDL queries
stephaniewang526 Jul 7, 2020
780b836
update ITs to check table content correctness, update fastquery logic
stephaniewang526 Jul 10, 2020
7126437
add test for bogus query
stephaniewang526 Jul 13, 2020
08d6c7e
add check for idempotent requestId
stephaniewang526 Jul 13, 2020
31a55ce
update QueryRequestInfo and error handling logic
stephaniewang526 Jul 15, 2020
bcecbb0
add mock test for query JobException
stephaniewang526 Jul 16, 2020
81937fc
update mock test
stephaniewang526 Jul 16, 2020
b62b569
fix unit tests, nit update
stephaniewang526 Jul 16, 2020
7225101
update exception handling from JobException to BigQueryException
stephaniewang526 Jul 17, 2020
79bc75f
update based on comments
stephaniewang526 Aug 7, 2020
0fcb5b6
nit
stephaniewang526 Aug 7, 2020
2495fbb
update based on comments
stephaniewang526 Aug 7, 2020
7c2ae39
add maxResult support
stephaniewang526 Aug 21, 2020
293f3e6
Merge branch 'master' into sql-client
stephaniewang526 Aug 27, 2020
187c86e
Merge remote-tracking branch 'upstream/master' into sql-client
Sep 3, 2020
2862ad8
update code
Sep 3, 2020
d8f1229
add test coverage
stephaniewang526 Sep 3, 2020
f7d73c4
Merge remote-tracking branch 'origin/sql-client' into sql-client
stephaniewang526 Sep 3, 2020
fd9dcae
lint fix
stephaniewang526 Sep 3, 2020
0cdf672
feat: add more code cov
Sep 11, 2020
27d1a63
set method back
stephaniewang526 Sep 11, 2020
23c9008
Merge branch 'master' into sql-client
stephaniewang526 Sep 11, 2020
48397ad
feat: code cove
Sep 15, 2020
e161cf9
add codecov
stephaniewang526 Sep 16, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions google-cloud-bigquery/clirr-ignored-differences.xml
Expand Up @@ -32,4 +32,9 @@
<className>com/google/cloud/bigquery/spi/v2/BigQueryRpc</className>
<method>com.google.api.services.bigquery.model.TestIamPermissionsResponse testIamPermissions(java.lang.String, java.util.List, java.util.Map)</method>
</difference>
<difference>
<className>com/google/cloud/bigquery/spi/v2/BigQueryRpc</className>
<method>com.google.api.services.bigquery.model.QueryResponse queryRpc(java.lang.String, com.google.api.services.bigquery.model.QueryRequest)</method>
<differenceType>7012</differenceType>
</difference>
</differences>
Expand Up @@ -21,6 +21,8 @@
import com.google.cloud.http.BaseHttpServiceException;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
Expand All @@ -39,37 +41,52 @@ public final class BigQueryException extends BaseHttpServiceException {
new Error(500, null), new Error(502, null), new Error(503, null), new Error(504, null));
private static final long serialVersionUID = -5006625989225438209L;

private final BigQueryError error;
private final List<BigQueryError> errors;

public BigQueryException(int code, String message) {
this(code, message, (Throwable) null);
}

public BigQueryException(int code, String message, Throwable cause) {
super(code, message, null, true, RETRYABLE_ERRORS, cause);
this.error = null;
this.errors = null;
}

public BigQueryException(int code, String message, BigQueryError error) {
super(code, message, error != null ? error.getReason() : null, true, RETRYABLE_ERRORS);
this.error = error;
this.errors = Arrays.asList(error);
}

public BigQueryException(List<BigQueryError> errors) {
super(0, null, null, false, RETRYABLE_ERRORS, null);
this.errors = errors;
}

public BigQueryException(IOException exception) {
super(exception, true, RETRYABLE_ERRORS);
BigQueryError error = null;
List<BigQueryError> errors = null;
if (getReason() != null) {
error = new BigQueryError(getReason(), getLocation(), getMessage(), getDebugInfo());
errors =
Arrays.asList(
new BigQueryError(getReason(), getLocation(), getMessage(), getDebugInfo()));
}
this.error = error;
this.errors = errors;
}

/**
* Returns the {@link BigQueryError} that caused this exception. Returns {@code null} if none
* exists.
*/
public BigQueryError getError() {
return error;
return errors == null || errors.isEmpty() || errors.size() == 0 ? null : errors.get(0);
}

/**
* Returns a list of {@link BigQueryError}s that caused this exception. Returns {@code null} if
* none exists.
*/
public List<BigQueryError> getErrors() {
return errors;
}

@Override
Expand All @@ -81,12 +98,12 @@ public boolean equals(Object obj) {
return false;
}
BigQueryException other = (BigQueryException) obj;
return super.equals(other) && Objects.equals(error, other.error);
return super.equals(other) && Objects.equals(errors, other.errors);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), error);
return Objects.hash(super.hashCode(), errors);
}

/**
Expand Down
Expand Up @@ -26,6 +26,7 @@
import com.google.api.gax.paging.Page;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest.Rows;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
Expand All @@ -48,6 +49,7 @@
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -198,6 +200,45 @@ public Page<FieldValueList> getNextPage() {
}
}

private class QueryPageFetcher implements NextPageFetcher<FieldValueList> {

private static final long serialVersionUID = -8501991114794410114L;
private final Map<BigQueryRpc.Option, ?> requestOptions;
private final BigQueryOptions serviceOptions;
private final Job job;
private final boolean jobStatus;
private final TableId table;
private final Schema schema;

QueryPageFetcher(
JobId jobId,
boolean jobStatus,
Schema schema,
BigQueryOptions serviceOptions,
String cursor,
Map<BigQueryRpc.Option, ?> optionMap) {
this.requestOptions =
PageImpl.nextRequestOptions(BigQueryRpc.Option.PAGE_TOKEN, cursor, optionMap);
this.serviceOptions = serviceOptions;
this.job = getJob(jobId);
this.jobStatus = jobStatus;
this.table = ((QueryJobConfiguration) job.getConfiguration()).getDestinationTable();
this.schema = schema;
}

@Override
public Page<FieldValueList> getNextPage() {
try {
if (!jobStatus) {
job.waitFor();
}
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
return listTableData(table, schema, serviceOptions, requestOptions).x();
}
}

private final BigQueryRpc bigQueryRpc;

BigQueryImpl(BigQueryOptions options) {
Expand Down Expand Up @@ -1184,9 +1225,81 @@ public Boolean call() {
public TableResult query(QueryJobConfiguration configuration, JobOption... options)
throws InterruptedException, JobException {
Job.checkNotDryRun(configuration, "query");

// If all parameters passed in configuration are supported by the query() method on the backend,
// put on fast path
QueryRequestInfo requestInfo = new QueryRequestInfo(configuration);
if (requestInfo.isFastQuerySupported()) {
stephaniewang526 marked this conversation as resolved.
Show resolved Hide resolved
String projectId = getOptions().getProjectId();
QueryRequest content = requestInfo.toPb();
return queryRpc(projectId, content, options);
}
// Otherwise, fall back to the existing create query job logic
return create(JobInfo.of(configuration), options).getQueryResults();
}

private TableResult queryRpc(
final String projectId, final QueryRequest content, JobOption... options) {
com.google.api.services.bigquery.model.QueryResponse results;
try {
results =
runWithRetries(
new Callable<com.google.api.services.bigquery.model.QueryResponse>() {
@Override
public com.google.api.services.bigquery.model.QueryResponse call() {
return bigQueryRpc.queryRpc(projectId, content);
}
},
getOptions().getRetrySettings(),
EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}

if (results.getErrors() != null) {
List<BigQueryError> bigQueryErrors =
Lists.transform(results.getErrors(), BigQueryError.FROM_PB_FUNCTION);
// Throwing BigQueryException since there may be no JobId and we want to stay consistent
// with the case where there there is a HTTP error
throw new BigQueryException(bigQueryErrors);
}

Schema schema = results.getSchema() == null ? null : Schema.fromPb(results.getSchema());
Long numRows;
if (results.getNumDmlAffectedRows() == null && results.getTotalRows() == null) {
stephaniewang526 marked this conversation as resolved.
Show resolved Hide resolved
numRows = 0L;
} else if (results.getNumDmlAffectedRows() != null) {
numRows = results.getNumDmlAffectedRows();
} else {
numRows = results.getTotalRows().longValue();
}

if (results.getPageToken() != null) {
JobId jobId = JobId.fromPb(results.getJobReference());
boolean jobStatus = results.getJobComplete();
String cursor = results.getPageToken();
return new TableResult(
schema,
numRows,
new PageImpl<>(
// fetch next pages of results
new QueryPageFetcher(
jobId, jobStatus, schema, getOptions(), cursor, optionMap(options)),
cursor,
// cache first page of result
transformTableData(results.getRows(), schema)));
}
stephaniewang526 marked this conversation as resolved.
Show resolved Hide resolved
// only 1 page of result
return new TableResult(
schema,
numRows,
new PageImpl<>(
new TableDataPageFetcher(null, schema, getOptions(), null, optionMap(options)),
null,
transformTableData(results.getRows(), schema)));
}

@Override
public TableResult query(QueryJobConfiguration configuration, JobId jobId, JobOption... options)
throws InterruptedException, JobException {
Expand Down
Expand Up @@ -69,6 +69,8 @@ public final class QueryJobConfiguration extends JobConfiguration {
private final Map<String, String> labels;
private final RangePartitioning rangePartitioning;
private final List<ConnectionProperty> connectionProperties;
// maxResults is only used for fast query path
private final Long maxResults;

/**
* Priority levels for a query. If not specified the priority is assumed to be {@link
Expand Down Expand Up @@ -118,6 +120,7 @@ public static final class Builder
private Map<String, String> labels;
private RangePartitioning rangePartitioning;
private List<ConnectionProperty> connectionProperties;
private Long maxResults;

private Builder() {
super(Type.QUERY);
Expand Down Expand Up @@ -150,6 +153,7 @@ private Builder(QueryJobConfiguration jobConfiguration) {
this.labels = jobConfiguration.labels;
this.rangePartitioning = jobConfiguration.rangePartitioning;
this.connectionProperties = jobConfiguration.connectionProperties;
this.maxResults = jobConfiguration.maxResults;
}

private Builder(com.google.api.services.bigquery.model.JobConfiguration configurationPb) {
Expand Down Expand Up @@ -603,6 +607,20 @@ public Builder setConnectionProperties(List<ConnectionProperty> connectionProper
return this;
}

/**
* This is only supported in the fast query path [Optional] The maximum number of rows of data
* to return per page of results. Setting this flag to a small value such as 1000 and then
* paging through results might improve reliability when the query result set is large. In
* addition to this limit, responses are also limited to 10 MB. By default, there is no maximum
* row count, and only the byte limit applies.
*
* @param maxResults maxResults or {@code null} for none
*/
public Builder setMaxResults(Long maxResults) {
this.maxResults = maxResults;
return this;
}

public QueryJobConfiguration build() {
return new QueryJobConfiguration(this);
}
Expand Down Expand Up @@ -644,6 +662,7 @@ private QueryJobConfiguration(Builder builder) {
this.labels = builder.labels;
this.rangePartitioning = builder.rangePartitioning;
this.connectionProperties = builder.connectionProperties;
this.maxResults = builder.maxResults;
}

/**
Expand Down Expand Up @@ -833,6 +852,19 @@ public List<ConnectionProperty> getConnectionProperties() {
return connectionProperties;
}

/**
* This is only supported in the fast query path [Optional] The maximum number of rows of data to
* return per page of results. Setting this flag to a small value such as 1000 and then paging
* through results might improve reliability when the query result set is large. In addition to
* this limit, responses are also limited to 10 MB. By default, there is no maximum row count, and
* only the byte limit applies.
*
* @return value or {@code null} for none
*/
public Long getMaxResults() {
return maxResults;
}

@Override
public Builder toBuilder() {
return new Builder(this);
Expand All @@ -851,7 +883,7 @@ ToStringHelper toStringHelper() {
.add("flattenResults", flattenResults)
.add("priority", priority)
.add("tableDefinitions", tableDefinitions)
.add("userQueryCache", useQueryCache)
stephaniewang526 marked this conversation as resolved.
Show resolved Hide resolved
.add("useQueryCache", useQueryCache)
.add("userDefinedFunctions", userDefinedFunctions)
.add("createDisposition", createDisposition)
.add("writeDisposition", writeDisposition)
Expand Down