-
Notifications
You must be signed in to change notification settings - Fork 3
/
bigquery.py
130 lines (104 loc) · 4.48 KB
/
bigquery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import json
from google.cloud.bigquery import Client, QueryJobConfig, ScalarQueryParameter
from octue.cloud.events.validation import VALID_EVENT_KINDS
from octue.exceptions import ServiceNotFound
from octue.resources import Manifest
def get_events(table_id, sender, question_uuid, kind=None, include_backend_metadata=False, limit=1000):
    """Get Octue service events for a question from a sender from a Google BigQuery event store.

    The events are returned ordered by their `order` column. Each event's JSON columns are decoded, any manifest in
    the event is deserialised, and the flat table rows are converted to the nested event/attributes structure.

    :param str table_id: the full ID of the table e.g. "your-project.your-dataset.your-table"
    :param str sender: the SRUID of the sender of the events
    :param str question_uuid: the UUID of the question to get the events for
    :param str|None kind: the kind of event to get; if `None` (or empty), all event kinds are returned
    :param bool include_backend_metadata: if `True`, include the service backend metadata
    :param int limit: the maximum number of events to return
    :raise ValueError: if the `kind` parameter is invalid
    :raise octue.exceptions.ServiceNotFound: if the sender hasn't emitted any events related to the question UUID (or any events at all)
    :return list(dict): the events for the question
    """
    if kind:
        if kind not in VALID_EVENT_KINDS:
            raise ValueError(f"`kind` must be one of {VALID_EVENT_KINDS!r}; received {kind!r}.")

        # Bind the kind as a query parameter like the other filter values instead of splicing its Python repr into the
        # SQL text, so quoting is handled by BigQuery rather than by Python's `repr`.
        event_kind_condition = ["AND kind=@kind"]
        kind_parameters = [ScalarQueryParameter("kind", "STRING", kind)]
    else:
        event_kind_condition = []
        kind_parameters = []

    fields = [
        "`event`",
        "`kind`",
        "`datetime`",
        "`uuid`",
        "`originator`",
        "`sender`",
        "`sender_type`",
        "`sender_sdk_version`",
        "`recipient`",
        "`order`",
        "`other_attributes`",
    ]

    if include_backend_metadata:
        fields.extend(("`backend`", "`backend_metadata`"))

    # The table ID is an identifier, not a value, so it can't be supplied as a query parameter and is interpolated.
    query = "\n".join(
        [
            f"SELECT {', '.join(fields)} FROM `{table_id}`",
            "WHERE sender=@sender",
            "AND question_uuid=@question_uuid",
            *event_kind_condition,
            "ORDER BY `order`",
            "LIMIT @limit",
        ]
    )

    job_config = QueryJobConfig(
        query_parameters=[
            ScalarQueryParameter("sender", "STRING", sender),
            ScalarQueryParameter("question_uuid", "STRING", question_uuid),
            ScalarQueryParameter("limit", "INTEGER", limit),
            *kind_parameters,
        ]
    )

    client = Client()
    query_job = client.query(query, job_config=job_config)
    result = query_job.result()

    if result.total_rows == 0:
        raise ServiceNotFound(
            f"No events found. The requested sender {sender!r} may not exist or it hasn't emitted any events for "
            f"question {question_uuid!r} (or any events at all)."
        )

    df = result.to_dataframe()

    # Convert JSON strings to python primitives.
    df["event"] = df["event"].map(json.loads)

    # `_deserialise_manifest_if_present` mutates each event dict in place; loop explicitly rather than using
    # `Series.apply` for its side effects and discarding the resulting Series.
    for event in df["event"]:
        _deserialise_manifest_if_present(event)

    df["other_attributes"] = df["other_attributes"].map(json.loads)

    if "backend_metadata" in df:
        df["backend_metadata"] = df["backend_metadata"].map(json.loads)

    events = df.to_dict(orient="records")
    return _unflatten_events(events)
def _deserialise_manifest_if_present(event):
    """If the event is a "question" or "result" event and a manifest is present, deserialise the manifest and replace
    the serialised manifest with it.

    The event is modified in place. Every manifest key present in the event is deserialised.

    :param dict event: an Octue service event
    :return None:
    """
    # Iterate over a tuple rather than a set: string hashing is randomised per process, so a set gives no fixed
    # iteration order. Combined with returning after the first match, that made it process-dependent which manifest
    # got deserialised if an event ever carried both. Checking every key is just as cheap and always deterministic.
    for key in ("input_manifest", "output_manifest"):
        if key in event:
            event[key] = Manifest.deserialise(event[key])
def _unflatten_events(events):
    """Reshape flat BigQuery event rows, in place, into the nested structure of the service communication schema.

    For every row, the `kind` column is moved inside the row's `event` dict. The attribute columns are popped off the
    row and gathered into a new `attributes` dict, with `datetime` rendered via `isoformat()`. The decoded
    `other_attributes` mapping is merged in last, so its keys win on collision.

    :param list(dict) events: flattened events
    :return list(dict): the same list, with each event unflattened
    """
    plain_attribute_names = (
        "uuid",
        "originator",
        "sender",
        "sender_type",
        "sender_sdk_version",
        "recipient",
        "order",
    )

    for row in events:
        row["event"]["kind"] = row.pop("kind")

        collected = {"datetime": row.pop("datetime").isoformat()}

        for name in plain_attribute_names:
            collected[name] = row.pop(name)

        row["attributes"] = {**collected, **row.pop("other_attributes")}

    return events