Skip to content

Commit

Permalink
feat(firestore): add opencensus tracing support (#2942)
Browse files Browse the repository at this point in the history
* feat(firestore): add opencensus tracing support

* add BeginTransaction span, rollback span delete

* use lowercase name for commit

Co-authored-by: Christopher Wilcox <crwilcox@google.com>
Co-authored-by: Chris Cotter <cjcotter@google.com>
  • Loading branch information
3 people committed Feb 4, 2021
1 parent b7438c2 commit 257f322
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 6 deletions.
13 changes: 11 additions & 2 deletions firestore/client.go
Expand Up @@ -199,7 +199,10 @@ func (c *Client) GetAll(ctx context.Context, docRefs []*DocumentRef) (_ []*Docum
return c.getAll(ctx, docRefs, nil)
}

func (c *Client) getAll(ctx context.Context, docRefs []*DocumentRef, tid []byte) ([]*DocumentSnapshot, error) {
func (c *Client) getAll(ctx context.Context, docRefs []*DocumentRef, tid []byte) (_ []*DocumentSnapshot, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.Client.BatchGetDocuments")
defer func() { trace.EndSpan(ctx, err) }()

var docNames []string
docIndices := map[string][]int{} // doc name to positions in docRefs
for i, dr := range docRefs {
Expand Down Expand Up @@ -267,6 +270,9 @@ func (c *Client) getAll(ctx context.Context, docRefs []*DocumentRef, tid []byte)

// Collections returns an iterator over the top-level collections.
func (c *Client) Collections(ctx context.Context) *CollectionIterator {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.Client.ListCollectionIds")
defer func() { trace.EndSpan(ctx, nil) }()

it := &CollectionIterator{
client: c,
it: c.c.ListCollectionIds(
Expand All @@ -286,7 +292,10 @@ func (c *Client) Batch() *WriteBatch {
}

// commit calls the Commit RPC outside of a transaction.
func (c *Client) commit(ctx context.Context, ws []*pb.Write) ([]*WriteResult, error) {
func (c *Client) commit(ctx context.Context, ws []*pb.Write) (_ []*WriteResult, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.Client.commit")
defer func() { trace.EndSpan(ctx, err) }()

req := &pb.CommitRequest{
Database: c.path(),
Writes: ws,
Expand Down
3 changes: 3 additions & 0 deletions firestore/docref.go
Expand Up @@ -677,6 +677,9 @@ func (d *DocumentRef) Update(ctx context.Context, updates []Update, preconds ...

// Collections returns an iterator over the immediate sub-collections of the document.
func (d *DocumentRef) Collections(ctx context.Context) *CollectionIterator {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.DocumentRef.ListCollectionIds")
defer func() { trace.EndSpan(ctx, nil) }()

client := d.Parent.c
it := &CollectionIterator{
client: client,
Expand Down
4 changes: 4 additions & 0 deletions firestore/list_documents.go
Expand Up @@ -18,6 +18,7 @@ import (
"context"

vkit "cloud.google.com/go/firestore/apiv1"
"cloud.google.com/go/internal/trace"
"google.golang.org/api/iterator"
pb "google.golang.org/genproto/googleapis/firestore/v1"
)
Expand All @@ -33,6 +34,9 @@ type DocumentRefIterator struct {
}

func newDocumentRefIterator(ctx context.Context, cr *CollectionRef, tid []byte) *DocumentRefIterator {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.ListDocuments")
defer func() { trace.EndSpan(ctx, nil) }()

client := cr.c
req := &pb.ListDocumentsRequest{
Parent: cr.parentPath,
Expand Down
7 changes: 5 additions & 2 deletions firestore/query.go
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"cloud.google.com/go/internal/btree"
"cloud.google.com/go/internal/trace"
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/api/iterator"
pb "google.golang.org/genproto/googleapis/firestore/v1"
Expand Down Expand Up @@ -699,7 +700,10 @@ func newQueryDocumentIterator(ctx context.Context, q *Query, tid []byte) *queryD
}
}

func (it *queryDocumentIterator) next() (*DocumentSnapshot, error) {
func (it *queryDocumentIterator) next() (_ *DocumentSnapshot, err error) {
it.ctx = trace.StartSpan(it.ctx, "cloud.google.com/go/firestore.Query.RunQuery")
defer func() { trace.EndSpan(it.ctx, err) }()

client := it.q.c
if it.streamClient == nil {
sq, err := it.q.toProto()
Expand All @@ -719,7 +723,6 @@ func (it *queryDocumentIterator) next() (*DocumentSnapshot, error) {
}
}
var res *pb.RunQueryResponse
var err error
for {
res, err = it.streamClient.Recv()
if err == io.EOF {
Expand Down
5 changes: 5 additions & 0 deletions firestore/transaction.go
Expand Up @@ -116,11 +116,13 @@ func (c *Client) RunTransaction(ctx context.Context, f func(context.Context, *Tr
// TODO(jba): get backoff time from gRPC trailer metadata? See
// extractRetryDelay in https://code.googlesource.com/gocloud/+/master/spanner/retry.go.
for i := 0; i < t.maxAttempts; i++ {
t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/firestore.Client.BeginTransaction")
var res *pb.BeginTransactionResponse
res, err = t.c.c.BeginTransaction(t.ctx, &pb.BeginTransactionRequest{
Database: db,
Options: txOpts,
})
trace.EndSpan(t.ctx, err)
if err != nil {
return err
}
Expand All @@ -136,11 +138,14 @@ func (c *Client) RunTransaction(ctx context.Context, f func(context.Context, *Tr
// Prefer f's returned error to rollback error.
return err
}
t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/firestore.Client.Commit")
_, err = t.c.c.Commit(t.ctx, &pb.CommitRequest{
Database: t.c.path(),
Writes: t.writes,
Transaction: t.id,
})
trace.EndSpan(t.ctx, err)

// If a read-write transaction returns Aborted, retry.
// On success or other failures, return here.
if t.readOnly || status.Code(err) != codes.Aborted {
Expand Down
8 changes: 6 additions & 2 deletions firestore/watch.go
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"cloud.google.com/go/internal/btree"
"cloud.google.com/go/internal/trace"
"github.com/golang/protobuf/ptypes"
gax "github.com/googleapis/gax-go/v2"
pb "google.golang.org/genproto/googleapis/firestore/v1"
Expand Down Expand Up @@ -492,9 +493,12 @@ func (s *watchStream) recv() (*pb.ListenResponse, error) {
}
}

func (s *watchStream) open() (pb.Firestore_ListenClient, error) {
func (s *watchStream) open() (lc pb.Firestore_ListenClient, err error) {
s.ctx = trace.StartSpan(s.ctx, "cloud.google.com/go/firestore.watchStream.Listen")
defer func() { trace.EndSpan(s.ctx, err) }()

dbPath := s.c.path()
lc, err := s.c.c.Listen(withResourceHeader(s.ctx, dbPath))
lc, err = s.c.c.Listen(withResourceHeader(s.ctx, dbPath))
if err == nil {
err = lc.Send(&pb.ListenRequest{
Database: dbPath,
Expand Down

0 comments on commit 257f322

Please sign in to comment.