How to configure the XTRIM or XADD #8592

Open
zhuangzibin opened this issue Apr 8, 2023 · 3 comments
@zhuangzibin

Hi, I am using Spring Integration with Redis Streams, but the stream entries are kept in memory indefinitely, so memory usage keeps growing. How can I configure XTRIM, or XADD with MAXLEN? For example:
XADD mystream MAXLEN ~ 10 * value
XTRIM mystream MAXLEN ~ 10

1. producerConfig.java

@Slf4j
@Configuration
public class ProducerConfig {

    @Bean
    public FluxMessageChannel fluxMessageChannel() {
        return new FluxMessageChannel();
    }

    /**
     * Producer: sends messages to the Redis stream.
     *
     * @param reactiveRedisConnectionFactory reactiveRedisConnectionFactory
     * @return ReactiveRedisStreamMessageHandler
     */
    @Bean
    @ServiceActivator(inputChannel = "fluxMessageChannel", reactive = @Reactive)
    public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
            ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        RedisSerializationContext<String, Object> serializationContext = RedisSerializationContext.<String, Object>newSerializationContext()
                .key(new StringRedisSerializer())
                .value(new GenericJackson2JsonRedisSerializer())
                .hashKey(new StringRedisSerializer())
                .hashValue(new StringRedisSerializer())
                .build();
        ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
                new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey");
        reactiveStreamMessageHandler.setSerializationContext(serializationContext);
        reactiveStreamMessageHandler.setExtractPayload(true);
        return reactiveStreamMessageHandler;
    }
}

2. producer.java

@Component
@RequiredArgsConstructor
public class RedisStreamProducer {

    private final FluxMessageChannel fluxMessageChannel;

    public Mono<Boolean> send(BaseMessage downMessage) {
        var message = MessageBuilder.withPayload(downMessage).build();
        return Mono.fromCallable(() -> fluxMessageChannel.send(message));
    }
}
@zhuangzibin zhuangzibin added status: waiting-for-triage The issue need to be evaluated and its future decided type: enhancement labels Apr 8, 2023
@artembilan artembilan added in: redis status: on-hold-dependency Waiting for a fix to a another project and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Apr 10, 2023
@artembilan artembilan added this to the Backlog milestone Apr 10, 2023
@artembilan
Member

The mentioned XTRIM command can be used via ReactiveStreamOperations:

	/**
	 * Trims the stream to {@code count} elements.
	 *
	 * @param key the stream key.
	 * @param count length of the stream.
	 * @return number of removed entries.
	 * @see <a href="https://redis.io/commands/xtrim">Redis Documentation: XTRIM</a>
	 */
	Mono<Long> trim(K key, long count);
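
As a minimal sketch of how that trim() call could be used, assuming a ReactiveStringRedisTemplate bean is available in the application context (Spring Boot normally auto-configures one). The class name StreamTrimmer and the method name trimTo are hypothetical; only opsForStream() and trim(key, count) come from Spring Data Redis:

```java
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Component;

import reactor.core.publisher.Mono;

// Hypothetical helper; not part of Spring Integration or Spring Data Redis.
@Component
public class StreamTrimmer {

    private final ReactiveStringRedisTemplate redisTemplate;

    public StreamTrimmer(ReactiveStringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Issues XTRIM for the given stream key, keeping at most maxLen entries.
     * Emits the number of removed entries; nothing happens until subscribed.
     */
    public Mono<Long> trimTo(String streamKey, long maxLen) {
        return this.redisTemplate.opsForStream().trim(streamKey, maxLen);
    }
}
```

The returned Mono has to be subscribed (or composed into another reactive pipeline) for the command to be sent, for example from a periodic task that calls trimTo("myStreamKey", 10).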

XADD is issued by the ReactiveRedisStreamMessageHandler via ReactiveStreamOperations.add(), but it looks like Spring Data Redis does not expose maxlen as a property on the Record abstraction.

So you probably have to use the Lettuce API directly in your code to perform an XADD with MAXLEN.
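
A rough sketch of what going through Lettuce directly might look like. The class name CappedStreamWriter and its method signature are hypothetical, and it writes a plain String field/value map, so the entries will not be serialized the same way as those written by the ReactiveRedisStreamMessageHandler:

```java
import java.util.Map;

import io.lettuce.core.RedisClient;
import io.lettuce.core.XAddArgs;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.RedisReactiveCommands;
import reactor.core.publisher.Mono;

// Hypothetical helper that bypasses Spring Data Redis and talks to Lettuce.
public class CappedStreamWriter implements AutoCloseable {

    private final RedisClient client;

    private final StatefulRedisConnection<String, String> connection;

    public CappedStreamWriter(String redisUri) {
        // For example "redis://localhost:6379" (an assumed local instance).
        this.client = RedisClient.create(redisUri);
        this.connection = this.client.connect();
    }

    /**
     * Equivalent to: XADD streamKey MAXLEN ~ maxLen * field value [field value ...]
     * Emits the generated entry ID once subscribed.
     */
    public Mono<String> add(String streamKey, Map<String, String> body, long maxLen) {
        RedisReactiveCommands<String, String> commands = this.connection.reactive();
        XAddArgs args = XAddArgs.Builder.maxlen(maxLen).approximateTrimming();
        return commands.xadd(streamKey, args, body);
    }

    @Override
    public void close() {
        this.connection.close();
        this.client.shutdown();
    }
}
```

In a Spring application it is probably better to reuse the connection settings already configured for the LettuceConnectionFactory rather than creating a second RedisClient as this sketch does.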

Please consider raising a GH issue at https://github.com/spring-projects/spring-data-redis/issues to expose maxlen (and approximateTrimming) on the org.springframework.data.redis.connection.stream.Record abstraction.
Then we can expose that option on the ReactiveRedisStreamMessageHandler to let you control stream trimming at the Spring Integration level.

@zhuangzibin
Author

Thank you for your answer. I have another question: producerConfig.java configures the topic, i.e. the stream key name. What if I want to send to another topic? All I can think of at the moment is to configure another ReactiveRedisStreamMessageHandler and FluxMessageChannel @Bean.

Is there a method along the lines of send(String topic, Message message)?

@artembilan
Member

artembilan commented Apr 11, 2023

I'm not sure what you see as wrong with that ReactiveRedisStreamMessageHandler and its streamKeyExpression property:

	/**
	 * Create an instance based on provided {@link ReactiveRedisConnectionFactory} and expression for stream key.
	 * @param connectionFactory the {@link ReactiveRedisConnectionFactory} to use
	 * @param streamKeyExpression the SpEL expression to evaluate a key for stream
	 */
	public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory,
			Expression streamKeyExpression) {

So, let's imagine the message you send to the input channel of this channel adapter has a header like my_stream.
You can then configure the streamKeyExpression like this: new FunctionExpression<Message<?>>(message -> message.getHeaders().get("my_stream")).
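
Put together, a configuration along these lines could replace the fixed-key handler bean from the original post. This is only a sketch: the class name DynamicStreamKeyConfig and the bean method name are made up, and the custom serialization context from the original configuration is left out for brevity:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.integration.annotation.Reactive;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.integration.redis.outbound.ReactiveRedisStreamMessageHandler;
import org.springframework.messaging.Message;

// Hypothetical configuration class; intended to replace, not accompany,
// the handler bean that uses the fixed "myStreamKey".
@Configuration
public class DynamicStreamKeyConfig {

    @Bean
    @ServiceActivator(inputChannel = "fluxMessageChannel", reactive = @Reactive)
    public ReactiveRedisStreamMessageHandler dynamicStreamMessageHandler(
            ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {

        // Resolve the stream key per message from the "my_stream" header.
        ReactiveRedisStreamMessageHandler handler =
                new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory,
                        new FunctionExpression<Message<?>>(
                                message -> message.getHeaders().get("my_stream")));
        handler.setExtractPayload(true);
        return handler;
    }
}
```

On the sending side, the producer would then set the header on each message, e.g. MessageBuilder.withPayload(downMessage).setHeader("my_stream", "anotherStreamKey").build(). A message without that header would fail the 'streamKey' must not be null assertion in the handler.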

The ReactiveRedisStreamMessageHandler then applies logic like this:

	protected Mono<Void> handleMessageInternal(Message<?> message) {
		return Mono
				.fromSupplier(() -> {
					String streamKey = this.streamKeyExpression.getValue(this.evaluationContext, message, String.class);
					Assert.notNull(streamKey, "'streamKey' must not be null");
					return streamKey;
				})
				.flatMap((streamKey) -> {
					Object value = message;
					if (this.extractPayload) {
						value = message.getPayload();
					}

					Record<String, ?> record =
							StreamRecords.objectBacked(value)
									.withStreamKey(streamKey);

					return this.reactiveStreamOperations.add(record);
				})
				.then();
	}

I'm not sure where you found that send(String topic, Message message), but I believe it is not relevant.

Please note that GH issues are really for issues and feature requests.
For questions, it is better to go to StackOverflow or the Discussions tab of this GitHub project.
