superstreamlabs/superstream.net

Superstream

Installation

 dotnet add package Superstream -v ${SUPERSTREAM_VERSION}

Importing

using Confluent.Kafka;
using Superstream;

Producer

To use Superstream with a Kafka producer, first define the producer configuration:

var config = new ProducerConfig { BootstrapServers = brokerList };
var options = new ProducerBuildOptions(config);

Then create a Kafka producer and pass it, together with the Superstream options, to SuperstreamInitializer.Init:

var kafkaProducer = new ProducerBuilder<string?, byte[]>(config)
  .Build();
using var producer = SuperstreamInitializer.Init("<superstream-token>", "<superstream-host>", kafkaProducer, options);

Finally, to produce messages to Kafka, call ProduceAsync or Produce:

// Await the call so the message is delivered before the producer is disposed.
await producer.ProduceAsync("<topic>", new() { Value = JsonSerializer.SerializeToUtf8Bytes(new { test_key = "test_value" }) });
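Because the message value is a byte[], the payload has to be serialized before it is produced. The following is a minimal sketch using only System.Text.Json and System.Text; the anonymous payload object and the variable names are illustrative assumptions, not part of the Superstream API. It shows two ways to obtain the bytes of a JSON object, and one pitfall: passing a string that already contains JSON through JsonSerializer yields a quoted JSON string rather than an object.

```csharp
using System;
using System.Linq;
using System.Text;
using System.Text.Json;

// Hypothetical payload, used only for this illustration.
var payload = new { test_key = "test_value" };

// Serializing an object yields the UTF-8 bytes of a JSON object.
byte[] objectBytes = JsonSerializer.SerializeToUtf8Bytes(payload);

// A string that already holds JSON can be encoded directly as UTF-8.
byte[] rawBytes = Encoding.UTF8.GetBytes("{\"test_key\":\"test_value\"}");

// Pitfall: serializing that same string wraps it in quotes (a JSON string, not an object).
byte[] stringBytes = JsonSerializer.SerializeToUtf8Bytes("{\"test_key\":\"test_value\"}");

Console.WriteLine(Encoding.UTF8.GetString(objectBytes)); // {"test_key":"test_value"}
Console.WriteLine(objectBytes.SequenceEqual(rawBytes));  // True
Console.WriteLine((char)stringBytes[0]);                 // "
```

Either objectBytes or rawBytes can be used as the Value of the produced message.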

Consumer

To use Superstream with a Kafka consumer, first define the consumer configuration:

var config = new ConsumerConfig
{
  GroupId = "groupid",
  BootstrapServers = brokerList,
  EnableAutoCommit = false
};
var options = new ConsumerBuildOptions(config);

Then create a Kafka consumer and pass it, together with the Superstream options, to SuperstreamInitializer.Init:

var kafkaConsumer = new ConsumerBuilder<Ignore, byte[]>(config)
  .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
  .Build();
using var consumer = SuperstreamInitializer.Init("<superstream-token>", "<superstream-host>", kafkaConsumer, options);

Finally, to consume messages from Kafka, call Consume:

var consumeResult = consumer.Consume();
Console.WriteLine($"Message: {Encoding.UTF8.GetString(consumeResult.Message.Value)}");
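Since the consumer configuration sets EnableAutoCommit to false, offsets are only stored when the application commits them. The sketch below shows one way a consume loop with explicit commits might look. It relies only on the Confluent.Kafka IConsumer interface and assumes that the object returned by SuperstreamInitializer.Init can be used as an IConsumer<Ignore, byte[]>; the class name ConsumeLoopSketch, the Run method, and its parameters are hypothetical and exist only for illustration.

```csharp
using System;
using System.Text;
using System.Threading;
using Confluent.Kafka;

public static class ConsumeLoopSketch
{
    // Assumption: `consumer` is any IConsumer<Ignore, byte[]>, for example the
    // instance created in the consumer snippet above. `topic` is a placeholder.
    public static void Run(IConsumer<Ignore, byte[]> consumer, string topic, CancellationToken token)
    {
        // A consumer must be subscribed (or assigned) before it receives messages.
        consumer.Subscribe(topic);
        try
        {
            while (!token.IsCancellationRequested)
            {
                try
                {
                    // Blocks until a message arrives or the token is cancelled.
                    var result = consumer.Consume(token);
                    Console.WriteLine($"Message: {Encoding.UTF8.GetString(result.Message.Value)}");

                    // Auto-commit is disabled, so commit the offset explicitly.
                    consumer.Commit(result);
                }
                catch (ConsumeException e)
                {
                    // Report the failed consume call and keep polling.
                    Console.WriteLine($"Consume error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation was requested; fall through to Close().
        }
        finally
        {
            // Leave the consumer group cleanly before the consumer is disposed.
            consumer.Close();
        }
    }
}
```

A caller could create a CancellationTokenSource, cancel it from a Console.CancelKeyPress handler, and pass its Token to Run. Committing after every message is the simplest approach but adds a round trip per message; committing every N messages is a common trade-off.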