workerpool

What is workerpool?

Workerpool is a pool of workers that wait for tasks and process them independently. In other words, workerpool makes goroutines easier to use by limiting how many of them run at the same time.

Implementation

  • Every worker is a goroutine, and it starts working when Start() is called.
  • The dispatcher holds a channel of Task, which stores the tasks received by ReceiveNormalTask(task Task).
  • Task is an interface with a single method: Exec() error
  • Structure Diagram
    workerpool
     __________________________________________________________
    |
    |   dispatcher                          workers
    |   contains 2 storages                 implemented by goroutines
    |   implemented by channels             grab and process tasks
    |                                                          _
    |   ------------------                                      |
    |   urgent tasks        (grab first)    |---->  worker #1   |
    |   ██ ██ ██ ██ ██ ██  -----------------|                   |
    |   ------------------                  |---->  worker #2   |
    |                                       |                   |
    |                                       |---->  worker #3    \
    |   ------------------                  |                     workerNum
    |   normal tasks        (grab later)    |---->  worker #4    / 
    |   ██ ██ ██ ██ ██ ██  - - - - - - - - -|                   | 
    |   ------------------                  |---->  worker #5   |
    |                                       .                   |
    |                                       .                   |      
    |   |<---  buf   --->|                  .                  _|
    |
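
The diagram says workers grab urgent tasks first, but it does not show how. One common Go pattern for this is a non-blocking check of the urgent channel before falling back to either channel. The following is a minimal sketch of that pattern, not the library's actual code: nextTask, recordTask and drainOrder are hypothetical names made up for this illustration, and the Task interface is redeclared locally so the sketch runs on its own.

```go
package main

import "fmt"

// Task mirrors the interface described above: one method, Exec() error.
type Task interface {
	Exec() error
}

// recordTask is a hypothetical task that appends its name to a shared log.
type recordTask struct {
	name string
	log  *[]string
}

func (t recordTask) Exec() error {
	*t.log = append(*t.log, t.name)
	return nil
}

// nextTask (hypothetical) returns the next task without blocking.
// It looks at the urgent channel first and only then at both channels.
// ok is false when both channels are empty.
func nextTask(urgent, normal <-chan Task) (t Task, ok bool) {
	// Step 1: take an urgent task if one is waiting.
	select {
	case t = <-urgent:
		return t, true
	default:
	}
	// Step 2: no urgent task was waiting, so accept whichever is available.
	select {
	case t = <-urgent:
		return t, true
	case t = <-normal:
		return t, true
	default:
		return nil, false
	}
}

// drainOrder queues the given tasks, runs them with a single consumer,
// and returns the order in which they were executed.
func drainOrder(urgentNames, normalNames []string) []string {
	urgent := make(chan Task, len(urgentNames))
	normal := make(chan Task, len(normalNames))
	var log []string
	for _, n := range normalNames {
		normal <- recordTask{name: n, log: &log}
	}
	for _, n := range urgentNames {
		urgent <- recordTask{name: n, log: &log}
	}
	for {
		t, ok := nextTask(urgent, normal)
		if !ok {
			break
		}
		_ = t.Exec()
	}
	return log
}

func main() {
	// Normal tasks are queued first, yet urgent ones run first.
	fmt.Println(drainOrder([]string{"u1", "u2"}, []string{"n1", "n2"}))
	// prints: [u1 u2 n1 n2]
}
```

A real worker would block while both channels are empty instead of returning, which usually means dropping the default case from the second select.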

Installation

$ go get github.com/lilihh/workerpool

How to use?

The main idea is:

  1. create a workerpool and start it
  2. define your own Task by implementing Exec() error
  3. submit tasks to the workerpool

The following sections show some examples.

Simplest example

Let's begin with the simplest one.

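// define your own task
type exampleTask struct{}

// Exec is called by a worker when it processes the task
func (t *exampleTask) Exec() error {
    // do something
    return nil
}

// newExampleTask generates a task and returns it
func newExampleTask() workerpool.Task {
    return &exampleTask{}
}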

func main() {
    // create a workerpool: buf is the capacity of the task buffer,
    // workerNum is the number of workers
    wp := workerpool.NewWorkerPool(buf, workerNum)
    wp.Start()
    defer wp.Close()

    // generate tasks
    num := 10
    tasks := make([]workerpool.Task, 0, num)
    for i := 0; i < num; i++ {
        tasks = append(tasks, newExampleTask())
    }

    // process tasks
    for _, task := range tasks {
        wp.ReceiveNormalTask(task)
    }

    // wait a moment so the workers can finish
    time.Sleep(time.Millisecond)
}

Example with priority

If some tasks are urgent, submit them with high priority, and the workers in the workerpool will process them first.

// normal task
err := wp.ReceiveNormalTask(task)

// urgent task
err = wp.ReceiveUrgentTask(task)

Example where every task must be done

The capacity of the task buffer in the workerpool is fixed. What if there are more tasks than the buffer can hold? ReceiveNormalTask(task Task) returns an error if the buffer is already full when a task arrives. In that case the workerpool does not accept the task, so you have to handle it yourself.

// if every task must be done, retry until it is accepted
func main() {
    // new a workerpool
    ...

    // generate tasks
    ...

    // process tasks
    for _, task := range tasks {
        for wp.ReceiveNormalTask(task) != nil {
            // buffer is full: retry until the task is accepted
        }
    }

    // wait
    ...
}

Example with WaitGroup

In real cases, we usually use sync.WaitGroup instead of time.After to wait until all tasks are finished.

func newExampleTask(wg *sync.WaitGroup) workerpool.Task {
    return &exampleTask{
        wg: wg,
    }
}

type exampleTask struct {
    wg *sync.WaitGroup
}

func (t *exampleTask) Exec() error {
    // do something

    // tell the WaitGroup that this task is finished
    t.wg.Done()
    return nil
}

func main() {
    // new a workerpool
    ...

    // generate tasks
    wg := &sync.WaitGroup{}
    
    tasks := make([]workerpool.Task, 0, 10)
    for i := 0; i < 10; i++ {
        tasks = append(tasks, newExampleTask(wg))
    }

    // process tasks
    wg.Add(len(tasks))

    for _, task := range tasks {
        for wp.ReceiveNormalTask(task) != nil {
            // buffer is full: retry until the task is accepted
        }
    }

    wg.Wait()
}
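
To try the WaitGroup pattern without installing the library, here is a self-contained sketch in which plain goroutines reading from a channel stand in for the pool's workers. This is an assumption made for illustration only: countTask and runAll are hypothetical names, and the Task interface is redeclared locally.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Task mirrors the interface described above: one method, Exec() error.
type Task interface {
	Exec() error
}

// countTask is a hypothetical task that increments a shared counter.
type countTask struct {
	wg   *sync.WaitGroup
	done *int64
}

func (t *countTask) Exec() error {
	// defer makes sure Done runs even if the work below returns early.
	defer t.wg.Done()
	atomic.AddInt64(t.done, 1)
	return nil
}

// runAll starts the given number of workers, submits n tasks, waits for
// all of them with a WaitGroup, and returns how many tasks were executed.
func runAll(n, workers int) int64 {
	var wg sync.WaitGroup
	var done int64
	queue := make(chan Task, n)

	// Stand-in workers: each one processes tasks until the queue is closed.
	for w := 0; w < workers; w++ {
		go func() {
			for t := range queue {
				_ = t.Exec()
			}
		}()
	}

	// Add before submitting, so Wait cannot return too early.
	wg.Add(n)
	for i := 0; i < n; i++ {
		queue <- &countTask{wg: &wg, done: &done}
	}

	wg.Wait()    // blocks until every task has called Done
	close(queue) // lets the stand-in workers exit
	return atomic.LoadInt64(&done)
}

func main() {
	fmt.Println(runAll(10, 3))
	// prints: 10
}
```

Calling wg.Add before the tasks are submitted matters: if a worker finished a task before Add ran, Done would drive the counter negative and panic.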

Option

If you want to know what is happening inside the workerpool, you can call Debug().

// new a workerpool
...
wp.Debug()
wp.Start()
