Skip to content

MouMangTai/spring-cloud-stream-binder-dapr

Repository files navigation

Spring Cloud Stream Dapr Binder Reference Guide

Reference Guide

1. Usage

To use Dapr binder, you need to add spring-cloud-stream-binder-dapr as a dependency to your Spring Cloud Stream application, as shown in the following example for Maven:

<dependency>
   <groupId>com.azure.spring</groupId>
   <artifactId>spring-cloud-stream-binder-dapr</artifactId>
</dependency>

Alternatively, you can use the Spring Cloud Stream Dapr Starter, as follows:

<dependency>
   <groupId>com.azure.spring</groupId>
   <artifactId>spring-cloud-starter-stream-dapr</artifactId>
</dependency>

2. Dapr Binder Overview

Dapr uses a modular design where functionality is delivered as a component. Each component has an interface definition, All of the components are pluggable. The Pub Sub components provide a common way to interact with different message bus implementations to achieve reliable, high-scale scenarios based on event-driven async communications, while allowing users to opt-in to advanced capabilities using defined metadata.

Spring Cloud Stream Binder abstraction makes it possible for a Spring Cloud Stream application to be flexible in how it connects to middleware.

Combining spring cloud stream binder and dapr, on the one hand dapr can make up the dependent on a specific middleware library for Spring Cloud Stream, on the other hand, spring cloud stream can help dapr and dapr client decoupling.Therefore, the combination of the two is conducive to the maximum decoupling and enhanced features of both sides.

The following simplified diagram shows how the Dapr binder operates:

176439470 64c42ea4 ebff 48a5 81a3 e3f11bb87387
Figure 1. Dapr Binder

The Dapr Binder implementation maps each destination to a Dapr Topic. The message is sent to the specified topic, and then get message by subscribing to topic. For each consumer group, a Queue is bound to that Topic. Each consumer instance has a corresponding Dapr Consumer instance for its group’s Queue. Specify pubsubName to call the predefined Dapr Pub Sub component.

3. Configuration Options

This section contains the configuration options used by the Dapr binder.

For common configuration options and properties pertaining to the binder, see the binding properties in core documentation.

3.1. Dapr Binder Properties

The following properties are available for Dapr binder only and must be prefixed with spring.cloud.stream.dapr.binder..

spring.cloud.stream.dapr.binder.daprIp

The parameter of the channel, indicating the IP address of the dapr sidecar to which the message is finally sent through the channel.

Default: 127.0.0.1

spring.cloud.stream.dapr.binder.daprPort

The parameter of channel, indicating the port that the dapr sidecar to which the message is finally sent through the channel listens.

Default: 50001

spring.cloud.stream.dapr.binder.negotiationType

NegotiationType decide which connection method to use. TLS and PLAINTEXT are currently available.

  • PLAINTEXT: Use of a plaintext connection to the server. By default a secure connection mechanism such as TLS will be used. Should only be used for testing or for APIs where the use of such API or the data exchanged is not sensitive. This assumes prior knowledge that the target of this channel is using plaintext. It will not perform HTTP/1.1 upgrades.

  • TLS: Makes the client use TLS.

    Default: PLAINTEXT

spring.cloud.stream.dapr.binder.authority

Overrides the authority used with TLS and HTTP virtual hosting. It does not change what host is actually connected to. Is commonly in the form host:port.

This method is intended for testing, but may safely be used outside of tests as an alternative to DNS overrides.

spring.cloud.stream.dapr.binder.defaultLoadBalancingPolicy

Sets the default load-balancing policy that will be used if the service config doesn’t specify one.

This method is implemented by all stock channel builders that are shipped with gRPC, but may not be implemented by custom channel builders, in which case this method will throw.

spring.cloud.stream.dapr.binder.idleTimeout

Set the duration without ongoing RPCs before going to idle mode.

In idle mode the channel shuts down all connections, the NameResolver and the LoadBalancer. A new RPC would take the channel out of idle mode. A channel starts in idle mode. Defaults to 30 minutes.

This is an advisory option. Do not rely on any specific behavior related to this option.

TimeUnit: minutes

spring.cloud.stream.dapr.binder.keepAliveTime

Sets the time without read activity before sending a keepalive ping. An unreasonably small value might be increased, and Long.MAX_VALUE nano seconds or an unreasonably large value will disable keepalive. Defaults to infinite.

Clients must receive permission from the service owner before enabling this option. Keepalives can increase the load on services and are commonly "invisible" making it hard to notice when they are causing excessive load. Clients are strongly encouraged to use only as small of a value as necessary.

TimeUnit: minutes

spring.cloud.stream.dapr.binder.keepAliveTimeout

Sets the time waiting for read activity after sending a keepalive ping. If the time expires without any read activity on the connection, the connection is considered dead. An unreasonably small value might be increased. Defaults to 20 seconds.

This value should be at least multiple times the RTT to allow for lost packets.

TimeUnit: seconds

spring.cloud.stream.dapr.binder.perRpcBufferLimit

Sets the per RPC buffer limit in bytes used for retry. The RPC is not retriable if its buffer limit is exceeded. The implementation may only estimate the buffer size being used rather than count the exact physical memory allocated. It does not have any effect if retry is disabled by the client.

This method may not work as expected for the current release because retry is not fully implemented yet.

spring.cloud.stream.dapr.binder.retryBufferSize

Sets the retry buffer size in bytes. If the buffer limit is exceeded, no RPC could retry at the moment, and in hedging case all hedges but one of the same RPC will cancel. The implementation may only estimate the buffer size being used rather than count the exact physical memory allocated. The method does not have any effect if retry is disabled by the client.

This method may not work as expected for the current release because retry is not fully implemented yet.

spring.cloud.stream.dapr.binder.keepAliveWithoutCalls

Sets whether keepalive will be performed when there are no outstanding RPC on a connection. Defaults to false.

Clients must receive permission from the service owner before enabling this option. Keepalives on unused connections can easilly accidentally consume a considerable amount of bandwidth and CPU. idleTimeout() should generally be used instead of this option.

spring.cloud.stream.dapr.binder.maxInboundMessageSize

Sets the maximum message size allowed to be received on the channel. If not called, defaults to 4 MiB. The default provides protection to clients who haven’t considered the possibility of receiving large messages while trying to be large enough to not be hit in normal usage.

This method is advisory, and implementations may decide to not enforce this. Currently, the only known transport to not enforce this is InProcessTransport.

spring.cloud.stream.dapr.binder.maxInboundMetadataSize

Sets the maximum size of metadata allowed to be received. Integer.MAX_VALUE disables the enforcement. The default is implementation-dependent, but is not generally less than 8 KiB and may be unlimited.

This is cumulative size of the metadata. The precise calculation is implementation-dependent, but implementations are encouraged to follow the calculation used for HTTP/2’s SETTINGS_MAX_HEADER_LIST_SIZE . It sums the bytes from each entry’s key and value, plus 32 bytes of overhead per entry.

spring.cloud.stream.dapr.binder.maxRetryAttempts

Sets the maximum number of retry attempts that may be configured by the service config. If the service config specifies a larger value it will be reduced to this value. Setting this number to zero is not effectively the same as disableRetry() because the former does not disable transparent retry .

This method may not work as expected for the current release because retry is not fully implemented yet.

spring.cloud.stream.dapr.binder.maxHedgedAttempts

Sets the maximum number of hedged attempts that may be configured by the service config. If the service config specifies a larger value it will be reduced to this value.

This method may not work as expected for the current release because retry is not fully implemented yet.

spring.cloud.stream.dapr.binder.maxTraceEvents

Sets the maximum number of channel trace events to keep in the tracer for each channel or subchannel. If set to 0, channel tracing is effectively disabled.

3.2. Dapr Producer Properties

The following properties are available for Dapr producers only and must be prefixed with spring.cloud.stream.dapr.bindings.<bindingTarget>.producer..

pubsubName

Specifies the name of the Pub/Sub component.

Note
PubsubName must be specified and has no default value.

3.3. Dapr Consumer Properties

3.4. Dapr Message Headers

The following table illustrates how Dapr message properties are mapped to Spring message headers.

Dapr Message Properties

Spring Message Header Constants

Type

Description

contentType

DaprHeaders#CONTENT_TYPE

String

The contentType tells Dapr which content type your data adheres to when constructing a CloudEvent envelope.

ttlInSeconds

DaprHeaders#TTL_IN_SECONDS

Long

The number of seconds for the message to expire.

rawPayload

DaprHeaders#RAW_PAY_LOAD

Boolean

Determine if Dapr should publish the event without wrapping it as CloudEvent. Not using CloudEvents disables support for tracing, event deduplication per messageId, content-type metadata, and any other features built using the CloudEvent schema.

specifiedBrokerMetadata

DaprHeaders#SPECIFIED_Broker_METADATA

Map<String, String>

Some metadata parameters are available based on each pubsub broker component.

Migration Guide

1. Update dependency

Remove spring-cloud-azure-stream-binder-eventhubs dependencies.

<dependency>
  <groupId>com.azure.spring</groupId>
  <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Add spring-cloud-stream-binder-dapr dependencies.

<dependency>
  <groupId>com.azure.spring</groupId>
  <artifactId>spring-cloud-stream-binder-dapr</artifactId>
</dependency>

2. Update application.yaml

  • Remove all configurations prefixed with spring.cloud.azure.eventhubs..

  • Remove all configurations prefixed with spring.cloud.stream.eventhubs..

  • Add configurations prefixed with spring.cloud.stream.dapr. and specify pubsubName.

The final pubsub.yaml is as follows:

  • spring.cloud.stream.bindings.<binding>.destination is configured the topic specified for sending messages.

  • spring.cloud.stream.dapr.bindings.<binding>.producer.pubsubName is configured the name of the Pub/Sub component specified for sending messages.

  • spring.cloud.stream.dapr.bindings.<binding>.consumer.pubsubName is configured the name of the Pub/Sub component specified for receiving messages.

spring:
  cloud:
    stream:
      function:
        definition: supply;consume
      bindings:
        supply-out-0:
          destination: <AZURE_EVENTHUB_NAME>
        consume-in-0:
          destination: <AZURE_EVENTHUB_NAME>
      dapr:
        bindings:
          supply-out-0:
            producer:
              pubsubName: eventhubs-pubsub
          consume-in-0:
            consumer:
              pubsubName: eventhubs-pubsub

3. Configure Azure Event Hubs component

Dapr integrates with Pub/Sub message buses to provide applications with the ability to create event-driven, loosely coupled architectures where producers send events to consumers via topics.

Dapr supports the configuration of multiple, named, Pub/Sub components per application. Each Pub/Sub component has a name and this name is used when publishing a message topic.

Pub/Sub components are extensible. A list of support Pub/Sub components is here and the implementations can be found in the components-contrib repo.

In this example, we configure the Azure Event Hubs Pub/Sub component described using the pubsub.yaml file:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: eventhubs-pubsub
spec:
  type: pubsub.azure.eventhubs
  version: v1
  metadata:
    - name: connectionString
      value: "<AZURE_CONNECTION_STRING>"
    - name: storageAccountName
      value: "<AZURE_STORAGE_ACCOUNT_NAME>"
    - name: storageAccountKey
      value: "<AZURE_STORAGE_ACCOUNT_KEY>"
    - name: storageContainerName
      value: "<AZURE_STORAGE_CONTAINER_NAME>"

Follow the instructions here to manage the storage account access keys. See here on how to get the Event Hubs connection string.

4. Run application with Dapr sidecar

dapr run --app-id dapr-app --app-port 9090 --components-path ${componentsPath}  --app-protocol grpc --dapr-grpc-port ${daprPort} mvn spring-boot:run

This command specifies:

  • the id for your application with --app-id local-dapr, used for service discovery.

  • the port your application is listening on (default -1) with --app-port 9090.

  • the path for components directory with --components-path ./components.

  • the protocol (gRPC or HTTP) Dapr with --app-protocol grpc uses to talk to the application.

  • the gRPC port for Dapr to listen on (default -1) with --dapr-grpc-port 50001

5. Clean Up

To stop your services from running, simply stop the dapr run process. Alternatively, you can spin down your services with the Dapr CLI dapr stop command.

dapr stop --app-id dapr-app

Appendices

Appendix A: Building

Basic Compile and Test

Pre-requisites:

  • To compile, JDK 1.8 installed.

The build uses the Maven wrapper so you don’t have to install a specific version of Maven. The main build command is

$ ./mvnw clean install

You can also add -DskipTests if you like, to avoid running the tests.

Note
You can also install Maven (>=3.3.3) yourself and run the mvn command in place of ./mvnw in the examples below.
Note
Be aware that you might need to increase the amount of memory available to Maven by setting a MAVEN_OPTS environment variable with a value like -Xmx512m -XX:MaxPermSize=128m. We try to cover this in the .mvn configuration, so if you find you have to do it to make a build succeed, please raise a ticket to get the settings added to source control.

Working with the code

If you don’t have an IDE preference we would recommend that you use Spring Tools Suite or Eclipse when working with the code. We use the m2eclipe eclipse plugin for maven support. Other IDEs and tools should also work without issue.

Importing into eclipse with m2eclipse

We recommend the m2eclipe eclipse plugin when working with eclipse. If you don’t already have m2eclipse installed it is available from the "eclipse marketplace".

Unfortunately m2e does not yet support Maven 3.3, so once the projects are imported into Eclipse you will also need to tell m2eclipse to use the .settings.xml file for the projects. If you do not do this you may see many different errors related to the POMs in the projects. Open your Eclipse preferences, expand the Maven preferences, and select User Settings. In the User Settings field click Browse and navigate to the Spring Cloud project you imported selecting the .settings.xml file in that project. Click Apply and then OK to save the preference changes.

Note
Alternatively you can copy the repository settings from .settings.xml into your own ~/.m2/settings.xml.

Importing into eclipse without m2eclipse

If you prefer not to use m2eclipse you can generate eclipse project metadata using the following command:

$ ./mvnw eclipse:eclipse

The generated eclipse projects can be imported by selecting import existing projects from the file menu.

Appendix B: Contributing

Spring Cloud is released under the non-restrictive Apache 2.0 license, and follows a very standard Github development process, using Github tracker for issues and merging pull requests into main. If you want to contribute even something trivial please do not hesitate, but follow the guidelines below.

Sign the Contributor License Agreement

Before we accept a non-trivial patch or pull request we will need you to sign the contributor’s agreement. Signing the contributor’s agreement does not grant anyone commit rights to the main repository, but it does mean that we can accept your contributions, and you will get an author credit if we do. Active contributors might be asked to join the core team, and given the ability to merge pull requests.

Code Conventions and Housekeeping

None of these is essential for a pull request, but they will all help. They can also be added after the original pull request but before a merge.

  • Use the Spring Framework code format conventions. If you use Eclipse you can import formatter settings using the eclipse-code-formatter.xml file from the Spring Cloud Build project. If using IntelliJ, you can use the Eclipse Code Formatter Plugin to import the same file.

  • Make sure all new .java files to have a simple Javadoc class comment with at least an @author tag identifying you, and preferably at least a paragraph on what the class is for.

  • Add the ASF license header comment to all new .java files (copy from existing files in the project)

  • Add yourself as an @author to the .java files that you modify substantially (more than cosmetic changes).

  • Add some Javadocs and, if you change the namespace, some XSD doc elements.

  • A few unit tests would help a lot as well — someone has to do it.

  • If no-one else is using your branch, please rebase it against the current main (or other target branch in the main project).

  • When writing a commit message please follow these conventions, if you are fixing an existing issue please add Fixes gh-XXXX at the end of the commit message (where XXXX is the issue number).