Skip to content

Commit

Permalink
Testing: Async init modack re-using dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu committed Apr 22, 2024
1 parent ff229a5 commit bb6ffac
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
2 changes: 2 additions & 0 deletions google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Expand Up @@ -331,6 +331,8 @@ def modify_ack_deadline(
"""
# We must potentially split the request into multiple smaller requests
# to avoid the server-side max request size limit.
#if len(items) > 0:
# print(f"mk: dispatcher.modify_ack_deadline() called with items={items}")
items_gen = iter(items)
ack_ids_gen = (item.ack_id for item in items)
deadline_seconds_gen = (item.seconds for item in items)
Expand Down
Expand Up @@ -41,6 +41,7 @@
import google.cloud.pubsub_v1.subscriber.message
from google.cloud.pubsub_v1.subscriber import futures
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler
from google.protobuf.timestamp_pb2 import Timestamp
from google.pubsub_v1 import types as gapic_types
from grpc_status import rpc_status # type: ignore
from google.rpc.error_details_pb2 import ErrorInfo # type: ignore
Expand Down Expand Up @@ -1074,6 +1075,11 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
# IMPORTANT: Circumvent the wrapper class and operate on the raw underlying
# protobuf message to significantly gain on attribute access performance.
received_messages = response._pb.received_messages

current_time = Timestamp()
current_time.GetCurrentTime()
for rm in received_messages:
rm.message.publish_time.CopyFrom(current_time)

_LOGGER.debug(
"Processing %s received message(s), currently on hold %s (bytes %s).",
Expand All @@ -1098,11 +1104,27 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
# Immediately (i.e. without waiting for the auto lease management)
# modack the messages we received, as this tells the server that we've
# received them.
ack_id_gen = (message.ack_id for message in received_messages)
expired_ack_ids = self._send_lease_modacks(
ack_id_gen, self.ack_deadline, warn_on_invalid=False
)
exactly_once_enabled = False
with self._exactly_once_enabled_lock:
exactly_once_enabled = self._exactly_once_enabled

expired_ack_ids = set()
if exactly_once_enabled:
ack_id_gen = (message.ack_id for message in received_messages)
expired_ack_ids = self._send_lease_modacks(
ack_id_gen, self.ack_deadline, warn_on_invalid=False
)
else:
items = []
for message in received_messages:
request = requests.ModAckRequest(message.ack_id, self.ack_deadline, None)
items.append(request)
assert self._dispatcher is not None
assert self._scheduler is not None
#print(f"mk: streaming_pull_manager: calling scheduler.schedule() with items: {items} and deadline={self.ack_deadline}")
self._scheduler.schedule(self._dispatcher.modify_ack_deadline,items, self.ack_deadline)

#print("mk:streaming pull manager: proceeding with remaining on_response code")
with self._pause_resume_lock:
assert self._scheduler is not None
assert self._leaser is not None
Expand Down

0 comments on commit bb6ffac

Please sign in to comment.