Skip to content

Commit

Permalink
Messaging (Event Hubs, Service Bus) logging improvements (#39904)
Browse files Browse the repository at this point in the history
* Review log levels: log transient issues as warnings, terminal as errors
  • Loading branch information
lmolkova committed Apr 30, 2024
1 parent 18f4b4d commit d773bdf
Show file tree
Hide file tree
Showing 21 changed files with 163 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private Disposable scheduleRefreshTokenTask(Duration initialRefresh) {
(amqpException, interval) -> {
final Duration lastRefresh = lastRefreshInterval.get();

LOGGER.atError()
LOGGER.atWarning()
.addKeyValue("scopes", scopes)
.addKeyValue(INTERVAL_KEY, interval)
.log("Error is transient. Rescheduling authorization task.", amqpException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void onError(Throwable throwable) {
}
});
} else {
logger.atWarning()
logger.atError()
.addKeyValue(TRY_COUNT_KEY, attemptsMade)
.log("Retry attempts exhausted or exception was not retriable.", throwable);

Expand Down Expand Up @@ -267,8 +267,9 @@ public void subscribe(CoreSubscriber<? super T> actual) {
actual.onSubscribe(Operators.emptySubscription());
actual.onError(lastError);
} else {
Operators.error(actual, logger.logExceptionAsError(
new IllegalStateException("Cannot subscribe. Processor is already terminated.")));
IllegalStateException error
= new IllegalStateException("Cannot subscribe. Processor is already terminated.");
Operators.error(actual, logger.logExceptionAsWarning(error));
}

return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private <T> Mono<T> errorIfEmpty(RequestResponseChannel channel,
= String.format("entityPath[%s] deliveryState[%s] No response received from management channel.",
entityPath, deliveryState);
AmqpException exception = new AmqpException(true, error, channel.getErrorContext());
return logger.atError().addKeyValue(DELIVERY_STATE_KEY, deliveryState).log(exception);
return logger.atWarning().addKeyValue(DELIVERY_STATE_KEY, deliveryState).log(exception);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.qpid.proton.reactor.Reactor;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
Expand Down Expand Up @@ -176,7 +175,8 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption
}
}).cache(1);

this.subscriptions = Disposables.composite(this.endpointStates.subscribe());
this.subscriptions = Disposables.composite(this.endpointStates.subscribe(null,
e -> logger.warning("Error occurred while processing connection state.", e)));
}

/**
Expand Down Expand Up @@ -222,8 +222,8 @@ public Flux<AmqpShutdownSignal> getShutdownSignals() {
public Mono<AmqpManagementNode> getManagementNode(String entityPath) {
return Mono.defer(() -> {
if (isDisposed()) {
return monoError(logger.atError().addKeyValue(ENTITY_PATH_KEY, entityPath), Exceptions
.propagate(new IllegalStateException("Connection is disposed. Cannot get management instance.")));
return monoError(logger.atWarning().addKeyValue(ENTITY_PATH_KEY, entityPath),
new IllegalStateException("Connection is disposed. Cannot get management instance."));
}

final AmqpManagementNode existing = managementNodes.get(entityPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ Retry retryWhenSpec(AmqpRetryPolicy retryPolicy) {
|| (error instanceof RejectedExecutionException));

if (!shouldRetry) {
logger.atWarning()
logger.atError()
.addKeyValue(TRY_COUNT_KEY, iteration)
.log("Exception is non-retriable, not retrying for a new connection.", error);
return Mono.error(error);
Expand All @@ -246,7 +246,7 @@ Retry retryWhenSpec(AmqpRetryPolicy retryPolicy) {
final Duration backoff = retryPolicy.calculateRetryDelay(errorToUse, (int) attempts);

if (backoff == null) {
logger.atWarning()
logger.atError()
.addKeyValue(TRY_COUNT_KEY, iteration)
.log("Retry is disabled, not retrying for a new connection.", error);
return Mono.error(error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ public Mono<Void> updateDisposition(String deliveryTag, DeliveryState deliverySt
@Override
public Mono<Void> addCredits(int credits) {
if (isDisposed()) {
return monoError(logger, new IllegalStateException("Cannot add credits to closed link: " + getLinkName()));
return monoError(logger.atWarning(),
new IllegalStateException("Cannot add credits to closed link: " + getLinkName()));
}

return Mono.create(sink -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
Expand Down Expand Up @@ -70,6 +69,7 @@
import static com.azure.core.amqp.implementation.ClientConstants.MAX_AMQP_HEADER_SIZE_BYTES;
import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE;
import static com.azure.core.amqp.implementation.ClientConstants.SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS;
import static com.azure.core.util.FluxUtil.monoError;
import static java.nio.charset.StandardCharsets.UTF_8;

/**
Expand Down Expand Up @@ -153,7 +153,7 @@ class ReactorSender implements AmqpSendLink, AsyncCloseable, AutoCloseable {
handler.getConnectionId(), handler.getLinkName());

this.endpointStates = this.handler.getEndpointStates().map(state -> {
logger.verbose("State {}", state);
logger.atVerbose().addKeyValue("state", state).log("onEndpointState");
this.hasConnected.set(state == EndpointState.ACTIVE);
return AmqpEndpointStateUtil.getConnectionState(state);
}).doOnError(error -> {
Expand Down Expand Up @@ -332,7 +332,7 @@ private byte[] batchBinaryDataSectionBytes(Message sectionMessage, int maxMessag
}

private Mono<Void> batchBufferOverflowError(int maxMessageSize) {
return FluxUtil.monoError(logger,
return monoError(logger,
new AmqpException(
false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, String.format(Locale.US,
"Size of the payload exceeded maximum message size: %s kb", maxMessageSize / 1024),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.qpid.proton.engine.Session;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
Expand Down Expand Up @@ -61,6 +60,7 @@
import static com.azure.core.amqp.implementation.ClientConstants.LINK_NAME_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE;
import static com.azure.core.amqp.implementation.ClientConstants.SESSION_NAME_KEY;
import static com.azure.core.util.FluxUtil.monoError;

/**
* Represents an AMQP session using proton-j reactor.
Expand Down Expand Up @@ -297,11 +297,9 @@ Mono<Void> closeAsync(String message, ErrorCondition errorCondition, boolean dis
@Override
public Mono<? extends AmqpTransactionCoordinator> getOrCreateTransactionCoordinator() {
if (isDisposed()) {
return Mono.error(logger.atError()
.addKeyValue(SESSION_NAME_KEY, sessionName)
.log(new AmqpException(true, String
.format("Cannot create coordinator send link %s from a closed session.", TRANSACTION_LINK_NAME),
sessionHandler.getErrorContext())));
return monoError(logger.atWarning().addKeyValue(SESSION_NAME_KEY, sessionName), new AmqpException(true,
String.format("Cannot create coordinator send link %s from a closed session.", TRANSACTION_LINK_NAME),
sessionHandler.getErrorContext()));
}

final TransactionCoordinator existing = transactionCoordinator.get();
Expand Down Expand Up @@ -354,16 +352,13 @@ protected Mono<AmqpReceiveLink> createConsumer(String linkName, String entityPat
ConsumerFactory consumerFactory) {

if (isDisposed()) {
LoggingEventBuilder logBuilder = logger.atError()
LoggingEventBuilder logBuilder = logger.atWarning()
.addKeyValue(SESSION_NAME_KEY, sessionName)
.addKeyValue(ENTITY_PATH_KEY, entityPath)
.addKeyValue(LINK_NAME_KEY, linkName);

// TODO(limolkova) this can be simplified with FluxUtil.monoError(LoggingEventBuilder), not using it for now
// to allow using azure-core-amqp with stable azure-core 1.24.0 to simplify dependency management
// we should switch to it once monoError(LoggingEventBuilder) ships in stable azure-core
return Mono.error(logBuilder.log(Exceptions.propagate(new AmqpException(true,
"Cannot create receive link from a closed session.", sessionHandler.getErrorContext()))));
return monoError(logBuilder, new AmqpException(true, "Cannot create receive link from a closed session.",
sessionHandler.getErrorContext()));
}

final LinkSubscription<AmqpReceiveLink> existingLink = openReceiveLinks.get(linkName);
Expand Down Expand Up @@ -447,16 +442,13 @@ private Mono<AmqpSendLink> createProducer(String linkName, String entityPath,
Map<Symbol, Object> linkProperties, boolean requiresAuthorization) {

if (isDisposed()) {
LoggingEventBuilder logBuilder = logger.atError()
LoggingEventBuilder logBuilder = logger.atWarning()
.addKeyValue(SESSION_NAME_KEY, sessionName)
.addKeyValue(ENTITY_PATH_KEY, entityPath)
.addKeyValue(LINK_NAME_KEY, linkName);

// TODO(limolkova) this can be simplified with FluxUtil.monoError(LoggingEventBuilder), not using it for now
// to allow using azure-core-amqp with stable azure-core 1.24.0 to simplify dependency management
// we should switch to it once monoError(LoggingEventBuilder) ships in stable azure-core
return Mono.error(logBuilder.log(Exceptions.propagate(new AmqpException(true,
"Cannot create send link from a closed session.", sessionHandler.getErrorContext()))));
return monoError(logBuilder, new AmqpException(true, "Cannot create send link from a closed session.",
sessionHandler.getErrorContext()));
}

final LinkSubscription<AmqpSendLink> existing = openSendLinks.get(linkName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,6 @@ public class ConnectionHandler extends Handler {
private final SslPeerDetails peerDetails;
private final AmqpMetricsProvider metricProvider;

/**
* Creates a handler that handles proton-j's connection events.
*
* @param connectionId Identifier for this connection.
* @param connectionOptions Options used when creating the AMQP connection.
* @param peerDetails The peer details for this connection.
* @deprecated use {@link ConnectionHandler#ConnectionHandler(String, ConnectionOptions, SslPeerDetails, AmqpMetricsProvider)} instead.
* @throws NullPointerException if {@code connectionOptions} or {@code peerDetails} is null.
*/
@Deprecated
public ConnectionHandler(final String connectionId, final ConnectionOptions connectionOptions,
SslPeerDetails peerDetails) {
this(connectionId, connectionOptions, peerDetails,
new AmqpMetricsProvider(null, connectionOptions.getFullyQualifiedNamespace(), null));
}

/**
* Creates a handler that handles proton-j's connection events.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,6 @@ public class ReceiveLinkHandler extends LinkHandler {
private final Set<Delivery> queuedDeliveries = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final String entityPath;

/**
* Creates a new instance of ReceiveLinkHandler.
*
* @param connectionId Identifier for the connection.
* @param hostname Hostname of the connection.
* @param linkName Name of the link.
* @param entityPath Address to the entity.
* @deprecated use {@link #ReceiveLinkHandler(String, String, String, String, AmqpMetricsProvider)} instead.
*/
@Deprecated
public ReceiveLinkHandler(String connectionId, String hostname, String linkName, String entityPath) {
this(connectionId, hostname, linkName, entityPath, new AmqpMetricsProvider(null, hostname, entityPath));
}

/**
* Creates a new instance of ReceiveLinkHandler.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public boolean containsDelivery(UUID deliveryTag) {
*/
public Mono<Void> sendDisposition(String deliveryTag, DeliveryState desiredState) {
if (isTerminated.get()) {
return monoError(logger, DeliveryNotOnLinkException.linkClosed(deliveryTag, desiredState));
return monoError(logger.atWarning(), DeliveryNotOnLinkException.linkClosed(deliveryTag, desiredState));
} else {
return sendDispositionImpl(deliveryTag, desiredState);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,6 @@ public class SendLinkHandler extends LinkHandler {
private final Sinks.Many<Integer> creditProcessor = Sinks.many().unicast().onBackpressureBuffer();
private final Sinks.Many<Delivery> deliveryProcessor = Sinks.many().multicast().onBackpressureBuffer();

/**
* Creates a new instance of SendLinkHandler.
*
* @param connectionId The identifier of the connection this link belongs to.
* @param hostname The hostname for the connection.
* @param linkName The name of the link.
* @param entityPath The entity path this link is connected to.
* @deprecated use {@link SendLinkHandler#SendLinkHandler(String, String, String, String, AmqpMetricsProvider)}
* instead.
*/
@Deprecated
public SendLinkHandler(String connectionId, String hostname, String linkName, String entityPath) {
this(connectionId, hostname, linkName, entityPath, new AmqpMetricsProvider(null, hostname, null));
}

/**
* Creates a new instance of SendLinkHandler.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,6 @@ public class SessionHandler extends Handler {
private final ReactorDispatcher reactorDispatcher;
private final AmqpMetricsProvider metricsProvider;

/**
* Creates a session handler.
*
* @param connectionId Identifier for the connection.
* @param hostname Hostname of the connection.
* @param sessionName Name of the session.
* @param reactorDispatcher Reactor dispatcher.
* @param openTimeout Timeout for opening the session.
* @deprecated use {@link #SessionHandler(String, String, String, ReactorDispatcher, Duration, AmqpMetricsProvider)}
* instead.
*/
@Deprecated
public SessionHandler(String connectionId, String hostname, String sessionName, ReactorDispatcher reactorDispatcher,
Duration openTimeout) {
this(connectionId, hostname, sessionName, reactorDispatcher, openTimeout,
new AmqpMetricsProvider(null, hostname, null));
}

/**
* Creates a session handler.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,10 @@ public void onTransportError(Event event) {
final URI url = createURI(fullyQualifiedNamespace, port);
final InetSocketAddress address = new InetSocketAddress(hostNameParts[0], port);

logger.atError()
.log("Failed to connect to url: '{}', proxy host: '{}'", url, address.getHostString(), ioException);
logger.atWarning()
.addKeyValue("url", url)
.addKeyValue("proxyHost", address.getHostString())
.log("Failed to connect.", ioException);

final ProxySelector proxySelector = ProxySelector.getDefault();
if (proxySelector != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void teardown() throws Exception {
@SuppressWarnings("deprecation")
public void constructorNull() {
// Act
assertThrows(NullPointerException.class, () -> new ReactorHandlerProvider(null));
assertThrows(NullPointerException.class, () -> new ReactorHandlerProvider(null, null));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public Mono<Void> setupAsync() {
connectionOptions.getAuthorizationType(), connectionOptions.getFullyQualifiedNamespace(),
connectionOptions.getAuthorizationScope());
final ReactorProvider provider = new ReactorProvider();
final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(provider);
final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(provider, null);
final AmqpLinkProvider linkProvider = new AmqpLinkProvider();
final PerfMessageSerializer messageSerializer = new PerfMessageSerializer();
connection = new ReactorConnection(connectionId,
Expand Down

0 comments on commit d773bdf

Please sign in to comment.