Skip to content
/ oni Public

๐Ÿ‘บ Echo or Gin like kafka API wrapper for kafka-go

License

Notifications You must be signed in to change notification settings

xoxoist/oni

Folders and files

NameName
Last commit message
Last commit date

Latest commit

ย 

History

45 Commits
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 

Repository files navigation

Oni Kafka Framework

xoxoist codecov Sourcegraph Go Report Card GoDoc License

Oni is Kafka Framework written in Go (Golang). that makes you easy to consume and produce kafka messages using robust API wrapper for kafka-go thanks to segmentio. the usage most likely same with Gin / Echo web framework.

Oni Mask art by @inksyndromeartwork

Contents

Installation

  1. Required go installed on your machine
go version
  1. Get oni and kafka-go
go get -u github.com/xoxoist/oni
go get -u github.com/segmentio/kafka-go
  1. Import oni
import "github.com/xoxoist/oni"

Quick Start

  1. Create package model and create foo.go file and place this code to it
package model

type Foo struct {
	FooContent string `json:"foo_content"`
}
  1. Create package consumer and create main.go file and place this code to it
package main

import (
	"context"
	"fmt"
	"github.com/xoxoist/oni"
	"github.com/your/projectname/model"
	"github.com/segmentio/kafka-go"
	"syscall"
	"time"
)

func main() {
	ctx := context.Background()
	defer ctx.Done()

	// initialize consumer
	stream := oni.NewStream(kafka.ReaderConfig{
		Brokers: []string{
			"localhost:8097", // kafka brokers 1
			"localhost:8098", // kafka brokers 2
			"localhost:8099", // kafka brokers 3 you can only define one inside array
		},
		Topic: "foos",  // topic you want to listen at 
		GroupID: "consumer-group-foos",
	})
	foosConsumer := oni.NewConsumer(stream)
	foosConsumer.Handler(
		"create.foo", // event key you want to map to specific handler function
		func(ctx oni.Context) error {
			var foo model.Foo
			err := ctx.ShouldBindJSON(&foo) // bind message value to struct
			if err != nil {
				return err
			}
			fmt.Println(fmt.Sprintf("key=%s value=%s foo=%s", ctx.KeyString(), ctx.ValueString(), foo))
			return nil
		},
	)

	// initialize oni runner
	// to help start and graceful shutdown all producer and consumer you defined
	oniRunner := oni.Runner{
		Context: ctx,
		Timeout: 15 * time.Second,
		Syscall: oni.SyscallOpt(
			syscall.SIGINT,
			syscall.SIGTERM,
			syscall.SIGHUP,
		),
		Consumers: oni.ConsumerOpt(foosConsumer),
	}
	oniRunner.Start()
}
  1. Create package producer and create main.go file and place this code to it
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/your/projectname/model"
	"github.com/segmentio/kafka-go"
	"time"
)

func main() {
	foos := &kafka.Writer{
		Addr:  kafka.TCP("localhost:8097"), // kafka broker
		Topic: "foos",                      // target topic you want to send
	}

	// create json object using json.Marshal
	fooObj := model.Foo{
		FooContent: fmt.Sprintf("This is new foo %d", time.Now().Unix()),
	}
	fooByte, _ := json.Marshal(fooObj)

	err := foos.WriteMessages(context.Background(), kafka.Message{
		Key:   []byte("create.foo"), // target event you want to send at
		Value: fooByte,              // fooObj marshal result as the value
	})
	fmt.Println(err)
}
  1. Start run consumer/main.go and run producer/main.go separately

API Examples

Stream

  • oni.NewStream(cfg kafka.ReaderConfig)

    // example for oni.NewStream(cfg kafka.ReaderConfig)
    stream := oni.NewStream(kafka.ReaderConfig{
        Brokers: []string{"localhost:8097"},
        Topic: "foos",
        GroupID: "consumer-group-foos",
    })
  • end

Consumer

  • oni.NewConsumer(stream *oni.Stream) IConsumer

    // example for oni.NewConsumer(stream *oni.Stream)
    // default consume mode is implicit
    consumer := oni.NewConsumer(stream)
  • IConsumer.ErrorHandler(callbackFunc ErrorCallbackFunc)

    // set global error handler which will be invoked when oni.HandlerFunc returns error
    // this function should be called before handler creation and only called once 
    consumer.ErrorHandler(func (err error) {
        if err != nil {
            // do error handling logic such as logging
            // or handle special error cases
        }
    })
  • IConsumer.Implicit()

    // set consume mode to implicit which means every message
    // received by *oni.Stream will be automatically ack or committed
    // this function should be called before handler creation
    consumer.Implicit()
  • IConsumer.Explicit()

    // set consume mode to explicit which means every message 
    // received by *oni.Stream will be ack or committed manually using Context.Ack()
    // this function should be called before handler creation
    consumer.Explicit()
  • IConsumer.Group(keyGroup string) *Consumer

    // create new group of consumer key event prefix, for example `event.notification.blast`
    // could be had last suffix like `email.channel` and `sms.channel` so your next handler
    // should be had key event `email.channel` and `sms.channel` and actual handler event key 
    // for each handler should look like this
    // `event.notification.blast.email.channel`
    // `event.notification.blast.sms.channel`
    // and previous *oni.Stream behavior should inherit to new group 
    notificationBlastEvent := consumer.Group("event.notification.blast")
    notificationBlastEvent.Handler("email.channel", func (ctx oni.Context) error {})
    notificationBlastEvent.Handler("sms.channel", func (ctx oni.Context) error {})
  • IConsumer.Handler(key string, handlerFunc ...HandlerFunc)

    // create handler function for specific key event, for this example is `event.send.email`
    // message that produced to `event.send.email` key will be received by handler function
    // defined in this handler creation and message value and details will be propagated through
    // oni.Context
    consumer.Handler("event.send.email", func (ctx oni.Context) error {
        // put business logic here
        return nil
    })
  • IConsumer.Producer(name string, producerFunc ProducerFunc)

    // create producer that can be accessed by its name through oni.Context functions that
    // allows business logic to access producer by the name and use it for sending message
    // to targeted topic, can be defined multiple times and only created when producer get
    // called by oni.Context function and closed after sent the message  
    consumer.Producer("notification_producer", func() *kafka.Writer {
        // you can define using *kafka.Writer struct or you can use templates from oni
        // by returning this oni.BasicWriter(addr net.Addr, topic string) function.
        return &kafka.Writer{
            Addr:  kafka.TCP("localhost:8097"),
            Topic: "notification",
        }
    })
  • end

Context

  • Context.ShouldBindJSON(v interface{}) error

    func (ctx oni.Context) error {
        var foo model.Foo
        
        // binding received message value into model.Foo struct
        err := ctx.ShouldBindJSON(&foo)
        if err != nil {
            return err
        }
    
        return nil
    }
  • Context.ShouldRetryWith(producerFuncName string) error

    func (ctx oni.Context) error {
        // send back message to `retries` topic to be re-processed in side `main` topic  
        err := ctx.ShouldRetryWith("producer_name")
        if err != nil {
            return err
        }
    
        return nil
    }
  • Context.ShouldErrorWith(producerFuncName string) error

    func (ctx oni.Context) error {
        // send back message to `failures` topic to be marked as invalid request format 
        // or any system failures possible
        err := ctx.ShouldErrorWith("producer_name")
        if err != nil {
            return err
        }
    
        return nil
    }
  • Context.ShouldReturnWith(producerFuncName string) error

    func (ctx oni.Context) error {
        // send back message from `retries` topic to `main` topic to be re-processed
        // this function should be called only inside `retries` topic oni.HandlerFunc 
        err := ctx.ShouldReturnWith("producer_name")
        if err != nil {
            return err
        }
    
        return nil
    }
  • Context.Ack() error

    func (ctx oni.Context) error {
        // ack-knowledge or commit message, this function only valid when using explicit
        // consume mode because explicit mode doesn't automatically commit messages 
        err := ctx.Ack()
        if err != nil {
            return err
        }
    
        return nil
    }
  • Context.ValueBytes() []byte

    func (ctx oni.Context) error {
        // returns message value as []byte
        ctx.ValueBytes()
        return nil
    }
  • Context.ValueString() string

    func (ctx oni.Context) error {
        // returns message value as string
        ctx.ValueString()
        return nil
    }
  • Context.KeyBytes() []byte

    func (ctx oni.Context) error {
        // returns message key as []byte
        ctx.KeyBytes()
        return nil
    }
  • Context.KeyString() string

    func (ctx oni.Context) error {
        // returns message key as string
        ctx.KeyString()
        return nil
    }
  • Context.Message() kafka.Message

    func (ctx oni.Context) error {
        // returns all message details
        ctx.Message()
        return nil
    }
  • Context.ReaderStats() kafka.ReaderStats

    func (ctx oni.Context) error {
        // returns all reader stats
        ctx.ReaderStats()
        return nil
    }
  • Context.ReaderConfig() kafka.ReaderConfig

    func (ctx oni.Context) error {
        // returns all reader configurations
        ctx.ReaderConfig()
        return nil
    }
  • Context.GetProducer(producerFuncName string) *kafka.Writer

    func (ctx oni.Context) error {
        // return find producer using its name, registered by this function
        // IConsumer.Producer(name string, producerFunc ProducerFunc)
        // to be used for sending message to topic you want
        ctx.GetProducer("producer_name")
        return nil
    }
  • Context.OuterContext() context.Context

    func (ctx oni.Context) error {
        // return outer context that signed in the first creation of consumer
        // can be used for getting key-value data inside it
        ctx.OuterContext()
        return nil
    }
  • Context.FindKey(key string) interface{}

    func (ctx oni.Context) error {
        // find key inside outer context and return it as interface{}
        ctx.FindKey("context_key_name")
        return nil
    }
  • Context.CreateKeyVal(key string, val interface{})

    func (ctx oni.Context) error {
        // create key with its value into outer context
        ctx.CreateKeyVal("key_name", "this is value put whatever you want inside here")
        return nil
    }
  • end