Skip to content

Commit

Permalink
Pre release (#21)
Browse files Browse the repository at this point in the history
* added Request to context as helper function

* wip poison wg

* fixed child poison with waitgroup

* remove .vscode stuff

* bench

* benching

* ecorrect placement of runtime.LockosThread()

* LOCK_OS_THREAD = false for testing

* Updated README

* minor improvements and private pid separator

* fixed typo

* updated README

* updated README

* added chat example
  • Loading branch information
anthdm committed Mar 3, 2023
1 parent 13efa04 commit 7015eb8
Show file tree
Hide file tree
Showing 33 changed files with 782 additions and 146 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
bin
TODO.txt
TODO
_test
_test
.vscode
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@ build:
go build -o bin/eventstream examples/eventstream/main.go
go build -o bin/tcpserver examples/tcpserver/main.go

bench:
go run _bench/main.go

.PHONY: proto
70 changes: 57 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,35 @@
![example workflow](https://github.com/anthdm/hollywood/actions/workflows/build.yml/badge.svg?branch=master)

# Blazingly fast, low latency actors for Golang
Hollywood is an actor engine build for speed and low-latency applications. Benchmarks coming soon...

Hollywood is an ULTRA fast actor engine build for speed and low-latency applications. Think about game servers, advertising brokers, trading engines, etc... It can handle **10 million messages in under 1 second**.

## Features

- lock free LMAX based message queue for low latency messaging
- lock free LMAX based message queue for ultra low latency messaging
- guaranteed message delivery on receiver failure (buffer mechanism)
- fire&forget or request&response messaging, or both.
- dRPC as the transport layer
- Optimized protoBuffers without reflection
- Optimized proto buffers without reflection
- lightweight and highly customizable
- built and optimized for speed
- cluster support with Consul [coming soon...]
- cluster support [coming soon...]

# Benchmarks

```
make bench
```

```
[BENCH HOLLYWOOD LOCAL] processed 1_000_000 messages in 83.4437ms
[BENCH HOLLYWOOD LOCAL] processed 10_000_000 messages in 786.2787ms
[BENCH HOLLYWOOD LOCAL] processed 100_000_000 messages in 7.8718426s
```

# Installation

```
go get github.com/anthdm/hollywood
go get github.com/anthdm/hollywood/...
```

# Quickstart
Expand Down Expand Up @@ -56,7 +70,7 @@ func main() {
```Go
e.Spawn(newFoo, "foo",
actor.WithMaxRestarts(4),
actor.WithInboxSize(999),
actor.WithInboxSize(1024 * 2),
actor.WithTags("bar", "1"),
)
```
Expand Down Expand Up @@ -97,22 +111,52 @@ e.SpawnFunc(func(c *actor.Context) {
time.Sleep(time.Second)
```

# PIDS

### Customize the PID separator.
## Customizing the PID separator

```Go
actor.PIDSeparator = ">"
cfg := actor.Config{
PIDSeparator: "->",
}
e := actor.NewEngine(cfg)
```

After configuring the Engine with a custom PID Separator the string representation of PIDS will look like this:

```
pid := actor.NewPID("127.0.0.1:3000", "foo", "bar", "baz", "1")
// 127.0.0.1:3000->foo->bar->baz->1
```

## Custom middleware

You can add custom middleware to your Receivers. This can be usefull for storing metrics, saving and loading data for your Receivers on `actor.Started` and `actor.Stopped`.

For examples on how to implement custom middleware, check out the middleware folder in the **[examples](https://github.com/anthdm/hollywood/tree/master/examples/middleware)**

Will result in the following PID
## Logging

You can set the log level of Hollywoods log module:

```
// 127.0.0.1:3000>foo>bar>baz>1
import "github.com/anthdm/hollywood/log
log.SetLevel(log.LevelInfo)
```

To disable all logging

```
import "github.com/anthdm/hollywood/log
log.SetLevel(log.LevelPanic)
```

# Test

```
make test
```

# License

Hollywood is licensed under the MIT licence.
31 changes: 0 additions & 31 deletions _bench/bench_test.go

This file was deleted.

63 changes: 63 additions & 0 deletions _bench/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package main

import (
"fmt"
"runtime"
"time"

"github.com/anthdm/hollywood/actor"
"github.com/anthdm/hollywood/log"
"github.com/anthdm/hollywood/remote"
)

func makeRemoteEngine(addr string) *actor.Engine {
e := actor.NewEngine()
r := remote.New(e, remote.Config{ListenAddr: addr})
e.WithRemote(r)
return e
}

func benchmarkRemote() {
var (
a = makeRemoteEngine("127.0.0.1:3000")
b = makeRemoteEngine("127.0.0.1:3001")
pidB = b.SpawnFunc(func(c *actor.Context) {}, "bench", actor.WithInboxSize(1024*8))
)
its := []int{
1_000_000,
10_000_000,
}
for i := 0; i < len(its); i++ {
start := time.Now()
for j := 0; j < its[i]; j++ {
a.Send(pidB, pidB)
}
fmt.Printf("[BENCH HOLLYWOOD REMOTE] processed %d messages in %v\n", its[i], time.Since(start))
}
}

func benchmarkLocal() {
e := actor.NewEngine()
pid := e.SpawnFunc(func(c *actor.Context) {}, "bench", actor.WithInboxSize(1024*8))
its := []int{
1_000_000,
10_000_000,
}
payload := make([]byte, 128)
for i := 0; i < len(its); i++ {
start := time.Now()
for j := 0; j < its[i]; j++ {
e.Send(pid, payload)
}
fmt.Printf("[BENCH HOLLYWOOD LOCAL] processed %d messages in %v\n", its[i], time.Since(start))
}
}

func main() {
if runtime.GOMAXPROCS(runtime.NumCPU()) == 1 {
log.Fatalw("Please use a system with more than 1 CPU. Its 2023...", nil)
}
log.SetLevel(log.LevelPanic)
benchmarkLocal()
benchmarkRemote()
}
19 changes: 0 additions & 19 deletions _bench/receiver/main.go

This file was deleted.

12 changes: 10 additions & 2 deletions actor/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package actor

import (
"strings"
"time"

"github.com/anthdm/hollywood/log"
"github.com/anthdm/hollywood/safemap"
Expand All @@ -28,10 +29,17 @@ func newContext(e *Engine, pid *PID) *Context {
}
}

// Receiver returns the underlying receiver of this Context.
func (c *Context) Receiver() Receiver {
return c.receiver
}

// See Engine.Request for information. This is just a helper function doing that
// calls Request on the underlying Engine. c.Engine().Request().
func (c *Context) Request(pid *PID, msg any, timeout time.Duration) *Response {
return c.engine.Request(pid, msg, timeout)
}

// Respond will sent the given message to the sender of the current received message.
func (c *Context) Respond(msg any) {
if c.sender == nil {
Expand All @@ -48,7 +56,7 @@ func (c *Context) Respond(msg any) {
// Hence, all children will receive the Stopped message.
func (c *Context) SpawnChild(p Producer, name string, opts ...OptFunc) *PID {
options := DefaultOpts(p)
options.Name = c.PID().ID + PIDSeparator + name
options.Name = c.PID().ID + pidSeparator + name
for _, opt := range opts {
opt(&options)
}
Expand Down Expand Up @@ -84,7 +92,7 @@ func (c *Context) Forward(pid *PID) {
// GetPID returns the PID of the process found by the given name and tags.
// Returns nil when it could not find any process..
func (c *Context) GetPID(name string, tags ...string) *PID {
name = name + PIDSeparator + strings.Join(tags, PIDSeparator)
name = name + pidSeparator + strings.Join(tags, pidSeparator)
proc := c.engine.Registry.getByID(name)
if proc != nil {
return proc.PID()
Expand Down
17 changes: 7 additions & 10 deletions actor/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func TestSpawnChildPID(t *testing.T) {
PIDSeparator = ">"
pidSeparator = ">"
var (
e = NewEngine()
wg = sync.WaitGroup{}
Expand All @@ -29,7 +29,7 @@ func TestSpawnChildPID(t *testing.T) {
}, "parent")

wg.Wait()
PIDSeparator = "/"
pidSeparator = "/"
}

func TestChild(t *testing.T) {
Expand Down Expand Up @@ -96,18 +96,14 @@ func TestGetPID(t *testing.T) {

func TestSpawnChild(t *testing.T) {
var (
e = NewEngine()
wg = sync.WaitGroup{}
stopwg = sync.WaitGroup{}
e = NewEngine()
wg = sync.WaitGroup{}
)

wg.Add(1)
stopwg.Add(1)

childFunc := func(c *Context) {
switch c.Message().(type) {
case Stopped:
stopwg.Done()
}
}

Expand All @@ -120,9 +116,10 @@ func TestSpawnChild(t *testing.T) {
}, "parent", WithMaxRestarts(0))

wg.Wait()
e.Poison(pid)

stopwg := &sync.WaitGroup{}
e.Poison(pid, stopwg)
stopwg.Wait()

assert.Equal(t, e.deadLetter, e.Registry.get(NewPID("local", "child")))
assert.Equal(t, e.deadLetter, e.Registry.get(pid))
}
9 changes: 5 additions & 4 deletions actor/deadletter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package actor

import (
"reflect"
"sync"

"github.com/anthdm/hollywood/log"
)
Expand Down Expand Up @@ -34,7 +35,7 @@ func (d *deadLetter) Send(dest *PID, msg any, sender *PID) {
})
}

func (d *deadLetter) PID() *PID { return d.pid }
func (d *deadLetter) Shutdown() {}
func (d *deadLetter) Start() {}
func (d *deadLetter) Invoke([]Envelope) {}
func (d *deadLetter) PID() *PID { return d.pid }
func (d *deadLetter) Shutdown(_ *sync.WaitGroup) {}
func (d *deadLetter) Start() {}
func (d *deadLetter) Invoke([]Envelope) {}

0 comments on commit 7015eb8

Please sign in to comment.