
Pubsub: Consider a blocking publisher to prevent OOMing #16

Closed
evanj opened this issue May 16, 2019 · 6 comments · Fixed by #96
Assignees
Labels
api: pubsub Issues related to the googleapis/python-pubsub API. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.

Comments


evanj commented May 16, 2019

Publishing medium-sized (32 KiB) messages in a loop causes the process to quickly run out of memory. I set a 2 GiB memory limit, and the process runs out of memory after publishing about 20,000 messages. I believe there are two issues:

  1. There is no limit to the number of queued bytes in the process.
  2. The client library keeps 3 copies of the messages and retains them until the batch "monitor" thread finishes.

As far as I can see, there is no good way to use the current client library to publish large batches of messages without huge memory consumption. I believe this can be fixed in two ways:

Reduce memory usage of the current library:

  1. Release messages when the batch is done: in thread.Batch._commit, set self._messages to None after the batch is published (a sketch of this idea follows the list). This seemed to reduce memory consumption, though I did not test it carefully.

  2. Eliminate an extra copy: change thread.Batch to build a PublishRequest and pass that into the gapic.PublisherClient, rather than a list of messages that then needs to be copied. This removes one copy of the messages, which also seemed to reduce memory consumption (and possibly speed things up; again, not tested carefully).
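A minimal, self-contained sketch of the first idea (illustrative only; SimpleBatch is a made-up stand-in, not the library's actual thread.Batch):

import threading


class SimpleBatch:
    """Illustrative batch that drops its reference to buffered messages as
    soon as they are handed to the transport, so they can be garbage
    collected without waiting for any monitor thread to finish."""

    def __init__(self, publish_fn, topic):
        self._publish_fn = publish_fn  # e.g. a gapic publish call
        self._topic = topic
        self._messages = []
        self._lock = threading.Lock()

    def add(self, message):
        with self._lock:
            self._messages.append(message)

    def commit(self):
        with self._lock:
            # Hand ownership of the list to the transport and keep no copy here.
            messages, self._messages = self._messages, None
        self._publish_fn(self._topic, messages)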

Back Pressure

Change the publisher to keep a queue of batches and block once more than N are in flight.

I implemented a client that takes the blocking-queue approach. It can publish this workload without ever exceeding 300 MiB of memory. Only 3 in-flight batches were needed for the process to become CPU bound when running in Google Cloud, so I don't think the queue needs to be very large.
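A minimal sketch of this kind of client-side back pressure layered on top of the existing client (this is not my actual implementation; BlockingPublisher and max_in_flight are illustrative names, and it bounds in-flight messages rather than batches):

from threading import BoundedSemaphore

from google.cloud import pubsub_v1


class BlockingPublisher:
    """Wraps PublisherClient.publish() and blocks once max_in_flight
    messages are outstanding, bounding the client's internal buffering."""

    def __init__(self, client, max_in_flight=1000):
        self._client = client
        self._slots = BoundedSemaphore(max_in_flight)

    def publish(self, topic, data, **attrs):
        self._slots.acquire()  # blocks while too many messages are in flight
        future = self._client.publish(topic, data, **attrs)
        future.add_done_callback(lambda _f: self._slots.release())
        return future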

Environment details

OS: Linux, ContainerOS (GKE), Container is Debian9 (using distroless)
Python: 3.5.3
API: google-cloud-python 0.41.0

Steps to reproduce

  1. Package the following program in a Docker container.
  2. Run it in Kubernetes with a memory limit of 2 GiB.
  3. Watch the process exceed the limit and get OOM-killed.

Code example

#!/usr/bin/env python3
'''
Create the topic and a subscription:

gcloud pubsub topics create deleteme_publish_memleak_test
gcloud pubsub subscriptions create deleteme_memleak_subscription --topic=deleteme_publish_memleak_test
'''

from google.cloud import pubsub_v1
import argparse
import google.auth
import logging
import time


def main():
    parser = argparse.ArgumentParser(description='mass publish to pub/sub')
    parser.add_argument('--topic',
                        default='deleteme_publish_memleak_test',
                        help='topic to publish to')
    parser.add_argument('--messages',
                        type=int,
                        default=200000,
                        help='number of messages to publish')
    parser.add_argument('--bytes_per_message',
                        type=int,
                        default=32 * 1024,
                        help='bytes per message to publish')
    parser.add_argument('--log_every_n', type=int, default=2000, help='log every N messages')
    args = parser.parse_args()

    credentials, project_id = google.auth.default()
    publish_client = pubsub_v1.PublisherClient(credentials=credentials)
    topic_path = publish_client.topic_path(project_id, args.topic)

    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s %(levelname)s %(name)s: %(message)s',
    )
    logging.info('publishing %d messages to topic %s; %d bytes per message; %.1f total MiB',
                 args.messages, topic_path, args.bytes_per_message,
                 (args.messages * args.bytes_per_message) / 1024. / 1024.)

    start = time.time()
    for i in range(args.messages):
        message = (i % 256).to_bytes(1, byteorder='little') * args.bytes_per_message
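        # Fire-and-forget publish: the returned future is intentionally ignored,
        # so nothing limits how many messages pile up in the client's buffers.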
        publish_client.publish(topic_path, message)
        if i % args.log_every_n == 0:
            logging.info('published %d messages', i)

    end = time.time()
    logging.info('done publishing in %f s', end - start)


if __name__ == '__main__':
    main()

sduskis commented May 17, 2019

@evanj, thanks for raising the issue and for the clear explanation.

@plamut can you please check this out? The only thing we can likely tackle in the near term is the extra copies.

We'd need to reevaluate the current non-blocking approach, which is the default across all clients. It would be worthwhile to explore a blocking option in the long term.

@kamalaboulhosn we'll need you to make the call about this feature.

@sduskis sduskis changed the title pubsub: publishing in a loop uses TONS of memory pubsub: Consider a blocking publisher to prevent OOMing May 17, 2019
@tseaver tseaver changed the title pubsub: Consider a blocking publisher to prevent OOMing Pubsub: Consider a blocking publisher to prevent OOMing May 20, 2019
@dangermike

@sduskis you have unassigned @plamut. @kamalaboulhosn: What was the determination about creating a back-pressure mechanism?

@kamalaboulhosn

I think it's something we'll still need to discuss; we had considered it at some point. We also considered exposing stats for the outstanding requests so that users could decide what to do when there are a lot of outstanding sends.


evanj commented Jul 11, 2019

FWIW, I believe the official Go client returns an error by default when the number of buffered bytes exceeds a threshold (10 * maxMessageLength): https://github.com/googleapis/google-cloud-go/blob/master/pubsub/topic.go#L103

@plamut plamut transferred this issue from googleapis/google-cloud-python Jan 31, 2020
@plamut plamut self-assigned this May 7, 2020

plamut commented May 7, 2020

@evanj I have an update on this: #96 will add the desired functionality to the publisher client.

It will allow configuring limits on in-flight messages being published, and the action to take when those limits are exceeded. The default action is IGNORE, but BLOCK and ERROR will also be available.
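For reference, a rough sketch of how that configuration could look (the exact type and field names follow the proposal in #96 and may differ in the released library):

from google.cloud import pubsub_v1

flow_control = pubsub_v1.types.PublishFlowControl(
    message_limit=500,                    # max number of in-flight messages
    byte_limit=10 * 1024 * 1024,          # max total bytes of in-flight messages
    limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK,
)
publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(flow_control=flow_control),
)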


evanj commented May 8, 2020

Amazing! Thank you!

@plamut plamut closed this as completed in #96 Jun 2, 2020