Commit

MRG: Merge pull request #603 from octue/publish-answers-to-question-topic-2

Publish answers to question topic
cortadocodes committed Jan 9, 2024
2 parents 88c41ca + 1902194 commit c4b41f6
Showing 61 changed files with 1,555 additions and 6,788 deletions.
10 changes: 7 additions & 3 deletions .github/workflows/add-issues-to-octue-board.yml
@@ -6,6 +6,10 @@ on:

jobs:
add-issues-to-octue-board:
uses: octue/.github/.github/workflows/reusable-add-issues-to-octue-board.yml@main
secrets:
github-token: ${{ secrets.PROJECT_AUTOMATION_GITHUB_TOKEN_2 }}
runs-on: ubuntu-latest
steps:
- name: Add to Board
uses: actions/add-to-project@v0.5.0
with:
project-url: https://github.com/orgs/octue/projects/22
github-token: ${{ secrets.OCTUE_PROJECT_ISSUES_TOKEN }}
3 changes: 3 additions & 0 deletions .github/workflows/python-ci.yml
@@ -77,6 +77,9 @@ jobs:
if: "!contains(github.event.head_commit.message, 'skipci')"
runs-on: ubuntu-latest
needs: [check-semantic-version, run-tests]
permissions:
id-token: write
contents: read
steps:
- name: Checkout Repository
uses: actions/checkout@v3
21 changes: 0 additions & 21 deletions .github/workflows/release.yml
@@ -101,24 +101,3 @@ jobs:

- name: Publish package distributions to PyPI
uses: pypa/gh-action-pypi-publish@v1.8.10

docker:
runs-on: ubuntu-latest
needs: [run-tests, release]
timeout-minutes: 300
steps:
- name: Checkout
uses: actions/checkout@v3

- name: Log in to DockerHub
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}

- name: Build and push
uses: docker/build-push-action@v2.8.0
with:
context: .
push: true
tags: octue/octue-sdk-python:${{ needs.run-tests.outputs.package_version }}-slim,octue/octue-sdk-python:latest
16 changes: 16 additions & 0 deletions .readthedocs.yaml
@@ -0,0 +1,16 @@
# Read the Docs configuration file for Sphinx projects
# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details

version: 2

build:
os: ubuntu-22.04
tools:
python: "3.10"

sphinx:
configuration: docs/source/conf.py

python:
install:
- requirements: docs/requirements.txt
2 changes: 1 addition & 1 deletion docs/source/asking_questions.rst
@@ -48,7 +48,7 @@ You can also set the following options when you call :mod:`Child.ask <octue.reso
- ``allow_local_files`` - if true, local files/datasets are allowed in any input manifest you supply
- ``handle_monitor_message`` - if provided a function, it will be called on any monitor messages from the child
- ``record_messages_to`` – if given a path to a JSON file, messages received from the parent while it processes the question are saved to it
- ``allow_save_diagnostics_data_on_crash`` – if true, the input values and input manifest (including its datasets) will be saved by the child for future crash diagnostics if it fails while processing them
- ``save_diagnostics`` – must be one of {"SAVE_DIAGNOSTICS_OFF", "SAVE_DIAGNOSTICS_ON_CRASH", "SAVE_DIAGNOSTICS_ON"}; if turned on, allow the input values and manifest (and its datasets) to be saved by the child either all the time or just if the analysis fails
- ``question_uuid`` - if provided, the question will use this UUID instead of a generated one
- ``timeout`` - how long in seconds to wait for an answer (``None`` by default - i.e. don't time out)
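
The three ``save_diagnostics`` values listed above can be read as a small decision rule. The following is only a sketch of that rule as the documentation describes it; the helper ``should_save_diagnostics`` is hypothetical and is not part of the SDK.

```python
# Option names are copied from the documentation above; the helper is hypothetical.
SAVE_DIAGNOSTICS_OFF = "SAVE_DIAGNOSTICS_OFF"
SAVE_DIAGNOSTICS_ON_CRASH = "SAVE_DIAGNOSTICS_ON_CRASH"
SAVE_DIAGNOSTICS_ON = "SAVE_DIAGNOSTICS_ON"

SAVE_DIAGNOSTICS_OPTIONS = {SAVE_DIAGNOSTICS_OFF, SAVE_DIAGNOSTICS_ON_CRASH, SAVE_DIAGNOSTICS_ON}


def should_save_diagnostics(save_diagnostics, analysis_failed):
    """Return True if diagnostics would be saved for a question (illustrative only)."""
    if save_diagnostics not in SAVE_DIAGNOSTICS_OPTIONS:
        raise ValueError(
            f"`save_diagnostics` must be one of {sorted(SAVE_DIAGNOSTICS_OPTIONS)}; got {save_diagnostics!r}."
        )

    # "ON" saves every time; "ON_CRASH" saves only if the analysis failed; "OFF" never saves.
    if save_diagnostics == SAVE_DIAGNOSTICS_ON:
        return True
    if save_diagnostics == SAVE_DIAGNOSTICS_ON_CRASH:
        return analysis_failed
    return False
```

Rejecting unknown values up front mirrors the "must be one of" wording in the option description.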

271 changes: 97 additions & 174 deletions docs/source/inter_service_compatibility.rst

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions docs/source/services.rst
@@ -88,3 +88,12 @@ They look like ``namespace/name:tag`` where the tag is often a semantic version
name. It can be used to ask a question to a service without specifying a specific revision of it. This enables
asking questions to, for example, the service ``octue/my-service`` and automatically having them routed to its
default (usually latest) revision. :ref:`See here for more info<using_default_revision_tag>`.


Service communication standard
==============================

Octue services communicate according to the service communication standard. The JSON schema defining this can be found
`here <https://strands.octue.com/octue/service-communication>`_. Messages received by services are validated against it
and invalid messages are rejected. The schema is in beta, so (rare) breaking changes are reflected in the minor version
number.
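
One way to read the versioning note above: while the schema is in beta, two schema versions may differ in a breaking way whenever their major or minor numbers differ. The sketch below encodes that reading. It assumes ``major.minor.patch`` version strings, and the function names and example versions are hypothetical.

```python
def parse_version(version):
    """Split a 'major.minor.patch' string into a tuple of integers."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch


def may_be_breaking(old_version, new_version):
    """Return True if moving between the two versions could include a breaking change.

    Assumption: during beta, breaking changes appear in the minor number, so any
    difference in the (major, minor) pair is treated as potentially breaking.
    """
    return parse_version(old_version)[:2] != parse_version(new_version)[:2]
```

Under this reading a patch-only bump is treated as safe, while a minor bump is a prompt to check the schema before upgrading.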
6 changes: 3 additions & 3 deletions docs/source/troubleshooting_services.rst
@@ -15,7 +15,7 @@ Services save the following data to the cloud if they crash while processing a q

.. important::

For this feature to be enabled, the child must have the ``crash_diagnostics_cloud_path`` field in its service
For this feature to be enabled, the child must have the ``diagnostics_cloud_path`` field in its service
configuration (:ref:`octue.yaml <octue_yaml>` file) set to a Google Cloud Storage path.


@@ -111,7 +111,7 @@ your service to fail.
Disabling crash diagnostics
===========================
When asking a question to a child, parents can disable crash diagnostics upload in the child on a question-by-question
basis by setting ``allow_save_diagnostics_data_on_crash`` to ``False`` in :mod:`Child.ask <octue.resources.child.Child.ask>`.
basis by setting ``save_diagnostics`` to ``"SAVE_DIAGNOSTICS_OFF"`` in :mod:`Child.ask <octue.resources.child.Child.ask>`.
For example:

.. code-block:: python
@@ -123,5 +123,5 @@ For example:
answer = child.ask(
input_values={"height": 32, "width": 3},
allow_save_diagnostics_data_on_crash=False,
save_diagnostics="SAVE_DIAGNOSTICS_OFF",
)
1 change: 1 addition & 0 deletions octue/__init__.py
@@ -5,6 +5,7 @@


__all__ = ("Runner",)

REPOSITORY_ROOT = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))


36 changes: 11 additions & 25 deletions octue/cli.py
@@ -12,7 +12,7 @@

from octue.cloud import storage
from octue.cloud.pub_sub import Subscription, Topic
from octue.cloud.pub_sub.service import Service
from octue.cloud.pub_sub.service import PARENT_SENDER_TYPE, Service
from octue.cloud.service_id import convert_service_id_to_pub_sub_form, create_sruid, get_sruid_parts
from octue.cloud.storage import GoogleCloudStorageClient
from octue.configuration import load_service_and_app_configuration
@@ -22,7 +22,6 @@
from octue.resources import Manifest, service_backends
from octue.runner import Runner
from octue.utils.encoders import OctueJSONEncoder
from twined import Twine


logger = logging.getLogger(__name__)
@@ -129,16 +128,7 @@ def run(service_config, input_dir, output_file, output_manifest_file, monitor_me
if os.path.exists(input_manifest_path):
input_manifest = input_manifest_path

runner = Runner(
app_src=service_configuration.app_source_path,
twine=Twine(source=service_configuration.twine_path),
configuration_values=app_configuration.configuration_values,
configuration_manifest=app_configuration.configuration_manifest,
children=app_configuration.children,
output_location=app_configuration.output_location,
crash_diagnostics_cloud_path=service_configuration.crash_diagnostics_cloud_path,
service_registries=service_configuration.service_registries,
)
runner = Runner.from_configuration(service_configuration=service_configuration, app_configuration=app_configuration)

if monitor_messages_file:
if not os.path.exists(os.path.dirname(monitor_messages_file)):
@@ -229,14 +219,9 @@ def start(service_config, revision_tag, timeout, no_rm):
revision_tag=service_revision_tag_override or service_revision_tag,
)

runner = Runner(
app_src=service_configuration.app_source_path,
twine=Twine(source=service_configuration.twine_path),
configuration_values=app_configuration.configuration_values,
configuration_manifest=app_configuration.configuration_manifest,
children=app_configuration.children,
output_location=app_configuration.output_location,
crash_diagnostics_cloud_path=service_configuration.crash_diagnostics_cloud_path,
runner = Runner.from_configuration(
service_configuration=service_configuration,
app_configuration=app_configuration,
service_id=service_sruid,
)

@@ -296,12 +281,12 @@ def start(service_config, revision_tag, timeout, no_rm):
@click.option(
"--download-datasets",
is_flag=True,
help="If provided, download any datasets from the crash diagnostics and update their paths in the configuration and "
help="If provided, download any datasets from the diagnostics and update their paths in the configuration and "
"input manifests to the new local paths.",
)
def get_crash_diagnostics(cloud_path, local_path, download_datasets):
"""Download crash diagnostics for an analysis from the given directory in Google Cloud Storage. The cloud path
should end in the analysis ID.
def get_diagnostics(cloud_path, local_path, download_datasets):
"""Download diagnostics for a question from the given directory in Google Cloud Storage. The cloud path should end
in the question ID.
CLOUD_PATH: The path to the directory in Google Cloud Storage containing the diagnostics data.
"""
@@ -344,7 +329,7 @@ def get_crash_diagnostics(cloud_path, local_path, download_datasets):

manifest.to_file(manifest_path)

logger.info("Downloaded crash diagnostics from %r to %r.", cloud_path, local_path)
logger.info("Downloaded diagnostics from %r to %r.", cloud_path, local_path)


@octue_cli.group()
@@ -409,6 +394,7 @@ def create_push_subscription(
name=pub_sub_sruid,
topic=topic,
project_name=project_name,
filter=f'attributes.sender_type = "{PARENT_SENDER_TYPE}"',
expiration_time=expiration_time,
push_endpoint=push_endpoint,
)
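
The `filter` argument added to the push subscription above restricts delivery to messages whose `sender_type` attribute matches `PARENT_SENDER_TYPE`. A minimal sketch of what that equality filter expresses, emulated locally: the value `"PARENT"` is an assumption (the real constant is imported from `octue.cloud.pub_sub.service`), and both helper functions are hypothetical.

```python
# Assumed value for illustration; the real constant lives in `octue.cloud.pub_sub.service`.
PARENT_SENDER_TYPE = "PARENT"


def build_sender_type_filter(sender_type):
    """Build the subscription filter string in the same form as the diff above."""
    return f'attributes.sender_type = "{sender_type}"'


def would_be_delivered(message_attributes, sender_type):
    """Emulate the equality filter locally.

    A message passes only if it has a `sender_type` attribute equal to the given value.
    """
    return message_attributes.get("sender_type") == sender_type
```

With a filter like this, a subscription on a shared topic would not receive messages published by other sender types, which fits the commit's aim of publishing answers to the question topic.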
29 changes: 12 additions & 17 deletions octue/cloud/deployment/google/answer_pub_sub_question.py
@@ -1,7 +1,8 @@
import logging

from octue.cloud.pub_sub import Topic
from octue.cloud.pub_sub.service import Service
from octue.cloud.service_id import create_sruid, get_sruid_parts
from octue.cloud.service_id import convert_service_id_to_pub_sub_form, create_sruid, get_sruid_parts
from octue.configuration import load_service_and_app_configuration
from octue.resources.service_backends import GCPPubSubBackend
from octue.runner import Runner
@@ -17,7 +18,7 @@
def answer_question(question, project_name):
"""Answer a question sent to an app deployed in Google Cloud.
:param dict|tuple|apache_beam.io.gcp.pubsub.PubsubMessage question:
:param dict|tuple question:
:param str project_name:
:return None:
"""
@@ -31,30 +32,24 @@ def answer_question(question, project_name):
)

service = Service(service_id=service_sruid, backend=GCPPubSubBackend(project_name=project_name))

question_uuid = get_nested_attribute(question, "attributes.question_uuid")
answer_topic = service.instantiate_answer_topic(question_uuid)

try:
runner = Runner(
app_src=service_configuration.app_source_path,
twine=service_configuration.twine_path,
configuration_values=app_configuration.configuration_values,
configuration_manifest=app_configuration.configuration_manifest,
children=app_configuration.children,
output_location=app_configuration.output_location,
crash_diagnostics_cloud_path=service_configuration.crash_diagnostics_cloud_path,
runner = Runner.from_configuration(
service_configuration=service_configuration,
app_configuration=app_configuration,
project_name=project_name,
service_id=service_sruid,
service_registries=service_configuration.service_registries,
)

service.run_function = runner.run

service.answer(question, answer_topic=answer_topic)
service.answer(question)
logger.info("Analysis successfully run and response sent for question %r.", question_uuid)

# Forward any errors in the deployment configuration (errors in the analysis are already forwarded by the service).
except BaseException as error: # noqa
service.send_exception(topic=answer_topic)
service.send_exception(
topic=Topic(name=convert_service_id_to_pub_sub_form(service_sruid), project_name=project_name),
question_uuid=question_uuid,
)

logger.exception(error)
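
Throughout this commit, repeated `Runner(...)` constructor calls are replaced with `Runner.from_configuration(...)`. The sketch below shows the general shape of such a classmethod factory. `SketchRunner` and the configuration stand-ins are hypothetical and much simpler than the real `octue.runner.Runner`; only the argument names visible in the diff are reused.

```python
from types import SimpleNamespace


class SketchRunner:
    """Hypothetical stand-in for a runner; not the real `octue.runner.Runner`."""

    def __init__(self, app_src, twine, configuration_values=None, children=None, service_id=None):
        self.app_src = app_src
        self.twine = twine
        self.configuration_values = configuration_values
        self.children = children
        self.service_id = service_id

    @classmethod
    def from_configuration(cls, service_configuration, app_configuration, **overrides):
        """Build a runner from two configuration objects.

        Extra keyword arguments (e.g. `service_id`) are passed straight to the constructor.
        """
        return cls(
            app_src=service_configuration.app_source_path,
            twine=service_configuration.twine_path,
            configuration_values=app_configuration.configuration_values,
            children=app_configuration.children,
            **overrides,
        )


# Usage with simple stand-in configuration objects (all values are made up).
service_configuration = SimpleNamespace(app_source_path=".", twine_path="twine.json")
app_configuration = SimpleNamespace(configuration_values={"n_iterations": 5}, children=None)

runner = SketchRunner.from_configuration(
    service_configuration=service_configuration,
    app_configuration=app_configuration,
    service_id="my-org/my-service:1.0.0",
)
```

Keeping the field-by-field mapping in one place means each call site only passes the two configuration objects and any per-call overrides.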
