Skip to content

Commit

Permalink
feat: add session support (#1652)
Browse files Browse the repository at this point in the history
Adds:
- CreateSession to Query config
- SessionInfo in JobStatistics to record session stats
  • Loading branch information
stephaniewang526 committed Oct 15, 2021
1 parent fa3b477 commit acc6cb8
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 2 deletions.
Expand Up @@ -45,6 +45,7 @@ public abstract class JobStatistics implements Serializable {
private final ScriptStatistics scriptStatistics;
private final List<ReservationUsage> reservationUsage;
private final TransactionInfo transactionInfo;
private final SessionInfo sessionInfo;

/** A Google BigQuery Copy Job statistics. */
public static class CopyStatistics extends JobStatistics {
Expand Down Expand Up @@ -1251,6 +1252,76 @@ static TransactionInfo fromPb(
}
}

// SessionInfo contains information about the session if this job is part of one.
public static class SessionInfo {

// Id of the session
private final String sessionId;

public static class Builder {

private String sessionId;

private Builder() {};

Builder setSessionId(String sessionId) {
this.sessionId = sessionId;
return this;
}

SessionInfo build() {
return new SessionInfo(this);
}
}

private SessionInfo(Builder builder) {
this.sessionId = builder.sessionId;
}

public String getSessionId() {
return sessionId;
}

static Builder newBuilder() {
return new Builder();
}

ToStringHelper toStringHelper() {
return MoreObjects.toStringHelper(this).add("sessionId", sessionId);
}

@Override
public String toString() {
return toStringHelper().toString();
}

@Override
public boolean equals(Object obj) {
return obj == this
|| obj != null
&& obj.getClass().equals(SessionInfo.class)
&& Objects.equals(toPb(), ((SessionInfo) obj).toPb());
}

@Override
public int hashCode() {
return Objects.hash(sessionId);
}

com.google.api.services.bigquery.model.SessionInfo toPb() {
com.google.api.services.bigquery.model.SessionInfo sessionInfo =
new com.google.api.services.bigquery.model.SessionInfo();
sessionInfo.setSessionId(sessionId);
return sessionInfo;
}

static SessionInfo fromPb(com.google.api.services.bigquery.model.SessionInfo sessionInfo) {
SessionInfo.Builder builder = newBuilder();
builder.setSessionId(sessionInfo.getSessionId());
return builder.build();
}
}

abstract static class Builder<T extends JobStatistics, B extends Builder<T, B>> {

private Long creationTime;
Expand All @@ -1261,6 +1332,7 @@ abstract static class Builder<T extends JobStatistics, B extends Builder<T, B>>
private ScriptStatistics scriptStatistics;
private List<ReservationUsage> reservationUsage;
private TransactionInfo transactionInfo;
private SessionInfo sessionInfo;

protected Builder() {}

Expand All @@ -1280,6 +1352,9 @@ protected Builder(com.google.api.services.bigquery.model.JobStatistics statistic
if (statisticsPb.getTransactionInfo() != null) {
this.transactionInfo = TransactionInfo.fromPb(statisticsPb.getTransactionInfo());
}
if (statisticsPb.getSessionInfo() != null) {
this.sessionInfo = SessionInfo.fromPb(statisticsPb.getSessionInfo());
}
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -1314,6 +1389,7 @@ protected JobStatistics(Builder builder) {
this.scriptStatistics = builder.scriptStatistics;
this.reservationUsage = builder.reservationUsage;
this.transactionInfo = builder.transactionInfo;
this.sessionInfo = builder.sessionInfo;
}

/** Returns the creation time of the job in milliseconds since epoch. */
Expand Down Expand Up @@ -1362,6 +1438,11 @@ public TransactionInfo getTransactionInfo() {
return transactionInfo;
}

/** Info of the session if this job is part of one. */
public SessionInfo getSessionInfo() {
return sessionInfo;
}

ToStringHelper toStringHelper() {
return MoreObjects.toStringHelper(this)
.add("creationTime", creationTime)
Expand All @@ -1371,7 +1452,8 @@ ToStringHelper toStringHelper() {
.add("parentJobId", parentJobId)
.add("scriptStatistics", scriptStatistics)
.add("reservationUsage", reservationUsage)
.add("transactionInfo", transactionInfo);
.add("transactionInfo", transactionInfo)
.add("sessionInfo", sessionInfo);
}

@Override
Expand All @@ -1388,7 +1470,8 @@ final int baseHashCode() {
parentJobId,
scriptStatistics,
reservationUsage,
transactionInfo);
transactionInfo,
sessionInfo);
}

final boolean baseEquals(JobStatistics jobStatistics) {
Expand All @@ -1413,6 +1496,9 @@ com.google.api.services.bigquery.model.JobStatistics toPb() {
if (transactionInfo != null) {
statistics.setTransactionInfo(transactionInfo.toPb());
}
if (sessionInfo != null) {
statistics.setSessionInfo(sessionInfo.toPb());
}
return statistics;
}

Expand Down
Expand Up @@ -56,6 +56,7 @@ public final class QueryJobConfiguration extends JobConfiguration {
private final DatasetId defaultDataset;
private final Priority priority;
private final Boolean allowLargeResults;
private final Boolean createSession;
private final Boolean useQueryCache;
private final Boolean flattenResults;
private final Boolean dryRun;
Expand Down Expand Up @@ -108,6 +109,7 @@ public static final class Builder
private DatasetId defaultDataset;
private Priority priority;
private Boolean allowLargeResults;
private Boolean createSession;
private Boolean useQueryCache;
private Boolean flattenResults;
private Boolean dryRun;
Expand Down Expand Up @@ -142,6 +144,7 @@ private Builder(QueryJobConfiguration jobConfiguration) {
this.defaultDataset = jobConfiguration.defaultDataset;
this.priority = jobConfiguration.priority;
this.allowLargeResults = jobConfiguration.allowLargeResults;
this.createSession = jobConfiguration.createSession;
this.useQueryCache = jobConfiguration.useQueryCache;
this.flattenResults = jobConfiguration.flattenResults;
this.dryRun = jobConfiguration.dryRun;
Expand Down Expand Up @@ -185,6 +188,7 @@ private Builder(com.google.api.services.bigquery.model.JobConfiguration configur
}
}
allowLargeResults = queryConfigurationPb.getAllowLargeResults();
createSession = queryConfigurationPb.getCreateSession();
useQueryCache = queryConfigurationPb.getUseQueryCache();
flattenResults = queryConfigurationPb.getFlattenResults();
useLegacySql = queryConfigurationPb.getUseLegacySql();
Expand Down Expand Up @@ -460,6 +464,16 @@ public Builder setPriority(Priority priority) {
return this;
}

/**
* Sets whether to create a new session. If {@code true} a random session id will be generated
* by BigQuery. If false, runs query with an existing session_id passed in ConnectionProperty,
* otherwise runs query in non-session mode."
*/
public Builder setCreateSession(Boolean createSession) {
this.createSession = createSession;
return this;
}

/**
* Sets whether the job is enabled to create arbitrarily large results. If {@code true} the
* query is allowed to create large results at a slight cost in performance. If {@code true}
Expand Down Expand Up @@ -656,6 +670,7 @@ private QueryJobConfiguration(Builder builder) {
namedParameters = ImmutableMap.copyOf(builder.namedParameters);
this.parameterMode = builder.parameterMode;
this.allowLargeResults = builder.allowLargeResults;
this.createSession = builder.createSession;
this.createDisposition = builder.createDisposition;
this.defaultDataset = builder.defaultDataset;
this.destinationTable = builder.destinationTable;
Expand Down Expand Up @@ -693,6 +708,15 @@ public Boolean allowLargeResults() {
return allowLargeResults;
}

/**
* Returns whether to create a new session.
*
* @see <a href="https://cloud.google.com/bigquery/docs/sessions-create">Create Sessions</a>
*/
public Boolean createSession() {
return createSession;
}

/**
* Returns whether the job is allowed to create new tables.
*
Expand Down Expand Up @@ -897,6 +921,7 @@ ToStringHelper toStringHelper() {
.add("destinationEncryptionConfiguration", destinationEncryptionConfiguration)
.add("defaultDataset", defaultDataset)
.add("allowLargeResults", allowLargeResults)
.add("createSession", createSession)
.add("flattenResults", flattenResults)
.add("priority", priority)
.add("tableDefinitions", tableDefinitions)
Expand Down Expand Up @@ -928,6 +953,7 @@ public int hashCode() {
return Objects.hash(
baseHashCode(),
allowLargeResults,
createSession,
createDisposition,
destinationTable,
defaultDataset,
Expand Down Expand Up @@ -988,6 +1014,9 @@ com.google.api.services.bigquery.model.JobConfiguration toPb() {
if (allowLargeResults != null) {
queryConfigurationPb.setAllowLargeResults(allowLargeResults);
}
if (createSession != null) {
queryConfigurationPb.setCreateSession(createSession);
}
if (createDisposition != null) {
queryConfigurationPb.setCreateDisposition(createDisposition.toString());
}
Expand Down
Expand Up @@ -37,6 +37,7 @@ final class QueryRequestInfo {
private final String query;
private final List<QueryParameter> queryParameters;
private final String requestId;
private final Boolean createSession;
private final Boolean useQueryCache;
private final Boolean useLegacySql;

Expand All @@ -51,6 +52,7 @@ final class QueryRequestInfo {
this.query = config.getQuery();
this.queryParameters = config.toPb().getQuery().getQueryParameters();
this.requestId = UUID.randomUUID().toString();
this.createSession = config.createSession();
this.useLegacySql = config.useLegacySql();
this.useQueryCache = config.useQueryCache();
}
Expand Down Expand Up @@ -97,6 +99,9 @@ QueryRequest toPb() {
if (queryParameters != null) {
request.setQueryParameters(queryParameters);
}
if (createSession != null) {
request.setCreateSession(createSession);
}
if (useLegacySql != null) {
request.setUseLegacySql(useLegacySql);
}
Expand All @@ -118,6 +123,7 @@ public String toString() {
.add("query", query)
.add("requestId", requestId)
.add("queryParameters", queryParameters)
.add("createSession", createSession)
.add("useQueryCache", useQueryCache)
.add("useLegacySql", useLegacySql)
.toString();
Expand All @@ -135,6 +141,7 @@ public int hashCode() {
query,
queryParameters,
requestId,
createSession,
useQueryCache,
useLegacySql);
}
Expand Down
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.bigquery.JobStatistics.ReservationUsage;
import com.google.cloud.bigquery.JobStatistics.ScriptStatistics;
import com.google.cloud.bigquery.JobStatistics.ScriptStatistics.ScriptStackFrame;
import com.google.cloud.bigquery.JobStatistics.SessionInfo;
import com.google.cloud.bigquery.JobStatistics.TransactionInfo;
import com.google.cloud.bigquery.QueryStage.QueryStep;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class JobStatisticsTest {
private static final String NAME = "reservation-name";
private static final Long SLOTMS = 12545L;
private static final String TRANSACTION_ID = UUID.randomUUID().toString().substring(0, 8);
private static final String SESSION_ID = UUID.randomUUID().toString().substring(0, 8);
private static final CopyStatistics COPY_STATISTICS =
CopyStatistics.newBuilder()
.setCreationTimestamp(CREATION_TIME)
Expand Down Expand Up @@ -222,6 +224,9 @@ public class JobStatisticsTest {
private static final TransactionInfo TRANSACTION_INFO =
TransactionInfo.newbuilder().setTransactionId(TRANSACTION_ID).build();

private static final SessionInfo SESSION_INFO =
SessionInfo.newBuilder().setSessionId(SESSION_ID).build();

@Test
public void testBuilder() {
assertEquals(CREATION_TIME, EXTRACT_STATISTICS.getCreationTime());
Expand Down Expand Up @@ -293,6 +298,7 @@ public void testBuilder() {
assertEquals(NAME, RESERVATION_USAGE.getName());
assertEquals(SLOTMS, RESERVATION_USAGE.getSlotMs());
assertEquals(TRANSACTION_ID, TRANSACTION_INFO.getTransactionId());
assertEquals(SESSION_ID, SESSION_INFO.getSessionId());
}

@Test
Expand All @@ -319,6 +325,7 @@ public void testToPbAndFromPb() {
}
compareReservation(RESERVATION_USAGE, ReservationUsage.fromPb(RESERVATION_USAGE.toPb()));
compareTransactionInfo(TRANSACTION_INFO, TransactionInfo.fromPb(TRANSACTION_INFO.toPb()));
compareSessionInfo(SESSION_INFO, SessionInfo.fromPb(SESSION_INFO.toPb()));
}

@Test
Expand Down Expand Up @@ -441,4 +448,12 @@ private void compareTransactionInfo(TransactionInfo expected, TransactionInfo va
assertEquals(expected.toPb(), value.toPb());
assertEquals(expected.getTransactionId(), value.getTransactionId());
}

private void compareSessionInfo(SessionInfo expected, SessionInfo value) {
assertEquals(expected, value);
assertEquals(expected.hashCode(), value.hashCode());
assertEquals(expected.toString(), value.toString());
assertEquals(expected.toPb(), value.toPb());
assertEquals(expected.getSessionId(), value.getSessionId());
}
}
Expand Up @@ -78,6 +78,7 @@ public class QueryJobConfigurationTest {
private static final Priority PRIORITY = Priority.BATCH;
private static final boolean ALLOW_LARGE_RESULTS = true;
private static final boolean USE_QUERY_CACHE = false;
private static final boolean CREATE_SESSION = true;
private static final boolean FLATTEN_RESULTS = true;
private static final boolean USE_LEGACY_SQL = true;
private static final Integer MAX_BILLING_TIER = 123;
Expand Down Expand Up @@ -119,6 +120,7 @@ public class QueryJobConfigurationTest {
.setDestinationTable(TABLE_ID)
.setWriteDisposition(WRITE_DISPOSITION)
.setPriority(PRIORITY)
.setCreateSession(CREATE_SESSION)
.setFlattenResults(FLATTEN_RESULTS)
.setUserDefinedFunctions(USER_DEFINED_FUNCTIONS)
.setDryRun(true)
Expand Down Expand Up @@ -238,6 +240,7 @@ private void compareQueryJobConfiguration(
assertEquals(expected.getCreateDisposition(), value.getCreateDisposition());
assertEquals(expected.getDefaultDataset(), value.getDefaultDataset());
assertEquals(expected.getDestinationTable(), value.getDestinationTable());
assertEquals(expected.createSession(), value.createSession());
assertEquals(expected.flattenResults(), value.flattenResults());
assertEquals(expected.getPriority(), value.getPriority());
assertEquals(expected.getQuery(), value.getQuery());
Expand Down
Expand Up @@ -74,6 +74,7 @@ public class QueryRequestInfoTest {
private static final WriteDisposition WRITE_DISPOSITION = WriteDisposition.WRITE_APPEND;
private static final Priority PRIORITY = Priority.BATCH;
private static final boolean ALLOW_LARGE_RESULTS = true;
private static final boolean CREATE_SESSION = true;
private static final boolean USE_QUERY_CACHE = false;
private static final boolean FLATTEN_RESULTS = true;
private static final boolean USE_LEGACY_SQL = true;
Expand Down Expand Up @@ -142,6 +143,7 @@ public class QueryRequestInfoTest {
.setLabels(LABELS)
.setConnectionProperties(CONNECTION_PROPERTIES)
.setPositionalParameters(POSITIONAL_PARAMETER)
.setCreateSession(CREATE_SESSION)
.setMaxResults(100L)
.build();
QueryRequestInfo REQUEST_INFO_SUPPORTED = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_SUPPORTED);
Expand Down

0 comments on commit acc6cb8

Please sign in to comment.