New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Redisson stream gets stucked after acquiring connection #5817
Comments
How is the RStream.readGroup() method called?
It looks like this connection in reconnection state. Can you share the full log? |
@mrniko Also, please suggest if this is fine to use or we should use conventional way using a while(true) loop and fetch/poll using RStream continuously. Below is the consumer side polling =>
Publisher side code:
|
Unable to reproduce using the test below. @Test
public void test1() throws InterruptedException {
RedisConnectionFactory redisConnectionFactory = new RedissonConnectionFactory(redisson);
StreamMessageListenerContainer listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, getOptions());
Consumer consumer = Consumer.from("group", "consumer1");
StreamOffset<String> streamOffset = StreamOffset.create("test", ReadOffset.from(">"));
String channel = "test";
AtomicInteger counter = new AtomicInteger();
Subscription subscription = listenerContainer.register(getReadRequest(consumer, streamOffset),
listener(redisConnectionFactory, channel, consumer, counter));
StringRedisTemplate t1 = new StringRedisTemplate(redisConnectionFactory);
t1.opsForStream().createGroup("test", "group");
listenerContainer.start();
ExecutorService s = Executors.newSingleThreadExecutor();
s.submit(() -> {
for (int i = 0; i < 10; i++) {
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(redisConnectionFactory);
ObjectRecord<String, String> record = StreamRecords.newRecord()
.ofObject("message")
.withStreamKey(channel);
RecordId recordId = stringRedisTemplate.opsForStream().add(record);
System.out.println("recordId " + recordId);
}
});
Thread.sleep(5000);
Assertions.assertThat(counter.get()).isEqualTo(10);
}
private StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> getOptions() {
return StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.targetType(String.class)
.build();
}
private StreamMessageListenerContainer.StreamReadRequest<String> getReadRequest(
Consumer consumer, StreamOffset<String> streamOffset) {
return StreamMessageListenerContainer.StreamReadRequest
.builder(streamOffset)
.consumer(consumer)
.autoAcknowledge(false)
.cancelOnError((err) -> false) // do not stop consuming after error
.build();
}
private <T> StreamListener listener(RedisConnectionFactory redisConnectionFactory, String channel, Consumer consumer, AtomicInteger counter) {
return message -> {
try {
System.out.println("Acknowledging message: " + message.getId());
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(redisConnectionFactory);
stringRedisTemplate.opsForStream().acknowledge(channel, consumer.getGroup(), message.getId());
System.out.println("RECEIVED " + consumer + " " + message);
counter.incrementAndGet();
} catch(Exception e) {
e.printStackTrace();
}
};
} |
@mrniko Can you suggest me on this please - Also, please suggest if this is fine to use or we should use conventional way using a while(true) loop and fetch/poll using RStream continuously. I was wondering if mixing Redisson with spring library can cause this? |
No.
Your code should work fine. Are there any exceptions in logs prior issue? |
No exceptions, just some commands were (Completed exceptionally) in the promise and then the ConnectinWatchdog entries which I shared earlier. |
I was able to reproduce the issue. Can you please help here. Steps-
Ot works if we stop the StreamMessageListenerContainer and then call invoke it from start again like below-
|
Thanks for the test. It helped a lot. Can you try attached version? |
Hi @mrniko Also, There is 1 more issue I found in RedissonConnectionFactory class.
Issue-
|
Hello @inampar,
Fixed in 1f86d11 a82bd11 commits included in 3.30.0 version. Which has already bean released.
Good point. |
….getSentinelConnection() method throws error on the first offline sentinel #5817
Fixed |
Apr 26 2024 05:35:20 SimpleAsyncTaskExecutor-1 DEBUG redisson.command.RedisExecutor [] - acquired connection for command (XREADGRO
UP) and params [GROUP, <CONSUMER_GRP>, <CONSUMER_GRP>, STREAMS, [83, 78, 65, 80, 83, 72, 79, 84, 95, 6
7, ...], >] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using node <MASTER_HOST>/<MASTER_HOST>:6379... RedisConnection@410594001 [redisClient=[addr=redis://<MASTER_HOST>:6379], channel=[id: 0x676d95d6, L:/10.112.67.244:33
789 - R:<MASTER_HOST>/<MASTER_HOST>:6379], currentCommand=null, usage=1]
Also, I do not see any new stream messages being received here.
Apr 26 2024 05:35:20 redisson-timer-4-1 DEBUG ent.handler.ConnectionWatchdog [] - reconnecting RedisConnection@1996177982 [redisCl
ient=[addr=redis://<MASKED_HOST>.134:6379], channel=[id: 0x5cd3f64e, L:/<MASKED_HOST>.244:57665 ! R:<MASKED_HOST>.134/<MASKED_HOST>.134:6379], cur
rentCommand=null, usage=0] to <MASKED_HOST>.134/<MASKED_HOST>.134:6379
Apr 26 2024 05:35:20 redisson-timer-4-1 DEBUG ent.handler.ConnectionWatchdog [] - reconnecting RedisConnection@1091970061 [redisCl
ient=[addr=redis://<MASKED_HOST>.134:6379], channel=[id: 0x4de73e53, L:/<MASKED_HOST>.244:57652 ! R:<MASKED_HOST>.134/<MASKED_HOST>.134:6379], cur
rentCommand=null, usage=1] to <MASKED_HOST>.134/<MASKED_HOST>.134:6379
Apr 26 2024 05:35:20 redisson-timer-4-1 DEBUG ent.handler.ConnectionWatchdog [] - reconnecting RedisConnection@607887671 [redisCli
ent=[addr=redis://<MASKED_HOST>.101:6379], channel=[id: 0x07db528a, L:/<MASKED_HOST>.244:57479 ! R:<MASKED_HOST>.101/<MASKED_HOST>.101:6379], current
Command=null, usage=0] to <MASKED_HOST>.101/<MASKED_HOST>.101:6379
Apr 26 2024 05:35:20 redisson-netty-2-24 DEBUG edisson.client.RedisConnection [] - Connection created [addr=redis://<MASKED_HOST>.134:
6379]
Apr 26 2024 05:35:20 redisson-timer-4-1 DEBUG ent.handler.ConnectionWatchdog [] - reconnecting RedisConnection@451031975 [redisCli
ent=[addr=redis://<MASKED_HOST>.101:6379], channel=[id: 0x22f8d9af, L:/<MASKED_HOST>.244:57494 ! R:<MASKED_HOST>.101/<MASKED_HOST>.101:6379], current
Command=null, usage=0] to <MASKED_HOST>.101/<MASKED_HOST>.101:6379
Apr 26 2024 05:35:20 redisson-netty-2-25 DEBUG edisson.client.RedisConnection [] - Connection created [addr=redis://<MASKED_HOST>.134:
6379]
Apr 26 2024 05:35:20 redisson-timer-4-1 DEBUG ent.handler.ConnectionWatchdog [] - reconnecting RedisPubSubConnection@1903541546 [r
edisClient=[addr=redis://<MASKED_HOST>.101:6379], channel=[id: 0xd01366a2, L:/<MASKED_HOST>.244:57471 ! R:<MASKED_HOST>.101/<MASKED_HOST>.101:6379],
currentCommand=CommandData [promise=java.util.concurrent.CompletableFuture@44064757[Not completed, 1 dependents], command=(PING),
params=[], codec=org.redisson.client.codec.StringCodec], usage=0] to <MASKED_HOST>.101/<MASKED_HOST>.101:6379
The text was updated successfully, but these errors were encountered: