Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): add more dml statistics to query statistics #4405

Merged
merged 5 commits into from Jul 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 32 additions & 12 deletions bigquery/integration_test.go
Expand Up @@ -479,7 +479,7 @@ func TestIntegration_SnapshotAndRestore(t *testing.T) {
FROM
UNNEST(GENERATE_ARRAY(0,999))
`, qualified)
if err := runQueryJob(ctx, sql); err != nil {
if _, err := runQueryJob(ctx, sql); err != nil {
t.Fatalf("couldn't instantiate base table: %v", err)
}

Expand Down Expand Up @@ -872,7 +872,7 @@ func TestIntegration_DatasetUpdateAccess(t *testing.T) {
sql := fmt.Sprintf(`
CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`,
routine.FullyQualifiedName())
if err := runQueryJob(ctx, sql); err != nil {
if _, err := runQueryJob(ctx, sql); err != nil {
t.Fatal(err)
}
defer routine.Delete(ctx)
Expand Down Expand Up @@ -1288,7 +1288,7 @@ func TestIntegration_RoutineStoredProcedure(t *testing.T) {
END`,
routine.FullyQualifiedName())

if err := runQueryJob(ctx, sql); err != nil {
if _, err := runQueryJob(ctx, sql); err != nil {
t.Fatal(err)
}
defer routine.Delete(ctx)
Expand Down Expand Up @@ -2013,7 +2013,8 @@ func TestIntegration_DML(t *testing.T) {
('b', [1], STRUCT<BOOL>(FALSE)),
('c', [2], STRUCT<BOOL>(TRUE))`,
table.DatasetID, table.TableID)
if err := runQueryJob(ctx, sql); err != nil {
stats, err := runQueryJob(ctx, sql)
if err != nil {
t.Fatal(err)
}
wantRows := [][]Value{
Expand All @@ -2022,11 +2023,23 @@ func TestIntegration_DML(t *testing.T) {
{"c", []Value{int64(2)}, []Value{true}},
}
checkRead(t, "DML", table.Read(ctx), wantRows)
if stats == nil {
t.Fatalf("no query stats")
}
if stats.DMLStats == nil {
t.Fatalf("no dml stats")
}
wantRowCount := int64(len(wantRows))
if stats.DMLStats.InsertedRowCount != wantRowCount {
t.Fatalf("dml stats mismatch. got %d inserted rows, want %d", stats.DMLStats.InsertedRowCount, wantRowCount)
}
}

// runQueryJob is useful for running queries where no row data is returned (DDL/DML).
func runQueryJob(ctx context.Context, sql string) error {
return internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
func runQueryJob(ctx context.Context, sql string) (*QueryStatistics, error) {
var stats *QueryStatistics
var err error
err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
job, err := client.Query(sql).Run(ctx)
if err != nil {
if e, ok := err.(*googleapi.Error); ok && e.Code < 500 {
Expand All @@ -2041,8 +2054,15 @@ func runQueryJob(ctx context.Context, sql string) error {
}
return false, err
}
status := job.LastStatus()
if status.Statistics != nil {
if qStats, ok := status.Statistics.Details.(*QueryStatistics); ok {
stats = qStats
}
}
return true, nil
})
return stats, err
}

func TestIntegration_TimeTypes(t *testing.T) {
Expand Down Expand Up @@ -2082,7 +2102,7 @@ func TestIntegration_TimeTypes(t *testing.T) {
"VALUES ('%s', '%s', '%s', '%s')",
table.DatasetID, table.TableID,
d, CivilTimeString(tm), CivilDateTimeString(dtm), ts.Format("2006-01-02 15:04:05"))
if err := runQueryJob(ctx, query); err != nil {
if _, err := runQueryJob(ctx, query); err != nil {
t.Fatal(err)
}
wantRows = append(wantRows, wantRows[0])
Expand Down Expand Up @@ -2506,7 +2526,7 @@ func TestIntegration_ExtractExternal(t *testing.T) {
sql := fmt.Sprintf(`INSERT %s.%s (name, num)
VALUES ('a', 1), ('b', 2), ('c', 3)`,
table.DatasetID, table.TableID)
if err := runQueryJob(ctx, sql); err != nil {
if _, err := runQueryJob(ctx, sql); err != nil {
t.Fatal(err)
}
// Extract to a GCS object as CSV.
Expand Down Expand Up @@ -2932,7 +2952,7 @@ func TestIntegration_MaterializedViewLifecycle(t *testing.T) {
FROM
UNNEST(GENERATE_ARRAY(0,999))
`, qualified)
if err := runQueryJob(ctx, sql); err != nil {
if _, err := runQueryJob(ctx, sql); err != nil {
t.Fatalf("couldn't instantiate base table: %v", err)
}

Expand Down Expand Up @@ -3060,7 +3080,7 @@ func TestIntegration_ModelLifecycle(t *testing.T) {
UNION ALL
SELECT 'b' AS f1, 3.8 AS label
)`, modelRef)
if err := runQueryJob(ctx, sql); err != nil {
if _, err := runQueryJob(ctx, sql); err != nil {
t.Fatal(err)
}
defer model.Delete(ctx)
Expand Down Expand Up @@ -3243,7 +3263,7 @@ func TestIntegration_RoutineComplexTypes(t *testing.T) {
(SELECT SUM(IF(elem.name = "foo",elem.val,null)) FROM UNNEST(arr) AS elem)
)`,
routine.FullyQualifiedName())
if err := runQueryJob(ctx, sql); err != nil {
if _, err := runQueryJob(ctx, sql); err != nil {
t.Fatal(err)
}
defer routine.Delete(ctx)
Expand Down Expand Up @@ -3303,7 +3323,7 @@ func TestIntegration_RoutineLifecycle(t *testing.T) {
sql := fmt.Sprintf(`
CREATE FUNCTION `+"`%s`"+`(x INT64) AS (x * 3);`,
routine.FullyQualifiedName())
if err := runQueryJob(ctx, sql); err != nil {
if _, err := runQueryJob(ctx, sql); err != nil {
t.Fatal(err)
}
defer routine.Delete(ctx)
Expand Down
26 changes: 26 additions & 0 deletions bigquery/job.go
Expand Up @@ -436,6 +436,10 @@ type QueryStatistics struct {
// statements INSERT, UPDATE or DELETE.
NumDMLAffectedRows int64

// DMLStats provides statistics about the row mutations performed by
// DML statements.
DMLStats *DMLStatistics

// Describes a timeline of job execution.
Timeline []*QueryTimelineSample

Expand Down Expand Up @@ -665,6 +669,27 @@ func bqToScriptStackFrame(bsf *bq.ScriptStackFrame) *ScriptStackFrame {
}
}

// DMLStatistics contains counts of row mutations triggered by a DML query statement.
type DMLStatistics struct {
// Rows added by the statement.
InsertedRowCount int64
// Rows removed by the statement.
DeletedRowCount int64
// Rows changed by the statement.
UpdatedRowCount int64
}

func bqToDMLStatistics(q *bq.DmlStatistics) *DMLStatistics {
if q == nil {
return nil
}
return &DMLStatistics{
InsertedRowCount: q.InsertedRowCount,
DeletedRowCount: q.DeletedRowCount,
UpdatedRowCount: q.UpdatedRowCount,
}
}

func (*ExtractStatistics) implementsStatistics() {}
func (*LoadStatistics) implementsStatistics() {}
func (*QueryStatistics) implementsStatistics() {}
Expand Down Expand Up @@ -888,6 +913,7 @@ func (j *Job) setStatistics(s *bq.JobStatistics, c *Client) {
TotalBytesProcessed: s.Query.TotalBytesProcessed,
TotalBytesProcessedAccuracy: s.Query.TotalBytesProcessedAccuracy,
NumDMLAffectedRows: s.Query.NumDmlAffectedRows,
DMLStats: bqToDMLStatistics(s.Query.DmlStats),
QueryPlan: queryPlanFromProto(s.Query.QueryPlan),
Schema: bqToSchema(s.Query.Schema),
SlotMillis: s.Query.TotalSlotMs,
Expand Down