feat(pubsub): subscriber-side changes for ordering keys #10201

Closed
wants to merge 22 commits into from

Conversation

@pradn pradn (Contributor) commented Jan 24, 2020

Implements the other half of #9928

@googlebot googlebot added the cla: yes This human has signed the Contributor License Agreement. label Jan 24, 2020

There is enough load capacity to send one message per ordering key
because each key is the result of an ack or nack. Since the load went
down by one message, it's safe to send the user another message for the

Is this true? If so, how do messages for different ordering keys get in? Let's say load capacity allows for 10 messages and I have 10 ordering keys that have built up a queue. Now, a message for an 11th ordering key comes in. What happens?

Contributor

Similar remark here - what if the load is too high due to the total message size, not count?
This seems to break the assumption in the docstring...

Re: "what happens" - the 11th ordering key's message is normally put into MessagesOnHold (in the streaming pull manager's _on_response() method). It then just waits there until the load allows it to be released.
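To make that mechanic concrete, here is a minimal sketch (not the actual library code; `load`, `dispatch`, and `messages_on_hold` are assumed names) of the behavior described above: messages that fit the current load are dispatched, everything else is parked in MessagesOnHold until capacity frees up.

```python
import collections


class MessagesOnHold:
    """Bare-bones stand-in for the hold queue described above."""

    def __init__(self):
        self._pending = collections.deque()

    def put(self, message):
        self._pending.append(message)

    def get(self):
        return self._pending.popleft() if self._pending else None


def on_response(manager, received_messages):
    # Dispatch while flow control allows it; park the rest on hold.
    # The message for the "11th ordering key" lands in the else branch.
    for message in received_messages:
        if manager.load < 1.0:
            manager.dispatch(message)          # assumed helper
        else:
            manager.messages_on_hold.put(message)
```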

@pradn pradn (Contributor Author) commented Jan 29, 2020

Response to Kamal's question:
When a message is dropped (see the dispatcher's drop() method) as a result of ack, nack, or drop by the user, messages get released in two ways:

  1. dispatcher.drop() calls activate_ordering_keys(), which releases the next message for each ordering key (if they exist). Since at least one message was dropped for each key, it's safe to release a message for each key.
  2. dispatcher.drop() then calls maybe_resume_consumer(), which tells the MessagesOnHold class to release the next messages in the queue (within load parameters). These messages may be ordered or unordered.

In your example, the message with the 11th ordering key would be released in step 2.
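A rough sketch of the two release paths described above, with simplified attribute names (the real dispatcher wires these calls up slightly differently):

```python
def drop(dispatcher, items):
    # Stop lease management for the messages the user acked/nacked/dropped.
    dispatcher.manager.leaser.remove(items)

    # Path 1: one message was just dropped for each ordering key below, so
    # releasing that key's next held message keeps the per-key count steady.
    ordering_keys = [item.ordering_key for item in items if item.ordering_key]
    dispatcher.manager.activate_ordering_keys(ordering_keys)

    # Path 2: overall load just went down, so let the manager release more
    # held messages (ordered or unordered) within the flow-control limits.
    dispatcher.manager.maybe_resume_consumer()
```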

@pradn pradn (Contributor Author) commented Jan 30, 2020

Response to Peter's question:
You're right that it's possible that the next ordered message pulled for the same key is too big, and that the load would then exceed the maximum. I should document this in the docstring.

I think this is permissible because the alternative would complicate MessagesOnHold even more. To solve this issue, we'd have to keep track of ordering keys that need to be "activated" next. When MessagesOnHold.get() is called, we'd need to check that side list, and also the main queue of held messages.

The reason why I went with this way of storing ordered messages is to prevent a situation where we have to choose between various ordering keys and the general unprocessed/on-hold queue. If we always gave the user ordered messages first, we might be starving the user of unordered messages, and vice versa. Fairness would only be guaranteed by random weighting, which is usually linear or, in a more complicated form, log(n) in the number of weights (keys). We'd run into this same issue with this little side list of "ordering keys that need to be activated".
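To illustrate the complexity argument (this is not code from the PR): a fair, weighted pick across per-key queues plus the general hold queue costs O(n) per pick with a flat scan, or O(log n) with a tree over the weights, whereas the single merged FIFO used here simply releases messages in arrival order.

```python
import random


def pick_source(weights):
    """Weighted random choice over sources (per-key queues, unordered queue).

    random.choices scans the weight list, so each pick is linear in the
    number of keys; a Fenwick/segment tree over the weights would bring
    this down to O(log n) at the cost of bookkeeping on every update.
    """
    sources = list(weights)
    return random.choices(sources, weights=[weights[s] for s in sources])[0]


# With the merged design there is nothing to weigh: ordered and unordered
# messages share one FIFO hold queue, so release order is just arrival order.
```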

@plamut plamut (Contributor) commented Feb 3, 2020

Can't say if treating FlowControl.max_bytes only as a "guideline" is permissible or not, but yes, let's at least document that possibility here.

Sometimes releasing some of the messages a bit too early is IMO still a lesser evil than starving.

@@ -80,6 +80,20 @@ def _wrap_callback_errors(callback, on_callback_error, message):
on_callback_error(exc)


# TODO
# check if dropping behavior in Leaser is going to be a problem. (maybe disbale

Is it possible for a message to be dropped that was receipt acked, but not yet sent to the user's callback? If so, then drop could break ordered delivery, yes.

pradn (Contributor Author)

No, the leaser only drops ordered messages that 1) have been sent to the user and 2) have gone past the lease expiry timer.

The lease expiry timer starts only when the message has been sent to the user.
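In other words, a simplified sketch of that rule (attribute names like `sent_to_user` and `lease_expiry` are assumptions, not the leaser's actual fields):

```python
def leaser_may_drop(message, now):
    # An ordered message is only eligible for dropping once it has actually
    # been handed to the user's callback AND its lease has since expired;
    # the expiry clock does not start while the message is still on hold.
    return message.sent_to_user and now >= message.lease_expiry
```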

"""
self._manager.leaser.remove(items)
ordering_keys = (k.ordering_key for k in items if k.ordering_key)
self._manager.activate_ordering_keys(ordering_keys)

This might be later than it needs to be for delivering the next message for the ordering key. We only need to wait for the user callback to run to completion, we do not need to wait for ack or nack. Maybe in Python it doesn't matter so much because the likelihood of someone doing asynchronous processing outside of the callback is small.

pradn (Contributor Author)

My assumption was that most users would ack their message inside the user callback itself. This would be before the callback finishes, though presumably near the end. I think this scenario is more likely than offloading the message to an async process. We can revisit the question if the async scenario arises in practice. The change itself would be small - modifying _wrap_callback_errors in streaming_pull_manager.py.
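For reference, a hypothetical version of that small change (the `_manager` back-reference on the message is assumed here; it is not how the real classes are wired):

```python
def _wrap_callback_errors(callback, on_callback_error, message):
    """Hypothetical variant that releases the next message for the ordering
    key as soon as the user callback returns, rather than waiting for the
    ack/nack to flow through dispatcher.drop()."""
    try:
        callback(message)
    except Exception as exc:
        # Deliberately not re-raised, mirroring the existing wrapper.
        on_callback_error(exc)
    finally:
        if message.ordering_key:
            # Assumed back-reference to the streaming pull manager.
            message._manager.activate_ordering_keys([message.ordering_key])
```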

@plamut plamut added the api: pubsub Issues related to the Pub/Sub API. label Jan 29, 2020
@plamut plamut (Contributor) left a comment

This is the first review pass. I did not check the tests yet, nor have I run the code, as the aim was mostly to get the big picture and verify a few assumptions.


@plamut plamut added the do not merge Indicates a pull request not ready for merge, due to either quality or timing. label Jan 31, 2020
@plamut plamut (Contributor) commented Jan 31, 2020

Starting the repo split, please do not merge.

activating the key.
Since the load went down by one message, it's probably safe to send the
user another message for the same key. Since the released message may be
bigger than the previous one, this may increase the load above the maximum.
Contributor

Just to double check - it might also happen that we release a message while the current load is still above 1.0. Please confirm that this is fine, too, if it happens due to message size.


An example of how this could happen (please correct me if I'm wrong; see the sketch after this list):

  • FlowControl.max_messages is set to "a lot" and FlowControl.max_bytes is set to 10. The user code is currently processing three messages with sizes 1, 1, and 1, respectively (load == 0.3).
  • The client then receives several messages with sizes 9, 8, and 7. Since the current load allows for it, we dispatch the first message (size 9). The user code is now processing messages of sizes 1, 1, 1, and 9, the load is 1.2, and two messages with sizes 8 and 7 are on hold.
  • The user code ACKs a small message, and that message is dropped from lease management. The load drops to 1.1.
  • Since the load went down by one message, we (incorrectly) assume that we now have capacity to release another message, thus we release the message of size 8.
  • The user code is now processing messages of sizes 1, 1, 9, and 8, and the load actually jumps to 1.9.
  • In unlucky cases, the issue can compound - the user code ACKs another small message (size 1), which makes the client release another (bigger) message - size 7 this time. The user code is then processing messages of sizes 1, 9, 8, 7, and the load is 2.5 ...
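Walking through those numbers (a minimal sketch, assuming the byte-based load is simply total_bytes / max_bytes and that max_messages is large enough never to dominate):

```python
MAX_BYTES = 10


def load(sizes):
    # Byte-based load only; the message-count term is negligible here.
    return sum(sizes) / MAX_BYTES


in_flight = [1, 1, 1]
assert load(in_flight) == 0.3   # capacity available, the size-9 message is released

in_flight.append(9)
assert load(in_flight) == 1.2   # sizes 8 and 7 stay on hold

in_flight.remove(1)             # user ACKs a small message
assert load(in_flight) == 1.1   # "load went down by one message"...

in_flight.append(8)             # ...so another held message is released
assert load(in_flight) == 1.9

in_flight.remove(1)             # another small ACK
in_flight.append(7)             # another (bigger) release
assert load(in_flight) == 2.5
```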

pradn (Contributor Author)

Yes, you're right that this can happen.

My assumption is that messages are of roughly the same size and that user byte size limits are significantly higher than the messages themselves. The default byte size limit is 100 MB and the maximum user-message size is 10 MB, which I believe is rarely reached.

Hopefully, users won't hit upon a pathological case often.

@plamut plamut (Contributor) left a comment

I still have a sanity-check question about whether we are indeed fine with temporarily not taking FlowControl.max_bytes into account in some circumstances, but otherwise it seems like we're getting there. 👍

Now that we have both the subscriber and the publisher sides, we should consider adding at least one system test to cover the ordering keys feature.

The rest are a few minor remarks and a merge conflict to resolve.
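A rough sketch of what such a system test could look like (not code from this PR; it assumes fixtures that provide an ordering-enabled publisher client and a subscription with message ordering enabled):

```python
import queue


def test_ordering_keys(publisher, subscriber, topic_path, subscription_path):
    # Assumes: publisher was built with enable_message_ordering=True in its
    # PublisherOptions, and the subscription has message ordering enabled.
    received = queue.Queue()

    def callback(message):
        received.put(message.data)
        message.ack()

    streaming_pull = subscriber.subscribe(subscription_path, callback)

    expected = [b"first", b"second", b"third"]
    for data in expected:
        # Publish synchronously so the publish order is unambiguous.
        publisher.publish(topic_path, data, ordering_key="my-key").result()

    got = [received.get(timeout=60) for _ in expected]
    streaming_pull.cancel()

    assert got == expected
```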

@@ -0,0 +1,274 @@
# Copyright 2018, Google LLC
Contributor

New file, should have the current year in the Copyright line (AFAIK).

Suggested change
# Copyright 2018, Google LLC
# Copyright 2020, Google LLC

@plamut plamut (Contributor) commented Feb 10, 2020

Merged in the new repo, closing.

@plamut plamut closed this Feb 10, 2020