Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer is stuck #162

Open
RomanOlegovich opened this issue Apr 6, 2022 · 9 comments
Open

Consumer is stuck #162

RomanOlegovich opened this issue Apr 6, 2022 · 9 comments

Comments

@RomanOlegovich
Copy link

After some time idle, consumer stop to read messages from Kafka (without errors and logs). I need to re-run consumer to start reading again. Have you any ideas?

@BEagle1984
Copy link
Owner

It's strange that no log is written at all. Have you tried lowering the log level for the Silverback namespace?

The message broker also has a log and you should be able to see the reason why it got disconnected.

It's hard to help you further without a log or any other hint. What I can say is that it isn't a known issue and we would have noticed by now if this were a systematic bug.
Can you maybe post your configuration code (endpoint configuration)?

@RomanOlegovich
Copy link
Author

RomanOlegovich commented Apr 6, 2022

Thank for quick answer.

  1. I did not change the log level. I'll try it.

  2. I not sure it got disconnected or not....

  3. I have app events, each event has own kafka topic and multiple consumers in different modules (cs projects).

Example from one module configuration.

            services.ConfigureSilverback()
                .AddEndpointsConfigurator(x =>
                {
                    return new NotificationsEndpointsConfigurator(moduleOptions.BridgeOptions);
                })
                // Add Subscribers
                .AddScopedSubscriber<DrivingAssignedEventQueueReader>(filter)
                .AddScopedSubscriber<DrivingRequestConfirmedQueueReader>(filter)
                .AddScopedSubscriber<DrivingCanceledEventQueueReader>(filter)
                .AddScopedSubscriber<DrivingRequestCanceledEventQueueReader>(filter)
                .AddScopedSubscriber<SurveyCreatedEventQueueReader>(filter)
                .AddScopedSubscriber<DrivingStartingQueueReader>(filter)
                .AddScopedSubscriber<OffsetFailedEventQueueReader>(filter)
                .AddScopedSubscriber<NotifyPaymentEventQueueReader>(filter)
                ;

Note:
all consumers connect to a single broker.
I use topic per event to integrate with system which are not using silverback
Each module has own IEndpointsConfigurator with same BootstrapServers bot different ConsumerGroupId

    public class NotificationsEndpointsConfigurator : IEndpointsConfigurator
    {
        private readonly BridgeOptions bridgeOptions;

        public NotificationsEndpointsConfigurator(BridgeOptions bridgeOptions)
        {
            this.bridgeOptions = bridgeOptions;
        }

        public void Configure(IEndpointsConfigurationBuilder builder)
        {

            Action<KafkaConsumerConfig> consumersConfig = config =>
            {
                config.BootstrapServers =
                       bridgeOptions.BootstrapServers;
                config.GroupId = bridgeOptions.ConsumerGroupId;
                config.AutoOffsetReset = bridgeOptions.AutoOffsetReset;
            };

            Action<KafkaProducerConfig> producerConfig = config =>
            {
                config.BootstrapServers =
                    bridgeOptions.BootstrapServers;
            };

            builder
            .AddKafkaEndpoints(
                endpoints => endpoints
                        .AddOutbound<AddNotification>(endpoint => endpoint.ProduceTo(nameof(AddNotification)).Configure(producerConfig).SerializeAsJsonUsingNewtonsoft())

                        .AddInbound<DrivingRequestEvent>(endpoint => endpoint.ConsumeFrom(nameof(DrivingRequestEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                        .AddInbound<DrivingRequestConfirmedEvent>(endpoint => endpoint.ConsumeFrom(nameof(DrivingRequestConfirmedEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                        .AddInbound<DrivingAssignedEvent>(endpoint => endpoint.ConsumeFrom(nameof(DrivingAssignedEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                        .AddInbound<DrivingCanceledEvent>(endpoint => endpoint.ConsumeFrom(nameof(DrivingCanceledEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                        .AddInbound<DrivingRequestCanceledEvent>(endpoint => endpoint.ConsumeFrom(nameof(DrivingRequestCanceledEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                        .AddInbound<SurveyCreatedEvent>(endpoint => endpoint.ConsumeFrom(nameof(SurveyCreatedEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                        .AddInbound<DrivingStartingEvent>(endpoint => endpoint.ConsumeFrom(nameof(DrivingStartingEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                        .AddInbound<OffsetFailedEvent>(endpoint => endpoint.ConsumeFrom(nameof(OffsetFailedEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                        .AddInbound<NotifyPaymentEvent>(endpoint => endpoint.ConsumeFrom(nameof(NotifyPaymentEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                    );
        }
    }

@RomanOlegovich
Copy link
Author

What logging levels do you recommend https://github.com/BEagle1984/silverback/blob/master/docs/concepts/logging.md ?

  • Core
  • Integration
  • Kafka

@BEagle1984
Copy link
Owner

Just add this to your app.settings to have all possible logs (it's gonna be verbose):

{
  "Logging": {
    "LogLevel": {
      "Silverback": "Trace"
    }
  }

The configuration is really basic and I don't see anything wrong. If the consumer disconnects you should see the related info entries in the log (consumer disconnected, partitions revoked, etc.).
The underlying library notifies about protocol errors and also in that case you should have a warning or error in the log.

When you say "after some time idle" do you mean that you don't produce any message for a while and when you start producing again your consumers don't pull them? Did I get it correctly?

@RomanOlegovich
Copy link
Author

RomanOlegovich commented Apr 6, 2022

LogLevel changed, we will check for errors...

Yes you are right.

Also I have a problem. Maybe it's related.
I don't have persistent storage for kafka and zookeper yet. So the topics don't exist before the consumers start. If you run the project on a local PC then: all consumers are trying to read from the topic in an infinite loop (I see a warning "unknown topic"). but after deploying to kubernetes, consumers make several attempts and stop.

Can mode of build (Debug|Release, ASPNETCORE_ENVIRONMENT etc) change behavior?

@RomanOlegovich
Copy link
Author

I have event A and B, consumer A and B, topic A and B.
On event A produced, consumer B stoped (and vice versa) with OperationCanceledException. What do you think about this?

@BEagle1984
Copy link
Owner

It should work exactly the same on the local machine and in kubernetes. Unless you implemented some Kafka-related health check that causes your pod to be stopped in that situation.

The consumers/producers in Silverback are completely unrelated and I never observed the behavior you describe.

It's very difficult to help you with the information I have. Are you able to provide a sample project that reproduces the issue on a local environment (Kafka started via docker compose)? Or at least the trace logs of your service where I can try to figure out what happened?

@RomanOlegovich
Copy link
Author

I was no longer able to reproduce the problem, if it appears again I will let you know. The only thing I did was to remove the call to ConfigureSilverback in the application modules. But I'm not sure if that had any effect.

@BEagle1984
Copy link
Owner

BEagle1984 commented Apr 27, 2022

OK, keep me posted.

No, calling ConfigureSilverback is fine. At SwissPost we also use a vertical slice architecture in our bigger .net projects and they all use Silverback as well, without issues.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants