Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(datastore): add opencensus tracing/stats support #2804

Merged
merged 9 commits into from Sep 18, 2020
30 changes: 30 additions & 0 deletions datastore/client.go
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"cloud.google.com/go/internal"
"cloud.google.com/go/internal/trace"
"cloud.google.com/go/internal/version"
gax "github.com/googleapis/gax-go/v2"
pb "google.golang.org/genproto/googleapis/datastore/v1"
Expand Down Expand Up @@ -50,6 +51,9 @@ func newDatastoreClient(conn grpc.ClientConnInterface, projectID string) pb.Data
}

func (dc *datastoreClient) Lookup(ctx context.Context, in *pb.LookupRequest, opts ...grpc.CallOption) (res *pb.LookupResponse, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Lookup")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the name should be cloud.google.com/go/datastore.datastoreClient.Lookup because it's a method on the client. Similar for other methods in this file. @codyoss do you agree?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated all method names with regard of it

defer func() { trace.EndSpan(ctx, err) }()

err = dc.invoke(ctx, func(ctx context.Context) error {
res, err = dc.c.Lookup(ctx, in, opts...)
return err
Expand All @@ -58,6 +62,9 @@ func (dc *datastoreClient) Lookup(ctx context.Context, in *pb.LookupRequest, opt
}

func (dc *datastoreClient) RunQuery(ctx context.Context, in *pb.RunQueryRequest, opts ...grpc.CallOption) (res *pb.RunQueryResponse, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.RunQuery")
defer func() { trace.EndSpan(ctx, err) }()

err = dc.invoke(ctx, func(ctx context.Context) error {
res, err = dc.c.RunQuery(ctx, in, opts...)
return err
Expand All @@ -66,6 +73,9 @@ func (dc *datastoreClient) RunQuery(ctx context.Context, in *pb.RunQueryRequest,
}

func (dc *datastoreClient) BeginTransaction(ctx context.Context, in *pb.BeginTransactionRequest, opts ...grpc.CallOption) (res *pb.BeginTransactionResponse, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.BeginTransaction")
defer func() { trace.EndSpan(ctx, err) }()

err = dc.invoke(ctx, func(ctx context.Context) error {
res, err = dc.c.BeginTransaction(ctx, in, opts...)
return err
Expand All @@ -74,6 +84,9 @@ func (dc *datastoreClient) BeginTransaction(ctx context.Context, in *pb.BeginTra
}

func (dc *datastoreClient) Commit(ctx context.Context, in *pb.CommitRequest, opts ...grpc.CallOption) (res *pb.CommitResponse, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Commit")
defer func() { trace.EndSpan(ctx, err) }()

err = dc.invoke(ctx, func(ctx context.Context) error {
res, err = dc.c.Commit(ctx, in, opts...)
return err
Expand All @@ -82,6 +95,9 @@ func (dc *datastoreClient) Commit(ctx context.Context, in *pb.CommitRequest, opt
}

func (dc *datastoreClient) Rollback(ctx context.Context, in *pb.RollbackRequest, opts ...grpc.CallOption) (res *pb.RollbackResponse, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Rollback")
defer func() { trace.EndSpan(ctx, err) }()

err = dc.invoke(ctx, func(ctx context.Context) error {
res, err = dc.c.Rollback(ctx, in, opts...)
return err
Expand All @@ -90,13 +106,27 @@ func (dc *datastoreClient) Rollback(ctx context.Context, in *pb.RollbackRequest,
}

func (dc *datastoreClient) AllocateIds(ctx context.Context, in *pb.AllocateIdsRequest, opts ...grpc.CallOption) (res *pb.AllocateIdsResponse, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.AllocateIds")
defer func() { trace.EndSpan(ctx, err) }()

err = dc.invoke(ctx, func(ctx context.Context) error {
res, err = dc.c.AllocateIds(ctx, in, opts...)
return err
})
return res, err
}

func (dc *datastoreClient) ReserveIds(ctx context.Context, in *pb.ReserveIdsRequest, opts ...grpc.CallOption) (res *pb.ReserveIdsResponse, err error) {
AlisskaPie marked this conversation as resolved.
Show resolved Hide resolved
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.ReserveIds")
defer func() { trace.EndSpan(ctx, err) }()

err = dc.invoke(ctx, func(ctx context.Context) error {
res, err = dc.c.ReserveIds(ctx, in, opts...)
return err
})
return res, err
}

func (dc *datastoreClient) invoke(ctx context.Context, f func(ctx context.Context) error) error {
ctx = metadata.NewOutgoingContext(ctx, dc.md)
return internal.Retry(ctx, gax.Backoff{Initial: 100 * time.Millisecond}, func() (stop bool, err error) {
Expand Down
17 changes: 11 additions & 6 deletions datastore/transaction.go
Expand Up @@ -110,17 +110,22 @@ func (c *Client) NewTransaction(ctx context.Context, opts ...TransactionOption)
return c.newTransaction(ctx, newTransactionSettings(opts))
}

func (c *Client) newTransaction(ctx context.Context, s *transactionSettings) (*Transaction, error) {
func (c *Client) newTransaction(ctx context.Context, s *transactionSettings) (_ *Transaction, err error) {
tritone marked this conversation as resolved.
Show resolved Hide resolved
req := &pb.BeginTransactionRequest{ProjectId: c.dataset}
if s.readOnly {
req.TransactionOptions = &pb.TransactionOptions{
Mode: &pb.TransactionOptions_ReadOnly_{ReadOnly: &pb.TransactionOptions_ReadOnly{}},
}
} else if s.prevID != nil {
AlisskaPie marked this conversation as resolved.
Show resolved Hide resolved
req.TransactionOptions = &pb.TransactionOptions{
Mode: &pb.TransactionOptions_ReadWrite_{ReadWrite: &pb.TransactionOptions_ReadWrite{
PreviousTransaction: s.prevID,
}},
} else {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Transaction.ReadWriteTransaction")
AlisskaPie marked this conversation as resolved.
Show resolved Hide resolved
defer func() { trace.EndSpan(ctx, err) }()

if s.prevID != nil {
req.TransactionOptions = &pb.TransactionOptions{
Mode: &pb.TransactionOptions_ReadWrite_{ReadWrite: &pb.TransactionOptions_ReadWrite{
PreviousTransaction: s.prevID,
}},
}
}
}
resp, err := c.client.BeginTransaction(ctx, req)
Expand Down