Caravan Streaming

Caravan is a set of in-process message streaming tools for Go applications. Think "Kafka", but for the internal workings of your software. Caravan Streaming includes basic features for building Message Streams and Tables.

This is a work in progress. The basics are there, but it is not yet ready for production use. Use at your own risk.

Example

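Joins two topics into a third. Values from the left topic are filtered to those below 200, and values from the right topic to those above 100. Each pair with an even left value and an odd right value is multiplied and sent to an output topic, and a Consumer prints the first ten results.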

package main

import (
	"fmt"
	"math/rand"

	"github.com/caravan/essentials"
	"github.com/caravan/streaming"
	"github.com/caravan/streaming/stream/node"
)

func main() {
	// Create new topics with permanent retention
	left := essentials.NewTopic[int]()
	right := essentials.NewTopic[int]()
	out := essentials.NewTopic[int]()

	s := streaming.NewStream(
		node.Join(
			node.Bind(
				node.TopicConsumer(left),
				node.Filter(func(i int) bool {
					// Filter out numbers greater than or equal to 200
					return i < 200
				}),
			),
			node.Bind(
				node.TopicConsumer(right),
				node.Filter(func(i int) bool {
					// Filter out numbers less than or equal to 100
					return i > 100
				}),
			),
			func(l int, r int) bool {
				// Only join if the left is even, and the right is odd
				return l%2 == 0 && r%2 == 1
			},
			func(l int, r int) int {
				// Join by multiplying the numbers
				return l * r
			},
		),
		node.TopicProducer(out),
	).Start()

	done := make(chan struct{})

	go func() {
		// Start sending stuff to the topic
		lp := left.NewProducer()
		rp := right.NewProducer()
		for i := 0; i < 10000; i++ {
			lp.Send() <- rand.Intn(1000)
			rp.Send() <- rand.Intn(1000)
		}
		lp.Close()
		rp.Close()
	}()

	go func() {
		c := out.NewConsumer()
		for i := 0; i < 10; i++ {
			// Display the first ten that come out
			fmt.Println(<-c.Receive())
		}
		c.Close()
		close(done)
	}()

	<-done
	_ = s.Stop()
}
