
EventHubProducerClient stops sending batch and gives wrong maximumSizeInBytes in Exception #10682

Closed
3 tasks done
shubhambhattar opened this issue May 3, 2020 · 4 comments · Fixed by #10825
Labels: Client, customer-reported, Event Hubs, question


@shubhambhattar
Contributor

shubhambhattar commented May 3, 2020

Describe the bug
EventHubProducerClient stops sending batches, and the exception reports a different maximumSizeInBytes than expected.

Exception or Stack Trace

20:33:45 [main] WARN  c.a.c.a.i.RetryUtil - Error is not a TimeoutException nor is it a retryable AMQP exception.
Size of the payload exceeded maximum message size: 256 kb, errorContext[NAMESPACE: mmp-streams.servicebus.windows.net, PATH: billing, REFERENCE_ID: 6d0df19120c4489a8f405b86a7b5ded1_G31, LINK_CREDIT: 300]
20:33:45 [main] ERROR a.e.ProtoProducer - Exception: 
com.azure.core.amqp.exception.AmqpException: Size of the payload exceeded maximum message size: 256 kb, errorContext[NAMESPACE: mmp-streams.servicebus.windows.net, PATH: billing, REFERENCE_ID: 6d0df19120c4489a8f405b86a7b5ded1_G31, LINK_CREDIT: 300]
 at com.azure.core.amqp.implementation.ReactorSender.send(ReactorSender.java:217)
 at com.azure.messaging.eventhubs.EventHubProducerAsyncClient.lambda$send$6(EventHubProducerAsyncClient.java:437)
 at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118)
 at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705)
 at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241)
 at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
 at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2267)
 at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
 at reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:230)
 at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
 at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
 at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
 at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:199)
 at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
 at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:48)
 at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
 at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705)
 at com.azure.core.amqp.implementation.AmqpChannelProcessor.subscribe(AmqpChannelProcessor.java:220)
 at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
 at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
 at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:87)
 at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:50)
 at reactor.core.publisher.Mono.subscribe(Mono.java:4095)
 at reactor.core.publisher.Mono.block(Mono.java:1663)
 at com.azure.messaging.eventhubs.EventHubProducerClient.send(EventHubProducerClient.java:218)
 at app.eh.ProtoProducer.publish(ProtoProducer.java:67)
 at app.workers.Processor.lambda$deductFunds$6(Processor.java:216)
 at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
 at java.base/java.util.stream.ReferencePipeline$Head.forEach(Unknown Source)
 at app.workers.Processor.deductFunds(Processor.java:208)
 at app.workers.Processor.process(Processor.java:263)
 at app.Main.main(Main.java:43)
 Suppressed: java.lang.Exception: #block terminated with an error
  at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
  at reactor.core.publisher.Mono.block(Mono.java:1664)
  ... 8 common frames omitted
Caused by: java.nio.BufferOverflowException: null
 at org.apache.qpid.proton.codec.WritableBuffer$ByteBufferWrapper.ensureRemaining(WritableBuffer.java:152)
 at org.apache.qpid.proton.codec.BinaryType.fastWrite(BinaryType.java:83)
 at org.apache.qpid.proton.codec.EncoderImpl.writeBinary(EncoderImpl.java:535)
 at org.apache.qpid.proton.codec.messaging.FastPathDataType.write(FastPathDataType.java:126)
 at org.apache.qpid.proton.codec.messaging.FastPathDataType.write(FastPathDataType.java:36)
 at org.apache.qpid.proton.codec.EncoderImpl.writeObject(EncoderImpl.java:734)
 at org.apache.qpid.proton.message.impl.MessageImpl.encode(MessageImpl.java:740)
 at org.apache.qpid.proton.message.impl.MessageImpl.encode(MessageImpl.java:696)
 at com.azure.core.amqp.implementation.ReactorSender.send(ReactorSender.java:210)
 ... 31 common frames omitted
Caused by: java.lang.IndexOutOfBoundsException: Requested min remaining bytes(437) exceeds remaining(314) in underlying ByteBuffer: java.nio.HeapByteBuffer[pos=261829 lim=262143 cap=262144]
 at org.apache.qpid.proton.codec.WritableBuffer$ByteBufferWrapper.ensureRemaining(WritableBuffer.java:148)
 ... 39 common frames omitted
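The numbers in the innermost cause can be checked directly: for a `java.nio.ByteBuffer`, `remaining()` is `limit - position`, so the encoder had 262143 - 261829 = 314 bytes left in a 262144-byte (256 KB) buffer when it asked for 437 more. The sketch below rebuilds that reported buffer state with plain `java.nio` only; it mirrors the numbers in the message and does not call Proton-J or the Azure SDK (the class and method names are hypothetical).

```java
import java.nio.ByteBuffer;

public class BufferStateSketch {

    // Rebuilds the state reported in the IndexOutOfBoundsException:
    // java.nio.HeapByteBuffer[pos=261829 lim=262143 cap=262144]
    static ByteBuffer reportedBuffer() {
        ByteBuffer buffer = ByteBuffer.allocate(262144); // cap=262144 (256 KB)
        buffer.limit(262143);                            // lim=262143
        buffer.position(261829);                         // pos=261829
        return buffer;
    }

    // Mirrors the failed check: the encoder needed `requested` more bytes.
    static boolean hasRoomFor(ByteBuffer buffer, int requested) {
        return buffer.remaining() >= requested;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = reportedBuffer();
        System.out.println("remaining = " + buffer.remaining());         // 314
        System.out.println("room for 437? " + hasRoomFor(buffer, 437));  // false
    }
}
```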

To Reproduce
Steps to reproduce the behavior:
Start sending messages without supplying a maximumSizeInBytes value (the library automatically uses 1022 KB), and simulate low traffic where it takes 30-35 minutes to fill a batch. The code used is the same as shown here: https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/eventhubs/azure-messaging-eventhubs#create-an-event-hub-producer-and-publish-events. The library version is 5.0.3.

Code Snippet

try {
    if (!eventDataBatch.tryAdd(new EventData(message.toByteArray()))) {
        eventHubProducerClient.send(eventDataBatch);

        eventDataBatch = eventHubProducerClient.createBatch();

        // Try to add that event that couldn't fit before.
        if (!eventDataBatch.tryAdd(new EventData(message.toByteArray()))) {
            log.error("Event is too large for an empty batch. Max size: " + eventDataBatch.getMaxSizeInBytes());
        }
    }
} catch (Exception e) {
    eventDataBatch = eventHubProducerClient.createBatch();
    log.error("Exception: ", e);
}

Expected behavior
If the message does not fit, the already-created batch should first be sent to Event Hubs, and then a fresh batch should be created and the message added to it. Instead, an exception saying the payload exceeded the maximum message size is thrown.

Additional context
This only happens during low traffic, when it takes a long time to fill a batch (with the default size of 1022 KB). It has occurred twice, both times in the same low-traffic time frame, when successive eventHubProducerClient.send(eventDataBatch) calls were more than 30 minutes apart.

I also want to point out that when I don't supply a maximum batch size in bytes (while creating an EventDataBatch), it defaults to 1022 KB, yet the stack trace suddenly reports the size as 256 KB instead of 1022 KB.

From what I could figure out, the maximum link size is used as maximumSizeInBytes. Is it the case that if the link is broken, the size falls back to 256 KB, and since the already-created batch is larger than 256 KB, we get this error?

I found this in the following method of the EventHubProducerAsyncClient class:

public Mono<EventDataBatch> createBatch(CreateBatchOptions options) {
    if (options == null) {
        return FluxUtil.monoError(this.logger, new NullPointerException("'options' cannot be null."));
    } else {
        String partitionKey = options.getPartitionKey();
        String partitionId = options.getPartitionId();
        int batchMaxSize = options.getMaximumSizeInBytes();
        if (!CoreUtils.isNullOrEmpty(partitionKey) && !CoreUtils.isNullOrEmpty(partitionId)) {
            return FluxUtil.monoError(this.logger, new IllegalArgumentException(String.format(Locale.US, "CreateBatchOptions.getPartitionKey() and CreateBatchOptions.getPartitionId() are both set. Only one or the other can be used. partitionKey: '%s'. partitionId: '%s'", partitionKey, partitionId)));
        } else {
            return !CoreUtils.isNullOrEmpty(partitionKey) && partitionKey.length() > 128 ? FluxUtil.monoError(this.logger, new IllegalArgumentException(String.format(Locale.US, "Partition key '%s' exceeds the maximum allowed length: '%s'.", partitionKey, 128))) : this.getSendLink(partitionId).flatMap((link) -> {
                return link.getLinkSize().flatMap((size) -> {
                    int maximumLinkSize = size > 0 ? size : 262144;
                    if (batchMaxSize > maximumLinkSize) {
                        return FluxUtil.monoError(this.logger, new IllegalArgumentException(String.format(Locale.US, "BatchOptions.maximumSizeInBytes (%s bytes) is larger than the link size (%s bytes).", batchMaxSize, maximumLinkSize)));
                    } else {
                        int batchSize = batchMaxSize > 0 ? batchMaxSize : maximumLinkSize;
                        Objects.requireNonNull(link);
                        return Mono.just(new EventDataBatch(batchSize, partitionId, partitionKey, link::getErrorContext, this.tracerProvider, link.getEntityPath(), link.getHostname()));
                    }
                });
            });
        }
    }
}
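Restated as a standalone function, the size selection in this `createBatch` works as follows: a positive link size reported by the service wins, otherwise the method falls back to 262144 bytes (256 KB), and an unset `maximumSizeInBytes` takes whichever limit applies. The sketch below is a plain-Java paraphrase for illustration only (the class and method names are hypothetical, not SDK API). It shows that a batch filled to about 1021.98 KB (1046510 bytes, the `msgSize` printed in the logs below) is far over the fallback limit.

```java
public class BatchSizeSketch {

    // Fallback used by createBatch when the link reports no size.
    static final int FALLBACK_LINK_SIZE = 262144;

    // Hypothetical paraphrase of the size selection in EventHubProducerAsyncClient.createBatch.
    static int effectiveBatchSize(int requestedMaxSize, int reportedLinkSize) {
        int maximumLinkSize = reportedLinkSize > 0 ? reportedLinkSize : FALLBACK_LINK_SIZE;
        if (requestedMaxSize > maximumLinkSize) {
            throw new IllegalArgumentException("maximumSizeInBytes (" + requestedMaxSize
                    + " bytes) is larger than the link size (" + maximumLinkSize + " bytes).");
        }
        return requestedMaxSize > 0 ? requestedMaxSize : maximumLinkSize;
    }

    public static void main(String[] args) {
        int liveLinkSize = 1022 * 1024; // 1046528 bytes, the 1022 KB reported by getMaxSizeInBytes()
        int filledBatch = 1046510;      // ~1021.98 KB, the msgSize printed before the failed send

        System.out.println(effectiveBatchSize(0, liveLinkSize)); // 1046528
        System.out.println(effectiveBatchSize(0, 0));            // 262144
        // A batch sized against the live link is larger than the fallback limit.
        System.out.println(filledBatch > effectiveBatchSize(0, 0)); // true
    }
}
```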

Library version is:

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.0.3</version>
</dependency>

Information Checklist
Kindly make sure that you have added all the following information above and checked off the required fields; otherwise, we will treat the issue as an incomplete report.

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added
@ghost added the needs-triage, customer-reported, and question labels May 3, 2020
@shubhambhattar
Contributor Author

shubhambhattar commented May 3, 2020

I can mitigate the issue by recreating the EventHubProducerClient once the AmqpException happens.

Following is the code I am using:

// toBePushed holds every message added since the last successful send:
// List<Message> toBePushed = new ArrayList<>();
public void publish(Message message) {

    toBePushed.add(message);
    try {
        if (!eventDataBatch.tryAdd(new EventData(message.toByteArray()))) {
            eventHubProducerClient.send(eventDataBatch);
            markBatchMetrics(eventDataBatch);

            // Everything except the current message has now been sent.
            toBePushed = new ArrayList<>();
            toBePushed.add(message);
            eventDataBatch = eventHubProducerClient.createBatch();

            // Try to add that event that couldn't fit before.
            if (!eventDataBatch.tryAdd(new EventData(message.toByteArray()))) {
                log.error("Event is too large for an empty batch. Max size: " + eventDataBatch.getMaxSizeInBytes());
            }
        }
    } catch (AmqpException e) {

        log.error("AmqpException: ", e);

        // Discard the client whose link/connection was closed and build a fresh one.
        eventHubProducerClient.close();
        eventHubProducerClient = new EventHubClientBuilder()
                .connectionString(connectionString)
                .buildProducerClient();
        eventDataBatch = eventHubProducerClient.createBatch();

        // Replay every message that has not been sent successfully yet.
        for (Message pending : toBePushed) {
            if (!eventDataBatch.tryAdd(new EventData(pending.toByteArray()))) {
                eventHubProducerClient.send(eventDataBatch);
                eventDataBatch = eventHubProducerClient.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(new EventData(pending.toByteArray()))) {
                    log.error("Event is too large for an empty batch. Max size: " + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        eventHubProducerClient.send(eventDataBatch);
        toBePushed = new ArrayList<>();
        eventDataBatch = eventHubProducerClient.createBatch();
    } catch (Exception e) {
        eventDataBatch = eventHubProducerClient.createBatch();
        log.error("Exception: ", e);
    }
}
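The replay loop in the `AmqpException` branch is a greedy repack: walk the pending messages in order, add each one to the current batch, send and start a new batch whenever one does not fit, and drop (log) a message that does not fit even an empty batch. The sketch below models that with plain byte counts in place of `EventDataBatch.tryAdd`, so the grouping can be checked without a live Event Hub. The class, the method, and the byte-count model are hypothetical; a real batch also counts per-event AMQP overhead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReplaySketch {

    // Groups pending payload sizes into batches of at most maxBatchBytes,
    // mimicking the tryAdd -> send -> createBatch -> tryAdd loop.
    static List<List<Integer>> repack(List<Integer> pendingSizes, int maxBatchBytes) {
        List<List<Integer>> sent = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int currentBytes = 0;

        for (int size : pendingSizes) {
            if (currentBytes + size > maxBatchBytes) {
                // tryAdd failed: send the current batch and start a fresh one.
                if (!current.isEmpty()) {
                    sent.add(current);
                }
                current = new ArrayList<>();
                currentBytes = 0;
                if (size > maxBatchBytes) {
                    continue; // "Event is too large for an empty batch": logged and dropped
                }
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            sent.add(current); // final send after the loop
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(repack(Arrays.asList(400, 300, 500, 200, 2000), 1000));
        // [[400, 300], [500, 200]]
    }
}
```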

Is there a config setting that controls how long the EventHubProducerClient stays connected?

@joshfree added the Client and Event Hubs labels May 4, 2020
@ghost removed the needs-triage label May 4, 2020
@joshfree changed the title [BUG] EventHubProducerClient stops sending batch and gives wrong maximumSizeInBytes in Exception May 4, 2020
@joshfree
Member

joshfree commented May 4, 2020

Thanks for reporting this issue @shubhambhattar. @srnagar can you please investigate?

/cc @conniey

@shubhambhattar
Contributor Author

shubhambhattar commented May 4, 2020

I've created a small sample to reproduce it. The code shared above is incomplete and cannot be run as-is; this one runs as-is with just the dependency (version 5.0.3) included.

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerClient;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

public class ProducerTest {

    private static EventHubProducerClient eventHubProducerClient;
    private static EventDataBatch batch;

    // Fill the current batch until tryAdd reports that it is full.
    private static void addToBatch() {
        try {
            for (int i = 0; i < 45000; i++) {
                if (!batch.tryAdd(new EventData("test".getBytes(StandardCharsets.UTF_8)))) {
                    break;
                }
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    private static void send(EventDataBatch batchToSend) {
        try {
            eventHubProducerClient.send(batchToSend);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    private static void printSizes() {
        System.out.println("MaxSize = " + batch.getMaxSizeInBytes() / 1024.0d
                + " KB, msgSize = " + batch.getSizeInBytes() / 1024.0d + " KB");
    }

    public static void main(String[] args) throws InterruptedException {

        eventHubProducerClient = new EventHubClientBuilder()
                .connectionString("saskey") // placeholder: use the namespace connection string
                .buildProducerClient();

        // First batch: filled and sent immediately, which succeeds.
        batch = eventHubProducerClient.createBatch();
        addToBatch();
        printSizes();
        send(batch);

        // Second batch: filled right away, but sent only after 31 minutes without any send.
        batch = eventHubProducerClient.createBatch();
        addToBatch();
        TimeUnit.MINUTES.sleep(31);
        printSizes();
        send(batch);

        eventHubProducerClient.close();
    }
}

The corresponding logs are:

19:50:43 [main] INFO  c.a.m.e.EventHubClientBuilder - connectionId[MF_fc8b42_1588602043945]: Emitting a single connection.
19:50:43 [main] INFO  c.a.m.e.i.EventHubConnectionProcessor - connectionId[some-namespace.servicebus.windows.net] entityPath[dummy]: Setting next AMQP channel.
19:50:43 [main] INFO  c.a.c.a.i.ReactorConnection - connectionId[MF_fc8b42_1588602043945]: Creating and starting connection to some-namespace.servicebus.windows.net:5671
19:50:44 [main] INFO  c.a.c.a.i.ReactorExecutor - connectionId[MF_fc8b42_1588602043945], message[Starting reactor.]
19:50:44 [single-1] INFO  c.a.c.a.i.h.ConnectionHandler - onConnectionInit hostname[some-namespace.servicebus.windows.net], connectionId[MF_fc8b42_1588602043945]
19:50:44 [single-1] INFO  c.a.c.a.i.h.ReactorHandler - connectionId[MF_fc8b42_1588602043945] reactor.onReactorInit
19:50:44 [main] INFO  c.a.c.a.i.AzureTokenManagerProvider - Creating new token manager for audience[amqp://some-namespace.servicebus.windows.net/dummy], resource[dummy]
19:50:44 [single-1] INFO  c.a.c.a.i.h.ConnectionHandler - onConnectionLocalOpen hostname[some-namespace.servicebus.windows.net:5671], connectionId[MF_fc8b42_1588602043945], errorCondition[null], errorDescription[null]
19:50:44 [single-1] INFO  c.a.c.a.i.h.ConnectionHandler - onConnectionBound hostname[some-namespace.servicebus.windows.net], connectionId[MF_fc8b42_1588602043945]
19:50:47 [single-1] INFO  c.a.c.a.i.h.ConnectionHandler - onConnectionRemoteOpen hostname[some-namespace.servicebus.windows.net:5671], connectionId[MF_fc8b42_1588602043945], remoteContainer[68897ad4abc542519d69b6e61a1579ba_G18]
19:50:47 [single-1] INFO  c.a.c.a.i.h.SessionHandler - onSessionRemoteOpen connectionId[MF_fc8b42_1588602043945], entityName[dummy], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
19:50:47 [single-1] INFO  c.a.c.a.i.ReactorConnection - Setting CBS channel.
19:50:47 [single-1] INFO  c.a.c.a.i.ReactorConnection - Emitting new response channel. connectionId: MF_fc8b42_1588602043945. entityPath: $cbs. linkName: cbs.
19:50:47 [single-1] INFO  c.a.c.a.i.RequestResponseChannel<cbs-session> - connectionId[MF_fc8b42_1588602043945] entityPath[$cbs]: Setting next AMQP channel.
19:50:47 [single-1] INFO  c.a.c.a.i.h.ReceiveLinkHandler - onLinkLocalOpen connectionId[MF_fc8b42_1588602043945], linkName[cbs:receiver], localSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
19:50:47 [single-1] INFO  c.a.c.a.i.h.SessionHandler - onSessionRemoteOpen connectionId[MF_fc8b42_1588602043945], entityName[cbs-session], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
19:50:47 [single-1] INFO  c.a.c.a.i.h.SendLinkHandler - onLinkRemoteOpen connectionId[MF_fc8b42_1588602043945], linkName[cbs:sender], remoteTarget[Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
19:50:47 [single-1] INFO  c.a.c.a.i.h.ReceiveLinkHandler - onLinkRemoteOpen connectionId[MF_fc8b42_1588602043945], linkName[cbs:receiver], remoteSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
19:50:48 [single-1] INFO  c.a.c.a.i.ActiveClientTokenManager - Scheduling refresh token task. scopes[amqp://some-namespace.servicebus.windows.net/dummy]
19:50:49 [single-1] INFO  c.a.c.a.i.h.SendLinkHandler - onLinkRemoteOpen connectionId[MF_fc8b42_1588602043945], linkName[dummy], remoteTarget[Target{address='dummy', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
MaxSize = 1022.0 KB, msgSize = 1021.982421875 KB
MaxSize = 1022.0 KB, msgSize = 1021.982421875 KB
20:06:58 [parallel-3] INFO  c.a.c.a.i.ActiveClientTokenManager - Refreshing token. scopes[amqp://some-namespace.servicebus.windows.net/dummy] 
20:06:59 [single-1] INFO  c.a.c.a.i.ActiveClientTokenManager - Authorization successful. Refreshing token in 971000 ms. scopes[amqp://some-namespace.servicebus.windows.net/dummy]
20:21:05 [single-1] INFO  c.a.c.a.i.h.SendLinkHandler - onLinkRemoteClose connectionId[MF_fc8b42_1588602043945], linkName[dummy], errorCondition[amqp:link:detach-forced], errorDescription[Idle link tracker, link dummy has been idle for 1800000ms TrackingId:4baeefe9-6ae0-4635-8d4c-d963c589cb41_G18, SystemTracker:client-link13935138, Timestamp:2020-05-04T14:51:05]
20:21:05 [single-1] INFO  c.a.c.a.i.h.SendLinkHandler - processOnClose connectionId[MF_fc8b42_1588602043945], linkName[dummy], errorCondition[amqp:link:detach-forced], errorDescription[Idle link tracker, link dummy has been idle for 1800000ms TrackingId:4baeefe9-6ae0-4635-8d4c-d963c589cb41_G18, SystemTracker:client-link13935138, Timestamp:2020-05-04T14:51:05]
20:21:05 [single-1] ERROR c.a.c.a.i.ReactorSender - [dummy] Error occurred in sender error handler.
Idle link tracker, link dummy has been idle for 1800000ms TrackingId:4baeefe9-6ae0-4635-8d4c-d963c589cb41_G18, SystemTracker:client-link13935138, Timestamp:2020-05-04T14:51:05, errorContext[NAMESPACE: some-namespace.servicebus.windows.net, PATH: dummy, REFERENCE_ID: 68897ad4abc542519d69b6e61a1579ba_G18, LINK_CREDIT: 299]
20:21:05 [single-1] INFO  c.a.c.a.i.ReactorSession - linkName[dummy]: Error occurred. Removing and disposing send link.
20:21:05 [single-1] INFO  c.a.c.a.i.h.SendLinkHandler - onLinkLocalClose connectionId[MF_fc8b42_1588602043945], linkName[dummy], errorCondition[amqp:link:detach-forced], errorDescription[Idle link tracker, link dummy has been idle for 1800000ms TrackingId:4baeefe9-6ae0-4635-8d4c-d963c589cb41_G18, SystemTracker:client-link13935138, Timestamp:2020-05-04T14:51:05]
20:26:05 [single-1] INFO  c.a.c.a.i.h.SessionHandler - onSessionRemoteClose connectionId[dummy], entityName[MF_fc8b42_1588602043945], condition[Error{condition=null, description='null', info=null}]
20:26:05 [single-1] INFO  c.a.c.a.i.h.SessionHandler - onSessionRemoteClose closing a local session for connectionId[MF_fc8b42_1588602043945], entityName[dummy], condition[null], description[null]
20:26:05 [single-1] INFO  c.a.c.a.i.ReactorConnection - connectionId[MF_fc8b42_1588602043945] sessionName[dummy]: Complete. Removing and disposing session.
20:26:05 [single-1] INFO  c.a.c.a.i.ReactorSession - sessionId[dummy]: Disposing of session.
20:26:05 [single-1] INFO  c.a.c.a.i.h.SendLinkHandler - onLinkRemoteClose connectionId[MF_fc8b42_1588602043945], linkName[cbs:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:68897ad4abc542519d69b6e61a1579ba_G18, SystemTracker:gateway5, Timestamp:2020-05-04T14:56:05]
20:26:05 [single-1] INFO  c.a.c.a.i.h.SendLinkHandler - processOnClose connectionId[MF_fc8b42_1588602043945], linkName[cbs:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:68897ad4abc542519d69b6e61a1579ba_G18, SystemTracker:gateway5, Timestamp:2020-05-04T14:56:05]
20:26:05 [single-1] WARN  c.a.c.a.i.RequestResponseChannel<cbs-session> - Retry #1. Transient error occurred. Retrying after 4511 ms.
The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:68897ad4abc542519d69b6e61a1579ba_G18, SystemTracker:gateway5, Timestamp:2020-05-04T14:56:05, errorContext[NAMESPACE: some-namespace.servicebus.windows.net, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 98]
20:26:05 [single-1] INFO  c.a.c.a.i.h.ReceiveLinkHandler - onLinkRemoteClose connectionId[MF_fc8b42_1588602043945], linkName[cbs:receiver], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:68897ad4abc542519d69b6e61a1579ba_G18, SystemTracker:gateway5, Timestamp:2020-05-04T14:56:05]
20:26:05 [single-1] INFO  c.a.c.a.i.h.ReceiveLinkHandler - processOnClose connectionId[MF_fc8b42_1588602043945], linkName[cbs:receiver], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:68897ad4abc542519d69b6e61a1579ba_G18, SystemTracker:gateway5, Timestamp:2020-05-04T14:56:05]
20:26:05 [single-1] INFO  c.a.c.a.i.h.SessionHandler - onSessionRemoteClose connectionId[cbs-session], entityName[MF_fc8b42_1588602043945], condition[Error{condition=null, description='null', info=null}]
20:26:05 [single-1] INFO  c.a.c.a.i.h.SessionHandler - onSessionRemoteClose closing a local session for connectionId[MF_fc8b42_1588602043945], entityName[cbs-session], condition[null], description[null]
20:26:05 [single-1] INFO  c.a.c.a.i.ReactorConnection - connectionId[MF_fc8b42_1588602043945] sessionName[cbs-session]: Complete. Removing and disposing session.
20:26:05 [single-1] INFO  c.a.c.a.i.ReactorSession - sessionId[cbs-session]: Disposing of session.
20:26:05 [single-1] INFO  c.a.c.a.i.h.ConnectionHandler - onConnectionRemoteClose hostname[some-namespace.servicebus.windows.net:5671], connectionId[MF_fc8b42_1588602043945], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:68897ad4abc542519d69b6e61a1579ba_G18, SystemTracker:gateway5, Timestamp:2020-05-04T14:56:05]
20:26:05 [single-1] INFO  c.a.m.e.i.EventHubConnectionProcessor - Channel closed.
20:26:05 [single-1] INFO  c.a.m.e.i.EventHubReactorAmqpConnection - connectionId[MF_fc8b42_1588602043945]: Disposing of connection.
20:26:05 [single-1] INFO  c.a.c.a.i.ReactorConnection - connectionId[MF_fc8b42_1588602043945]: Disposing of ReactorConnection.
20:26:05 [single-1] INFO  c.a.c.a.i.AmqpExceptionHandler - Shutdown received: ReactorExecutor.close() was called., isTransient[false], initiatedByClient[true]
20:26:10 [parallel-2] INFO  c.a.c.a.i.RequestResponseChannel<cbs-session> - Retry #1. Requesting from upstream.
20:26:10 [parallel-2] INFO  c.a.c.a.i.RequestResponseChannel<cbs-session> - connectionId[MF_fc8b42_1588602043945] entityPath[$cbs]: Connection not requested, yet. Requesting one.
20:26:10 [parallel-2] ERROR c.a.c.a.i.ReactorConnection - connectionId[MF_fc8b42_1588602043945]: Connection is disposed. Cannot get CBS node.
20:26:10 [parallel-2] INFO  c.a.c.a.i.ReactorConnection - Emitting new response channel. connectionId: MF_fc8b42_1588602043945. entityPath: $cbs. linkName: cbs.
20:26:10 [parallel-2] INFO  c.a.c.a.i.RequestResponseChannel<cbs-session> - connectionId[MF_fc8b42_1588602043945] entityPath[$cbs]: Setting next AMQP channel.
20:27:05 [single-1] INFO  c.a.c.a.i.ReactorExecutor - Unable to acquire dispose reactor semaphore within timeout.
20:27:05 [single-1] INFO  c.a.c.a.i.h.SendLinkHandler - onLinkLocalClose connectionId[MF_fc8b42_1588602043945], linkName[cbs:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:68897ad4abc542519d69b6e61a1579ba_G18, SystemTracker:gateway5, Timestamp:2020-05-04T14:56:05]
20:27:05 [single-1] INFO  c.a.c.a.i.h.ReceiveLinkHandler - onLinkLocalClose connectionId[MF_fc8b42_1588602043945], linkName[cbs:receiver], errorCondition[null], errorDescription[null]
20:27:05 [single-1] INFO  c.a.c.a.i.h.ConnectionHandler - onConnectionLocalClose hostname[some-namespace.servicebus.windows.net:5671], connectionId[MF_fc8b42_1588602043945], errorCondition[null], errorDescription[null]
20:27:05 [single-1] INFO  c.a.c.a.i.h.ConnectionHandler - onConnectionUnbound hostname[some-namespace.servicebus.windows.net:5671], connectionId[MF_fc8b42_1588602043945], state[CLOSED], remoteState[CLOSED]
20:27:05 [single-1] INFO  c.a.c.a.i.h.SendLinkHandler - onLinkFinal connectionId[MF_fc8b42_1588602043945],  linkName[dummy]
20:27:05 [single-1] INFO  c.a.c.a.i.h.SessionHandler - onSessionFinal connectionId[MF_fc8b42_1588602043945], entityName[dummy], condition[null], description[null]
20:27:05 [single-1] INFO  c.a.c.a.i.h.SendLinkHandler - onLinkFinal connectionId[MF_fc8b42_1588602043945],  linkName[cbs:sender]
20:27:05 [single-1] INFO  c.a.c.a.i.h.ReceiveLinkHandler - onLinkFinal connectionId[MF_fc8b42_1588602043945],  linkName[cbs:receiver]
20:27:05 [single-1] INFO  c.a.c.a.i.h.SessionHandler - onSessionFinal connectionId[MF_fc8b42_1588602043945], entityName[cbs-session], condition[null], description[null]
20:27:05 [single-1] INFO  c.a.c.a.i.h.SendLinkHandler - onLinkFinal connectionId[MF_fc8b42_1588602043945],  linkName[cbs:sender]
20:27:05 [single-1] INFO  c.a.c.a.i.h.ReceiveLinkHandler - onLinkFinal connectionId[MF_fc8b42_1588602043945],  linkName[cbs:receiver]
20:27:05 [single-1] INFO  c.a.c.a.i.h.SessionHandler - onSessionFinal connectionId[MF_fc8b42_1588602043945], entityName[cbs-session], condition[null], description[null]
20:27:05 [single-1] INFO  c.a.c.a.i.ReactorConnection - connectionId[MF_fc8b42_1588602043945] sessionName[cbs-session]: Complete. Removing and disposing session.
20:27:05 [single-1] INFO  c.a.c.a.i.ReactorSession - sessionId[cbs-session]: Disposing of session.
20:27:05 [single-1] INFO  c.a.c.a.i.h.ConnectionHandler - onConnectionFinal hostname[some-namespace.servicebus.windows.net:5671], connectionId[MF_fc8b42_1588602043945], errorCondition[null], errorDescription[null]
20:27:05 [single-1] INFO  c.a.c.a.i.h.SendLinkHandler - onLinkLocalClose connectionId[MF_fc8b42_1588602043945], linkName[cbs:sender], errorCondition[null], errorDescription[null]
20:27:05 [single-1] INFO  c.a.c.a.i.h.ReceiveLinkHandler - onLinkLocalClose connectionId[MF_fc8b42_1588602043945], linkName[cbs:receiver], errorCondition[null], errorDescription[null]
20:27:05 [single-1] INFO  c.a.c.a.i.ReactorExecutor - connectionId[MF_fc8b42_1588602043945], message[Processing all pending tasks and closing old reactor.]
20:27:05 [single-1] INFO  c.a.c.a.i.h.ReceiveLinkHandler - onLinkLocalOpen connectionId[MF_fc8b42_1588602043945], linkName[cbs:receiver], localSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
20:27:05 [single-1] INFO  c.a.c.a.i.ReactorExecutor - connectionId[MF_fc8b42_1588602043945], message[Stopping the reactor because thread was interrupted or the reactor has no more events to process.]
20:31:11 [main] INFO  c.a.m.e.i.EventHubConnectionProcessor - connectionId[some-namespace.servicebus.windows.net] entityPath[dummy]: Connection not requested, yet. Requesting one.
20:31:11 [main] INFO  c.a.m.e.EventHubClientBuilder - connectionId[MF_65d28d_1588604471389]: Emitting a single connection.
20:31:11 [main] INFO  c.a.m.e.i.EventHubConnectionProcessor - connectionId[some-namespace.servicebus.windows.net] entityPath[dummy]: Setting next AMQP channel.
20:31:11 [main] INFO  c.a.c.a.i.ReactorConnection - connectionId[MF_65d28d_1588604471389]: Creating and starting connection to some-namespace.servicebus.windows.net:5671
20:31:11 [main] INFO  c.a.c.a.i.ReactorExecutor - connectionId[MF_65d28d_1588604471389], message[Starting reactor.]
20:31:11 [single-1] INFO  c.a.c.a.i.h.ConnectionHandler - onConnectionInit hostname[some-namespace.servicebus.windows.net], connectionId[MF_65d28d_1588604471389]
20:31:11 [main] INFO  c.a.c.a.i.AzureTokenManagerProvider - Creating new token manager for audience[amqp://some-namespace.servicebus.windows.net/dummy], resource[dummy]
20:31:11 [single-1] INFO  c.a.c.a.i.h.ReactorHandler - connectionId[MF_65d28d_1588604471389] reactor.onReactorInit
20:31:11 [single-1] INFO  c.a.c.a.i.h.ConnectionHandler - onConnectionLocalOpen hostname[some-namespace.servicebus.windows.net:5671], connectionId[MF_65d28d_1588604471389], errorCondition[null], errorDescription[null]
20:31:11 [single-1] INFO  c.a.c.a.i.h.ConnectionHandler - onConnectionBound hostname[some-namespace.servicebus.windows.net], connectionId[MF_65d28d_1588604471389]
20:31:14 [single-1] INFO  c.a.c.a.i.h.ConnectionHandler - onConnectionRemoteOpen hostname[some-namespace.servicebus.windows.net:5671], connectionId[MF_65d28d_1588604471389], remoteContainer[fa9aa3015eb84990a62817ad60423545_G8]
20:31:14 [single-1] INFO  c.a.c.a.i.h.SessionHandler - onSessionRemoteOpen connectionId[MF_65d28d_1588604471389], entityName[dummy], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
20:31:14 [single-1] INFO  c.a.c.a.i.ReactorConnection - Setting CBS channel.
20:31:14 [single-1] INFO  c.a.c.a.i.ReactorConnection - Emitting new response channel. connectionId: MF_65d28d_1588604471389. entityPath: $cbs. linkName: cbs.
20:31:14 [single-1] INFO  c.a.c.a.i.RequestResponseChannel<cbs-session> - connectionId[MF_65d28d_1588604471389] entityPath[$cbs]: Setting next AMQP channel.
20:31:14 [single-1] INFO  c.a.c.a.i.h.ReceiveLinkHandler - onLinkLocalOpen connectionId[MF_65d28d_1588604471389], linkName[cbs:receiver], localSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
20:31:14 [single-1] INFO  c.a.c.a.i.h.SessionHandler - onSessionRemoteOpen connectionId[MF_65d28d_1588604471389], entityName[cbs-session], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
20:31:14 [single-1] INFO  c.a.c.a.i.h.SendLinkHandler - onLinkRemoteOpen connectionId[MF_65d28d_1588604471389], linkName[cbs:sender], remoteTarget[Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
20:31:14 [single-1] INFO  c.a.c.a.i.h.ReceiveLinkHandler - onLinkRemoteOpen connectionId[MF_65d28d_1588604471389], linkName[cbs:receiver], remoteSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
20:31:14 [single-1] INFO  c.a.c.a.i.ActiveClientTokenManager - Scheduling refresh token task. scopes[amqp://some-namespace.servicebus.windows.net/dummy]
20:31:14 [single-1] WARN  c.a.c.a.i.RetryUtil - Error is not a TimeoutException nor is it a retryable AMQP exception.
Size of the payload exceeded maximum message size: 256 kb, errorContext[NAMESPACE: some-namespace.servicebus.windows.net, PATH: dummy, REFERENCE_ID: dummy, LINK_CREDIT: 0]
Size of the payload exceeded maximum message size: 256 kb, errorContext[NAMESPACE: some-namespace.servicebus.windows.net, PATH: dummy, REFERENCE_ID: dummy, LINK_CREDIT: 0]

Process finished with exit code 0

These logs should help narrow it down.

I simulated the case where it takes more than 30 minutes (I used 31, just to be safe) to completely fill the batch and send it. To reproduce the issue, more than 30 minutes have to pass between the time the batch is first created and the time it is completely full.
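A minimal caller-side sketch of the timing condition, assuming the problem is tied to how long a batch stays open before it is sent: record when the batch was created and treat it as stale once more than 30 minutes have passed, so it can be rebuilt before sending. `BatchAgeGuard` is a hypothetical helper, not part of the Azure SDK, and the 30-minute value is only the threshold observed in this repro, not a documented service limit.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of the Azure SDK: tracks how long a batch has
// been open so the caller can rebuild it once an assumed age threshold is crossed.
final class BatchAgeGuard {
    private final Duration maxAge; // assumed threshold, e.g. 30 minutes in this repro
    private Instant createdAt;     // when the current batch was created

    BatchAgeGuard(Duration maxAge, Instant createdAt) {
        this.maxAge = maxAge;
        this.createdAt = createdAt;
    }

    // True only when strictly more than maxAge has elapsed since creation.
    boolean isStale(Instant now) {
        return Duration.between(createdAt, now).compareTo(maxAge) > 0;
    }

    // Record the creation time of a freshly rebuilt batch.
    void reset(Instant now) {
        this.createdAt = now;
    }
}
```

Calling `isStale(Instant.now())` right before sending, and rebuilding the batch when it returns `true`, keeps every batch younger than the threshold. Passing the current time in as a parameter keeps the check testable without actually waiting 31 minutes.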

Note the log lines at the end: the error is neither a TimeoutException nor a retryable AMQP exception, but a rejection saying the payload exceeds 256 KB. That shouldn't happen, since the maximum batch size when the batch was created is assumed to be 1022 KB.
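To make the size mismatch concrete: if a batch captures the maximum size once, when it is created, a batch filled close to 1022 KB cannot pass a 256 KB check applied at send time. The sketch below models that with a hypothetical `CachedLimitBatch` class (not the SDK's `EventHubDataBatch`), assuming 1 KB = 1024 bytes and events of 4 KB each.

```java
// Hypothetical model, not the SDK's EventHubDataBatch: a batch that captures
// the maximum size once, when it is created, and never re-reads it.
final class CachedLimitBatch {
    private final long maxSizeAtCreation; // limit captured at creation, in bytes
    private long sizeInBytes;             // bytes accepted so far

    CachedLimitBatch(long maxSizeAtCreation) {
        this.maxSizeAtCreation = maxSizeAtCreation;
    }

    // Accepts the event only if it still fits under the limit cached at creation.
    boolean tryAdd(long eventSizeInBytes) {
        if (sizeInBytes + eventSizeInBytes > maxSizeAtCreation) {
            return false;
        }
        sizeInBytes += eventSizeInBytes;
        return true;
    }

    long getSizeInBytes() {
        return sizeInBytes;
    }

    // Models the check that rejected the send in the log above:
    // does the accumulated payload fit the limit in force right now?
    boolean fitsLimit(long currentMaxSizeInBytes) {
        return sizeInBytes <= currentMaxSizeInBytes;
    }
}
```

Filling such a batch with 4 KB events stops at 255 events (1,044,480 bytes), which fits the 1022 KB limit cached at creation but not the 256 KB limit reported in the error.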

@srnagar
Member

srnagar commented May 4, 2020

@shubhambhattar I am looking into this issue. Thanks for providing a reproducible sample. I will have an update soon.
