Skip to content

Commit

Permalink
feat: add option to enable ApiTracer
Browse files Browse the repository at this point in the history
Adds an option to enable an ApiTracer for the client. An ApiTracer will
add traces for all RPCs that are being executed. The traces will only
be exported if an OpenTelemetry or OpenCensus trace exporter has been
configured.
An ApiTracer adds detailed information about the time that a single RPC
took. It can also be used to determine whether an RPC was retried.
  • Loading branch information
olavloite committed May 6, 2024
1 parent 5394139 commit 49e6ac3
Show file tree
Hide file tree
Showing 7 changed files with 568 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.api.gax.tracing.ApiTracerFactory.OperationType;
import com.google.api.gax.tracing.BaseApiTracer;
import com.google.common.base.Preconditions;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import org.threeten.bp.Duration;

/**
 * {@link com.google.api.gax.tracing.ApiTracer} for use with OpenTelemetry. Based on {@link
 * com.google.api.gax.tracing.OpencensusTracer}.
 *
 * <p>Operation-level outcomes are written to the wrapped {@link Span} as attributes and status, and
 * every terminal operation callback ({@link #operationSucceeded()}, {@link #operationCancelled()}
 * and {@link #operationFailed(Throwable)}) ends the span. Attempt-level outcomes are written as
 * span events.
 */
public class OpenTelemetryApiTracer extends BaseApiTracer {
  private static final AttributeKey<Long> ATTEMPT_COUNT_KEY =
      AttributeKey.longKey("attempt.count");
  private static final AttributeKey<Long> TOTAL_REQUEST_COUNT_KEY =
      AttributeKey.longKey("total_request_count");
  private static final AttributeKey<Long> TOTAL_RESPONSE_COUNT_KEY =
      AttributeKey.longKey("total_response_count");
  private static final AttributeKey<String> EXCEPTION_MESSAGE_KEY =
      AttributeKey.stringKey("exception.message");
  private static final AttributeKey<Long> ATTEMPT_NUMBER_KEY =
      AttributeKey.longKey("attempt.number");
  private static final AttributeKey<Long> ATTEMPT_REQUEST_COUNT_KEY =
      AttributeKey.longKey("attempt.request_count");
  private static final AttributeKey<Long> ATTEMPT_RESPONSE_COUNT_KEY =
      AttributeKey.longKey("attempt.response_count");
  private static final AttributeKey<String> CONNECTION_ID_KEY =
      AttributeKey.stringKey("connection");
  private static final AttributeKey<Long> RETRY_DELAY_KEY = AttributeKey.longKey("delay_ms");
  private static final AttributeKey<Long> BATCH_SIZE_KEY = AttributeKey.longKey("batch.size");
  private static final AttributeKey<Long> BATCH_COUNT_KEY = AttributeKey.longKey("batch.count");

  private final Tracer tracer;
  private final Span span;
  private final OperationType operationType;

  // Id of the connection reported by connectionSelected(); cleared when an attempt is cancelled
  // or fails.
  private volatile String lastConnectionId;
  // Number of the attempt most recently passed to attemptStarted().
  private volatile long currentAttemptId;
  private final AtomicLong attemptSentMessages = new AtomicLong(0);
  private long attemptReceivedMessages = 0;
  private final AtomicLong totalSentMessages = new AtomicLong(0);
  private long totalReceivedMessages = 0;

  /**
   * Creates a tracer that records to the given span.
   *
   * @param tracer the OpenTelemetry tracer; must not be null
   * @param span the span that receives all attributes, events and status; must not be null
   * @param operationType the type of operation being traced; must not be null
   * @throws NullPointerException if any argument is null
   */
  OpenTelemetryApiTracer(
      @Nonnull Tracer tracer, @Nonnull Span span, @Nonnull OperationType operationType) {
    this.tracer = Preconditions.checkNotNull(tracer);
    this.span = Preconditions.checkNotNull(span);
    this.operationType = Preconditions.checkNotNull(operationType);
  }

  /** Returns the span that this tracer records to. */
  Span getSpan() {
    return this.span;
  }

  /** Makes the span current and returns a scope whose {@code close()} closes that scope. */
  @Override
  public Scope inScope() {
    final io.opentelemetry.context.Scope scope = span.makeCurrent();
    return scope::close;
  }

  /** Records the operation totals on the span and ends it. */
  @Override
  public void operationSucceeded() {
    span.setAllAttributes(baseOperationAttributes());
    span.end();
  }

  /** Records the operation totals, marks the span as an error due to cancellation and ends it. */
  @Override
  public void operationCancelled() {
    span.setAllAttributes(baseOperationAttributes());
    span.setStatus(StatusCode.ERROR, "Cancelled by caller");
    // Cancellation is a terminal state just like success: the span must be ended here, otherwise
    // it is never completed.
    span.end();
  }

  /**
   * Records the operation totals, marks the span as an error with the message of {@code error} and
   * ends it.
   *
   * @param error the failure that terminated the operation
   */
  @Override
  public void operationFailed(Throwable error) {
    span.setAllAttributes(baseOperationAttributes());
    span.setStatus(StatusCode.ERROR, error.getMessage());
    // Failure is a terminal state just like success: the span must be ended here, otherwise it is
    // never completed.
    span.end();
  }

  /**
   * Adds an event that carries the message of {@code error}.
   *
   * @param error the failure that prevented the long-running operation from starting
   */
  @Override
  public void lroStartFailed(Throwable error) {
    span.addEvent(
        "Operation failed to start", Attributes.of(EXCEPTION_MESSAGE_KEY, error.getMessage()));
  }

  /** Adds an event indicating that the long-running operation started. */
  @Override
  public void lroStartSucceeded() {
    span.addEvent("Operation started");
  }

  /** Remembers the connection id so that it can be attached to the next attempt event. */
  @Override
  public void connectionSelected(String id) {
    lastConnectionId = id;
  }

  /**
   * Resets the per-attempt counters and, for retries, adds an event to the span.
   *
   * @param attemptNumber the zero-based attempt number
   */
  @Override
  public void attemptStarted(int attemptNumber) {
    currentAttemptId = attemptNumber;
    attemptSentMessages.set(0);
    attemptReceivedMessages = 0;

    // Attempts start counting at zero, so more than zero indicates a retry.
    if (attemptNumber > 0) {
      // Add an event if the RPC retries, as this is otherwise transparent to the user. Retries
      // would then show up as higher latency without any logical explanation.
      span.addEvent("Starting RPC retry " + attemptNumber);
    }
  }

  /** Delegates to {@link #attemptStarted(int)}; the request itself is not recorded. */
  @Override
  public void attemptStarted(Object request, int attemptNumber) {
    attemptStarted(attemptNumber);
  }

  /** Adds an event with the attempt attributes for a successful attempt or poll. */
  @Override
  public void attemptSucceeded() {
    addAttemptEvent("Polling completed", "Attempt succeeded", baseAttemptAttributes());
  }

  /** Adds an event for a cancelled attempt or poll and forgets the selected connection. */
  @Override
  public void attemptCancelled() {
    addAttemptEvent("Polling was cancelled", "Attempt cancelled", baseAttemptAttributes());
    lastConnectionId = null;
  }

  /**
   * Adds an event for an attempt that will be followed by another attempt (or poll), including the
   * delay before that next attempt, and forgets the selected connection.
   *
   * @param error the failure of this attempt
   * @param delay the delay before the next attempt
   */
  @Override
  public void attemptFailed(Throwable error, Duration delay) {
    AttributesBuilder builder = baseAttemptAttributesBuilder();
    builder.put(RETRY_DELAY_KEY, delay.toMillis());
    builder.put(EXCEPTION_MESSAGE_KEY, error.getMessage());

    // For long-running operations, the poll RPC was successful, but it indicated that the
    // operation is still running.
    addAttemptEvent(
        "Scheduling next poll", "Attempt failed, scheduling next attempt", builder.build());
    lastConnectionId = null;
  }

  /**
   * Adds an event for an attempt that failed after all retries were used up and forgets the
   * selected connection.
   *
   * @param error the failure of the last attempt
   */
  @Override
  public void attemptFailedRetriesExhausted(Throwable error) {
    AttributesBuilder builder = baseAttemptAttributesBuilder();
    builder.put(EXCEPTION_MESSAGE_KEY, error.getMessage());

    addAttemptEvent("Polling attempts exhausted", "Attempts exhausted", builder.build());
    lastConnectionId = null;
  }

  /**
   * Adds an event for an attempt that failed with a non-retryable error and forgets the selected
   * connection.
   *
   * @param error the non-retryable failure
   */
  @Override
  public void attemptPermanentFailure(Throwable error) {
    AttributesBuilder builder = baseAttemptAttributesBuilder();
    builder.put(EXCEPTION_MESSAGE_KEY, error.getMessage());

    addAttemptEvent("Polling failed", "Attempt failed, error not retryable", builder.build());
    lastConnectionId = null;
  }

  /** Counts one received message for both the current attempt and the whole operation. */
  @Override
  public void responseReceived() {
    attemptReceivedMessages++;
    totalReceivedMessages++;
  }

  /** Counts one sent message for both the current attempt and the whole operation. */
  @Override
  public void requestSent() {
    attemptSentMessages.incrementAndGet();
    totalSentMessages.incrementAndGet();
  }

  /** Records the element count and request size of a batch as span attributes. */
  @Override
  public void batchRequestSent(long elementCount, long requestSize) {
    span.setAllAttributes(
        Attributes.of(BATCH_COUNT_KEY, elementCount, BATCH_SIZE_KEY, requestSize));
  }

  /**
   * Adds an attempt-level event to the span. The same infrastructure is used for both polling and
   * retries, so the event name depends on whether this tracer is for a long-running operation.
   */
  private void addAttemptEvent(String pollingEvent, String attemptEvent, Attributes attributes) {
    if (operationType == OperationType.LongRunning) {
      span.addEvent(pollingEvent, attributes);
    } else {
      span.addEvent(attemptEvent, attributes);
    }
  }

  /**
   * Builds the operation-level attributes: the attempt count, plus the total request and response
   * counts when they are greater than zero.
   */
  private Attributes baseOperationAttributes() {
    AttributesBuilder builder = Attributes.builder();
    // Attempt numbers are zero-based, so the count is the last attempt number plus one.
    builder.put(ATTEMPT_COUNT_KEY, currentAttemptId + 1);
    long localTotalSentMessages = totalSentMessages.get();
    if (localTotalSentMessages > 0) {
      builder.put(TOTAL_REQUEST_COUNT_KEY, localTotalSentMessages);
    }
    if (totalReceivedMessages > 0) {
      builder.put(TOTAL_RESPONSE_COUNT_KEY, totalReceivedMessages);
    }
    return builder.build();
  }

  /** Builds the attempt-level attributes without any additions. */
  private Attributes baseAttemptAttributes() {
    return baseAttemptAttributesBuilder().build();
  }

  /**
   * Returns a builder pre-populated with the attempt number, the per-attempt request and response
   * counts when they are greater than zero, and the connection id when one is known.
   */
  private AttributesBuilder baseAttemptAttributesBuilder() {
    AttributesBuilder builder = Attributes.builder();
    builder.put(ATTEMPT_NUMBER_KEY, currentAttemptId);

    long localAttemptSentMessages = attemptSentMessages.get();
    if (localAttemptSentMessages > 0) {
      builder.put(ATTEMPT_REQUEST_COUNT_KEY, localAttemptSentMessages);
    }
    if (attemptReceivedMessages > 0) {
      builder.put(ATTEMPT_RESPONSE_COUNT_KEY, attemptReceivedMessages);
    }
    // Read the volatile field once so the null check and the put see the same value.
    String localLastConnectionId = lastConnectionId;
    if (localLastConnectionId != null) {
      builder.put(CONNECTION_ID_KEY, localLastConnectionId);
    }

    return builder;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.ApiTracerFactory;
import com.google.api.gax.tracing.BaseApiTracerFactory;
import com.google.api.gax.tracing.SpanName;
import com.google.common.base.Preconditions;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import javax.annotation.Nonnull;

/**
 * {@link ApiTracerFactory} for OpenTelemetry tracing. Each call to {@link #newTracer} starts a new
 * span carrying the configured attributes and wraps it in an {@link OpenTelemetryApiTracer}.
 */
public class OpenTelemetryApiTracerFactory extends BaseApiTracerFactory {
  @Nonnull private final Tracer internalTracer;
  @Nonnull private final Attributes spanAttributes;

  /**
   * @param internalTracer the OpenTelemetry tracer used to start spans; must not be null
   * @param spanAttributes attributes that are set on every span started by this factory
   */
  OpenTelemetryApiTracerFactory(
      @Nonnull Tracer internalTracer, @Nonnull Attributes spanAttributes) {
    Preconditions.checkNotNull(internalTracer);
    this.internalTracer = internalTracer;
    this.spanAttributes = spanAttributes;
  }

  /**
   * Starts a span named after {@code spanName} and returns a tracer that records to it.
   *
   * @param parent the tracer of an outer callable, if any
   * @param spanName the name of the new span
   * @param operationType the type of operation that the new tracer will trace
   * @return a new {@link OpenTelemetryApiTracer} wrapping the started span
   */
  @Override
  public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType operationType) {
    // A span started by an outer callable takes precedence as the parent. Otherwise fall back to
    // the span that is current in the context, so that outermost tracers inherit the caller's
    // span.
    final Span parentSpan =
        parent instanceof OpenTelemetryApiTracer
            ? ((OpenTelemetryApiTracer) parent).getSpan()
            : Span.current();
    final Context parentContext = Context.current().with(parentSpan);

    final Span startedSpan =
        internalTracer
            .spanBuilder(spanName.toString())
            .setParent(parentContext)
            .setAllAttributes(spanAttributes)
            .startSpan();

    return new OpenTelemetryApiTracer(internalTracer, startedSpan, operationType);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,13 @@ ApiFuture<ByteString> beginTransactionAsync(Options transactionOptions, boolean
.setSession(getName())
.setOptions(createReadWriteTransactionOptions(transactionOptions))
.build();
final ApiFuture<Transaction> requestFuture =
spanner.getRpc().beginTransactionAsync(request, getOptions(), routeToLeader);
final ApiFuture<Transaction> requestFuture;
try (IScope ignore = tracer.withSpan(span)) {
requestFuture = spanner.getRpc().beginTransactionAsync(request, getOptions(), routeToLeader);
}
requestFuture.addListener(
() -> {
try (IScope s = tracer.withSpan(span)) {
try (IScope ignore = tracer.withSpan(span)) {
Transaction txn = requestFuture.get();
if (txn.getId().isEmpty()) {
throw newSpannerException(
Expand Down

0 comments on commit 49e6ac3

Please sign in to comment.