From 99d57282f6668de91390ad29a888a89209689f39 Mon Sep 17 00:00:00 2001 From: shollyman Date: Fri, 9 Jul 2021 13:56:11 -0700 Subject: [PATCH] feat(bigquery): add more dml statistics to query statistics (#4405) This adds the new DMLStats submessage, which indicates the number of rows inserted/updated/deleted by a DML statement. It also makes a change to test utility functions to pass back query statistics. --- bigquery/integration_test.go | 44 ++++++++++++++++++++++++++---------- bigquery/job.go | 26 +++++++++++++++++++++ 2 files changed, 58 insertions(+), 12 deletions(-) diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go index 9bf2f91030e..af969e387b3 100644 --- a/bigquery/integration_test.go +++ b/bigquery/integration_test.go @@ -479,7 +479,7 @@ func TestIntegration_SnapshotAndRestore(t *testing.T) { FROM UNNEST(GENERATE_ARRAY(0,999)) `, qualified) - if err := runQueryJob(ctx, sql); err != nil { + if _, err := runQueryJob(ctx, sql); err != nil { t.Fatalf("couldn't instantiate base table: %v", err) } @@ -872,7 +872,7 @@ func TestIntegration_DatasetUpdateAccess(t *testing.T) { sql := fmt.Sprintf(` CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`, routine.FullyQualifiedName()) - if err := runQueryJob(ctx, sql); err != nil { + if _, err := runQueryJob(ctx, sql); err != nil { t.Fatal(err) } defer routine.Delete(ctx) @@ -1288,7 +1288,7 @@ func TestIntegration_RoutineStoredProcedure(t *testing.T) { END`, routine.FullyQualifiedName()) - if err := runQueryJob(ctx, sql); err != nil { + if _, err := runQueryJob(ctx, sql); err != nil { t.Fatal(err) } defer routine.Delete(ctx) @@ -2013,7 +2013,8 @@ func TestIntegration_DML(t *testing.T) { ('b', [1], STRUCT(FALSE)), ('c', [2], STRUCT(TRUE))`, table.DatasetID, table.TableID) - if err := runQueryJob(ctx, sql); err != nil { + stats, err := runQueryJob(ctx, sql) + if err != nil { t.Fatal(err) } wantRows := [][]Value{ @@ -2022,11 +2023,23 @@ func TestIntegration_DML(t *testing.T) { {"c", []Value{int64(2)}, []Value{true}}, } checkRead(t, "DML", table.Read(ctx), wantRows) + if stats == nil { + t.Fatalf("no query stats") + } + if stats.DMLStats == nil { + t.Fatalf("no dml stats") + } + wantRowCount := int64(len(wantRows)) + if stats.DMLStats.InsertedRowCount != wantRowCount { + t.Fatalf("dml stats mismatch. got %d inserted rows, want %d", stats.DMLStats.InsertedRowCount, wantRowCount) + } } // runQueryJob is useful for running queries where no row data is returned (DDL/DML). -func runQueryJob(ctx context.Context, sql string) error { - return internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) { +func runQueryJob(ctx context.Context, sql string) (*QueryStatistics, error) { + var stats *QueryStatistics + var err error + err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) { job, err := client.Query(sql).Run(ctx) if err != nil { if e, ok := err.(*googleapi.Error); ok && e.Code < 500 { @@ -2041,8 +2054,15 @@ func runQueryJob(ctx context.Context, sql string) error { } return false, err } + status := job.LastStatus() + if status.Statistics != nil { + if qStats, ok := status.Statistics.Details.(*QueryStatistics); ok { + stats = qStats + } + } return true, nil }) + return stats, err } func TestIntegration_TimeTypes(t *testing.T) { @@ -2082,7 +2102,7 @@ func TestIntegration_TimeTypes(t *testing.T) { "VALUES ('%s', '%s', '%s', '%s')", table.DatasetID, table.TableID, d, CivilTimeString(tm), CivilDateTimeString(dtm), ts.Format("2006-01-02 15:04:05")) - if err := runQueryJob(ctx, query); err != nil { + if _, err := runQueryJob(ctx, query); err != nil { t.Fatal(err) } wantRows = append(wantRows, wantRows[0]) @@ -2506,7 +2526,7 @@ func TestIntegration_ExtractExternal(t *testing.T) { sql := fmt.Sprintf(`INSERT %s.%s (name, num) VALUES ('a', 1), ('b', 2), ('c', 3)`, table.DatasetID, table.TableID) - if err := runQueryJob(ctx, sql); err != nil { + if _, err := runQueryJob(ctx, sql); err != nil { t.Fatal(err) } // Extract to a GCS object as CSV. @@ -2932,7 +2952,7 @@ func TestIntegration_MaterializedViewLifecycle(t *testing.T) { FROM UNNEST(GENERATE_ARRAY(0,999)) `, qualified) - if err := runQueryJob(ctx, sql); err != nil { + if _, err := runQueryJob(ctx, sql); err != nil { t.Fatalf("couldn't instantiate base table: %v", err) } @@ -3060,7 +3080,7 @@ func TestIntegration_ModelLifecycle(t *testing.T) { UNION ALL SELECT 'b' AS f1, 3.8 AS label )`, modelRef) - if err := runQueryJob(ctx, sql); err != nil { + if _, err := runQueryJob(ctx, sql); err != nil { t.Fatal(err) } defer model.Delete(ctx) @@ -3243,7 +3263,7 @@ func TestIntegration_RoutineComplexTypes(t *testing.T) { (SELECT SUM(IF(elem.name = "foo",elem.val,null)) FROM UNNEST(arr) AS elem) )`, routine.FullyQualifiedName()) - if err := runQueryJob(ctx, sql); err != nil { + if _, err := runQueryJob(ctx, sql); err != nil { t.Fatal(err) } defer routine.Delete(ctx) @@ -3303,7 +3323,7 @@ func TestIntegration_RoutineLifecycle(t *testing.T) { sql := fmt.Sprintf(` CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`, routine.FullyQualifiedName()) - if err := runQueryJob(ctx, sql); err != nil { + if _, err := runQueryJob(ctx, sql); err != nil { t.Fatal(err) } defer routine.Delete(ctx) diff --git a/bigquery/job.go b/bigquery/job.go index 6bdcbcc3c32..0499da7d129 100644 --- a/bigquery/job.go +++ b/bigquery/job.go @@ -436,6 +436,10 @@ type QueryStatistics struct { // statements INSERT, UPDATE or DELETE. NumDMLAffectedRows int64 + // DMLStats provides statistics about the row mutations performed by + // DML statements. + DMLStats *DMLStatistics + // Describes a timeline of job execution. Timeline []*QueryTimelineSample @@ -665,6 +669,27 @@ func bqToScriptStackFrame(bsf *bq.ScriptStackFrame) *ScriptStackFrame { } } +// DMLStatistics contains counts of row mutations triggered by a DML query statement. +type DMLStatistics struct { + // Rows added by the statement. + InsertedRowCount int64 + // Rows removed by the statement. + DeletedRowCount int64 + // Rows changed by the statement. + UpdatedRowCount int64 +} + +func bqToDMLStatistics(q *bq.DmlStatistics) *DMLStatistics { + if q == nil { + return nil + } + return &DMLStatistics{ + InsertedRowCount: q.InsertedRowCount, + DeletedRowCount: q.DeletedRowCount, + UpdatedRowCount: q.UpdatedRowCount, + } +} + func (*ExtractStatistics) implementsStatistics() {} func (*LoadStatistics) implementsStatistics() {} func (*QueryStatistics) implementsStatistics() {} @@ -888,6 +913,7 @@ func (j *Job) setStatistics(s *bq.JobStatistics, c *Client) { TotalBytesProcessed: s.Query.TotalBytesProcessed, TotalBytesProcessedAccuracy: s.Query.TotalBytesProcessedAccuracy, NumDMLAffectedRows: s.Query.NumDmlAffectedRows, + DMLStats: bqToDMLStatistics(s.Query.DmlStats), QueryPlan: queryPlanFromProto(s.Query.QueryPlan), Schema: bqToSchema(s.Query.Schema), SlotMillis: s.Query.TotalSlotMs,