Skip to content

Spring Cloud Stream 2.0.0 Release Notes

Ben Arena edited this page Jul 24, 2018 · 38 revisions

Key Features

  • Polling Consumer
  • Micrometer

Notable changes, enhancements and improvements

New Test Binder

Binder backed by Spring Integration https://github.com/spring-cloud/spring-cloud-stream/pull/1241

Web

Given https://github.com/spring-cloud/spring-cloud-stream/commit/0463f7939d654138d955d9897ae2d27a82fea67f web is no longer included, so users can switch manually between Tomcat and Netty by simply switching dependencies

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
</dependency>

and

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

Actuator

  • optional, requires web (above) and
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
    <optional>true</optional>
</dependency>

Retry Template

. . .is now injectable https://github.com/spring-cloud/spring-cloud-stream/issues/858 - fill in

Actuator Endpoint for Bindings

Stop: curl -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/stop/inOne

Start: curl -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/start/inOne

. . . where inOne is the channel name.

The http://localhost:8080/actuator/bindings/ (GET) will display the list of existing bindings

Content Type

  • Default Content type is set to application/json which needs to be taken into consideration when migrating 1.3 application and/or operating in the mixed mode (i.e., 1.3 producer -> 2.0 consumer).
  • Messages with textual payloads and "contentType" text/* or */json will be converted to Message<String> to maintain the behavioral compatibility with the previous version of the framework. Message's payload will still be converted to the appropriate argument type by the argument resolvers (MessageConverter's) if such argument is not a String (i.e., POJO) essentially resulting in secondary conversion which is not necessary for most cases. We are considering a flag to override this behavior to avoid secondary conversion.
  • [TODO:Add to the DOC] @StreamMessageConverter - to define custom MessageConverter's used by argument resolvers. Added to the top of the list of existing MessageConverters
  • List of MessageConverter's configured by default (in order):
    • TupleJsonMessageConverter - tbd
    • ByteArrayMessageConverter - tbd
    • ObjectStringMessageConverter - tbd
    • JavaSerializationMessageConverter (DEPRECATED) - tbd
    • KryoMessageConverter (DEPRECATED) - tbd
    • JsonUnmarshallingMessageConverter - tbd
  • The contentType as a hint to help select the appropriate MessageConverter. For example, if payload is byte[] and argument is Foo which converter to use?
  • Add note about the behavior of @Transformer around header propagation bug (unless SI fix is available before the release)

NOTE: explain the behavior change for typeless handlers such as: handle(Message<?> message) vs handle(Message<byte[]> message) OR handle(Object val) vs handle(byte[] val) (edited)

ServiceActivator vs. StreamListener

  • there is still an issue with regard to consistency when dealing with json payloads and collections public List<Employee<Person>> echo(List<Employee<Person>> value)

Notable Deprecations

  • JavaSerializationMessageConverter (DEPRECATED) - tbd
  • KryoMessageConverter (DEPRECATED) - tbd

Partitioning

  • Note about partitionKeyExtractorClass deprecation in favor of Spring configured beans
  • [TODO: ensure error is thrown at init] partitionCount must be accompanied by 'partitionKeyExtractor' otherwise it's an error

TODO 1.3 to 2.0 need migration pass for KryoConversion see #1142

Overriding MessageConverters

  • When upgrading from 1.3 to 2.0, it is possible to override MessageConverters by simply configuring @StreamMessageConverter and annotating the targeted content type:
@Bean
@StreamMessageConverter
public AlwaysStringKryoMessageConverter kryoOverrideMessageConverter() {
	return new AlwaysStringKryoMessageConverter(MimeType.valueOf("application/x-java-object"));
}

This can also be used to consume messages with a MessageConverter of type "avro/bytes".

StreamListener Infrastructure enhancements

StreamListener annotation post processor (StreamListenerAnnotationBeanPostProcessor) behavior is enhanced in 2.0 to address the needs of downstream implementations. This section is primarily applied to changes at the framework level (i.e. a new Binder requires a different behavior from the post processor). In a normal context, the users don't have to deal with these type of changes.

StreamListenerSetupMethodOrchestrator

StreamListenerMethodSetupOrchestrator is an API hook that allows downstream binder implementations or applications to inject custom strategies to alter the default StreamListener adapter method invocations.

Primary motivations for the new API

The default StreamListenerAnnotationBeanPostProcessor behaves in a strict manner enforcing various rules and validations. For example, it doesn't allow to have SendTo annotation with multiple destinations or multiple Output annotations present on a method annotated with StreamListener. There might be use cases in which a method needs to return a collection type or an array. Then, based on some rules it wants to send the data to multiple destinations through various bindings. If we rely on the default StreamListenerAnnotationBeanPostProcessor it is not possible to have this behavior for the StreamListener methods in a natural way. There is an extension mechanism already provided by the bean post processor to enhance the behavior, but this is not sufficient to satisfy this use case as the default validations still apply.

Contract for StreamListenerSetupMethodOrchestrator

Here is the contract of the StreamListenerSetupMethodOrchestrator interface

Implemenation Details

On the inbound side, the interface provides a default implementation which is equivalent to the current existing behavior, with the exception that this method is now available to be overridden by a potential downstream implementation. The main change though is introduced with the following methods.

boolean supports(Method method)
void orchestrateStreamListenerSetupMethod(StreamListener streamListener, Method method, Object bean)

The supports method takes a method and checks if the implementation can support orchestrating this method. The orchestrateStreamListenerSetupMethod allows an implementer to orchestrate the method by altering method invocation strategies. For instance, the implementation can allow having multiple bindings/destinations on the outbound side, change the way the method is invoked etc.

There is a default internal implementation provided for the framework that is part of the StreamListenerAnnotationBeanPostProcessor. This implementation is not for extension or used outside of this bean post processor. By default, this internal implementation supports any method that's annotated with StreamListener and therefore is used out of the box. However, if the user provides an implementation - be it either as part of a binder implementation or a new type of target type adapter (such as the Kafka Streams target type) - and register it as a Spring bean in the ApplicationContext, in that case, this bean is checked to see if the StreamListener method can be invoked using this implementation of the StreamListenerSetupMethodOrchestrator.

Structural changes in the Apache Kafka Streams binder

  • Binder will be called spring-cloud-stream-binder-kafka-streams starting with Elmhurst.RC1 (2.0.0.RC1 of the binder)
  • Many classes those had KStream in its name are replaced with KafkaStreams(Details below)
  • Many classes are removed from the public API to internal starting with 2.0.0.RC1 (Details below)
  • All the properties that required kstream will need kafka.streams starting from 2.0.0.RC1. For example, earlier it was - spring.cloud.stream.kstream.binder.* or spring.cloud.stream.kstream.bindings.*, but now they are spring.cloud.stream.kafka.streams.binder.* and spring.cloud.stream.kafka.streams.bindings.* respectively.

Classes Renamed or Removed.

The following classes are renamed of removed starting with 2.0.0.RC1

  • Package org.springframework.cloud.stream.binder.kstream -> org.springframework.cloud.stream.binder.kafka.streams

  • Package org.springframework.cloud.stream.binder.kstream.annotations -> org.springframework.cloud.stream.binder.kafka.streams.annotations

  • KStreamProcessor -> KafkaStreamsProcessor

  • KStreamBinderConfiguration -> KafkaStreamsBinderConfiguration

  • KStreamBinderSupportAutoConfiguration -> KafkaStreamsBinderSupportAutoConfiguration

  • KStreamApplicationSupportAutoConfiguration -> KafkaStreamsApplicaitonSupportAutoConfiguration

  • KStreamApplicationSupportProperties -> org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties

  • KStreamBindingProperties -> org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBindingProperties

  • KStreamCommonProperties (Class removed)

  • KStreamConsumerProperties -> org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties

  • KStreamProducerProperties -> org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties

  • KStreamExtendedBindingProperties -> org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties

  • KStreamBinder (No longer available as a public class)

  • KStreamBoundElementFactory (No longer available as a public class)

  • KStreamListenerParameterAdapter (No longer available as a public class)

  • KStreamStreamListenerResultAdapter (No longer available as a public class)

All the availble properties through the kafka streams binder, please refer to the reference docs.