Skip to content

Commit

Permalink
improvements for issue #153
Browse files Browse the repository at this point in the history
  • Loading branch information
rwynn committed Apr 26, 2019
1 parent b85f78a commit 93e1d9d
Showing 1 changed file with 35 additions and 23 deletions.
58 changes: 35 additions & 23 deletions monstache.go
Expand Up @@ -1330,6 +1330,17 @@ func resumeWork(ctx *gtm.OpCtxMulti, session *mgo.Session, config *configOptions
ts := doc["ts"].(bson.MongoTimestamp)
ctx.Since(ts)
}
drained := false
for !drained {
select {
case _, open := <-ctx.OpC:
if !open {
drained = true
}
default:
drained = true
}
}
ctx.Resume()
}

Expand Down Expand Up @@ -4003,27 +4014,41 @@ func main() {
ChangeStreamNs: changeStreamNs,
}

gtmCtx := gtm.StartMulti(mongos, gtmOpts)

if config.readShards() && !config.DisableChangeEvents {
gtmCtx.AddShardListener(configSession, gtmOpts, config.makeShardInsertHandler())
}
heartBeat := time.NewTicker(10 * time.Second)
if config.ClusterName != "" {
if enabled {
infoLog.Printf("Starting work for cluster %s", config.ClusterName)
} else {
infoLog.Printf("Pausing work for cluster %s", config.ClusterName)
gtmCtx.Pause()
bulk.Stop()
for range heartBeat.C {
enabled, err = enableProcess(mongo, config)
if enabled {
infoLog.Printf("Resuming work for cluster %s", config.ClusterName)
bulk.Start(context.Background())
break
}
select {
case <-sigs:
shutdown(5, nil, nil, nil, nil, config)
default:
continue
}
}
}
} else {
heartBeat.Stop()
}

gtmCtx := gtm.StartMulti(mongos, gtmOpts)

if config.readShards() && !config.DisableChangeEvents {
gtmCtx.AddShardListener(configSession, gtmOpts, config.makeShardInsertHandler())
}
timestampTicker := time.NewTicker(10 * time.Second)
if config.Resume == false {
timestampTicker.Stop()
}
heartBeat := time.NewTicker(10 * time.Second)
if config.ClusterName == "" {
heartBeat.Stop()
}
statsTimeout := time.Duration(30) * time.Second
if config.StatsDuration != "" {
statsTimeout, err = time.ParseDuration(config.StatsDuration)
Expand Down Expand Up @@ -4136,19 +4161,6 @@ func main() {
}()
}
infoLog.Println("Listening for events")
if config.ClusterName != "" && !enabled {
gtmCtx.Pause()
bulk.Stop()
for range heartBeat.C {
enabled, err = enableProcess(mongo, config)
if enabled {
infoLog.Printf("Resuming work for cluster %s", config.ClusterName)
bulk.Start(context.Background())
resumeWork(gtmCtx, mongo, config)
break
}
}
}
for {
select {
case timeout := <-doneC:
Expand Down

0 comments on commit 93e1d9d

Please sign in to comment.