Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pause a workerpool through keyboard #42

Open
Numenorean opened this issue Nov 11, 2020 · 16 comments
Open

Pause a workerpool through keyboard #42

Numenorean opened this issue Nov 11, 2020 · 16 comments

Comments

@Numenorean
Copy link

Hello dear developer, I'm looking for a way to pause a workerpool with pressing some key("P" as an example) and resume work with the same key. And I want to completely stop workerpooI with another key(maybe with "S" key). I have construction as an your example

@gammazero
Copy link
Owner

I am not entirely certain what you are asking for. Detecting user input, or whatever other signal that indicates workers should be paused, must be done by your application. Upon detecting the "P" key pressed, you application should call Pause. Upon detecting the "S" key, you application should call Stop or StopWait

@Numenorean
Copy link
Author

Numenorean commented Nov 11, 2020

I am not entirely certain what you are asking for. Detecting user input, or whatever other signal that indicates workers should be paused, must be done by your application. Upon detecting the "P" key pressed, you application should call Pause. Upon detecting the "S" key, you application should call Stop or StopWait

i'm really don't know how to do it, i have tried this, but it have not work for me

       if err := keyboard.Open(); err != nil {
		panic(err)
	}
	defer func() {
		_ = keyboard.Close()
	}()
	ctx, cancel := context.WithCancel(context.Background())
	for i, str := range resources.data {
                char, _, _ := keyboard.GetKey()
	        if char == 112 {
			wp.Pause(ctx)
		} else if char == 115 {
			cancel()
		}
		str := str
		i := i
		wp.Submit(func() {
                    doSomeWork()
                }

@gammazero
Copy link
Owner

Couple things that could be a problem:

  1. If you pause more than once, you are trying to use a context that is already canceled. You need a new context.
  2. The "s" key is being used to resume work (canceling the contest). Perhaps you meant for "s" to stop workers?

Here is a complete working example of the above, with a few changes:

package main

import (
	"context"
	"fmt"

	"github.com/eiannone/keyboard"
	"github.com/gammazero/workerpool"
)

var data = []string{"a", "b", "c", "d", "e", "f", "g"}

func main() {
	wp := workerpool.New(5)

	if err := keyboard.Open(); err != nil {
		panic(err)
	}

	var cancel context.CancelFunc
	defer func() {
		keyboard.Close()
		if cancel != nil {
			cancel()
		}
	}()

	fmt.Println("Press p to pause/resume, s to stop, any other key to do work")
	for _, str := range data {
		char, _, err := keyboard.GetKey()
		if err != nil {
			panic(err)
		}
		switch char {
		case 'p':
			if cancel == nil {
				// pause work
				var ctx context.Context
				ctx, cancel = context.WithCancel(context.Background())
				wp.Pause(ctx)
				fmt.Println("--- paused ---")
			} else {
				// already paused, so resume work
				fmt.Println("--- resuming ---")
				cancel()
				cancel = nil
			}
		case 's':
			wp.StopWait()
			fmt.Println("--- stopped ---")
			return
		}
		str := str
		wp.Submit(func() {
			doSomeWork(str)
		})
	}
	wp.StopWait()
	fmt.Println("no more data")
}

func doSomeWork(s string) {
	fmt.Println("doWork", s)
}

@Numenorean
Copy link
Author

Numenorean commented Nov 12, 2020

Couple things that could be a problem:

  1. If you pause more than once, you are trying to use a context that is already canceled. You need a new context.
  2. The "s" key is being used to resume work (canceling the contest). Perhaps you meant for "s" to stop workers?

Here is a complete working example of the above, with a few changes:

package main

import (
	"context"
	"fmt"

	"github.com/eiannone/keyboard"
	"github.com/gammazero/workerpool"
)

var data = []string{"a", "b", "c", "d", "e", "f", "g"}

func main() {
	wp := workerpool.New(5)

	if err := keyboard.Open(); err != nil {
		panic(err)
	}

	var cancel context.CancelFunc
	defer func() {
		keyboard.Close()
		if cancel != nil {
			cancel()
		}
	}()

	fmt.Println("Press p to pause/resume, s to stop, any other key to do work")
	for _, str := range data {
		char, _, err := keyboard.GetKey()
		if err != nil {
			panic(err)
		}
		switch char {
		case 'p':
			if cancel == nil {
				// pause work
				var ctx context.Context
				ctx, cancel = context.WithCancel(context.Background())
				wp.Pause(ctx)
				fmt.Println("--- paused ---")
			} else {
				// already paused, so resume work
				fmt.Println("--- resuming ---")
				cancel()
				cancel = nil
			}
		case 's':
			wp.StopWait()
			fmt.Println("--- stopped ---")
			return
		}
		str := str
		wp.Submit(func() {
			doSomeWork(str)
		})
	}
	wp.StopWait()
	fmt.Println("no more data")
}

func doSomeWork(s string) {
	fmt.Println("doWork", s)
}

thanks, but it isn't what i want. I have been trying to do system like that:

  • user launch the loop with something data
  • program do some work with this data automatically, without user who pressing keys
  • user want to pause a program, he click "p" and program stops work
  • a few moments later user wants to resume work, he click "p" again and program launches from the place that is stopped
  • user want to sleep, he click on "s" and program save all remaining data(i'll do it myself) and completely stops
    It's not really cos keyboard.GetKey() wait for any key and my loop just stands still

@Numenorean
Copy link
Author

Numenorean commented Nov 18, 2020

@gammazero could u help me? i have a problem with pausing workerpool. I've solved the problem with pressing keyboard key to pause, but it doesn't work correctly. When i try to pause workerpool with queue ~6k, it waits for complete all tasks. is there any way to limit tasks queue? Smth like limiting queue to workers num, so i will be able to just wait for all workers are finished

@gammazero
Copy link
Owner

When i try to pause workerpool with queue ~6k, it waits for complete all tasks

The pause feature was designed to work by pausing at the point after executing all previously submitted tasks. One reason for this was to avoid doing any additional checks for pause signal in the critical path. I see that for your use case, you would prefer that workerpool pauses without consuming additional queued items. I will evaluate a minimal approach this week.

is there any way to limit tasks queue?

The purpose of workerpool is to accept tasks without ever blocking. If the task queue size is limited, then workerpool will need to block (I do not think discarding tasks is appropriate). My thought on providing this is to have a new flavor of workerpool that is created using a new constructor: workerpool.NewBlocking(maxWorkers, queueSize int). I will also be able to look into this and provide an experimental version this week.

For your use case(s), do you see one or both of these features as useful?

@Numenorean
Copy link
Author

@gammazero thanks. just for me first feature would be more useful. But now i realise one big problem, when i call pause method it blocks main goroutine because it waits for complete all tasks, so i cannot resume working(function that pause pool doesn't call anyway). Here is example, i believe there is an any solution:

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/Numenorean/keypresses"
	"github.com/gammazero/workerpool"
)

var cancel context.CancelFunc

func checkPauseOrStop(wp *workerpool.WorkerPool) {
	if keypresses.IsKeyPressedGlobal(0x50, false) {
		if cancel == nil {
			// pause work
			var ctx context.Context
			ctx, cancel = context.WithCancel(context.Background())
			wp.Pause(ctx)
			fmt.Println("--- paused ---")
		} else {
			// already paused, so resume work
			fmt.Println("--- resuming ---")
			cancel()
			cancel = nil
		}
	} else if keypresses.IsKeyPressedGlobal(0x53, false) {
		wp.StopWait()
		fmt.Println("--- stopped ---")
		os.Exit(1)
	}
}

func main() {
	wp := workerpool.New(5)

	fmt.Println("Press p to pause/resume, s to stop, any other key to do work")
	for i := 0; i < 0xf4240; i++ {
		checkPauseOrStop(wp)
		fmt.Println("Queue:", wp.WaitingQueueSize())
		wp.Submit(func() {
			doSomeWork(i)
		})
	}
	wp.StopWait()
	fmt.Println("no more data")
}

func doSomeWork(s int) {
	fmt.Println("Work:", s)
	time.Sleep(5 * time.Second)
}

@Numenorean
Copy link
Author

@gammazero i've posted an issue above this post, is there anyway to resolve that?

@gammazero
Copy link
Owner

@Numenorean The way to solve that is for workerpool to implement the pause differently. As I stated above:

The pause feature was designed to work by pausing at the point after executing all previously submitted tasks.

Since you have a large number of tasks already submitted, and each task takes 5 seconds to complete, you will be waiting for a very long time for a call to Pause to return.

If you want the pause to take effect immediately then a different pause implementation is needed; one that does not wait for previously submitted tasks to finish. I will publish an experimental example soon.

@Numenorean
Copy link
Author

@Numenorean The way to solve that is for workerpool to implement the pause differently. As I stated above:

The pause feature was designed to work by pausing at the point after executing all previously submitted tasks.

Since you have a large number of tasks already submitted, and each task takes 5 seconds to complete, you will be waiting for a very long time for a call to Pause to return.

If you want the pause to take effect immediately then a different pause implementation is needed; one that does not wait for previously submitted tasks to finish. I will publish an experimental example soon.

any news?

@Numenorean
Copy link
Author

Hey man, any news?

@Numenorean
Copy link
Author

any news?

@gammazero
Copy link
Owner

I do have a couple implementations that I can publish in a branch. Here are some variations. Do you need any other than 1?

  1. Pause immediately (before running more queued tasks) and continue to queue submitted tasks
  2. Pause immediately and block new tasks from being submitted
  3. Pause after running all tasks that were already submitted, and continue queuing new tasks
  4. Pause after running all tasks that were already submitted, and block any new tasks

3 and 4 might better be called SyncPause. I am considering that blocking submission of new tasks (2 and 4) could be a parameter to pause. Thoughts?

@Numenorean
Copy link
Author

Numenorean commented Jul 18, 2021

continue to queue submitted tasks

Sorry for not replying right away. I don't quite understand if I have, for example, 1 million tasks, after pause -> waiting for all the tasks to be queued -> resume, will I be able to pause it one more time? And what about memory usage, after all the million tasks will be queued?

@Jbaukens
Copy link

Hello @gammazero hope u are doing well.
Did you eventually published somewhere in a branch option 1 of your latest comment ?

Cheers

@gammazero
Copy link
Owner

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants