
PubSub Publisher 504 Deadline Exceeded when sending bulk messages #72

Closed
dillonjohnson opened this issue Apr 14, 2020 · 8 comments · Fixed by #96
Labels: api: pubsub (Issues related to the googleapis/python-pubsub API), type: feature request (‘Nice-to-have’ improvement, new feature or different behavior or design)

Comments


dillonjohnson commented Apr 14, 2020

Environment details

  • OS type and version: Debian GNU/Linux 10 (buster)
  • Python version: 3.7
  • pip version: 20.0.2
  • google-cloud-pubsub version: 1.4.2

Steps to reproduce

  1. Send a large number of messages to a Pub/Sub topic. Publishing fails when I try to send 5000 messages without pausing and waiting, for example.
  2. Check logs

Code example

# I have also tried this using the callback and hit the same error. This is what I'm doing now though.
iter_size = 5000
iter_count = 0
published_count = 0
futures = []
for row in rows:
    data = json.dumps(row, default=lambda x: str(x)).encode('utf-8')
    future = publish_client.publish(topic, data, origin='xxx')
    futures.append(future)
    published_count += 1
    iter_count += 1
    if iter_count >= iter_size:
        # Block until the current batch of publish futures has resolved.
        [f.result() for f in futures]
        futures = []
        iter_count = 0
# Wait for any remaining in-flight messages.
[f.result() for f in futures]

Stack trace

[2020-04-14 14:19:23,788] {{thread.py:264}} ERROR - Failed to publish 141 messages.
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
     return callable_(*args, **kwargs)
   File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__
     return _end_unary_response_blocking(state, call, False, None)
   File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
     raise _InactiveRpcError(state)
 grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
 	status = StatusCode.DEADLINE_EXCEEDED
 	details = "Deadline Exceeded"
 	debug_error_string = "{"created":"@1586873963.056617563","description":"Error received from peer ipv4:108.177.122.95:443","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Deadline Exceeded","grpc_status":4}"
 >

Other Details:

I've found a similar issue here:
googleapis/google-cloud-java#3867

which references this issue:
googleapis/google-cloud-java#3003

and then this one:
googleapis/java-pubsub#31

This will normally be running on GKE, so I understand that running it locally may behave somewhat differently due to network speed. However, we would prefer to avoid this issue either way.

Is this something that is planned/intended to be supported or should I implement auto-throttling?

@product-auto-label product-auto-label bot added the api: pubsub Issues related to the googleapis/python-pubsub API. label Apr 14, 2020
@yoshi-automation yoshi-automation added the triage me I really want to be triaged. label Apr 15, 2020
@plamut plamut added type: question Request for information or clarification. Not an issue. and removed triage me I really want to be triaged. labels Apr 15, 2020

plamut commented Apr 15, 2020

@dillonjohnson Can the issue be reproduced consistently? Asking because I tried it a few times (from my local machine), and it worked fine, even though the publishing took quite some time.

The only difference on my side could be the message size; I just sent generic short-ish strings instead of potentially long JSON rows. What's the typical row length in the data you used?

Update: I also tried with a CSV containing some public data on real estate sales (typical stringified row length ~250 bytes); same result, no error occurred.

dillonjohnson (Author) commented

@plamut Sorry for the delay. I should've given a better example. I do feel it is somewhat network-bound on my local machine, but we will be sending a fair number of messages, so I don't want to outpace the network on K8s either. For me locally, the code below crashes every time.

from google.cloud import bigquery
from google.cloud import pubsub_v1
from time import sleep
import json

ack_count = 0
sent_count = 0

def get_callback(f):
    def callback(f):
        global ack_count
        try:
            f.result()  # re-raises the publish error, if any
            ack_count += 1
        except:  # noqa
            print("Please handle {} for {}.".format(f.exception(), f))
    return callback

project_id = ''  # TODO: insert here
topic_name = ''  # TODO: insert here

bq = bigquery.Client()
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

rows = bq.query("select * from `bigquery-public-data.samples.gsod` limit 20000")

for row in rows:
    sent_count += 1
    future = publisher.publish(topic_path, json.dumps(dict(row)).encode('utf-8'))
    future.add_done_callback(get_callback(future))

# Wait until every publish future has been acknowledged.
while ack_count < sent_count:
    sleep(5)

This will work using public data, but our average message is somewhere around 2K bytes long.

If this doesn't reproduce the issue for you, you should be able to raise the row limit and hit it, I think.

Thanks for the quick response.


plamut commented Apr 16, 2020

@dillonjohnson Thanks for the updated sample, but I still wasn't able to reproduce the error even if I bumped the row limit to 200k (tried multiple times) - at least not without "cheating". I only had success if I briefly disabled my WiFi during the process, in which case I saw the 504 Deadline Exceeded errors.

Would it be feasible for you to try running the script from a test K8s instance and see if the error is reproducible there as well?

yan-hic commented Apr 16, 2020

@plamut It all depends on the upload bandwidth on the client side. In the US, on coax, you might get only 5 Mb/s up and be more likely to hit that error. It is pretty unlikely from within GCP.

As per https://cloud.google.com/pubsub/docs/troubleshooting#publish-deadline-exceeded, rate limiting should be built into the client so that local use is supported.
Or maybe 504 could be made a transient (retriable) error.
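To illustrate the second idea, here is a minimal, hypothetical sketch of passing a custom retry object to publish() that lists DEADLINE_EXCEEDED as retriable and stretches the overall deadline. Whether publish() accepts a retry argument depends on the client version (newer releases do), and the project/topic names and numeric values below are placeholders, so treat this as an assumption rather than the documented behaviour of the version in this report.

from google.api_core import exceptions, retry
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")  # placeholder names

# Assumption: retry DEADLINE_EXCEEDED (504) and UNAVAILABLE with exponential
# backoff, allowing up to 10 minutes overall before the error is surfaced.
custom_retry = retry.Retry(
    predicate=retry.if_exception_type(
        exceptions.DeadlineExceeded,
        exceptions.ServiceUnavailable,
    ),
    initial=0.1,
    maximum=60.0,
    multiplier=1.3,
    deadline=600.0,
)

future = publisher.publish(topic_path, b"payload", retry=custom_retry)
future.result()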

Another possible factor for the timeout is the gRPC serialization. As per https://cloud.google.com/blog/products/data-analytics/testing-cloud-pubsub-clients-to-maximize-streaming-performance, Python does not perform well and would require lots of CPUs to improve throughput.

For "light clients" i.e. limited CPU and bandwidth, I am wondering if a http GCF that takes in the (gzipped compressed) message and publishes it to pubsub would not perform better. Calling script could even use aiohttp.

Thoughts?


plamut commented Apr 17, 2020

@yiga2

In the US, on coax, you could get only 5Mb/s up and more likely to hit that error.

That's indeed a difference. I tested on a 40/40 Mbps home fiber-optic connection with a decent router, which is probably why all the load went through fine.

Publish rate limiting is indeed worth considering, IMO. A similar feature request already exists, albeit for a different reason (to avoid consuming too much memory), and this feature was already considered at some point.

I can check what the current status is at the next weekly meeting and post when there's more info (cc: @kamalaboulhosn).

For "light clients" i.e. limited CPU and bandwidth, I am wondering if a http GCF that takes in the (gzipped compressed) message and publishes it to pubsub would not perform better. Calling script could even use aiohttp

AFAIK, the underlying gRPC transport does not compress the payload by default. If the bandwidth on the client machine is the bottleneck, and the nature of the message data allows for significant compression gains, then performance gains might be achievable, although that would require additional CPU resources for compressing the data first... hard to say in a vacuum.
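As a rough illustration of the compression idea (this is application-level, not something the client library does for you): the payload can be gzip-compressed before publishing and decompressed again on the subscriber side. The project/topic names and the sample record below are placeholders.

import gzip
import json

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")  # placeholders

row = {"station": "xyz", "temperature": 21.4}  # example record
payload = gzip.compress(json.dumps(row).encode("utf-8"))

# Record the encoding in a message attribute so subscribers know to decompress:
#   json.loads(gzip.decompress(message.data).decode("utf-8"))
future = publisher.publish(topic_path, payload, content_encoding="gzip")
future.result()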


@dillonjohnson If you reduce the number of messages to publish, does the error on your machine still occur? If it doesn't, that would confirm it is indeed a CPU/network bottleneck issue.

Unfortunately, the publisher client currently does not do any throttling on its own; it only batches multiple successive publish() calls and publishes messages in batches. If you need publish throttling "soon", you will have to implement it yourself for the time being, I'm afraid, but I'll let you know if there are plans to implement this in the client itself.
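For anyone who needs to throttle in the meantime, here is a minimal sketch (not part of the library) that caps the number of in-flight messages with a semaphore released from the publish futures' callbacks; the limit and the project/topic names are arbitrary placeholders.

import threading

from google.cloud import pubsub_v1

MAX_IN_FLIGHT = 1000  # tune to your bandwidth; arbitrary example value

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")  # placeholders
semaphore = threading.Semaphore(MAX_IN_FLIGHT)

def make_callback(data):
    def callback(future):
        semaphore.release()  # free a slot whether publishing succeeded or failed
        if future.exception() is not None:
            print("Failed to publish {!r}: {}".format(data, future.exception()))
    return callback

for i in range(20000):
    data = "message {}".format(i).encode("utf-8")
    semaphore.acquire()  # blocks while MAX_IN_FLIGHT messages are outstanding
    publisher.publish(topic_path, data).add_done_callback(make_callback(data))

# Drain: wait until all outstanding messages have completed.
for _ in range(MAX_IN_FLIGHT):
    semaphore.acquire()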

@plamut plamut added type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design. and removed type: question Request for information or clarification. Not an issue. labels Apr 17, 2020
dillonjohnson (Author) commented

@plamut
If I reduce the number of calls in my code, the issue doesn't happen. The point at which it begins to fail is fairly consistent for me, and it is very much linked to network speed.

I've implemented simple throttling in the application I've built, using the logic from the original question (send x messages, wait for those to be sent, send more, and so on).

Thanks for looking into this.

I appreciate your time and help, and I'm looking forward to updates in the future.


plamut commented Apr 20, 2020

@dillonjohnson Thanks for the additional checks, and I'm glad you've found a way around the problem!

Since I can imagine other users facing similar issues, too, I'll bring this topic up at the next Pub/Sub meeting and see what the plans are.


plamut commented May 7, 2020

@dillonjohnson There is an upcoming feature (PR #96) that you might be interested in.

It's not exactly throttling, but it still allows one to specify the desired thresholds for the number of in-flight messages being published or their total size, and the desired action to take if these thresholds are exceeded. The default is IGNORE, but one can also choose BLOCK or ERROR.
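For reference, a hedged sketch of what this looks like in client releases where the feature has shipped; the exact type names below come from later versions of google-cloud-pubsub and may differ slightly from PR #96 as merged, and the limits are arbitrary example values.

from google.cloud import pubsub_v1

# Block publish() calls once 1000 messages or ~10 MiB are outstanding,
# instead of the default IGNORE behaviour.
flow_control = pubsub_v1.types.PublishFlowControl(
    message_limit=1000,
    byte_limit=10 * 1024 * 1024,
    limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK,
)

publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(flow_control=flow_control),
)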
