Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spring Managed Consumer Interceptors #2244

Open
frosiere opened this issue Apr 27, 2022 · 2 comments
Open

Spring Managed Consumer Interceptors #2244

frosiere opened this issue Apr 27, 2022 · 2 comments

Comments

@frosiere
Copy link
Contributor

frosiere commented Apr 27, 2022

Following #2049, Spring Kafka may also support Spring managed interceptors for standard consumers and streams.

In pure Kafka, interceptors are specified through a configuration entry. This entry contains the list of classes related to the interceptors. This list of class is then converted into a list of interceptor instances (see AbstractConfig#getConfiguredInstances). As the interceptor is instantiated by Kafka itself, there is no way to inject dependencies into the interceptor expect by using the trick with the configure method (see https://docs.spring.io/spring-kafka/docs/current/reference/html/#interceptors).
An update in Kafka to support instances instead of classes (technical limitation?) could help a lot.

So, the proposal would be to extend ProducerConfig, ConsumerConfig and StreamsConfig as follow to complete the list of interceptors with the Spring managed interceptors.

// example for streams
public class SpringAwareStreamConfig extends StreamsConfig {

    private final List<ProducerInterceptor<?, ?>> producerInterceptors;
    private final List<ConsumerInterceptor<?, ?>> consumerInterceptors;

    public SpringAwareStreamConfig(Map<?, ?> props,
                                   boolean doLog,
                                   List<ProducerInterceptor<?, ?>> producerInterceptors,
                                   List<ConsumerInterceptor<?, ?>> consumerInterceptors) {
        super(props, doLog);
        this.producerInterceptors = producerInterceptors;
        this.consumerInterceptors = consumerInterceptors;
    }

    @Override
    public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, Map<String, Object> configOverrides) {
        final var configuredInstances = super.getConfiguredInstances(classNames, t, configOverrides);
        if (ConsumerInterceptor.class.equals(t)) {
            configuredInstances.addAll((Collection<? extends T>) consumerInterceptors);
        }
        if (ProducerInterceptor.class.equals(t)) {
            configuredInstances.addAll((Collection<? extends T>) consumerInterceptors);
        }
        return configuredInstances;
    }
}

These new Kafka config classes would then be respectively instantiated in

  • DefaultKafkaProducerFactory#createRawProducer
  • DefaultKafkaConsumerFactory#createRawConsumer
  • StreamsBuilderFactoryBean#start

This approach would let Kafka handle the calls to the interceptor methods instead of having to call these methods in Spring Kafka itself.

Any comment, feedback, other proposal is more than welcome.

@garyrussell
Copy link
Contributor

Interesting; I was not aware of those methods; much better than the hack in the docs you referenced.

Sounds like a plan. Looking forward to seeing a PR.

@frosiere
Copy link
Contributor Author

frosiere commented Apr 30, 2022

Unfortunately, the proposal I did is not applicable for producers and consumers as the constructor accepting a producer/consumer config is not public (default modifier).
Issue has been created on the Kafka project to increase the visibility of this constructor.
See https://issues.apache.org/jira/browse/KAFKA-13864

I could still drop a PR for streams but this would be incomplete.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants