Workerpool is a pool containing several workers, who are waiting to process tasks independently. In other word, workerpool is a project which can help you using goroutine easier for it has limited the amount of thread.
- Every worker is a goroutine/thread, and start working when
Start()
is be called. - There's a channel of
Task
in dispatcher, storing tasks recevied byReceiveNormalTask(task Task)
Task
is an interface holding only one method:Exec() error
- Structure Diagram
workerpool
__________________________________________________________
|
| dispatcher workers
| contain 2 storage implement by goruntine
| implement by channel grab and process task
| _
| ------------------ |
| urgent tasks (grab first) |----> worker #1 |
| ██ ██ ██ ██ ██ ██ -----------------| |
| ------------------ |----> worker #2 |
| | |
| |----> worker #3 \
| ------------------ | workerNum
| normal tasks (grab later) |----> worker #4 /
| ██ ██ ██ ██ ██ ██ - - - - - - - - -| |
| ------------------ |----> worker #5 |
| . |
| . |
| |<--- buf --->| . _|
|
$ go get github.com/lilihh/workerpool
The main idea is
- generate a workerpool and start it
- define your own
Task
by implementExec() error
- make workerpool receive tasks
This section will show you some examples.
Let's begin with the simplest one.
// define your own task
func newExampleTask() workerpool.Task {
// generate a task and return it
}
type exampleTask struct {}
func (t *exampleTask) Exec() error {
// do something
}
func main() {
// new a workerpool
wp := workerpool.NewWorkerPool(buf, workerNum)
wp.Start()
defer wp.Close()
// generate tasks
num := 10
tasks := make([]workerpool.Task, 0, num)
for i := 0; i < num; i++ {
tasks = append(tasks, newExampleTask())
}
// perocess tasks
for _, task := range tasks {
wp.ReceiveNormalTask(task)
}
// wait
<-time.After(time.Millisecond)
}
If there are some tasks are urgent, you should mark it with high priority, and workers in workerpool will process it first.
// normal task
err := wp.ReceiveNormalTask(task)
// urgent task
err := wp.ReceiveUrgentTask(task)
The capacity of task-buffer in the workerpool is constant. What about the amount of tasks is larger than the capacity?
ReceiveNormalTask(task Task)
will return an error if the workerpool receive a task but the buffer is full already. In that case, workerpool do not receive that task actually. So you have to control it by yourself.
// if you want every task must be done anyway
func main() {
// new a workerpool
...
// generate tasks
...
// perocess tasks
for _, task := range tasks {
for err := wp.ReceiveNormalTask(task); err != nil; {
err = wp.ReceiveNormalTask(task)
}
}
// wait
...
}
In real case, we usually use sync.WaitGroup
instead of time.After
to control the process.
func newExampleTask(wg *sync.WaitGroup){} Task {
return &exampleTask{
wg: wg,
}
}
type exampleTask struct {
wg *sync.WaitGroup
}
func (t *exampleTask) Exec() error {
// do something
t.wg.Done()
}
func main() {
// new a workerpool
...
// generate tasks
wg := &sync.WaitGroup{}
tasks := make([]workerpool.Task, 0, 10)
for i := 0; i < 10; i++ {
tasks = append(tasks, newExampleTask(wg, fmt.Sprintf("%s", i+1)))
}
// perocess tasks
wg.Add(len(tasks))
for _, task := range tasks {
for err := wp.ReceiveNormalTask(task); err != nil; {
err = wp.ReceiveNormalTask(task)
}
}
wg.Wait()
}
if you want to know what happen in workerpool, you can use Debug()
// new a workerpool
...
wp.Debug()
wp.Start()