Skip to content

butuzov/harmony

Repository files navigation

harmony Coverage Status build status MIT License

Generic Concurrency Patterns Library

Reference (generated by gomarkdoc)

import "github.com/butuzov/harmony"

Package harmony provides a generic concurrency patterns library, created for educational purposes by its author. It provides the following patterns:

  • Bridge
  • FanIn
  • Future
  • OrDone / OrWithDone / OrWithContext
  • Pipeline
  • Queue
  • Tee
  • WorkerPool
Example (Fastest Sqrt)

Which SQRT function is faster? A more complex example that shows a combination of several patterns: Future, Tee, Pipeline, OrDone and FanIn.

package main

import (
	"context"
	"fmt"
	"github.com/butuzov/harmony"
	"log"
	"math/rand"
	"time"
)

func main() {
	// Which square root method is faster? Both methods receive the same random
	// number and the first report to arrive on the merged channel is printed.
	type Report struct {
		Method string // name of the method that produced Value
		Value  uint64 // square root estimate, truncated to an integer
	}

	var (
		// Babylonian method
		sqrtBabylonian = func(n uint64) Report {
			var (
				o = float64(n) // original value as float64
				x = float64(n) // current estimate of the root
				y = 1.0        // counterpart estimate, o / x after the first pass
				e = 1e-5       // acceptable gap between x and y
			)

			// Average the two estimates until they are within e of each other.
			for x-y > e {
				x = (x + y) / 2
				y = o / x
			}

			return Report{"Babylonian", uint64(x)}
		}

		// Bakhshali method
		sqrtBakhshali = func(n uint64) Report {
			// iterate performs one refinement step of the estimate x.
			iterate := func(x float64) float64 {
				a := (float64(n) - x*x) / (2 * x)
				xa := x + a
				return xa - ((a * a) / (2 * xa))
			}

			var (
				o = float64(n)       // original value as float64
				x = float64(n) / 2.0 // initial guess
				e = 1e-5             // acceptable error of x*x against o
			)

			for x*x-o > e {
				x = iterate(x)
			}
			return Report{"Bakhshali", uint64(x)}
		}
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Future: produces the single random number both methods will work on.
	ch, err := harmony.FututeWithContext(ctx, func() uint64 {
		r := rand.New(rand.NewSource(time.Now().UnixNano()))
		v := r.Uint64()
		fmt.Printf("Initial number: %d.", v)
		return v
	})
	if err != nil {
		log.Printf("err: %v", err)
		return
	}

	// Tee: duplicate the number so each method gets its own copy.
	// Bail out only when the call actually failed.
	ch1, ch2, err := harmony.TeeWithContext(ctx, ch)
	if err != nil {
		log.Printf("err: %v", err)
		return
	}

	// Pipeline: one worker per method.
	chRep1, err := harmony.PipelineWithContext(ctx, ch1, 1, sqrtBabylonian)
	if err != nil {
		log.Printf("err: %v", err)
		return
	}
	chRep2, err := harmony.PipelineWithContext(ctx, ch2, 1, sqrtBakhshali)
	if err != nil {
		log.Printf("err: %v", err)
		return
	}

	chRep1, err = harmony.OrDoneWithContext(ctx, chRep1)
	if err != nil {
		log.Printf("err: %v", err)
		return
	}
	chRep2, err = harmony.OrDoneWithContext(ctx, chRep2)
	if err != nil {
		log.Printf("err: %v", err)
		return
	}

	// FanIn: merge both report channels; the first value read is the winner.
	out, err := harmony.FanInWithContext(ctx, chRep1, chRep2)
	if err != nil {
		log.Printf("err: %v", err)
		return
	}
	fmt.Printf("Result is :%v", <-out)
}

Index

Variables

var ErrContext = errors.New("harmony: nil Context")
var ErrDone = errors.New("harmony: nil done chant")

func Bridge

func Bridge[T any](done <-chan struct{}, incoming <-chan (<-chan T)) (<-chan T, error)

Bridge returns a channel of generic type T used as a pipe for the values received from a sequence of channels, or ErrDone. Close the received channel (the one you got from `incoming`) in order to switch to a new one. Goroutines exit when incoming is closed or the done channel is closed.

func BridgeWithContext

func BridgeWithContext[T any](ctx context.Context, incoming <-chan (<-chan T)) (<-chan T, error)

BridgeWithContext returns a channel of generic type T used as a pipe for the values received from a sequence of channels, or ErrContext. Close the received channel (the one you got from `incoming`) in order to switch to a new one. Goroutines exit when incoming is closed or the context is canceled.

func FanIn

func FanIn[T any](done <-chan struct{}, ch1, ch2 <-chan T, channels ...<-chan T) (<-chan T, error)

FanIn returns an unbuffered channel of generic type T which serves as a delivery pipeline for the values received from at least 2 incoming channels. It is closed once all of the incoming channels are closed or done is closed.

func FanInWithContext

func FanInWithContext[T any](ctx context.Context, ch1, ch2 <-chan T, channels ...<-chan T) (<-chan T, error)

FanInWithContext returns an unbuffered channel of generic type T which serves as a delivery pipeline for the values received from at least 2 incoming channels. It is closed once all of the incoming channels are closed or the context is cancelled.

Example

package main

import (
	"context"
	"fmt"
	"github.com/butuzov/harmony"
)

func main() {
	// filler returns a channel that emits every integer from start to stop
	// (inclusive) and is closed afterwards.
	filler := func(start, stop int) chan int {
		ch := make(chan int)

		go func() {
			defer close(ch)
			for i := start; i <= stop; i++ {
				ch <- i
			}
		}()

		return ch
	}

	ch1 := filler(10, 12)
	ch2 := filler(12, 14)
	ch3 := filler(15, 16)

	ctx := context.Background()

	// Merge the three channels; the merged channel is only usable when the
	// call succeeded, so stop on error instead of ranging over it.
	ch, err := harmony.FanInWithContext(ctx, ch1, ch2, ch3)
	if err != nil {
		fmt.Printf("err: %v\n", err)
		return
	}

	for val := range ch {
		fmt.Println(val)
	}
}

func Futute

func Futute[T any](done <-chan struct{}, futureFn func() T) (<-chan T, error)

Futute[T any] returns a buffered channel of size 1 and generic type T, which will eventually contain the result of executing `futureFn`, or be closed if the done channel is closed.

func FututeWithContext

func FututeWithContext[T any](ctx context.Context, futureFn func() T) (<-chan T, error)

FututeWithContext[T any] returns a buffered channel of size 1 and generic type T, which will eventually contain the result of executing `futureFn`, or be closed if the context is cancelled.

Example

package main

import (
	"context"
	"fmt"
	"github.com/butuzov/harmony"
)

func main() {
	// Two trivial futures: each eventually delivers the constant returned by
	// its function. The values are read in order, first then second.
	background := context.Background()

	one := func() int { return 1 }
	zero := func() int { return 0 }

	first, _ := harmony.FututeWithContext(background, one)
	second, _ := harmony.FututeWithContext(background, zero)

	x := <-first
	y := <-second
	fmt.Println(x, y)
}

Output

1 0

Example (Dogs_as_service)

This example shows the creation of two "futures" with FututeWithContext that are used in our "rate our dogs" startup.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/butuzov/harmony"
	"io/ioutil"
	"log"
	"net/http"
	"time"
)

func main() {
	// getRandomDogPicture requests a random dog picture from dog.ceo
	// (dog as a service) and returns the value of the "message" field of the
	// JSON response. Any failure is logged and reported as an empty string.
	getRandomDogPicture := func() string {
		var data struct {
			Message string `json:"message"`
		}

		const apiURL = "https://dog.ceo/api/breeds/image/random"
		ctx := context.Background()

		req, err := http.NewRequestWithContext(ctx, http.MethodGet, apiURL, nil)
		if err != nil {
			log.Printf("request: %v", err)
			return ""
		}

		res, err := http.DefaultClient.Do(req)
		if err != nil {
			log.Printf("request: %v", err)
			return ""
		}
		defer res.Body.Close()

		// A non-200 answer is not expected to carry a picture.
		if res.StatusCode != http.StatusOK {
			log.Printf("request: unexpected status %s", res.Status)
			return ""
		}

		body, err := ioutil.ReadAll(res.Body)
		if err != nil {
			log.Printf("reading body: %v", err)
			return ""
		}

		if err := json.Unmarshal(body, &data); err != nil {
			log.Printf("unmarshal: %v", err)
			return ""
		}

		return data.Message
	}

	// First future: no deadline on the future itself.
	a, _ := harmony.FututeWithContext(context.Background(), getRandomDogPicture)

	// Second future: bound to a context that expires after 250ms.
	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()
	b, _ := harmony.FututeWithContext(ctx, getRandomDogPicture)

	fmt.Printf("Rate My Dog: ..a) %s..b) %s.", <-a, <-b)
}

func OrDone

func OrDone[T any](done <-chan struct{}, incoming <-chan T) (<-chan T, error)

OrDone returns a new unbuffered channel of type T that serves as a pipeline for the incoming channel. The channel is closed once the done channel is closed or the incoming channel is closed. This is a variation of the pattern that is usually called OrWithDone or Cancel.

func OrDoneWithContext

func OrDoneWithContext[T any](ctx context.Context, incoming <-chan T) (<-chan T, error)

OrDoneWithContext returns a new unbuffered channel of type T that serves as a pipeline for the incoming channel. The channel is closed once the context is canceled or the incoming channel is closed. This is a variation of the pattern that is usually called OrWithDone or Cancel.

func Pipeline

func Pipeline[T1, T2 any](done <-chan struct{}, incomingCh <-chan T1, totalWorkers int, workerFn func(T1) T2) (<-chan T2, error)

Pipeline returns a channel of generic type T2 that can serve as a pipeline for the next stage. It is implemented in almost the same manner as WorkerPool and allows you to specify the number of workers that are going to process values received from the incoming channel. The outgoing channel is closed once the incoming channel is closed or the done channel is closed.

func PipelineWithContext

func PipelineWithContext[T1, T2 any](ctx context.Context, incomingCh <-chan T1, totalWorkers int, workerFn func(T1) T2) (<-chan T2, error)

PipelineWithContext returns a channel of generic type T2 that can serve as a pipeline for the next stage. It is implemented in almost the same manner as WorkerPool and allows you to specify the number of workers that are going to process values received from the incoming channel. The outgoing channel is closed once the incoming channel is closed or the context is canceled.

Example (Primes)

package main

import (
	"context"
	"fmt"
	"github.com/butuzov/harmony"
	"log"
	"math"
	"time"
)

func main() {
	// Collects the primes found within one millisecond by a pipeline of
	// 100 workers fed with consecutive integers.
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()

	var (
		incomingCh = make(chan uint64)

		// isPrime reports whether n is a prime number. 0 and 1 are not primes.
		isPrime = func(n uint64) bool {
			if n < 2 {
				return false
			}
			// i <= n/i is i*i <= n without the risk of overflowing uint64.
			for i := uint64(2); i <= n/i; i++ {
				if n%i == 0 {
					return false
				}
			}
			return true
		}
	)

	var results []uint64

	// workerFunc maps primes to themselves and everything else to 0,
	// which the consumer below treats as "not a prime".
	workerFunc := func(n uint64) uint64 {
		if isPrime(n) {
			return n
		}
		return 0
	}

	// Producer: Initial numbers. Stops once the context is done so the
	// goroutine does not stay blocked on a send nobody will receive.
	go func() {
		for i := uint64(0); i < math.MaxUint64; i++ {
			select {
			case incomingCh <- i:
			case <-ctx.Done():
				return
			}
		}
	}()

	ch, err := harmony.PipelineWithContext(ctx, incomingCh, 100, workerFunc)
	if err != nil {
		log.Printf("Error: %v", err)
		return
	}

	for val := range ch {
		if val == 0 {
			continue
		}
		results = append(results, val)
	}
	fmt.Println(results)
}

func Queue

func Queue[T any](done <-chan struct{}, genFn func() T) (<-chan T, error)

Queue returns an unbuffered channel that is populated by func genFn. The channel is closed once the done channel is closed. It's similar to the Future pattern, but isn't limited to just one result.

func QueueWithContext

func QueueWithContext[T any](ctx context.Context, genFn func() T) (<-chan T, error)

QueueWithContext returns an unbuffered channel that is populated by func genFn. Chan is closed once context is Done. It's similar to Future pattern, but doesn't have a limit to just one result.

Example

Generate fibonacci sequence

package main

import (
	"context"
	"fmt"
	"github.com/butuzov/harmony"
	"log"
)

func main() {
	// fib builds a generator for the Fibonacci sequence: the returned function
	// yields the next element on each call for the first limit calls and
	// yields 0 on every call after that.
	fib := func(limit int) func() int {
		var (
			cur, next = 0, 1
			emitted   = 0
		)
		return func() int {
			if emitted >= limit {
				return 0
			}

			emitted++
			cur, next = next, cur+next
			return cur
		}
	}

	incoming, err := harmony.QueueWithContext(context.Background(), fib(10))
	if err != nil {
		log.Printf("err: %v", err)
		return
	}

	// Take exactly ten values from the queue.
	first10FibNumbers := make([]int, 0, 10)
	for len(first10FibNumbers) < cap(first10FibNumbers) {
		first10FibNumbers = append(first10FibNumbers, <-incoming)
	}

	fmt.Println(first10FibNumbers)
}

Output

[1 1 2 3 5 8 13 21 34 55]

func Tee

func Tee[T any](done <-chan struct{}, incoming <-chan T) (<-chan T, <-chan T, error)

Tee returns two channels of generic type T used to fan out data from the incoming channel. Both channels need to be read in order for the next iteration over the incoming channel to happen.

func TeeWithContext

func TeeWithContext[T any](ctx context.Context, incoming <-chan T) (<-chan T, <-chan T, error)

TeeWithContext returns two channels of generic type T used to fan out data from the incoming channel. Both channels need to be read in order for the next iteration over the incoming channel to happen.

func WorkerPool

func WorkerPool[T any](done <-chan struct{}, jobQueue chan T, maxWorkers int, workFunc func(T)) error

WorkerPool accepts a channel of generic type T which is used to serve jobs to at most maxWorkers workers. Goroutines stop once the channel is closed and drained, or done is closed.

func WorkerPoolWithContext

func WorkerPoolWithContext[T any](ctx context.Context, jobQueue chan T, maxWorkers int, workFunc func(T)) error

WorkerPoolWithContext accepts a channel of generic type T which is used to serve jobs to at most maxWorkers workers. Goroutines stop once the channel is closed and drained, or the context is cancelled.

Example (Primes)

package main

import (
	"context"
	"fmt"
	"github.com/butuzov/harmony"
	"math"
	"runtime"
	"sync"
	"time"
)

func main() {
	// Collects the primes found within one millisecond by a worker pool fed
	// with consecutive integers, then prints them.
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()

	var (
		primesCh   = make(chan uint64)
		incomingCh = make(chan uint64)

		// isPrime reports whether n is a prime number. 0 and 1 are not primes.
		isPrime = func(n uint64) bool {
			if n < 2 {
				return false
			}
			// i <= n/i is i*i <= n without the risk of overflowing uint64.
			for i := uint64(2); i <= n/i; i++ {
				if n%i == 0 {
					return false
				}
			}
			return true
		}
		totalWorkers = runtime.NumCPU() - 1
	)

	// On a single-CPU machine NumCPU()-1 is 0; keep at least one worker.
	if totalWorkers < 1 {
		totalWorkers = 1
	}

	// Collector: gathers primes reported by the workers.
	var results []uint64
	var mu sync.RWMutex
	go func() {
		for n := range primesCh {
			mu.Lock()
			results = append(results, n)
			mu.Unlock()
		}
	}()

	// Producer: Initial numbers. Stops once the context is done so the
	// goroutine does not stay blocked on a send nobody will receive.
	go func() {
		for i := uint64(0); i < math.MaxUint64; i++ {
			select {
			case incomingCh <- i:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Consumers Worker Pool: checking primes of incoming numbers.
	err := harmony.WorkerPoolWithContext(ctx, incomingCh, totalWorkers, func(n uint64) {
		if !isPrime(n) {
			return
		}
		// Give up on the send once the deadline passed instead of blocking.
		select {
		case primesCh <- n:
		case <-ctx.Done():
		}
	})
	if err != nil {
		fmt.Printf("err: %v\n", err)
		return
	}

	<-ctx.Done()

	mu.RLock()
	fmt.Println(results)
	mu.RUnlock()
}

Resources

About

Go's concurrency patterns as generic library to use.

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published