Skip to content

Commit

Permalink
ServiceBus: fix session tracing (#39962)
Browse files Browse the repository at this point in the history
* remove additional matrix

* Fix session processing and disposition instrumentation

* return matrix config

* review suggestions
  • Loading branch information
lmolkova committed May 3, 2024
1 parent 71b1905 commit 28a2b5a
Show file tree
Hide file tree
Showing 11 changed files with 230 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ protected void hookOnNext(ServiceBusMessageContext message) {
return;
}

Context span = instrumentation.startProcessInstrumentation("ServiceBus.process", message.getMessage(), Context.NONE);
Context span = instrumentation.startProcessInstrumentation("ServiceBus.process",
message.getMessage().getApplicationProperties(),
message.getMessage().getEnqueuedTime(),
Context.NONE);
message.getMessage().setContext(span);
AutoCloseable scope = tracer.makeSpanCurrent(span);
try {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,10 @@ class ServiceBusAsyncConsumer implements AutoCloseable {
this.linkProcessor = null;
this.messageSerializer = messageSerializer;

final boolean useFluxTrace = instrumentation.isEnabled() && instrumentation.isAsyncReceiverInstrumentation();
if (useFluxTrace) {
// This ServiceBusAsyncConsumer is backing ServiceBusReceiverAsyncClient instance (client has instrumentation is enabled).
final Flux<ServiceBusReceivedMessage> deserialize = messageFlux
.map(message -> this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class));
this.processor = new FluxTraceV2(deserialize, instrumentation);
} else {
// This ServiceBusAsyncConsumer is backing either
// 1. a ServiceBusReceiverAsyncClient instance (client has no instrumentation enabled)
// 2. Or a ServiceBusProcessorClient instance (processor client internally deal with instrumentation).
this.processor = messageFlux
.map(message -> this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class));
}
// This ServiceBusAsyncConsumer is backing ServiceBusReceiverAsyncClient instance (client has instrumentation is enabled).
final Flux<ServiceBusReceivedMessage> deserialize = messageFlux
.map(message -> this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class));
this.processor = TracingFluxOperator.create(deserialize, instrumentation);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ private Mono<ServiceBusReceiverAsyncClient> acquireSpecificOrNextSession(String
session, null, receiverOptions.getMaxLockRenewDuration());

final ServiceBusSingleSessionManager sessionManager = new ServiceBusSingleSessionManager(LOGGER, identifier,
sessionReceiver, receiverOptions.getPrefetchCount(), messageSerializer, connectionCacheWrapper.getRetryOptions());
sessionReceiver, receiverOptions.getPrefetchCount(), messageSerializer, connectionCacheWrapper.getRetryOptions(), instrumentation);

final ReceiverOptions newReceiverOptions = createNamedSessionOptions(receiverOptions.getReceiveMode(),
receiverOptions.getPrefetchCount(), receiverOptions.getMaxLockRenewDuration(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import com.azure.core.util.logging.LoggingEventBuilder;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
Expand All @@ -34,19 +36,23 @@ final class ServiceBusSingleSessionManager implements IServiceBusSessionManager
private final MessageSerializer serializer;
private final Duration operationTimeout;
private final ServiceBusSessionReactorReceiver sessionReceiver;
private final MessageFlux messageFlux;
private final Flux<Message> messageFlux;
private final ServiceBusReceiverInstrumentation instrumentation;

ServiceBusSingleSessionManager(ClientLogger logger, String identifier,
ServiceBusSessionReactorReceiver sessionReceiver, int prefetch,
MessageSerializer serializer, AmqpRetryOptions retryOptions) {
MessageSerializer serializer, AmqpRetryOptions retryOptions, ServiceBusReceiverInstrumentation instrumentation) {
this.logger = Objects.requireNonNull(logger, "logger cannot be null.");
this.identifier = identifier;
this.sessionReceiver = Objects.requireNonNull(sessionReceiver, "sessionReceiver cannot be null.");
this.serializer = Objects.requireNonNull(serializer, "serializer cannot be null.");
Objects.requireNonNull(retryOptions, "retryOptions cannot be null.");
this.operationTimeout = retryOptions.getTryTimeout();
final Flux<ServiceBusSessionReactorReceiver> messageFluxUpstream = new SessionReceiverStream(sessionReceiver).flux();
this.messageFlux = new MessageFlux(messageFluxUpstream, prefetch, CreditFlowMode.RequestDriven, NULL_RETRY_POLICY);
this.instrumentation = Objects.requireNonNull(instrumentation, "instrumentation cannot be null");
MessageFlux messageFluxLocal = new MessageFlux(messageFluxUpstream, prefetch, CreditFlowMode.RequestDriven,
NULL_RETRY_POLICY);
this.messageFlux = TracingFluxOperator.create(messageFluxLocal, instrumentation);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ final class SessionsMessagePump {
this.processMessage = Objects.requireNonNull(processMessage, "'processMessage' cannot be null.");
this.processError = Objects.requireNonNull(processError, "'processError' cannot be null.");
this.onTerminate = Objects.requireNonNull(onTerminate, "'onTerminate' cannot be null.");
this.receiversTracker = new SessionReceiversTracker(logger, maxConcurrentSessions, fullyQualifiedNamespace, entityPath, receiveMode);
this.receiversTracker = new SessionReceiversTracker(logger, maxConcurrentSessions, fullyQualifiedNamespace, entityPath, receiveMode, instrumentation);
this.nextSession = new NextSession(pumpId, fullyQualifiedNamespace, entityPath, sessionAcquirer).mono();
}

Expand Down Expand Up @@ -645,14 +645,16 @@ static final class SessionReceiversTracker {
private final String entityPath;
private final ServiceBusReceiveMode receiveMode;
private final ConcurrentHashMap<String, ServiceBusSessionReactorReceiver> receivers;
private final ServiceBusReceiverInstrumentation instrumentation;

private SessionReceiversTracker(ClientLogger logger, int size, String fullyQualifiedNamespace, String entityPath,
ServiceBusReceiveMode receiveMode) {
ServiceBusReceiveMode receiveMode, ServiceBusReceiverInstrumentation instrumentation) {
this.logger = logger;
this.fullyQualifiedNamespace = fullyQualifiedNamespace;
this.entityPath = entityPath;
this.receiveMode = receiveMode;
this.receivers = new ConcurrentHashMap<>(size);
this.instrumentation = instrumentation;
}

private void track(ServiceBusSessionReactorReceiver receiver) {
Expand Down Expand Up @@ -749,11 +751,14 @@ private Mono<Void> updateDisposition(ServiceBusReceivedMessage message, Disposit
final ServiceBusSessionReactorReceiver receiver = receivers.get(sessionId);
final DeliveryState deliveryState = MessageUtils.getDeliveryState(dispositionStatus, deadLetterReason,
deadLetterDescription, propertiesToModify, transactionContext);

Mono<Void> updateDispositionMono;
if (receiver != null) {
return receiver.updateDisposition(message.getLockToken(), deliveryState);
updateDispositionMono = receiver.updateDisposition(message.getLockToken(), deliveryState);
} else {
return Mono.error(DeliveryNotOnLinkException.noMatchingDelivery(message.getLockToken(), deliveryState));
updateDispositionMono = Mono.error(DeliveryNotOnLinkException.noMatchingDelivery(message.getLockToken(), deliveryState));
}
return instrumentation.instrumentSettlement(updateDispositionMono, message, message.getContext(), dispositionStatus);
}

private Mono<Void> checkNull(Object options, ServiceBusTransactionContext transactionContext) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus;

import com.azure.messaging.servicebus.implementation.instrumentation.ReceiverKind;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;

import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;

/**
* Flux operator that traces receive and process calls
*/
final class TracingFluxOperator<T> extends BaseSubscriber<T> {

public static <T> Flux<T> create(Flux<T> upstream, ServiceBusReceiverInstrumentation instrumentation) {
if (!instrumentation.isEnabled() && instrumentation.isAsyncReceiverInstrumentation()) {
return upstream;
}

return new FluxOperator<T, T>(upstream) {
@SuppressWarnings("unchecked")
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
Objects.requireNonNull(actual, "'actual' cannot be null.");
source.subscribe(new TracingFluxOperator<T>(actual, (msg, handler) -> {
if (msg instanceof Message) {
instrumentation.instrumentProcess((Message) msg, ReceiverKind.ASYNC_RECEIVER,
(Function<Message, Throwable>) handler);
} else if (msg instanceof ServiceBusReceivedMessage) {
instrumentation.instrumentProcess((ServiceBusReceivedMessage) msg, ReceiverKind.ASYNC_RECEIVER,
(Function<ServiceBusReceivedMessage, Throwable>) handler);
}
}));
}
};
}

private final CoreSubscriber<? super T> downstream;
private final BiConsumer<T, Function<T, Throwable>> instrumentation;
private TracingFluxOperator(CoreSubscriber<? super T> downstream, BiConsumer<T, Function<T, Throwable>> instrumentation) {
this.downstream = downstream;
this.instrumentation = instrumentation;
}

@Override
public reactor.util.context.Context currentContext() {
return downstream.currentContext();
}

@Override
protected void hookOnSubscribe(Subscription subscription) {
downstream.onSubscribe(this);
}

@Override
protected void hookOnNext(T message) {
instrumentation.accept(message, msg -> {
downstream.onNext(msg);
return null;
});
}

@Override
protected void hookOnError(Throwable throwable) {
downstream.onError(throwable);
}

@Override
protected void hookOnComplete() {
downstream.onComplete();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,25 @@
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.Map;
import java.util.function.Function;

import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME;

/**
* Contains convenience methods to instrument specific calls with traces and metrics.
*/
public final class ServiceBusReceiverInstrumentation {
private static final Symbol ENQUEUED_TIME_SYMBOL = Symbol.getSymbol(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue());
private final ServiceBusMeter meter;
private final ServiceBusTracer tracer;
private final ReceiverKind receiverKind;
Expand Down Expand Up @@ -67,37 +76,44 @@ public boolean isAsyncReceiverInstrumentation() {
* for sync receiver.
* Reports consumer lag metric.
*/
public Context startProcessInstrumentation(String name, ServiceBusReceivedMessage message, Context parent) {
if (message == null || (!tracer.isEnabled() && !meter.isConsumerLagEnabled())) {
public Context startProcessInstrumentation(String name, Map<String, Object> applicationProperties,
OffsetDateTime enqueuedTime, Context parent) {
if (applicationProperties == null || (!tracer.isEnabled() && !meter.isConsumerLagEnabled())) {
return parent;
}

Context span = (tracer.isEnabled() && receiverKind != ReceiverKind.SYNC_RECEIVER) ? tracer.startProcessSpan(name, message, parent) : parent;
Context span = (tracer.isEnabled() && receiverKind != ReceiverKind.SYNC_RECEIVER)
? tracer.startProcessSpan(name, applicationProperties, enqueuedTime, parent)
: parent;

// important to record metric after span is started
meter.reportConsumerLag(message.getEnqueuedTime(), span);
meter.reportConsumerLag(enqueuedTime, span);

return span;
}

public void instrumentProcess(ServiceBusReceivedMessage message, ReceiverKind caller, Function<ServiceBusReceivedMessage, Throwable> handleMessage) {
if (receiverKind != caller) {
if (receiverKind != caller || message == null) {
handleMessage.apply(message);
return;
}

Context span = startProcessInstrumentation("ServiceBus.process", message, Context.NONE);
Context span = startProcessInstrumentation("ServiceBus.process", message.getApplicationProperties(),
message.getEnqueuedTime(), Context.NONE);
ContextAccessor.setContext(message, span);
AutoCloseable scope = tracer.makeSpanCurrent(span);
Throwable error = null;
try {
error = handleMessage.apply(message);
} catch (Throwable t) {
error = t;
throw t;
} finally {
tracer.endSpan(error, span, scope);
wrap(span, message, handleMessage);
}

public void instrumentProcess(Message message, ReceiverKind caller, Function<Message, Throwable> handleMessage) {
if (receiverKind != caller || message == null || message.getApplicationProperties() == null) {
handleMessage.apply(message);
return;
}

Context span = startProcessInstrumentation("ServiceBus.process", message.getApplicationProperties().getValue(),
getEnqueuedTime(message), Context.NONE);
//ContextAccessor.setContext(message, span);
wrap(span, message, handleMessage);
}

/**
Expand Down Expand Up @@ -145,4 +161,30 @@ private static String getSettlementSpanName(DispositionStatus status) {
return "ServiceBus.unknown";
}
}

private <T> void wrap(Context span, T message, Function<T, Throwable> handleMessage) {
AutoCloseable scope = tracer.makeSpanCurrent(span);
Throwable error = null;
try {
error = handleMessage.apply(message);
} catch (Throwable t) {
error = t;
throw t;
} finally {
tracer.endSpan(error, span, scope);
}
}

private OffsetDateTime getEnqueuedTime(Message message) {
if (message.getMessageAnnotations() == null || message.getMessageAnnotations().getValue() == null) {
return null;
}

Object date = message.getMessageAnnotations().getValue().get(ENQUEUED_TIME_SYMBOL);
if (date instanceof Date) {
return ((Date) date).toInstant().atOffset(ZoneOffset.UTC);
}

return null;
}
}

0 comments on commit 28a2b5a

Please sign in to comment.