Skip to content

Commit

Permalink
🦉 Updates from OwlBot post-processor
Browse files Browse the repository at this point in the history
  • Loading branch information
gcf-owl-bot[bot] committed Apr 22, 2024
1 parent bb6ffac commit da96453
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
2 changes: 1 addition & 1 deletion google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Expand Up @@ -331,7 +331,7 @@ def modify_ack_deadline(
"""
# We must potentially split the request into multiple smaller requests
# to avoid the server-side max request size limit.
#if len(items) > 0:
# if len(items) > 0:
# print(f"mk: dispatcher.modify_ack_deadline() called with items={items}")
items_gen = iter(items)
ack_ids_gen = (item.ack_id for item in items)
Expand Down
Expand Up @@ -1075,7 +1075,7 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
# IMPORTANT: Circumvent the wrapper class and operate on the raw underlying
# protobuf message to significantly gain on attribute access performance.
received_messages = response._pb.received_messages

current_time = Timestamp()
current_time.GetCurrentTime()
for rm in received_messages:
Expand Down Expand Up @@ -1107,24 +1107,28 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
exactly_once_enabled = False
with self._exactly_once_enabled_lock:
exactly_once_enabled = self._exactly_once_enabled

expired_ack_ids = set()
if exactly_once_enabled:
if exactly_once_enabled:
ack_id_gen = (message.ack_id for message in received_messages)
expired_ack_ids = self._send_lease_modacks(
ack_id_gen, self.ack_deadline, warn_on_invalid=False
)
else:
items = []
for message in received_messages:
request = requests.ModAckRequest(message.ack_id, self.ack_deadline, None)
request = requests.ModAckRequest(
message.ack_id, self.ack_deadline, None
)
items.append(request)
assert self._dispatcher is not None
assert self._scheduler is not None
#print(f"mk: streaming_pull_manager: calling scheduler.schedule() with items: {items} and deadline={self.ack_deadline}")
self._scheduler.schedule(self._dispatcher.modify_ack_deadline,items, self.ack_deadline)
# print(f"mk: streaming_pull_manager: calling scheduler.schedule() with items: {items} and deadline={self.ack_deadline}")
self._scheduler.schedule(
self._dispatcher.modify_ack_deadline, items, self.ack_deadline
)

#print("mk:streaming pull manager: proceeding with remaining on_response code")
# print("mk:streaming pull manager: proceeding with remaining on_response code")
with self._pause_resume_lock:
assert self._scheduler is not None
assert self._leaser is not None
Expand Down

0 comments on commit da96453

Please sign in to comment.