Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(datastore): add opencensus tracing/stats support #2804

Merged
merged 9 commits into from Sep 18, 2020
19 changes: 19 additions & 0 deletions datastore/client.go
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"cloud.google.com/go/internal"
"cloud.google.com/go/internal/trace"
"cloud.google.com/go/internal/version"
gax "github.com/googleapis/gax-go/v2"
pb "google.golang.org/genproto/googleapis/datastore/v1"
Expand Down Expand Up @@ -50,6 +51,9 @@ func newDatastoreClient(conn grpc.ClientConnInterface, projectID string) pb.Data
}

func (dc *datastoreClient) Lookup(ctx context.Context, in *pb.LookupRequest, opts ...grpc.CallOption) (res *pb.LookupResponse, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.Lookup")
defer func() { trace.EndSpan(ctx, err) }()

err = dc.invoke(ctx, func(ctx context.Context) error {
res, err = dc.c.Lookup(ctx, in, opts...)
return err
Expand All @@ -58,6 +62,9 @@ func (dc *datastoreClient) Lookup(ctx context.Context, in *pb.LookupRequest, opt
}

func (dc *datastoreClient) RunQuery(ctx context.Context, in *pb.RunQueryRequest, opts ...grpc.CallOption) (res *pb.RunQueryResponse, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.RunQuery")
defer func() { trace.EndSpan(ctx, err) }()

err = dc.invoke(ctx, func(ctx context.Context) error {
res, err = dc.c.RunQuery(ctx, in, opts...)
return err
Expand All @@ -66,6 +73,9 @@ func (dc *datastoreClient) RunQuery(ctx context.Context, in *pb.RunQueryRequest,
}

func (dc *datastoreClient) BeginTransaction(ctx context.Context, in *pb.BeginTransactionRequest, opts ...grpc.CallOption) (res *pb.BeginTransactionResponse, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.BeginTransaction")
defer func() { trace.EndSpan(ctx, err) }()

err = dc.invoke(ctx, func(ctx context.Context) error {
res, err = dc.c.BeginTransaction(ctx, in, opts...)
return err
Expand All @@ -74,6 +84,9 @@ func (dc *datastoreClient) BeginTransaction(ctx context.Context, in *pb.BeginTra
}

func (dc *datastoreClient) Commit(ctx context.Context, in *pb.CommitRequest, opts ...grpc.CallOption) (res *pb.CommitResponse, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.Commit")
defer func() { trace.EndSpan(ctx, err) }()

err = dc.invoke(ctx, func(ctx context.Context) error {
res, err = dc.c.Commit(ctx, in, opts...)
return err
Expand All @@ -82,6 +95,9 @@ func (dc *datastoreClient) Commit(ctx context.Context, in *pb.CommitRequest, opt
}

func (dc *datastoreClient) Rollback(ctx context.Context, in *pb.RollbackRequest, opts ...grpc.CallOption) (res *pb.RollbackResponse, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.Rollback")
defer func() { trace.EndSpan(ctx, err) }()

err = dc.invoke(ctx, func(ctx context.Context) error {
res, err = dc.c.Rollback(ctx, in, opts...)
return err
Expand All @@ -90,6 +106,9 @@ func (dc *datastoreClient) Rollback(ctx context.Context, in *pb.RollbackRequest,
}

func (dc *datastoreClient) AllocateIds(ctx context.Context, in *pb.AllocateIdsRequest, opts ...grpc.CallOption) (res *pb.AllocateIdsResponse, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.AllocateIds")
defer func() { trace.EndSpan(ctx, err) }()

err = dc.invoke(ctx, func(ctx context.Context) error {
res, err = dc.c.AllocateIds(ctx, in, opts...)
return err
Expand Down
8 changes: 7 additions & 1 deletion datastore/transaction.go
Expand Up @@ -110,13 +110,19 @@ func (c *Client) NewTransaction(ctx context.Context, opts ...TransactionOption)
return c.newTransaction(ctx, newTransactionSettings(opts))
}

func (c *Client) newTransaction(ctx context.Context, s *transactionSettings) (*Transaction, error) {
func (c *Client) newTransaction(ctx context.Context, s *transactionSettings) (_ *Transaction, err error) {
tritone marked this conversation as resolved.
Show resolved Hide resolved
req := &pb.BeginTransactionRequest{ProjectId: c.dataset}
if s.readOnly {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Transaction.ReadOnlyTransaction")
defer func() { trace.EndSpan(ctx, err) }()

req.TransactionOptions = &pb.TransactionOptions{
Mode: &pb.TransactionOptions_ReadOnly_{ReadOnly: &pb.TransactionOptions_ReadOnly{}},
}
} else if s.prevID != nil {
AlisskaPie marked this conversation as resolved.
Show resolved Hide resolved
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Transaction.ReadWriteTransaction")
AlisskaPie marked this conversation as resolved.
Show resolved Hide resolved
defer func() { trace.EndSpan(ctx, err) }()

req.TransactionOptions = &pb.TransactionOptions{
Mode: &pb.TransactionOptions_ReadWrite_{ReadWrite: &pb.TransactionOptions_ReadWrite{
PreviousTransaction: s.prevID,
Expand Down