Redisson stream gets stuck after acquiring connection #5817

Closed
inampar opened this issue Apr 26, 2024 · 11 comments

@inampar

inampar commented Apr 26, 2024

  • We are using Sentinel mode. After a call to XREADGROUP, the debug-level logs show that a connection was acquired, but I never see it being released, and because of this we are unable to read the message. No further messages added to the stream are fetched either.

Apr 26 2024 05:35:20 SimpleAsyncTaskExecutor-1 DEBUG redisson.command.RedisExecutor [] - acquired connection for command (XREADGROUP) and params [GROUP, <CONSUMER_GRP>, <CONSUMER_GRP>, STREAMS, [83, 78, 65, 80, 83, 72, 79, 84, 95, 67, ...], >] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using node <MASTER_HOST>/<MASTER_HOST>:6379... RedisConnection@410594001 [redisClient=[addr=redis://<MASTER_HOST>:6379], channel=[id: 0x676d95d6, L:/10.112.67.244:33789 - R:<MASTER_HOST>/<MASTER_HOST>:6379], currentCommand=null, usage=1]

Also, I do not see any new stream messages being received here.

  • ConnectionWatchdog has printed some logs, but their meaning is not clear to me

Apr 26 2024 05:35:20 redisson-timer-4-1 DEBUG ent.handler.ConnectionWatchdog [] - reconnecting RedisConnection@1996177982 [redisClient=[addr=redis://<MASKED_HOST>.134:6379], channel=[id: 0x5cd3f64e, L:/<MASKED_HOST>.244:57665 ! R:<MASKED_HOST>.134/<MASKED_HOST>.134:6379], currentCommand=null, usage=0] to <MASKED_HOST>.134/<MASKED_HOST>.134:6379
Apr 26 2024 05:35:20 redisson-timer-4-1 DEBUG ent.handler.ConnectionWatchdog [] - reconnecting RedisConnection@1091970061 [redisClient=[addr=redis://<MASKED_HOST>.134:6379], channel=[id: 0x4de73e53, L:/<MASKED_HOST>.244:57652 ! R:<MASKED_HOST>.134/<MASKED_HOST>.134:6379], currentCommand=null, usage=1] to <MASKED_HOST>.134/<MASKED_HOST>.134:6379
Apr 26 2024 05:35:20 redisson-timer-4-1 DEBUG ent.handler.ConnectionWatchdog [] - reconnecting RedisConnection@607887671 [redisClient=[addr=redis://<MASKED_HOST>.101:6379], channel=[id: 0x07db528a, L:/<MASKED_HOST>.244:57479 ! R:<MASKED_HOST>.101/<MASKED_HOST>.101:6379], currentCommand=null, usage=0] to <MASKED_HOST>.101/<MASKED_HOST>.101:6379
Apr 26 2024 05:35:20 redisson-netty-2-24 DEBUG edisson.client.RedisConnection [] - Connection created [addr=redis://<MASKED_HOST>.134:6379]
Apr 26 2024 05:35:20 redisson-timer-4-1 DEBUG ent.handler.ConnectionWatchdog [] - reconnecting RedisConnection@451031975 [redisClient=[addr=redis://<MASKED_HOST>.101:6379], channel=[id: 0x22f8d9af, L:/<MASKED_HOST>.244:57494 ! R:<MASKED_HOST>.101/<MASKED_HOST>.101:6379], currentCommand=null, usage=0] to <MASKED_HOST>.101/<MASKED_HOST>.101:6379
Apr 26 2024 05:35:20 redisson-netty-2-25 DEBUG edisson.client.RedisConnection [] - Connection created [addr=redis://<MASKED_HOST>.134:6379]
Apr 26 2024 05:35:20 redisson-timer-4-1 DEBUG ent.handler.ConnectionWatchdog [] - reconnecting RedisPubSubConnection@1903541546 [redisClient=[addr=redis://<MASKED_HOST>.101:6379], channel=[id: 0xd01366a2, L:/<MASKED_HOST>.244:57471 ! R:<MASKED_HOST>.101/<MASKED_HOST>.101:6379], currentCommand=CommandData [promise=java.util.concurrent.CompletableFuture@44064757[Not completed, 1 dependents], command=(PING), params=[], codec=org.redisson.client.codec.StringCodec], usage=0] to <MASKED_HOST>.101/<MASKED_HOST>.101:6379

@mrniko
Member

mrniko commented Apr 26, 2024

How is the RStream.readGroup() method called?

ConnectionWatchdog has printed some logs, but their meaning is not clear to me

It looks like this connection is in a reconnection state. Can you share the full log?

@inampar
Author

inampar commented Apr 27, 2024

@mrniko
Actually, we are using the Spring library (org.springframework.data.redis.connection.stream). The logs are too large to share in full, but I already posted the ones that looked suspicious.
This works fine most of the time, but occasionally the consumer stops receiving messages from the stream, and nothing helps except restarting the app service.

Also, please advise whether this approach is fine, or whether we should use the conventional approach: a while(true) loop that continuously fetches/polls using RStream.

Below is the consumer-side polling code:

private RedisConnectionFactory redisConnectionFactory; // RedissonConnectionFactory, autowired via RedissonAutoConfiguration

StreamMessageListenerContainer listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);

Subscription subscription = listenerContainer.register(getReadRequest(consumer, streamOffset), listener(channel, consumer, listener));

listenerContainer.start();

private StreamMessageListenerContainer.StreamReadRequest<String> getReadRequest(
        Consumer consumer, StreamOffset<String> streamOffset) {
    return StreamMessageListenerContainer.StreamReadRequest
            .builder(streamOffset)
            .consumer(consumer)
            .autoAcknowledge(false)
            .cancelOnError((err) -> false)  // do not stop consuming after an error
            .build();
}

private <T> StreamListener listener(String channel, Consumer consumer, RedisStreamListener<T> listener) {
    return message -> {
        try {
            log.info("Acknowledging message: {}", message.getId());
            stringRedisTemplate.opsForStream().acknowledge(channel, consumer.getGroup(), message.getId());
            log.info("RECEIVED ({}) message: messageId = {}, sequence = {}, data = {}", consumer,
                    message.getId().getValue(), message.getId().getSequence(), message.getValue());
            listener.onReceive((T) message.getValue());
        } catch (Exception e) {
            // pass the exception as the last argument so SLF4J logs its stack trace
            log.error("Error while processing message: {}", message, e);
        }
    };
}

private <T> StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, T>> getOptions(
        RedisStreamListener<T> listener) {
    return StreamMessageListenerContainer
            .StreamMessageListenerContainerOptions
            .builder()
            .pollTimeout(Duration.ofSeconds(0))
            .targetType(listener.getMessageType())
            .build();
}

Publisher-side code:


class RedisStreamPublisherImpl implements RedisStreamPublisher {

    @Resource(type = StringRedisTemplate.class)
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public <T extends Serializable> RecordId publish(String channel, T message) {
        RecordId recordId = null;
        try {
            ObjectRecord<String, T> record = StreamRecords.newRecord()
                    .ofObject(message)
                    .withStreamKey(channel);
            recordId = stringRedisTemplate.opsForStream().add(record);
            if (isNull(recordId)) {
                log.warn("STREAMING Error: topic = {}, data = {}, error: {}", channel, message, "RecordId returned as null");
            } else {
                log.info("STREAMING: recordId = {}, topic = {}, data = {}", recordId.getValue(), channel, message);
            }
        } catch (Throwable throwable) {
            log.error("STREAMING Errored", throwable);
        }
        return recordId;
    }
}

@mrniko
Member

mrniko commented Apr 29, 2024

Unable to reproduce using the test below.

    @Test
    public void test1() throws InterruptedException {
        RedisConnectionFactory redisConnectionFactory = new RedissonConnectionFactory(redisson);

        StreamMessageListenerContainer listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, getOptions());


        Consumer consumer = Consumer.from("group", "consumer1");
        StreamOffset<String> streamOffset = StreamOffset.create("test", ReadOffset.from(">"));
        String channel = "test";
        AtomicInteger counter = new AtomicInteger();
        Subscription subscription = listenerContainer.register(getReadRequest(consumer, streamOffset),
                listener(redisConnectionFactory, channel, consumer, counter));

        StringRedisTemplate t1 = new StringRedisTemplate(redisConnectionFactory);
        t1.opsForStream().createGroup("test", "group");

        listenerContainer.start();

        ExecutorService s = Executors.newSingleThreadExecutor();

        s.submit(() -> {
            for (int i = 0; i < 10; i++) {
                StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(redisConnectionFactory);
                ObjectRecord<String, String> record = StreamRecords.newRecord()
                        .ofObject("message")
                        .withStreamKey(channel);
                RecordId recordId = stringRedisTemplate.opsForStream().add(record);
                System.out.println("recordId " + recordId);
            }
        });

        Thread.sleep(5000);
        Assertions.assertThat(counter.get()).isEqualTo(10);
    }

    private StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> getOptions() {
        return StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .targetType(String.class)
                .build();
    }

    private StreamMessageListenerContainer.StreamReadRequest<String> getReadRequest(

            Consumer consumer, StreamOffset<String> streamOffset) {
        return StreamMessageListenerContainer.StreamReadRequest
                .builder(streamOffset)
                .consumer(consumer)
                .autoAcknowledge(false)
                .cancelOnError((err) -> false)  // do not stop consuming after error
                .build();
    }

    private <T> StreamListener listener(RedisConnectionFactory redisConnectionFactory, String channel, Consumer consumer, AtomicInteger counter) {

        return message -> {
            try {
                System.out.println("Acknowledging message: " + message.getId());
                StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(redisConnectionFactory);
                stringRedisTemplate.opsForStream().acknowledge(channel, consumer.getGroup(), message.getId());
                System.out.println("RECEIVED " + consumer + " " + message);
                counter.incrementAndGet();
            } catch(Exception e) {
                e.printStackTrace();
            }
        };
    }

@inampar
Author

inampar commented Apr 29, 2024

@mrniko
It usually occurs when there have been no messages in the stream for quite some time, say 7-8 hours, and then one message is pushed.
It is intermittent.

Could you please advise on the following:

Also, please advise whether this approach is fine, or whether we should use the conventional approach: a while(true) loop that continuously fetches/polls using RStream.

I was wondering if mixing Redisson with spring library can cause this?
It would be helpful if you could send some code snippet to read continuously using RStream.readGroup() approach.

@mrniko
Member

mrniko commented Apr 29, 2024

I was wondering if mixing Redisson with spring library can cause this?

No.

It would be helpful if you could send some code snippet to read continuously using RStream.readGroup() approach.

Your code should work fine.

Are there any exceptions in the logs prior to the issue?
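On the question of a plain while(true) loop: the answer above is that the container-based code should work, so this is only a minimal sketch of what such a loop could look like, not Redisson's API. GroupReader, PollingConsumer and FakeReader are hypothetical names; in a real consumer, GroupReader would be backed by RStream.readGroup() and RStream.ack(), whose exact parameter types depend on the Redisson version. The sketch assumes readGroup() returns an empty result when nothing arrives within its poll timeout.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// HYPOTHETICAL stand-in for RStream.readGroup()/ack(); this is not Redisson's real interface.
interface GroupReader {
    /** Returns id -> payload for new entries; an empty map if nothing arrived within the poll timeout. */
    Map<String, String> readGroup(String group, String consumer) throws Exception;

    void ack(String group, String id);
}

final class PollingConsumer {
    private final GroupReader reader;
    private final String group;
    private final String consumer;
    private volatile boolean running = true;

    PollingConsumer(GroupReader reader, String group, String consumer) {
        this.reader = reader;
        this.group = group;
        this.consumer = consumer;
    }

    void stop() {
        running = false;
    }

    /** Polls until stop() is called or maxPolls iterations have run (the bound keeps the demo finite). */
    void run(Consumer<String> handler, int maxPolls) throws InterruptedException {
        for (int i = 0; running && i < maxPolls; i++) {
            Map<String, String> batch;
            try {
                batch = reader.readGroup(group, consumer);
            } catch (Exception e) {
                // e.g. the connection dropped during a failover: back off briefly and poll again
                Thread.sleep(10);
                continue;
            }
            for (Map.Entry<String, String> entry : batch.entrySet()) {
                try {
                    handler.accept(entry.getValue());
                    reader.ack(group, entry.getKey()); // ack only after successful handling
                } catch (RuntimeException e) {
                    // leave the entry unacknowledged (pending) so it can be re-processed later
                }
            }
        }
    }
}

/** In-memory fake used only for this demo; it throws on the first `failures` reads. */
final class FakeReader implements GroupReader {
    final Deque<String> pending = new ArrayDeque<>();
    final List<String> acked = new ArrayList<>();
    private int failures;
    private int nextId = 1;

    FakeReader(List<String> payloads, int failures) {
        pending.addAll(payloads);
        this.failures = failures;
    }

    @Override
    public Map<String, String> readGroup(String group, String consumer) throws Exception {
        if (failures > 0) {
            failures--;
            throw new Exception("simulated connection loss");
        }
        Map<String, String> batch = new LinkedHashMap<>();
        if (!pending.isEmpty()) {
            batch.put((nextId++) + "-0", pending.poll());
        }
        return batch;
    }

    @Override
    public void ack(String group, String id) {
        acked.add(id);
    }
}

public class Main {
    public static void main(String[] args) throws InterruptedException {
        FakeReader fake = new FakeReader(List.of("m1", "m2"), 1);
        List<String> received = new ArrayList<>();
        new PollingConsumer(fake, "group", "consumer1").run(received::add, 5);
        System.out.println(received + " acked=" + fake.acked); // [m1, m2] acked=[1-0, 2-0]
    }
}
```

Two design choices differ from the listener in the original consumer code: an entry is acknowledged only after the handler succeeds, so a failed entry stays pending instead of being lost, and a failed read is retried after a short back-off rather than ending the loop.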

@inampar
Author

inampar commented Apr 29, 2024


No exceptions, just some commands whose promises were completed exceptionally, followed by the ConnectionWatchdog entries I shared earlier.

@inampar
Author

inampar commented May 7, 2024

@mrniko

I was able to reproduce the issue. Can you please help here?

Steps:

  1. Use the above code, which uses StreamMessageListenerContainer to consume messages.
  2. Set up Sentinel.
  3. Start the service and publish a message to the stream; the message is received.
  4. Trigger a Sentinel failover (say the master was node1:6379; after the failover the master should be node2:6379).
  5. Publish a message to the stream.
  6. This message is never received.

It works if we stop the StreamMessageListenerContainer and then start it again from scratch, as below:

RedisConnectionFactory redisConnectionFactory = new RedissonConnectionFactory(redisson);

StreamMessageListenerContainer listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, getOptions());

Consumer consumer = Consumer.from("group", "consumer1");
StreamOffset<String> streamOffset = StreamOffset.create("test", ReadOffset.from(">"));
String channel = "test";
AtomicInteger counter = new AtomicInteger();
Subscription subscription = listenerContainer.register(getReadRequest(consumer, streamOffset),
        listener(redisConnectionFactory, channel, consumer, counter));

StringRedisTemplate t1 = new StringRedisTemplate(redisConnectionFactory);
t1.opsForStream().createGroup("test", "group");

listenerContainer.start();

@mrniko mrniko added this to the 3.30.0 milestone May 9, 2024
@mrniko
Member

mrniko commented May 9, 2024

@inampar

Thanks for the test. It helped a lot.

Can you try the attached version?

redisson-3.29.1-SNAPSHOT.jar.zip

@mrniko mrniko modified the milestones: 3.30.0, 3.30.1 May 14, 2024
@inampar
Author

inampar commented May 17, 2024

Hi @mrniko
This works, but we cannot use this version in production.
Can you let us know on the planned release version and date for this fix?

Also, there is one more issue I found in the RedissonConnectionFactory class:

public RedisSentinelConnection getSentinelConnection() {
     if (!this.redisson.getConfig().isSentinelConfig()) {
         throw new InvalidDataAccessResourceUsageException("Redisson is not in Sentinel mode");
     } else {
         SentinelConnectionManager manager = (SentinelConnectionManager)((Redisson)this.redisson).getConnectionManager();
         Iterator var2 = manager.getSentinels().iterator();

         while(var2.hasNext()) {
             RedisClient client = (RedisClient)var2.next();
             org.redisson.client.RedisConnection connection = client.connect();

             try {
                 String res = (String)connection.sync(RedisCommands.PING, new Object[0]);
                 if ("pong".equalsIgnoreCase(res)) {
                     return new RedissonSentinelConnection(connection);
                 }
             } catch (Exception var6) {
                 log.warn("Can't connect to " + client, var6);
                 connection.closeAsync();
             }
         }

         throw new InvalidDataAccessResourceUsageException("Sentinels are not found");
     }
 }

Issue:

  • It always tries the sentinels in the order returned by the iterator, and client.connect() is called outside the try block. If the first sentinel process is down, connect() throws, the exception escapes the method, and the remaining sentinels are never tried.
  • Ideally, the try should be placed right inside the while loop, as below.
public RedisSentinelConnection getSentinelConnection() {
    if (!this.redisson.getConfig().isSentinelConfig()) {
        throw new InvalidDataAccessResourceUsageException("Redisson is not in Sentinel mode");
    } else {
        SentinelConnectionManager manager = (SentinelConnectionManager)((Redisson)this.redisson).getConnectionManager();
        Iterator var2 = manager.getSentinels().iterator();

        while(var2.hasNext()) {
            // client and connection are declared outside the try so the catch block can still use them
            RedisClient client = (RedisClient)var2.next();
            org.redisson.client.RedisConnection connection = null;
            try {
                // connect() is now inside the try: an offline sentinel is logged and skipped
                connection = client.connect();
                String res = (String)connection.sync(RedisCommands.PING, new Object[0]);
                if ("pong".equalsIgnoreCase(res)) {
                    return new RedissonSentinelConnection(connection);
                }
            } catch (Exception var6) {
                log.warn("Can't connect to " + client, var6);
                if (connection != null) {
                    connection.closeAsync();
                }
            }
        }

        throw new InvalidDataAccessResourceUsageException("Sentinels are not found");
    }
}
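To make the difference between the two loop shapes concrete, here is a minimal, self-contained model. Sentinel, FakeSentinel and SentinelPicker are hypothetical illustration types, not Redisson classes: connect() stands in for RedisClient.connect() and ping() for the PING call. With the first sentinel offline, the original shape fails immediately while the proposed shape falls through to the second sentinel.

```java
import java.util.List;

// HYPOTHETICAL model of one sentinel endpoint; not a Redisson type.
interface Sentinel {
    String name();

    void connect() throws Exception; // stands in for RedisClient.connect(); throws when the sentinel is down

    String ping() throws Exception;  // stands in for the PING sync call
}

final class FakeSentinel implements Sentinel {
    private final String name;
    private final boolean online;

    FakeSentinel(String name, boolean online) {
        this.name = name;
        this.online = online;
    }

    @Override public String name() { return name; }

    @Override public void connect() throws Exception {
        if (!online) {
            throw new Exception(name + " is offline");
        }
    }

    @Override public String ping() { return "PONG"; }
}

final class SentinelPicker {
    // Original shape: connect() runs outside the try, so the first offline
    // sentinel's exception escapes and the later sentinels are never tried.
    static String pickOriginal(List<Sentinel> sentinels) throws Exception {
        for (Sentinel s : sentinels) {
            s.connect();
            try {
                if ("pong".equalsIgnoreCase(s.ping())) {
                    return s.name();
                }
            } catch (Exception e) {
                // only PING failures are caught here
            }
        }
        throw new IllegalStateException("Sentinels are not found");
    }

    // Proposed shape: connect() is inside the try, so an offline sentinel is skipped.
    static String pickFixed(List<Sentinel> sentinels) {
        for (Sentinel s : sentinels) {
            try {
                s.connect();
                if ("pong".equalsIgnoreCase(s.ping())) {
                    return s.name();
                }
            } catch (Exception e) {
                // log and move on to the next sentinel
            }
        }
        throw new IllegalStateException("Sentinels are not found");
    }
}

public class Main {
    public static void main(String[] args) {
        List<Sentinel> sentinels = List.of(new FakeSentinel("s1", false), new FakeSentinel("s2", true));
        try {
            SentinelPicker.pickOriginal(sentinels);
        } catch (Exception e) {
            System.out.println("original: " + e.getMessage()); // original: s1 is offline
        }
        System.out.println("fixed: " + SentinelPicker.pickFixed(sentinels)); // fixed: s2
    }
}
```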

@mrniko
Member

mrniko commented May 17, 2024

Hello @inampar,

This works, but we cannot use this version in production.
Can you let us know on the planned release version and date for this fix?

Fixed in commits 1f86d11 and a82bd11, which are included in version 3.30.0. It has already been released.

Ideally, the try should be placed right inside the while loop, as below.

Good point.

@mrniko mrniko added bug and removed question labels May 17, 2024
mrniko pushed a commit that referenced this issue May 17, 2024
….getSentinelConnection() method throws error on the first offline sentinel #5817
@mrniko
Member

mrniko commented May 17, 2024

Fixed

@mrniko mrniko closed this as completed May 17, 2024