Skip to content

Commit

Permalink
feat(datastore): add opencensus tracing/stats support (#2804)
Browse files Browse the repository at this point in the history

Co-authored-by: Chris Cotter <cjcotter@google.com>
  • Loading branch information
AlisskaPie and tritone committed Sep 18, 2020
1 parent c19e141 commit 5e6c350
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
19 changes: 19 additions & 0 deletions datastore/client.go
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"cloud.google.com/go/internal"
"cloud.google.com/go/internal/trace"
"cloud.google.com/go/internal/version"
gax "github.com/googleapis/gax-go/v2"
pb "google.golang.org/genproto/googleapis/datastore/v1"
Expand Down Expand Up @@ -50,6 +51,9 @@ func newDatastoreClient(conn grpc.ClientConnInterface, projectID string) pb.Data
}

func (dc *datastoreClient) Lookup(ctx context.Context, in *pb.LookupRequest, opts ...grpc.CallOption) (res *pb.LookupResponse, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.Lookup")
defer func() { trace.EndSpan(ctx, err) }()

err = dc.invoke(ctx, func(ctx context.Context) error {
res, err = dc.c.Lookup(ctx, in, opts...)
return err
Expand All @@ -58,6 +62,9 @@ func (dc *datastoreClient) Lookup(ctx context.Context, in *pb.LookupRequest, opt
}

func (dc *datastoreClient) RunQuery(ctx context.Context, in *pb.RunQueryRequest, opts ...grpc.CallOption) (res *pb.RunQueryResponse, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.RunQuery")
defer func() { trace.EndSpan(ctx, err) }()

err = dc.invoke(ctx, func(ctx context.Context) error {
res, err = dc.c.RunQuery(ctx, in, opts...)
return err
Expand All @@ -66,6 +73,9 @@ func (dc *datastoreClient) RunQuery(ctx context.Context, in *pb.RunQueryRequest,
}

func (dc *datastoreClient) BeginTransaction(ctx context.Context, in *pb.BeginTransactionRequest, opts ...grpc.CallOption) (res *pb.BeginTransactionResponse, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.BeginTransaction")
defer func() { trace.EndSpan(ctx, err) }()

err = dc.invoke(ctx, func(ctx context.Context) error {
res, err = dc.c.BeginTransaction(ctx, in, opts...)
return err
Expand All @@ -74,6 +84,9 @@ func (dc *datastoreClient) BeginTransaction(ctx context.Context, in *pb.BeginTra
}

func (dc *datastoreClient) Commit(ctx context.Context, in *pb.CommitRequest, opts ...grpc.CallOption) (res *pb.CommitResponse, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.Commit")
defer func() { trace.EndSpan(ctx, err) }()

err = dc.invoke(ctx, func(ctx context.Context) error {
res, err = dc.c.Commit(ctx, in, opts...)
return err
Expand All @@ -82,6 +95,9 @@ func (dc *datastoreClient) Commit(ctx context.Context, in *pb.CommitRequest, opt
}

func (dc *datastoreClient) Rollback(ctx context.Context, in *pb.RollbackRequest, opts ...grpc.CallOption) (res *pb.RollbackResponse, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.Rollback")
defer func() { trace.EndSpan(ctx, err) }()

err = dc.invoke(ctx, func(ctx context.Context) error {
res, err = dc.c.Rollback(ctx, in, opts...)
return err
Expand All @@ -90,6 +106,9 @@ func (dc *datastoreClient) Rollback(ctx context.Context, in *pb.RollbackRequest,
}

func (dc *datastoreClient) AllocateIds(ctx context.Context, in *pb.AllocateIdsRequest, opts ...grpc.CallOption) (res *pb.AllocateIdsResponse, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.AllocateIds")
defer func() { trace.EndSpan(ctx, err) }()

err = dc.invoke(ctx, func(ctx context.Context) error {
res, err = dc.c.AllocateIds(ctx, in, opts...)
return err
Expand Down
8 changes: 7 additions & 1 deletion datastore/transaction.go
Expand Up @@ -110,13 +110,19 @@ func (c *Client) NewTransaction(ctx context.Context, opts ...TransactionOption)
return c.newTransaction(ctx, newTransactionSettings(opts))
}

func (c *Client) newTransaction(ctx context.Context, s *transactionSettings) (*Transaction, error) {
func (c *Client) newTransaction(ctx context.Context, s *transactionSettings) (_ *Transaction, err error) {
req := &pb.BeginTransactionRequest{ProjectId: c.dataset}
if s.readOnly {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Transaction.ReadOnlyTransaction")
defer func() { trace.EndSpan(ctx, err) }()

req.TransactionOptions = &pb.TransactionOptions{
Mode: &pb.TransactionOptions_ReadOnly_{ReadOnly: &pb.TransactionOptions_ReadOnly{}},
}
} else if s.prevID != nil {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Transaction.ReadWriteTransaction")
defer func() { trace.EndSpan(ctx, err) }()

req.TransactionOptions = &pb.TransactionOptions{
Mode: &pb.TransactionOptions_ReadWrite_{ReadWrite: &pb.TransactionOptions_ReadWrite{
PreviousTransaction: s.prevID,
Expand Down

0 comments on commit 5e6c350

Please sign in to comment.