[FEATURE REQUEST] Dynamically Changing Concurrency Config Value? #852

Open
windowshopr opened this issue Mar 31, 2024 · 1 comment
Labels: enhancement (New feature or request)

windowshopr commented Mar 31, 2024

Is your feature request related to a problem? Please describe.
I would LOVE (and maybe it's already possible somehow and I just haven't found it yet) to be able to create a separate handler function for calculating a changing "Concurrency" value, based on the worker servers that connect/disconnect over the uptime of the server. For example, if I have a POTENTIAL pool of, say, 100 workers across 10 different machines, it doesn't make much sense to leave the value at the default of 10, as this would severely limit the worker pool; however, setting a maximum of 100 is a somewhat naive approach as well. What would be cool is to allow worker servers to connect/disconnect whenever they want (like, say, when a new machine is purchased and needs to be added without stopping the server, changing the Concurrency, and restarting it), and to have a function that updates the Concurrency value in real time based on the number of threads now available to the scheduler!

Describe the solution you'd like
Conceivably (I haven't tested this code yet), something like this could work:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	"github.com/hibiken/asynq"
)

// Define the task type
const TypeSendEmail = "email:send"

// Define the task payload
type EmailPayload struct {
	To      string
	Subject string
	Body    string
}

// Define the task function
func sendEmail(ctx context.Context, task *asynq.Task) error {
	var payload EmailPayload
	if err := json.Unmarshal(task.Payload(), &payload); err != nil {
		return fmt.Errorf("failed to unpack payload: %v", err)
	}

	// Send the email here
	fmt.Printf("Sending email to %s with subject: %s\n", payload.To, payload.Subject)
	time.Sleep(5 * time.Second) // Simulating email sending
	fmt.Println("Email sent successfully!")

	return nil
}

func main() {
	// Create a new Asynq server
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: "localhost:6379"},
		asynq.Config{
			Concurrency: getInitialConcurrency(),
			Queues: map[string]int{
				"critical": 6,
				"default":  3,
				"low":      1,
			},
		},
	)

	// Create a new ServeMux and register the email task function
	mux := asynq.NewServeMux()
	mux.HandleFunc(TypeSendEmail, sendEmail)

	// Start a background goroutine to update the Concurrency value
	go updateConcurrency(srv)

	// Run the server
	if err := srv.Run(mux); err != nil {
		log.Fatalf("could not run server: %v", err)
	}
}

// getInitialConcurrency retrieves the initial number of available worker nodes from the configuration
func getInitialConcurrency() int {
	// In a real-world scenario, you would use a configuration management system
	// or a service discovery mechanism to retrieve the initial number of available workers
	numWorkersStr := os.Getenv("NUM_WORKERS")
	if numWorkersStr == "" {
		return 10 // Default to 10 workers if not set
	}

	numWorkers, err := strconv.Atoi(numWorkersStr)
	if err != nil {
		log.Fatalf("failed to parse NUM_WORKERS: %v", err)
	}

	return numWorkers
}

// updateConcurrency periodically checks for changes in the worker pool and updates the Concurrency value
func updateConcurrency(srv *asynq.Server) {
	ticker := time.NewTicker(1 * time.Minute) // Check for changes every minute
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
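			// NOTE: srv.Config() and srv.UpdateConfig() below don't exist in asynq today;
			// they're the hypothetical APIs this feature request is proposing.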
			numWorkers, err := getNumWorkers()
			if err != nil {
				log.Printf("failed to get number of workers: %v", err)
				continue
			}

			if numWorkers != srv.Config().Concurrency {
				log.Printf("Updating Concurrency from %d to %d", srv.Config().Concurrency, numWorkers)
				srv.UpdateConfig(asynq.Config{
					Concurrency: numWorkers,
					Queues:      srv.Config().Queues,
				})
			}
		}
	}
}

// getNumWorkers retrieves the number of available worker nodes from the configuration
func getNumWorkers() (int, error) {
	// In a real-world scenario, you would use a configuration management system
	// or a service discovery mechanism to retrieve the number of available workers
	numWorkersStr := os.Getenv("NUM_WORKERS")
	if numWorkersStr == "" {
		return 10, nil // Default to 10 workers if not set
	}

	numWorkers, err := strconv.Atoi(numWorkersStr)
	if err != nil {
		return 0, fmt.Errorf("failed to parse NUM_WORKERS: %v", err)
	}

	return numWorkers, nil
}

This sets the initial number of workers from the .env file, and then starts a separate goroutine to update the Concurrency config value from that same .env value. In practice, there would be a "getAvailableCPUCount" method instead of grabbing from a .env file. The only question, without having tested it yet, is whether the Concurrency value can in fact be updated in real time and take advantage of the newly connected workers or not. I plan on testing this out by monitoring the GUI once I get my multiple machines set up, but I thought I'd post it here in case something is already in the works for it, and in case it might be of use to others.
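For reference, here's a minimal sketch of what that hypothetical getAvailableCPUCount could look like on a single machine (same error-returning signature as getNumWorkers above; it needs "runtime" added to the imports, and a real multi-machine pool would query service discovery rather than the local CPU count):

// getAvailableCPUCount is a hypothetical stand-in for the .env lookup above.
// On a single machine it simply reports the number of logical CPUs; across
// several machines it would instead query whatever service-discovery or
// config system tracks the connected workers.
func getAvailableCPUCount() (int, error) {
	return runtime.NumCPU(), nil
}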

Describe alternatives you've considered
NONE

Additional context
I will update this thread with my results if desired.

windowshopr added the enhancement label on Mar 31, 2024
@kamikazechaser
Collaborator

Related to #850
