Skip to content

Commit

Permalink
MRG: Merge pull request #133 from octue/release/0.1.13
Browse files Browse the repository at this point in the history
Release/0.1.13
  • Loading branch information
cortadocodes committed Apr 21, 2021
2 parents 70b4ec3 + 47ccf05 commit eb0817b
Show file tree
Hide file tree
Showing 58 changed files with 1,199 additions and 527 deletions.
2 changes: 1 addition & 1 deletion docs/source/deploying_services.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Automated deployment with Octue means:

All you need to enable automated deployments are the following files in your repository root:

* A ``requirements.txt`` file that includes ``octue>=0.1.12`` and the rest of your service's dependencies
* A ``requirements.txt`` file that includes ``octue>=0.1.13`` and the rest of your service's dependencies
* A ``twine.json`` file
* A ``deployment_configuration.json`` file (optional)

Expand Down
3 changes: 1 addition & 2 deletions octue/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import logging

from .cli import octue_cli
from .logging_handlers import LOG_FORMAT
from .runner import Runner


__all__ = "LOG_FORMAT", "octue_cli", "Runner"
__all__ = "LOG_FORMAT", "Runner"
package_logger = logging.getLogger(__name__)
7 changes: 4 additions & 3 deletions octue/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
import click
import pkg_resources

from octue.cloud.pub_sub.service import Service
from octue.definitions import CHILDREN_FILENAME, FOLDER_DEFAULTS, MANIFEST_FILENAME, VALUES_FILENAME
from octue.logging_handlers import get_remote_handler
from octue.resources.communication import Service, service_backends
from octue.resources import service_backends
from octue.runner import Runner
from twined import Twine

Expand Down Expand Up @@ -119,7 +120,7 @@ def run(app_dir, data_dir, config_dir, input_dir, output_dir, twine):
twine = Twine(source=twine)

(
configruation_values,
configuration_values,
configuration_manifest,
input_values,
input_manifest,
Expand All @@ -138,7 +139,7 @@ def run(app_dir, data_dir, config_dir, input_dir, output_dir, twine):
runner = Runner(
app_src=app_dir,
twine=twine,
configuration_values=configruation_values,
configuration_values=configuration_values,
configuration_manifest=configuration_manifest,
output_manifest_path=os.path.join(output_dir, MANIFEST_FILENAME),
children=children,
Expand Down
File renamed without changes.
61 changes: 61 additions & 0 deletions octue/cloud/credentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import json
import logging
import os
import warnings
from google.oauth2 import service_account


logger = logging.getLogger(__name__)


class GCPCredentialsManager:
"""A credentials manager for Google Cloud Platform (GCP) that takes a path to a service account JSON file, or a
JSON string of the contents of such a service account file, from the given environment variable and instantiates
a Google Cloud credentials object.
:param str environment_variable_name:
:return None:
"""

def __init__(self, environment_variable_name="GOOGLE_APPLICATION_CREDENTIALS"):
self.environment_variable_name = environment_variable_name

if self.environment_variable_name is None:
self.service_account_json = None
return

try:
self.service_account_json = os.environ[self.environment_variable_name]
except KeyError:
warnings.warn(
f"No environment variable called {self.environment_variable_name!r}; resorting to default Google Cloud "
f"credentials."
)
self.service_account_json = None

def get_credentials(self):
"""Get the Google OAUTH2 service account credentials.
:return google.auth.service_account.Credentials:
"""
if self.service_account_json is None:
return None

# Check that the environment variable refers to a real file.
if os.path.exists(self.service_account_json):
return self._get_credentials_from_file()

# If it doesn't, assume that it's the credentials file as a JSON string.
return self._get_credentials_from_string()

def _get_credentials_from_file(self):
with open(self.service_account_json) as f:
credentials = json.load(f)

logger.debug("GCP credentials read from file.")
return service_account.Credentials.from_service_account_info(credentials)

def _get_credentials_from_string(self):
credentials = json.loads(self.service_account_json)
logger.debug("GCP credentials loaded from string.")
return service_account.Credentials.from_service_account_info(credentials)
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
import logging
import uuid
from concurrent.futures import TimeoutError
import google.api_core
import google.api_core.exceptions
from google.api_core import retry
from google.cloud import pubsub_v1

from octue import exceptions
from octue.cloud.credentials import GCPCredentialsManager
from octue.cloud.pub_sub import Subscription, Topic
from octue.exceptions import FileLocationError
from octue.mixins import CoolNameable
from octue.resources.communication.google_pub_sub import Subscription, Topic
from octue.resources.manifest import Manifest
from octue.utils.cloud.credentials import GCPCredentialsManager


logger = logging.getLogger(__name__)
Expand All @@ -24,6 +27,29 @@
BATCH_SETTINGS = pubsub_v1.types.BatchSettings(max_bytes=10 * 1000 * 1000, max_latency=0.01, max_messages=1)


def create_custom_retry(timeout):
"""Create a custom `Retry` object specifying that the given Google Cloud request should retry for the given amount
of time for the given exceptions.
:param float timeout:
:return google.api_core.retry.Retry:
"""
return retry.Retry(
maximum=timeout / 4,
deadline=timeout,
predicate=google.api_core.retry.if_exception_type(
google.api_core.exceptions.NotFound,
google.api_core.exceptions.Aborted,
google.api_core.exceptions.DeadlineExceeded,
google.api_core.exceptions.InternalServerError,
google.api_core.exceptions.ResourceExhausted,
google.api_core.exceptions.ServiceUnavailable,
google.api_core.exceptions.Unknown,
google.api_core.exceptions.Cancelled,
),
)


class Service(CoolNameable):
"""A Twined service that can be used in two modes:
* As a server accepting questions (input values and manifests), running them through its app, and responding to the
Expand All @@ -39,11 +65,7 @@ def __init__(self, backend, id=None, run_function=None):
self.backend = backend
self.run_function = run_function

if backend.credentials_environment_variable is None:
credentials = None
else:
credentials = GCPCredentialsManager(backend.credentials_environment_variable).get_credentials()

credentials = GCPCredentialsManager(backend.credentials_environment_variable).get_credentials()
self.publisher = pubsub_v1.PublisherClient(credentials=credentials, batch_settings=BATCH_SETTINGS)
self.subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
super().__init__()
Expand Down Expand Up @@ -81,7 +103,7 @@ def receive_question_then_answer(self, question):
question.ack()
self.answer(data, question_uuid)

def answer(self, data, question_uuid):
def answer(self, data, question_uuid, timeout=30):
"""Answer a question (i.e. run the Service's app to analyse the given data, and return the output values to the
asker). Answers are published to a topic whose name is generated from the UUID sent with the question, and are
in the format specified in the Service's Twine file.
Expand All @@ -101,6 +123,7 @@ def answer(self, data, question_uuid):
data=json.dumps(
{"output_values": analysis.output_values, "output_manifest": serialised_output_manifest}
).encode(),
retry=create_custom_retry(timeout),
)
logger.info("%r responded on topic %r.", self, topic.path)

Expand All @@ -110,6 +133,13 @@ def ask(self, service_id, input_values, input_manifest=None):
before sending the question to the serving Service - the topic is the expected publishing place for the answer
from the serving Service when it comes, and the subscription is set up to subscribe to this.
"""
if (input_manifest is not None) and (not input_manifest.all_datasets_are_in_cloud):
raise FileLocationError(
"All datasets of the input manifest and all files of the datasets must be uploaded to the cloud before "
"asking a service to perform an analysis upon them. The manifest must then be updated with the new "
"cloud locations."
)

question_topic = Topic(name=service_id, namespace=OCTUE_NAMESPACE, service=self)
if not question_topic.exists():
raise exceptions.ServiceNotFound(f"Service with ID {service_id!r} cannot be found.")
Expand Down Expand Up @@ -141,14 +171,20 @@ def ask(self, service_id, input_values, input_manifest=None):
logger.debug("%r asked question to %r service. Question UUID is %r.", self, service_id, question_uuid)
return response_subscription, question_uuid

def wait_for_answer(self, subscription, timeout=20):
def wait_for_answer(self, subscription, timeout=30):
"""Wait for an answer to a question on the given subscription, deleting the subscription and its topic once
the answer is received.
"""
answer = self.subscriber.pull(
pull_response = self.subscriber.pull(
request={"subscription": subscription.path, "max_messages": 1},
retry=retry.Retry(deadline=timeout),
).received_messages[0]
timeout=timeout,
retry=create_custom_retry(timeout),
)

try:
answer = pull_response.received_messages[0]
except IndexError:
raise TimeoutError("No answer received from topic %r", subscription.topic.path)

self.subscriber.acknowledge(request={"subscription": subscription.path, "ack_ids": [answer.ack_id]})
logger.debug("%r received a response to question on topic %r", self, subscription.topic.path)
Expand Down
File renamed without changes.
File renamed without changes.
5 changes: 5 additions & 0 deletions octue/cloud/storage/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from octue.cloud.storage import path
from octue.cloud.storage.client import GoogleCloudStorageClient


__all__ = ["path", "GoogleCloudStorageClient"]
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google_crc32c import Checksum

from octue.utils.cloud.credentials import GCPCredentialsManager
from octue.cloud.credentials import GCPCredentialsManager


logger = logging.getLogger(__name__)
Expand All @@ -29,6 +29,23 @@ def __init__(self, project_name, credentials=OCTUE_MANAGED_CREDENTIALS):

self.client = storage.Client(project=project_name, credentials=credentials)

def create_bucket(self, name, location=None, allow_existing=False, timeout=_DEFAULT_TIMEOUT):
"""Create a new bucket. If the bucket already exists, and `allow_existing` is `True`, do nothing; if it is
`False`, raise an error.
:param str name:
:param str|None location: physical region of bucket; e.g. "europe-west6"; defaults to "US"
:param bool allow_existing:
:param float timeout:
:raise google.cloud.exceptions.Conflict:
:return None:
"""
if allow_existing:
if self.client.lookup_bucket(bucket_name=name, timeout=timeout) is not None:
return

self.client.create_bucket(bucket_or_name=name, location=location, timeout=timeout)

def upload_file(self, local_path, bucket_name, path_in_bucket, metadata=None, timeout=_DEFAULT_TIMEOUT):
"""Upload a local file to a Google Cloud bucket at gs://<bucket_name>/<path_in_bucket>.
Expand Down
File renamed without changes.
21 changes: 11 additions & 10 deletions octue/deployment/google/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:slim-buster
FROM python:3.9-slim-buster

# Allow statements and log messages to immediately appear in the Knative logs on Google Cloud.
ENV PYTHONUNBUFFERED True
Expand All @@ -9,20 +9,21 @@ WORKDIR $PROJECT_ROOT
RUN apt-get update -y && apt-get install -y --fix-missing build-essential && rm -rf /var/lib/apt/lists/*

# This will cache bust if any of the requirements change.
COPY requirements.txt ./

# Gunicorn is required to run the Flask server that connected the Docker container to Cloud Run.
RUN echo "gunicorn" >> requirements.txt

# Upgrade to latest pip and setuptools after the cache bust, then install requirements
RUN pip install --upgrade pip && pip install -r requirements.txt
COPY requirements*.txt .
COPY setup.* .

COPY . .

# Install requirements (supports requirements.txt, requirements-dev.txt, and setup.py; all will be run if all are present.)
RUN if [ ! -f "requirements.txt" ] && [ ! -f "requirements-dev.txt" ] && [ ! -f "setup.py" ]; then exit 1; fi
RUN if [ -f "requirements.txt" ]; then pip install --upgrade pip && pip install -r requirements.txt; fi
RUN if [ -f "requirements-dev.txt" ]; then pip install --upgrade pip && pip install -r requirements-dev.txt; fi
RUN if [ -f "setup.py" ]; then pip install --upgrade pip && pip install -e .; fi

EXPOSE $PORT

ARG _TRIGGER_ID
ENV SERVICE_ID=$_TRIGGER_ID
ARG _SERVICE_ID
ENV SERVICE_ID=$_SERVICE_ID

ARG _GUNICORN_WORKERS=1
ENV _GUNICORN_WORKERS=$_GUNICORN_WORKERS
Expand Down
18 changes: 12 additions & 6 deletions octue/deployment/google/cloud_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import os
from flask import Flask, request

from octue.cloud.pub_sub.service import Service
from octue.logging_handlers import apply_log_handler
from octue.resources.communication.google_pub_sub.service import Service
from octue.resources.communication.service_backends import GCPPubSubBackend
from octue.resources.service_backends import GCPPubSubBackend
from octue.runner import Runner


Expand Down Expand Up @@ -55,22 +55,27 @@ def _log_bad_request_and_return_400_response(message):
return (f"Bad Request: {message}", 400)


def answer_question(project_name, data, question_uuid, deployment_configuration_path=None):
def answer_question(project_name, data, question_uuid):
"""Answer a question from a service by running the deployed app with the deployment configuration. Either the
`deployment_configuration_path` should be specified, or the `deployment_configuration`.
:param str project_name:
:param dict data:
:param str question_uuid:
:param str|None deployment_configuration_path:
:return None:
"""
deployment_configuration = {}
deployment_configuration_path = "deployment_configuration.json"

if deployment_configuration_path is not None:
try:
with open(deployment_configuration_path) as f:
deployment_configuration = json.load(f)

logger.info("Deployment configuration loaded from %r.", os.path.abspath(deployment_configuration_path))

except FileNotFoundError:
deployment_configuration = {}
logger.info("Default deployment configuration used.")

runner = Runner(
app_src=deployment_configuration.get("app_dir", "."),
twine=deployment_configuration.get("twine", "twine.json"),
Expand All @@ -82,6 +87,7 @@ def answer_question(project_name, data, question_uuid, deployment_configuration_
log_level=deployment_configuration.get("log_level", "INFO"),
handler=deployment_configuration.get("log_handler", None),
show_twined_logs=deployment_configuration.get("show_twined_logs", False),
project_name=project_name,
)

service = Service(
Expand Down
10 changes: 10 additions & 0 deletions octue/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ class FileNotFoundException(InvalidInputException, FileNotFoundError):
"""Raise when a required folder (e.g. <data_dir>/input) cannot be found"""


class FileLocationError(Exception):
"""Raise when a file exists in an unsupported location for a given operation."""


class ProtectedAttributeException(OctueSDKException, KeyError):
"""Raise when a user attempts to set an attribute whose value should be protected"""

Expand Down Expand Up @@ -68,3 +72,9 @@ class BackendNotFound(OctueSDKException):
"""Raise when details of a backend that doesn't exist in `octue.resources.service_backends` are given for use as a
Service backend.
"""


class AttributeConflict(OctueSDKException):
"""Raise if, when trying to set an attribute whose current value has a significantly higher confidence than the new
value, the new value conflicts with the current value.
"""
7 changes: 6 additions & 1 deletion octue/mixins/identifiable.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ class MyResource(Identifiable):
```
"""

def __init__(self, *args, id=None, **kwargs):
def __init__(self, *args, id=None, name=None, **kwargs):
"""Constructor for Identifiable class"""
self._name = name
super().__init__(*args, **kwargs)

# Store a boolean record of whether this object was created with a previously-existing uuid or was created new.
Expand Down Expand Up @@ -55,3 +56,7 @@ def __repr__(self):
@property
def id(self):
return self._id

@property
def name(self):
return self._name

0 comments on commit eb0817b

Please sign in to comment.