Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: set requestId for fast query path in QueryRequestInfo instead of QueryJobConfiguration #987

Merged
merged 6 commits into from Dec 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions google-cloud-bigquery/pom.xml
Expand Up @@ -103,6 +103,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Expand Up @@ -35,7 +35,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

/**
* Google BigQuery Query Job configuration. A Query Job runs a query against BigQuery data. Query
Expand Down Expand Up @@ -72,7 +71,6 @@ public final class QueryJobConfiguration extends JobConfiguration {
private final List<ConnectionProperty> connectionProperties;
// maxResults is only used for fast query path
private final Long maxResults;
private final String requestId;

/**
* Priority levels for a query. If not specified the priority is assumed to be {@link
Expand Down Expand Up @@ -123,7 +121,6 @@ public static final class Builder
private RangePartitioning rangePartitioning;
private List<ConnectionProperty> connectionProperties;
private Long maxResults;
private String requestId;

private Builder() {
super(Type.QUERY);
Expand Down Expand Up @@ -157,7 +154,6 @@ private Builder(QueryJobConfiguration jobConfiguration) {
this.rangePartitioning = jobConfiguration.rangePartitioning;
this.connectionProperties = jobConfiguration.connectionProperties;
this.maxResults = jobConfiguration.maxResults;
this.requestId = jobConfiguration.requestId;
}

private Builder(com.google.api.services.bigquery.model.JobConfiguration configurationPb) {
Expand Down Expand Up @@ -625,11 +621,6 @@ public Builder setMaxResults(Long maxResults) {
return this;
}

Builder setRequestId(String requestId) {
this.requestId = requestId;
return this;
}

public QueryJobConfiguration build() {
return new QueryJobConfiguration(this);
}
Expand Down Expand Up @@ -672,7 +663,6 @@ private QueryJobConfiguration(Builder builder) {
this.rangePartitioning = builder.rangePartitioning;
this.connectionProperties = builder.connectionProperties;
this.maxResults = builder.maxResults;
this.requestId = builder.requestId;
}

/**
Expand Down Expand Up @@ -875,10 +865,6 @@ public Long getMaxResults() {
return maxResults;
}

String getRequestId() {
return requestId;
}

@Override
public Builder toBuilder() {
return new Builder(this);
Expand Down Expand Up @@ -1057,7 +1043,7 @@ com.google.api.services.bigquery.model.JobConfiguration toPb() {
/** Creates a builder for a BigQuery Query Job given the query to be run. */
public static Builder newBuilder(String query) {
checkArgument(!isNullOrEmpty(query), "Provided query is null or empty");
return new Builder().setQuery(query).setRequestId(UUID.randomUUID().toString());
return new Builder().setQuery(query);
}

/**
Expand Down
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import java.util.UUID;

final class QueryRequestInfo {

Expand All @@ -35,9 +36,9 @@ final class QueryRequestInfo {
private final Long maxResults;
private final String query;
private final List<QueryParameter> queryParameters;
private final String requestId;
private final Boolean useQueryCache;
private final Boolean useLegacySql;
private final String requestId;

QueryRequestInfo(QueryJobConfiguration config) {
this.config = config;
Expand All @@ -49,9 +50,9 @@ final class QueryRequestInfo {
this.maxResults = config.getMaxResults();
this.query = config.getQuery();
this.queryParameters = config.toPb().getQuery().getQueryParameters();
this.requestId = UUID.randomUUID().toString();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any cases where a QueryRequestInfo may get reused? Such usage may indicate we set this at time of sending the request (outside of the retry wrapper) rather than time of instantiating the request.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes -- we decided to design it to set this requestId at the time of instantiating the request object which means when a user runs the same QueryJobConfiguration twice, they will be sending two requestIds.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying. My worry was mostly around cases where someone may retain a reference and use this in unexpected ways.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted -- thank you!

this.useLegacySql = config.useLegacySql();
this.useQueryCache = config.useQueryCache();
this.requestId = config.getRequestId();
}

boolean isFastQuerySupported() {
Expand Down
Expand Up @@ -1885,9 +1885,8 @@ public void testFastQueryRequestCompleted() throws InterruptedException {
.setTotalBytesProcessed(42L)
.setTotalRows(BigInteger.valueOf(1L));

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_QUERY);

when(bigqueryRpcMock.queryRpc(PROJECT, requestInfo.toPb())).thenReturn(queryResponsePb);
when(bigqueryRpcMock.queryRpc(eq(PROJECT), requestPbCapture.capture()))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to validate that what is captured is actually what we expect to see (except the requestId).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done: 7123195

.thenReturn(queryResponsePb);

bigquery = options.getService();
TableResult result = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY);
Expand All @@ -1900,7 +1899,15 @@ public void testFastQueryRequestCompleted() throws InterruptedException {
assertThat(row.get(0).getBooleanValue()).isFalse();
assertThat(row.get(1).getLongValue()).isEqualTo(1);
}
verify(bigqueryRpcMock).queryRpc(PROJECT, requestInfo.toPb());

QueryRequest requestPb = requestPbCapture.getValue();
assertEquals(QUERY_JOB_CONFIGURATION_FOR_QUERY.getQuery(), requestPb.getQuery());
assertEquals(
QUERY_JOB_CONFIGURATION_FOR_QUERY.getDefaultDataset().getDataset(),
requestPb.getDefaultDataset().getDatasetId());
assertEquals(QUERY_JOB_CONFIGURATION_FOR_QUERY.useQueryCache(), requestPb.getUseQueryCache());

verify(bigqueryRpcMock).queryRpc(eq(PROJECT), requestPbCapture.capture());
}

@Test
Expand Down Expand Up @@ -1937,22 +1944,30 @@ public void testFastQueryMultiplePages() throws InterruptedException {
.setTotalBytesProcessed(42L)
.setTotalRows(BigInteger.valueOf(1L));

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_QUERY);
when(bigqueryRpcMock.queryRpc(PROJECT, requestInfo.toPb())).thenReturn(queryResponsePb);
when(bigqueryRpcMock.queryRpc(eq(PROJECT), requestPbCapture.capture()))
.thenReturn(queryResponsePb);

bigquery = options.getService();
TableResult result = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY);
assertTrue(result.hasNextPage());
assertNotNull(result.getNextPageToken());
assertNotNull(result.getNextPage());

QueryRequest requestPb = requestPbCapture.getValue();
assertEquals(QUERY_JOB_CONFIGURATION_FOR_QUERY.getQuery(), requestPb.getQuery());
assertEquals(
QUERY_JOB_CONFIGURATION_FOR_QUERY.getDefaultDataset().getDataset(),
requestPb.getDefaultDataset().getDatasetId());
assertEquals(QUERY_JOB_CONFIGURATION_FOR_QUERY.useQueryCache(), requestPb.getUseQueryCache());

verify(bigqueryRpcMock).getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS);
verify(bigqueryRpcMock)
.listTableData(
PROJECT,
DATASET,
TABLE,
BigQueryImpl.optionMap(BigQuery.TableDataListOption.pageToken(CURSOR)));
verify(bigqueryRpcMock).queryRpc(PROJECT, requestInfo.toPb());
verify(bigqueryRpcMock).queryRpc(eq(PROJECT), requestPbCapture.capture());
}

@Test
Expand Down Expand Up @@ -1983,10 +1998,8 @@ public void testFastQuerySlowDdl() throws InterruptedException {
.setTotalRows(BigInteger.valueOf(1L))
.setSchema(TABLE_SCHEMA.toPb());

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_QUERY);
QueryRequest requestPb = requestInfo.toPb();

when(bigqueryRpcMock.queryRpc(PROJECT, requestPb)).thenReturn(queryResponsePb);
when(bigqueryRpcMock.queryRpc(eq(PROJECT), requestPbCapture.capture()))
.thenReturn(queryResponsePb);
responseJob.getConfiguration().getQuery().setDestinationTable(TABLE_ID.toPb());
when(bigqueryRpcMock.getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS)).thenReturn(responseJob);
when(bigqueryRpcMock.getQueryResults(
Expand All @@ -2004,7 +2017,14 @@ public void testFastQuerySlowDdl() throws InterruptedException {
assertThat(row.get(1).getLongValue()).isEqualTo(1);
}

verify(bigqueryRpcMock).queryRpc(PROJECT, requestInfo.toPb());
QueryRequest requestPb = requestPbCapture.getValue();
assertEquals(QUERY_JOB_CONFIGURATION_FOR_QUERY.getQuery(), requestPb.getQuery());
assertEquals(
QUERY_JOB_CONFIGURATION_FOR_QUERY.getDefaultDataset().getDataset(),
requestPb.getDefaultDataset().getDatasetId());
assertEquals(QUERY_JOB_CONFIGURATION_FOR_QUERY.useQueryCache(), requestPb.getUseQueryCache());

verify(bigqueryRpcMock).queryRpc(eq(PROJECT), requestPbCapture.capture());
verify(bigqueryRpcMock).getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS);
verify(bigqueryRpcMock)
.getQueryResults(
Expand Down Expand Up @@ -2287,9 +2307,6 @@ public void testFastQuerySQLShouldRetry() throws Exception {
.setTotalRows(BigInteger.valueOf(1L))
.setSchema(TABLE_SCHEMA.toPb());

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_QUERY);
QueryRequest requestPb = requestInfo.toPb();

when(bigqueryRpcMock.queryRpc(eq(PROJECT), requestPbCapture.capture()))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenThrow(new BigQueryException(502, "Bad Gateway"))
Expand All @@ -2310,13 +2327,13 @@ public void testFastQuerySQLShouldRetry() throws Exception {

List<QueryRequest> allRequests = requestPbCapture.getAllValues();
boolean idempotent = true;
String requestId = requestPb.getRequestId();
String firstRequestId = allRequests.get(0).getRequestId();
for (QueryRequest request : allRequests) {
idempotent = request.getRequestId().equals(requestId);
idempotent = request.getRequestId().equals(firstRequestId);
}
assertTrue(idempotent);

verify(bigqueryRpcMock, times(5)).queryRpc(PROJECT, requestPb);
verify(bigqueryRpcMock, times(5)).queryRpc(eq(PROJECT), requestPbCapture.capture());
}

@Test
Expand All @@ -2331,9 +2348,6 @@ public void testFastQueryDMLShouldRetry() throws Exception {
.setNumDmlAffectedRows(1L)
.setSchema(TABLE_SCHEMA.toPb());

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY);
QueryRequest requestPb = requestInfo.toPb();

when(bigqueryRpcMock.queryRpc(eq(PROJECT), requestPbCapture.capture()))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenThrow(new BigQueryException(502, "Bad Gateway"))
Expand All @@ -2354,13 +2368,13 @@ public void testFastQueryDMLShouldRetry() throws Exception {

List<QueryRequest> allRequests = requestPbCapture.getAllValues();
boolean idempotent = true;
String requestId = requestPb.getRequestId();
String firstRequestId = allRequests.get(0).getRequestId();
for (QueryRequest request : allRequests) {
idempotent = request.getRequestId().equals(requestId);
idempotent = request.getRequestId().equals(firstRequestId);
}
assertTrue(idempotent);

verify(bigqueryRpcMock, times(5)).queryRpc(PROJECT, requestPb);
verify(bigqueryRpcMock, times(5)).queryRpc(eq(PROJECT), requestPbCapture.capture());
}

@Test
Expand All @@ -2374,9 +2388,6 @@ public void testFastQueryDDLShouldRetry() throws Exception {
.setTotalBytesProcessed(42L)
.setSchema(TABLE_SCHEMA.toPb());

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_DDLQUERY);
QueryRequest requestPb = requestInfo.toPb();

when(bigqueryRpcMock.queryRpc(eq(PROJECT), requestPbCapture.capture()))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenThrow(new BigQueryException(502, "Bad Gateway"))
Expand All @@ -2397,13 +2408,13 @@ public void testFastQueryDDLShouldRetry() throws Exception {

List<QueryRequest> allRequests = requestPbCapture.getAllValues();
boolean idempotent = true;
String requestId = requestPb.getRequestId();
String firstRequestId = allRequests.get(0).getRequestId();
for (QueryRequest request : allRequests) {
idempotent = request.getRequestId().equals(requestId);
idempotent = request.getRequestId().equals(firstRequestId);
}
assertTrue(idempotent);

verify(bigqueryRpcMock, times(5)).queryRpc(PROJECT, requestPb);
verify(bigqueryRpcMock, times(5)).queryRpc(eq(PROJECT), requestPbCapture.capture());
}

@Test
Expand All @@ -2424,10 +2435,7 @@ public void testFastQueryBigQueryException() throws InterruptedException {
.setPageToken(null)
.setErrors(errorProtoList);

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_QUERY);
QueryRequest requestPb = requestInfo.toPb();

when(bigqueryRpcMock.queryRpc(PROJECT, requestPb)).thenReturn(responsePb);
when(bigqueryRpcMock.queryRpc(eq(PROJECT), requestPbCapture.capture())).thenReturn(responsePb);

bigquery = options.getService();
try {
Expand All @@ -2436,7 +2444,14 @@ public void testFastQueryBigQueryException() throws InterruptedException {
} catch (BigQueryException ex) {
assertEquals(Lists.transform(errorProtoList, BigQueryError.FROM_PB_FUNCTION), ex.getErrors());
}
verify(bigqueryRpcMock).queryRpc(PROJECT, requestPb);

QueryRequest requestPb = requestPbCapture.getValue();
assertEquals(QUERY_JOB_CONFIGURATION_FOR_QUERY.getQuery(), requestPb.getQuery());
assertEquals(
QUERY_JOB_CONFIGURATION_FOR_QUERY.getDefaultDataset().getDataset(),
requestPb.getDefaultDataset().getDatasetId());
assertEquals(QUERY_JOB_CONFIGURATION_FOR_QUERY.useQueryCache(), requestPb.getUseQueryCache());
verify(bigqueryRpcMock).queryRpc(eq(PROJECT), requestPbCapture.capture());
}

@Test
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.bigquery;

import static org.assertj.core.api.Assertions.*;
import static org.junit.Assert.assertEquals;

import com.google.api.services.bigquery.model.QueryRequest;
Expand All @@ -31,7 +32,6 @@

public class QueryRequestInfoTest {

private static final String TEST_PROJECT_ID = "test-project-id";
private static final String QUERY = "BigQuery SQL";
private static final DatasetId DATASET_ID = DatasetId.of("dataset");
private static final TableId TABLE_ID = TableId.of("dataset", "table");
Expand Down Expand Up @@ -166,8 +166,7 @@ public void equalTo() {
}

private void compareQueryRequestInfo(QueryRequestInfo expected, QueryRequestInfo actual) {
assertEquals(expected, actual);
assertEquals(expected.hashCode(), actual.hashCode());
assertEquals(expected.toString(), actual.toString());
// requestId are expected to be different
assertThat(actual).isEqualToIgnoringGivenFields(expected, "requestId");
}
}
Expand Up @@ -1776,6 +1776,15 @@ public void testFastQueryMultipleRuns() throws InterruptedException {
assertNull(result.getNextPageToken());
assertFalse(result.hasNextPage());

// running the same QueryJobConfiguration with the same query again
TableResult result1Duplicate = bigquery.query(config);
assertEquals(QUERY_RESULT_SCHEMA, result1Duplicate.getSchema());
assertEquals(2, result.getTotalRows());
assertNull(result1Duplicate.getNextPage());
assertNull(result1Duplicate.getNextPageToken());
assertFalse(result1Duplicate.hasNextPage());

// running a new QueryJobConfiguration with the same query
QueryJobConfiguration config2 =
QueryJobConfiguration.newBuilder(query).setDefaultDataset(DatasetId.of(DATASET)).build();
TableResult result2 = bigquery.query(config2);
Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Expand Up @@ -121,6 +121,13 @@
<version>1.113.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<!-- use 2.9.1 for Java 7 projects -->
<version>2.9.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down