Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): add session and connection support #4754

Merged
merged 2 commits into from Sep 21, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
75 changes: 60 additions & 15 deletions bigquery/integration_test.go
Expand Up @@ -479,7 +479,7 @@ func TestIntegration_SnapshotAndRestore(t *testing.T) {
FROM
UNNEST(GENERATE_ARRAY(0,999))
`, qualified)
if _, err := runQueryJob(ctx, sql); err != nil {
if _, _, err := runQuerySQL(ctx, sql); err != nil {
t.Fatalf("couldn't instantiate base table: %v", err)
}

Expand Down Expand Up @@ -872,7 +872,7 @@ func TestIntegration_DatasetUpdateAccess(t *testing.T) {
sql := fmt.Sprintf(`
CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`,
routine.FullyQualifiedName())
if _, err := runQueryJob(ctx, sql); err != nil {
if _, _, err := runQuerySQL(ctx, sql); err != nil {
t.Fatal(err)
}
defer routine.Delete(ctx)
Expand Down Expand Up @@ -1288,7 +1288,7 @@ func TestIntegration_RoutineStoredProcedure(t *testing.T) {
END`,
routine.FullyQualifiedName())

if _, err := runQueryJob(ctx, sql); err != nil {
if _, _, err := runQuerySQL(ctx, sql); err != nil {
t.Fatal(err)
}
defer routine.Delete(ctx)
Expand Down Expand Up @@ -2014,7 +2014,7 @@ func TestIntegration_DML(t *testing.T) {
('b', [1], STRUCT<BOOL>(FALSE)),
('c', [2], STRUCT<BOOL>(TRUE))`,
table.DatasetID, table.TableID)
stats, err := runQueryJob(ctx, sql)
_, stats, err := runQuerySQL(ctx, sql)
if err != nil {
t.Fatal(err)
}
Expand All @@ -2036,12 +2036,18 @@ func TestIntegration_DML(t *testing.T) {
}
}

// runQuerySQL runs arbitrary SQL text.
func runQuerySQL(ctx context.Context, sql string) (*JobStatistics, *QueryStatistics, error) {
return runQueryJob(ctx, client.Query(sql))
}

// runQueryJob is useful for running queries where no row data is returned (DDL/DML).
func runQueryJob(ctx context.Context, sql string) (*QueryStatistics, error) {
var stats *QueryStatistics
func runQueryJob(ctx context.Context, q *Query) (*JobStatistics, *QueryStatistics, error) {
var jobStats *JobStatistics
var queryStats *QueryStatistics
var err error
err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
job, err := client.Query(sql).Run(ctx)
job, err := q.Run(ctx)
if err != nil {
if e, ok := err.(*googleapi.Error); ok && e.Code < 500 {
return true, err // fail on 4xx
Expand All @@ -2057,13 +2063,14 @@ func runQueryJob(ctx context.Context, sql string) (*QueryStatistics, error) {
}
status := job.LastStatus()
if status.Statistics != nil {
jobStats = status.Statistics
if qStats, ok := status.Statistics.Details.(*QueryStatistics); ok {
stats = qStats
queryStats = qStats
}
}
return true, nil
})
return stats, err
return jobStats, queryStats, err
}

func TestIntegration_TimeTypes(t *testing.T) {
Expand Down Expand Up @@ -2103,7 +2110,7 @@ func TestIntegration_TimeTypes(t *testing.T) {
"VALUES ('%s', '%s', '%s', '%s')",
table.DatasetID, table.TableID,
d, CivilTimeString(tm), CivilDateTimeString(dtm), ts.Format("2006-01-02 15:04:05"))
if _, err := runQueryJob(ctx, query); err != nil {
if _, _, err := runQuerySQL(ctx, query); err != nil {
t.Fatal(err)
}
wantRows = append(wantRows, wantRows[0])
Expand Down Expand Up @@ -2252,6 +2259,44 @@ func TestIntegration_QueryExternalHivePartitioning(t *testing.T) {
checkReadAndTotalRows(t, "HiveQuery", it, [][]Value{{int64(50)}})
}

func TestIntegration_QuerySessionSupport(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()

q := client.Query("CREATE TEMPORARY TABLE temptable AS SELECT 17 as foo")
q.CreateSession = true
jobStats, _, err := runQueryJob(ctx, q)
if err != nil {
t.Fatalf("error running CREATE TEMPORARY TABLE: %v", err)
}
if jobStats.SessionInfo == nil {
t.Fatalf("expected session info, was nil")
}
sessionID := jobStats.SessionInfo.SessionID
if len(sessionID) == 0 {
t.Errorf("expected non-empty sessionID")
}

q2 := client.Query("SELECT * FROM temptable")
q2.ConnectionProperties = []*ConnectionProperty{
{Key: "session_id", Value: sessionID},
}
jobStats, _, err = runQueryJob(ctx, q2)
if err != nil {
t.Errorf("error running SELECT: %v", err)
}
if jobStats.SessionInfo == nil {
t.Fatalf("expected sessionInfo in second query, was nil")
}
got := jobStats.SessionInfo.SessionID
if got != sessionID {
t.Errorf("second query mismatched session ID, got %s want %s", got, sessionID)
}

}

func TestIntegration_QueryParameters(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
Expand Down Expand Up @@ -2537,7 +2582,7 @@ func TestIntegration_ExtractExternal(t *testing.T) {
sql := fmt.Sprintf(`INSERT %s.%s (name, num)
VALUES ('a', 1), ('b', 2), ('c', 3)`,
table.DatasetID, table.TableID)
if _, err := runQueryJob(ctx, sql); err != nil {
if _, _, err := runQuerySQL(ctx, sql); err != nil {
t.Fatal(err)
}
// Extract to a GCS object as CSV.
Expand Down Expand Up @@ -2963,7 +3008,7 @@ func TestIntegration_MaterializedViewLifecycle(t *testing.T) {
FROM
UNNEST(GENERATE_ARRAY(0,999))
`, qualified)
if _, err := runQueryJob(ctx, sql); err != nil {
if _, _, err := runQuerySQL(ctx, sql); err != nil {
t.Fatalf("couldn't instantiate base table: %v", err)
}

Expand Down Expand Up @@ -3091,7 +3136,7 @@ func TestIntegration_ModelLifecycle(t *testing.T) {
UNION ALL
SELECT 'b' AS f1, 3.8 AS label
)`, modelRef)
if _, err := runQueryJob(ctx, sql); err != nil {
if _, _, err := runQuerySQL(ctx, sql); err != nil {
t.Fatal(err)
}
defer model.Delete(ctx)
Expand Down Expand Up @@ -3274,7 +3319,7 @@ func TestIntegration_RoutineComplexTypes(t *testing.T) {
(SELECT SUM(IF(elem.name = "foo",elem.val,null)) FROM UNNEST(arr) AS elem)
)`,
routine.FullyQualifiedName())
if _, err := runQueryJob(ctx, sql); err != nil {
if _, _, err := runQuerySQL(ctx, sql); err != nil {
t.Fatal(err)
}
defer routine.Delete(ctx)
Expand Down Expand Up @@ -3334,7 +3379,7 @@ func TestIntegration_RoutineLifecycle(t *testing.T) {
sql := fmt.Sprintf(`
CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`,
routine.FullyQualifiedName())
if _, err := runQueryJob(ctx, sql); err != nil {
if _, _, err := runQuerySQL(ctx, sql); err != nil {
t.Fatal(err)
}
defer routine.Delete(ctx)
Expand Down
27 changes: 27 additions & 0 deletions bigquery/job.go
Expand Up @@ -375,6 +375,9 @@ type JobStatistics struct {

// TransactionInfo indicates the transaction ID associated with the job, if any.
TransactionInfo *TransactionInfo

// SessionInfo contains information about the session if this job is part of one.
SessionInfo *SessionInfo
}

// Statistics is one of ExtractStatistics, LoadStatistics or QueryStatistics.
Expand Down Expand Up @@ -884,6 +887,7 @@ func (j *Job) setStatistics(s *bq.JobStatistics, c *Client) {
ScriptStatistics: bqToScriptStatistics(s.ScriptStatistics),
ReservationUsage: bqToReservationUsage(s.ReservationUsage),
TransactionInfo: bqToTransactionInfo(s.TransactionInfo),
SessionInfo: bqToSessionInfo(s.SessionInfo),
}
switch {
case s.Extract != nil:
Expand Down Expand Up @@ -1002,3 +1006,26 @@ func bqToTransactionInfo(in *bq.TransactionInfo) *TransactionInfo {
TransactionID: in.TransactionId,
}
}

// SessionInfo contains information about a session associated with a job.
type SessionInfo struct {
SessionID string
}

func (s *SessionInfo) toBQ() *bq.SessionInfo {
if s == nil {
return nil
}
return &bq.SessionInfo{
SessionId: s.SessionID,
}
}

func bqToSessionInfo(in *bq.SessionInfo) *SessionInfo {
if in == nil {
return nil
}
return &SessionInfo{
SessionID: in.SessionId,
}
}
51 changes: 51 additions & 0 deletions bigquery/query.go
Expand Up @@ -132,6 +132,12 @@ type QueryConfig struct {
// Allows the schema of the destination table to be updated as a side effect of
// the query job.
SchemaUpdateOptions []string

// CreateSession will trigger creation of a new session when true.
CreateSession bool

// ConnectionProperties are optional key-values settings.
ConnectionProperties []*ConnectionProperty
codyoss marked this conversation as resolved.
Show resolved Hide resolved
}

func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) {
Expand All @@ -147,6 +153,7 @@ func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) {
Clustering: qc.Clustering.toBQ(),
DestinationEncryptionConfiguration: qc.DestinationEncryptionConfig.toBQ(),
SchemaUpdateOptions: qc.SchemaUpdateOptions,
CreateSession: qc.CreateSession,
}
if len(qc.TableDefinitions) > 0 {
qconf.TableDefinitions = make(map[string]bq.ExternalDataConfiguration)
Expand Down Expand Up @@ -195,6 +202,13 @@ func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) {
}
qconf.QueryParameters = append(qconf.QueryParameters, qp)
}
if len(qc.ConnectionProperties) > 0 {
bqcp := make([]*bq.ConnectionProperty, len(qc.ConnectionProperties))
for k, v := range qc.ConnectionProperties {
bqcp[k] = v.toBQ()
}
qconf.ConnectionProperties = bqcp
}
return &bq.JobConfiguration{
Labels: qc.Labels,
DryRun: qc.DryRun,
Expand All @@ -219,6 +233,7 @@ func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) {
Clustering: bqToClustering(qq.Clustering),
DestinationEncryptionConfig: bqToEncryptionConfig(qq.DestinationEncryptionConfiguration),
SchemaUpdateOptions: qq.SchemaUpdateOptions,
CreateSession: qq.CreateSession,
}
qc.UseStandardSQL = !qc.UseLegacySQL

Expand Down Expand Up @@ -255,6 +270,13 @@ func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) {
}
qc.Parameters = append(qc.Parameters, p)
}
if len(qq.ConnectionProperties) > 0 {
props := make([]*ConnectionProperty, len(qq.ConnectionProperties))
for k, v := range qq.ConnectionProperties {
props[k] = bqToConnectionProperty(v)
}
qc.ConnectionProperties = props
}
return qc, nil
}

Expand Down Expand Up @@ -402,6 +424,7 @@ func (q *Query) probeFastPath() (*bq.QueryRequest, error) {
pfalse := false
qRequest := &bq.QueryRequest{
Query: q.QueryConfig.Q,
CreateSession: q.CreateSession,
Location: q.Location,
UseLegacySql: &pfalse,
MaximumBytesBilled: q.QueryConfig.MaxBytesBilled,
Expand All @@ -427,3 +450,31 @@ func (q *Query) probeFastPath() (*bq.QueryRequest, error) {
}
return qRequest, nil
}

// ConnectionProperty represents a single key and value pair that can be sent alongside a query request.
type ConnectionProperty struct {
// Name of the connection property to set.
Key string
// Value of the connection property.
Value string
}

func (cp *ConnectionProperty) toBQ() *bq.ConnectionProperty {
if cp == nil {
return nil
}
return &bq.ConnectionProperty{
Key: cp.Key,
Value: cp.Value,
}
}

func bqToConnectionProperty(in *bq.ConnectionProperty) *ConnectionProperty {
if in == nil {
return nil
}
return &ConnectionProperty{
Key: in.Key,
Value: in.Value,
}
}
21 changes: 21 additions & 0 deletions bigquery/query_test.go
Expand Up @@ -323,6 +323,27 @@ func TestQuery(t *testing.T) {
return j
}(),
},
{
dst: c.Dataset("dataset-id").Table("table-id"),
src: &QueryConfig{
Q: "query string",
DefaultProjectID: "def-project-id",
DefaultDatasetID: "def-dataset-id",
ConnectionProperties: []*ConnectionProperty{
{Key: "key-a", Value: "value-a"},
{Key: "key-b", Value: "value-b"},
},
},
want: func() *bq.Job {
j := defaultQueryJob()
j.Configuration.Query.ForceSendFields = nil
j.Configuration.Query.ConnectionProperties = []*bq.ConnectionProperty{
{Key: "key-a", Value: "value-a"},
{Key: "key-b", Value: "value-b"},
}
return j
}(),
},
}
for i, tc := range testCases {
query := c.Query("")
Expand Down