Skip to content

Commit

Permalink
feat(bigquery): add more dml statistics to query statistics (#4405)
Browse files Browse the repository at this point in the history
This adds the new DMLStats submessage, which indicates the number of
rows inserted/updated/deleted by a DML statement.

It also makes a change to test utility functions to pass back query
statistics.
  • Loading branch information
shollyman committed Jul 9, 2021
1 parent 9dc78e0 commit 99d5728
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 12 deletions.
44 changes: 32 additions & 12 deletions bigquery/integration_test.go
Expand Up @@ -479,7 +479,7 @@ func TestIntegration_SnapshotAndRestore(t *testing.T) {
FROM
UNNEST(GENERATE_ARRAY(0,999))
`, qualified)
if err := runQueryJob(ctx, sql); err != nil {
if _, err := runQueryJob(ctx, sql); err != nil {
t.Fatalf("couldn't instantiate base table: %v", err)
}

Expand Down Expand Up @@ -872,7 +872,7 @@ func TestIntegration_DatasetUpdateAccess(t *testing.T) {
sql := fmt.Sprintf(`
CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`,
routine.FullyQualifiedName())
if err := runQueryJob(ctx, sql); err != nil {
if _, err := runQueryJob(ctx, sql); err != nil {
t.Fatal(err)
}
defer routine.Delete(ctx)
Expand Down Expand Up @@ -1288,7 +1288,7 @@ func TestIntegration_RoutineStoredProcedure(t *testing.T) {
END`,
routine.FullyQualifiedName())

if err := runQueryJob(ctx, sql); err != nil {
if _, err := runQueryJob(ctx, sql); err != nil {
t.Fatal(err)
}
defer routine.Delete(ctx)
Expand Down Expand Up @@ -2013,7 +2013,8 @@ func TestIntegration_DML(t *testing.T) {
('b', [1], STRUCT<BOOL>(FALSE)),
('c', [2], STRUCT<BOOL>(TRUE))`,
table.DatasetID, table.TableID)
if err := runQueryJob(ctx, sql); err != nil {
stats, err := runQueryJob(ctx, sql)
if err != nil {
t.Fatal(err)
}
wantRows := [][]Value{
Expand All @@ -2022,11 +2023,23 @@ func TestIntegration_DML(t *testing.T) {
{"c", []Value{int64(2)}, []Value{true}},
}
checkRead(t, "DML", table.Read(ctx), wantRows)
if stats == nil {
t.Fatalf("no query stats")
}
if stats.DMLStats == nil {
t.Fatalf("no dml stats")
}
wantRowCount := int64(len(wantRows))
if stats.DMLStats.InsertedRowCount != wantRowCount {
t.Fatalf("dml stats mismatch. got %d inserted rows, want %d", stats.DMLStats.InsertedRowCount, wantRowCount)
}
}

// runQueryJob is useful for running queries where no row data is returned (DDL/DML).
func runQueryJob(ctx context.Context, sql string) error {
return internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
func runQueryJob(ctx context.Context, sql string) (*QueryStatistics, error) {
var stats *QueryStatistics
var err error
err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
job, err := client.Query(sql).Run(ctx)
if err != nil {
if e, ok := err.(*googleapi.Error); ok && e.Code < 500 {
Expand All @@ -2041,8 +2054,15 @@ func runQueryJob(ctx context.Context, sql string) error {
}
return false, err
}
status := job.LastStatus()
if status.Statistics != nil {
if qStats, ok := status.Statistics.Details.(*QueryStatistics); ok {
stats = qStats
}
}
return true, nil
})
return stats, err
}

func TestIntegration_TimeTypes(t *testing.T) {
Expand Down Expand Up @@ -2082,7 +2102,7 @@ func TestIntegration_TimeTypes(t *testing.T) {
"VALUES ('%s', '%s', '%s', '%s')",
table.DatasetID, table.TableID,
d, CivilTimeString(tm), CivilDateTimeString(dtm), ts.Format("2006-01-02 15:04:05"))
if err := runQueryJob(ctx, query); err != nil {
if _, err := runQueryJob(ctx, query); err != nil {
t.Fatal(err)
}
wantRows = append(wantRows, wantRows[0])
Expand Down Expand Up @@ -2506,7 +2526,7 @@ func TestIntegration_ExtractExternal(t *testing.T) {
sql := fmt.Sprintf(`INSERT %s.%s (name, num)
VALUES ('a', 1), ('b', 2), ('c', 3)`,
table.DatasetID, table.TableID)
if err := runQueryJob(ctx, sql); err != nil {
if _, err := runQueryJob(ctx, sql); err != nil {
t.Fatal(err)
}
// Extract to a GCS object as CSV.
Expand Down Expand Up @@ -2932,7 +2952,7 @@ func TestIntegration_MaterializedViewLifecycle(t *testing.T) {
FROM
UNNEST(GENERATE_ARRAY(0,999))
`, qualified)
if err := runQueryJob(ctx, sql); err != nil {
if _, err := runQueryJob(ctx, sql); err != nil {
t.Fatalf("couldn't instantiate base table: %v", err)
}

Expand Down Expand Up @@ -3060,7 +3080,7 @@ func TestIntegration_ModelLifecycle(t *testing.T) {
UNION ALL
SELECT 'b' AS f1, 3.8 AS label
)`, modelRef)
if err := runQueryJob(ctx, sql); err != nil {
if _, err := runQueryJob(ctx, sql); err != nil {
t.Fatal(err)
}
defer model.Delete(ctx)
Expand Down Expand Up @@ -3243,7 +3263,7 @@ func TestIntegration_RoutineComplexTypes(t *testing.T) {
(SELECT SUM(IF(elem.name = "foo",elem.val,null)) FROM UNNEST(arr) AS elem)
)`,
routine.FullyQualifiedName())
if err := runQueryJob(ctx, sql); err != nil {
if _, err := runQueryJob(ctx, sql); err != nil {
t.Fatal(err)
}
defer routine.Delete(ctx)
Expand Down Expand Up @@ -3303,7 +3323,7 @@ func TestIntegration_RoutineLifecycle(t *testing.T) {
sql := fmt.Sprintf(`
CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`,
routine.FullyQualifiedName())
if err := runQueryJob(ctx, sql); err != nil {
if _, err := runQueryJob(ctx, sql); err != nil {
t.Fatal(err)
}
defer routine.Delete(ctx)
Expand Down
26 changes: 26 additions & 0 deletions bigquery/job.go
Expand Up @@ -436,6 +436,10 @@ type QueryStatistics struct {
// statements INSERT, UPDATE or DELETE.
NumDMLAffectedRows int64

// DMLStats provides statistics about the row mutations performed by
// DML statements.
DMLStats *DMLStatistics

// Describes a timeline of job execution.
Timeline []*QueryTimelineSample

Expand Down Expand Up @@ -665,6 +669,27 @@ func bqToScriptStackFrame(bsf *bq.ScriptStackFrame) *ScriptStackFrame {
}
}

// DMLStatistics contains counts of row mutations triggered by a DML query statement.
type DMLStatistics struct {
// Rows added by the statement.
InsertedRowCount int64
// Rows removed by the statement.
DeletedRowCount int64
// Rows changed by the statement.
UpdatedRowCount int64
}

func bqToDMLStatistics(q *bq.DmlStatistics) *DMLStatistics {
if q == nil {
return nil
}
return &DMLStatistics{
InsertedRowCount: q.InsertedRowCount,
DeletedRowCount: q.DeletedRowCount,
UpdatedRowCount: q.UpdatedRowCount,
}
}

func (*ExtractStatistics) implementsStatistics() {}
func (*LoadStatistics) implementsStatistics() {}
func (*QueryStatistics) implementsStatistics() {}
Expand Down Expand Up @@ -888,6 +913,7 @@ func (j *Job) setStatistics(s *bq.JobStatistics, c *Client) {
TotalBytesProcessed: s.Query.TotalBytesProcessed,
TotalBytesProcessedAccuracy: s.Query.TotalBytesProcessedAccuracy,
NumDMLAffectedRows: s.Query.NumDmlAffectedRows,
DMLStats: bqToDMLStatistics(s.Query.DmlStats),
QueryPlan: queryPlanFromProto(s.Query.QueryPlan),
Schema: bqToSchema(s.Query.Schema),
SlotMillis: s.Query.TotalSlotMs,
Expand Down

0 comments on commit 99d5728

Please sign in to comment.