From d3c965c475383109a7431ad2c6d4901a16feb77e Mon Sep 17 00:00:00 2001 From: Olav Loite Date: Thu, 3 Dec 2020 16:46:09 +0100 Subject: [PATCH 1/7] feat(spanner): add support for RPC priority Adds support for setting a priority per RPC. This can be any read/query/update statement and it can be set for a commit RPC. --- spanner/client.go | 18 ++++- spanner/client_test.go | 171 +++++++++++++++++++++++++++++++++++++++++ spanner/pdml.go | 3 + spanner/transaction.go | 118 ++++++++++++++++++++-------- 4 files changed, 277 insertions(+), 33 deletions(-) diff --git a/spanner/client.go b/spanner/client.go index eb5ff9798d3..933bb9e6e79 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -469,6 +469,7 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea } else { t = &ReadWriteTransaction{} } + t.options = options t.txReadOnly.sh = sh t.txReadOnly.txReadEnv = t t.txReadOnly.qo = c.qo @@ -491,6 +492,8 @@ type applyOption struct { // If atLeastOnce == true, Client.Apply will execute the mutations on Cloud // Spanner at least once. atLeastOnce bool + // priority is the RPC priority that is used for the commit operation. + priority sppb.RequestOptions_Priority } // An ApplyOption is an optional argument to Apply. @@ -513,6 +516,14 @@ func ApplyAtLeastOnce() ApplyOption { } } +// Priority returns an ApplyOptions that sets the RPC priority to use for the +// commit operation. +func Priority(priority sppb.RequestOptions_Priority) ApplyOption { + return func(ao *applyOption) { + ao.priority = priority + } +} + // Apply applies a list of mutations atomically to the database. func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) { ao := &applyOption{} @@ -524,11 +535,12 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) defer func() { trace.EndSpan(ctx, err) }() if !ao.atLeastOnce { - return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error { + resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error { return t.BufferWrite(ms) - }) + }, TransactionOptions{CommitPriority: ao.priority}) + return resp.CommitTs, err } - t := &writeOnlyTransaction{c.idleSessions} + t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority} return t.applyAtLeastOnce(ctx, ms...) } diff --git a/spanner/client_test.go b/spanner/client_test.go index 92cac5564bb..16c4367b477 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -2239,3 +2239,174 @@ func TestClient_DoForEachRow_ShouldEndSpanWithQueryError(t *testing.T) { t.Errorf("Span status mismatch\nGot: %v\nWant: %v", s.Code, codes.InvalidArgument) } } + +func TestClient_ReadOnlyTransaction_Priority(t *testing.T) { + t.Parallel() + + server, client, teardown := setupMockedTestServer(t) + defer teardown() + for _, qo := range []QueryOptions{ + {}, + {Priority: sppb.RequestOptions_PRIORITY_HIGH}, + } { + for _, tx := range []*ReadOnlyTransaction{ + client.Single(), + client.ReadOnlyTransaction(), + } { + iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo) + iter.Next() + iter.Stop() + + if tx.singleUse { + tx = client.Single() + } + iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority}) + iter.Next() + iter.Stop() + + checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 2, sppb.RequestOptions{Priority: qo.Priority}) + tx.Close() + } + } +} + +func TestClient_ReadWriteTransaction_Priority(t *testing.T) { + t.Parallel() + + server, client, teardown := setupMockedTestServer(t) + defer teardown() + for _, to := range []TransactionOptions{ + {}, + {CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM}, + } { + for _, qo := range []QueryOptions{ + {}, + {Priority: sppb.RequestOptions_PRIORITY_MEDIUM}, + } { + client.ReadWriteTransactionWithOptions(context.Background(), func(ctx context.Context, tx *ReadWriteTransaction) error { + iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo) + iter.Next() + iter.Stop() + + iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority}) + iter.Next() + iter.Stop() + + tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo) + tx.BatchUpdateWithOptions(context.Background(), []Statement{ + NewStatement(UpdateBarSetFoo), + }, qo) + checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{Priority: qo.Priority}) + + return nil + }, to) + checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: to.CommitPriority}) + } + } +} + +func TestClient_StmtBasedReadWriteTransaction_Priority(t *testing.T) { + t.Parallel() + + server, client, teardown := setupMockedTestServer(t) + defer teardown() + for _, to := range []TransactionOptions{ + {}, + {CommitPriority: sppb.RequestOptions_PRIORITY_LOW}, + } { + for _, qo := range []QueryOptions{ + {}, + {Priority: sppb.RequestOptions_PRIORITY_LOW}, + } { + tx, _ := NewReadWriteStmtBasedTransactionWithOptions(context.Background(), client, to) + iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo) + iter.Next() + iter.Stop() + + iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority}) + iter.Next() + iter.Stop() + + tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo) + tx.BatchUpdateWithOptions(context.Background(), []Statement{ + NewStatement(UpdateBarSetFoo), + }, qo) + + checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{Priority: qo.Priority}) + tx.Commit(context.Background()) + checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: to.CommitPriority}) + } + } +} + +func TestClient_PDML_Priority(t *testing.T) { + t.Parallel() + + server, client, teardown := setupMockedTestServer(t) + defer teardown() + + for _, qo := range []QueryOptions{ + {}, + {Priority: sppb.RequestOptions_PRIORITY_HIGH}, + } { + client.PartitionedUpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo) + checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{Priority: qo.Priority}) + } +} + +func checkRequestsForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, reqCount int, ro sppb.RequestOptions) { + reqs := drainRequestsFromServer(server) + reqOptions := []*sppb.RequestOptions{} + + for _, req := range reqs { + if sqlReq, ok := req.(*sppb.ExecuteSqlRequest); ok { + reqOptions = append(reqOptions, sqlReq.RequestOptions) + } + if batchReq, ok := req.(*sppb.ExecuteBatchDmlRequest); ok { + reqOptions = append(reqOptions, batchReq.RequestOptions) + } + if readReq, ok := req.(*sppb.ReadRequest); ok { + reqOptions = append(reqOptions, readReq.RequestOptions) + } + } + + if got, want := len(reqOptions), reqCount; got != want { + t.Fatalf("Requests length mismatch\nGot: %v\nWant: %v", got, want) + } + + for _, opts := range reqOptions { + var got sppb.RequestOptions_Priority + if opts != nil { + got = opts.Priority + } + want := ro.Priority + if got != want { + t.Fatalf("Request priority mismatch\nGot: %v\nWant: %v", got, want) + } + } +} + +func checkCommitForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, ro sppb.RequestOptions) { + reqs := drainRequestsFromServer(server) + var commit *sppb.CommitRequest + var ok bool + + for _, req := range reqs { + if commit, ok = req.(*sppb.CommitRequest); ok { + break + } + } + + if commit == nil { + t.Fatalf("Missing commit request") + } + + var got sppb.RequestOptions_Priority + if commit.RequestOptions != nil { + got = commit.RequestOptions.Priority + } + want := ro.Priority + if got != want { + t.Fatalf("Commit priority mismatch\nGot: %v\nWant: %v", got, want) + } +} diff --git a/spanner/pdml.go b/spanner/pdml.go index 52e56ab1d7f..245d9238ab5 100644 --- a/spanner/pdml.go +++ b/spanner/pdml.go @@ -69,6 +69,9 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt Params: params, ParamTypes: paramTypes, QueryOptions: options.Options, + RequestOptions: &sppb.RequestOptions{ + Priority: options.Priority, + }, } // Make a retryer for Aborted and certain Internal errors. diff --git a/spanner/transaction.go b/spanner/transaction.go index cd7ea25bcaf..b6e50f83cc9 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -76,6 +76,9 @@ type txReadOnly struct { // TransactionOptions provides options for a transaction. type TransactionOptions struct { + // CommitPriority is the priority to use for the Commit RPC for the + // transaction. + CommitPriority sppb.RequestOptions_Priority } // errSessionClosed returns error for using a recycled/destroyed session @@ -104,6 +107,9 @@ type ReadOptions struct { // The maximum number of rows to read. A limit value less than 1 means no // limit. Limit int + + // Priority is the RPC priority to use for the read operation. + Priority sppb.RequestOptions_Priority } // ReadWithOptions returns a RowIterator for reading multiple rows from the @@ -131,11 +137,15 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key } index := "" limit := 0 + var ro *sppb.RequestOptions if opts != nil { index = opts.Index if opts.Limit > 0 { limit = opts.Limit } + if opts.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { + ro = &sppb.RequestOptions{Priority: opts.Priority} + } } return streamWithReplaceSessionFunc( contextWithOutgoingMetadata(ctx, sh.getMetadata()), @@ -143,14 +153,15 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { return client.StreamingRead(ctx, &sppb.ReadRequest{ - Session: t.sh.getID(), - Transaction: ts, - Table: table, - Index: index, - Columns: columns, - KeySet: kset, - ResumeToken: resumeToken, - Limit: int64(limit), + Session: t.sh.getID(), + Transaction: ts, + Table: table, + Index: index, + Columns: columns, + KeySet: kset, + ResumeToken: resumeToken, + Limit: int64(limit), + RequestOptions: ro, }) }, t.replaceSessionFunc, @@ -223,27 +234,41 @@ func (t *txReadOnly) ReadRowUsingIndex(ctx context.Context, table string, index } } -// QueryOptions provides options for executing a sql query from a database. +// QueryOptions provides options for executing a sql query or update statement. type QueryOptions struct { Mode *sppb.ExecuteSqlRequest_QueryMode Options *sppb.ExecuteSqlRequest_QueryOptions + + // Priority is the RPC priority to use for the query/update. + Priority sppb.RequestOptions_Priority } // merge combines two QueryOptions that the input parameter will have higher // order of precedence. func (qo QueryOptions) merge(opts QueryOptions) QueryOptions { merged := QueryOptions{ - Mode: qo.Mode, - Options: &sppb.ExecuteSqlRequest_QueryOptions{}, + Mode: qo.Mode, + Options: &sppb.ExecuteSqlRequest_QueryOptions{}, + Priority: qo.Priority, } if opts.Mode != nil { merged.Mode = opts.Mode } + if opts.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { + merged.Priority = opts.Priority + } proto.Merge(merged.Options, qo.Options) proto.Merge(merged.Options, opts.Options) return merged } +func (qo QueryOptions) createRequestOptions() (ro *sppb.RequestOptions) { + if qo.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { + ro = &sppb.RequestOptions{Priority: qo.Priority} + } + return ro +} + // Query executes a query against the database. It returns a RowIterator for // retrieving the resulting rows. // @@ -340,14 +365,15 @@ func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, opti mode = *options.Mode } req := &sppb.ExecuteSqlRequest{ - Session: sid, - Transaction: ts, - Sql: stmt.SQL, - QueryMode: mode, - Seqno: atomic.AddInt64(&t.sequenceNumber, 1), - Params: params, - ParamTypes: paramTypes, - QueryOptions: options.Options, + Session: sid, + Transaction: ts, + Sql: stmt.SQL, + QueryMode: mode, + Seqno: atomic.AddInt64(&t.sequenceNumber, 1), + Params: params, + ParamTypes: paramTypes, + QueryOptions: options.Options, + RequestOptions: options.createRequestOptions(), } return req, sh, nil } @@ -777,6 +803,8 @@ type ReadWriteTransaction struct { state txState // wb is the set of buffered mutations waiting to be committed. wb []*Mutation + // options contains additional options for the read/write transaction. + options TransactionOptions } // BufferWrite adds a list of mutations to the set of updates that will be @@ -812,8 +840,8 @@ func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowC } // UpdateWithOptions executes a DML statement against the database. It returns -// the number of affected rows. The sql query execution will be optimized -// based on the given query options. +// the number of affected rows. The given QueryOptions will be used for the +// execution of this statement. func (t *ReadWriteTransaction) UpdateWithOptions(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) { return t.update(ctx, stmt, t.qo.merge(opts)) } @@ -842,6 +870,20 @@ func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts // affected rows for the given query at the same index. If an error occurs, // counts will be returned up to the query that encountered the error. func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) { + return t.BatchUpdateWithOptions(ctx, stmts, QueryOptions{}) +} + +// BatchUpdateWithOptions groups one or more DML statements and sends them to +// Spanner in a single RPC. This is an efficient way to execute multiple DML +// statements. +// +// A slice of counts is returned, where each count represents the number of +// affected rows for the given query at the same index. If an error occurs, +// counts will be returned up to the query that encountered the error. +// +// The priority given in the QueryOptions will be included with the RPC. +// Any other options that are set in the QueryOptions struct will be ignored. +func (t *ReadWriteTransaction) BatchUpdateWithOptions(ctx context.Context, stmts []Statement, opts QueryOptions) (_ []int64, err error) { ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate") defer func() { trace.EndSpan(ctx, err) }() @@ -870,10 +912,11 @@ func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statemen } resp, err := sh.getClient().ExecuteBatchDml(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.ExecuteBatchDmlRequest{ - Session: sh.getID(), - Transaction: ts, - Statements: sppbStmts, - Seqno: atomic.AddInt64(&t.sequenceNumber, 1), + Session: sh.getID(), + Transaction: ts, + Statements: sppbStmts, + Seqno: atomic.AddInt64(&t.sequenceNumber, 1), + RequestOptions: opts.createRequestOptions(), }) if err != nil { return nil, ToSpannerError(err) @@ -968,6 +1011,7 @@ type CommitResponse struct { // returns the commit response for the transactions. func (t *ReadWriteTransaction) commit(ctx context.Context) (CommitResponse, error) { resp := CommitResponse{} + t.mu.Lock() t.state = txClosed // No further operations after commit. mPb, err := mutationsProto(t.wb) @@ -975,6 +1019,11 @@ func (t *ReadWriteTransaction) commit(ctx context.Context) (CommitResponse, erro if err != nil { return resp, err } + + var opts *sppb.RequestOptions + if t.options.CommitPriority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { + opts = &sppb.RequestOptions{Priority: t.options.CommitPriority} + } // In case that sessionHandle was destroyed but transaction body fails to // report it. sid, client := t.sh.getID(), t.sh.getClient() @@ -987,7 +1036,8 @@ func (t *ReadWriteTransaction) commit(ctx context.Context) (CommitResponse, erro Transaction: &sppb.CommitRequest_TransactionId{ TransactionId: t.tx, }, - Mutations: mPb, + Mutations: mPb, + RequestOptions: opts, }) if e != nil { return resp, toSpannerErrorWithCommitInfo(e, true) @@ -1108,7 +1158,8 @@ func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client, } t = &ReadWriteStmtBasedTransaction{ ReadWriteTransaction: ReadWriteTransaction{ - tx: sh.getTransactionID(), + tx: sh.getTransactionID(), + options: options, }, } t.txReadOnly.sh = sh @@ -1153,6 +1204,8 @@ type writeOnlyTransaction struct { // sp is the session pool which writeOnlyTransaction uses to get Cloud // Spanner sessions for blind writes. sp *sessionPool + // commitPriority is the RPC priority to use for the commit operation. + commitPriority sppb.RequestOptions_Priority } // applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once, @@ -1163,14 +1216,18 @@ type writeOnlyTransaction struct { // 3) There is a malformed Mutation object. func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) { var ( - ts time.Time - sh *sessionHandle + ts time.Time + sh *sessionHandle + opts *sppb.RequestOptions ) mPb, err := mutationsProto(ms) if err != nil { // Malformed mutation found, just return the error. return ts, err } + if t.commitPriority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { + opts = &sppb.RequestOptions{Priority: sppb.RequestOptions_PRIORITY_UNSPECIFIED} + } // Retry-loop for aborted transactions. // TODO: Replace with generic retryer. @@ -1194,7 +1251,8 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta }, }, }, - Mutations: mPb, + Mutations: mPb, + RequestOptions: opts, }) if err != nil && !isAbortedErr(err) { if isSessionNotFoundError(err) { From 4ec3f489bad02fcf52804e7dff302c516ddd69bf Mon Sep 17 00:00:00 2001 From: Olav Loite Date: Thu, 3 Dec 2020 17:12:22 +0100 Subject: [PATCH 2/7] fix: fix ApplyAtLeastOnce + add tests --- spanner/client_test.go | 19 +++++++++++++++++++ spanner/transaction.go | 2 +- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/spanner/client_test.go b/spanner/client_test.go index 16c4367b477..09bf8245540 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -2354,6 +2354,25 @@ func TestClient_PDML_Priority(t *testing.T) { } } +func TestClient_Apply_Priority(t *testing.T) { + t.Parallel() + + server, client, teardown := setupMockedTestServer(t) + defer teardown() + + client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}) + checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{}) + + client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, Priority(sppb.RequestOptions_PRIORITY_HIGH)) + checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: sppb.RequestOptions_PRIORITY_HIGH}) + + client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce()) + checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{}) + + client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce(), Priority(sppb.RequestOptions_PRIORITY_MEDIUM)) + checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: sppb.RequestOptions_PRIORITY_MEDIUM}) +} + func checkRequestsForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, reqCount int, ro sppb.RequestOptions) { reqs := drainRequestsFromServer(server) reqOptions := []*sppb.RequestOptions{} diff --git a/spanner/transaction.go b/spanner/transaction.go index b6e50f83cc9..44a316d8378 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -1226,7 +1226,7 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta return ts, err } if t.commitPriority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { - opts = &sppb.RequestOptions{Priority: sppb.RequestOptions_PRIORITY_UNSPECIFIED} + opts = &sppb.RequestOptions{Priority: t.commitPriority} } // Retry-loop for aborted transactions. From 3b1adddc5b79ab5ea597e2e7b441b27e85cae263 Mon Sep 17 00:00:00 2001 From: Olav Loite Date: Thu, 3 Dec 2020 17:14:31 +0100 Subject: [PATCH 3/7] fix: only include RequestOptions if necessary --- spanner/pdml.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/spanner/pdml.go b/spanner/pdml.go index 245d9238ab5..506154f528d 100644 --- a/spanner/pdml.go +++ b/spanner/pdml.go @@ -56,6 +56,10 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt if sh != nil { defer sh.recycle() } + var ro *sppb.RequestOptions + if options.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { + ro = &sppb.RequestOptions{Priority: options.Priority} + } // Create the parameters and the SQL request, but without a transaction. // The transaction reference will be added by the executePdml method. @@ -64,14 +68,12 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt return 0, ToSpannerError(err) } req := &sppb.ExecuteSqlRequest{ - Session: sh.getID(), - Sql: statement.SQL, - Params: params, - ParamTypes: paramTypes, - QueryOptions: options.Options, - RequestOptions: &sppb.RequestOptions{ - Priority: options.Priority, - }, + Session: sh.getID(), + Sql: statement.SQL, + Params: params, + ParamTypes: paramTypes, + QueryOptions: options.Options, + RequestOptions: ro, } // Make a retryer for Aborted and certain Internal errors. From 04e7c6447b27271a6c021de71cf2c040b9e71331 Mon Sep 17 00:00:00 2001 From: Olav Loite Date: Thu, 3 Dec 2020 17:16:25 +0100 Subject: [PATCH 4/7] fix: remove unnecessary blank line --- spanner/transaction.go | 1 - 1 file changed, 1 deletion(-) diff --git a/spanner/transaction.go b/spanner/transaction.go index 44a316d8378..18cd1670cca 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -1011,7 +1011,6 @@ type CommitResponse struct { // returns the commit response for the transactions. func (t *ReadWriteTransaction) commit(ctx context.Context) (CommitResponse, error) { resp := CommitResponse{} - t.mu.Lock() t.state = txClosed // No further operations after commit. mPb, err := mutationsProto(t.wb) From 067f55003948734c5136da3aef9b6f6b33da77d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Fri, 26 Mar 2021 09:40:43 +0100 Subject: [PATCH 5/7] fix: merge txOpts and options --- spanner/client.go | 1 - spanner/transaction.go | 9 ++++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/spanner/client.go b/spanner/client.go index dd24b31968d..e201852256d 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -466,7 +466,6 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea } else { t = &ReadWriteTransaction{} } - t.options = options t.txReadOnly.sh = sh t.txReadOnly.txReadEnv = t t.txReadOnly.qo = c.qo diff --git a/spanner/transaction.go b/spanner/transaction.go index fa05c8ac796..bddf8f925dc 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -809,7 +809,7 @@ type ReadWriteTransaction struct { // wb is the set of buffered mutations waiting to be committed. wb []*Mutation // options contains additional options for the read/write transaction. - options TransactionOptions + // options TransactionOptions } // BufferWrite adds a list of mutations to the set of updates that will be @@ -1032,8 +1032,8 @@ func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions } var opts *sppb.RequestOptions - if t.options.CommitPriority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { - opts = &sppb.RequestOptions{Priority: t.options.CommitPriority} + if t.txOpts.CommitPriority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { + opts = &sppb.RequestOptions{Priority: t.txOpts.CommitPriority} } // In case that sessionHandle was destroyed but transaction body fails to // report it. @@ -1175,8 +1175,7 @@ func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client, } t = &ReadWriteStmtBasedTransaction{ ReadWriteTransaction: ReadWriteTransaction{ - tx: sh.getTransactionID(), - options: options, + tx: sh.getTransactionID(), }, } t.txReadOnly.sh = sh From e7b42565ef6fd38b18bb6b0fb5cf0982e29b9665 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Fri, 26 Mar 2021 09:57:09 +0100 Subject: [PATCH 6/7] fix: remove commented code --- spanner/transaction.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/spanner/transaction.go b/spanner/transaction.go index bddf8f925dc..136f95d4f34 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -808,8 +808,6 @@ type ReadWriteTransaction struct { state txState // wb is the set of buffered mutations waiting to be committed. wb []*Mutation - // options contains additional options for the read/write transaction. - // options TransactionOptions } // BufferWrite adds a list of mutations to the set of updates that will be From c4482afd5ca77cc875f99f6c6874fdd8b27c32f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Tue, 30 Mar 2021 10:41:42 +0200 Subject: [PATCH 7/7] fix: create generic createRequestOptions() method --- spanner/pdml.go | 6 +---- spanner/transaction.go | 54 ++++++++++++++++++++++++++---------------- 2 files changed, 35 insertions(+), 25 deletions(-) diff --git a/spanner/pdml.go b/spanner/pdml.go index 506154f528d..5a663ee1ecf 100644 --- a/spanner/pdml.go +++ b/spanner/pdml.go @@ -56,10 +56,6 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt if sh != nil { defer sh.recycle() } - var ro *sppb.RequestOptions - if options.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { - ro = &sppb.RequestOptions{Priority: options.Priority} - } // Create the parameters and the SQL request, but without a transaction. // The transaction reference will be added by the executePdml method. @@ -73,7 +69,7 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt Params: params, ParamTypes: paramTypes, QueryOptions: options.Options, - RequestOptions: ro, + RequestOptions: createRequestOptions(&options), } // Make a retryer for Aborted and certain Internal errors. diff --git a/spanner/transaction.go b/spanner/transaction.go index 136f95d4f34..019d35f5a2c 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -77,6 +77,11 @@ type txReadOnly struct { txOpts TransactionOptions } +// Internal interface for types that can configure the priority of an RPC. +type requestPrioritizer interface { + requestPriority() sppb.RequestOptions_Priority +} + // TransactionOptions provides options for a transaction. type TransactionOptions struct { CommitOptions CommitOptions @@ -86,6 +91,10 @@ type TransactionOptions struct { CommitPriority sppb.RequestOptions_Priority } +func (to *TransactionOptions) requestPriority() sppb.RequestOptions_Priority { + return to.CommitPriority +} + // errSessionClosed returns error for using a recycled/destroyed session func errSessionClosed(sh *sessionHandle) error { return spannerErrorf(codes.FailedPrecondition, @@ -117,6 +126,10 @@ type ReadOptions struct { Priority sppb.RequestOptions_Priority } +func (ro *ReadOptions) requestPriority() sppb.RequestOptions_Priority { + return ro.Priority +} + // ReadWithOptions returns a RowIterator for reading multiple rows from the // database. Pass a ReadOptions to modify the read operation. func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) { @@ -148,9 +161,7 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key if opts.Limit > 0 { limit = opts.Limit } - if opts.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { - ro = &sppb.RequestOptions{Priority: opts.Priority} - } + ro = createRequestOptions(opts) } return streamWithReplaceSessionFunc( contextWithOutgoingMetadata(ctx, sh.getMetadata()), @@ -248,6 +259,10 @@ type QueryOptions struct { Priority sppb.RequestOptions_Priority } +func (qo *QueryOptions) requestPriority() sppb.RequestOptions_Priority { + return qo.Priority +} + // merge combines two QueryOptions that the input parameter will have higher // order of precedence. func (qo QueryOptions) merge(opts QueryOptions) QueryOptions { @@ -267,9 +282,12 @@ func (qo QueryOptions) merge(opts QueryOptions) QueryOptions { return merged } -func (qo QueryOptions) createRequestOptions() (ro *sppb.RequestOptions) { - if qo.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { - ro = &sppb.RequestOptions{Priority: qo.Priority} +func createRequestOptions(prioritizer requestPrioritizer) (ro *sppb.RequestOptions) { + if prioritizer == nil { + return nil + } + if prioritizer.requestPriority() != sppb.RequestOptions_PRIORITY_UNSPECIFIED { + ro = &sppb.RequestOptions{Priority: prioritizer.requestPriority()} } return ro } @@ -378,7 +396,7 @@ func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, opti Params: params, ParamTypes: paramTypes, QueryOptions: options.Options, - RequestOptions: options.createRequestOptions(), + RequestOptions: createRequestOptions(&options), } return req, sh, nil } @@ -919,7 +937,7 @@ func (t *ReadWriteTransaction) BatchUpdateWithOptions(ctx context.Context, stmts Transaction: ts, Statements: sppbStmts, Seqno: atomic.AddInt64(&t.sequenceNumber, 1), - RequestOptions: opts.createRequestOptions(), + RequestOptions: createRequestOptions(&opts), }) if err != nil { return nil, ToSpannerError(err) @@ -1029,10 +1047,6 @@ func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions return resp, err } - var opts *sppb.RequestOptions - if t.txOpts.CommitPriority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { - opts = &sppb.RequestOptions{Priority: t.txOpts.CommitPriority} - } // In case that sessionHandle was destroyed but transaction body fails to // report it. sid, client := t.sh.getID(), t.sh.getClient() @@ -1045,7 +1059,7 @@ func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions Transaction: &sppb.CommitRequest_TransactionId{ TransactionId: t.tx, }, - RequestOptions: opts, + RequestOptions: createRequestOptions(&t.txOpts), Mutations: mPb, ReturnCommitStats: options.ReturnCommitStats, }) @@ -1230,6 +1244,10 @@ type writeOnlyTransaction struct { commitPriority sppb.RequestOptions_Priority } +func (t *writeOnlyTransaction) requestPriority() sppb.RequestOptions_Priority { + return t.commitPriority +} + // applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once, // unless one of the following happens: // @@ -1238,18 +1256,14 @@ type writeOnlyTransaction struct { // 3) There is a malformed Mutation object. func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) { var ( - ts time.Time - sh *sessionHandle - opts *sppb.RequestOptions + ts time.Time + sh *sessionHandle ) mPb, err := mutationsProto(ms) if err != nil { // Malformed mutation found, just return the error. return ts, err } - if t.commitPriority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { - opts = &sppb.RequestOptions{Priority: t.commitPriority} - } // Retry-loop for aborted transactions. // TODO: Replace with generic retryer. @@ -1274,7 +1288,7 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta }, }, Mutations: mPb, - RequestOptions: opts, + RequestOptions: createRequestOptions(t), }) if err != nil && !isAbortedErr(err) { if isSessionNotFoundError(err) {