a-h/stream

stream

DynamoDB backed event sourced database library.

Usage

Define your state model by implementing the state.State interface. The interface has a single method, Process, which applies an inbound event to the state and returns any outbound events to send.

Here's an example that collects inbound events and, once the number of collected inputs reaches the batch size, sends an outbound event containing the batch.

func NewBatchState() *BatchState {
	return &BatchState{
		BatchSize: 2,
	}
}

// BatchState outputs a BatchOutput when BatchSize BatchInputs have been received.
type BatchState struct {
	BatchSize      int
	BatchesEmitted int
	Values         []int
}

// Process appends each BatchInput to the pending values and emits a
// BatchOutput once BatchSize values have been collected.
func (s *BatchState) Process(event InboundEvent) (outbound []OutboundEvent, err error) {
	switch e := event.(type) {
	case BatchInput:
		s.Values = append(s.Values, e.Number)
		if len(s.Values) >= s.BatchSize {
			outbound = append(outbound, BatchOutput{Numbers: s.Values})
			s.BatchesEmitted++
			// Start a new batch.
			s.Values = nil
		}
	}
	return
}
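
To see how the state behaves as events arrive, the following is a minimal, self-contained sketch that feeds three inputs into a BatchState by calling Process directly. The InboundEvent and OutboundEvent interface definitions here are assumptions inferred from the methods the example event types implement; the library's real definitions may differ, and in normal use the library, not your code, would invoke Process.

```go
package main

import "fmt"

// InboundEvent and OutboundEvent are ASSUMED shapes, inferred from the
// methods implemented by the README's event types. They stand in for the
// library's own interfaces so this sketch compiles on its own.
type InboundEvent interface {
	EventName() string
	IsInbound()
}

type OutboundEvent interface {
	EventName() string
	IsOutbound()
}

type BatchInput struct{ Number int }

func (bi BatchInput) EventName() string { return "BatchInput" }
func (bi BatchInput) IsInbound()        {}

type BatchOutput struct{ Numbers []int }

func (bo BatchOutput) EventName() string { return "BatchOutput" }
func (bo BatchOutput) IsOutbound()       {}

// BatchState mirrors the README example.
type BatchState struct {
	BatchSize      int
	BatchesEmitted int
	Values         []int
}

func NewBatchState() *BatchState { return &BatchState{BatchSize: 2} }

func (s *BatchState) Process(event InboundEvent) (outbound []OutboundEvent, err error) {
	switch e := event.(type) {
	case BatchInput:
		s.Values = append(s.Values, e.Number)
		if len(s.Values) >= s.BatchSize {
			outbound = append(outbound, BatchOutput{Numbers: s.Values})
			s.BatchesEmitted++
			s.Values = nil
		}
	}
	return
}

func main() {
	s := NewBatchState()
	// Call Process by hand to observe the state after each event.
	for _, n := range []int{1, 2, 3} {
		out, err := s.Process(BatchInput{Number: n})
		if err != nil {
			panic(err)
		}
		fmt.Printf("input=%d outbound=%d pending=%v emitted=%d\n",
			n, len(out), s.Values, s.BatchesEmitted)
	}
}
```

With a batch size of 2, the second input produces one outbound event and clears the pending values; the third input starts a new batch.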

The inbound and outbound events must implement the InboundEvent and OutboundEvent interfaces respectively:

type BatchInput struct {
	Number int
}

func (bi BatchInput) EventName() string { return "BatchInput" }
func (bi BatchInput) IsInbound()        {}

type BatchOutput struct {
	Numbers []int
}

func (bo BatchOutput) EventName() string { return "BatchOutput" }
func (bo BatchOutput) IsOutbound()       {}
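
A common Go idiom for catching a missing method early is a compile-time interface assertion. The sketch below applies it to the event types above. The two interface definitions are assumptions inferred from the methods shown, and eventNames is a hypothetical helper added only for illustration; neither is taken from the library.

```go
package main

import "fmt"

// ASSUMED interface shapes, inferred from the methods on BatchInput and
// BatchOutput. Replace these with the library's own interfaces in real code.
type InboundEvent interface {
	EventName() string
	IsInbound()
}

type OutboundEvent interface {
	EventName() string
	IsOutbound()
}

type BatchInput struct {
	Number int
}

func (bi BatchInput) EventName() string { return "BatchInput" }
func (bi BatchInput) IsInbound()        {}

type BatchOutput struct {
	Numbers []int
}

func (bo BatchOutput) EventName() string { return "BatchOutput" }
func (bo BatchOutput) IsOutbound()       {}

// Compile-time checks: the build fails here if either type stops
// satisfying its interface, for example after a method is renamed.
var (
	_ InboundEvent  = BatchInput{}
	_ OutboundEvent = BatchOutput{}
)

// eventNames is a hypothetical helper that lists the names of a slice of
// outbound events, e.g. for logging.
func eventNames(events []OutboundEvent) []string {
	names := make([]string, 0, len(events))
	for _, e := range events {
		names = append(names, e.EventName())
	}
	return names
}

func main() {
	out := []OutboundEvent{BatchOutput{Numbers: []int{1, 2}}}
	fmt.Println(eventNames(out))
}
```

The empty IsInbound and IsOutbound marker methods keep the two kinds of event distinct at the type level, so an outbound event cannot be passed where an inbound one is expected.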

Wire up the ./main/handler to DynamoDB streams to send outbound events to EventBridge.

Examples

See the ./example directory for a complete example.
