From e846dfdefbba88320088667525e5fdd966c80c4b Mon Sep 17 00:00:00 2001 From: shollyman Date: Tue, 21 Sep 2021 15:17:47 -0700 Subject: [PATCH] feat(bigquery): add session and connection support (#4754) Adds: * CreateSession to Query config * ConnectionProperties to specify connection keys/values in Query config * SessionInfo in JobStatistics to record session stats --- bigquery/integration_test.go | 75 ++++++++++++++++++++++++++++-------- bigquery/job.go | 27 +++++++++++++ bigquery/query.go | 51 ++++++++++++++++++++++++ bigquery/query_test.go | 21 ++++++++++ 4 files changed, 159 insertions(+), 15 deletions(-) diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go index 2526392b37c..de9b2a58479 100644 --- a/bigquery/integration_test.go +++ b/bigquery/integration_test.go @@ -479,7 +479,7 @@ func TestIntegration_SnapshotAndRestore(t *testing.T) { FROM UNNEST(GENERATE_ARRAY(0,999)) `, qualified) - if _, err := runQueryJob(ctx, sql); err != nil { + if _, _, err := runQuerySQL(ctx, sql); err != nil { t.Fatalf("couldn't instantiate base table: %v", err) } @@ -872,7 +872,7 @@ func TestIntegration_DatasetUpdateAccess(t *testing.T) { sql := fmt.Sprintf(` CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`, routine.FullyQualifiedName()) - if _, err := runQueryJob(ctx, sql); err != nil { + if _, _, err := runQuerySQL(ctx, sql); err != nil { t.Fatal(err) } defer routine.Delete(ctx) @@ -1288,7 +1288,7 @@ func TestIntegration_RoutineStoredProcedure(t *testing.T) { END`, routine.FullyQualifiedName()) - if _, err := runQueryJob(ctx, sql); err != nil { + if _, _, err := runQuerySQL(ctx, sql); err != nil { t.Fatal(err) } defer routine.Delete(ctx) @@ -2014,7 +2014,7 @@ func TestIntegration_DML(t *testing.T) { ('b', [1], STRUCT(FALSE)), ('c', [2], STRUCT(TRUE))`, table.DatasetID, table.TableID) - stats, err := runQueryJob(ctx, sql) + _, stats, err := runQuerySQL(ctx, sql) if err != nil { t.Fatal(err) } @@ -2036,12 +2036,18 @@ func TestIntegration_DML(t *testing.T) { } } +// runQuerySQL runs arbitrary SQL text. +func runQuerySQL(ctx context.Context, sql string) (*JobStatistics, *QueryStatistics, error) { + return runQueryJob(ctx, client.Query(sql)) +} + // runQueryJob is useful for running queries where no row data is returned (DDL/DML). -func runQueryJob(ctx context.Context, sql string) (*QueryStatistics, error) { - var stats *QueryStatistics +func runQueryJob(ctx context.Context, q *Query) (*JobStatistics, *QueryStatistics, error) { + var jobStats *JobStatistics + var queryStats *QueryStatistics var err error err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) { - job, err := client.Query(sql).Run(ctx) + job, err := q.Run(ctx) if err != nil { if e, ok := err.(*googleapi.Error); ok && e.Code < 500 { return true, err // fail on 4xx @@ -2057,13 +2063,14 @@ func runQueryJob(ctx context.Context, sql string) (*QueryStatistics, error) { } status := job.LastStatus() if status.Statistics != nil { + jobStats = status.Statistics if qStats, ok := status.Statistics.Details.(*QueryStatistics); ok { - stats = qStats + queryStats = qStats } } return true, nil }) - return stats, err + return jobStats, queryStats, err } func TestIntegration_TimeTypes(t *testing.T) { @@ -2103,7 +2110,7 @@ func TestIntegration_TimeTypes(t *testing.T) { "VALUES ('%s', '%s', '%s', '%s')", table.DatasetID, table.TableID, d, CivilTimeString(tm), CivilDateTimeString(dtm), ts.Format("2006-01-02 15:04:05")) - if _, err := runQueryJob(ctx, query); err != nil { + if _, _, err := runQuerySQL(ctx, query); err != nil { t.Fatal(err) } wantRows = append(wantRows, wantRows[0]) @@ -2275,6 +2282,44 @@ func TestIntegration_QueryExternalHivePartitioning(t *testing.T) { checkReadAndTotalRows(t, "HiveQuery", it, [][]Value{{int64(50)}}) } +func TestIntegration_QuerySessionSupport(t *testing.T) { + if client == nil { + t.Skip("Integration tests skipped") + } + ctx := context.Background() + + q := client.Query("CREATE TEMPORARY TABLE temptable AS SELECT 17 as foo") + q.CreateSession = true + jobStats, _, err := runQueryJob(ctx, q) + if err != nil { + t.Fatalf("error running CREATE TEMPORARY TABLE: %v", err) + } + if jobStats.SessionInfo == nil { + t.Fatalf("expected session info, was nil") + } + sessionID := jobStats.SessionInfo.SessionID + if len(sessionID) == 0 { + t.Errorf("expected non-empty sessionID") + } + + q2 := client.Query("SELECT * FROM temptable") + q2.ConnectionProperties = []*ConnectionProperty{ + {Key: "session_id", Value: sessionID}, + } + jobStats, _, err = runQueryJob(ctx, q2) + if err != nil { + t.Errorf("error running SELECT: %v", err) + } + if jobStats.SessionInfo == nil { + t.Fatalf("expected sessionInfo in second query, was nil") + } + got := jobStats.SessionInfo.SessionID + if got != sessionID { + t.Errorf("second query mismatched session ID, got %s want %s", got, sessionID) + } + +} + func TestIntegration_QueryParameters(t *testing.T) { if client == nil { t.Skip("Integration tests skipped") @@ -2560,7 +2605,7 @@ func TestIntegration_ExtractExternal(t *testing.T) { sql := fmt.Sprintf(`INSERT %s.%s (name, num) VALUES ('a', 1), ('b', 2), ('c', 3)`, table.DatasetID, table.TableID) - if _, err := runQueryJob(ctx, sql); err != nil { + if _, _, err := runQuerySQL(ctx, sql); err != nil { t.Fatal(err) } // Extract to a GCS object as CSV. @@ -2986,7 +3031,7 @@ func TestIntegration_MaterializedViewLifecycle(t *testing.T) { FROM UNNEST(GENERATE_ARRAY(0,999)) `, qualified) - if _, err := runQueryJob(ctx, sql); err != nil { + if _, _, err := runQuerySQL(ctx, sql); err != nil { t.Fatalf("couldn't instantiate base table: %v", err) } @@ -3114,7 +3159,7 @@ func TestIntegration_ModelLifecycle(t *testing.T) { UNION ALL SELECT 'b' AS f1, 3.8 AS label )`, modelRef) - if _, err := runQueryJob(ctx, sql); err != nil { + if _, _, err := runQuerySQL(ctx, sql); err != nil { t.Fatal(err) } defer model.Delete(ctx) @@ -3297,7 +3342,7 @@ func TestIntegration_RoutineComplexTypes(t *testing.T) { (SELECT SUM(IF(elem.name = "foo",elem.val,null)) FROM UNNEST(arr) AS elem) )`, routine.FullyQualifiedName()) - if _, err := runQueryJob(ctx, sql); err != nil { + if _, _, err := runQuerySQL(ctx, sql); err != nil { t.Fatal(err) } defer routine.Delete(ctx) @@ -3357,7 +3402,7 @@ func TestIntegration_RoutineLifecycle(t *testing.T) { sql := fmt.Sprintf(` CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`, routine.FullyQualifiedName()) - if _, err := runQueryJob(ctx, sql); err != nil { + if _, _, err := runQuerySQL(ctx, sql); err != nil { t.Fatal(err) } defer routine.Delete(ctx) diff --git a/bigquery/job.go b/bigquery/job.go index 657d338e4b4..510cdda0543 100644 --- a/bigquery/job.go +++ b/bigquery/job.go @@ -375,6 +375,9 @@ type JobStatistics struct { // TransactionInfo indicates the transaction ID associated with the job, if any. TransactionInfo *TransactionInfo + + // SessionInfo contains information about the session if this job is part of one. + SessionInfo *SessionInfo } // Statistics is one of ExtractStatistics, LoadStatistics or QueryStatistics. @@ -884,6 +887,7 @@ func (j *Job) setStatistics(s *bq.JobStatistics, c *Client) { ScriptStatistics: bqToScriptStatistics(s.ScriptStatistics), ReservationUsage: bqToReservationUsage(s.ReservationUsage), TransactionInfo: bqToTransactionInfo(s.TransactionInfo), + SessionInfo: bqToSessionInfo(s.SessionInfo), } switch { case s.Extract != nil: @@ -1002,3 +1006,26 @@ func bqToTransactionInfo(in *bq.TransactionInfo) *TransactionInfo { TransactionID: in.TransactionId, } } + +// SessionInfo contains information about a session associated with a job. +type SessionInfo struct { + SessionID string +} + +func (s *SessionInfo) toBQ() *bq.SessionInfo { + if s == nil { + return nil + } + return &bq.SessionInfo{ + SessionId: s.SessionID, + } +} + +func bqToSessionInfo(in *bq.SessionInfo) *SessionInfo { + if in == nil { + return nil + } + return &SessionInfo{ + SessionID: in.SessionId, + } +} diff --git a/bigquery/query.go b/bigquery/query.go index 3ce018e9427..b64ae062114 100644 --- a/bigquery/query.go +++ b/bigquery/query.go @@ -132,6 +132,12 @@ type QueryConfig struct { // Allows the schema of the destination table to be updated as a side effect of // the query job. SchemaUpdateOptions []string + + // CreateSession will trigger creation of a new session when true. + CreateSession bool + + // ConnectionProperties are optional key-values settings. + ConnectionProperties []*ConnectionProperty } func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) { @@ -147,6 +153,7 @@ func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) { Clustering: qc.Clustering.toBQ(), DestinationEncryptionConfiguration: qc.DestinationEncryptionConfig.toBQ(), SchemaUpdateOptions: qc.SchemaUpdateOptions, + CreateSession: qc.CreateSession, } if len(qc.TableDefinitions) > 0 { qconf.TableDefinitions = make(map[string]bq.ExternalDataConfiguration) @@ -195,6 +202,13 @@ func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) { } qconf.QueryParameters = append(qconf.QueryParameters, qp) } + if len(qc.ConnectionProperties) > 0 { + bqcp := make([]*bq.ConnectionProperty, len(qc.ConnectionProperties)) + for k, v := range qc.ConnectionProperties { + bqcp[k] = v.toBQ() + } + qconf.ConnectionProperties = bqcp + } return &bq.JobConfiguration{ Labels: qc.Labels, DryRun: qc.DryRun, @@ -219,6 +233,7 @@ func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) { Clustering: bqToClustering(qq.Clustering), DestinationEncryptionConfig: bqToEncryptionConfig(qq.DestinationEncryptionConfiguration), SchemaUpdateOptions: qq.SchemaUpdateOptions, + CreateSession: qq.CreateSession, } qc.UseStandardSQL = !qc.UseLegacySQL @@ -255,6 +270,13 @@ func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) { } qc.Parameters = append(qc.Parameters, p) } + if len(qq.ConnectionProperties) > 0 { + props := make([]*ConnectionProperty, len(qq.ConnectionProperties)) + for k, v := range qq.ConnectionProperties { + props[k] = bqToConnectionProperty(v) + } + qc.ConnectionProperties = props + } return qc, nil } @@ -402,6 +424,7 @@ func (q *Query) probeFastPath() (*bq.QueryRequest, error) { pfalse := false qRequest := &bq.QueryRequest{ Query: q.QueryConfig.Q, + CreateSession: q.CreateSession, Location: q.Location, UseLegacySql: &pfalse, MaximumBytesBilled: q.QueryConfig.MaxBytesBilled, @@ -427,3 +450,31 @@ func (q *Query) probeFastPath() (*bq.QueryRequest, error) { } return qRequest, nil } + +// ConnectionProperty represents a single key and value pair that can be sent alongside a query request. +type ConnectionProperty struct { + // Name of the connection property to set. + Key string + // Value of the connection property. + Value string +} + +func (cp *ConnectionProperty) toBQ() *bq.ConnectionProperty { + if cp == nil { + return nil + } + return &bq.ConnectionProperty{ + Key: cp.Key, + Value: cp.Value, + } +} + +func bqToConnectionProperty(in *bq.ConnectionProperty) *ConnectionProperty { + if in == nil { + return nil + } + return &ConnectionProperty{ + Key: in.Key, + Value: in.Value, + } +} diff --git a/bigquery/query_test.go b/bigquery/query_test.go index f3d96602016..3b292b77b9e 100644 --- a/bigquery/query_test.go +++ b/bigquery/query_test.go @@ -323,6 +323,27 @@ func TestQuery(t *testing.T) { return j }(), }, + { + dst: c.Dataset("dataset-id").Table("table-id"), + src: &QueryConfig{ + Q: "query string", + DefaultProjectID: "def-project-id", + DefaultDatasetID: "def-dataset-id", + ConnectionProperties: []*ConnectionProperty{ + {Key: "key-a", Value: "value-a"}, + {Key: "key-b", Value: "value-b"}, + }, + }, + want: func() *bq.Job { + j := defaultQueryJob() + j.Configuration.Query.ForceSendFields = nil + j.Configuration.Query.ConnectionProperties = []*bq.ConnectionProperty{ + {Key: "key-a", Value: "value-a"}, + {Key: "key-b", Value: "value-b"}, + } + return j + }(), + }, } for i, tc := range testCases { query := c.Query("")