Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): Add support for RPC priority #3341

Merged
merged 12 commits into from Mar 31, 2021
18 changes: 15 additions & 3 deletions spanner/client.go
Expand Up @@ -469,6 +469,7 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
} else {
t = &ReadWriteTransaction{}
}
t.options = options
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
Expand All @@ -491,6 +492,8 @@ type applyOption struct {
// If atLeastOnce == true, Client.Apply will execute the mutations on Cloud
// Spanner at least once.
atLeastOnce bool
// priority is the RPC priority that is used for the commit operation.
priority sppb.RequestOptions_Priority
}

// An ApplyOption is an optional argument to Apply.
Expand All @@ -513,6 +516,14 @@ func ApplyAtLeastOnce() ApplyOption {
}
}

// Priority returns an ApplyOptions that sets the RPC priority to use for the
// commit operation.
func Priority(priority sppb.RequestOptions_Priority) ApplyOption {
codyoss marked this conversation as resolved.
Show resolved Hide resolved
return func(ao *applyOption) {
ao.priority = priority
}
}

// Apply applies a list of mutations atomically to the database.
func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
ao := &applyOption{}
Expand All @@ -524,11 +535,12 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption)
defer func() { trace.EndSpan(ctx, err) }()

if !ao.atLeastOnce {
return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
return t.BufferWrite(ms)
})
}, TransactionOptions{CommitPriority: ao.priority})
return resp.CommitTs, err
}
t := &writeOnlyTransaction{c.idleSessions}
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority}
return t.applyAtLeastOnce(ctx, ms...)
}

Expand Down
190 changes: 190 additions & 0 deletions spanner/client_test.go
Expand Up @@ -2239,3 +2239,193 @@ func TestClient_DoForEachRow_ShouldEndSpanWithQueryError(t *testing.T) {
t.Errorf("Span status mismatch\nGot: %v\nWant: %v", s.Code, codes.InvalidArgument)
}
}

func TestClient_ReadOnlyTransaction_Priority(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
for _, qo := range []QueryOptions{
{},
{Priority: sppb.RequestOptions_PRIORITY_HIGH},
} {
for _, tx := range []*ReadOnlyTransaction{
client.Single(),
client.ReadOnlyTransaction(),
} {
iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
iter.Next()
iter.Stop()

if tx.singleUse {
tx = client.Single()
}
iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority})
iter.Next()
iter.Stop()

checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 2, sppb.RequestOptions{Priority: qo.Priority})
tx.Close()
}
}
}

func TestClient_ReadWriteTransaction_Priority(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
for _, to := range []TransactionOptions{
{},
{CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM},
} {
for _, qo := range []QueryOptions{
{},
{Priority: sppb.RequestOptions_PRIORITY_MEDIUM},
} {
client.ReadWriteTransactionWithOptions(context.Background(), func(ctx context.Context, tx *ReadWriteTransaction) error {
iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
iter.Next()
iter.Stop()

iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority})
iter.Next()
iter.Stop()

tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
tx.BatchUpdateWithOptions(context.Background(), []Statement{
NewStatement(UpdateBarSetFoo),
}, qo)
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{Priority: qo.Priority})

return nil
}, to)
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: to.CommitPriority})
}
}
}

func TestClient_StmtBasedReadWriteTransaction_Priority(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
for _, to := range []TransactionOptions{
{},
{CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
} {
for _, qo := range []QueryOptions{
{},
{Priority: sppb.RequestOptions_PRIORITY_LOW},
} {
tx, _ := NewReadWriteStmtBasedTransactionWithOptions(context.Background(), client, to)
iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
iter.Next()
iter.Stop()

iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority})
iter.Next()
iter.Stop()

tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
tx.BatchUpdateWithOptions(context.Background(), []Statement{
NewStatement(UpdateBarSetFoo),
}, qo)

checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{Priority: qo.Priority})
tx.Commit(context.Background())
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: to.CommitPriority})
}
}
}

func TestClient_PDML_Priority(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()

for _, qo := range []QueryOptions{
{},
{Priority: sppb.RequestOptions_PRIORITY_HIGH},
} {
client.PartitionedUpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{Priority: qo.Priority})
}
}

func TestClient_Apply_Priority(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})})
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{})

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, Priority(sppb.RequestOptions_PRIORITY_HIGH))
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: sppb.RequestOptions_PRIORITY_HIGH})

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce())
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{})

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce(), Priority(sppb.RequestOptions_PRIORITY_MEDIUM))
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: sppb.RequestOptions_PRIORITY_MEDIUM})
}

func checkRequestsForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, reqCount int, ro sppb.RequestOptions) {
reqs := drainRequestsFromServer(server)
reqOptions := []*sppb.RequestOptions{}

for _, req := range reqs {
if sqlReq, ok := req.(*sppb.ExecuteSqlRequest); ok {
reqOptions = append(reqOptions, sqlReq.RequestOptions)
}
if batchReq, ok := req.(*sppb.ExecuteBatchDmlRequest); ok {
reqOptions = append(reqOptions, batchReq.RequestOptions)
}
if readReq, ok := req.(*sppb.ReadRequest); ok {
reqOptions = append(reqOptions, readReq.RequestOptions)
}
}

if got, want := len(reqOptions), reqCount; got != want {
t.Fatalf("Requests length mismatch\nGot: %v\nWant: %v", got, want)
}

for _, opts := range reqOptions {
var got sppb.RequestOptions_Priority
if opts != nil {
got = opts.Priority
}
want := ro.Priority
if got != want {
t.Fatalf("Request priority mismatch\nGot: %v\nWant: %v", got, want)
}
}
}

func checkCommitForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, ro sppb.RequestOptions) {
reqs := drainRequestsFromServer(server)
var commit *sppb.CommitRequest
var ok bool

for _, req := range reqs {
if commit, ok = req.(*sppb.CommitRequest); ok {
break
}
}

if commit == nil {
t.Fatalf("Missing commit request")
}

var got sppb.RequestOptions_Priority
if commit.RequestOptions != nil {
got = commit.RequestOptions.Priority
}
want := ro.Priority
if got != want {
t.Fatalf("Commit priority mismatch\nGot: %v\nWant: %v", got, want)
}
}
15 changes: 10 additions & 5 deletions spanner/pdml.go
Expand Up @@ -56,6 +56,10 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
if sh != nil {
defer sh.recycle()
}
var ro *sppb.RequestOptions
if options.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED {
ro = &sppb.RequestOptions{Priority: options.Priority}
}

// Create the parameters and the SQL request, but without a transaction.
// The transaction reference will be added by the executePdml method.
Expand All @@ -64,11 +68,12 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
return 0, ToSpannerError(err)
}
req := &sppb.ExecuteSqlRequest{
Session: sh.getID(),
Sql: statement.SQL,
Params: params,
ParamTypes: paramTypes,
QueryOptions: options.Options,
Session: sh.getID(),
Sql: statement.SQL,
Params: params,
ParamTypes: paramTypes,
QueryOptions: options.Options,
RequestOptions: ro,
}

// Make a retryer for Aborted and certain Internal errors.
Expand Down