Skip to content

Commit

Permalink
FIX: Ensure order argument is given in Service.send_exception
Browse files Browse the repository at this point in the history
  • Loading branch information
cortadocodes committed Apr 29, 2024
1 parent 8776b13 commit 6b2a28e
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
6 changes: 4 additions & 2 deletions octue/cloud/deployment/google/answer_pub_sub_question.py
@@ -1,5 +1,6 @@
import logging

from octue.cloud.events.counter import EventCounter
from octue.cloud.pub_sub.service import Service
from octue.cloud.service_id import create_sruid, get_sruid_parts
from octue.configuration import load_service_and_app_configuration
Expand Down Expand Up @@ -32,6 +33,7 @@ def answer_question(question, project_name):

service = Service(service_id=service_sruid, backend=GCPPubSubBackend(project_name=project_name))
question_uuid = get_nested_attribute(question, "attributes.question_uuid")
order = EventCounter()

try:
runner = Runner.from_configuration(
Expand All @@ -42,9 +44,9 @@ def answer_question(question, project_name):
)

service.run_function = runner.run
service.answer(question)
service.answer(question, order)
logger.info("Analysis successfully run and response sent for question %r.", question_uuid)

except BaseException as error: # noqa
service.send_exception(question_uuid=question_uuid, originator="UNKNOWN")
service.send_exception(question_uuid=question_uuid, originator="UNKNOWN", order=order)
logger.exception(error)
6 changes: 4 additions & 2 deletions octue/cloud/pub_sub/service.py
Expand Up @@ -203,16 +203,19 @@ def serve(self, timeout=None, delete_topic_and_subscription_on_exit=False, allow

return future, subscriber

def answer(self, question, heartbeat_interval=120, timeout=30):
def answer(self, question, order=None, heartbeat_interval=120, timeout=30):
"""Answer a question from a parent - i.e. run the child's app on the given data and return the output values.
Answers conform to the output values and output manifest schemas specified in the child's Twine file.
:param dict|google.cloud.pubsub_v1.subscriber.message.Message question:
:param octue.cloud.events.counter.EventCounter|None order: an event counter keeping track of the order of emitted events
:param int|float heartbeat_interval: the time interval, in seconds, at which to send heartbeats
:param float|None timeout: time in seconds to keep retrying sending of the answer once it has been calculated
:raise Exception: if any exception arises during running analysis and sending its results
:return None:
"""
order = order or EventCounter()

try:
(
question,
Expand All @@ -226,7 +229,6 @@ def answer(self, question, heartbeat_interval=120, timeout=30):
return

heartbeater = None
order = EventCounter()

try:
self._send_delivery_acknowledgment(question_uuid, originator, order)
Expand Down

0 comments on commit 6b2a28e

Please sign in to comment.