diff --git a/spanner/client.go b/spanner/client.go index 9659bb566fc..254468a0a90 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -497,6 +497,8 @@ type applyOption struct { // If atLeastOnce == true, Client.Apply will execute the mutations on Cloud // Spanner at least once. atLeastOnce bool + // transactionTag will be included with the CommitRequest. + transactionTag string // priority is the RPC priority that is used for the commit operation. priority sppb.RequestOptions_Priority } @@ -521,6 +523,14 @@ func ApplyAtLeastOnce() ApplyOption { } } +// TransactionTag returns an ApplyOption that will include the given tag as a +// transaction tag for a write-only transaction. +func TransactionTag(tag string) ApplyOption { + return func(ao *applyOption) { + ao.transactionTag = tag + } +} + // Priority returns an ApplyOptions that sets the RPC priority to use for the // commit operation. func Priority(priority sppb.RequestOptions_Priority) ApplyOption { @@ -542,10 +552,10 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) if !ao.atLeastOnce { resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error { return t.BufferWrite(ms) - }, TransactionOptions{CommitPriority: ao.priority}) + }, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag}) return resp.CommitTs, err } - t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority} + t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag} return t.applyAtLeastOnce(ctx, ms...) } diff --git a/spanner/client_test.go b/spanner/client_test.go index 2b09d1b5ee2..099f1825c39 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -2530,6 +2530,141 @@ func TestClient_Apply_Priority(t *testing.T) { checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: sppb.RequestOptions_PRIORITY_MEDIUM}) } +func TestClient_ReadOnlyTransaction_Tag(t *testing.T) { + t.Parallel() + + server, client, teardown := setupMockedTestServer(t) + defer teardown() + for _, qo := range []QueryOptions{ + {}, + {RequestTag: "tag-1"}, + } { + for _, tx := range []*ReadOnlyTransaction{ + client.Single(), + client.ReadOnlyTransaction(), + } { + iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo) + iter.Next() + iter.Stop() + + if tx.singleUse { + tx = client.Single() + } + iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{RequestTag: qo.RequestTag}) + iter.Next() + iter.Stop() + + checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 2, sppb.RequestOptions{RequestTag: qo.RequestTag}) + tx.Close() + } + } +} + +func TestClient_ReadWriteTransaction_Tag(t *testing.T) { + t.Parallel() + + server, client, teardown := setupMockedTestServer(t) + defer teardown() + for _, to := range []TransactionOptions{ + {}, + {TransactionTag: "tx-tag-1"}, + } { + for _, qo := range []QueryOptions{ + {}, + {RequestTag: "request-tag-1"}, + } { + client.ReadWriteTransactionWithOptions(context.Background(), func(ctx context.Context, tx *ReadWriteTransaction) error { + iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo) + iter.Next() + iter.Stop() + + iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{RequestTag: qo.RequestTag}) + iter.Next() + iter.Stop() + + tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo) + tx.BatchUpdateWithOptions(context.Background(), []Statement{ + NewStatement(UpdateBarSetFoo), + }, qo) + + // Check for SQL requests inside the transaction to prevent the check to + // drain the commit request from the server. + checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{RequestTag: qo.RequestTag, TransactionTag: to.TransactionTag}) + return nil + }, to) + checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: to.TransactionTag}) + } + } +} + +func TestClient_StmtBasedReadWriteTransaction_Tag(t *testing.T) { + t.Parallel() + + server, client, teardown := setupMockedTestServer(t) + defer teardown() + for _, to := range []TransactionOptions{ + {}, + {TransactionTag: "tx-tag-1"}, + } { + for _, qo := range []QueryOptions{ + {}, + {RequestTag: "request-tag-1"}, + } { + tx, _ := NewReadWriteStmtBasedTransactionWithOptions(context.Background(), client, to) + iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo) + iter.Next() + iter.Stop() + + iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{RequestTag: qo.RequestTag}) + iter.Next() + iter.Stop() + + tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo) + tx.BatchUpdateWithOptions(context.Background(), []Statement{ + NewStatement(UpdateBarSetFoo), + }, qo) + checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{RequestTag: qo.RequestTag, TransactionTag: to.TransactionTag}) + + tx.Commit(context.Background()) + checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: to.TransactionTag}) + } + } +} + +func TestClient_PDML_Tag(t *testing.T) { + t.Parallel() + + server, client, teardown := setupMockedTestServer(t) + defer teardown() + + for _, qo := range []QueryOptions{ + {}, + {RequestTag: "request-tag-1"}, + } { + client.PartitionedUpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo) + checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{RequestTag: qo.RequestTag}) + } +} + +func TestClient_Apply_Tagging(t *testing.T) { + t.Parallel() + + server, client, teardown := setupMockedTestServer(t) + defer teardown() + + client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}) + checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{}) + + client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, TransactionTag("tx-tag")) + checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: "tx-tag"}) + + client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce()) + checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{}) + + client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce(), TransactionTag("tx-tag")) + checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: "tx-tag"}) +} + func checkRequestsForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, reqCount int, ro sppb.RequestOptions) { reqs := drainRequestsFromServer(server) reqOptions := []*sppb.RequestOptions{} @@ -2559,6 +2694,12 @@ func checkRequestsForExpectedRequestOptions(t *testing.T, server InMemSpannerSer if got != want { t.Fatalf("Request priority mismatch\nGot: %v\nWant: %v", got, want) } + if got, want := opts.RequestTag, ro.RequestTag; got != want { + t.Fatalf("Request tag mismatch\nGot: %v\nWant: %v", got, want) + } + if got, want := opts.TransactionTag, ro.TransactionTag; got != want { + t.Fatalf("Transaction tag mismatch\nGot: %v\nWant: %v", got, want) + } } } @@ -2585,6 +2726,19 @@ func checkCommitForExpectedRequestOptions(t *testing.T, server InMemSpannerServe if got != want { t.Fatalf("Commit priority mismatch\nGot: %v\nWant: %v", got, want) } + + var requestTag string + var transactionTag string + if commit.RequestOptions != nil { + requestTag = commit.RequestOptions.RequestTag + transactionTag = commit.RequestOptions.TransactionTag + } + if got, want := requestTag, ro.RequestTag; got != want { + t.Fatalf("Commit request tag mismatch\nGot: %v\nWant: %v", got, want) + } + if got, want := transactionTag, ro.TransactionTag; got != want { + t.Fatalf("Commit transaction tag mismatch\nGot: %v\nWant: %v", got, want) + } } func TestClient_Single_Read_WithNumericKey(t *testing.T) { diff --git a/spanner/pdml.go b/spanner/pdml.go index 5a663ee1ecf..d24a8767ab9 100644 --- a/spanner/pdml.go +++ b/spanner/pdml.go @@ -69,7 +69,7 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt Params: params, ParamTypes: paramTypes, QueryOptions: options.Options, - RequestOptions: createRequestOptions(&options), + RequestOptions: createRequestOptions(options.Priority, options.RequestTag, ""), } // Make a retryer for Aborted and certain Internal errors. diff --git a/spanner/pdml_test.go b/spanner/pdml_test.go index ca4ac323044..45fcf8b7eb0 100644 --- a/spanner/pdml_test.go +++ b/spanner/pdml_test.go @@ -166,3 +166,15 @@ func TestPartitionedUpdate_QueryOptions(t *testing.T) { }) } } + +func TestPartitionedUpdate_Tagging(t *testing.T) { + ctx := context.Background() + server, client, teardown := setupMockedTestServer(t) + defer teardown() + + _, err := client.PartitionedUpdateWithOptions(ctx, NewStatement(UpdateBarSetFoo), QueryOptions{RequestTag: "pdml-tag"}) + if err != nil { + t.Fatalf("expect no errors, but got %v", err) + } + checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{RequestTag: "pdml-tag"}) +} diff --git a/spanner/transaction.go b/spanner/transaction.go index 019d35f5a2c..b9da4f401fe 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -77,15 +77,15 @@ type txReadOnly struct { txOpts TransactionOptions } -// Internal interface for types that can configure the priority of an RPC. -type requestPrioritizer interface { - requestPriority() sppb.RequestOptions_Priority -} - // TransactionOptions provides options for a transaction. type TransactionOptions struct { CommitOptions CommitOptions + // The transaction tag to use for a read/write transaction. + // This tag is automatically included with each statement and the commit + // request of a read/write transaction. + TransactionTag string + // CommitPriority is the priority to use for the Commit RPC for the // transaction. CommitPriority sppb.RequestOptions_Priority @@ -95,6 +95,10 @@ func (to *TransactionOptions) requestPriority() sppb.RequestOptions_Priority { return to.CommitPriority } +func (to *TransactionOptions) requestTag() string { + return "" +} + // errSessionClosed returns error for using a recycled/destroyed session func errSessionClosed(sh *sessionHandle) error { return spannerErrorf(codes.FailedPrecondition, @@ -122,12 +126,11 @@ type ReadOptions struct { // limit. Limit int - // Priority is the RPC priority to use for the read operation. + // Priority is the RPC priority to use for the operation. Priority sppb.RequestOptions_Priority -} -func (ro *ReadOptions) requestPriority() sppb.RequestOptions_Priority { - return ro.Priority + // The request tag to use for this request. + RequestTag string } // ReadWithOptions returns a RowIterator for reading multiple rows from the @@ -155,13 +158,15 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key } index := "" limit := 0 - var ro *sppb.RequestOptions + prio := sppb.RequestOptions_PRIORITY_UNSPECIFIED + requestTag := "" if opts != nil { index = opts.Index if opts.Limit > 0 { limit = opts.Limit } - ro = createRequestOptions(opts) + prio = opts.Priority + requestTag = opts.RequestTag } return streamWithReplaceSessionFunc( contextWithOutgoingMetadata(ctx, sh.getMetadata()), @@ -177,7 +182,7 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key KeySet: kset, ResumeToken: resumeToken, Limit: int64(limit), - RequestOptions: ro, + RequestOptions: createRequestOptions(prio, requestTag, t.txOpts.TransactionTag), }) }, t.replaceSessionFunc, @@ -257,23 +262,26 @@ type QueryOptions struct { // Priority is the RPC priority to use for the query/update. Priority sppb.RequestOptions_Priority -} -func (qo *QueryOptions) requestPriority() sppb.RequestOptions_Priority { - return qo.Priority + // The request tag to use for this request. + RequestTag string } // merge combines two QueryOptions that the input parameter will have higher // order of precedence. func (qo QueryOptions) merge(opts QueryOptions) QueryOptions { merged := QueryOptions{ - Mode: qo.Mode, - Options: &sppb.ExecuteSqlRequest_QueryOptions{}, - Priority: qo.Priority, + Mode: qo.Mode, + Options: &sppb.ExecuteSqlRequest_QueryOptions{}, + RequestTag: qo.RequestTag, + Priority: qo.Priority, } if opts.Mode != nil { merged.Mode = opts.Mode } + if opts.RequestTag != "" { + merged.RequestTag = opts.RequestTag + } if opts.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { merged.Priority = opts.Priority } @@ -282,12 +290,16 @@ func (qo QueryOptions) merge(opts QueryOptions) QueryOptions { return merged } -func createRequestOptions(prioritizer requestPrioritizer) (ro *sppb.RequestOptions) { - if prioritizer == nil { - return nil +func createRequestOptions(prio sppb.RequestOptions_Priority, requestTag, transactionTag string) (ro *sppb.RequestOptions) { + ro = &sppb.RequestOptions{} + if prio != sppb.RequestOptions_PRIORITY_UNSPECIFIED { + ro.Priority = prio } - if prioritizer.requestPriority() != sppb.RequestOptions_PRIORITY_UNSPECIFIED { - ro = &sppb.RequestOptions{Priority: prioritizer.requestPriority()} + if requestTag != "" { + ro.RequestTag = requestTag + } + if transactionTag != "" { + ro.TransactionTag = transactionTag } return ro } @@ -396,7 +408,7 @@ func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, opti Params: params, ParamTypes: paramTypes, QueryOptions: options.Options, - RequestOptions: createRequestOptions(&options), + RequestOptions: createRequestOptions(options.Priority, options.RequestTag, t.txOpts.TransactionTag), } return req, sh, nil } @@ -902,9 +914,13 @@ func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statemen // affected rows for the given query at the same index. If an error occurs, // counts will be returned up to the query that encountered the error. // -// The priority given in the QueryOptions will be included with the RPC. -// Any other options that are set in the QueryOptions struct will be ignored. +// The request tag and priority given in the QueryOptions are included with the +// RPC. Any other options that are set in the QueryOptions struct are ignored. func (t *ReadWriteTransaction) BatchUpdateWithOptions(ctx context.Context, stmts []Statement, opts QueryOptions) (_ []int64, err error) { + return t.batchUpdateWithOptions(ctx, stmts, t.qo.merge(opts)) +} + +func (t *ReadWriteTransaction) batchUpdateWithOptions(ctx context.Context, stmts []Statement, opts QueryOptions) (_ []int64, err error) { ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate") defer func() { trace.EndSpan(ctx, err) }() @@ -937,7 +953,7 @@ func (t *ReadWriteTransaction) BatchUpdateWithOptions(ctx context.Context, stmts Transaction: ts, Statements: sppbStmts, Seqno: atomic.AddInt64(&t.sequenceNumber, 1), - RequestOptions: createRequestOptions(&opts), + RequestOptions: createRequestOptions(opts.Priority, opts.RequestTag, t.txOpts.TransactionTag), }) if err != nil { return nil, ToSpannerError(err) @@ -1059,7 +1075,7 @@ func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions Transaction: &sppb.CommitRequest_TransactionId{ TransactionId: t.tx, }, - RequestOptions: createRequestOptions(&t.txOpts), + RequestOptions: createRequestOptions(t.txOpts.CommitPriority, "", t.txOpts.TransactionTag), Mutations: mPb, ReturnCommitStats: options.ReturnCommitStats, }) @@ -1240,14 +1256,13 @@ type writeOnlyTransaction struct { // sp is the session pool which writeOnlyTransaction uses to get Cloud // Spanner sessions for blind writes. sp *sessionPool + // transactionTag is the tag that will be included with the CommitRequest + // of the write-only transaction. + transactionTag string // commitPriority is the RPC priority to use for the commit operation. commitPriority sppb.RequestOptions_Priority } -func (t *writeOnlyTransaction) requestPriority() sppb.RequestOptions_Priority { - return t.commitPriority -} - // applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once, // unless one of the following happens: // @@ -1288,7 +1303,7 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta }, }, Mutations: mPb, - RequestOptions: createRequestOptions(t), + RequestOptions: createRequestOptions(t.commitPriority, "", t.transactionTag), }) if err != nil && !isAbortedErr(err) { if isSessionNotFoundError(err) {