Publishing hangs with fresh PublisherClient instance #977

Open
acolavin opened this issue Aug 10, 2023 · 0 comments
Labels
api: pubsub Issues related to the googleapis/python-pubsub API. type: question Request for information or clarification. Not an issue.

@acolavin

Howdy folks,

We ran into undesirable behavior in this Python Pub/Sub library. We found several unintuitive workarounds (described below), and we're sharing the behavior here for the developers' benefit and for anyone else who runs into it.

Briefly, we found that submitting many messages of intermediate size to a freshly created PublisherClient instance causes publishing to hang. Smaller or larger messages don't seem to trigger the hang. The problematic message size range shifts when the publisher's batch settings (`max_bytes`) are changed. Publishing a single message and waiting for it to complete before submitting the rest also prevents the hang, as does checking permissions on the topic before submitting the bulk of the messages.
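The second workaround (let one publish finish before queuing the rest) can be wrapped in a small helper. This is a minimal sketch, not a fix provided by the library: `publish_with_warmup` is a hypothetical name, and it only assumes an object whose `publish(topic=..., data=...)` returns a `concurrent.futures.Future`-compatible future, which the traceback below shows is the case for `PublisherClient.publish`. The timeout on the first result turns an indefinite hang into a `concurrent.futures.TimeoutError`.

```python
import concurrent.futures
from typing import List, Sequence


def publish_with_warmup(
    publisher, topic: str, payloads: Sequence[bytes], warmup_timeout: float = 60.0
) -> List[concurrent.futures.Future]:
    """Hypothetical helper mirroring `suppress_hang_method_2` in the script below.

    Publishes payloads[0], blocks until it resolves, then queues the rest.
    `publisher` is anything with a `publish(topic=..., data=...)` method that
    returns a concurrent.futures.Future-compatible object.
    """
    if not payloads:
        return []
    first = publisher.publish(topic=topic, data=payloads[0])
    # Raises concurrent.futures.TimeoutError instead of blocking forever if the
    # first publish never resolves, and re-raises any error the publish hit.
    first.result(timeout=warmup_timeout)
    futures = [first]
    for data in payloads[1:]:
        futures.append(publisher.publish(topic=topic, data=data))
    return futures
```

With the reproduction script, `futures = publish_with_warmup(publisher, topic, [message] * 100)` would stand in for the publishing loop; the 60-second default is an arbitrary choice.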

We do see a stack trace that suggests an authentication issue (the first traceback below shows a failed request to the OAuth2 token endpoint), reminiscent of similar issues reported in other related libraries, such as this one, but we believe the issue stems from how the Pub/Sub client handles credentials and message batching.

All of this and more is enumerated in the reproduction script below.

Also of note: we could only reproduce this behavior on Ubuntu. I could not reproduce it on macOS.

Environment details

  • OS type and version: Ubuntu 22.04.2 LTS (running in Docker on a GCP Container-Optimized OS (COS) host)
  • Python version: 3.8.17 (also reproduced on Python 3.9)
  • pip version: 23.1 (via Poetry)
  • google-cloud-pubsub version: 2.18.2
  • google-auth: 2.22.0
  • grpcio: 1.54.0
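For anyone comparing environments against the list above, the same versions can be collected programmatically. A minimal sketch using the standard library's `importlib.metadata` (Python 3.8+); `environment_report` is a hypothetical helper name, and the package list simply mirrors the one above.

```python
import platform
from importlib import metadata  # standard library since Python 3.8
from typing import Dict, Iterable

# Distribution names taken from the environment list above.
PACKAGES = ("google-cloud-pubsub", "google-auth", "grpcio")


def environment_report(packages: Iterable[str] = PACKAGES) -> Dict[str, str]:
    """Hypothetical helper: collect interpreter, OS, and package versions."""
    report = {"python": platform.python_version(), "os": platform.platform()}
    for name in packages:
        try:
            report[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            report[name] = "not installed"
    return report


for key, value in environment_report().items():
    print(f"{key}: {value}")
```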

Steps to reproduce

```python
from google.cloud import pubsub

# Workarounds: set either flag to True and the hang does not occur.
suppress_hang_method_1 = False  # True: check topic permissions before publishing
suppress_hang_method_2 = False  # True: wait for the first publish future to resolve before publishing the rest

# Replace with a real topic.
topic = 'projects/<project-name>/topics/<topic-name>'

# With default batch settings, a message size between 0.5 and 1.5 MB triggers the hang.
# With a smaller `max_bytes`, the problematic message size is between 0.01 and 2 MB.
# See the bottom of this script for other batch/message size combinations that cause issues.
max_bytes = 1_000_000  # default value
message_size = 1_000_000  # in bytes

message = b'a' * message_size

# len() gives the payload size; __sizeof__() would also count the bytes object's overhead.
print(f"Message size is {len(message) / 1_000_000}MB")
print(f"Batch size is {max_bytes} bytes")

# If you change max_bytes to 80, the problematic message size is smaller.
publisher = pubsub.PublisherClient(
    pubsub.types.BatchSettings(
        max_bytes=max_bytes,
    ))

if suppress_hang_method_1:
    # Checking permissions with the publisher instance somehow prevents publishing from hanging.
    permissions_to_check = ["pubsub.topics.publish", "pubsub.topics.update"]

    allowed_permissions = publisher.test_iam_permissions(
        request={"resource": topic, "permissions": permissions_to_check}
    )

futures = []
for _ in range(100):  # publish 100 messages

    if suppress_hang_method_2 and len(futures) == 1:
        # Letting the first message's future resolve prevents the hang.
        futures[0].result()

    futures.append(publisher.publish(topic=topic, data=message))

print(f"The future result is: {futures[0].result()}")  # this is what hangs indefinitely

# Batch/message size combinations that do or don't cause the hang, as (max_bytes, message_size):
nonproblematic_batch_message_combinations = [(1_000_000, 10),
                                             (1_000_000, 100),
                                             (1_000_000, 1_000),
                                             (1_000_000, 300_000),
                                             (1_000_000, 3_000_000),
                                             (1_000_000, 6_000_000),
                                             (100, 1),
                                             (100, 5_000_000)]  # notice that very large messages work fine

problematic_batch_message_combinations = [(1_000_000, 500_000),
                                          (1_000_000, 1_000_000),
                                          (1_000_000, 1_500_000),
                                          (100, 100),
                                          (100, 1_000_000)]
```
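The final `futures[0].result()` call blocks forever because it has no timeout. The traceback below shows the Pub/Sub future delegating to `concurrent.futures.Future.result`, so the standard `concurrent.futures.wait` should accept these futures too. The sketch below (a hypothetical `collect_results` helper, not part of the library) waits with a deadline and reports which messages are still pending instead of hanging.

```python
import concurrent.futures
from typing import Dict, List, Sequence, Tuple


def collect_results(
    futures: Sequence[concurrent.futures.Future], timeout: float
) -> Tuple[Dict[int, str], Dict[int, BaseException], List[int]]:
    """Hypothetical helper: wait at most `timeout` seconds for all futures.

    Returns (message IDs by index, exceptions by index, indices still pending).
    """
    # Blocks until every future finishes or the timeout elapses, whichever is first.
    concurrent.futures.wait(futures, timeout=timeout)
    ids: Dict[int, str] = {}
    errors: Dict[int, BaseException] = {}
    pending: List[int] = []
    for index, fut in enumerate(futures):
        if not fut.done():
            pending.append(index)  # still unresolved: the "hang" case
        elif fut.cancelled():
            errors[index] = concurrent.futures.CancelledError()
        elif fut.exception() is not None:
            errors[index] = fut.exception()
        else:
            ids[index] = fut.result()
    return ids, errors, pending
```

In the reproduction script, `ids, errors, pending = collect_results(futures, timeout=120)` could replace the final `futures[0].result()` call; a non-empty `pending` list then signals the hang without blocking indefinitely. The 120-second timeout is an arbitrary choice.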


Stack trace

```
Traceback (most recent call last):
  File "/mnt/jupyter-disk/.poetry-envs/***lib/python3.8/site-packages/requests/adapters.py", line 486, in send
    resp = conn.urlopen(
  File "/mnt/jupyter-disk/.poetry-envs/***/lib/python3.8/site-packages/urllib3/connectionpool.py", line 798, in urlopen
    retries = retries.increment(
  File "/mnt/jupyter-disk/.poetry-envs/***/lib/python3.8/site-packages/urllib3/util/retry.py", line 592, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='oauth2.googleapis.com', port=443): Max retries exceeded with url: /token (Caused by SSLError(SSLZeroReturnError(6, 'TLS/SSL connection has been closed (EOF) (_ssl.c:1131)')))
```

Subsequent attempts to publish messages in the same Python thread eventually yield the following traceback:

```
The above exception was the direct cause of the following exception:

RetryError                                Traceback (most recent call last)
Cell In[12], line 1
----> 1 first_future.result() # this hangs

File /mnt/jupyter-disk/.poetry-envs/***/lib/python3.8/site-packages/google/cloud/pubsub_v1/publisher/futures.py:66, in Future.result(self, timeout)
     48 def result(self, timeout: Union[int, float] = None) -> str:
     49     """Return the message ID or raise an exception.
     50 
     51     This blocks until the message has been published successfully and
   (...)
     64             call execution.
     65     """
---> 66     return super().result(timeout=timeout)

File /usr/lib/python3.8/concurrent/futures/_base.py:437, in Future.result(self, timeout)
    435     raise CancelledError()
    436 elif self._state == FINISHED:
--> 437     return self.__get_result()
    439 self._condition.wait(timeout)
    441 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File /usr/lib/python3.8/concurrent/futures/_base.py:389, in Future.__get_result(self)
    387 if self._exception:
    388     try:
--> 389         raise self._exception
    390     finally:
    391         # Break a reference cycle with the exception in self._exception
    392         self = None

File /mnt/jupyter-disk/.poetry-envs/***/lib/python3.8/site-packages/google/cloud/pubsub_v1/publisher/_batch/thread.py:274, in Batch._commit(self)
    271 batch_transport_succeeded = True
    272 try:
    273     # Performs retries for errors defined by the retry configuration.
--> 274     response = self._client._gapic_publish(
    275         topic=self._topic,
    276         messages=self._messages,
    277         retry=self._commit_retry,
    278         timeout=self._commit_timeout,
    279     )
    280 except google.api_core.exceptions.GoogleAPIError as exc:
    281     # We failed to publish, even after retries, so set the exception on
    282     # all futures and exit.
    283     self._status = base.BatchStatus.ERROR

File /mnt/jupyter-disk/.poetry-envs/***/lib/python3.8/site-packages/google/cloud/pubsub_v1/publisher/client.py:267, in Client._gapic_publish(self, *args, **kwargs)
    265 def _gapic_publish(self, *args, **kwargs) -> "pubsub_types.PublishResponse":
    266     """Call the GAPIC public API directly."""
--> 267     return super().publish(*args, **kwargs)

File /mnt/jupyter-disk/.poetry-envs/***/lib/python3.8/site-packages/google/pubsub_v1/services/publisher/client.py:831, in PublisherClient.publish(self, request, topic, messages, retry, timeout, metadata)
    826 metadata = tuple(metadata) + (
    827     gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)),
    828 )
    830 # Send the request.
--> 831 response = rpc(
    832     request,
    833     retry=retry,
    834     timeout=timeout,
    835     metadata=metadata,
    836 )
    838 # Done; return the response.
    839 return response

File /mnt/jupyter-disk/.poetry-envs/***/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py:113, in _GapicCallable.__call__(self, timeout, retry, *args, **kwargs)
    110     metadata.extend(self._metadata)
    111     kwargs["metadata"] = metadata
--> 113 return wrapped_func(*args, **kwargs)

File /mnt/jupyter-disk/.poetry-envs/***/lib/python3.8/site-packages/google/api_core/retry.py:349, in Retry.__call__.<locals>.retry_wrapped_func(*args, **kwargs)
    345 target = functools.partial(func, *args, **kwargs)
    346 sleep_generator = exponential_sleep_generator(
    347     self._initial, self._maximum, multiplier=self._multiplier
    348 )
--> 349 return retry_target(
    350     target,
    351     self._predicate,
    352     sleep_generator,
    353     self._timeout,
    354     on_error=on_error,
    355 )

File /mnt/jupyter-disk/.poetry-envs/***/lib/python3.8/site-packages/google/api_core/retry.py:207, in retry_target(target, predicate, sleep_generator, timeout, on_error, **kwargs)
    203     next_attempt_time = datetime_helpers.utcnow() + datetime.timedelta(
    204         seconds=sleep
    205     )
    206     if deadline < next_attempt_time:
--> 207         raise exceptions.RetryError(
    208             "Deadline of {:.1f}s exceeded while calling target function".format(
    209                 timeout
    210             ),
    211             last_exc,
    212         ) from last_exc
    214 _LOGGER.debug(
    215     "Retrying due to {}, sleeping {:.1f}s ...".format(last_exc, sleep)
    216 )
    217 time.sleep(sleep)

RetryError: Deadline of 600.0s exceeded while calling target function, last exception: 504 Deadline Exceeded
```
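The traceback ends in `retry_target`, which retries with growing sleeps and raises `RetryError` once the next attempt would start after the deadline (600 s here), chaining the last error (`504 Deadline Exceeded`). The sketch below is a simplified model of that loop, useful for reasoning about how long a failing publish can run before the error surfaces. It leaves out any randomization the real `exponential_sleep_generator` may apply, `simulate_retry_deadline` is a hypothetical name, and any parameter values fed to it are assumptions rather than the Pub/Sub client's configured defaults.

```python
from typing import Tuple


def simulate_retry_deadline(
    deadline: float,
    attempt_duration: float,
    initial: float,
    maximum: float,
    multiplier: float,
) -> Tuple[int, float]:
    """Simplified, jitter-free model of the retry loop shown in the traceback.

    Every attempt fails after `attempt_duration` seconds. Returns
    (attempts made, elapsed seconds when RetryError would be raised).
    """
    if initial <= 0 or multiplier < 1:
        raise ValueError("need initial > 0 and multiplier >= 1 so the loop ends")
    elapsed = 0.0
    delay = initial
    attempts = 0
    while True:
        elapsed += attempt_duration  # the attempt runs and fails
        attempts += 1
        sleep = min(delay, maximum)
        # Same check as retry_target: give up if the next attempt would start past the deadline.
        if elapsed + sleep > deadline:
            return attempts, elapsed
        elapsed += sleep
        delay *= multiplier
```

For example, if each attempt itself ran until a 60-second per-attempt timeout (an assumption), only around ten attempts would fit in the 600-second window, which matches a publish that appears to hang for minutes before `RetryError` is raised.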
product-auto-label bot added the api: pubsub label on Aug 10, 2023
acocuzzo added the type: question label on Aug 20, 2023
liuyunnnn assigned pradn and unassigned liuyunnnn on Oct 24, 2023