Skip to content

Commit

Permalink
feat(bigquery): add session and connection support (#4754)
Browse files Browse the repository at this point in the history
Adds:
* CreateSession to Query config
* ConnectionProperties to specify connection keys/values in Query config
* SessionInfo in JobStatistics to record session stats
  • Loading branch information
shollyman committed Sep 21, 2021
1 parent 6861b30 commit e846dfd
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 15 deletions.
75 changes: 60 additions & 15 deletions bigquery/integration_test.go
Expand Up @@ -479,7 +479,7 @@ func TestIntegration_SnapshotAndRestore(t *testing.T) {
FROM
UNNEST(GENERATE_ARRAY(0,999))
`, qualified)
if _, err := runQueryJob(ctx, sql); err != nil {
if _, _, err := runQuerySQL(ctx, sql); err != nil {
t.Fatalf("couldn't instantiate base table: %v", err)
}

Expand Down Expand Up @@ -872,7 +872,7 @@ func TestIntegration_DatasetUpdateAccess(t *testing.T) {
sql := fmt.Sprintf(`
CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`,
routine.FullyQualifiedName())
if _, err := runQueryJob(ctx, sql); err != nil {
if _, _, err := runQuerySQL(ctx, sql); err != nil {
t.Fatal(err)
}
defer routine.Delete(ctx)
Expand Down Expand Up @@ -1288,7 +1288,7 @@ func TestIntegration_RoutineStoredProcedure(t *testing.T) {
END`,
routine.FullyQualifiedName())

if _, err := runQueryJob(ctx, sql); err != nil {
if _, _, err := runQuerySQL(ctx, sql); err != nil {
t.Fatal(err)
}
defer routine.Delete(ctx)
Expand Down Expand Up @@ -2014,7 +2014,7 @@ func TestIntegration_DML(t *testing.T) {
('b', [1], STRUCT<BOOL>(FALSE)),
('c', [2], STRUCT<BOOL>(TRUE))`,
table.DatasetID, table.TableID)
stats, err := runQueryJob(ctx, sql)
_, stats, err := runQuerySQL(ctx, sql)
if err != nil {
t.Fatal(err)
}
Expand All @@ -2036,12 +2036,18 @@ func TestIntegration_DML(t *testing.T) {
}
}

// runQuerySQL runs arbitrary SQL text.
func runQuerySQL(ctx context.Context, sql string) (*JobStatistics, *QueryStatistics, error) {
return runQueryJob(ctx, client.Query(sql))
}

// runQueryJob is useful for running queries where no row data is returned (DDL/DML).
func runQueryJob(ctx context.Context, sql string) (*QueryStatistics, error) {
var stats *QueryStatistics
func runQueryJob(ctx context.Context, q *Query) (*JobStatistics, *QueryStatistics, error) {
var jobStats *JobStatistics
var queryStats *QueryStatistics
var err error
err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
job, err := client.Query(sql).Run(ctx)
job, err := q.Run(ctx)
if err != nil {
if e, ok := err.(*googleapi.Error); ok && e.Code < 500 {
return true, err // fail on 4xx
Expand All @@ -2057,13 +2063,14 @@ func runQueryJob(ctx context.Context, sql string) (*QueryStatistics, error) {
}
status := job.LastStatus()
if status.Statistics != nil {
jobStats = status.Statistics
if qStats, ok := status.Statistics.Details.(*QueryStatistics); ok {
stats = qStats
queryStats = qStats
}
}
return true, nil
})
return stats, err
return jobStats, queryStats, err
}

func TestIntegration_TimeTypes(t *testing.T) {
Expand Down Expand Up @@ -2103,7 +2110,7 @@ func TestIntegration_TimeTypes(t *testing.T) {
"VALUES ('%s', '%s', '%s', '%s')",
table.DatasetID, table.TableID,
d, CivilTimeString(tm), CivilDateTimeString(dtm), ts.Format("2006-01-02 15:04:05"))
if _, err := runQueryJob(ctx, query); err != nil {
if _, _, err := runQuerySQL(ctx, query); err != nil {
t.Fatal(err)
}
wantRows = append(wantRows, wantRows[0])
Expand Down Expand Up @@ -2275,6 +2282,44 @@ func TestIntegration_QueryExternalHivePartitioning(t *testing.T) {
checkReadAndTotalRows(t, "HiveQuery", it, [][]Value{{int64(50)}})
}

func TestIntegration_QuerySessionSupport(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()

q := client.Query("CREATE TEMPORARY TABLE temptable AS SELECT 17 as foo")
q.CreateSession = true
jobStats, _, err := runQueryJob(ctx, q)
if err != nil {
t.Fatalf("error running CREATE TEMPORARY TABLE: %v", err)
}
if jobStats.SessionInfo == nil {
t.Fatalf("expected session info, was nil")
}
sessionID := jobStats.SessionInfo.SessionID
if len(sessionID) == 0 {
t.Errorf("expected non-empty sessionID")
}

q2 := client.Query("SELECT * FROM temptable")
q2.ConnectionProperties = []*ConnectionProperty{
{Key: "session_id", Value: sessionID},
}
jobStats, _, err = runQueryJob(ctx, q2)
if err != nil {
t.Errorf("error running SELECT: %v", err)
}
if jobStats.SessionInfo == nil {
t.Fatalf("expected sessionInfo in second query, was nil")
}
got := jobStats.SessionInfo.SessionID
if got != sessionID {
t.Errorf("second query mismatched session ID, got %s want %s", got, sessionID)
}

}

func TestIntegration_QueryParameters(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
Expand Down Expand Up @@ -2560,7 +2605,7 @@ func TestIntegration_ExtractExternal(t *testing.T) {
sql := fmt.Sprintf(`INSERT %s.%s (name, num)
VALUES ('a', 1), ('b', 2), ('c', 3)`,
table.DatasetID, table.TableID)
if _, err := runQueryJob(ctx, sql); err != nil {
if _, _, err := runQuerySQL(ctx, sql); err != nil {
t.Fatal(err)
}
// Extract to a GCS object as CSV.
Expand Down Expand Up @@ -2986,7 +3031,7 @@ func TestIntegration_MaterializedViewLifecycle(t *testing.T) {
FROM
UNNEST(GENERATE_ARRAY(0,999))
`, qualified)
if _, err := runQueryJob(ctx, sql); err != nil {
if _, _, err := runQuerySQL(ctx, sql); err != nil {
t.Fatalf("couldn't instantiate base table: %v", err)
}

Expand Down Expand Up @@ -3114,7 +3159,7 @@ func TestIntegration_ModelLifecycle(t *testing.T) {
UNION ALL
SELECT 'b' AS f1, 3.8 AS label
)`, modelRef)
if _, err := runQueryJob(ctx, sql); err != nil {
if _, _, err := runQuerySQL(ctx, sql); err != nil {
t.Fatal(err)
}
defer model.Delete(ctx)
Expand Down Expand Up @@ -3297,7 +3342,7 @@ func TestIntegration_RoutineComplexTypes(t *testing.T) {
(SELECT SUM(IF(elem.name = "foo",elem.val,null)) FROM UNNEST(arr) AS elem)
)`,
routine.FullyQualifiedName())
if _, err := runQueryJob(ctx, sql); err != nil {
if _, _, err := runQuerySQL(ctx, sql); err != nil {
t.Fatal(err)
}
defer routine.Delete(ctx)
Expand Down Expand Up @@ -3357,7 +3402,7 @@ func TestIntegration_RoutineLifecycle(t *testing.T) {
sql := fmt.Sprintf(`
CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`,
routine.FullyQualifiedName())
if _, err := runQueryJob(ctx, sql); err != nil {
if _, _, err := runQuerySQL(ctx, sql); err != nil {
t.Fatal(err)
}
defer routine.Delete(ctx)
Expand Down
27 changes: 27 additions & 0 deletions bigquery/job.go
Expand Up @@ -375,6 +375,9 @@ type JobStatistics struct {

// TransactionInfo indicates the transaction ID associated with the job, if any.
TransactionInfo *TransactionInfo

// SessionInfo contains information about the session if this job is part of one.
SessionInfo *SessionInfo
}

// Statistics is one of ExtractStatistics, LoadStatistics or QueryStatistics.
Expand Down Expand Up @@ -884,6 +887,7 @@ func (j *Job) setStatistics(s *bq.JobStatistics, c *Client) {
ScriptStatistics: bqToScriptStatistics(s.ScriptStatistics),
ReservationUsage: bqToReservationUsage(s.ReservationUsage),
TransactionInfo: bqToTransactionInfo(s.TransactionInfo),
SessionInfo: bqToSessionInfo(s.SessionInfo),
}
switch {
case s.Extract != nil:
Expand Down Expand Up @@ -1002,3 +1006,26 @@ func bqToTransactionInfo(in *bq.TransactionInfo) *TransactionInfo {
TransactionID: in.TransactionId,
}
}

// SessionInfo contains information about a session associated with a job.
type SessionInfo struct {
SessionID string
}

func (s *SessionInfo) toBQ() *bq.SessionInfo {
if s == nil {
return nil
}
return &bq.SessionInfo{
SessionId: s.SessionID,
}
}

func bqToSessionInfo(in *bq.SessionInfo) *SessionInfo {
if in == nil {
return nil
}
return &SessionInfo{
SessionID: in.SessionId,
}
}
51 changes: 51 additions & 0 deletions bigquery/query.go
Expand Up @@ -132,6 +132,12 @@ type QueryConfig struct {
// Allows the schema of the destination table to be updated as a side effect of
// the query job.
SchemaUpdateOptions []string

// CreateSession will trigger creation of a new session when true.
CreateSession bool

// ConnectionProperties are optional key-values settings.
ConnectionProperties []*ConnectionProperty
}

func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) {
Expand All @@ -147,6 +153,7 @@ func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) {
Clustering: qc.Clustering.toBQ(),
DestinationEncryptionConfiguration: qc.DestinationEncryptionConfig.toBQ(),
SchemaUpdateOptions: qc.SchemaUpdateOptions,
CreateSession: qc.CreateSession,
}
if len(qc.TableDefinitions) > 0 {
qconf.TableDefinitions = make(map[string]bq.ExternalDataConfiguration)
Expand Down Expand Up @@ -195,6 +202,13 @@ func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) {
}
qconf.QueryParameters = append(qconf.QueryParameters, qp)
}
if len(qc.ConnectionProperties) > 0 {
bqcp := make([]*bq.ConnectionProperty, len(qc.ConnectionProperties))
for k, v := range qc.ConnectionProperties {
bqcp[k] = v.toBQ()
}
qconf.ConnectionProperties = bqcp
}
return &bq.JobConfiguration{
Labels: qc.Labels,
DryRun: qc.DryRun,
Expand All @@ -219,6 +233,7 @@ func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) {
Clustering: bqToClustering(qq.Clustering),
DestinationEncryptionConfig: bqToEncryptionConfig(qq.DestinationEncryptionConfiguration),
SchemaUpdateOptions: qq.SchemaUpdateOptions,
CreateSession: qq.CreateSession,
}
qc.UseStandardSQL = !qc.UseLegacySQL

Expand Down Expand Up @@ -255,6 +270,13 @@ func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) {
}
qc.Parameters = append(qc.Parameters, p)
}
if len(qq.ConnectionProperties) > 0 {
props := make([]*ConnectionProperty, len(qq.ConnectionProperties))
for k, v := range qq.ConnectionProperties {
props[k] = bqToConnectionProperty(v)
}
qc.ConnectionProperties = props
}
return qc, nil
}

Expand Down Expand Up @@ -402,6 +424,7 @@ func (q *Query) probeFastPath() (*bq.QueryRequest, error) {
pfalse := false
qRequest := &bq.QueryRequest{
Query: q.QueryConfig.Q,
CreateSession: q.CreateSession,
Location: q.Location,
UseLegacySql: &pfalse,
MaximumBytesBilled: q.QueryConfig.MaxBytesBilled,
Expand All @@ -427,3 +450,31 @@ func (q *Query) probeFastPath() (*bq.QueryRequest, error) {
}
return qRequest, nil
}

// ConnectionProperty represents a single key and value pair that can be sent alongside a query request.
type ConnectionProperty struct {
// Name of the connection property to set.
Key string
// Value of the connection property.
Value string
}

func (cp *ConnectionProperty) toBQ() *bq.ConnectionProperty {
if cp == nil {
return nil
}
return &bq.ConnectionProperty{
Key: cp.Key,
Value: cp.Value,
}
}

func bqToConnectionProperty(in *bq.ConnectionProperty) *ConnectionProperty {
if in == nil {
return nil
}
return &ConnectionProperty{
Key: in.Key,
Value: in.Value,
}
}
21 changes: 21 additions & 0 deletions bigquery/query_test.go
Expand Up @@ -323,6 +323,27 @@ func TestQuery(t *testing.T) {
return j
}(),
},
{
dst: c.Dataset("dataset-id").Table("table-id"),
src: &QueryConfig{
Q: "query string",
DefaultProjectID: "def-project-id",
DefaultDatasetID: "def-dataset-id",
ConnectionProperties: []*ConnectionProperty{
{Key: "key-a", Value: "value-a"},
{Key: "key-b", Value: "value-b"},
},
},
want: func() *bq.Job {
j := defaultQueryJob()
j.Configuration.Query.ForceSendFields = nil
j.Configuration.Query.ConnectionProperties = []*bq.ConnectionProperty{
{Key: "key-a", Value: "value-a"},
{Key: "key-b", Value: "value-b"},
}
return j
}(),
},
}
for i, tc := range testCases {
query := c.Query("")
Expand Down

0 comments on commit e846dfd

Please sign in to comment.