From 93e1d9ddca3aa46444fa706a65681c499efeac43 Mon Sep 17 00:00:00 2001 From: Ryan Wynn Date: Fri, 26 Apr 2019 17:46:50 +0000 Subject: [PATCH] improvements for issue #153 --- monstache.go | 58 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 35 insertions(+), 23 deletions(-) diff --git a/monstache.go b/monstache.go index 9ba066c..7bb3edf 100644 --- a/monstache.go +++ b/monstache.go @@ -1330,6 +1330,17 @@ func resumeWork(ctx *gtm.OpCtxMulti, session *mgo.Session, config *configOptions ts := doc["ts"].(bson.MongoTimestamp) ctx.Since(ts) } + drained := false + for !drained { + select { + case _, open := <-ctx.OpC: + if !open { + drained = true + } + default: + drained = true + } + } ctx.Resume() } @@ -4003,27 +4014,41 @@ func main() { ChangeStreamNs: changeStreamNs, } - gtmCtx := gtm.StartMulti(mongos, gtmOpts) - - if config.readShards() && !config.DisableChangeEvents { - gtmCtx.AddShardListener(configSession, gtmOpts, config.makeShardInsertHandler()) - } + heartBeat := time.NewTicker(10 * time.Second) if config.ClusterName != "" { if enabled { infoLog.Printf("Starting work for cluster %s", config.ClusterName) } else { infoLog.Printf("Pausing work for cluster %s", config.ClusterName) - gtmCtx.Pause() + bulk.Stop() + for range heartBeat.C { + enabled, err = enableProcess(mongo, config) + if enabled { + infoLog.Printf("Resuming work for cluster %s", config.ClusterName) + bulk.Start(context.Background()) + break + } + select { + case <-sigs: + shutdown(5, nil, nil, nil, nil, config) + default: + continue + } + } } + } else { + heartBeat.Stop() + } + + gtmCtx := gtm.StartMulti(mongos, gtmOpts) + + if config.readShards() && !config.DisableChangeEvents { + gtmCtx.AddShardListener(configSession, gtmOpts, config.makeShardInsertHandler()) } timestampTicker := time.NewTicker(10 * time.Second) if config.Resume == false { timestampTicker.Stop() } - heartBeat := time.NewTicker(10 * time.Second) - if config.ClusterName == "" { - heartBeat.Stop() - } statsTimeout := time.Duration(30) * time.Second if config.StatsDuration != "" { statsTimeout, err = time.ParseDuration(config.StatsDuration) @@ -4136,19 +4161,6 @@ func main() { }() } infoLog.Println("Listening for events") - if config.ClusterName != "" && !enabled { - gtmCtx.Pause() - bulk.Stop() - for range heartBeat.C { - enabled, err = enableProcess(mongo, config) - if enabled { - infoLog.Printf("Resuming work for cluster %s", config.ClusterName) - bulk.Start(context.Background()) - resumeWork(gtmCtx, mongo, config) - break - } - } - } for { select { case timeout := <-doneC: