From aa51ca5099167182ea99550e39abbcaa093a0ed3 Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 10 Mar 2021 23:09:45 +0900 Subject: [PATCH] Update GCP Pub/Sub backend (#434) Fixed breaking changes introduced by gcloud-aio dependency. Closes https://github.com/bmoscon/cryptofeed/issues/433 Note: Pinned google_cloud_pubsub~=2.2.0 in setup.py because of an SSL issue in the Pub/Sub emulator. google_cloud_pubsub fix is merged, but not on PyPI: https://github.com/googleapis/python-pubsub/pull/297 --- cryptofeed/backends/gcppubsub.py | 28 ++++++++++++------ examples/demo_gcppubsub.py | 49 ++++++++++++++------------------ setup.py | 2 +- 3 files changed, 42 insertions(+), 37 deletions(-) diff --git a/cryptofeed/backends/gcppubsub.py b/cryptofeed/backends/gcppubsub.py index 239acfc34..0036e8446 100644 --- a/cryptofeed/backends/gcppubsub.py +++ b/cryptofeed/backends/gcppubsub.py @@ -36,7 +36,6 @@ class GCPPubSubCallback: def __init__(self, topic: Optional[str] = None, key: Optional[str] = None, service_file: Optional[Union[str, IO[AnyStr]]] = None, - client: Optional[PublisherClient] = None, session: Optional[aiohttp.ClientSession] = None, ordering_key: Optional[Union[str, io.IOBase]] = None, numeric_type=float): ''' Backend using Google Cloud Platform Pub/Sub. Use requires an account with Google Cloud Platform. @@ -59,10 +58,6 @@ def __init__(self, topic: Optional[str] = None, key: Optional[str] = None, or App Engine the environment variable will already be set.
https://cloud.google.com/bigquery/docs/authentication/service-account-file https://cloud.google.com/docs/authentication/production - client: PublisherClient - Allows gcloud.aio.pubsub.PublisherClient reuse - session: ClientSession - Allows aiohttp.ClientSession resuse ordering_key: str if messages have the same ordering key and you publish the messages to the same region, subscribers can receive the messages in order @@ -73,28 +68,43 @@ def __init__(self, topic: Optional[str] = None, key: Optional[str] = None, self.numeric_type = numeric_type self.topic = topic or f'cryptofeed-{self.key}' self.topic_path = self.get_topic() - self.session = session or aiohttp.ClientSession() - self.client = client or PublisherClient(service_file=service_file, session=self.session) + self.service_file = service_file + self.session = None + self.client = None def get_topic(self): publisher = pubsub_v1.PublisherClient() project_id = os.getenv('GCP_PROJECT') topic_path = PublisherClient.topic_path(project_id, self.topic) try: - publisher.create_topic(topic_path) + publisher.create_topic(request={"name": topic_path}) except google.api_core.exceptions.AlreadyExists: pass finally: return topic_path + async def get_session(self): + if not self.session: + self.session = aiohttp.ClientSession() + return self.session + + async def get_client(self): + if not self.client: + session = await self.get_session() + self.client = PublisherClient( + service_file=self.service_file, session=session + ) + return self.client + async def write(self, feed: str, symbol: str, timestamp: float, receipt_timestamp: float, data: dict): ''' Publish message. For filtering, "feed" and "symbol" are added as attributes. 
https://cloud.google.com/pubsub/docs/filtering ''' + client = await self.get_client() payload = json.dumps(data).encode() message = PubsubMessage(payload, feed=feed, symbol=symbol) - await self.client.publish(self.topic_path, [message]) + await client.publish(self.topic_path, [message]) class TradeGCPPubSub(GCPPubSubCallback, BackendTradeCallback): diff --git a/examples/demo_gcppubsub.py b/examples/demo_gcppubsub.py index 11df13327..dd6ed11b5 100644 --- a/examples/demo_gcppubsub.py +++ b/examples/demo_gcppubsub.py @@ -6,9 +6,9 @@ ''' import os import asyncio -import google.api_core.exceptions -from gcloud.aio.pubsub import PublisherClient, SubscriberClient, SubscriberMessage +import aiohttp +from gcloud.aio.pubsub import subscribe, PublisherClient, SubscriberClient, SubscriberMessage from yapic import json from cryptofeed import FeedHandler @@ -27,7 +27,7 @@ $ gcloud beta emulators pubsub start --host-port=0.0.0.0:8681 3. In another console, run the demo -$ export PUBSUB_EMULATOR_HOST='0.0.0.0:8681' python examples/demo_gcppubsub.py +$ export PUBSUB_EMULATOR_HOST='0.0.0.0:8681'; python examples/demo_gcppubsub.py Try it with GCP Pub/Sub in the cloud @@ -45,51 +45,46 @@ async def message_callback(message: SubscriberMessage) -> None: - try: - data = json.loads(message.data) - except Exception: - message.nack() - else: - print(data) - message.ack() + data = json.loads(message.data) + print(data) -def start_subscriber(loop, topic): +async def start_subscriber(topic): client = SubscriberClient() - - project_id = os.getenv('GCP_PROJECT') + project_id = os.getenv("GCP_PROJECT") topic_path = PublisherClient.topic_path(project_id, topic) subscription_path = PublisherClient.subscription_path(project_id, topic) # Create subscription if it doesn't already exist try: - client.create_subscription(subscription_path, topic_path) - except google.api_core.exceptions.PermissionDenied as e: - raise TypeError('Please set the GCP_PROJECT environment variable') from e - - # Subscribe to 
the subscription, receiving a Future that acts as a keepalive - keep_alive = client.subscribe(subscription_path, message_callback) + await client.create_subscription(subscription_path, topic_path) + except aiohttp.client_exceptions.ClientResponseError as e: + if e.status == 409: # Subscription exists + pass + else: + raise TypeError("Please set the GCP_PROJECT environment variable") from e - # Have the client run forever, pulling messages from this subscription, - # passing them to the specified callback function, and wrapping it in an - # asyncio task. - client.run_forever(keep_alive) + # For demo with Pub/Sub emulator, maybe ack_deadline_cache_timeout 300 + # On GCP, default seems fine. + # For more options, check gcloud-aio docs: + # https://github.com/talkiq/gcloud-aio/tree/master/pubsub + await subscribe(subscription_path, message_callback, client, ack_deadline_cache_timeout=300) def main(): f = FeedHandler() trades = TradeGCPPubSub() - cbs = {TRADES: trades} f.add_feed(Coinbase(channels=[TRADES], symbols=['BTC-USD'], callbacks=cbs)) - f.run(start_loop=False) + # Have the client run forever, pulling messages from subscription_path, + # passing them to the specified callback function loop = asyncio.get_event_loop() - - start_subscriber(loop, trades.topic) + loop.create_task(start_subscriber(trades.topic)) + loop.run_forever() if __name__ == '__main__': diff --git a/setup.py b/setup.py index fcc9385e1..4baa005e8 100644 --- a/setup.py +++ b/setup.py @@ -75,7 +75,7 @@ def run_tests(self): ], extras_require={ "arctic": ["arctic"], - "gcp_pubsub": ["google_cloud_pubsub", "gcloud_aio_pubsub"], + "gcp_pubsub": ["google_cloud_pubsub~=2.2.0", "gcloud_aio_pubsub"], "kafka": ["aiokafka>=0.7.0"], "mongo": ["motor"], "postgres": ["asyncpg"],