nsqd: support draining messages / removing nsqd from rotation #1305

Open: wants to merge 3 commits into master

jehiah
Member

@jehiah jehiah commented Nov 25, 2020

This adds support for an nsqd mode in which messages are drained, to facilitate removing an nsqd instance from a cluster.

New nsqd API Endpoints

  • PUT /state/shutdown
  • PUT /state/drain
  • POST /topic/drain?topic=...

New nsqd CLI options (also settable via config file)

  • --sigterm-mode=shutdown (the default, existing behavior) and --sigterm-mode=drain (new option)
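A sketch of how a --sigterm-mode option could be validated and dispatched, assuming only the two values listed above. The sigtermMode type, onSIGTERM, and waitForSIGTERM are hypothetical names; nsqd's real option handling (including how the value is read from the config file) may differ.

```go
package main

import (
	"flag"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// sigtermMode is a hypothetical flag.Value for --sigterm-mode; it accepts
// only "shutdown" (the default) and "drain".
type sigtermMode string

func (m *sigtermMode) String() string { return string(*m) }

func (m *sigtermMode) Set(v string) error {
	switch v {
	case "shutdown", "drain":
		*m = sigtermMode(v)
		return nil
	default:
		return fmt.Errorf("invalid --sigterm-mode %q (want shutdown or drain)", v)
	}
}

// onSIGTERM picks the action for the configured mode; drain and shutdown
// stand in for whatever nsqd actually does in each case.
func onSIGTERM(mode sigtermMode, drain, shutdown func()) {
	if mode == "drain" {
		drain()
		return
	}
	shutdown()
}

// waitForSIGTERM blocks until SIGTERM arrives, then dispatches once.
func waitForSIGTERM(mode sigtermMode, drain, shutdown func()) {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGTERM)
	defer signal.Stop(sigs)
	<-sigs
	onSIGTERM(mode, drain, shutdown)
}

func main() {
	mode := sigtermMode("shutdown") // default: existing behavior
	fs := flag.NewFlagSet("nsqd-sketch", flag.ContinueOnError)
	fs.Var(&mode, "sigterm-mode", "action on SIGTERM: shutdown|drain")
	if err := fs.Parse([]string{"--sigterm-mode=drain"}); err != nil {
		fmt.Println(err)
		return
	}
	// A long-running daemon would call waitForSIGTERM here; the demo dispatches directly.
	onSIGTERM(mode,
		func() { fmt.Println("draining") },
		func() { fmt.Println("shutting down") })
}
```

Rejecting unknown values at parse time means a typo in the config fails at startup rather than silently falling back to shutdown when the signal arrives.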

Resolves #1302

@jehiah
Member Author

jehiah commented Dec 22, 2020

RFR @mreiferson @ploxiln - this is ready for a review pass.

I'm pretty happy with how this came out, but still have another pass to make to expand some test coverage.

go.mod (outdated)

```diff
@@ -16,4 +18,4 @@ require (
	golang.org/x/sys v0.0.0-20191224085550-c709ea063b76 // indirect
)

go 1.13
replace github.com/judwhite/go-svc => github.com/jehiah/go-svc v1.1.3-0.20201125205428-33f8faa2d870
```
Member Author

TODO pending judwhite/go-svc#15

Member

@mreiferson mreiferson left a comment

Looking good so far! 👍

apps/nsqd/main.go (outdated, resolved)
apps/nsqd/main.go (resolved)
internal/test/assertions.go (outdated, resolved)
nsqd/channel.go (outdated, resolved)
nsqd/nsqd.go (outdated)

```diff
@@ -477,8 +489,24 @@ func (n *NSQD) GetTopic(topicName string) *Topic {
		n.Unlock()
		return t
	}
	if atomic.LoadInt32(&n.isDraining) == 1 {
		// don't create new topics when nsqd is draining
		return nil
```
Member

Callers of GetTopic() previously expected a non-nil response. It seems like this check is to prevent any of the code paths that create topics as a side effect from recreating topics during drain.

Would it be better to just bite-the-bullet and convert this to return an error rather than retrofitting this into a nil response?

Alternatively, we could add an nsqd.IsTopicDraining() func and the call sites that create topics as side effects can check it before calling GetTopic()?

Member Author

I generally agree this is the least obvious bit of logic. I'll start by renaming these to GetOrCreate... and documenting in the docstrings that creation may not succeed.

I think that, despite the flag being atomic, we want the isDraining check here inside the read lock so this can't race with nsqd concurrently starting to drain.
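To illustrate why the check belongs under the lock: if the draining flag is read atomically outside the lock, a creator can read "not draining", StartDraining can then flip the flag and snapshot the topic map, and only afterwards does the creator insert a topic that never gets drained. The sketch below assumes a simplified, hypothetical node type (not nsqd's NSQD struct) and keeps the PR's nil convention; the authoritative draining check happens under the same write lock that guards both the insert and the snapshot.

```go
package main

import (
	"fmt"
	"sync"
)

// Topic is a minimal stand-in for nsqd's Topic type.
type Topic struct{ name string }

// node is a hypothetical, stripped-down NSQD with only what the check needs.
type node struct {
	sync.RWMutex
	draining bool // guarded by the lock, so no separate atomic is needed
	topics   map[string]*Topic
}

// StartDraining flips the flag and snapshots the topics in one critical
// section: every topic is either in the snapshot or was never created.
func (n *node) StartDraining() []*Topic {
	n.Lock()
	defer n.Unlock()
	n.draining = true
	snapshot := make([]*Topic, 0, len(n.topics))
	for _, t := range n.topics {
		snapshot = append(snapshot, t)
	}
	return snapshot
}

// GetOrCreateTopic returns an existing topic, or creates one unless the node
// is draining, in which case it returns nil (the PR's nil convention).
func (n *node) GetOrCreateTopic(name string) *Topic {
	n.RLock()
	t, ok := n.topics[name]
	n.RUnlock()
	if ok {
		return t
	}

	n.Lock()
	defer n.Unlock()
	// Re-check under the write lock: another goroutine may have created the
	// topic, or StartDraining may have run, since the read lock was released.
	if t, ok := n.topics[name]; ok {
		return t
	}
	if n.draining {
		return nil
	}
	t = &Topic{name: name}
	n.topics[name] = t
	return t
}

func main() {
	n := &node{topics: map[string]*Topic{}}
	a := n.GetOrCreateTopic("a")
	fmt.Println(len(n.StartDraining()))         // topics to drain
	fmt.Println(n.GetOrCreateTopic("a") == a)   // existing topics are still returned
	fmt.Println(n.GetOrCreateTopic("b") == nil) // new ones are refused
}
```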

Member

Good point on the race. Still, we shouldn't abuse nil here. This is a new edge case where a topic won't be returned and we have to modify all call sites anyway to check for nil, might as well do it "right" and add a proper error return value?

Member Author

PTAL - I think the rename to GetOrCreate{Topic,Channel} helped a lot; I also documented that these may or may not succeed. Given that, checking for nil feels like an idiomatic way to implement it.

We could switch, but it feels like bikeshedding? The rename was helpful in ensuring I've covered all code paths (a few more were updated); I should have done a more exhaustive check originally.

nsqd/channel.go (outdated, resolved)
nsqd/nsqd.go (outdated, resolved)
nsqd/topic.go (outdated, resolved)
nsqd/topic.go (resolved)

```go
// If in draining mode and we wrote a message to channels
// check if it was the last message on the topic (there are no more left)
// in which case we start draining each channel
if atomic.LoadInt32(&t.isDraining) == 1 {
```
Member

Probably worth benchmarking... but we've tried hard to minimize the logic in the core message loop. In this case, draining is an irreversible state, meaning we just need to propagate the state change locally into this goroutine once (similar to the paused state) rather than check an atomic for every message published.
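A sketch of the "propagate once" idea, under stated assumptions: topicPump, drainCh, and run are hypothetical names, the pump is reduced to copying strings, and the len(p.incoming) == 0 test assumes publishers are already rejected while draining. Closing a Go channel is a one-time broadcast; once the pump sees it, it records the state in a local bool and sets its own copy of the channel to nil, so that select case is never chosen again and the per-message path reads only the local bool rather than doing an atomic load.

```go
package main

import (
	"fmt"
	"sync"
)

// topicPump is a hypothetical reduction of a topic's message pump: it copies
// messages from incoming to out (standing in for fan-out to channels).
type topicPump struct {
	incoming  chan string
	drainCh   chan struct{} // closed exactly once; draining is irreversible
	drainOnce sync.Once
}

func newTopicPump() *topicPump {
	return &topicPump{incoming: make(chan string, 16), drainCh: make(chan struct{})}
}

// StartDraining is safe to call repeatedly and from any goroutine.
func (p *topicPump) StartDraining() { p.drainOnce.Do(func() { close(p.drainCh) }) }

// run closes done once draining has started and the backlog is empty.
func (p *topicPump) run(out chan<- string, done chan<- struct{}) {
	draining := false
	drainCh := p.drainCh
	for {
		select {
		case msg := <-p.incoming:
			out <- msg
			if draining && len(p.incoming) == 0 {
				close(done) // backlog empty: hand off to channel-level draining
				return
			}
		case <-drainCh:
			draining = true
			drainCh = nil // receiving from a nil channel blocks forever
			if len(p.incoming) == 0 {
				close(done)
				return
			}
		}
	}
}

func main() {
	p := newTopicPump()
	out := make(chan string, 16)
	done := make(chan struct{})
	for _, m := range []string{"m1", "m2", "m3"} {
		p.incoming <- m
	}
	go p.run(out, done)
	p.StartDraining()
	p.StartDraining() // no-op thanks to sync.Once
	<-done
	fmt.Println("delivered before drain completed:", len(out))
}
```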

Member Author

I tried to write some general benchmarks for messagePump, but I couldn't get consistent benchmark output (so I'll probably drop that benchmark from this PR before landing, unless it looks useful). While benchmarking, I ran into issues with GUID sequences expiring, so I switched some benchmarks to use an integer sequence to avoid timing effects from GUID sequence expiry.

```
name                old time/op   new time/op   delta
TopicMessagePump-2  3.06µs ±181%  7.04µs ±131%   ~     (p=0.113 n=9+10)
```

I'd assume a refactor that added another `drainChan` channel for `Topic.messagePump` to watch for draining state updates would be more expensive than `atomic.LoadInt32`, but... I'm not sure. @ploxiln, do you have any intuition here?
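One way to isolate the question is a micro-benchmark that measures only the per-message difference: an atomic.LoadInt32 after each receive versus one extra, never-ready case in the select. This is a hypothetical harness, not the benchmark from the PR, and it uses testing.Benchmark so it runs outside go test. As the ±181% above suggests, numbers like these are noisy and machine-dependent, so repeated runs compared with benchstat would be needed before drawing conclusions.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"testing"
)

// benchAtomic models the PR's approach: after each received message the
// pump does an atomic load of the draining flag.
func benchAtomic(b *testing.B) {
	var draining int32 // never set during the benchmark
	msgs := make(chan int, 1)
	exit := make(chan struct{})
	for i := 0; i < b.N; i++ {
		msgs <- i
		select {
		case <-msgs:
			if atomic.LoadInt32(&draining) == 1 {
				return
			}
		case <-exit:
			return
		}
	}
}

// benchDrainCase models the alternative: one more (never-ready) case in the
// select and no per-message atomic load.
func benchDrainCase(b *testing.B) {
	msgs := make(chan int, 1)
	exit := make(chan struct{})
	drain := make(chan struct{})
	for i := 0; i < b.N; i++ {
		msgs <- i
		select {
		case <-msgs:
		case <-exit:
			return
		case <-drain:
			return
		}
	}
}

func main() {
	cases := []struct {
		name string
		fn   func(*testing.B)
	}{
		{"atomic", benchAtomic},
		{"drain-case", benchDrainCase},
	}
	for _, c := range cases {
		r := testing.Benchmark(c.fn)
		fmt.Printf("%-10s %s\n", c.name, r)
	}
}
```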

@ploxiln
Member

ploxiln commented Dec 29, 2020

It seems like it would be simpler to just disallow publishing messages. Allow creating channels like normal, and don't bother deleting any before regular shutdown. Just return an error for any message publish request. The only other bit of logic needed is a check on each FIN of whether this channel is now completely empty, and then whether it was the last one.

@jehiah
Member Author

jehiah commented Dec 29, 2020

@ploxiln that functionality would be similar, but I'm not sure it would be easier to implement, because you would then need a new way of propagating FINs back up, whereas this piggybacks on the delete functionality. #1302 (comment) discusses the tradeoffs between the two, and this is the functionality I'm interested in (removing a node from rotation). Can you think of a compelling use case for the other, or a case where it becomes materially different?

@ploxiln
Member

ploxiln commented Dec 29, 2020

#1302 (comment)

> If you are trying to remove a nsqd instance from rotation where that nsqd instance had 10 different topics, but just one or two with notable backlogs, it would be desirable to have the topics that drain quickly deleted. Deleting promotes a better cluster hygiene where you don't have a nsqd instance which is no longer getting messages on a topic still getting consumer connections where it causes RDY to be spread thin. (i.e. think a topic that takes a day to drain in some odd circumstance.)

I see.

I didn't have a need for the other behavior, of keeping the topics/channels until exit, I just think it may be simpler to implement, and sufficient. Fewer global state checks, fewer new error cases. There was some mess with ephemeral topics/channels being deleted ... though it looks like this case may not be so bad.

@jehiah jehiah requested a review from mreiferson January 1, 2021 03:49
@jehiah
Member Author

jehiah commented Jan 19, 2021

@mreiferson PTAL. I think this is the main outstanding discussion to resolve: #1305 (comment)

@jehiah jehiah mentioned this pull request Feb 2, 2021
Member

@mreiferson mreiferson left a comment

(Looks like this needs a rebase)

As @ploxiln mentioned, there are a lot of global state checks. I'm still trying to map it all out, but it feels like we're missing something.

I am also still unhappy with the nil checks in the revised Get*Topic paths, especially the implicit knowledge that call sites now have that it means draining (i.e. why not return that error?).

```go
depth, inFlight, deferred := c.Depth(), c.InFlightCount(), c.DeferredCount()
c.nsqd.logf(LOG_INFO, "CHANNEL(%s): draining. depth:%d inFlight:%d deferred:%d", c.name, depth, inFlight, deferred)
// if we are empty delete
if depth+inFlight+deferred == 0 {
```
Member

There's a race condition between this line and line 162 (a concurrent pub that had already passed the isDraining check but lost the race with line 162).

```go
router.Handle("POST", "/channel/create", http_api.Decorate(s.doCreateChannel, log, http_api.V1))
router.Handle("POST", "/channel/delete", http_api.Decorate(s.doDeleteChannel, log, http_api.V1))
router.Handle("POST", "/channel/empty", http_api.Decorate(s.doEmptyChannel, log, http_api.V1))
router.Handle("POST", "/channel/pause", http_api.Decorate(s.doPauseChannel, log, http_api.V1))
router.Handle("POST", "/channel/unpause", http_api.Decorate(s.doPauseChannel, log, http_api.V1))
router.Handle("GET", "/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1))
router.Handle("PUT", "/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1))
router.Handle("PUT", "/state/drain", http_api.Decorate(s.startDraining, log, http_api.V1))
router.Handle("PUT", "/state/shutdown", http_api.Decorate(s.shutdown, log, http_api.V1))
```
Member

Naming bla bla bla... do we need the /state prefix?

Member Author

From an API layout (and documentation) perspective, I felt it was good to have these grouped together, but I'm also not sold on the /state prefix. Any other naming ideas?

```go
go func() {
	// in some cases StartDraining results in an exit immediately
	// allow this API call to respond before exiting
	time.Sleep(time.Millisecond)
```
Member

What's up with these?

Member Author

Since this is an HTTP request, there is some desire to let the HTTP request complete (i.e. the caller should get an HTTP response back) before the action takes place. If we act immediately, a shutdown can (and in many cases will) close this connection, and the caller will get a connection error.

I'm open to other ways to solve this (because it's obviously not a generally good pattern to rely on timing), but this seemed reasonable for the scope of this endpoint.

nsqd/nsqd.go (outdated)

```diff
@@ -350,7 +357,11 @@ func (n *NSQD) LoadMetadata() error {
 			n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name)
 			continue
 		}
-		topic := n.GetTopic(t.Name)
+		topic := n.GetOrCreateTopic(t.Name)
```
Member

I don't think we need these changes in LoadMetadata. It is only ever called from Main(), so it's impossible for it to encounter topics/channels that are being drained.

@jehiah
Member Author

jehiah commented Mar 1, 2021

> (Looks like this needs a rebase)

I was debating whether to rebase mid-review to pick up upstream go-svc changes 🤷

> As @ploxiln mentioned, there are a lot of global state checks. I'm still trying to map it all out, but it feels like we're missing something.

Open to ideas.

> I am also still unhappy with the nil checks in the revised Get*Topic paths, especially the implicit knowledge that call sites now have that it means draining (i.e. why not return that error?).

OK; updated.

Successfully merging this pull request may close these issues:

  • nsq: DRAINING mode

3 participants