diff --git a/octue/cloud/deployment/google/answer_pub_sub_question.py b/octue/cloud/deployment/google/answer_pub_sub_question.py index 703a67f5c..0f22453f0 100644 --- a/octue/cloud/deployment/google/answer_pub_sub_question.py +++ b/octue/cloud/deployment/google/answer_pub_sub_question.py @@ -1,5 +1,6 @@ import logging +from octue.cloud.events.counter import EventCounter from octue.cloud.pub_sub.service import Service from octue.cloud.service_id import create_sruid, get_sruid_parts from octue.configuration import load_service_and_app_configuration @@ -32,6 +33,7 @@ def answer_question(question, project_name): service = Service(service_id=service_sruid, backend=GCPPubSubBackend(project_name=project_name)) question_uuid = get_nested_attribute(question, "attributes.question_uuid") + order = EventCounter() try: runner = Runner.from_configuration( @@ -42,9 +44,9 @@ def answer_question(question, project_name): ) service.run_function = runner.run - service.answer(question) + service.answer(question, order) logger.info("Analysis successfully run and response sent for question %r.", question_uuid) except BaseException as error: # noqa - service.send_exception(question_uuid=question_uuid, originator="UNKNOWN") + service.send_exception(question_uuid=question_uuid, originator="UNKNOWN", order=order) logger.exception(error) diff --git a/octue/cloud/pub_sub/service.py b/octue/cloud/pub_sub/service.py index 054d8c255..69768edf4 100644 --- a/octue/cloud/pub_sub/service.py +++ b/octue/cloud/pub_sub/service.py @@ -203,16 +203,19 @@ def serve(self, timeout=None, delete_topic_and_subscription_on_exit=False, allow return future, subscriber - def answer(self, question, heartbeat_interval=120, timeout=30): + def answer(self, question, order=None, heartbeat_interval=120, timeout=30): """Answer a question from a parent - i.e. run the child's app on the given data and return the output values. Answers conform to the output values and output manifest schemas specified in the child's Twine file. :param dict|google.cloud.pubsub_v1.subscriber.message.Message question: + :param octue.cloud.events.counter.EventCounter|None order: an event counter keeping track of the order of emitted events :param int|float heartbeat_interval: the time interval, in seconds, at which to send heartbeats :param float|None timeout: time in seconds to keep retrying sending of the answer once it has been calculated :raise Exception: if any exception arises during running analysis and sending its results :return None: """ + order = order or EventCounter() + try: ( question, @@ -226,7 +229,6 @@ def answer(self, question, heartbeat_interval=120, timeout=30): return heartbeater = None - order = EventCounter() try: self._send_delivery_acknowledgment(question_uuid, originator, order)