Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asyncio coroutines for common tasks #389

Open
mbrancato opened this issue Apr 16, 2021 · 12 comments
Open

Asyncio coroutines for common tasks #389

mbrancato opened this issue Apr 16, 2021 · 12 comments
Assignees
Labels
api: pubsub Issues related to the googleapis/python-pubsub API. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.

Comments

@mbrancato
Copy link

mbrancato commented Apr 16, 2021

The current status of asyncio has mostly been answered here already: #218 (comment)

This feature request is to ask for a formal exposure of high-level awaitable methods for interacting with the PubSub library and reducing the use of threads. Although, if the library still wants to manage threads, that seems fine as long as how we use the library would be awaitable and the callback are run in the same event loop. I don't want to be too prescriptive on the implementation in my request.

Some examples, assuming SubscriberClient and PublisherClient were already setup...

example subscribe:

async def my_func(msg):
    # ... do something
    pass


subscription_path = f"projects/{project_id}/subscriptions/{subscription}"
streaming_pull_async = subscriber.subscribe_async(subscription_path, callback=my_func,)

try:
    await streaming_pull_async.result(timeout=pull_timeout)
except TimeoutError:
    # Handle flow timeout
    pass

example publish:

topic_path = f"projects/{project_id}/topics/{output_topic}"
publish_value_async = publisher.publish_async(topic_path, value)
await publish_value_async.result()
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the googleapis/python-pubsub API. label Apr 16, 2021
@plamut plamut added the type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design. label Apr 16, 2021
@plamut
Copy link
Contributor

plamut commented Apr 16, 2021

@mbrancato So essentially the request is to offer the same functionality the existing (sync) client offers, but in async/await flavor?

I'll forward it, but I suspect this will require substantial amount of work to match the current client's features.

@mbrancato
Copy link
Author

That's basically it @plamut. It would prevent needing to wrap some of the pubsub calls in things like run_in_executor() and wrapping the callback.

In case anyone needs a solution, a good example of what works now is here: https://github.com/allenporter/python-google-nest-sdm/blob/cb2dc1bb5c61284e6f489f9e8933dfa758645196/google_nest_sdm/google_nest_subscriber.py#L49

@plamut
Copy link
Contributor

plamut commented Apr 22, 2021

@mbrancato I brought this up at a meeting, and it's quite a common request in other client libraries, too.

This feature is being considered, but it's currently in exploratory phase, e.g. how to approach not to duplicate the hand-written logic too much, etc., as the amount of work required can be substantial.

As such, there's no ETA at the moment, but I'll keep this request open for visibility, and for any future updates.

@vikahl
Copy link

vikahl commented Jan 26, 2022

This feature is being considered, but it's currently in exploratory phase, e.g. how to approach not to duplicate the hand-written logic too much, etc., as the amount of work required can be substantial.

I understand that there is no ETA on this specific feature, but has there been any thoughts/plan about how approach the problem in general?

@acocuzzo
Copy link
Contributor

acocuzzo commented Feb 4, 2022

We do want to support keeping up with new language features like async/await. However, we're also careful to keep the API surface similar across languages, so we need to co-ordinate between languages to do this. This is something we will look at for a 2.0 API.

@geosach
Copy link

geosach commented May 18, 2022

Hello, until a v2.0 API is implemented, is it possible to provide some examples on how to combine the library with projects that use other async libraries? For instance, in our FastAPI based project we need to implement consumers that store data on a mongo db database using an async mongo library (AsyncIOMotorClient). I believe that this is a common use case.

Is this something that can be implemented in a thread safe way, using asyncio and functions like run_until_complete? We can find some workarounds but we don't feel confident that these workarounds are safe since they combine asyncio with the internal threading mechanism of the library. Do you have any recommendations or examples?

@adriangb
Copy link

adriangb commented Jun 8, 2022

+1 for this feature request

@samskiter
Copy link

As there is now a v2.0 version of this library, is an Async-flavour interface now implemented? The documentation on 2.0 is very thin on the ground...

@NickNaskida
Copy link

NickNaskida commented Nov 11, 2023

Almost end of 2023 but still no updates 😢

+999 for a feature

@plamut

@plamut
Copy link
Contributor

plamut commented Nov 12, 2023

@NickNaskida I am not a maintainer of this library anymore (since the start of 2022), and thus cannot say much about its current state or its roadmap, I'm afraid.

@mbrancato
Copy link
Author

@NickNaskida This may be helpful, but here is an example of using the async publisher. It may not be supported by Google to use these directly.

import asyncio
import logging
from typing import MutableSequence

from google.pubsub_v1 import PubsubMessage
from google.pubsub_v1.services.publisher.async_client import PublisherAsyncClient
from google.pubsub_v1.services.publisher.transports import PublisherGrpcAsyncIOTransport
from grpc import ChannelConnectivity


async def run():
    publisher_client = PublisherAsyncClient(transport="grpc_asyncio")
    transport = publisher_client.transport
    if isinstance(transport, PublisherGrpcAsyncIOTransport):
        await transport.grpc_channel.channel_ready()
        chan = transport.grpc_channel.get_state(try_to_connect=True)
        if chan != ChannelConnectivity.READY:
            logging.error("Channel is not ready")
            return

    messages = [
        PubsubMessage(
            {
                "data": b"FOO",
                "attributes": {
                    "message_type": "test",
                    "customer": "acme",
                },
            }
        ),
        PubsubMessage(
            {
                "data": b"BAR",
                "attributes": {
                    "message_type": "test",
                    "customer": "acme",
                },
            }
        ),
    ]

    if await publish_pubsub(messages, publisher_client):
        logging.info("Published messages")

    await publisher_client.transport.close()


async def publish_pubsub(
    messages: MutableSequence[PubsubMessage],
    client: PublisherAsyncClient,
) -> bool:
    topic = client.topic_path("my_project", "my_topic")

    try:
        resp = await client.publish(topic=topic, messages=messages)
        if len(resp.message_ids) != len(messages):
            logging.error("Failed to publish some messages to Pub/Sub")
            return False
    except asyncio.TimeoutError:
        logging.warning("Timeout pushing event to Pub/Sub")
        return False
    else:
        return True

@steve-marmalade
Copy link

In case it's helpful to anyone else, I ultimately decided to use the synchronous PublisherClient in my FastAPI service and convert the future to an awaitable via asyncio.wrap_future(future). I think this is the easiest way to use GCP Pub/Sub in a Python async environment.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the googleapis/python-pubsub API. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.
Projects
None yet
Development

No branches or pull requests