MRG: Merge pull request #647 from octue/improve-async-retrieved-events
Improve async event retrieval workflow
cortadocodes committed Apr 23, 2024
2 parents 6999b22 + 2f4e8b7 commit 146d5ba
Showing 24 changed files with 376 additions and 251 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/python-ci.yml
@@ -23,7 +23,7 @@ jobs:
 if: "!contains(github.event.head_commit.message, 'skipci')"
 runs-on: ${{ matrix.os }}
 env:
-USING_COVERAGE: "3.9"
+USING_COVERAGE: "3.10"
 strategy:
 matrix:
 os: [ubuntu-latest, windows-latest, macos-latest]
@@ -34,9 +34,9 @@ jobs:
 uses: actions/checkout@v3

 - name: Setup Python
-uses: actions/setup-python@v2
+uses: actions/setup-python@v5
 with:
-python-version: 3.9
+python-version: "3.10"

 - name: Install Poetry
 uses: snok/install-poetry@v1.3.2
6 changes: 3 additions & 3 deletions .github/workflows/release.yml
@@ -15,7 +15,7 @@ jobs:
 if: "github.event.pull_request.merged == true"
 runs-on: ${{ matrix.os }}
 env:
-USING_COVERAGE: "3.9"
+USING_COVERAGE: "3.10"
 strategy:
 matrix:
 os: [ubuntu-latest, windows-latest, macos-latest]
@@ -26,9 +26,9 @@ jobs:
 uses: actions/checkout@v3

 - name: Setup Python
-uses: actions/setup-python@v2
+uses: actions/setup-python@v5
 with:
-python-version: 3.9
+python-version: "3.10"

 - name: Install Poetry
 uses: snok/install-poetry@v1.3.2
2 changes: 1 addition & 1 deletion .github/workflows/version-compatibility.yml
@@ -11,7 +11,7 @@ jobs:
 runs-on: ubuntu-latest
 steps:
 - uses: actions/checkout@v3
-- uses: actions/setup-python@v2
+- uses: actions/setup-python@v5
 - name: Install Poetry
 uses: snok/install-poetry@v1
 - name: Check version compatibility has been tested
16 changes: 14 additions & 2 deletions README.md
@@ -4,6 +4,7 @@
 [![Documentation Status](https://readthedocs.org/projects/octue-python-sdk/badge/?version=latest)](https://octue-python-sdk.readthedocs.io/en/latest/?badge=latest)
 [![pre-commit](https://img.shields.io/badge/pre--commit-enabled-brightgreen?logo=pre-commit&logoColor=white)](https://github.com/pre-commit/pre-commit)
 [![black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/ambv/black)
+[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.10961975.svg)](https://doi.org/10.5281/zenodo.10961975)

 # Octue Python SDK <img src="./docs/source/images/213_purple-fruit-snake-transparent.gif" alt="Purple Fruit Snake" width="100"/></span>

@@ -15,7 +16,9 @@ Read the docs [here.](https://octue-python-sdk.readthedocs.io/en/latest/)
 Uses our [twined](https://twined.readthedocs.io/en/latest/) library for data validation.

 ## Installation and usage
+
 To install, run one of:
+
 ```shell
 pip install octue
 ```
@@ -25,6 +28,7 @@ poetry add octue
 ```

 The command line interface (CLI) can then be accessed via:
+
 ```shell
 octue --help
 ```
@@ -59,13 +63,15 @@ Commands:
 ```

 ## Deprecated code
+
 When code is deprecated, it will still work but a deprecation warning will be issued with a suggestion on how to update
 it. After an adjustment period, deprecations will be removed from the codebase according to the [code removal schedule](https://github.com/octue/octue-sdk-python/issues/415).
 This constitutes a breaking change.

 ## Developer notes

 ### Installation
+
 We use [Poetry](https://python-poetry.org/) as our package manager. For development, run the following from the
 repository root, which will editably install the package:

@@ -76,18 +82,24 @@ poetry install --all-extras
 Then run the tests to check everything's working.

 ### Testing
+
 These environment variables need to be set to run the tests:
-* `GOOGLE_APPLICATION_CREDENTIALS=/absolute/path/to/service/account/file.json`
-* `TEST_PROJECT_NAME=<name-of-google-cloud-project-to-run-pub-sub-tests-on>`
+
+- `GOOGLE_APPLICATION_CREDENTIALS=/absolute/path/to/service/account/file.json`
+- `TEST_PROJECT_NAME=<name-of-google-cloud-project-to-run-pub-sub-tests-on>`

 Then, from the repository root, run
+
 ```shell
 python3 -m unittest
 ```
+
 or
+
 ```shell
 tox
 ```

 ## Contributing
+
 Take a look at our [contributing](/docs/contributing.md) page.
7 changes: 3 additions & 4 deletions docs/source/asking_questions.rst
@@ -35,7 +35,7 @@ Asking a question
 backend={"name": "GCPPubSubBackend", "project_name": "my-project"},
 )
-answer = child.ask(
+answer, question_uuid = child.ask(
 input_values={"height": 32, "width": 3},
 input_manifest=manifest,
 )
@@ -104,7 +104,6 @@ access the event store and run:
 **Options**

 - ``kind`` - Only retrieve this kind of event if present (e.g. "result")
-- ``include_attributes`` - If ``True``, retrieve all the events' attributes as well
 - ``include_backend_metadata`` - If ``True``, retrieve information about the service backend that produced the event
 - ``limit`` - If set to a positive integer, limit the number of events returned to this

@@ -232,7 +231,7 @@ this:

 .. code-block:: python
-answer = analysis.children["elevation"].ask(input_values={"longitude": 0, "latitude": 1})
+answer, question_uuid = analysis.children["elevation"].ask(input_values={"longitude": 0, "latitude": 1})
 if your app configuration file is:

@@ -323,7 +322,7 @@ then you can override them like this:

 .. code-block:: python
-answer = child.ask(
+answer, question_uuid = child.ask(
 input_values={"height": 32, "width": 3},
 children=[
 {
4 changes: 2 additions & 2 deletions docs/source/manifest.rst
@@ -49,7 +49,7 @@ Get an Octue service to analyse data for you as part of a larger analysis.
 backend={"name": "GCPPubSubBackend", "project_name": "my-project"},
 )
-answer = child.ask(input_manifest=manifest)
+answer, question_uuid = child.ask(input_manifest=manifest)
 See :doc:`here <asking_questions>` for more information.

@@ -108,7 +108,7 @@ the cloud and then download them again for each service (as would happen with cl
 }
 )
-analysis.children["wind_speed"].ask(
+answer, question_uuid = analysis.children["wind_speed"].ask(
 input_values=analysis.input_values,
 input_manifest=analysis.input_manifest,
 allow_local_files=True,
8 changes: 4 additions & 4 deletions docs/source/testing_services.rst
@@ -87,7 +87,7 @@ Instantiating a child emulator in python
 def handle_monitor_message(message):
 ...
-result = child_emulator.ask(
+result, question_uuid = child_emulator.ask(
 input_values={"hello": "world"},
 handle_monitor_message=handle_monitor_message,
 )
@@ -133,7 +133,7 @@ You can then instantiate a child emulator from this in python:
 def handle_monitor_message(message):
 ...
-result = child_emulator.ask(
+result, question_uuid = child_emulator.ask(
 input_values={"hello": "world"},
 handle_monitor_message=handle_monitor_message,
 )
@@ -226,7 +226,7 @@ child.
 backend={"name": "GCPPubSubBackend", "project_name": "my-project"},
 )
-result = child.ask(input_values=[1, 2, 3, 4])
+result, question_uuid = child.ask(input_values=[1, 2, 3, 4])
 child.received_events
 >>> [
@@ -260,6 +260,6 @@ You can then feed these into a child emulator to emulate one possible response o
 child_emulator = ChildEmulator(events=child.received_events)
 child_emulator.ask(input_values=[1, 2, 3, 4])
->>> {"some": "results"}
+>>> {"some": "results"}, "9cab579f-c486-4324-ac9b-96491d26266b"
 You can also create test fixtures from :ref:`downloaded service crash diagnostics <test_fixtures_from_crash_diagnostics>`.
2 changes: 1 addition & 1 deletion docs/source/troubleshooting_services.rst
@@ -121,7 +121,7 @@ For example:
 backend={"name": "GCPPubSubBackend", "project_name": "my-project"},
 )
-answer = child.ask(
+answer, question_uuid = child.ask(
 input_values={"height": 32, "width": 3},
 save_diagnostics="SAVE_DIAGNOSTICS_OFF",
 )
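
The documentation hunks above all make the same change: `ask` now returns a pair, the answer and the question UUID, rather than the answer alone. Below is a minimal sketch of the calling pattern. `StubChild` is a hypothetical stand-in, not part of the octue API (a real `Child` needs a cloud backend), and its return values are illustrative only.

```python
import uuid


class StubChild:
    """Hypothetical stand-in for a child service; not part of the octue API."""

    def ask(self, input_values):
        # Mimic the new return shape: an answer dictionary plus the question UUID.
        area = input_values["height"] * input_values["width"]
        answer = {"output_values": {"area": area}, "output_manifest": None}
        return answer, str(uuid.uuid4())


child = StubChild()

# Unpack both values. Code that still assigns the result to a single name
# receives the whole tuple instead of the answer dictionary.
answer, question_uuid = child.ask(input_values={"height": 32, "width": 3})
print(answer["output_values"], question_uuid)
```

Keeping the question UUID lets a caller retrieve the question's events later, since `get_events` takes a `question_uuid` argument.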
@@ -1,4 +1,4 @@
-FROM windpioneers/gdal-python:little-gecko-gdal-2.4.1-python-3.11-slim
+FROM windpioneers/gdal-python:modest-heron-gdal-2.4.1-python-3.11-slim

 # Ensure print statements and log messages appear promptly in Cloud Logging.
 ENV PYTHONUNBUFFERED True
8 changes: 5 additions & 3 deletions octue/cloud/emulators/child.py
@@ -125,12 +125,12 @@ def ask(
 :param bool asynchronous: if `True`, don't create an answer subscription
 :param float timeout: time in seconds to wait for an answer before raising a timeout error
 :raise TimeoutError: if the timeout is exceeded while waiting for an answer
-:return dict: a dictionary containing the keys "output_values" and "output_manifest"
+:return dict, str: a dictionary containing the keys "output_values" and "output_manifest", and the question UUID
 """
 with ServicePatcher():
 self._child.serve(allow_existing=True)

-subscription, _ = self._parent.ask(
+subscription, question_uuid = self._parent.ask(
 service_id=self._child.id,
 input_values=input_values,
 input_manifest=input_manifest,
@@ -141,13 +141,15 @@ def ask(
 asynchronous=asynchronous,
 )

-return self._parent.wait_for_answer(
+answer = self._parent.wait_for_answer(
 subscription,
 handle_monitor_message=handle_monitor_message,
 record_events=record_events,
 timeout=timeout,
 )
+
+return answer, question_uuid

 def _emulate_analysis(
 self,
 analysis_id,
107 changes: 74 additions & 33 deletions octue/cloud/pub_sub/bigquery.py
@@ -3,53 +3,46 @@
 from google.cloud.bigquery import Client, QueryJobConfig, ScalarQueryParameter

 from octue.cloud.events.validation import VALID_EVENT_KINDS
+from octue.exceptions import ServiceNotFound
+from octue.resources import Manifest


-def get_events(
-table_id,
-sender,
-question_uuid,
-kind=None,
-include_attributes=False,
-include_backend_metadata=False,
-limit=1000,
-):
+def get_events(table_id, sender, question_uuid, kind=None, include_backend_metadata=False, limit=1000):
 """Get Octue service events for a question from a sender from a Google BigQuery event store.
 :param str table_id: the full ID of the table e.g. "your-project.your-dataset.your-table"
 :param str sender: the SRUID of the sender of the events
 :param str question_uuid: the UUID of the question to get the events for
 :param str|None kind: the kind of event to get; if `None`, all event kinds are returned
-:param bool include_attributes: if `True`, include events' attributes (excluding question UUID)
 :param bool include_backend_metadata: if `True`, include the service backend metadata
 :param int limit: the maximum number of events to return
 :raise ValueError: if the `kind` parameter is invalid
+:raise octue.exceptions.ServiceNotFound: if the sender hasn't emitted any events related to the question UUID (or any events at all)
 :return list(dict): the events for the question
 """
 if kind:
 if kind not in VALID_EVENT_KINDS:
 raise ValueError(f"`kind` must be one of {VALID_EVENT_KINDS!r}; received {kind!r}.")

-event_kind_condition = [f'AND JSON_EXTRACT_SCALAR(event, "$.kind") = "{kind}"']
+event_kind_condition = [f"AND kind={kind!r}"]
 else:
 event_kind_condition = []

 client = Client()
-fields = ["`event`"]

-if include_attributes:
-fields.extend(
-(
-"`datetime`",
-"`uuid`",
-"`originator`",
-"`sender`",
-"`sender_type`",
-"`sender_sdk_version`",
-"`recipient`",
-"`order`",
-"`other_attributes`",
-)
-)

+fields = [
+"`event`",
+"`kind`",
+"`datetime`",
+"`uuid`",
+"`originator`",
+"`sender`",
+"`sender_type`",
+"`sender_sdk_version`",
+"`recipient`",
+"`order`",
+"`other_attributes`",
+]

 if include_backend_metadata:
 fields.extend(("`backend`", "`backend_metadata`"))
@@ -74,16 +67,64 @@ def get_events(
 )

 query_job = client.query(query, job_config=job_config)
-rows = query_job.result()
-df = rows.to_dataframe()
+result = query_job.result()
+
+if result.total_rows == 0:
+raise ServiceNotFound(
+f"No events found. The requested sender {sender!r} may not exist or it hasn't emitted any events for "
+f"question {question_uuid!r} (or any events at all)."
+)
+
+df = result.to_dataframe()

 # Convert JSON strings to python primitives.
 df["event"] = df["event"].map(json.loads)

-if "other_attributes" in df:
-df["other_attributes"] = df["other_attributes"].map(json.loads)
+df["event"].apply(_deserialise_manifest_if_present)
+df["other_attributes"] = df["other_attributes"].map(json.loads)

 if "backend_metadata" in df:
 df["backend_metadata"] = df["backend_metadata"].map(json.loads)

-return df.to_dict(orient="records")
+events = df.to_dict(orient="records")
+return _unflatten_events(events)
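
The `kind` handling in `get_events` can be sketched in isolation. The set of kinds below is an assumption for illustration (the real values come from `octue.cloud.events.validation.VALID_EVENT_KINDS`), and `build_event_kind_condition` is a hypothetical helper name, not a function in the SDK.

```python
# Assumed event kinds, for illustration only; the SDK imports the real set from
# octue.cloud.events.validation.VALID_EVENT_KINDS.
VALID_EVENT_KINDS = {"result", "log_record", "exception"}


def build_event_kind_condition(kind=None):
    """Return the extra SQL condition for filtering on event kind, or an empty list."""
    if not kind:
        return []

    if kind not in VALID_EVENT_KINDS:
        raise ValueError(f"`kind` must be one of {VALID_EVENT_KINDS!r}; received {kind!r}.")

    # Only values from the fixed allow-list reach this point, so the string
    # interpolated into the query is always one of a known set.
    return [f"AND kind={kind!r}"]


condition = build_event_kind_condition("result")
print(condition)
```

The new condition filters on the table's `kind` column directly rather than extracting the kind from the `event` JSON, which matches `kind` being added to the selected fields.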


+def _deserialise_manifest_if_present(event):
+"""If the event is a "question" or "result" event and a manifest is present, deserialise the manifest and replace
+the serialised manifest with it.
+:param dict event: an Octue service event
+:return None:
+"""
+manifest_keys = {"input_manifest", "output_manifest"}
+
+for key in manifest_keys:
+if key in event:
+event[key] = Manifest.deserialise(event[key])
+# Only one of the manifest types will be in the event, so return if one is found.
+return
+
+
+def _unflatten_events(events):
+"""Convert the events and attributes from the flat structure of the BigQuery table into the nested structure of the
+service communication schema.
+:param list(dict) events: flattened events
+:return list(dict): unflattened events
+"""
+for event in events:
+event["event"]["kind"] = event.pop("kind")
+
+event["attributes"] = {
+"datetime": event.pop("datetime").isoformat(),
+"uuid": event.pop("uuid"),
+"originator": event.pop("originator"),
+"sender": event.pop("sender"),
+"sender_type": event.pop("sender_type"),
+"sender_sdk_version": event.pop("sender_sdk_version"),
+"recipient": event.pop("recipient"),
+"order": event.pop("order"),
+**event.pop("other_attributes"),
+}
+
+return events
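
To show what the unflattening step does to a single row, here is a self-contained sketch. It is not the SDK's code: `unflatten_event` and `ATTRIBUTE_COLUMNS` are hypothetical names, the sample row's values are made up, and unlike `_unflatten_events` it works on a copy rather than mutating its input.

```python
from datetime import datetime, timezone

# Flat columns that move under "attributes" (taken from the field list in the diff).
ATTRIBUTE_COLUMNS = (
    "uuid",
    "originator",
    "sender",
    "sender_type",
    "sender_sdk_version",
    "recipient",
    "order",
)


def unflatten_event(row):
    """Nest one flat event-store row into {"event": ..., "attributes": ...} form."""
    row = dict(row)  # Work on a shallow copy so the caller's row keeps its keys.
    event = {**row.pop("event"), "kind": row.pop("kind")}
    other_attributes = row.pop("other_attributes")

    attributes = {
        "datetime": row.pop("datetime").isoformat(),
        **{column: row.pop(column) for column in ATTRIBUTE_COLUMNS},
        **other_attributes,
    }

    # Any remaining columns (e.g. backend metadata) stay at the top level.
    return {"event": event, "attributes": attributes, **row}


# Made-up sample row, for illustration only.
sample_row = {
    "event": {"output_values": [1]},
    "kind": "result",
    "datetime": datetime(2024, 4, 23, 12, 0, tzinfo=timezone.utc),
    "uuid": "event-uuid",
    "originator": "octue/parent:1.0.0",
    "sender": "octue/child:1.0.0",
    "sender_type": "CHILD",
    "sender_sdk_version": "0.0.0",
    "recipient": "octue/parent:1.0.0",
    "order": 0,
    "other_attributes": {"forward_logs": True},
}

nested = unflatten_event(sample_row)
print(nested["event"], nested["attributes"]["datetime"])
```

Merging `other_attributes` last means its keys would override a same-named fixed attribute, which is also the behaviour of the dictionary literal in `_unflatten_events`.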
