Skip to content

Commit

Permalink
feat: add opencensus tracing support (#360)
Browse files Browse the repository at this point in the history
  • Loading branch information
athakor committed Sep 9, 2020
1 parent acae383 commit edaa539
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 16 deletions.
Expand Up @@ -22,6 +22,9 @@
import com.google.cloud.firestore.v1.FirestoreClient;
import com.google.firestore.v1.Cursor;
import com.google.firestore.v1.PartitionQueryRequest;
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
import io.opencensus.trace.Status;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -62,13 +65,18 @@ public void getPartitions(
request.setPartitionCount(desiredPartitionCount - 1);

final FirestoreClient.PartitionQueryPagedResponse response;
try {
final TraceUtil traceUtil = TraceUtil.getInstance();
Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_PARTITIONQUERY);
try (Scope scope = traceUtil.getTracer().withSpan(span)) {
response =
ApiExceptions.callAndTranslateApiException(
rpcContext.sendRequest(
request.build(), rpcContext.getClient().partitionQueryPagedCallable()));
} catch (ApiException exception) {
span.setStatus(Status.UNKNOWN.withDescription(exception.getMessage()));
throw FirestoreException.apiException(exception);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
}

@Nullable Object[] lastCursor = null;
Expand Down
Expand Up @@ -30,6 +30,9 @@
import com.google.firestore.v1.Document;
import com.google.firestore.v1.DocumentMask;
import com.google.firestore.v1.ListDocumentsRequest;
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
import io.opencensus.trace.Status;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -135,16 +138,20 @@ public Iterable<DocumentReference> listDocuments() {
request.setShowMissing(true);

final ListDocumentsPagedResponse response;

try {
final TraceUtil traceUtil = TraceUtil.getInstance();
Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_LISTDOCUMENTS);
try (Scope scope = traceUtil.getTracer().withSpan(span)) {
FirestoreRpc client = rpcContext.getClient();
UnaryCallable<ListDocumentsRequest, ListDocumentsPagedResponse> callable =
client.listDocumentsPagedCallable();
ListDocumentsRequest build = request.build();
ApiFuture<ListDocumentsPagedResponse> future = rpcContext.sendRequest(build, callable);
response = ApiExceptions.callAndTranslateApiException(future);
} catch (ApiException exception) {
span.setStatus(Status.UNKNOWN.withDescription(exception.getMessage()));
throw FirestoreException.apiException(exception);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
}

return new Iterable<DocumentReference>() {
Expand Down
Expand Up @@ -25,6 +25,9 @@
import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.firestore.v1.ListCollectionIdsRequest;
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
import io.opencensus.trace.Status;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -381,14 +384,18 @@ public Iterable<CollectionReference> listCollections() {
ListCollectionIdsRequest.Builder request = ListCollectionIdsRequest.newBuilder();
request.setParent(path.toString());
final ListCollectionIdsPagedResponse response;

try {
final TraceUtil traceUtil = TraceUtil.getInstance();
Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_LISTCOLLECTIONIDS);
try (Scope scope = traceUtil.getTracer().withSpan(span)) {
response =
ApiExceptions.callAndTranslateApiException(
rpcContext.sendRequest(
request.build(), rpcContext.getClient().listCollectionIdsPagedCallable()));
} catch (ApiException exception) {
span.setStatus(Status.UNKNOWN.withDescription(exception.getMessage()));
throw FirestoreException.apiException(exception);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
}

return new Iterable<CollectionReference>() {
Expand Down
Expand Up @@ -168,9 +168,14 @@ public void onNext(BatchGetDocumentsResponse response) {

numResponses++;
if (numResponses == 1) {
tracer.getCurrentSpan().addAnnotation("Firestore.BatchGet: First response");
tracer
.getCurrentSpan()
.addAnnotation(TraceUtil.SPAN_NAME_BATCHGETDOCUMENTS + ": First response");
} else if (numResponses % 100 == 0) {
tracer.getCurrentSpan().addAnnotation("Firestore.BatchGet: Received 100 responses");
tracer
.getCurrentSpan()
.addAnnotation(
TraceUtil.SPAN_NAME_BATCHGETDOCUMENTS + ": Received 100 responses");
}

switch (response.getResultCase()) {
Expand Down Expand Up @@ -199,13 +204,17 @@ public void onNext(BatchGetDocumentsResponse response) {

@Override
public void onError(Throwable throwable) {
tracer.getCurrentSpan().addAnnotation("Firestore.BatchGet: Error");
tracer
.getCurrentSpan()
.addAnnotation(TraceUtil.SPAN_NAME_BATCHGETDOCUMENTS + ": Error");
apiStreamObserver.onError(throwable);
}

@Override
public void onCompleted() {
tracer.getCurrentSpan().addAnnotation("Firestore.BatchGet: Complete");
tracer
.getCurrentSpan()
.addAnnotation(TraceUtil.SPAN_NAME_BATCHGETDOCUMENTS + ": Complete");
apiStreamObserver.onCompleted();
}
};
Expand All @@ -228,7 +237,7 @@ public void onCompleted() {
tracer
.getCurrentSpan()
.addAnnotation(
"Firestore.BatchGet: Start",
TraceUtil.SPAN_NAME_BATCHGETDOCUMENTS + ": Start",
ImmutableMap.of(
"numDocuments", AttributeValue.longAttributeValue(documentReferences.length)));

Expand Down
Expand Up @@ -1417,7 +1417,7 @@ private void internalStream(
Tracing.getTracer()
.getCurrentSpan()
.addAnnotation(
"Firestore.Query: Start",
TraceUtil.SPAN_NAME_RUNQUERY + ": Start",
ImmutableMap.of(
"transactional", AttributeValue.booleanAttributeValue(transactionId != null)));

Expand Down
Expand Up @@ -17,11 +17,71 @@
package com.google.cloud.firestore;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.firestore.spi.v1.GrpcFirestoreRpc;
import io.opencensus.contrib.grpc.util.StatusConverter;
import io.opencensus.trace.EndSpanOptions;
import io.opencensus.trace.Span;
import io.opencensus.trace.Status;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;

/** Census tracing utilities. */
/**
* Helper class for tracing utility. It is used for instrumenting {@link GrpcFirestoreRpc} with
* OpenCensus APIs.
*
* <p>TraceUtil instances are created by the {@link TraceUtil#getInstance()} method.
*/
final class TraceUtil {

private final Tracer tracer = Tracing.getTracer();
private static final TraceUtil traceUtil = new TraceUtil();
static final String SPAN_NAME_GETDOCUMENT = "CloudFirestoreOperation.GetDocument";
static final String SPAN_NAME_CREATEDOCUMENT = "CloudFirestoreOperation.CreateDocument";
static final String SPAN_NAME_UPDATEDOCUMENT = "CloudFirestoreOperation.UpdateDocument";
static final String SPAN_NAME_DELETEDOCUMENT = "CloudFirestoreOperation.DeleteDocument";
static final String SPAN_NAME_LISTCOLLECTIONIDS = "CloudFirestoreOperation.ListCollectionIds";
static final String SPAN_NAME_LISTDOCUMENTS = "CloudFirestoreOperation.ListDocuments";
static final String SPAN_NAME_BEGINTRANSACTION = "CloudFirestoreOperation.BeginTransaction";
static final String SPAN_NAME_COMMIT = "CloudFirestoreOperation.Commit";
static final String SPAN_NAME_ROLLBACK = "CloudFirestoreOperation.Rollback";
static final String SPAN_NAME_RUNQUERY = "CloudFirestoreOperation.RunQuery";
static final String SPAN_NAME_PARTITIONQUERY = "CloudFirestoreOperation.partitionQuery";
static final String SPAN_NAME_LISTEN = "CloudFirestoreOperation.Listen";
static final String SPAN_NAME_BATCHGETDOCUMENTS = "CloudFirestoreOperation.BatchGetDocuments";
static final String SPAN_NAME_BATCHWRITE = "CloudFirestoreOperation.BatchWrite";
static final String SPAN_NAME_WRITE = "CloudFirestoreOperation.Write";

static final EndSpanOptions END_SPAN_OPTIONS =
EndSpanOptions.builder().setSampleToLocalSpanStore(true).build();

/**
* Starts a new span.
*
* @param spanName The name of the returned Span.
* @return The newly created {@link Span}.
*/
protected Span startSpan(String spanName) {
return tracer.spanBuilder(spanName).startSpan();
}

/**
* Return the global {@link Tracer}.
*
* @return The global {@link Tracer}.
*/
public Tracer getTracer() {
return tracer;
}

/**
* Return TraceUtil Object.
*
* @return An instance of {@link TraceUtil}
*/
public static TraceUtil getInstance() {
return traceUtil;
}

private TraceUtil() {}

public static Status statusFromApiException(ApiException exception) {
Expand Down
Expand Up @@ -28,6 +28,7 @@
import com.google.firestore.v1.TransactionOptions.ReadOnly;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.opencensus.trace.Tracing;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -83,6 +84,7 @@ Transaction wrapResult(ApiFuture<WriteResult> result) {

/** Starts a transaction and obtains the transaction id. */
ApiFuture<Void> begin() {
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_BEGINTRANSACTION);
BeginTransactionRequest.Builder beginTransaction = BeginTransactionRequest.newBuilder();
beginTransaction.setDatabase(firestore.getDatabaseName());

Expand Down Expand Up @@ -123,6 +125,7 @@ ApiFuture<List<WriteResult>> commit() {

/** Rolls a transaction back and releases all read locks. */
ApiFuture<Void> rollback() {
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_ROLLBACK);
RollbackRequest.Builder reqBuilder = RollbackRequest.newBuilder();
reqBuilder.setTransaction(transactionId);
reqBuilder.setDatabase(firestore.getDatabaseName());
Expand Down Expand Up @@ -150,7 +153,7 @@ public Void apply(Empty beginTransactionResponse) {
@Nonnull
public ApiFuture<DocumentSnapshot> get(@Nonnull DocumentReference documentRef) {
Preconditions.checkState(isEmpty(), READ_BEFORE_WRITE_ERROR_MSG);

Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_GETDOCUMENT);
return ApiFutures.transform(
firestore.getAll(new DocumentReference[] {documentRef}, /*fieldMask=*/ null, transactionId),
new ApiFunction<List<DocumentSnapshot>, DocumentSnapshot>() {
Expand Down
Expand Up @@ -166,6 +166,7 @@ public T create(
private T performCreate(
@Nonnull DocumentReference documentReference, @Nonnull Map<String, Object> fields) {
verifyNotCommitted();
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_CREATEDOCUMENT);
DocumentSnapshot documentSnapshot =
DocumentSnapshot.fromObject(
firestore, documentReference, fields, UserDataConverter.NO_DELETES);
Expand Down Expand Up @@ -548,7 +549,7 @@ private T performUpdate(
@Nonnull Precondition precondition) {
verifyNotCommitted();
Preconditions.checkArgument(!fields.isEmpty(), "Data for update() cannot be empty.");

Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_UPDATEDOCUMENT);
Map<String, Object> deconstructedMap = expandObject(fields);
DocumentSnapshot documentSnapshot =
DocumentSnapshot.fromObject(
Expand Down Expand Up @@ -611,6 +612,7 @@ public T delete(@Nonnull DocumentReference documentReference) {
private T performDelete(
@Nonnull DocumentReference documentReference, @Nonnull Precondition precondition) {
verifyNotCommitted();
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_DELETEDOCUMENT);
Write.Builder write = Write.newBuilder().setDelete(documentReference.getName());

if (!precondition.isEmpty()) {
Expand All @@ -626,7 +628,7 @@ ApiFuture<List<WriteResult>> commit(@Nullable ByteString transactionId) {
Tracing.getTracer()
.getCurrentSpan()
.addAnnotation(
"CloudFirestore.Commit",
TraceUtil.SPAN_NAME_COMMIT,
ImmutableMap.of("numDocuments", AttributeValue.longAttributeValue(writes.size())));

final CommitRequest.Builder request = CommitRequest.newBuilder();
Expand Down Expand Up @@ -674,7 +676,7 @@ ApiFuture<List<BatchWriteResult>> bulkCommit() {
Tracing.getTracer()
.getCurrentSpan()
.addAnnotation(
"CloudFirestore.BatchWrite",
TraceUtil.SPAN_NAME_BATCHWRITE,
ImmutableMap.of("numDocuments", AttributeValue.longAttributeValue(writes.size())));

final BatchWriteRequest.Builder request = BatchWriteRequest.newBuilder();
Expand Down
Expand Up @@ -36,6 +36,7 @@
import io.grpc.Status.Code;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.opencensus.trace.Tracing;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -395,6 +396,7 @@ public void run() {
current = false;
nextAttempt = backoff.createNextAttempt(nextAttempt);

Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN);
stream = firestore.streamRequest(Watch.this, firestore.getClient().listenCallable());

ListenRequest.Builder request = ListenRequest.newBuilder();
Expand Down

0 comments on commit edaa539

Please sign in to comment.