Skip to content

Flows, subtasks and more to integrate Cortex with a Kafka broker

License

Notifications You must be signed in to change notification settings

IntelligentAutomationCommunity/CTX-Kafka

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

6 Commits
 
 
 
 
 
 
 
 

Repository files navigation

CTX-Kafka

This repository contains an example flow, subtasks, and a Cortex-Kafka-Gateway sample implementation to connect Cortex to a kafka broker.

Cortex-Kafka Gateway

This provides the connectivity between Cortex and a Kafka broker. The Cortex-Kafka Gateway performs the following activities:

  • Subscribes to one or more Kafka topics available on the Kafka broker, and when a message is received starts an instance of a flow in Cortex
  • Listens on a port for a HTTP JSON message and posts it to the appropriate Kafka topic

This implementation is provided as a set of Node.js source files, and is for demonstration purposes only. It is not recommended as is for production use. The implementation will need to be un-zipped and then built using the command docker build . (with any additional parameters you may require. It can then be executed in a docker container - remember to map appropriately the docker host ports to the docker container ports.

Configuration

The file at config/config.json defines the configuration for the gateway and is split into three sections:

Kafka

This section defines how the Cortex-Kafka-gateway interacts with the Kafka broker. It has the following fields:

  • clientId: The name of the instance of the Cortex-Kafka Gateway
  • brokers: An array of objects representing Kafka broker addresses; each object has fields host and port
  • security: a structure containing a single element, SSL which takes values true or false
  • consumer: a structure containing two fields: -- groupId: A group Id -- topics: a list of objects; each object has fields topic (the name of the topic) and (optionally) fromBeginning

Cortex

This section defines how the Cortex-Kafka Gateway interacts with the Cortex Intelligent Automation platform when a Kafka message is received. It has the following fields:

  • api a structure defining the Cortex Flow API configuration to use to initiate a flow. It has fields server, https, port, async, contentType and verb
  • authorization: a structure containing the authorisation to use on the Cortex Flow API. It has fields type, username and password
  • flow: the name of the Cortex flow to execute; this must have a global structure variable named ($)Message
  • initiator: the owner of the Cortex flow execution

restAPI

This section defines the HTTP REST API interface that Cortex will use to post messages to a Kafka topic. It has the following fields:

  • port: the local port number on which the Cortex-Kafka Gateway will listen
  • path: the initial path used

CTX-Kafka-Send-Message

This subtask will send a message to a Kafka topic (using the Cortex-Kafka Gateway).

Input parameters

Name Type M/O Description
i_Message Text O The message to send. Note that one of i_Message, i_Message-Structure and i_Message-List must be provided
i_Message-Structure Sructue O The message structure to send; this can be created using the subtask CTX-Kafka-Create-Message-Structure. Note that one of i_Message, i_Message-Structure and i_Message-List must be provided
i_Message-List List O A list of message structures to send; each structure can be created using the subtask CTX-Kafka-Create-Message-Structure. Note that one of i_Message, i_Message-Structure and i_Message-List must be provided
i_Gateway-base-url Text M The Url (including the port) for the Cortex-Kafka Gateway
i_Topic Text M The Kafka topic to which the message is posted
i_Timeout Integer O The time to await a response in ms
i_acks Integer O Controls the number of required acks: -1 = all insync replicas must acknowledge; 0 = no acknowledgments; 1 = only waits for the leader to acknowledge
i_compression Text O The name of the Kafka compression codec to use

Output parameters

This subtask returns a structure, o_result with fields CODE and REASON

CTX-Kafka-Create-Message-Structure

This subtask creates a message strcture which can be used in the i_Message-Structure or i_Message-List input parameters of the CTX-Kafka-Send-Message subtask.

Input parameters

Name| Type| M/O | Description| i_Message|Text|M|The message to send. i_key|Text|O|The key for the message i_partition|Text|O|The partition for the message

Output parameters

This subtask returns a structure, o_message-structure

CTX-Kafka-Example-Flow

This flow is an example flow which can be called by the Cortex-kafka Gateway when a message is receved on a subscribed topic. The flow simply logs the message and then posts a message to a different Kafka topic using the CTX-Kafka-Send-Message subtask.

This flow uses the [CTX_Configuration-Store](https://github.com/CortexIntelligentAutomation/CTX-Configuration-Store() area Kafka-gateway which has the following parameters:

  • URL The URL (including port) on which the Cortex-Kafka Gateway is listening
  • Default-topic The Kafka topic to which the response message is posted

Installation Instructions

Download the Studio Package file and Import it into your Cortex Environment. Don't forget to apply rights using the Studio Authorization module.

Dependencies

This library depends upon the following libraries:

👍 Enjoy! 😉