Skip to content

Commit

Permalink
fea: support job count limit
Browse files Browse the repository at this point in the history
Use case: I have 100k items I'd like to process when my
system is under low load. By submitting 100 `batch`[1] jobs doing

    cmdstalk -cmd=my-script.sh -max-jobs 1000

I know processing will halt as soon as the load rises above 0.8 (or whatever
threshold `batch` is configured with).

[1] https://www.centos.org/docs/5/html/Deployment_Guide-en-US/s1-autotasks-at-batch.html
  • Loading branch information
JensRantil committed Jun 30, 2018
1 parent ebf86df commit 2468bf0
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 19 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ cmdstalk -help
# -cmd="": Command to run in worker.
# -per-tube=1: Number of workers per tube.
# -tubes=[default]: Comma separated list of tubes.
# -max-jobs=0: Maximum number of items to process before exitting. Zero for no limit.

# Watch three specific tubes.
cmdstalk -cmd="/path/to/your/worker --your=flags --here" -tubes="one,two,three"
Expand Down
12 changes: 8 additions & 4 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ type Broker struct {
// Tube name this broker will service.
Tube string

log *log.Logger
results chan<- *JobResult
ctx context.Context
log *log.Logger
results chan<- *JobResult
jobReceived chan<- struct{}
ctx context.Context
}

type JobResult struct {
Expand Down Expand Up @@ -73,13 +74,14 @@ type JobResult struct {
}

// New returns a Broker for the given beanstalkd address and tube that runs cmd
// for the jobs it handles.
//
// slot is only used to label the broker's logger, whose prefix is
// "[tube:slot] " and which writes to os.Stdout. results and jobReceived are
// stored as-is on the broker; Run sends one empty struct on jobReceived for
// each job it receives. ctx is stored on the broker as well.
func New(ctx context.Context, address, tube string, slot uint64, cmd string, results chan<- *JobResult) (b Broker) {
func New(ctx context.Context, address, tube string, slot uint64, cmd string, results chan<- *JobResult, jobReceived chan<- struct{}) (b Broker) {
b.Address = address
b.Tube = tube
b.Cmd = cmd

b.log = log.New(os.Stdout, fmt.Sprintf("[%s:%d] ", tube, slot), log.LstdFlags)
b.results = results
b.jobReceived = jobReceived
b.ctx = ctx
return
}
Expand Down Expand Up @@ -117,6 +119,8 @@ func (b *Broker) Run(ticks chan bool) {

job := bs.NewJob(id, body, conn)

b.jobReceived <- struct{}{}

t, err := job.Timeouts()
if err != nil {
b.log.Panic(err)
Expand Down
47 changes: 33 additions & 14 deletions broker/broker_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,27 @@ const (
// created. The `perTube` option determines how many brokers are started for
// each tube.
type BrokerDispatcher struct {
address string
cmd string
conn *beanstalk.Conn
perTube uint64
tubeSet map[string]bool
ctx context.Context
wg sync.WaitGroup
address string
cmd string
conn *beanstalk.Conn
perTube uint64
tubeSet map[string]bool
jobReceived chan<- struct{}
ctx context.Context
wg sync.WaitGroup
}

// NewBrokerDispatcher builds a dispatcher for the beanstalkd server at address
// whose brokers run cmd, with perTube brokers per tube.
//
// The dispatcher's context is derived from parentCtx and is additionally
// cancelled by limittedCountGenerator once maxJobs job-received signals have
// arrived; a maxJobs of 0 disables that limit.
func NewBrokerDispatcher(ctx context.Context, address, cmd string, perTube uint64) *BrokerDispatcher {
func NewBrokerDispatcher(parentCtx context.Context, address, cmd string, perTube, maxJobs uint64) *BrokerDispatcher {
ctx, cancel := context.WithCancel(parentCtx)
// Unbuffered: a broker's send completes only when the counting goroutine
// below has received it.
jobReceived := make(chan struct{})
// NOTE(review): jobReceived is not closed anywhere in the visible code, so
// this goroutine appears to live for the rest of the process — confirm.
go limittedCountGenerator(maxJobs, cancel, jobReceived)
return &BrokerDispatcher{
address: address,
cmd: cmd,
perTube: perTube,
tubeSet: make(map[string]bool),
ctx: ctx,
address: address,
cmd: cmd,
perTube: perTube,
tubeSet: make(map[string]bool),
jobReceived: jobReceived,
ctx: ctx,
}
}

Expand Down Expand Up @@ -77,11 +82,25 @@ func (bd *BrokerDispatcher) RunAllTubes() (err error) {
return
}

// limittedCountGenerator counts the events received on eventHappened and
// calls cancel exactly once, at the moment the nlimit-th event arrives. An
// nlimit of 0 means "no limit": events are still consumed but cancel is never
// called.
//
// The channel keeps being drained after the limit has been reached so that
// senders are never left blocked on it. The function returns only when
// eventHappened is closed, so it is meant to be run in its own goroutine.
func limittedCountGenerator(nlimit uint64, cancel context.CancelFunc, eventHappened <-chan struct{}) {
	var seen uint64
	for range eventHappened {
		// Once there is no limit, or the limit has already fired, only drain.
		// Not counting any further also rules out the counter wrapping
		// around and triggering cancel a second time.
		if nlimit == 0 || seen >= nlimit {
			continue
		}
		seen++
		if seen == nlimit {
			log.Println("reached job limit. quitting.")
			cancel()
		}
	}
}

// runBroker starts a goroutine that creates a Broker for the given tube and
// slot and runs it. The goroutine is registered with bd.wg before it starts
// and marks itself done when Run returns. The broker is given the
// dispatcher's context, address and command; no results channel and no ticks
// channel are supplied (both are nil).
func (bd *BrokerDispatcher) runBroker(tube string, slot uint64) {
bd.wg.Add(1)
go func() {
defer bd.wg.Done()
b := New(bd.ctx, bd.address, tube, slot, bd.cmd, nil)
b := New(bd.ctx, bd.address, tube, slot, bd.cmd, nil, bd.jobReceived)
b.Run(nil)
}()
}
Expand Down
4 changes: 4 additions & 0 deletions cli/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Options struct {

// The beanstalkd tubes to watch.
Tubes TubeList

// Maximum number of jobs to process before exiting. Zero means no limit.
MaxJobs uint64
}

// TubeList is a list of beanstalkd tube names.
Expand All @@ -54,6 +57,7 @@ func ParseFlags() (o Options, err error) {
flag.BoolVar(&o.All, "all", false, "Listen to all tubes, instead of -tubes=...")
flag.StringVar(&o.Cmd, "cmd", "", "Command to run in worker.")
flag.Uint64Var(&o.PerTube, "per-tube", 1, "Number of workers per tube.")
flag.Uint64Var(&o.MaxJobs, "max-jobs", 0, "Maximum number of items to process before exitting. Zero for no limit.")
flag.Var(&o.Tubes, "tubes", "Comma separated list of tubes.")
flag.Parse()

Expand Down
2 changes: 1 addition & 1 deletion cmdstalk.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func main() {
cancel()
}()

bd := broker.NewBrokerDispatcher(ctx, opts.Address, opts.Cmd, opts.PerTube)
bd := broker.NewBrokerDispatcher(ctx, opts.Address, opts.Cmd, opts.PerTube, opts.MaxJobs)

if opts.All {
bd.RunAllTubes()
Expand Down

0 comments on commit 2468bf0

Please sign in to comment.