Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add native async support for subscriptions #231

Open
csaroff opened this issue Sep 30, 2022 · 6 comments
Open

Add native async support for subscriptions #231

csaroff opened this issue Sep 30, 2022 · 6 comments

Comments

@csaroff
Copy link

csaroff commented Sep 30, 2022

Currently, async functions can't be used with the @sub context manager.

@sub(topic='my-topic'):
async def async_handler(data, **kwargs):
    print('data', data)

If you try, you'll get the following error.

Configuring worker with 1 subscription(s)...
  dicom_uploads - handle_upload
/usr/local/lib/python3.9/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py:126: RuntimeWarning: coroutine 'handle_upload' was never awaited
  callback(message)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

There are some simple workarounds, but it would be nice if this was supported natively.

@sub(topic='my-topic'):
def sync_handler(data, **kwargs)
    return asyncio.run(async_handler(data, **kwargs))

async def async_handler(data, **kwargs):
    print('data', data)
@andrewgy8
Copy link
Contributor

andrewgy8 commented Sep 30, 2022

Curious about the scope of this. Is this just making the sub async compatible, or the worker as well?

I havent delved into it, but do you know if the google cloud pubsub library supports async as well?

@csaroff
Copy link
Author

csaroff commented Sep 30, 2022

It looks like the gcp pub/sub library doesn't support async yet - googleapis/python-pubsub#389

What are the tradeoffs of making the worker async-compatible vs just the sub?

It's probably more complicated than this, but I was thinking you could just check if self.func was async and use asyncio.run(or similar) here.

return self._func(data, **kwargs)

@andrewgy8
Copy link
Contributor

andrewgy8 commented Oct 4, 2022

What are the tradeoffs of making the worker async-compatible vs just the sub?

The subscriptions are essentially callbacks that are run within the worker. As such, a naive approach could be something you suggest with asyncio.run.
Unfortunately, if the GCP client does not support async, it will be nearly impossible to support async at a higher level.
Out of curiosity, and not being super familiar with async python, whats the benefit of having an async function in a background worker? I also assume if you are going to run async.run, it runs the async function synchronously anyways.

@csaroff
Copy link
Author

csaroff commented Oct 4, 2022

I also assume if you are going to run async.run, it runs the async function synchronously anyways.

Basically

whats the benefit of having an async function in a background worker?

If you want to be able to run TONS of concurrent callbacks that do exclusively io, I think you'd need native support for async.

For us, we're less concerned with the performance implications and just want to be able to inter-operate with async functions in our codebase.

Understandable if this isn't a priority right now, but wanted to file an issue since it seems like this library is mostly about ergonomics.

@andrewgy8
Copy link
Contributor

andrewgy8 commented Oct 5, 2022

Not too sure if itll work, but one attempt you can try is sub-classing the Subscription class like here but with the asyncio.run in the call method.

from rele import Subscription

class DoSomethingSub(Subscription):
    topic = 'photo-uploaded'

    def __init__(self):
        self._func = self.callback_func
        super().__init__(self._func, self.topic)

    async def callback_func(self, data, **kwargs):
		return await asyncio.sleep(2)
	
	def __call__(self, data, **kwargs):
		return asyncio.run(self._func(data, **kwargs))

@csaroff
Copy link
Author

csaroff commented Oct 5, 2022

Makes sense. I think I'll stick with the workaround for now. It's actually not so bad.

Appreciate all the help ❤️

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants