Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update GCP Pub/Sub backend #434

Merged
merged 1 commit into from Mar 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 19 additions & 9 deletions cryptofeed/backends/gcppubsub.py
Expand Up @@ -36,7 +36,6 @@
class GCPPubSubCallback:
def __init__(self, topic: Optional[str] = None, key: Optional[str] = None,
service_file: Optional[Union[str, IO[AnyStr]]] = None,
client: Optional[PublisherClient] = None, session: Optional[aiohttp.ClientSession] = None,
ordering_key: Optional[Union[str, io.IOBase]] = None, numeric_type=float):
'''
Backend using Google Cloud Platform Pub/Sub. Use requires an account with Google Cloud Platform.
Expand All @@ -59,10 +58,6 @@ def __init__(self, topic: Optional[str] = None, key: Optional[str] = None,
or App Engine the environment variable will already be set.
https://cloud.google.com/bigquery/docs/authentication/service-account-file
https://cloud.google.com/docs/authentication/production
client: PublisherClient
Allows gcloud.aio.pubsub.PublisherClient reuse
session: ClientSession
Allows aiohttp.ClientSession resuse
ordering_key: str
if messages have the same ordering key and you publish the messages
to the same region, subscribers can receive the messages in order
Expand All @@ -73,28 +68,43 @@ def __init__(self, topic: Optional[str] = None, key: Optional[str] = None,
self.numeric_type = numeric_type
self.topic = topic or f'cryptofeed-{self.key}'
self.topic_path = self.get_topic()
self.session = session or aiohttp.ClientSession()
self.client = client or PublisherClient(service_file=service_file, session=self.session)
self.service_file = service_file
self.session = None
self.client = None

def get_topic(self):
publisher = pubsub_v1.PublisherClient()
project_id = os.getenv('GCP_PROJECT')
topic_path = PublisherClient.topic_path(project_id, self.topic)
try:
publisher.create_topic(topic_path)
publisher.create_topic(request={"name": topic_path})
except google.api_core.exceptions.AlreadyExists:
pass
finally:
return topic_path

async def get_session(self):
if not self.session:
self.session = aiohttp.ClientSession()
return self.session

async def get_client(self):
if not self.client:
session = await self.get_session()
self.client = PublisherClient(
service_file=self.service_file, session=session
)
return self.client

async def write(self, feed: str, symbol: str, timestamp: float, receipt_timestamp: float, data: dict):
'''
Publish message. For filtering, "feed" and "symbol" are added as attributes.
https://cloud.google.com/pubsub/docs/filtering
'''
client = await self.get_client()
payload = json.dumps(data).encode()
message = PubsubMessage(payload, feed=feed, symbol=symbol)
await self.client.publish(self.topic_path, [message])
await client.publish(self.topic_path, [message])


class TradeGCPPubSub(GCPPubSubCallback, BackendTradeCallback):
Expand Down
49 changes: 22 additions & 27 deletions examples/demo_gcppubsub.py
Expand Up @@ -6,9 +6,9 @@
'''
import os
import asyncio
import google.api_core.exceptions

from gcloud.aio.pubsub import PublisherClient, SubscriberClient, SubscriberMessage
import aiohttp
from gcloud.aio.pubsub import subscribe, PublisherClient, SubscriberClient, SubscriberMessage
from yapic import json

from cryptofeed import FeedHandler
Expand All @@ -27,7 +27,7 @@
$ gcloud beta emulators pubsub start --host-port=0.0.0.0:8681

3. In another console, run the demo
$ export PUBSUB_EMULATOR_HOST='0.0.0.0:8681' python examples/demo_gcppubsub.py
$ export PUBSUB_EMULATOR_HOST='0.0.0.0:8681'; python examples/demo_gcppubsub.py


Try it with GCP Pub/Sub in the cloud
Expand All @@ -45,51 +45,46 @@


async def message_callback(message: SubscriberMessage) -> None:
try:
data = json.loads(message.data)
except Exception:
message.nack()
else:
print(data)
message.ack()
data = json.loads(message.data)
print(data)


def start_subscriber(loop, topic):
async def start_subscriber(topic):
client = SubscriberClient()

project_id = os.getenv('GCP_PROJECT')
project_id = os.getenv("GCP_PROJECT")
topic_path = PublisherClient.topic_path(project_id, topic)
subscription_path = PublisherClient.subscription_path(project_id, topic)

# Create subscription if it doesn't already exist
try:
client.create_subscription(subscription_path, topic_path)
except google.api_core.exceptions.PermissionDenied as e:
raise TypeError('Please set the GCP_PROJECT environment variable') from e

# Subscribe to the subscription, receiving a Future that acts as a keepalive
keep_alive = client.subscribe(subscription_path, message_callback)
await client.create_subscription(subscription_path, topic_path)
except aiohttp.client_exceptions.ClientResponseError as e:
if e.status == 409: # Subscription exists
pass
else:
raise TypeError("Please set the GCP_PROJECT environment variable") from e

# Have the client run forever, pulling messages from this subscription,
# passing them to the specified callback function, and wrapping it in an
# asyncio task.
client.run_forever(keep_alive)
# For demo with Pub/Sub emulator, maybe ack_deadline_cache_timeout 300
# On GCP, default seems fine.
# For more options, check gcloud-aio docs:
# https://github.com/talkiq/gcloud-aio/tree/master/pubsub
await subscribe(subscription_path, message_callback, client, ack_deadline_cache_timeout=300)


def main():
f = FeedHandler()

trades = TradeGCPPubSub()

cbs = {TRADES: trades}

f.add_feed(Coinbase(channels=[TRADES], symbols=['BTC-USD'], callbacks=cbs))

f.run(start_loop=False)

# Have the client run forever, pulling messages from subscription_path,
# passing them to the specified callback function
loop = asyncio.get_event_loop()

start_subscriber(loop, trades.topic)
loop.create_task(start_subscriber(trades.topic))
loop.run_forever()


if __name__ == '__main__':
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -75,7 +75,7 @@ def run_tests(self):
],
extras_require={
"arctic": ["arctic"],
"gcp_pubsub": ["google_cloud_pubsub", "gcloud_aio_pubsub"],
"gcp_pubsub": ["google_cloud_pubsub~=2.2.0", "gcloud_aio_pubsub"],
"kafka": ["aiokafka>=0.7.0"],
"mongo": ["motor"],
"postgres": ["asyncpg"],
Expand Down