diff --git a/spanner/client.go b/spanner/client.go index afa35315e29..e201852256d 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -487,6 +487,8 @@ type applyOption struct { // If atLeastOnce == true, Client.Apply will execute the mutations on Cloud // Spanner at least once. atLeastOnce bool + // priority is the RPC priority that is used for the commit operation. + priority sppb.RequestOptions_Priority } // An ApplyOption is an optional argument to Apply. @@ -509,6 +511,14 @@ func ApplyAtLeastOnce() ApplyOption { } } +// Priority returns an ApplyOptions that sets the RPC priority to use for the +// commit operation. +func Priority(priority sppb.RequestOptions_Priority) ApplyOption { + return func(ao *applyOption) { + ao.priority = priority + } +} + // Apply applies a list of mutations atomically to the database. func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) { ao := &applyOption{} @@ -520,11 +530,12 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) defer func() { trace.EndSpan(ctx, err) }() if !ao.atLeastOnce { - return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error { + resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error { return t.BufferWrite(ms) - }) + }, TransactionOptions{CommitPriority: ao.priority}) + return resp.CommitTs, err } - t := &writeOnlyTransaction{c.idleSessions} + t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority} return t.applyAtLeastOnce(ctx, ms...) } diff --git a/spanner/client_test.go b/spanner/client_test.go index c7ec16bc535..7130c0ec9d4 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -869,7 +869,10 @@ func TestClient_ReadWriteStmtBasedTransactionWithOptions(t *testing.T) { _, client, teardown := setupMockedTestServer(t) defer teardown() ctx := context.Background() - tx, err := NewReadWriteStmtBasedTransactionWithOptions(ctx, client, TransactionOptions{CommitOptions{ReturnCommitStats: true}}) + tx, err := NewReadWriteStmtBasedTransactionWithOptions( + ctx, + client, + TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}}) if err != nil { t.Fatalf("Unexpected error when creating transaction: %v", err) } @@ -2344,6 +2347,196 @@ func TestClient_DoForEachRow_ShouldEndSpanWithQueryError(t *testing.T) { } } +func TestClient_ReadOnlyTransaction_Priority(t *testing.T) { + t.Parallel() + + server, client, teardown := setupMockedTestServer(t) + defer teardown() + for _, qo := range []QueryOptions{ + {}, + {Priority: sppb.RequestOptions_PRIORITY_HIGH}, + } { + for _, tx := range []*ReadOnlyTransaction{ + client.Single(), + client.ReadOnlyTransaction(), + } { + iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo) + iter.Next() + iter.Stop() + + if tx.singleUse { + tx = client.Single() + } + iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority}) + iter.Next() + iter.Stop() + + checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 2, sppb.RequestOptions{Priority: qo.Priority}) + tx.Close() + } + } +} + +func TestClient_ReadWriteTransaction_Priority(t *testing.T) { + t.Parallel() + + server, client, teardown := setupMockedTestServer(t) + defer teardown() + for _, to := range []TransactionOptions{ + {}, + {CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM}, + } { + for _, qo := range []QueryOptions{ + {}, + {Priority: sppb.RequestOptions_PRIORITY_MEDIUM}, + } { + client.ReadWriteTransactionWithOptions(context.Background(), func(ctx context.Context, tx *ReadWriteTransaction) error { + iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo) + iter.Next() + iter.Stop() + + iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority}) + iter.Next() + iter.Stop() + + tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo) + tx.BatchUpdateWithOptions(context.Background(), []Statement{ + NewStatement(UpdateBarSetFoo), + }, qo) + checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{Priority: qo.Priority}) + + return nil + }, to) + checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: to.CommitPriority}) + } + } +} + +func TestClient_StmtBasedReadWriteTransaction_Priority(t *testing.T) { + t.Parallel() + + server, client, teardown := setupMockedTestServer(t) + defer teardown() + for _, to := range []TransactionOptions{ + {}, + {CommitPriority: sppb.RequestOptions_PRIORITY_LOW}, + } { + for _, qo := range []QueryOptions{ + {}, + {Priority: sppb.RequestOptions_PRIORITY_LOW}, + } { + tx, _ := NewReadWriteStmtBasedTransactionWithOptions(context.Background(), client, to) + iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo) + iter.Next() + iter.Stop() + + iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority}) + iter.Next() + iter.Stop() + + tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo) + tx.BatchUpdateWithOptions(context.Background(), []Statement{ + NewStatement(UpdateBarSetFoo), + }, qo) + + checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{Priority: qo.Priority}) + tx.Commit(context.Background()) + checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: to.CommitPriority}) + } + } +} + +func TestClient_PDML_Priority(t *testing.T) { + t.Parallel() + + server, client, teardown := setupMockedTestServer(t) + defer teardown() + + for _, qo := range []QueryOptions{ + {}, + {Priority: sppb.RequestOptions_PRIORITY_HIGH}, + } { + client.PartitionedUpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo) + checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{Priority: qo.Priority}) + } +} + +func TestClient_Apply_Priority(t *testing.T) { + t.Parallel() + + server, client, teardown := setupMockedTestServer(t) + defer teardown() + + client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}) + checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{}) + + client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, Priority(sppb.RequestOptions_PRIORITY_HIGH)) + checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: sppb.RequestOptions_PRIORITY_HIGH}) + + client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce()) + checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{}) + + client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce(), Priority(sppb.RequestOptions_PRIORITY_MEDIUM)) + checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: sppb.RequestOptions_PRIORITY_MEDIUM}) +} + +func checkRequestsForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, reqCount int, ro sppb.RequestOptions) { + reqs := drainRequestsFromServer(server) + reqOptions := []*sppb.RequestOptions{} + + for _, req := range reqs { + if sqlReq, ok := req.(*sppb.ExecuteSqlRequest); ok { + reqOptions = append(reqOptions, sqlReq.RequestOptions) + } + if batchReq, ok := req.(*sppb.ExecuteBatchDmlRequest); ok { + reqOptions = append(reqOptions, batchReq.RequestOptions) + } + if readReq, ok := req.(*sppb.ReadRequest); ok { + reqOptions = append(reqOptions, readReq.RequestOptions) + } + } + + if got, want := len(reqOptions), reqCount; got != want { + t.Fatalf("Requests length mismatch\nGot: %v\nWant: %v", got, want) + } + + for _, opts := range reqOptions { + var got sppb.RequestOptions_Priority + if opts != nil { + got = opts.Priority + } + want := ro.Priority + if got != want { + t.Fatalf("Request priority mismatch\nGot: %v\nWant: %v", got, want) + } + } +} + +func checkCommitForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, ro sppb.RequestOptions) { + reqs := drainRequestsFromServer(server) + var commit *sppb.CommitRequest + var ok bool + + for _, req := range reqs { + if commit, ok = req.(*sppb.CommitRequest); ok { + break + } + } + + if commit == nil { + t.Fatalf("Missing commit request") + } + + var got sppb.RequestOptions_Priority + if commit.RequestOptions != nil { + got = commit.RequestOptions.Priority + } + want := ro.Priority + if got != want { + t.Fatalf("Commit priority mismatch\nGot: %v\nWant: %v", got, want) + } +} + func TestClient_Single_Read_WithNumericKey(t *testing.T) { t.Parallel() diff --git a/spanner/integration_test.go b/spanner/integration_test.go index 73287c4b8d6..88e8626af13 100644 --- a/spanner/integration_test.go +++ b/spanner/integration_test.go @@ -1058,7 +1058,7 @@ func TestIntegration_ReadWriteTransactionWithOptions(t *testing.T) { } } - txOpts := TransactionOptions{CommitOptions{ReturnCommitStats: true}} + txOpts := TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}} resp, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { // Query Foo's balance and Bar's balance. bf, e := readBalance(tx.Query(ctx, @@ -1237,7 +1237,7 @@ func TestIntegration_ReadWriteTransaction_StatementBasedWithOptions(t *testing.T } var resp CommitResponse - txOpts := TransactionOptions{CommitOptions{ReturnCommitStats: true}} + txOpts := TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}} for { tx, err := NewReadWriteStmtBasedTransactionWithOptions(ctx, client, txOpts) if err != nil { diff --git a/spanner/pdml.go b/spanner/pdml.go index 52e56ab1d7f..5a663ee1ecf 100644 --- a/spanner/pdml.go +++ b/spanner/pdml.go @@ -64,11 +64,12 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt return 0, ToSpannerError(err) } req := &sppb.ExecuteSqlRequest{ - Session: sh.getID(), - Sql: statement.SQL, - Params: params, - ParamTypes: paramTypes, - QueryOptions: options.Options, + Session: sh.getID(), + Sql: statement.SQL, + Params: params, + ParamTypes: paramTypes, + QueryOptions: options.Options, + RequestOptions: createRequestOptions(&options), } // Make a retryer for Aborted and certain Internal errors. diff --git a/spanner/transaction.go b/spanner/transaction.go index 67bf2b7c16c..019d35f5a2c 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -77,9 +77,22 @@ type txReadOnly struct { txOpts TransactionOptions } +// Internal interface for types that can configure the priority of an RPC. +type requestPrioritizer interface { + requestPriority() sppb.RequestOptions_Priority +} + // TransactionOptions provides options for a transaction. type TransactionOptions struct { CommitOptions CommitOptions + + // CommitPriority is the priority to use for the Commit RPC for the + // transaction. + CommitPriority sppb.RequestOptions_Priority +} + +func (to *TransactionOptions) requestPriority() sppb.RequestOptions_Priority { + return to.CommitPriority } // errSessionClosed returns error for using a recycled/destroyed session @@ -108,6 +121,13 @@ type ReadOptions struct { // The maximum number of rows to read. A limit value less than 1 means no // limit. Limit int + + // Priority is the RPC priority to use for the read operation. + Priority sppb.RequestOptions_Priority +} + +func (ro *ReadOptions) requestPriority() sppb.RequestOptions_Priority { + return ro.Priority } // ReadWithOptions returns a RowIterator for reading multiple rows from the @@ -135,11 +155,13 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key } index := "" limit := 0 + var ro *sppb.RequestOptions if opts != nil { index = opts.Index if opts.Limit > 0 { limit = opts.Limit } + ro = createRequestOptions(opts) } return streamWithReplaceSessionFunc( contextWithOutgoingMetadata(ctx, sh.getMetadata()), @@ -147,14 +169,15 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { return client.StreamingRead(ctx, &sppb.ReadRequest{ - Session: t.sh.getID(), - Transaction: ts, - Table: table, - Index: index, - Columns: columns, - KeySet: kset, - ResumeToken: resumeToken, - Limit: int64(limit), + Session: t.sh.getID(), + Transaction: ts, + Table: table, + Index: index, + Columns: columns, + KeySet: kset, + ResumeToken: resumeToken, + Limit: int64(limit), + RequestOptions: ro, }) }, t.replaceSessionFunc, @@ -227,27 +250,48 @@ func (t *txReadOnly) ReadRowUsingIndex(ctx context.Context, table string, index } } -// QueryOptions provides options for executing a sql query from a database. +// QueryOptions provides options for executing a sql query or update statement. type QueryOptions struct { Mode *sppb.ExecuteSqlRequest_QueryMode Options *sppb.ExecuteSqlRequest_QueryOptions + + // Priority is the RPC priority to use for the query/update. + Priority sppb.RequestOptions_Priority +} + +func (qo *QueryOptions) requestPriority() sppb.RequestOptions_Priority { + return qo.Priority } // merge combines two QueryOptions that the input parameter will have higher // order of precedence. func (qo QueryOptions) merge(opts QueryOptions) QueryOptions { merged := QueryOptions{ - Mode: qo.Mode, - Options: &sppb.ExecuteSqlRequest_QueryOptions{}, + Mode: qo.Mode, + Options: &sppb.ExecuteSqlRequest_QueryOptions{}, + Priority: qo.Priority, } if opts.Mode != nil { merged.Mode = opts.Mode } + if opts.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { + merged.Priority = opts.Priority + } proto.Merge(merged.Options, qo.Options) proto.Merge(merged.Options, opts.Options) return merged } +func createRequestOptions(prioritizer requestPrioritizer) (ro *sppb.RequestOptions) { + if prioritizer == nil { + return nil + } + if prioritizer.requestPriority() != sppb.RequestOptions_PRIORITY_UNSPECIFIED { + ro = &sppb.RequestOptions{Priority: prioritizer.requestPriority()} + } + return ro +} + // Query executes a query against the database. It returns a RowIterator for // retrieving the resulting rows. // @@ -344,14 +388,15 @@ func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, opti mode = *options.Mode } req := &sppb.ExecuteSqlRequest{ - Session: sid, - Transaction: ts, - Sql: stmt.SQL, - QueryMode: mode, - Seqno: atomic.AddInt64(&t.sequenceNumber, 1), - Params: params, - ParamTypes: paramTypes, - QueryOptions: options.Options, + Session: sid, + Transaction: ts, + Sql: stmt.SQL, + QueryMode: mode, + Seqno: atomic.AddInt64(&t.sequenceNumber, 1), + Params: params, + ParamTypes: paramTypes, + QueryOptions: options.Options, + RequestOptions: createRequestOptions(&options), } return req, sh, nil } @@ -816,8 +861,8 @@ func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowC } // UpdateWithOptions executes a DML statement against the database. It returns -// the number of affected rows. The sql query execution will be optimized -// based on the given query options. +// the number of affected rows. The given QueryOptions will be used for the +// execution of this statement. func (t *ReadWriteTransaction) UpdateWithOptions(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) { return t.update(ctx, stmt, t.qo.merge(opts)) } @@ -846,6 +891,20 @@ func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts // affected rows for the given query at the same index. If an error occurs, // counts will be returned up to the query that encountered the error. func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) { + return t.BatchUpdateWithOptions(ctx, stmts, QueryOptions{}) +} + +// BatchUpdateWithOptions groups one or more DML statements and sends them to +// Spanner in a single RPC. This is an efficient way to execute multiple DML +// statements. +// +// A slice of counts is returned, where each count represents the number of +// affected rows for the given query at the same index. If an error occurs, +// counts will be returned up to the query that encountered the error. +// +// The priority given in the QueryOptions will be included with the RPC. +// Any other options that are set in the QueryOptions struct will be ignored. +func (t *ReadWriteTransaction) BatchUpdateWithOptions(ctx context.Context, stmts []Statement, opts QueryOptions) (_ []int64, err error) { ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate") defer func() { trace.EndSpan(ctx, err) }() @@ -874,10 +933,11 @@ func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statemen } resp, err := sh.getClient().ExecuteBatchDml(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.ExecuteBatchDmlRequest{ - Session: sh.getID(), - Transaction: ts, - Statements: sppbStmts, - Seqno: atomic.AddInt64(&t.sequenceNumber, 1), + Session: sh.getID(), + Transaction: ts, + Statements: sppbStmts, + Seqno: atomic.AddInt64(&t.sequenceNumber, 1), + RequestOptions: createRequestOptions(&opts), }) if err != nil { return nil, ToSpannerError(err) @@ -986,6 +1046,7 @@ func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions if err != nil { return resp, err } + // In case that sessionHandle was destroyed but transaction body fails to // report it. sid, client := t.sh.getID(), t.sh.getClient() @@ -998,6 +1059,7 @@ func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions Transaction: &sppb.CommitRequest_TransactionId{ TransactionId: t.tx, }, + RequestOptions: createRequestOptions(&t.txOpts), Mutations: mPb, ReturnCommitStats: options.ReturnCommitStats, }) @@ -1178,6 +1240,12 @@ type writeOnlyTransaction struct { // sp is the session pool which writeOnlyTransaction uses to get Cloud // Spanner sessions for blind writes. sp *sessionPool + // commitPriority is the RPC priority to use for the commit operation. + commitPriority sppb.RequestOptions_Priority +} + +func (t *writeOnlyTransaction) requestPriority() sppb.RequestOptions_Priority { + return t.commitPriority } // applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once, @@ -1219,7 +1287,8 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta }, }, }, - Mutations: mPb, + Mutations: mPb, + RequestOptions: createRequestOptions(t), }) if err != nil && !isAbortedErr(err) { if isSessionNotFoundError(err) {