Skip to content

bsideup/liiklus

Repository files navigation

Liiklus

Liiklus [li:klus] ("traffic" in Estonian) - RSocket/gRPC-based Gateway for the event-based systems from the ones who think that Kafka is too low-level.

Why

  • horizontally scalable RSocket/gRPC streaming gateway
  • supports as many client languages as RSocket+gRPC do (Java, Go, C++, Python, etc...)
  • reactive first
  • Per-partition backpressure-aware sources
  • at-least-once/at-most-once delivery guarantees
  • pluggable event storage (Kafka, Pulsar, Kinesis, etc...)
  • pluggable positions storage (DynamoDB, Cassandra, Redis, etc...)
  • WIP: cold event storage support (S3, Minio, SQL, key/value, etc...)

Who is using

  • https://vivy.com/ - 25+ microservices, an abstraction in front of Kafka for the Shared Log Infrastructure (Event Sourcing / CQRS)

Quick Start

The easiest (and recommended) way to run Liiklus is with Docker:

$ docker run \
    -e kafka_bootstrapServers=some.kafka.host:9092 \
    -e storage_positions_type=MEMORY \ # only for testing, DO NOT use in production
    -p 6565:6565 \
    bsideup/liiklus:$LATEST_VERSION

Where the latest version is:

Now use LiiklusService.proto to generate your client.

The clients must implement the following algorithm:

  1. Subscribe to the assignments:
    stub.subscribe(SubscribeRequest(
        topic="your-topic",
        group="your-consumer-group",
        [autoOffsetReset="earliest|latest"]
    ))
    
  2. For every emitted reply of Subscribe, using the same channel, subscribe to the records:
    stub.receive(ReceiveRequest(
        assignment=reply.getAssignment()
    ))
    
  3. ACK records
    stub.ack(AckRequest(
        assignment=reply.getAssignment(),
        offset=record.getOffset()
    ))
    
    Note 1: If you ACK record before processing it you get at-most-once, after processing - at-least-once
    Note 2: It's recommended to ACK every n-th record, or every n seconds to reduce the load on the positions storage

Java example:

Example code using Project Reactor and reactive-grpc:

var stub = ReactorLiiklusServiceGrpc.newReactorStub(channel);
stub
    .subscribe(
        SubscribeRequest.newBuilder()
            .setTopic("user-events")
            .setGroup("analytics")
            .setAutoOffsetReset(AutoOffsetReset.EARLIEST)
            .build()
    )
    .flatMap(reply -> stub
        .receive(ReceiveRequest.newBuilder().setAssignment(reply.getAssignment()).build())
        .window(1000) // ACK every 1000th records
        .concatMap(
            batch -> batch
                .map(ReceiveReply::getRecord)
                // TODO process instead of Mono.delay(), i.e. by indexing to ElasticSearch
                .concatMap(record -> Mono.delay(Duration.ofMillis(100)))
                .sample(Duration.ofSeconds(5)) // ACK every 5 seconds
                .onBackpressureLatest()
                .delayUntil(record -> stub.ack(
                    AckRequest.newBuilder()
                        .setAssignment(reply.getAssignment())
                        .setOffset(record.getOffset())
                        .build()
                )),
            1
        )
    )
    .blockLast()

Also check examples/java/ for a complete example

Configuration

The project is based on Spring Boot and uses it's configuration system
Please check application.yml for the available configuration keys.

License

See LICENSE.