feat(pubsub): subscriber-side changes for ordering keys #10201
Conversation
…_publish(). Do throw if it is called without enabling message ordering.
pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
There is enough load capacity to send one message per ordering key
because each key is the result of an ack or nack. Since the load went
down by one message, it's safe to send the user another message for the
Is this true? If so, how do messages for different ordering keys get in? Let's say load capacity allows for 10 messages and I have 10 ordering keys that have built up a queue. Now, a message for an 11th ordering key comes in. What happens?
Similar remark here - what if the load is too high due to the total message size, not count?
This seems to break the assumption in the docstring...
Re: "what happens" - the 11th ordering key's message is normally put into MessagesOnHold
(in the streaming pull manager's _on_response()
method). It then waits there until the load allows it to be released.
Response to Kamal's question:
When a message is dropped (see the dispatcher's drop() method) as a result of ack, nack, or drop by the user, messages get released in two ways:
1. dispatcher.drop() calls activate_ordering_keys(), which releases the next message for each ordering key (if one exists). Since at least one message was dropped for each key, it's safe to release a message for each key.
2. dispatcher.drop() then calls maybe_resume_consumer(), which tells the MessagesOnHold class to release the next messages in the queue (within load parameters). These messages may be ordered or unordered.
In your example, the message with the 11th ordering key would be released in step 2.
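The two-step release flow can be sketched as follows. This is a hedged illustration, not the actual library code: `StubManager` and `DroppedItem` are stand-ins invented for this example, while `drop()`, `leaser.remove()`, `activate_ordering_keys()`, and `maybe_resume_consumer()` mirror the names discussed above.

```python
from collections import namedtuple

# Hypothetical stand-in for the dispatcher's request type.
DroppedItem = namedtuple("DroppedItem", ["ack_id", "ordering_key"])


class StubManager:
    """Minimal stand-in that records the calls the dispatcher makes."""

    def __init__(self):
        self.calls = []
        self.leaser = self  # double as the leaser for this sketch

    def remove(self, items):
        self.calls.append(("leaser.remove", [i.ack_id for i in items]))

    def activate_ordering_keys(self, keys):
        self.calls.append(("activate_ordering_keys", sorted(keys)))

    def maybe_resume_consumer(self):
        self.calls.append(("maybe_resume_consumer",))


class Dispatcher:
    def __init__(self, manager):
        self._manager = manager

    def drop(self, items):
        # Stop leasing the dropped messages.
        self._manager.leaser.remove(items)
        # Step 1: one slot was freed per ordering key, so it is safe to
        # release the next held message for each of those keys.
        ordering_keys = [i.ordering_key for i in items if i.ordering_key]
        self._manager.activate_ordering_keys(ordering_keys)
        # Step 2: overall load went down, so MessagesOnHold may release
        # further messages (ordered or unordered) within load parameters.
        self._manager.maybe_resume_consumer()


manager = StubManager()
Dispatcher(manager).drop([
    DroppedItem("a1", "key-1"),
    DroppedItem("a2", None),  # unordered message, no key to activate
])
```

In this sketch, the 11th-key message from the example would be picked up in step 2, when maybe_resume_consumer() asks MessagesOnHold for more messages.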
Response to Peter's question:
You're right that it's possible that the next ordered message pulled for the same key is too big, and that load would exceed the max. I should document this in the docstring.
I think this is permissible because the alternative would complicate MessagesOnHold even more. To solve this issue, we'd have to keep track of ordering keys that need to be "activated" next. When MessagesOnHold.get() is called, we'd need to check that side list, and also the main queue of held messages.
The reason why I went with this way of storing ordered messages is to prevent a situation where we have to choose between various ordering keys and the general unprocessed/on-hold queue. If we always gave the user ordered messages first, we might be starving the user of unordered messages, and vice versa. Fairness would only be guaranteed by random weighting, which is usually linear or, in a more complicated form, log(n) in the number of weights (keys). We'd run into this same issue with this little side list of "ordering keys that need to be activated".
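The storage scheme described above can be sketched roughly like this. This is an assumption-laden illustration of the design, not the library's actual MessagesOnHold API: the class and method names here (`put`, `get`, `activate`) are illustrative. The key point is that `get()` is an O(1) pop from a single main queue, so no weighing of keys against each other or against unordered messages is ever needed.

```python
import collections


class MessagesOnHoldSketch:
    """One main FIFO holds unordered messages plus the *first* message of
    each ordering key; later messages for a key queue up behind it."""

    def __init__(self):
        self._main_queue = collections.deque()  # releasable (msg, key) pairs
        self._pending_per_key = {}              # key -> deque of queued msgs

    def put(self, msg, ordering_key=None):
        if ordering_key and ordering_key in self._pending_per_key:
            # A message for this key is already in flight; wait behind it.
            self._pending_per_key[ordering_key].append(msg)
        else:
            if ordering_key:
                self._pending_per_key[ordering_key] = collections.deque()
            self._main_queue.append((msg, ordering_key))

    def get(self):
        # O(1): no choosing between ordering keys and the unordered queue,
        # which sidesteps the fairness/starvation problem discussed above.
        return self._main_queue.popleft() if self._main_queue else None

    def activate(self, ordering_key):
        # Called after a message for this key was acked/nacked/dropped:
        # promote the key's next queued message into the main queue.
        pending = self._pending_per_key.get(ordering_key)
        if pending:
            self._main_queue.append((pending.popleft(), ordering_key))
        elif pending is not None:
            del self._pending_per_key[ordering_key]
```

For example, two messages for key "k" plus one unordered message come out as "m1", then the unordered message, and "m2" only after the key is activated.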
Can't say whether treating FlowControl.max_bytes
only as a "guideline" is permissible or not, but yes, let's at least document that possibility here.
Releasing some messages a bit too early is IMO still a lesser evil than starving the user.
@@ -80,6 +80,20 @@ def _wrap_callback_errors(callback, on_callback_error, message):
        on_callback_error(exc)

# TODO
# check if dropping behavior in Leaser is going to be a problem. (maybe disable
Is it possible for a message to be dropped that was receipt acked, but not yet sent to the user's callback? If so, then drop could break ordered delivery, yes.
No, the leaser only drops ordered messages that have 1) been sent to the user and 2) gone past the lease expiry timer.
The lease expiry timer starts only when the message has been sent to the user.
"""
self._manager.leaser.remove(items)
ordering_keys = (k.ordering_key for k in items if k.ordering_key)
self._manager.activate_ordering_keys(ordering_keys)
This might be later than it needs to be for delivering the next message for the ordering key. We only need to wait for the user callback to run to completion, we do not need to wait for ack or nack. Maybe in Python it doesn't matter so much because the likelihood of someone doing asynchronous processing outside of the callback is small.
My assumption was that most users would ack their message inside the user callback itself. This would be before the callback finishes, though presumably near the end. I think this scenario is more likely than offloading the message to an async process. We can revisit the question if the async scenario arises in practice. The change itself would be small - modifying _wrap_callback_errors in streaming_pull_manager.py.
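The small change mentioned above could look roughly like this. This is a hedged sketch, not the library's actual code: the extra `manager` parameter and this exact wrapper signature are assumptions for illustration; only the try/except error-forwarding shape mirrors the `_wrap_callback_errors` diff quoted earlier.

```python
from collections import namedtuple

# Hypothetical minimal message type for this sketch.
Message = namedtuple("Message", ["data", "ordering_key"])


def _wrap_callback_errors(callback, on_callback_error, message, manager=None):
    try:
        callback(message)
    except Exception as exc:
        # Same error-forwarding behavior as the existing wrapper.
        on_callback_error(exc)
    finally:
        # Hypothetical change: release the key's next message as soon as the
        # callback has run to completion, whether or not the user already
        # acked inside it, instead of waiting for the ack/nack.
        if manager is not None and message.ordering_key:
            manager.activate_ordering_keys([message.ordering_key])


class _RecordingManager:
    """Stand-in manager that records which keys get activated."""

    def __init__(self):
        self.activated = []

    def activate_ordering_keys(self, keys):
        self.activated.extend(keys)


manager = _RecordingManager()
errors = []
# Callback succeeds: key is activated after it returns.
_wrap_callback_errors(lambda m: None, errors.append, Message(b"ok", "k1"), manager)
# Callback raises: error is forwarded, and the key is still activated.
_wrap_callback_errors(lambda m: 1 / 0, errors.append, Message(b"boom", "k2"), manager)
```

Whether the activation should happen even when the callback raises is itself a design question this sketch does not settle.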
pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
This is the first review pass. I did not check the tests yet, nor have I run the code, as the aim was mostly to get the big picture and verify a few assumptions.
pubsub/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py
pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
Starting the repo split, please do not merge.
pubsub/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py
activating the key.
Since the load went down by one message, it's probably safe to send the
user another message for the same key. Since the released message may be
bigger than the previous one, this may increase the load above the maximum.
Just to double check - it might also happen that we release a message while the current load is still above 1.0. Please confirm that this is fine, too, if it happens due to message size.
An example of how this could happen (please correct me if I'm wrong):
- FlowControl.max_messages is set to "a lot" and FlowControl.max_bytes is set to 10. The user code is currently processing three messages with sizes 1, 1, and 1, respectively (load == 0.3).
- The client then receives several messages with sizes 9, 8, and 7. Since the current load allows for it, we dispatch the first message (size 9). The user code is now processing messages of sizes 1, 1, 1, and 9, the load is 1.2, and two messages with sizes 8 and 7 are on hold.
- The user code ACKs a small message, and that message is dropped from lease management. The load drops to 1.1.
- Since the load went down by one message, we (incorrectly) assume that we now have capacity to release another message, thus we release the message of size 8.
- The user code is now processing messages of sizes 1, 1, 9, and 8, and the load actually jumps to 1.9.
- In unlucky cases, the issue can compound - the user code ACKs another small message (size 1), which makes the client release another (bigger) message - size 7 this time. The user code is then processing messages of sizes 1, 9, 8, 7, and the load is 2.5 ...
Yes, you're right that this can happen.
My assumption is that messages are of roughly the same size and that user byte size limits are significantly higher than the messages themselves. The default byte size limit is 100 MB and the maximum message size is 10 MB, and messages near that size are, I believe, rare.
Hopefully, users won't hit a pathological case often.
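As a sanity check on the arithmetic in the scenario above, the compounding can be reproduced with a quick sketch. This assumes the load is the byte ratio against FlowControl.max_bytes (max_messages is effectively unlimited here, as in the example); the `load` helper is illustrative, not the client's actual implementation.

```python
def load(message_sizes, max_bytes=10, max_messages=10**9):
    # Load is the worse of the count ratio and the byte ratio; with
    # max_messages set to "a lot", only the byte ratio matters here.
    return max(len(message_sizes) / max_messages,
               sum(message_sizes) / max_bytes)


in_flight = [1, 1, 1]
assert round(load(in_flight), 2) == 0.3

in_flight.append(9)   # dispatch the size-9 message while load allows it
assert round(load(in_flight), 2) == 1.2

in_flight.remove(1)   # user ACKs a small message
assert round(load(in_flight), 2) == 1.1

in_flight.append(8)   # "one message freed" heuristic releases size 8
assert round(load(in_flight), 2) == 1.9

in_flight.remove(1)   # another small ACK...
in_flight.append(7)   # ...releases size 7: the overshoot compounds
assert round(load(in_flight), 2) == 2.5
```

The sketch confirms the numbers in the example: each "one message freed" release can swap a size-1 message for a much larger one, so the byte load ratchets upward.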
pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
I still have a sanity-check question about whether we are indeed fine with temporarily not taking FlowControl.max_bytes
into account in some circumstances, but otherwise it seems like we're getting there. 👍
Now that we have both the subscriber and the publisher sides, we should consider adding at least one system test to cover the ordering keys feature.
The rest are a few minor remarks and a merge conflict to resolve.
@@ -0,0 +1,274 @@
# Copyright 2018, Google LLC
New file, should have the current year in the Copyright line (AFAIK).
Suggested change:
# Copyright 2018, Google LLC
# Copyright 2020, Google LLC
Merged in the new repo, closing.
Implements the other half of #9928