Skip to content

Commit

Permalink
Update GCP Pub/Sub backend
Browse files Browse the repository at this point in the history
Fixed breaking changes introduced by gcloud-aio dependency.

Closes bmoscon#433

Note: Pinned google_cloud_pubsub~=2.2.0 in setup.py because of an SSL issue in the Pub/Sumb emulator.
google_cloud_pubsub fix is merged, but not on pypi: googleapis/python-pubsub#297
  • Loading branch information
globophobe committed Mar 10, 2021
1 parent 651abbc commit e138c09
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 37 deletions.
28 changes: 19 additions & 9 deletions cryptofeed/backends/gcppubsub.py
Expand Up @@ -36,7 +36,6 @@
class GCPPubSubCallback:
def __init__(self, topic: Optional[str] = None, key: Optional[str] = None,
service_file: Optional[Union[str, IO[AnyStr]]] = None,
client: Optional[PublisherClient] = None, session: Optional[aiohttp.ClientSession] = None,
ordering_key: Optional[Union[str, io.IOBase]] = None, numeric_type=float):
'''
Backend using Google Cloud Platform Pub/Sub. Use requires an account with Google Cloud Platform.
Expand All @@ -59,10 +58,6 @@ def __init__(self, topic: Optional[str] = None, key: Optional[str] = None,
or App Engine the environment variable will already be set.
https://cloud.google.com/bigquery/docs/authentication/service-account-file
https://cloud.google.com/docs/authentication/production
client: PublisherClient
Allows gcloud.aio.pubsub.PublisherClient reuse
session: ClientSession
Allows aiohttp.ClientSession resuse
ordering_key: str
if messages have the same ordering key and you publish the messages
to the same region, subscribers can receive the messages in order
Expand All @@ -73,28 +68,43 @@ def __init__(self, topic: Optional[str] = None, key: Optional[str] = None,
self.numeric_type = numeric_type
self.topic = topic or f'cryptofeed-{self.key}'
self.topic_path = self.get_topic()
self.session = session or aiohttp.ClientSession()
self.client = client or PublisherClient(service_file=service_file, session=self.session)
self.service_file = service_file
self.session = None
self.client = None

def get_topic(self):
publisher = pubsub_v1.PublisherClient()
project_id = os.getenv('GCP_PROJECT')
topic_path = PublisherClient.topic_path(project_id, self.topic)
try:
publisher.create_topic(topic_path)
publisher.create_topic(request={"name": topic_path})
except google.api_core.exceptions.AlreadyExists:
pass
finally:
return topic_path

async def get_session(self):
if not self.session:
self.session = aiohttp.ClientSession()
return self.session

async def get_client(self):
if not self.client:
session = await self.get_session()
self.client = PublisherClient(
service_file=self.service_file, session=session
)
return self.client

async def write(self, feed: str, symbol: str, timestamp: float, receipt_timestamp: float, data: dict):
'''
Publish message. For filtering, "feed" and "symbol" are added as attributes.
https://cloud.google.com/pubsub/docs/filtering
'''
client = await self.get_client()
payload = json.dumps(data).encode()
message = PubsubMessage(payload, feed=feed, symbol=symbol)
await self.client.publish(self.topic_path, [message])
await client.publish(self.topic_path, [message])


class TradeGCPPubSub(GCPPubSubCallback, BackendTradeCallback):
Expand Down
50 changes: 23 additions & 27 deletions examples/demo_gcppubsub.py
Expand Up @@ -6,9 +6,10 @@
'''
import os
import asyncio
import google.api_core.exceptions

from gcloud.aio.pubsub import PublisherClient, SubscriberClient, SubscriberMessage
import aiohttp
from gcloud.aio.pubsub import subscribe, PublisherClient, SubscriberClient, SubscriberMessage
from google.cloud import pubsub_v1
from yapic import json

from cryptofeed import FeedHandler
Expand All @@ -27,7 +28,7 @@
$ gcloud beta emulators pubsub start --host-port=0.0.0.0:8681
3. In another console, run the demo
$ export PUBSUB_EMULATOR_HOST='0.0.0.0:8681' python examples/demo_gcppubsub.py
$ export PUBSUB_EMULATOR_HOST='0.0.0.0:8681'; python examples/demo_gcppubsub.py
Try it with GCP Pub/Sub in the cloud
Expand All @@ -45,51 +46,46 @@


async def message_callback(message: SubscriberMessage) -> None:
try:
data = json.loads(message.data)
except Exception:
message.nack()
else:
print(data)
message.ack()
data = json.loads(message.data)
print(data)


def start_subscriber(loop, topic):
async def start_subscriber(topic):
client = SubscriberClient()

project_id = os.getenv('GCP_PROJECT')
project_id = os.getenv("GCP_PROJECT")
topic_path = PublisherClient.topic_path(project_id, topic)
subscription_path = PublisherClient.subscription_path(project_id, topic)

# Create subscription if it doesn't already exist
try:
client.create_subscription(subscription_path, topic_path)
except google.api_core.exceptions.PermissionDenied as e:
raise TypeError('Please set the GCP_PROJECT environment variable') from e

# Subscribe to the subscription, receiving a Future that acts as a keepalive
keep_alive = client.subscribe(subscription_path, message_callback)
await client.create_subscription(subscription_path, topic_path)
except aiohttp.client_exceptions.ClientResponseError as e:
if e.status == 409: # Subscription exists
pass
else:
raise TypeError("Please set the GCP_PROJECT environment variable") from e

# Have the client run forever, pulling messages from this subscription,
# passing them to the specified callback function, and wrapping it in an
# asyncio task.
client.run_forever(keep_alive)
# For demo with Pub/Sub emulator, maybe ack_deadline_cache_timeout 300
# On GCP, default seems fine.
# For more options, check gcloud-aio docs:
# https://github.com/talkiq/gcloud-aio/tree/master/pubsub
await subscribe(subscription_path, message_callback, client, ack_deadline_cache_timeout=300)


def main():
f = FeedHandler()

trades = TradeGCPPubSub()

cbs = {TRADES: trades}

f.add_feed(Coinbase(channels=[TRADES], symbols=['BTC-USD'], callbacks=cbs))

f.run(start_loop=False)

# Have the client run forever, pulling messages from subscription_path,
# passing them to the specified callback function
loop = asyncio.get_event_loop()

start_subscriber(loop, trades.topic)
loop.create_task(start_subscriber(trades.topic))
loop.run_forever()


if __name__ == '__main__':
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -75,7 +75,7 @@ def run_tests(self):
],
extras_require={
"arctic": ["arctic"],
"gcp_pubsub": ["google_cloud_pubsub", "gcloud_aio_pubsub"],
"gcp_pubsub": ["google_cloud_pubsub~=2.2.0", "gcloud_aio_pubsub"],
"kafka": ["aiokafka>=0.7.0"],
"mongo": ["motor"],
"postgres": ["asyncpg"],
Expand Down

0 comments on commit e138c09

Please sign in to comment.