Skip to content

Commit

Permalink
Revert "revert(spanner): support request and transaction tags (#3233) (
Browse files Browse the repository at this point in the history
…#3989)" (#4336)

This reverts commit a18f1bc.
  • Loading branch information
olavloite committed Jun 29, 2021
1 parent 5451b30 commit f08c73a
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 36 deletions.
14 changes: 12 additions & 2 deletions spanner/client.go
Expand Up @@ -497,6 +497,8 @@ type applyOption struct {
// If atLeastOnce == true, Client.Apply will execute the mutations on Cloud
// Spanner at least once.
atLeastOnce bool
// transactionTag will be included with the CommitRequest.
transactionTag string
// priority is the RPC priority that is used for the commit operation.
priority sppb.RequestOptions_Priority
}
Expand All @@ -521,6 +523,14 @@ func ApplyAtLeastOnce() ApplyOption {
}
}

// TransactionTag returns an ApplyOption that will include the given tag as a
// transaction tag for a write-only transaction.
func TransactionTag(tag string) ApplyOption {
return func(ao *applyOption) {
ao.transactionTag = tag
}
}

// Priority returns an ApplyOptions that sets the RPC priority to use for the
// commit operation.
func Priority(priority sppb.RequestOptions_Priority) ApplyOption {
Expand All @@ -542,10 +552,10 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption)
if !ao.atLeastOnce {
resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
return t.BufferWrite(ms)
}, TransactionOptions{CommitPriority: ao.priority})
}, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag})
return resp.CommitTs, err
}
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority}
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag}
return t.applyAtLeastOnce(ctx, ms...)
}

Expand Down
154 changes: 154 additions & 0 deletions spanner/client_test.go
Expand Up @@ -2530,6 +2530,141 @@ func TestClient_Apply_Priority(t *testing.T) {
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: sppb.RequestOptions_PRIORITY_MEDIUM})
}

func TestClient_ReadOnlyTransaction_Tag(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
for _, qo := range []QueryOptions{
{},
{RequestTag: "tag-1"},
} {
for _, tx := range []*ReadOnlyTransaction{
client.Single(),
client.ReadOnlyTransaction(),
} {
iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
iter.Next()
iter.Stop()

if tx.singleUse {
tx = client.Single()
}
iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{RequestTag: qo.RequestTag})
iter.Next()
iter.Stop()

checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 2, sppb.RequestOptions{RequestTag: qo.RequestTag})
tx.Close()
}
}
}

func TestClient_ReadWriteTransaction_Tag(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
for _, to := range []TransactionOptions{
{},
{TransactionTag: "tx-tag-1"},
} {
for _, qo := range []QueryOptions{
{},
{RequestTag: "request-tag-1"},
} {
client.ReadWriteTransactionWithOptions(context.Background(), func(ctx context.Context, tx *ReadWriteTransaction) error {
iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
iter.Next()
iter.Stop()

iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{RequestTag: qo.RequestTag})
iter.Next()
iter.Stop()

tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
tx.BatchUpdateWithOptions(context.Background(), []Statement{
NewStatement(UpdateBarSetFoo),
}, qo)

// Check for SQL requests inside the transaction to prevent the check to
// drain the commit request from the server.
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{RequestTag: qo.RequestTag, TransactionTag: to.TransactionTag})
return nil
}, to)
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: to.TransactionTag})
}
}
}

func TestClient_StmtBasedReadWriteTransaction_Tag(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
for _, to := range []TransactionOptions{
{},
{TransactionTag: "tx-tag-1"},
} {
for _, qo := range []QueryOptions{
{},
{RequestTag: "request-tag-1"},
} {
tx, _ := NewReadWriteStmtBasedTransactionWithOptions(context.Background(), client, to)
iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
iter.Next()
iter.Stop()

iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{RequestTag: qo.RequestTag})
iter.Next()
iter.Stop()

tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
tx.BatchUpdateWithOptions(context.Background(), []Statement{
NewStatement(UpdateBarSetFoo),
}, qo)
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{RequestTag: qo.RequestTag, TransactionTag: to.TransactionTag})

tx.Commit(context.Background())
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: to.TransactionTag})
}
}
}

func TestClient_PDML_Tag(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()

for _, qo := range []QueryOptions{
{},
{RequestTag: "request-tag-1"},
} {
client.PartitionedUpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{RequestTag: qo.RequestTag})
}
}

func TestClient_Apply_Tagging(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})})
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{})

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, TransactionTag("tx-tag"))
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: "tx-tag"})

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce())
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{})

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce(), TransactionTag("tx-tag"))
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: "tx-tag"})
}

func checkRequestsForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, reqCount int, ro sppb.RequestOptions) {
reqs := drainRequestsFromServer(server)
reqOptions := []*sppb.RequestOptions{}
Expand Down Expand Up @@ -2559,6 +2694,12 @@ func checkRequestsForExpectedRequestOptions(t *testing.T, server InMemSpannerSer
if got != want {
t.Fatalf("Request priority mismatch\nGot: %v\nWant: %v", got, want)
}
if got, want := opts.RequestTag, ro.RequestTag; got != want {
t.Fatalf("Request tag mismatch\nGot: %v\nWant: %v", got, want)
}
if got, want := opts.TransactionTag, ro.TransactionTag; got != want {
t.Fatalf("Transaction tag mismatch\nGot: %v\nWant: %v", got, want)
}
}
}

Expand All @@ -2585,6 +2726,19 @@ func checkCommitForExpectedRequestOptions(t *testing.T, server InMemSpannerServe
if got != want {
t.Fatalf("Commit priority mismatch\nGot: %v\nWant: %v", got, want)
}

var requestTag string
var transactionTag string
if commit.RequestOptions != nil {
requestTag = commit.RequestOptions.RequestTag
transactionTag = commit.RequestOptions.TransactionTag
}
if got, want := requestTag, ro.RequestTag; got != want {
t.Fatalf("Commit request tag mismatch\nGot: %v\nWant: %v", got, want)
}
if got, want := transactionTag, ro.TransactionTag; got != want {
t.Fatalf("Commit transaction tag mismatch\nGot: %v\nWant: %v", got, want)
}
}

func TestClient_Single_Read_WithNumericKey(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion spanner/pdml.go
Expand Up @@ -69,7 +69,7 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
Params: params,
ParamTypes: paramTypes,
QueryOptions: options.Options,
RequestOptions: createRequestOptions(&options),
RequestOptions: createRequestOptions(options.Priority, options.RequestTag, ""),
}

// Make a retryer for Aborted and certain Internal errors.
Expand Down
12 changes: 12 additions & 0 deletions spanner/pdml_test.go
Expand Up @@ -166,3 +166,15 @@ func TestPartitionedUpdate_QueryOptions(t *testing.T) {
})
}
}

func TestPartitionedUpdate_Tagging(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()

_, err := client.PartitionedUpdateWithOptions(ctx, NewStatement(UpdateBarSetFoo), QueryOptions{RequestTag: "pdml-tag"})
if err != nil {
t.Fatalf("expect no errors, but got %v", err)
}
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{RequestTag: "pdml-tag"})
}

0 comments on commit f08c73a

Please sign in to comment.