From babfda4ac1b8f820a812a5e0e9392b0a4f25da6a Mon Sep 17 00:00:00 2001 From: Ryan Wynn Date: Fri, 26 Apr 2019 18:13:41 +0000 Subject: [PATCH] improvements for issue #153 --- monstache.go | 58 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 35 insertions(+), 23 deletions(-) diff --git a/monstache.go b/monstache.go index 4a80624..a36fe01 100644 --- a/monstache.go +++ b/monstache.go @@ -1292,6 +1292,17 @@ func resumeWork(ctx *gtm.OpCtxMulti, client *mongo.Client, config *configOptions } } } + drained := false + for !drained { + select { + case _, open := <-ctx.OpC: + if !open { + drained = true + } + default: + drained = true + } + } ctx.Resume() } @@ -3841,27 +3852,41 @@ func main() { ChangeStreamNs: config.ChangeStreamNs, } - gtmCtx := gtm.StartMulti(mongos, gtmOpts) - - if config.readShards() && !config.DisableChangeEvents { - gtmCtx.AddShardListener(configSession, gtmOpts, config.makeShardInsertHandler()) - } + heartBeat := time.NewTicker(10 * time.Second) if config.ClusterName != "" { if enabled { infoLog.Printf("Starting work for cluster %s", config.ClusterName) } else { infoLog.Printf("Pausing work for cluster %s", config.ClusterName) - gtmCtx.Pause() + bulk.Stop() + for range heartBeat.C { + enabled, err = enableProcess(mongoClient, config) + if enabled { + infoLog.Printf("Resuming work for cluster %s", config.ClusterName) + bulk.Start(context.Background()) + break + } + select { + case <-sigs: + shutdown(5, nil, nil, nil, nil, config) + default: + continue + } + } } + } else { + heartBeat.Stop() + } + + gtmCtx := gtm.StartMulti(mongos, gtmOpts) + + if config.readShards() && !config.DisableChangeEvents { + gtmCtx.AddShardListener(configSession, gtmOpts, config.makeShardInsertHandler()) } timestampTicker := time.NewTicker(10 * time.Second) if config.Resume == false { timestampTicker.Stop() } - heartBeat := time.NewTicker(10 * time.Second) - if config.ClusterName == "" { - heartBeat.Stop() - } statsTimeout := time.Duration(30) * time.Second if config.StatsDuration != "" { statsTimeout, err = time.ParseDuration(config.StatsDuration) @@ -3975,19 +4000,6 @@ func main() { }() } infoLog.Println("Listening for events") - if config.ClusterName != "" && !enabled { - gtmCtx.Pause() - bulk.Stop() - for range heartBeat.C { - enabled, err = enableProcess(mongoClient, config) - if enabled { - infoLog.Printf("Resuming work for cluster %s", config.ClusterName) - bulk.Start(context.Background()) - resumeWork(gtmCtx, mongoClient, config) - break - } - } - } for { select { case timeout := <-doneC: