-
-
Notifications
You must be signed in to change notification settings - Fork 663
/
gcppubsub.py
149 lines (114 loc) · 5.12 KB
/
gcppubsub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
'''
Copyright (C) 2017-2024 Bryant Moscon - bmoscon@gmail.com
Please see the LICENSE file for the terms and conditions
associated with this software.
'''
from collections import defaultdict
import os
import io
from typing import Optional
from typing import IO
from typing import Union
from typing import AnyStr
import aiohttp
import google.api_core.exceptions
from google.cloud import pubsub_v1
from yapic import json
# Use gcloud.aio.pubsub for asyncio
# https://github.com/talkiq/gcloud-aio
from gcloud.aio.pubsub import PublisherClient, PubsubMessage
from cryptofeed.backends.backend import BackendBookCallback, BackendCallback
class GCPPubSubCallback:
    '''
    Base callback that publishes updates to a Google Cloud Pub/Sub topic.

    Concrete subclasses set ``default_key`` and mix in a cryptofeed backend
    callback class (defined outside this file; presumably the caller of
    ``write`` -- confirm against cryptofeed.backends.backend).
    '''

    def __init__(self, topic: Optional[str] = None, key: Optional[str] = None,
                 service_file: Optional[Union[str, IO[AnyStr]]] = None,
                 ordering_key: Optional[str] = None, numeric_type=float, none_to=None):
        '''
        Backend using Google Cloud Platform Pub/Sub. Use requires an account with Google Cloud Platform.
        Free tier allows 10GB messages per month.

        Both the environment variables GCP_PROJECT='<project_id>' and GOOGLE_APPLICATION_CREDENTIALS='/path/key.json'
        may be required.

        Note: constructing an instance calls ``get_topic``, which issues a
        synchronous ``create_topic`` request.

        topic: str
            Topic name. Defaults to 'cryptofeed-{key}', for example 'cryptofeed-trades'
        key: str
            Overrides the subclass ``default_key`` (trades, funding, etc.).
            The key is used to build the default topic name.
        service_file: str or file obj
            Loads credentials from a service account file.
            If not provided, credentials will be loaded from the environment variable
            'GOOGLE_APPLICATION_CREDENTIALS'. If inside a Google Cloud environment
            that has a default service account, such as Compute Engine, Google Kubernetes Engine,
            or App Engine the environment variable will already be set.
            https://cloud.google.com/bigquery/docs/authentication/service-account-file
            https://cloud.google.com/docs/authentication/production
        ordering_key: str
            if messages have the same ordering key and you publish the messages
            to the same region, subscribers can receive the messages in order.
            When set, it is attached to every message published by ``write``.
            https://cloud.google.com/pubsub/docs/publisher#using_ordering_keys
        numeric_type, none_to:
            Stored on the instance unchanged; not used by this class itself.
        '''
        self.key = key or self.default_key
        self.ordering_key = ordering_key
        self.numeric_type = numeric_type
        self.none_to = none_to
        self.topic = topic or f'cryptofeed-{self.key}'
        self.topic_path = self.get_topic()
        self.service_file = service_file
        # Both are created lazily on first use (see get_session / get_client).
        self.session = None
        self.client = None

    def get_topic(self):
        '''
        Return the fully-qualified topic path, creating the topic if possible.

        Topic creation is best-effort: an already existing topic is the normal
        case and is ignored, any other failure is logged and the path is still
        returned so that publishing can be attempted later.
        '''
        # Local import keeps this block self-contained.
        import logging
        log = logging.getLogger(__name__)

        publisher = pubsub_v1.PublisherClient()
        project_id = os.getenv('GCP_PROJECT')
        if not project_id:
            log.warning("GCP_PROJECT is not set; Pub/Sub topic path for '%s' will be invalid", self.topic)
        topic_path = PublisherClient.topic_path(project_id, self.topic)
        try:
            publisher.create_topic(request={"name": topic_path})
        except google.api_core.exceptions.AlreadyExists:
            pass
        except Exception as exc:
            # Previously a `return` inside `finally` discarded every error
            # silently (including KeyboardInterrupt). Stay best-effort for
            # ordinary errors, but make them visible.
            log.warning("Could not create Pub/Sub topic %s: %r", topic_path, exc)
        return topic_path

    async def get_session(self):
        '''Return the shared aiohttp session, creating it on first use.'''
        if self.session is None:
            self.session = aiohttp.ClientSession()
        return self.session

    async def get_client(self):
        '''Return the shared async publisher client, creating it on first use.'''
        if self.client is None:
            session = await self.get_session()
            self.client = PublisherClient(
                service_file=self.service_file, session=session
            )
        return self.client

    def _message_kwargs(self, data: dict) -> dict:
        '''Build the keyword arguments passed to PubsubMessage for one update.'''
        kwargs = {'feed': data['exchange'], 'symbol': data['symbol']}
        # Only pass an ordering key when one was configured, so the default
        # behaviour (no ordering) is unchanged.
        if self.ordering_key:
            kwargs['ordering_key'] = self.ordering_key
        return kwargs

    async def write(self, data: dict):
        '''
        Publish message. For filtering, "feed" and "symbol" are added as attributes.
        https://cloud.google.com/pubsub/docs/filtering

        data must contain the keys 'exchange' and 'symbol' (KeyError otherwise).
        If an ordering key was configured it is attached to the message.
        '''
        client = await self.get_client()
        payload = json.dumps(data).encode()
        message = PubsubMessage(payload, **self._message_kwargs(data))
        await client.publish(self.topic_path, [message])
class TradeGCPPubSub(GCPPubSubCallback, BackendCallback):
    '''GCP Pub/Sub callback whose default key is 'trades' (default topic 'cryptofeed-trades').'''

    default_key = 'trades'
class FundingGCPPubSub(GCPPubSubCallback, BackendCallback):
    '''GCP Pub/Sub callback whose default key is 'funding' (default topic 'cryptofeed-funding').'''

    default_key = 'funding'
class BookGCPPubSub(GCPPubSubCallback, BackendBookCallback):
    '''
    GCP Pub/Sub callback whose default key is 'book' (default topic 'cryptofeed-book').

    snapshots_only, snapshot_interval and a per-key integer counter
    (snapshot_count) are stored on the instance; they are not read in this
    file -- presumably BackendBookCallback consumes them (confirm there).
    All other arguments are forwarded to GCPPubSubCallback.__init__.
    '''

    default_key = 'book'

    def __init__(self, *args, snapshots_only=False, snapshot_interval=1000, **kwargs):
        # Snapshot state is set up before delegating to the base initialiser.
        self.snapshot_count = defaultdict(int)
        self.snapshot_interval = snapshot_interval
        self.snapshots_only = snapshots_only
        super().__init__(*args, **kwargs)
class TickerGCPPubSub(GCPPubSubCallback, BackendCallback):
    '''GCP Pub/Sub callback whose default key is 'ticker' (default topic 'cryptofeed-ticker').'''

    default_key = 'ticker'
class OpenInterestGCPPubSub(GCPPubSubCallback, BackendCallback):
    '''GCP Pub/Sub callback whose default key is 'open_interest' (default topic 'cryptofeed-open_interest').'''

    default_key = 'open_interest'
class LiquidationsGCPPubSub(GCPPubSubCallback, BackendCallback):
    '''GCP Pub/Sub callback whose default key is 'liquidations' (default topic 'cryptofeed-liquidations').'''

    default_key = 'liquidations'
class CandlesGCPPubSub(GCPPubSubCallback, BackendCallback):
    '''GCP Pub/Sub callback whose default key is 'candles' (default topic 'cryptofeed-candles').'''

    default_key = 'candles'
class OrderInfoGCPPubSub(GCPPubSubCallback, BackendCallback):
    '''GCP Pub/Sub callback whose default key is 'order_info' (default topic 'cryptofeed-order_info').'''

    default_key = 'order_info'
class TransactionsGCPPubSub(GCPPubSubCallback, BackendCallback):
    '''GCP Pub/Sub callback whose default key is 'transactions' (default topic 'cryptofeed-transactions').'''

    default_key = 'transactions'
class BalancesGCPPubSub(GCPPubSubCallback, BackendCallback):
    '''GCP Pub/Sub callback whose default key is 'balances' (default topic 'cryptofeed-balances').'''

    default_key = 'balances'
class FillsGCPPubSub(GCPPubSubCallback, BackendCallback):
    '''GCP Pub/Sub callback whose default key is 'fills' (default topic 'cryptofeed-fills').'''

    default_key = 'fills'