Skip to content
This repository has been archived by the owner on Apr 14, 2023. It is now read-only.

Commit

Permalink
Merge branch 'development'
Browse files Browse the repository at this point in the history
  • Loading branch information
coreyjrobins committed Apr 17, 2018
2 parents 9c670a0 + 227c44c commit add4e77
Show file tree
Hide file tree
Showing 12 changed files with 82 additions and 66 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Expand Up @@ -2,6 +2,12 @@
<!-- Use the tags Added, Changed, Deprecated, Removed, Fixed, Security, and
Contributor to describe changes -->

## [0.8.1]
###### 2018-04-17

### Fixed
* Bug where Workflows with unbounded Actions were unable to be executed

## [0.8.0]
###### 2018-04-16

Expand Down
2 changes: 1 addition & 1 deletion appveyor.yml
@@ -1,4 +1,4 @@
version: 0.8.0.{build}
version: 0.8.1.{build}

branches:
only:
Expand Down
28 changes: 14 additions & 14 deletions tests/test_action.py
Expand Up @@ -106,7 +106,7 @@ def test_init_with_triggers(self):
def test_execute_no_args(self):
action = Action(app_name='HelloWorld', action_name='helloWorld', name='helloWorld')
instance = AppInstance.create(app_name='HelloWorld', device_name='device1')
self.assertEqual(action.execute(instance.instance, {}), ActionResult({'message': 'HELLO WORLD'}, 'Success'))
self.assertEqual(action.execute({}, instance.instance), ActionResult({'message': 'HELLO WORLD'}, 'Success'))
self.assertEqual(action._output, ActionResult({'message': 'HELLO WORLD'}, 'Success'))

def test_execute_return_failure(self):
Expand All @@ -130,7 +130,7 @@ def callback_is_sent(sender, **kwargs):

WalkoffEvent.CommonWorkflowSignal.connect(callback_is_sent)

action.execute(instance.instance, {})
action.execute({}, instance.instance)
self.assertTrue(result['started_triggered'])
self.assertTrue(result['result_triggered'])

Expand All @@ -155,7 +155,7 @@ def callback_is_sent(sender, **kwargs):

WalkoffEvent.CommonWorkflowSignal.connect(callback_is_sent)

action.execute(instance.instance, {})
action.execute({}, instance.instance)

self.assertTrue(result['started_triggered'])
self.assertTrue(result['result_triggered'])
Expand All @@ -164,7 +164,7 @@ def test_execute_generates_id(self):
action = Action(app_name='HelloWorld', action_name='helloWorld', name='helloWorld')
original_execution_id = action.get_execution_id()
instance = AppInstance.create(app_name='HelloWorld', device_name='device1')
action.execute(instance.instance, {})
action.execute({}, instance.instance)
self.assertNotEqual(action.get_execution_id(), original_execution_id)

def test_execute_with_args(self):
Expand All @@ -173,7 +173,7 @@ def test_execute_with_args(self):
Argument('num2', value='4.3'),
Argument('num3', value='10.2')])
instance = AppInstance.create(app_name='HelloWorld', device_name='device1')
result = action.execute(instance.instance, {})
result = action.execute({}, instance.instance)
self.assertAlmostEqual(result.result, 8.9)
self.assertEqual(result.status, 'Success')
self.assertEqual(action._output, result)
Expand Down Expand Up @@ -202,7 +202,7 @@ def callback_is_sent(sender, **kwargs):

WalkoffEvent.CommonWorkflowSignal.connect(callback_is_sent)

action.execute(instance.instance, {})
action.execute({}, instance.instance)
self.assertTrue(result['started_triggered'])
self.assertTrue(result['result_triggered'])

Expand All @@ -213,7 +213,7 @@ def test_execute_with_accumulator_with_conversion(self):
Argument('num3', value='10.2')])
accumulator = {'1': '-5.6', 'action2': '4.3'}
instance = AppInstance.create(app_name='HelloWorld', device_name='device1')
result = action.execute(instance.instance, accumulator)
result = action.execute(accumulator, instance.instance)
self.assertAlmostEqual(result.result, 8.9)
self.assertEqual(result.status, 'Success')
self.assertEqual(action._output, result)
Expand All @@ -225,7 +225,7 @@ def test_execute_with_accumulator_with_extra_actions(self):
Argument('num3', value='10.2')])
accumulator = {'1': '-5.6', 'action2': '4.3', '3': '45'}
instance = AppInstance.create(app_name='HelloWorld', device_name='device1')
result = action.execute(instance.instance, accumulator)
result = action.execute(accumulator, instance.instance)
self.assertAlmostEqual(result.result, 8.9)
self.assertEqual(result.status, 'Success')
self.assertEqual(action._output, result)
Expand All @@ -237,7 +237,7 @@ def test_execute_with_accumulator_missing_action(self):
Argument('num3', value='10.2')])
accumulator = {'1': '-5.6', 'missing': '4.3', '3': '45'}
instance = AppInstance.create(app_name='HelloWorld', device_name='device1')
action.execute(instance.instance, accumulator)
action.execute(accumulator, instance.instance)

def test_execute_with_accumulator_missing_action_callbacks(self):
action = Action(app_name='HelloWorld', action_name='Add Three', name='helloWorld',
Expand All @@ -259,7 +259,7 @@ def callback_is_sent(sender, **kwargs):
result['result_triggered'] = True

WalkoffEvent.CommonWorkflowSignal.connect(callback_is_sent)
action.execute(instance.instance, accumulator)
action.execute(accumulator, instance.instance)

self.assertTrue(result['started_triggered'])
self.assertTrue(result['result_triggered'])
Expand All @@ -271,15 +271,15 @@ def test_execute_with_complex_args(self):
'd': [{'a': '', 'b': 3}, {'a': '', 'b': -1.5},
{'a': '', 'b': -0.5}]})])
instance = AppInstance.create(app_name='HelloWorld', device_name='device1')
result = action.execute(instance.instance, {})
result = action.execute({}, instance.instance)
self.assertAlmostEqual(result.result, 11.0)
self.assertEqual(result.status, 'Success')
self.assertEqual(action._output, result)

def test_execute_action_which_raises_exception(self):
action = Action(app_name='HelloWorld', action_name='Buggy', name='helloWorld')
instance = AppInstance.create(app_name='HelloWorld', device_name='device1')
action.execute(instance.instance, {})
action.execute({}, instance.instance)
self.assertIsNotNone(action.get_output())

def test_execute_action_which_raises_exception_sends_callbacks(self):
Expand All @@ -299,7 +299,7 @@ def callback_is_sent(sender, **kwargs):

WalkoffEvent.CommonWorkflowSignal.connect(callback_is_sent)

action.execute(instance.instance, {})
action.execute({}, instance.instance)

self.assertTrue(result['started_triggered'])
self.assertTrue(result['result_triggered'])
Expand All @@ -308,7 +308,7 @@ def test_execute_global_action(self):
action = Action(app_name='HelloWorld', action_name='global2', name='helloWorld',
arguments=[Argument('arg1', value='something')])
instance = AppInstance.create(app_name='HelloWorld', device_name='')
result = action.execute(instance.instance, {})
result = action.execute({}, instance.instance)
self.assertAlmostEqual(result.result, 'something')
self.assertEqual(result.status, 'Success')
self.assertEqual(action._output, result)
Expand Down
2 changes: 1 addition & 1 deletion walkoff.py
Expand Up @@ -87,7 +87,7 @@ def convert_host_port(args):
try:
run(app, *convert_host_port(args))
except KeyboardInterrupt:
logger.info('Caught KeyboardInterrupt!')
logger.info('Caught KeyboardInterrupt! Please wait a few seconds for WALKOFF to shutdown.')
except Exception as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_exc()
Expand Down
2 changes: 1 addition & 1 deletion walkoff/__init__.py
@@ -1 +1 @@
__version__ = '0.8.0'
__version__ = '0.8.1'
14 changes: 8 additions & 6 deletions walkoff/appgateway/appinstancerepo.py
Expand Up @@ -31,12 +31,14 @@ def setup_app_instance(self, action, workflow):
Returns:
(tuple(app_name, device_id)): A tuple containing the app name for the Action, and the device_id int
"""
device_id = (action.app_name, action.device_id.get_value(workflow.get_accumulator()))
if device_id not in self._instances:
self._instances[device_id] = AppInstance.create(action.app_name, action.device_id)
WalkoffEvent.CommonWorkflowSignal.send(workflow, event=WalkoffEvent.AppInstanceCreated)
logger.debug('Created new app instance: App {0}, device {1}'.format(action.app_name, action.device_id))
return device_id
if action.device_id:
device_id = (action.app_name, action.device_id.get_value(workflow.get_accumulator()))
if device_id not in self._instances:
self._instances[device_id] = AppInstance.create(action.app_name, action.device_id)
WalkoffEvent.CommonWorkflowSignal.send(workflow, event=WalkoffEvent.AppInstanceCreated)
logger.debug('Created new app instance: App {0}, device {1}'.format(action.app_name, action.device_id))
return device_id
return None

def get_app_instance(self, device_id):
"""Gets the AppInstance given a device ID
Expand Down
13 changes: 11 additions & 2 deletions walkoff/executiondb/action.py
Expand Up @@ -71,6 +71,7 @@ def __init__(self, app_name, action_name, name, device_id=None, id=None, argumen
self._output = None
self._execution_id = 'default'
self._action_executable = None
self._resolved_device_id = -1
self.validate()

@orm.reconstructor
Expand All @@ -81,6 +82,7 @@ def init_on_load(self):
self._action_executable = get_app_action(self.app_name, self._run)
self._output = None
self._execution_id = 'default'
self._resolved_device_id = -1

def validate(self):
"""Validates the object"""
Expand Down Expand Up @@ -116,12 +118,13 @@ def get_execution_id(self):
"""
return self._execution_id

def execute(self, instance, accumulator, arguments=None, resume=False):
def execute(self, accumulator, instance=None, arguments=None, resume=False):
"""Executes an Action by calling the associated app function.
Args:
instance (App): The instance of an App object to be used to execute the associated function.
accumulator (dict): Dict containing the results of the previous actions
instance (App, optional): The instance of an App object to be used to execute the associated function.
This field is required if the Action is a bounded action. Otherwise, it defaults to None.
arguments (list[Argument], optional): List of Arguments to be used if the Action is the starting step of
the Workflow. Defaults to None.
resume (bool, optional): Optional boolean to resume a previously paused workflow. Defaults to False.
Expand All @@ -131,6 +134,9 @@ def execute(self, instance, accumulator, arguments=None, resume=False):
"""
self._execution_id = str(uuid.uuid4())

if self.device_id:
self._resolved_device_id = self.device_id.get_value(accumulator)

WalkoffEvent.CommonWorkflowSignal.send(self, event=WalkoffEvent.ActionStarted)
if self.trigger and not resume:
WalkoffEvent.CommonWorkflowSignal.send(self, event=WalkoffEvent.TriggerActionAwaitingData)
Expand Down Expand Up @@ -192,6 +198,9 @@ def execute_trigger(self, data_in, accumulator):
logger.debug('Trigger is not valid for input {0}'.format(data_in))
return False

def get_resolved_device_id(self):
return self._resolved_device_id


@event.listens_for(Action, 'before_update')
def validate_before_update(mapper, connection, target):
Expand Down
15 changes: 8 additions & 7 deletions walkoff/executiondb/workflow.py
Expand Up @@ -138,7 +138,6 @@ def execute(self, execution_id, start=None, start_arguments=None, resume=False):

def __execute(self, start, start_arguments=None, resume=False):
actions = self.__actions(start=start)
first = True
for action in (action_ for action_ in actions if action_ is not None):
self._executing_action = action
logger.debug('Executing action {0} of workflow {1}'.format(action, self.name))
Expand All @@ -154,13 +153,15 @@ def __execute(self, start, start_arguments=None, resume=False):

device_id = self._instance_repo.setup_app_instance(action, self)

if first:
first = False
result = action.execute(instance=self._instance_repo.get_app_instance(device_id)(),
accumulator=self._accumulator, arguments=start_arguments, resume=resume)
if device_id:
result = action.execute(self._accumulator, instance=self._instance_repo.get_app_instance(device_id)(),
arguments=start_arguments, resume=resume)
else:
result = action.execute(instance=self._instance_repo.get_app_instance(device_id)(),
accumulator=self._accumulator, resume=resume)
result = action.execute(self._accumulator, arguments=start_arguments, resume=resume)

if start_arguments:
start_arguments = None

if result and result.status == "trigger":
yield
self._accumulator[action.id] = action.get_output().result
Expand Down
3 changes: 1 addition & 2 deletions walkoff/multiprocessedexecutor/proto_helpers.py
Expand Up @@ -127,8 +127,7 @@ def add_sender_to_action_packet_proto(action_packet, sender):
action_packet.sender.execution_id = sender.get_execution_id()
action_packet.sender.app_name = sender.app_name
action_packet.sender.action_name = sender.action_name
if sender.device_id is not None:
set_argument_proto(action_packet.sender.device_id, sender.device_id)
action_packet.sender.device_id = sender.get_resolved_device_id()


def add_arguments_to_action_proto(action_packet, sender):
Expand Down
2 changes: 1 addition & 1 deletion walkoff/multiprocessedexecutor/worker.py
Expand Up @@ -36,7 +36,7 @@
class WorkflowResultsHandler(object):
def __init__(self, socket_id, client_secret_key, client_public_key, server_public_key, zmq_results_address,
execution_db, case_logger):
"""Initialize a WorkflowResultsHandler object, which will be executing workflows.
"""Initialize a WorkflowResultsHandler object, which will be sending results of workflow execution
Args:
socket_id (str): The ID for the results socket
Expand Down

0 comments on commit add4e77

Please sign in to comment.