Skip to content

Commit

Permalink
MRG: Merge pull request #523 from octue/enhancement/remove-service-ar…
Browse files Browse the repository at this point in the history
…gument-from-topic

Remove `service` argument from `Topic` instantiation
  • Loading branch information
cortadocodes committed Sep 27, 2022
2 parents 1404cdb + 5d3dadf commit 476c930
Show file tree
Hide file tree
Showing 17 changed files with 244 additions and 239 deletions.
12 changes: 6 additions & 6 deletions octue/cloud/deployment/google/cloud_run/deployer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from octue.cloud.deployment.google.base_deployer import BaseDeployer, ProgressMessage
from octue.cloud.pub_sub.service import OCTUE_NAMESPACE, Service
from octue.cloud.pub_sub.service import OCTUE_NAMESPACE
from octue.cloud.pub_sub.subscription import Subscription
from octue.cloud.pub_sub.topic import Topic
from octue.exceptions import DeploymentError
from octue.resources.service_backends import GCPPubSubBackend


DEFAULT_CLOUD_RUN_DOCKERFILE_URL = (
Expand Down Expand Up @@ -188,11 +187,12 @@ def _create_eventarc_run_trigger(self, update=False):
5,
self.TOTAL_NUMBER_OF_STAGES,
) as progress_message:
service = Service(
backend=GCPPubSubBackend(project_name=self.service_configuration.project_name),
service_id=self.service_id,
topic = Topic(
name=self.service_id,
project_name=self.service_configuration.project_name,
namespace=OCTUE_NAMESPACE,
)
topic = Topic(name=self.service_id, namespace=OCTUE_NAMESPACE, service=service)

topic.create(allow_existing=True)

command = [
Expand Down
5 changes: 2 additions & 3 deletions octue/cloud/deployment/google/dataflow/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@
from octue import REPOSITORY_ROOT
from octue.cloud.deployment.google.answer_pub_sub_question import answer_question
from octue.cloud.pub_sub import Topic
from octue.cloud.pub_sub.service import OCTUE_NAMESPACE, Service
from octue.cloud.pub_sub.service import OCTUE_NAMESPACE
from octue.exceptions import DeploymentError
from octue.resources.service_backends import GCPPubSubBackend


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -81,8 +80,8 @@ def create_streaming_job(

service_topic = Topic(
name=service_id,
project_name=project_name,
namespace=OCTUE_NAMESPACE,
service=Service(backend=GCPPubSubBackend(project_name=project_name)),
)

service_topic.create(allow_existing=True)
Expand Down
3 changes: 3 additions & 0 deletions octue/cloud/emulators/_pub_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ def cancel(self):
class MockPublisher:
"""A mock publisher that puts messages in a global dictionary instead of Google Pub/Sub."""

def __init__(self, *args, **kwargs):
pass

def publish(self, topic, data, retry=None, **attributes):
"""Put the data and attributes into a MockMessage and add it to the global messages dictionary before returning
a MockFuture.
Expand Down
1 change: 1 addition & 0 deletions octue/cloud/emulators/child.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ def __init__(self):
patches=[
patch("octue.cloud.pub_sub.service.Topic", new=MockTopic),
patch("octue.cloud.pub_sub.service.Subscription", new=MockSubscription),
patch("octue.cloud.pub_sub.message_handler.SubscriberClient", new=MockSubscriber),
patch("google.cloud.pubsub_v1.SubscriberClient", new=MockSubscriber),
]
)
91 changes: 47 additions & 44 deletions octue/cloud/pub_sub/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import pkg_resources
from google.api_core import retry
from google.cloud.pubsub_v1 import SubscriberClient

from octue.cloud import EXCEPTIONS_MAPPING
from octue.compatibility import warn_if_incompatible
Expand All @@ -30,8 +31,8 @@
class OrderedMessageHandler:
"""A handler for Google Pub/Sub messages that ensures messages are handled in the order they were sent.
:param google.pubsub_v1.services.subscriber.client.SubscriberClient subscriber: a Google Pub/Sub subscriber
:param octue.cloud.pub_sub.subscription.Subscription subscription: the subscription messages are pulled from
:param octue.cloud.pub_sub.service.Service receiving_service: the service that's receiving the messages
:param callable|None handle_monitor_message: a function to handle monitor messages (e.g. send them to an endpoint for plotting or displaying) - this function should take a single JSON-compatible python primitive
:param str|None record_messages_to: if given a path to a JSON file, received messages are saved to it
:param str service_name: an arbitrary name to refer to the service subscribed to by (used for labelling its remote log messages)
Expand All @@ -41,20 +42,21 @@ class OrderedMessageHandler:

def __init__(
self,
subscriber,
subscription,
receiving_service,
handle_monitor_message=None,
record_messages_to=None,
service_name="REMOTE",
message_handlers=None,
):
self.subscriber = subscriber
self.subscription = subscription
self.receiving_service = receiving_service
self.handle_monitor_message = handle_monitor_message
self.record_messages_to = record_messages_to
self.service_name = service_name

self.received_delivery_acknowledgement = None
self._subscriber = SubscriberClient()
self._child_sdk_version = None
self._heartbeat_checker = None
self._last_heartbeat = None
Expand Down Expand Up @@ -108,55 +110,56 @@ def handle_messages(self, timeout=60, delivery_acknowledgement_timeout=120, maxi
kwargs={"maximum_heartbeat_interval": maximum_heartbeat_interval},
)

self._heartbeat_checker.daemon = True
self._heartbeat_checker.start()
try:
self._heartbeat_checker.daemon = True
self._heartbeat_checker.start()

while self._alive:
while self._alive:

if timeout is not None:
run_time = time.perf_counter() - self._start_time
if timeout is not None:
run_time = time.perf_counter() - self._start_time

if run_time > timeout:
raise TimeoutError(
f"No final answer received from topic {self.subscription.topic.path!r} after {timeout} seconds.",
)
if run_time > timeout:
raise TimeoutError(
f"No final answer received from topic {self.subscription.topic.path!r} after {timeout} seconds.",
)

pull_timeout = timeout - run_time
pull_timeout = timeout - run_time

message = self._pull_message(
timeout=pull_timeout,
delivery_acknowledgement_timeout=delivery_acknowledgement_timeout,
)
message = self._pull_message(
timeout=pull_timeout,
delivery_acknowledgement_timeout=delivery_acknowledgement_timeout,
)

self._waiting_messages[int(message["message_number"])] = message
self._waiting_messages[int(message["message_number"])] = message

try:
while self._waiting_messages:
message = self._waiting_messages.pop(self._previous_message_number + 1)
try:
while self._waiting_messages:
message = self._waiting_messages.pop(self._previous_message_number + 1)

if self.record_messages_to:
recorded_messages.append(message)
if self.record_messages_to:
recorded_messages.append(message)

result = self._handle_message(message)
result = self._handle_message(message)

if result is not None:
self._heartbeat_checker.cancel()
return result
if result is not None:
return result

except KeyError:
pass
except KeyError:
pass

finally:
self._heartbeat_checker.cancel()
finally:
self._heartbeat_checker.cancel()
self._subscriber.close()

if self.record_messages_to:
directory_name = os.path.dirname(self.record_messages_to)
if self.record_messages_to:
directory_name = os.path.dirname(self.record_messages_to)

if not os.path.exists(directory_name):
os.makedirs(directory_name)
if not os.path.exists(directory_name):
os.makedirs(directory_name)

with open(self.record_messages_to, "w") as f:
json.dump(recorded_messages, f)
with open(self.record_messages_to, "w") as f:
json.dump(recorded_messages, f)

raise TimeoutError(
f"No heartbeat has been received within the maximum allowed interval of {maximum_heartbeat_interval}s."
Expand Down Expand Up @@ -193,7 +196,7 @@ def _pull_message(self, timeout, delivery_acknowledgement_timeout):
while True:
logger.debug("Pulling messages from Google Pub/Sub: attempt %d.", attempt)

pull_response = self.subscriber.pull(
pull_response = self._subscriber.pull(
request={"subscription": self.subscription.path, "max_messages": 1},
retry=retry.Retry(),
)
Expand All @@ -220,11 +223,11 @@ def _pull_message(self, timeout, delivery_acknowledgement_timeout):
f"after {delivery_acknowledgement_timeout} seconds."
)

self.subscriber.acknowledge(request={"subscription": self.subscription.path, "ack_ids": [answer.ack_id]})
self._subscriber.acknowledge(request={"subscription": self.subscription.path, "ack_ids": [answer.ack_id]})

logger.debug(
"%r received a message related to question %r.",
self.subscription.topic.service,
self.receiving_service,
self.subscription.topic.path.split(".")[-1],
)

Expand Down Expand Up @@ -260,7 +263,7 @@ def _handle_message(self, message):
if isinstance(error, KeyError):
logger.warning(
"%r received a message of unknown type %r.",
self.subscription.topic.service,
self.receiving_service,
message.get("type", "unknown"),
)
return
Expand All @@ -275,7 +278,7 @@ def _handle_delivery_acknowledgement(self, message):
:return None:
"""
self.received_delivery_acknowledgement = True
logger.info("%r's question was delivered at %s.", self.subscription.topic.service, message["delivery_time"])
logger.info("%r's question was delivered at %s.", self.receiving_service, message["delivery_time"])

def _handle_heartbeat(self, message):
"""Record the time the heartbeat was received.
Expand All @@ -292,7 +295,7 @@ def _handle_monitor_message(self, message):
:param dict message:
:return None:
"""
logger.debug("%r received a monitor message.", self.subscription.topic.service)
logger.debug("%r received a monitor message.", self.receiving_service)

if self.handle_monitor_message is not None:
self.handle_monitor_message(json.loads(message["data"]))
Expand Down Expand Up @@ -359,7 +362,7 @@ def _handle_result(self, message):
"""
logger.info(
"%r received an answer to question %r.",
self.subscription.topic.service,
self.receiving_service,
self.subscription.topic.path.split(".")[-1],
)

Expand Down
33 changes: 7 additions & 26 deletions octue/cloud/pub_sub/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import google.api_core.exceptions
import pkg_resources
from google import auth
from google.api_core import retry
from google.cloud import pubsub_v1

Expand Down Expand Up @@ -70,7 +69,6 @@ def __init__(self, backend, service_id=None, run_function=None, *args, **kwargs)
self._record_sent_messages = False
self._sent_messages = []
self._publisher = None
self._credentials = None
super().__init__(*args, **kwargs)

def __repr__(self):
Expand All @@ -85,23 +83,10 @@ def publisher(self):
:return google.cloud.pubsub_v1.PublisherClient:
"""
if not self._publisher:
self._publisher = pubsub_v1.PublisherClient(credentials=self.credentials, batch_settings=BATCH_SETTINGS)
self._publisher = pubsub_v1.PublisherClient(batch_settings=BATCH_SETTINGS)

return self._publisher

@property
def credentials(self):
"""Get or instantiate the Google Cloud credentials for the service. No credentials are instantiated until this
property is called for the first time. This allows checking for the `GOOGLE_APPLICATION_CREDENTIALS` environment
variable to be put off until it's needed.
:return google.auth.credentials.Credentials:
"""
if not self._credentials:
self._credentials = auth.default()[0]

return self._credentials

def serve(self, timeout=None, delete_topic_and_subscription_on_exit=False, allow_existing=False):
"""Start the service as a child, waiting to accept questions from any other Octue service using Google Pub/Sub
on the same Google Cloud project. Questions are accepted, processed, and answered asynchronously.
Expand All @@ -113,18 +98,18 @@ def serve(self, timeout=None, delete_topic_and_subscription_on_exit=False, allow
"""
logger.info("Starting %r.", self)

topic = Topic(name=self.id, namespace=OCTUE_NAMESPACE, service=self)
subscriber = pubsub_v1.SubscriberClient(credentials=self.credentials)
topic = Topic(name=self.id, project_name=self.backend.project_name, namespace=OCTUE_NAMESPACE)

subscription = Subscription(
name=self.id,
topic=topic,
namespace=OCTUE_NAMESPACE,
project_name=self.backend.project_name,
subscriber=subscriber,
expiration_time=None,
)

subscriber = pubsub_v1.SubscriberClient()

try:
topic.create(allow_existing=allow_existing)
subscription.create(allow_existing=allow_existing)
Expand Down Expand Up @@ -276,7 +261,7 @@ def ask(

unlinted_service_id = service_id
service_id = self._clean_service_id(service_id)
question_topic = Topic(name=service_id, namespace=OCTUE_NAMESPACE, service=self)
question_topic = Topic(name=service_id, project_name=self.backend.project_name, namespace=OCTUE_NAMESPACE)

if not question_topic.exists(timeout=timeout):
raise octue.exceptions.ServiceNotFound(f"Service with ID {unlinted_service_id!r} cannot be found.")
Expand All @@ -291,7 +276,6 @@ def ask(
topic=answer_topic,
namespace=OCTUE_NAMESPACE,
project_name=self.backend.project_name,
subscriber=pubsub_v1.SubscriberClient(credentials=self.credentials),
push_endpoint=push_endpoint,
)
answer_subscription.create(allow_existing=False)
Expand Down Expand Up @@ -342,11 +326,9 @@ def wait_for_answer(
f"Cannot pull from {subscription.path!r} subscription as it is a push subscription."
)

subscriber = pubsub_v1.SubscriberClient(credentials=self.credentials)

message_handler = OrderedMessageHandler(
subscriber=subscriber,
subscription=subscription,
receiving_service=self,
handle_monitor_message=handle_monitor_message,
service_name=service_name,
record_messages_to=record_messages_to,
Expand All @@ -360,7 +342,6 @@ def wait_for_answer(
)
finally:
subscription.delete()
subscriber.close()

def instantiate_answer_topic(self, question_uuid, service_id=None):
"""Instantiate the answer topic for the given question UUID and child service ID.
Expand All @@ -371,8 +352,8 @@ def instantiate_answer_topic(self, question_uuid, service_id=None):
"""
return Topic(
name=".".join((service_id or self.id, ANSWERS_NAMESPACE, question_uuid)),
project_name=self.backend.project_name,
namespace=OCTUE_NAMESPACE,
service=self,
)

def send_exception(self, topic, timeout=30):
Expand Down

0 comments on commit 476c930

Please sign in to comment.