Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add opencensus tracing support #360

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -22,6 +22,9 @@
import com.google.cloud.firestore.v1.FirestoreClient;
import com.google.firestore.v1.Cursor;
import com.google.firestore.v1.PartitionQueryRequest;
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
import io.opencensus.trace.Status;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -62,13 +65,18 @@ public void getPartitions(
request.setPartitionCount(desiredPartitionCount - 1);

final FirestoreClient.PartitionQueryPagedResponse response;
try {
final TraceUtil traceUtil = TraceUtil.getInstance();
Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_PARTITIONQUERY);
try (Scope scope = traceUtil.getTracer().withSpan(span)) {
response =
ApiExceptions.callAndTranslateApiException(
rpcContext.sendRequest(
request.build(), rpcContext.getClient().partitionQueryPagedCallable()));
} catch (ApiException exception) {
span.setStatus(Status.UNKNOWN.withDescription(exception.getMessage()));
throw FirestoreException.apiException(exception);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
}

@Nullable Object[] lastCursor = null;
Expand Down
Expand Up @@ -30,6 +30,9 @@
import com.google.firestore.v1.Document;
import com.google.firestore.v1.DocumentMask;
import com.google.firestore.v1.ListDocumentsRequest;
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
import io.opencensus.trace.Status;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -135,16 +138,20 @@ public Iterable<DocumentReference> listDocuments() {
request.setShowMissing(true);

final ListDocumentsPagedResponse response;

try {
final TraceUtil traceUtil = TraceUtil.getInstance();
Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_LISTDOCUMENTS);
try (Scope scope = traceUtil.getTracer().withSpan(span)) {
FirestoreRpc client = rpcContext.getClient();
UnaryCallable<ListDocumentsRequest, ListDocumentsPagedResponse> callable =
client.listDocumentsPagedCallable();
ListDocumentsRequest build = request.build();
ApiFuture<ListDocumentsPagedResponse> future = rpcContext.sendRequest(build, callable);
response = ApiExceptions.callAndTranslateApiException(future);
} catch (ApiException exception) {
span.setStatus(Status.UNKNOWN.withDescription(exception.getMessage()));
throw FirestoreException.apiException(exception);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
}

return new Iterable<DocumentReference>() {
Expand Down
Expand Up @@ -25,6 +25,9 @@
import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.firestore.v1.ListCollectionIdsRequest;
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
import io.opencensus.trace.Status;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -381,14 +384,18 @@ public Iterable<CollectionReference> listCollections() {
ListCollectionIdsRequest.Builder request = ListCollectionIdsRequest.newBuilder();
request.setParent(path.toString());
final ListCollectionIdsPagedResponse response;

try {
final TraceUtil traceUtil = TraceUtil.getInstance();
Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_LISTCOLLECTIONIDS);
try (Scope scope = traceUtil.getTracer().withSpan(span)) {
response =
ApiExceptions.callAndTranslateApiException(
rpcContext.sendRequest(
request.build(), rpcContext.getClient().listCollectionIdsPagedCallable()));
} catch (ApiException exception) {
span.setStatus(Status.UNKNOWN.withDescription(exception.getMessage()));
throw FirestoreException.apiException(exception);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
}

return new Iterable<CollectionReference>() {
Expand Down
Expand Up @@ -168,9 +168,14 @@ public void onNext(BatchGetDocumentsResponse response) {

numResponses++;
if (numResponses == 1) {
tracer.getCurrentSpan().addAnnotation("Firestore.BatchGet: First response");
tracer
.getCurrentSpan()
.addAnnotation(TraceUtil.SPAN_NAME_BATCHGETDOCUMENTS + ": First response");
} else if (numResponses % 100 == 0) {
tracer.getCurrentSpan().addAnnotation("Firestore.BatchGet: Received 100 responses");
tracer
.getCurrentSpan()
.addAnnotation(
TraceUtil.SPAN_NAME_BATCHGETDOCUMENTS + ": Received 100 responses");
}

switch (response.getResultCase()) {
Expand Down Expand Up @@ -199,13 +204,17 @@ public void onNext(BatchGetDocumentsResponse response) {

@Override
public void onError(Throwable throwable) {
tracer.getCurrentSpan().addAnnotation("Firestore.BatchGet: Error");
tracer
.getCurrentSpan()
.addAnnotation(TraceUtil.SPAN_NAME_BATCHGETDOCUMENTS + ": Error");
apiStreamObserver.onError(throwable);
}

@Override
public void onCompleted() {
tracer.getCurrentSpan().addAnnotation("Firestore.BatchGet: Complete");
tracer
.getCurrentSpan()
.addAnnotation(TraceUtil.SPAN_NAME_BATCHGETDOCUMENTS + ": Complete");
apiStreamObserver.onCompleted();
}
};
Expand All @@ -228,7 +237,7 @@ public void onCompleted() {
tracer
.getCurrentSpan()
.addAnnotation(
"Firestore.BatchGet: Start",
TraceUtil.SPAN_NAME_BATCHGETDOCUMENTS + ": Start",
ImmutableMap.of(
"numDocuments", AttributeValue.longAttributeValue(documentReferences.length)));

Expand Down
Expand Up @@ -1331,7 +1331,7 @@ private void internalStream(
Tracing.getTracer()
.getCurrentSpan()
.addAnnotation(
"Firestore.Query: Start",
TraceUtil.SPAN_NAME_RUNQUERY + ": Start",
ImmutableMap.of(
"transactional", AttributeValue.booleanAttributeValue(transactionId != null)));

Expand Down
Expand Up @@ -17,11 +17,71 @@
package com.google.cloud.firestore;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.firestore.spi.v1.GrpcFirestoreRpc;
import io.opencensus.contrib.grpc.util.StatusConverter;
import io.opencensus.trace.EndSpanOptions;
import io.opencensus.trace.Span;
import io.opencensus.trace.Status;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;

/** Census tracing utilities. */
/**
* Helper class for tracing utility. It is used for instrumenting {@link GrpcFirestoreRpc} with
* OpenCensus APIs.
*
* <p>TraceUtil instances are created by the {@link TraceUtil#getInstance()} method.
*/
final class TraceUtil {

private final Tracer tracer = Tracing.getTracer();
private static final TraceUtil traceUtil = new TraceUtil();
static final String SPAN_NAME_GETDOCUMENT = "CloudFirestoreOperation.GetDocument";
static final String SPAN_NAME_CREATEDOCUMENT = "CloudFirestoreOperation.CreateDocument";
static final String SPAN_NAME_UPDATEDOCUMENT = "CloudFirestoreOperation.UpdateDocument";
static final String SPAN_NAME_DELETEDOCUMENT = "CloudFirestoreOperation.DeleteDocument";
static final String SPAN_NAME_LISTCOLLECTIONIDS = "CloudFirestoreOperation.ListCollectionIds";
static final String SPAN_NAME_LISTDOCUMENTS = "CloudFirestoreOperation.ListDocuments";
static final String SPAN_NAME_BEGINTRANSACTION = "CloudFirestoreOperation.BeginTransaction";
static final String SPAN_NAME_COMMIT = "CloudFirestoreOperation.Commit";
static final String SPAN_NAME_ROLLBACK = "CloudFirestoreOperation.Rollback";
static final String SPAN_NAME_RUNQUERY = "CloudFirestoreOperation.RunQuery";
static final String SPAN_NAME_PARTITIONQUERY = "CloudFirestoreOperation.partitionQuery";
static final String SPAN_NAME_LISTEN = "CloudFirestoreOperation.Listen";
static final String SPAN_NAME_BATCHGETDOCUMENTS = "CloudFirestoreOperation.BatchGetDocuments";
static final String SPAN_NAME_BATCHWRITE = "CloudFirestoreOperation.BatchWrite";
static final String SPAN_NAME_WRITE = "CloudFirestoreOperation.Write";

static final EndSpanOptions END_SPAN_OPTIONS =
EndSpanOptions.builder().setSampleToLocalSpanStore(true).build();

/**
* Starts a new span.
*
* @param spanName The name of the returned Span.
* @return The newly created {@link Span}.
*/
protected Span startSpan(String spanName) {
return tracer.spanBuilder(spanName).startSpan();
}

/**
* Return the global {@link Tracer}.
*
* @return The global {@link Tracer}.
*/
public Tracer getTracer() {
return tracer;
}

/**
* Return TraceUtil Object.
*
* @return An instance of {@link TraceUtil}
*/
public static TraceUtil getInstance() {
return traceUtil;
}

private TraceUtil() {}

public static Status statusFromApiException(ApiException exception) {
Expand Down
Expand Up @@ -28,6 +28,7 @@
import com.google.firestore.v1.TransactionOptions.ReadOnly;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.opencensus.trace.Tracing;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -83,6 +84,7 @@ Transaction wrapResult(ApiFuture<WriteResult> result) {

/** Starts a transaction and obtains the transaction id. */
ApiFuture<Void> begin() {
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_BEGINTRANSACTION);
BeginTransactionRequest.Builder beginTransaction = BeginTransactionRequest.newBuilder();
beginTransaction.setDatabase(firestore.getDatabaseName());

Expand Down Expand Up @@ -123,6 +125,7 @@ ApiFuture<List<WriteResult>> commit() {

/** Rolls a transaction back and releases all read locks. */
ApiFuture<Void> rollback() {
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_ROLLBACK);
RollbackRequest.Builder reqBuilder = RollbackRequest.newBuilder();
reqBuilder.setTransaction(transactionId);
reqBuilder.setDatabase(firestore.getDatabaseName());
Expand Down Expand Up @@ -150,7 +153,7 @@ public Void apply(Empty beginTransactionResponse) {
@Nonnull
public ApiFuture<DocumentSnapshot> get(@Nonnull DocumentReference documentRef) {
Preconditions.checkState(isEmpty(), READ_BEFORE_WRITE_ERROR_MSG);

Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_GETDOCUMENT);
return ApiFutures.transform(
firestore.getAll(new DocumentReference[] {documentRef}, /*fieldMask=*/ null, transactionId),
new ApiFunction<List<DocumentSnapshot>, DocumentSnapshot>() {
Expand Down
Expand Up @@ -166,6 +166,7 @@ public T create(
private T performCreate(
@Nonnull DocumentReference documentReference, @Nonnull Map<String, Object> fields) {
verifyNotCommitted();
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_CREATEDOCUMENT);
DocumentSnapshot documentSnapshot =
DocumentSnapshot.fromObject(
firestore, documentReference, fields, UserDataConverter.NO_DELETES);
Expand Down Expand Up @@ -548,7 +549,7 @@ private T performUpdate(
@Nonnull Precondition precondition) {
verifyNotCommitted();
Preconditions.checkArgument(!fields.isEmpty(), "Data for update() cannot be empty.");

Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_UPDATEDOCUMENT);
Map<String, Object> deconstructedMap = expandObject(fields);
DocumentSnapshot documentSnapshot =
DocumentSnapshot.fromObject(
Expand Down Expand Up @@ -611,6 +612,7 @@ public T delete(@Nonnull DocumentReference documentReference) {
private T performDelete(
@Nonnull DocumentReference documentReference, @Nonnull Precondition precondition) {
verifyNotCommitted();
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_DELETEDOCUMENT);
Write.Builder write = Write.newBuilder().setDelete(documentReference.getName());

if (!precondition.isEmpty()) {
Expand All @@ -626,7 +628,7 @@ ApiFuture<List<WriteResult>> commit(@Nullable ByteString transactionId) {
Tracing.getTracer()
.getCurrentSpan()
.addAnnotation(
"CloudFirestore.Commit",
TraceUtil.SPAN_NAME_COMMIT,
ImmutableMap.of("numDocuments", AttributeValue.longAttributeValue(writes.size())));

final CommitRequest.Builder request = CommitRequest.newBuilder();
Expand Down Expand Up @@ -674,7 +676,7 @@ ApiFuture<List<BatchWriteResult>> bulkCommit() {
Tracing.getTracer()
.getCurrentSpan()
.addAnnotation(
"CloudFirestore.BatchWrite",
TraceUtil.SPAN_NAME_BATCHWRITE,
ImmutableMap.of("numDocuments", AttributeValue.longAttributeValue(writes.size())));

final BatchWriteRequest.Builder request = BatchWriteRequest.newBuilder();
Expand Down
Expand Up @@ -36,6 +36,7 @@
import io.grpc.Status.Code;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.opencensus.trace.Tracing;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -395,6 +396,7 @@ public void run() {
current = false;
nextAttempt = backoff.createNextAttempt(nextAttempt);

Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN);
stream = firestore.streamRequest(Watch.this, firestore.getClient().listenCallable());

ListenRequest.Builder request = ListenRequest.newBuilder();
Expand Down