Skip to content

Commit

Permalink
return back some deprecate ctors, stop using them, log warn for conen…
Browse files Browse the repository at this point in the history
…ctio nerrors instead of dropping it
  • Loading branch information
lmolkova committed Apr 29, 2024
1 parent e728c7b commit eb6fcfa
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,28 @@ public AmqpChannelProcessor(String fullyQualifiedNamespace,
this.errorContext = new AmqpErrorContext(fullyQualifiedNamespace);
}

/**
* Creates an instance of {@link AmqpChannelProcessor}.
*
* @param fullyQualifiedNamespace The fully qualified namespace for the AMQP connection.
* @param entityPath The entity path for the AMQP connection.
* @param endpointStatesFunction The function that returns the endpoint states for the AMQP connection.
* @param retryPolicy The retry policy for the AMQP connection.
* @param logger The logger to use for this processor.
* @deprecated Use constructor overload that does not take {@link ClientLogger}
*/
@Deprecated
public AmqpChannelProcessor(String fullyQualifiedNamespace, String entityPath,
Function<T, Flux<AmqpEndpointState>> endpointStatesFunction, AmqpRetryPolicy retryPolicy, ClientLogger logger) {
this.endpointStatesFunction
= Objects.requireNonNull(endpointStatesFunction, "'endpointStates' cannot be null.");
this.retryPolicy = Objects.requireNonNull(retryPolicy, "'retryPolicy' cannot be null.");
Map<String, Object> loggingContext = new HashMap<>(1);
loggingContext.put(ENTITY_PATH_KEY, Objects.requireNonNull(entityPath, "'entityPath' cannot be null."));
this.logger = new ClientLogger(getClass(), loggingContext);
this.errorContext = new AmqpErrorContext(fullyQualifiedNamespace);
}

@Override
public void onSubscribe(Subscription subscription) {
if (Operators.setOnce(UPSTREAM, this, subscription)) {
Expand Down Expand Up @@ -245,8 +267,9 @@ public void subscribe(CoreSubscriber<? super T> actual) {
actual.onSubscribe(Operators.emptySubscription());
actual.onError(lastError);
} else {
Operators.error(actual, logger.logExceptionAsWarning(
new IllegalStateException("Cannot subscribe. Processor is already terminated.")));
IllegalStateException error
= new IllegalStateException("Cannot subscribe. Processor is already terminated.");
Operators.error(actual, logger.logExceptionAsWarning(error));
}

return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption
}
}).cache(1);

this.subscriptions = Disposables.composite(this.endpointStates.subscribe());
this.subscriptions = Disposables.composite(this.endpointStates.subscribe(null,
e -> logger.warning("Error occurred while processing connection state.", e)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ Retry retryWhenSpec(AmqpRetryPolicy retryPolicy) {
|| (error instanceof RejectedExecutionException));

if (!shouldRetry) {
logger.atWarning()
logger.atError()
.addKeyValue(TRY_COUNT_KEY, iteration)
.log("Exception is non-retriable, not retrying for a new connection.", error);
return Mono.error(error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,20 @@ public ReactorHandlerProvider(ReactorProvider provider, Meter meter) {
this.meter = meter;
}

/**
* Creates a new instance with the reactor provider to handle {@link ReactorDispatcher ReactorDispatchers} to its
* generated handlers.
*
* @param provider The provider that creates and manages {@link Reactor} instances.
*
* @throws NullPointerException If {@code provider} is {@code null}.
* @deprecated use {@link ReactorHandlerProvider#ReactorHandlerProvider(ReactorProvider, Meter)} instead.
*/
@Deprecated
public ReactorHandlerProvider(ReactorProvider provider) {
this(provider, null);
}

/**
* Creates a new connection handler with the given {@code connectionId} and {@code hostname}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.qpid.proton.engine.Session;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public Mono<Void> setupAsync() {
connectionOptions.getAuthorizationType(), connectionOptions.getFullyQualifiedNamespace(),
connectionOptions.getAuthorizationScope());
final ReactorProvider provider = new ReactorProvider();
final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(provider);
final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(provider, new TestMeter(false));
final AmqpLinkProvider linkProvider = new AmqpLinkProvider();
final PerfMessageSerializer messageSerializer = new PerfMessageSerializer();
connection = new ReactorConnection(connectionId,
Expand Down
6 changes: 6 additions & 0 deletions sdk/servicebus/azure-messaging-servicebus-stress/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
<version>2.7.18</version> <!-- {x-version-update;org.springframework.boot:spring-boot-starter;external_dependency} -->
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.10.0-beta.1</version>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus.stress.scenarios;

import com.azure.core.util.BinaryData;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient;
import com.azure.messaging.servicebus.stress.util.RateLimiter;
import com.azure.messaging.servicebus.stress.util.TestUtils;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import static com.azure.messaging.servicebus.stress.util.TestUtils.blockingWait;
import static com.azure.messaging.servicebus.stress.util.TestUtils.createMessagePayload;
import static com.azure.messaging.servicebus.stress.util.TestUtils.getSenderBuilder;

/**
* Test ServiceBusSenderAsyncClient
*/
@Component("MessageSenderReconnectAsync")
public class MessageSenderReconnectAsync extends ServiceBusScenario {
private final static ClientLogger LOGGER = new ClientLogger(MessageSenderReconnectAsync.class);
@Value("${BATCH_SIZE:2}")
private int batchSize;

@Value("${SEND_CONCURRENCY:5}")
private int sendConcurrency;

private BinaryData messagePayload;

@Override
public void run() {
messagePayload = createMessagePayload(options.getMessageSize());

toClose(Mono.just(1)
.repeat()
.flatMap(i -> singleRun(), sendConcurrency)
.take(options.getTestDuration())
.parallel(sendConcurrency, 1)
.runOn(Schedulers.boundedElastic())
.subscribe());

blockingWait(options.getTestDuration().plusSeconds(1));
}

@Override
public void recordRunOptions(Span span) {
super.recordRunOptions(span);
span.setAttribute(AttributeKey.longKey("sendConcurrency"), sendConcurrency);
span.setAttribute(AttributeKey.longKey("batchSize"), batchSize);
}


private Mono<Void> singleRun() {
return Mono.using(
() -> getSenderBuilder(options, false).buildAsyncClient(),
client -> singleSend(client),
ServiceBusSenderAsyncClient::close);
}
private Mono<Void> singleSend(ServiceBusSenderAsyncClient client) {
return client.createMessageBatch()
.flatMap(b -> {
for (int i = 0; i < batchSize; i ++) {
if (!b.tryAddMessage(new ServiceBusMessage(messagePayload))) {
telemetryHelper.recordError("batch is full", "createBatch");
break;
}
}
return client.sendMessages(b);
})
.onErrorResume(e -> {
telemetryHelper.recordError(e, "create and send batch");
return Mono.empty();
})
.doOnCancel(() -> telemetryHelper.recordError("cancelled", "create and send batch"));
}
}

0 comments on commit eb6fcfa

Please sign in to comment.