Is your feature request related to a problem? Please describe.
I would LOVE to be able to create a separate handler function for calculating a changing "Concurrency" value, based on the worker servers that connect and disconnect over the uptime of the server (maybe this already exists and I just haven't found it yet). For example, if I have a POTENTIAL pool of, say, 100 workers across 10 different machines, it doesn't make much sense to use the default value of 10, as that would severely limit the worker pool; but setting a maximum of 100 is a naive approach as well. What would be cool is to allow worker servers to connect and disconnect whenever they want (say, when a new machine is purchased and needs to be added without stopping the server, changing the Concurrency, and restarting it), and to have a function that updates the Concurrency value in real time based on the number of threads now available to the scheduler!
Describe the solution you'd like
Conceivably (I haven't tested this code yet), something like this could work:
```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	"github.com/hibiken/asynq"
)

// Define the task type
const TypeSendEmail = "email:send"

// Define the task payload
type EmailPayload struct {
	To      string
	Subject string
	Body    string
}

// Define the task function
func sendEmail(ctx context.Context, task *asynq.Task) error {
	var payload EmailPayload
	if err := json.Unmarshal(task.Payload(), &payload); err != nil {
		return fmt.Errorf("failed to unpack payload: %v", err)
	}
	// Send the email here
	fmt.Printf("Sending email to %s with subject: %s\n", payload.To, payload.Subject)
	time.Sleep(5 * time.Second) // Simulating email sending
	fmt.Println("Email sent successfully!")
	return nil
}

func main() {
	// Create a new Asynq server
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: "localhost:6379"},
		asynq.Config{
			Concurrency: getInitialConcurrency(),
			Queues: map[string]int{
				"critical": 6,
				"default":  3,
				"low":      1,
			},
		},
	)

	// Create a new ServeMux and register the email task function
	mux := asynq.NewServeMux()
	mux.HandleFunc(TypeSendEmail, sendEmail)

	// Start a background goroutine to update the Concurrency value
	go updateConcurrency(srv)

	// Run the server
	if err := srv.Run(mux); err != nil {
		log.Fatalf("could not run server: %v", err)
	}
}

// getInitialConcurrency retrieves the initial number of available worker nodes from the configuration
func getInitialConcurrency() int {
	// In a real-world scenario, you would use a configuration management system
	// or a service discovery mechanism to retrieve the initial number of available workers
	numWorkersStr := os.Getenv("NUM_WORKERS")
	if numWorkersStr == "" {
		return 10 // Default to 10 workers if not set
	}
	numWorkers, err := strconv.Atoi(numWorkersStr)
	if err != nil {
		log.Fatalf("failed to parse NUM_WORKERS: %v", err)
	}
	return numWorkers
}

// updateConcurrency periodically checks for changes in the worker pool and updates the
// Concurrency value. NOTE: srv.Config() and srv.UpdateConfig() are the API I'm proposing
// here; asynq does not currently expose them.
func updateConcurrency(srv *asynq.Server) {
	ticker := time.NewTicker(1 * time.Minute) // Check for changes every minute
	defer ticker.Stop()
	for range ticker.C {
		numWorkers, err := getNumWorkers()
		if err != nil {
			log.Printf("failed to get number of workers: %v", err)
			continue
		}
		if numWorkers != srv.Config().Concurrency {
			log.Printf("Updating Concurrency from %d to %d", srv.Config().Concurrency, numWorkers)
			srv.UpdateConfig(asynq.Config{
				Concurrency: numWorkers,
				Queues:      srv.Config().Queues,
			})
		}
	}
}

// getNumWorkers retrieves the number of available worker nodes from the configuration
func getNumWorkers() (int, error) {
	// In a real-world scenario, you would use a configuration management system
	// or a service discovery mechanism to retrieve the number of available workers
	numWorkersStr := os.Getenv("NUM_WORKERS")
	if numWorkersStr == "" {
		return 10, nil // Default to 10 workers if not set
	}
	numWorkers, err := strconv.Atoi(numWorkersStr)
	if err != nil {
		return 0, fmt.Errorf("failed to parse NUM_WORKERS: %v", err)
	}
	return numWorkers, nil
}
```
This sets the initial number of workers from the .env file, then starts a separate goroutine to update the Concurrency config value from the same .env value. In practice, there would be a "getAvailableCPUCount" method instead of reading from a .env file. My only question, without testing it yet, is whether the Concurrency value can in fact be updated in real time so the server takes advantage of the newly connected workers. I plan to test this by monitoring the GUI once I have my multiple machines set up; I just thought I'd post it here in case something is already in the works for it, and in case it might be of use to others.
Describe alternatives you've considered
NONE
Additional context
I will update this thread with my results if desired.