diff --git a/.codecov.yml b/.codecov.yml index 25741db10..6b2c53024 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -6,4 +6,5 @@ coverage: threshold: 1% paths: - "src" + base: "pr" patch: off diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 27646a622..dfa0e9513 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -36,21 +36,28 @@ jobs: pip install hypothesis pip install coverage pip install codecov + pip install coveralls pip install pytest pip install pytest-xdist pip install pytest-timeout - pip install timeout_decorator + pip install timeout-decorator - name: Test with pytest env: RMQ_HOSTNAME: localhost RMQ_PORT: ${{ job.services.rabbitmq.ports[5672] }} # get randomly assigned published port RMQ_USERNAME: guest RMQ_PASSWORD: guest + LOC: /opt/hostedtoolcache/Python/3.6.12/x64/lib/python3.6/site-packages run: | - LOC=/opt/hostedtoolcache/Python/3.6.12/x64/lib/python3.6/site-packages - coverage run --include=$LOC/radical/entk/* -m pytest -ra --timeout=600 -vvv --showlocals tests/test_component tests/test_utils/ tests/test_integration + coverage run --include=$LOC/radical/entk/* -m pytest -ra --timeout=600 -vvv --showlocals tests/test_component tests/test_utils/ tests/test_integration - name: Codecov - uses: codecov/codecov-action@v1.0.15 + env: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + run: | + coverage combine; \ + coverage xml; \ + coverage report; \ + curl -s https://codecov.io/bash | bash flake8: runs-on: ubuntu-latest diff --git a/.github/workflows/python-publish.yml b/.github/workflows/python-publish.yml index a53913f8a..8057c675c 100644 --- a/.github/workflows/python-publish.yml +++ b/.github/workflows/python-publish.yml @@ -19,7 +19,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v2 with: - python-version: '3.x' + python-version: '3.6' - name: Install dependencies run: | python -m pip install --upgrade pip @@ -29,5 +29,5 @@ jobs: TWINE_USERNAME: ${{ 
secrets.PYPI_USERNAME }} TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }} run: | - python setup.py sdist bdist_wheel + python setup.py sdist twine upload dist/* diff --git a/VERSION b/VERSION index 41336a1c0..dc1e644a1 100755 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.5.12 +1.6.0 diff --git a/examples/misc/lfs_tagging_dd.py b/examples/misc/lfs_tagging_dd.py index c2fab7dd7..183115632 100755 --- a/examples/misc/lfs_tagging_dd.py +++ b/examples/misc/lfs_tagging_dd.py @@ -1,102 +1,105 @@ #!/usr/bin/env python from radical.entk import Pipeline, Stage, Task, AppManager -from radical.entk.exceptions import * import os -import sys -import argparse -hostname = os.environ.get('RMQ_HOSTNAME','localhost') -port = int(os.environ.get('RMQ_PORT',5672)) - - -def get_pipeline(shared_fs=False, size=1): - - p = Pipeline() - p.name = 'p' - - n = 4 - - s1 = Stage() - s1.name = 's1' - for x in range(n): - t = Task() - t.name = 't%s'%x - - # dd if=/dev/random bs= count= of= - - t.executable = 'dd' - - if not shared_fs: - t.arguments = ['if=/dev/urandom','bs=%sM'%size, 'count=1', 'of=$NODE_LFS_PATH/s1_t%s.txt'%x] - else: - t.arguments = ['if=/dev/urandom','bs=%sM'%size, 'count=1', 'of=/home/vivek91/s1_t%s.txt'%x] - - t.cpu_reqs['processes'] = 1 - t.cpu_reqs['threads_per_process'] = 24 - t.cpu_reqs['thread_type'] = '' - t.cpu_reqs['process_type'] = '' - t.lfs_per_process = 1024 - - s1.add_tasks(t) - - p.add_stages(s1) - - s2 = Stage() - s2.name = 's2' +if os.environ.get('RADICAL_ENTK_VERBOSE') is None: + os.environ['RADICAL_ENTK_REPORT'] = 'True' + +# No need to change/set any variables if you installed RabbitMQ has a system +# process. If you are running RabbitMQ in a Docker container or on a dedicated +# virtual machine, set the variables "RMQ_HOSTNAME" and "RMQ_PORT" in the shell +# environment in which you are running this script. 
+hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') +port = int(os.environ.get('RMQ_PORT', '5672')) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') + + +# Each task of this example prints the hostname of the node on which it is +# executed. Tagged tasks should print the same hostname. +def get_pipeline(n=2): + ''' + We create a pipeline with three stages, each with 1 task. The tasks of the + second and third stages are tagged to execute on the same compute node on + which the first stage's task executed. + ''' + + pipelines = list() for x in range(n): - t = Task() - t.executable = 'dd' - - if not shared_fs: - t.arguments = ['if=$NODE_LFS_PATH/s1_t%s.txt'%x,'bs=%sM'%size, 'count=1', 'of=$NODE_LFS_PATH/s2_t%s.txt'%x] - else: - t.arguments = ['if=/home/vivek91/s1_t%s.txt'%x,'bs=%sM'%size, 'count=1', 'of=/home/vivek91/s2_t%s.txt'%x] - - t.cpu_reqs['processes'] = 1 - t.cpu_reqs['threads_per_process'] = 24 - t.cpu_reqs['thread_type'] = '' - t.cpu_reqs['process_type'] = '' - t.tag = 't%s'%x - - s2.add_tasks(t) - - - p.add_stages(s2) - - return p - + pipeline = Pipeline() + pipeline.name = 'p.%04d' % x + + stage1 = Stage() + stage1.name = 'stage1' + # task1 of stage1 will execute on the first available and suitable node. + task1 = Task() + task1.name = 'task1.%04d' % x + task1.executable = 'hostname' + # Set enough threads for task1 to get a whole compute node + task1.cpu_reqs = {'cpu_processes': 1, + 'cpu_threads': 1, + 'cpu_process_type': None, + 'cpu_thread_type': None} + task1.lfs_per_process = 10 + stage1.add_tasks(task1) + + pipeline.add_stages(stage1) + + stage2 = Stage() + stage2.name = 'stage2' + + task2 = Task() + task2.name = 'task2.%04d' % x + task2.executable = 'hostname' + task2.cpu_reqs = {'cpu_processes': 1, + 'cpu_threads': 1, + 'cpu_process_type': None, + 'cpu_thread_type': None} + # We use the ID of task1 as the tag of task2. In this way, task2 will + # execute on the same node on which task1 executed. 
+ task2.tags = {'colocate': task1.uid} + task2.lfs_per_process = 10 + stage2.add_tasks(task2) + + + pipeline.add_stages(stage2) + + stage3 = Stage() + stage3.name = 'stage3' + + task3 = Task() + task3.name = 'task3.%04d' % x + task3.executable = 'hostname' + task3.cpu_reqs = {'cpu_processes': 1, + 'cpu_threads': 1, + 'cpu_process_type': None, + 'cpu_thread_type': None} + task3.lfs_per_process = 10 + # We use the ID of task1 as the tag of task3. In this way, task3 will + # execute on the same node on which task1 and task2 executed. + task3.tags = {'colocate': task1.uid} + stage3.add_tasks(task3) + + pipeline.add_stages(stage3) + pipelines.append(pipeline) + + return pipelines if __name__ == '__main__': - args = argparse.ArgumentParser() - args.add_argument('sharedfs') - args.add_argument('size') - - args = args.parse_args() - if args.sharedfs == 'shared': - shared_fs = True - else: - shared_fs = False - size = args.size - - print('SharedFS: ', shared_fs, size) - - os.environ['RADICAL_PILOT_DBURL'] = 'mongodb://entk:entk123@ds159631.mlab.com:59631/da-lfs-test' - + # Request at least two compute nodes res_dict = { - 'resource' : 'xsede.comet', - 'walltime' : 30, - 'cpus' : 120, - 'project' : 'unc100' - # 'project' : 'gk4', - # 'queue' : 'high' + 'resource' : 'local.localhost', + 'walltime' : 20, + 'cpus' : 2, } - appman = AppManager(hostname=hostname, port=port) + appman = AppManager(hostname=hostname, port=port, username=username, password=password) appman.resource_desc = res_dict - p = get_pipeline(shared_fs=shared_fs, size=size) - appman.workflow = [p] + # Select n to be >= to the number of available compute nodes. 
+ p = get_pipeline(n=2) + appman.workflow = set(p) appman.run() diff --git a/src/radical/entk/__init__.py b/src/radical/entk/__init__.py index ade36b191..8925f85d9 100644 --- a/src/radical/entk/__init__.py +++ b/src/radical/entk/__init__.py @@ -1,4 +1,4 @@ - +# pylint: disable=unused-argument # ------------------------------------------------------------------------------ # from .pipeline import Pipeline @@ -17,10 +17,12 @@ import requests as req from packaging.version import parse as parse_version + def custom_formatwarning(msg, *args, **kwargs): # ignore everything except the message return str(msg) + '\n' + warnings.formatwarning = custom_formatwarning version_short, version_detail, version_base, version_branch, \ diff --git a/src/radical/entk/execman/mock/task_manager.py b/src/radical/entk/execman/mock/task_manager.py index a94d8266d..4f7b65214 100644 --- a/src/radical/entk/execman/mock/task_manager.py +++ b/src/radical/entk/execman/mock/task_manager.py @@ -244,13 +244,15 @@ def _process_tasks(self, task_queue, rmgr, rmq_conn_params): bulk_tasks.append(task) self._advance(task, 'Task', states.SUBMITTING, - mq_channel, '%s-tmgr-to-sync' % self._sid) + mq_channel, rmq_conn_params, + '%s-tmgr-to-sync' % self._sid) # this mock RTS immmedialtely completes all tasks for task in bulk_tasks: self._advance(task, 'Task', states.COMPLETED, - mq_channel, '%s-cb-to-sync' % self._sid) + mq_channel, rmq_conn_params, + '%s-cb-to-sync' % self._sid) task_as_dict = json.dumps(task.to_dict()) mq_channel.basic_publish( diff --git a/src/radical/entk/execman/rp/resource_manager.py b/src/radical/entk/execman/rp/resource_manager.py index 1a3fbac07..203f90ec0 100644 --- a/src/radical/entk/execman/rp/resource_manager.py +++ b/src/radical/entk/execman/rp/resource_manager.py @@ -162,8 +162,8 @@ def _pilot_state_cb(pilot, state): 'job_name' : self._job_name } - # Create Compute Pilot with validated resource description - pdesc = rp.ComputePilotDescription(pd_init) + # Create Pilot with 
validated resource description + pdesc = rp.PilotDescription(pd_init) self._prof.prof('rreq created', uid=self._uid) # Launch the pilot diff --git a/src/radical/entk/execman/rp/task_manager.py b/src/radical/entk/execman/rp/task_manager.py index 73d66b072..0a1e09f25 100644 --- a/src/radical/entk/execman/rp/task_manager.py +++ b/src/radical/entk/execman/rp/task_manager.py @@ -4,7 +4,6 @@ __license__ = "MIT" -import os import json import pika import queue @@ -17,7 +16,7 @@ from ...exceptions import EnTKError from ... import states, Task from ..base import Base_TaskManager -from .task_processor import create_cud_from_task, create_task_from_cu +from .task_processor import create_td_from_task, create_task_from_rp # ------------------------------------------------------------------------------ @@ -57,8 +56,6 @@ def __init__(self, sid, pending_queue, completed_queue, rmgr, rts='radical.pilot') self._rts_runner = None - self._rmq_ping_interval = int(os.getenv('RMQ_PING_INTERVAL', '10')) - self._log.info('Created task manager object: %s', self._uid) self._prof.prof('tmgr_create', uid=self._uid) @@ -83,7 +80,7 @@ def _tmgr(self, uid, rmgr, pending_queue, completed_queue, The new thread is responsible for pushing completed tasks (returned by the RTS) to the dequeueing queue. It also - converts Tasks into CUDs and CUs into (partially described) + converts Tasks into TDs and CUs into (partially described) Tasks. This conversion is necessary since the current RTS is RADICAL Pilot. Once Tasks are recovered from a CU, they are then pushed to the completed_queue. 
At all state @@ -121,9 +118,9 @@ def heartbeat_response(mq_channel, conn_params): nprops = pika.BasicProperties( correlation_id=props.correlation_id) channel.basic_publish(exchange='', - routing_key=self._hb_response_q, - properties=nprops, - body='response') + routing_key=self._hb_response_q, + properties=nprops, + body='response') except (pika.exceptions.ConnectionClosed, pika.exceptions.ChannelClosed): connection = pika.BlockingConnection(conn_params) @@ -131,9 +128,9 @@ def heartbeat_response(mq_channel, conn_params): nprops = pika.BasicProperties( correlation_id=props.correlation_id) channel.basic_publish(exchange='', - routing_key=self._hb_response_q, - properties=nprops, - body='response') + routing_key=self._hb_response_q, + properties=nprops, + body='response') self._log.info('Sent heartbeat response') @@ -227,56 +224,57 @@ def _process_tasks(self, task_queue, rmgr, rmq_conn_params): ''' placeholders = dict() + placeholder_lock = mt.Lock() # ---------------------------------------------------------------------- - def load_placeholder(task, rts_uid): - - parent_pipeline = str(task.parent_pipeline['name']) - parent_stage = str(task.parent_stage['name']) + def load_placeholder(task): + with placeholder_lock: + parent_pipeline = str(task.parent_pipeline['uid']) + parent_stage = str(task.parent_stage['uid']) - if parent_pipeline not in placeholders: - placeholders[parent_pipeline] = dict() + if parent_pipeline not in placeholders: + placeholders[parent_pipeline] = dict() - if parent_stage not in placeholders[parent_pipeline]: - placeholders[parent_pipeline][parent_stage] = dict() + if parent_stage not in placeholders[parent_pipeline]: + placeholders[parent_pipeline][parent_stage] = dict() - if None not in [parent_pipeline, parent_stage, task.name]: - placeholders[parent_pipeline][parent_stage][task.name] = \ - {'path' : task.path, - 'rts_uid': rts_uid} + if None not in [parent_pipeline, parent_stage, task.uid]: + 
placeholders[parent_pipeline][parent_stage][task.uid] = \ + {'path': task.path, + 'uid': task.uid} # ---------------------------------------------------------------------- - def unit_state_cb(unit, state, cb_data): + def task_state_cb(rp_task, state, cb_data): try: + channel = cb_data['channel'] conn_params = cb_data['params'] - self._log.debug('Unit %s in state %s' % (unit.uid, unit.state)) + self._log.debug('Task %s in state %s' % (rp_task.uid, + rp_task.state)) - if unit.state in rp.FINAL: + if rp_task.state in rp.FINAL: - task = None - task = create_task_from_cu(unit, self._prof) + task = create_task_from_rp(rp_task, self._prof) self._advance(task, 'Task', states.COMPLETED, channel, conn_params, '%s-cb-to-sync' % self._sid) - load_placeholder(task, unit.uid) + load_placeholder(task) task_as_dict = json.dumps(task.to_dict()) try: channel.basic_publish(exchange='', - routing_key='%s-completedq-1' % self._sid, - body=task_as_dict) + routing_key='%s-completedq-1' % self._sid, + body=task_as_dict) except (pika.exceptions.ConnectionClosed, pika.exceptions.ChannelClosed): connection = pika.BlockingConnection(conn_params) channel = connection.channel() channel.basic_publish(exchange='', - routing_key='%s-completedq-1' % self._sid, - body=task_as_dict) - + routing_key='%s-completedq-1' % self._sid, + body=task_as_dict) self._log.info('Pushed task %s with state %s to completed ' 'queue %s-completedq-1', @@ -285,7 +283,7 @@ def unit_state_cb(unit, state, cb_data): except KeyboardInterrupt as ex: self._log.exception('Execution interrupted (probably by Ctrl+C)' ' exit callback thread gracefully...') - raise KeyboardInterrupt from ex + raise KeyboardInterrupt(ex) from ex except Exception as ex: self._log.exception('Error in RP callback thread: %s', ex) @@ -296,10 +294,11 @@ def unit_state_cb(unit, state, cb_data): mq_connection = pika.BlockingConnection(rmq_conn_params) mq_channel = mq_connection.channel() - umgr = rp.UnitManager(session=rmgr._session) - 
umgr.add_pilots(rmgr.pilot) - umgr.register_callback(unit_state_cb, cb_data={'channel': mq_channel, - 'params': rmq_conn_params}) + rp_tmgr = rp.TaskManager(session=rmgr._session) + rp_tmgr.add_pilots(rmgr.pilot) + rp_tmgr.register_callback(task_state_cb, + cb_data={'channel': mq_channel, + 'params' : rmq_conn_params}) try: @@ -319,22 +318,22 @@ def unit_state_cb(unit, state, cb_data): task_queue.task_done() - bulk_tasks = list() - bulk_cuds = list() + bulk_tds = list() for msg in body: task = Task() task.from_dict(msg) - bulk_tasks.append(task) - bulk_cuds.append(create_cud_from_task( + + load_placeholder(task) + bulk_tds.append(create_td_from_task( task, placeholders, self._prof)) self._advance(task, 'Task', states.SUBMITTING, mq_channel, rmq_conn_params, '%s-tmgr-to-sync' % self._sid) - umgr.submit_units(bulk_cuds) + rp_tmgr.submit_tasks(bulk_tds) mq_connection.close() self._log.debug('Exited RTS main loop. TMGR terminating') except KeyboardInterrupt as ex: @@ -346,7 +345,7 @@ def unit_state_cb(unit, state, cb_data): raise EnTKError(ex) from ex finally: - umgr.close() + rp_tmgr.close() # -------------------------------------------------------------------------- @@ -362,7 +361,6 @@ def start_manager(self): self._log.warn('tmgr process already running!') return - try: self._prof.prof('creating tmgr process', uid=self._uid) diff --git a/src/radical/entk/execman/rp/task_processor.py b/src/radical/entk/execman/rp/task_processor.py index b701e5735..2c89cc5c7 100644 --- a/src/radical/entk/execman/rp/task_processor.py +++ b/src/radical/entk/execman/rp/task_processor.py @@ -1,5 +1,6 @@ import os + import radical.pilot as rp import radical.utils as ru @@ -101,6 +102,8 @@ def resolve_placeholders(path, placeholders): raise +# ------------------------------------------------------------------------------ +# def resolve_arguments(args, placeholders): resolved_args = list() @@ -153,14 +156,22 @@ def resolve_arguments(args, placeholders): # 
------------------------------------------------------------------------------ # -def resolve_tags(tag, parent_pipeline_name, placeholders): +def resolve_tags(task, parent_pipeline_name, placeholders): + + # entk only handles co_location tags. If tags are given as strings, they + # get translated into `{'colocation': ''}`. Tags passed as dictionaies + # are checked to conform with above form. + # + # In both cases, the tag string is expanded with the given placeholders. + + tags = task.tags if task.tags else {'colocate': task.uid} + + colo_tag = tags['colocate'] # Check self pipeline first for sname in placeholders[parent_pipeline_name]: - for tname in placeholders[parent_pipeline_name][sname]: - if tag != tname: - continue - return placeholders[parent_pipeline_name][sname][tname]['rts_uid'] + if colo_tag in placeholders[parent_pipeline_name][sname]: + return placeholders[parent_pipeline_name][sname][colo_tag]['uid'] for pname in placeholders: @@ -169,13 +180,10 @@ def resolve_tags(tag, parent_pipeline_name, placeholders): continue for sname in placeholders[pname]: - for tname in placeholders[pname][sname]: - if tag != tname: - continue - return placeholders[pname][sname][tname]['rts_uid'] + if colo_tag in placeholders[pname][sname]: + return placeholders[pname][sname][colo_tag]['uid'] - raise ree.EnTKError(msg='Tag %s cannot be used as no previous task with ' - 'that name is found' % tag) + return task.uid # ------------------------------------------------------------------------------ @@ -413,126 +421,120 @@ def get_output_list_from_task(task, placeholders): # ------------------------------------------------------------------------------ # -def create_cud_from_task(task, placeholders, prof=None): +def create_td_from_task(task, placeholders, prof=None): """ - Purpose: Create a Compute Unit description based on the defined Task. + Purpose: Create an RP Task description based on the defined Task. 
:arguments: :task: EnTK Task object :placeholders: dictionary holding the values for placeholders - :return: ComputeUnitDescription + :return: rp.TaskDescription """ try: - logger.debug('Creating CU from Task %s' % (task.uid)) + logger.debug('Creating Task from Task %s: %s' % (task.uid, task.sandbox)) if prof: - prof.prof('cud_create', uid=task.uid) - - cud = rp.ComputeUnitDescription() - cud.name = '%s,%s,%s,%s,%s,%s' % (task.uid, task.name, - task.parent_stage['uid'], - task.parent_stage['name'], - task.parent_pipeline['uid'], - task.parent_pipeline['name']) - cud.pre_exec = task.pre_exec - cud.executable = task.executable - cud.arguments = resolve_arguments(task.arguments, placeholders) - cud.sandbox = task.sandbox - cud.post_exec = task.post_exec - - if task.tag: - if task.parent_pipeline['name']: - cud.tag = resolve_tags( - tag=task.tag, - parent_pipeline_name=task.parent_pipeline['name'], - placeholders=placeholders) - - cud.cpu_processes = task.cpu_reqs['cpu_processes'] - cud.cpu_threads = task.cpu_reqs['cpu_threads'] - cud.cpu_process_type = task.cpu_reqs['cpu_process_type'] - cud.cpu_thread_type = task.cpu_reqs['cpu_thread_type'] - cud.gpu_processes = task.gpu_reqs['gpu_processes'] - cud.gpu_threads = task.gpu_reqs['gpu_threads'] - cud.gpu_process_type = task.gpu_reqs['gpu_process_type'] - cud.gpu_thread_type = task.gpu_reqs['gpu_thread_type'] + prof.prof('td_create', uid=task.uid) + + td = rp.TaskDescription() + td.uid = task.uid + td.name = '%s,%s,%s,%s,%s,%s' % (task.uid, task.name, + task.parent_stage['uid'], + task.parent_stage['name'], + task.parent_pipeline['uid'], + task.parent_pipeline['name']) + td.pre_exec = task.pre_exec + td.executable = task.executable + td.arguments = resolve_arguments(task.arguments, placeholders) + td.sandbox = task.sandbox + td.post_exec = task.post_exec + + if task.parent_pipeline['uid']: + td.tag = resolve_tags(task=task, parent_pipeline_name=task.parent_pipeline['uid'], + placeholders=placeholders) + + td.cpu_processes 
= task.cpu_reqs['cpu_processes'] + td.cpu_threads = task.cpu_reqs['cpu_threads'] + td.cpu_process_type = task.cpu_reqs['cpu_process_type'] + td.cpu_thread_type = task.cpu_reqs['cpu_thread_type'] + td.gpu_processes = task.gpu_reqs['gpu_processes'] + td.gpu_threads = task.gpu_reqs['gpu_threads'] + td.gpu_process_type = task.gpu_reqs['gpu_process_type'] + td.gpu_thread_type = task.gpu_reqs['gpu_thread_type'] if task.lfs_per_process: - cud.lfs_per_process = task.lfs_per_process + td.lfs_per_process = task.lfs_per_process - if task.stdout: cud.stdout = task.stdout - if task.stderr: cud.stderr = task.stderr + if task.stdout: td.stdout = task.stdout + if task.stderr: td.stderr = task.stderr - cud.input_staging = get_input_list_from_task(task, placeholders) - cud.output_staging = get_output_list_from_task(task, placeholders) + td.input_staging = get_input_list_from_task(task, placeholders) + td.output_staging = get_output_list_from_task(task, placeholders) if prof: - prof.prof('cud from task - done', uid=task.uid) + prof.prof('td from task - done', uid=task.uid) - logger.debug('CU %s created from Task %s' % (cud.name, task.uid)) - - return cud + logger.debug('Task %s created from Task %s' % (td.name, task.uid)) + return td except Exception: - logger.exception('CU creation failed') + logger.exception('Task creation failed') raise # ------------------------------------------------------------------------------ # -def create_task_from_cu(cu, prof=None): +def create_task_from_rp(rp_task, prof=None): """ - Purpose: Create a Task based on the Compute Unit. + Purpose: Create a Task based on the RP Task. Details: Currently, only the uid, parent_stage and parent_pipeline are - retrieved. The exact initial Task (that was converted to a CUD) + retrieved. The exact initial Task (that was converted to a TD) cannot be recovered as the RP API does not provide the same - attributes for a CU as for a CUD. Also, this is not required for + attributes for a Task as for a TD. 
Also, this is not required for the most part. - TODO: Add exit code, stdout, stderr and path attributes to a Task. - These can be extracted from a CU - :arguments: - :cu: RP Compute Unit + :task: RP Task :return: Task """ try: - logger.debug('Create Task from CU %s' % cu.name) + logger.debug('Create Task from Task %s' % rp_task.name) if prof: - prof.prof('task_create', uid=cu.name.split(',')[0].strip()) + prof.prof('task_create', uid=rp_task.name.split(',')[0].strip()) task = Task() - task.uid = cu.name.split(',')[0].strip() - task.name = cu.name.split(',')[1].strip() - task.parent_stage['uid'] = cu.name.split(',')[2].strip() - task.parent_stage['name'] = cu.name.split(',')[3].strip() - task.parent_pipeline['uid'] = cu.name.split(',')[4].strip() - task.parent_pipeline['name'] = cu.name.split(',')[5].strip() - task.rts_uid = cu.uid + task.uid = rp_task.name.split(',')[0].strip() + task.name = rp_task.name.split(',')[1].strip() + task.parent_stage['uid'] = rp_task.name.split(',')[2].strip() + task.parent_stage['name'] = rp_task.name.split(',')[3].strip() + task.parent_pipeline['uid'] = rp_task.name.split(',')[4].strip() + task.parent_pipeline['name'] = rp_task.name.split(',')[5].strip() + task.rts_uid = rp_task.uid - if cu.state == rp.DONE : task.exit_code = 0 - elif cu.state in [rp.FAILED, rp.CANCELED] : task.exit_code = 1 + if rp_task.state == rp.DONE : task.exit_code = 0 + elif rp_task.state in [rp.FAILED, rp.CANCELED] : task.exit_code = 1 - task.path = ru.Url(cu.sandbox).path + task.path = ru.Url(rp_task.sandbox).path if prof: - prof.prof('task_created', uid=cu.name.split(',')[0].strip()) + prof.prof('task_created', uid=task.uid) - logger.debug('Task %s created from CU %s' % (task.uid, cu.name)) + logger.debug('Task %s created from Task %s' % (task.uid, rp_task.name)) return task except Exception: - logger.exception('Task creation from CU failed, error') + logger.exception('Task creation from RP Task failed, error') raise diff --git a/src/radical/entk/pipeline.py 
b/src/radical/entk/pipeline.py index 2f5193882..accba18ec 100644 --- a/src/radical/entk/pipeline.py +++ b/src/radical/entk/pipeline.py @@ -3,7 +3,6 @@ __license__ = 'MIT' import threading -import warnings import radical.utils as ru @@ -166,11 +165,10 @@ def name(self, value): actual_type=type(value)) if any(symbol in value for symbol in invalid_symbols): - warnings.warn(NAME_MESSAGE, DeprecationWarning) - # raise ree.ValueError(obj=self._uid, - # attribute='name', - # actual_value=value, - # expected_value=NAME_MESSAGE) + raise ree.ValueError(obj=self._uid, + attribute='name', + actual_value=value, + expected_value=NAME_MESSAGE) self._name = value @@ -266,9 +264,7 @@ def from_dict(self, d): raise ree.ValueError(obj=self._uid, attribute='name', actual_value=d['name'], - expected_value="Valid object names can " + - "contains letters, numbers and '.'. Any " - "other character is not allowed") + expected_value=NAME_MESSAGE) self._name = d['name'] diff --git a/src/radical/entk/stage.py b/src/radical/entk/stage.py index 918bd85c1..3e4dda831 100644 --- a/src/radical/entk/stage.py +++ b/src/radical/entk/stage.py @@ -4,7 +4,6 @@ import radical.utils as ru -import warnings from string import punctuation from . import exceptions as ree @@ -175,11 +174,10 @@ def name(self, value): actual_type=type(value)) if any(symbol in value for symbol in invalid_symbols): - warnings.warn(NAME_MESSAGE, DeprecationWarning) - # raise ree.ValueError(obj=self._uid, - # attribute='name', - # actual_value=value, - # expected_value=NAME_MESSAGE) + raise ree.ValueError(obj=self._uid, + attribute='name', + actual_value=value, + expected_value=NAME_MESSAGE) self._name = value @tasks.setter @@ -285,9 +283,7 @@ def from_dict(self, d): raise ree.ValueError(obj=self._uid, attribute='name', actual_value=d['name'], - expected_value="Valid object names can " + - "contains letters, numbers and '.'. 
Any " - "other character is not allowed") + expected_value=NAME_MESSAGE) self._name = d['name'] if 'state' in d: diff --git a/src/radical/entk/task.py b/src/radical/entk/task.py index 0c9d6c06c..e397a26af 100644 --- a/src/radical/entk/task.py +++ b/src/radical/entk/task.py @@ -1,7 +1,8 @@ - __copyright__ = 'Copyright 2014-2020, http://radical.rutgers.edu' __license__ = 'MIT' +import warnings + import radical.utils as ru from string import punctuation @@ -10,9 +11,9 @@ from . import exceptions as ree from . import states as res -import warnings -warnings.simplefilter(action="once", category=DeprecationWarning, lineno=707) -warnings.simplefilter(action="once", category=DeprecationWarning, lineno=764) +warnings.simplefilter(action="once", category=DeprecationWarning, lineno=728) +warnings.simplefilter(action="once", category=DeprecationWarning, lineno=786) +warnings.simplefilter(action="once", category=DeprecationWarning, lineno=954) # ------------------------------------------------------------------------------ @@ -30,6 +31,9 @@ class Task(object): the profiling if not taken care. ''' + + _uids = list() + # FIXME: this should be converted into an RU/RS Attribute object, almost all # of the code is redundant with the attribute class... @@ -77,7 +81,7 @@ def __init__(self, from_dict=None): # to cuds and cus to tasks self._path = None self._exit_code = None - self._tag = None + self._tags = None # Keep track of res attained self._state_history = [res.INITIAL] @@ -525,12 +529,21 @@ def path(self): @property def tag(self): ''' - Set the tag for the task that can be used while scheduling by the RTS + WARNING: It will be deprecated. 
+ ''' + + return self._tags + - :getter: return the tag of the current task + @property + def tags(self): + ''' + Set the tags for the task that can be used while scheduling by the RTS + + :getter: return the tags of the current task ''' - return self._tag + return self._tags @property @@ -579,11 +592,16 @@ def rts_uid(self): # @uid.setter def uid(self, value): - + invalid_symbols = punctuation.replace('.','') if not isinstance(value, str): raise ree.TypeError(expected_type=str, actual_type=type(value)) + if any(symbol in value for symbol in invalid_symbols): + raise ree.ValueError(obj=self._uid, + attribute='uid', + actual_value=value, + expected_value=NAME_MESSAGE) self._uid = value @rts_uid.setter @@ -603,11 +621,10 @@ def name(self, value): actual_type=type(value)) if any(symbol in value for symbol in invalid_symbols): - warnings.warn(NAME_MESSAGE, DeprecationWarning, stacklevel=2) - # raise ree.ValueError(obj=self._uid, - # attribute='name', - # actual_value=value, - # expected_value=NAME_MESSAGE) + raise ree.ValueError(obj=self._uid, + attribute='name', + actual_value=value, + expected_value=NAME_MESSAGE) self._name = value @@ -934,11 +951,32 @@ def path(self, value): @tag.setter def tag(self, value): + warnings.warn("Attribute tag is depcrecated. 
Use tags instead", DeprecationWarning) + + # this method exists for backward compatibility if not isinstance(value, str): raise ree.TypeError(entity='tag', expected_type=str, actual_type=type(value)) + self._tags = {'colocate': value} + + + @tags.setter + def tags(self, value): - self._tag = value + if not isinstance(value, dict): + raise ree.TypeError(entity='tags', expected_type=dict, + actual_type=type(value)) + + if list(value.keys()) != ['colocate']: + raise ree.TypeError(expected_type=dict, + actual_type=type(value.get('colocate')), + entity='colocate') + + if not isinstance(value['colocate'], str): + raise ree.TypeError(entity='tag', expected_type=str, + actual_type=type(value)) + + self._tags = value @parent_stage.setter @@ -997,7 +1035,7 @@ def to_dict(self): 'exit_code' : self._exit_code, 'path' : self._path, - 'tag' : self._tag, + 'tags' : self._tags, 'rts_uid' : self._rts_uid, 'parent_stage' : self._p_stage, @@ -1017,13 +1055,28 @@ def from_dict(self, d): :return: None ''' + invalid_symbols = punctuation.replace('.','') # FIXME: uid, name, state and state_history to use setter type checks - if d.get('uid') is not None: self._uid = d['uid'] - if d.get('name') is not None: self._name = d['name'] + if d.get('uid') is not None: + if any(symbol in d['uid'] for symbol in invalid_symbols): + raise ree.ValueError(obj=self._uid, + attribute='uid', + actual_value=d['uid'], + expected_value=NAME_MESSAGE) + else: + self._uid = d['uid'] + + if d.get('name') is not None: + if any(symbol in d['name'] for symbol in invalid_symbols): + raise ree.ValueError(obj=self._uid, + attribute='name', + actual_value=d['name'], + expected_value=NAME_MESSAGE) + else: + self._name = d['name'] if 'state' not in d: self._state = res.INITIAL - else: # avoid adding state to state history, thus do typecheck here if not isinstance(d['state'], str): @@ -1047,6 +1100,11 @@ def _validate(self): executable has been specified for the task. 
''' + if self._uid in Task._uids: + raise ree.EnTKError(msg='Task ID %s already exists' % self._uid) + else: + Task._uids.append(self._uid) + if self._state is not res.INITIAL: raise ree.ValueError(obj=self._uid, attribute='state', expected_value=res.INITIAL, diff --git a/tests/test_component/test_amgr.py b/tests/test_component/test_amgr.py index ccd5c4970..270c94450 100755 --- a/tests/test_component/test_amgr.py +++ b/tests/test_component/test_amgr.py @@ -184,4 +184,4 @@ def test_run_workflow(self, mocked_init, mocked_ResourceManager, appman._sync_thread.is_alive = mock.MagicMock(return_value=True) with self.assertRaises(ree.EnTKError): - appman._run_workflow() \ No newline at end of file + appman._run_workflow() diff --git a/tests/test_component/test_task.py b/tests/test_component/test_task.py index efc3f5e93..11e327107 100755 --- a/tests/test_component/test_task.py +++ b/tests/test_component/test_task.py @@ -28,7 +28,7 @@ class TestTask(TestCase): @mock.patch('radical.utils.generate_id', return_value='test.0000') def test_task_initialization(self, mocked_generate_id): ''' - **Purpose**: Test if the task attributes have, thus expect, the correct + **Purpose**: Test if the task attributes have, thus expect, the correct data types ''' @@ -53,25 +53,25 @@ def test_task_initialization(self, mocked_generate_id): self.assertEqual(t._gpu_reqs['threads_per_process'], 0) self.assertIsNone(t._gpu_reqs['thread_type']) - self.assertEqual(t.lfs_per_process, 0) - self.assertEqual(t.sandbox, '') - self.assertIsInstance(t.upload_input_data, list) - self.assertIsInstance(t.copy_input_data, list) - self.assertIsInstance(t.link_input_data, list) - self.assertIsInstance(t.move_input_data, list) - self.assertIsInstance(t.copy_output_data, list) - self.assertIsInstance(t.link_output_data, list) - self.assertIsInstance(t.move_output_data, list) - self.assertIsInstance(t.download_output_data, list) - self.assertEqual(t.stdout, '') - self.assertEqual(t.stderr, '') - 
self.assertIsNone(t.exit_code) - self.assertIsNone(t.tag) - self.assertIsNone(t.path) - self.assertIsNone(t.parent_pipeline['uid']) - self.assertIsNone(t.parent_pipeline['name']) - self.assertIsNone(t.parent_stage['name']) - self.assertIsNone(t.parent_stage['uid']) + self.assertEqual(t._lfs_per_process, 0) + self.assertEqual(t._sandbox, '') + self.assertIsInstance(t._upload_input_data, list) + self.assertIsInstance(t._copy_input_data, list) + self.assertIsInstance(t._link_input_data, list) + self.assertIsInstance(t._move_input_data, list) + self.assertIsInstance(t._copy_output_data, list) + self.assertIsInstance(t._link_output_data, list) + self.assertIsInstance(t._move_output_data, list) + self.assertIsInstance(t._download_output_data, list) + self.assertEqual(t._stdout, '') + self.assertEqual(t._stderr, '') + self.assertIsNone(t._exit_code) + self.assertIsNone(t._tags) + self.assertIsNone(t._path) + self.assertIsNone(t._p_pipeline['uid']) + self.assertIsNone(t._p_pipeline['name']) + self.assertIsNone(t._p_stage['name']) + self.assertIsNone(t._p_stage['uid']) # -------------------------------------------------------------------------- @@ -84,35 +84,35 @@ def test_cpu_reqs(self, mocked_generate_id, mocked_init): 'process_type' : None, 'threads_per_process' : 1, 'thread_type' : None} - cpu_reqs = {'processes' : 2, - 'process_type' : None, - 'threads_per_process' : 1, + cpu_reqs = {'processes' : 2, + 'process_type' : None, + 'threads_per_process' : 1, 'thread_type' : 'OpenMP'} - task.cpu_reqs = {'processes' : 2, - 'process_type' : None, - 'threads_per_process' : 1, - 'thread_type' : 'OpenMP'} + task.cpu_reqs = {'cpu_processes' : 2, + 'cpu_process_type' : None, + 'cpu_threads' : 1, + 'cpu_thread_type' : 'OpenMP'} self.assertEqual(task._cpu_reqs, cpu_reqs) - self.assertEqual(task.cpu_reqs, {'cpu_processes' : 2, - 'cpu_process_type' : None, - 'cpu_threads' : 1, + self.assertEqual(task.cpu_reqs, {'cpu_processes' : 2, + 'cpu_process_type' : None, + 'cpu_threads' : 1, 
'cpu_thread_type' : 'OpenMP'}) with self.assertRaises(ree.MissingError): - task.cpu_reqs = {'cpu_processes' : 2, - 'cpu_process_type' : None, + task.cpu_reqs = {'cpu_processes' : 2, + 'cpu_process_type' : None, 'cpu_thread_type' : 'OpenMP'} with self.assertRaises(ree.TypeError): - task.cpu_reqs = {'cpu_processes' : 'a', - 'cpu_process_type' : None, + task.cpu_reqs = {'cpu_processes' : 'a', + 'cpu_process_type' : None, 'cpu_threads' : 1, 'cpu_thread_type' : 'OpenMP'} with self.assertRaises(ree.TypeError): - task.cpu_reqs = {'cpu_processes' : 1, - 'cpu_process_type' : None, + task.cpu_reqs = {'cpu_processes' : 1, + 'cpu_process_type' : None, 'cpu_threads' : 'a', 'cpu_thread_type' : 'OpenMP'} @@ -120,14 +120,14 @@ def test_cpu_reqs(self, mocked_generate_id, mocked_init): task.cpu_reqs = list() with self.assertRaises(ree.ValueError): - task.cpu_reqs = {'cpu_processes' : 1, - 'cpu_process_type' : None, + task.cpu_reqs = {'cpu_processes' : 1, + 'cpu_process_type' : None, 'cpu_threads' : 1, 'cpu_thread_type' : 'MPI'} with self.assertRaises(ree.ValueError): - task.cpu_reqs = {'cpu_processes' : 1, - 'cpu_process_type' : 'test', + task.cpu_reqs = {'cpu_processes' : 1, + 'cpu_process_type' : 'test', 'cpu_threads' : 1, 'cpu_thread_type' : 'OpenMP'} @@ -142,50 +142,50 @@ def test_gpu_reqs(self, mocked_generate_id, mocked_init): 'process_type' : None, 'threads_per_process' : 1, 'thread_type' : None} - gpu_reqs = {'processes' : 2, - 'process_type' : None, - 'threads_per_process' : 1, + gpu_reqs = {'processes' : 2, + 'process_type' : None, + 'threads_per_process' : 1, 'thread_type' : 'OpenMP'} - task.gpu_reqs = {'processes' : 2, - 'process_type' : None, - 'threads_per_process' : 1, - 'thread_type' : 'OpenMP'} + task.gpu_reqs = {'gpu_processes' : 2, + 'gpu_process_type' : None, + 'gpu_threads' : 1, + 'gpu_thread_type' : 'OpenMP'} self.assertEqual(task._gpu_reqs, gpu_reqs) - self.assertEqual(task.gpu_reqs, {'gpu_processes' : 2, - 'gpu_process_type' : None, - 'gpu_threads' : 1, + 
self.assertEqual(task.gpu_reqs, {'gpu_processes' : 2, + 'gpu_process_type' : None, + 'gpu_threads' : 1, 'gpu_thread_type' : 'OpenMP'}) with self.assertRaises(ree.TypeError): task.gpu_reqs = list() with self.assertRaises(ree.MissingError): - task.gpu_reqs = {'gpu_processes' : 2, - 'gpu_process_type' : None, + task.gpu_reqs = {'gpu_processes' : 2, + 'gpu_process_type' : None, 'gpu_thread_type' : 'OpenMP'} with self.assertRaises(ree.TypeError): - task.gpu_reqs = {'gpu_processes' : 'a', - 'gpu_process_type' : None, + task.gpu_reqs = {'gpu_processes' : 'a', + 'gpu_process_type' : None, 'gpu_threads' : 1, 'gpu_thread_type' : 'OpenMP'} with self.assertRaises(ree.TypeError): - task.gpu_reqs = {'gpu_processes' : 1, - 'gpu_process_type' : None, + task.gpu_reqs = {'gpu_processes' : 1, + 'gpu_process_type' : None, 'gpu_threads' : 'a', 'gpu_thread_type' : 'OpenMP'} with self.assertRaises(ree.ValueError): - task.gpu_reqs = {'gpu_processes' : 1, - 'gpu_process_type' : None, + task.gpu_reqs = {'gpu_processes' : 1, + 'gpu_process_type' : None, 'gpu_threads' : 1, 'gpu_thread_type' : 'MPI'} with self.assertRaises(ree.ValueError): - task.gpu_reqs = {'gpu_processes' : 1, - 'gpu_process_type' : 'test', + task.gpu_reqs = {'gpu_processes' : 1, + 'gpu_process_type' : 'test', 'gpu_threads' : 1, 'gpu_thread_type' : 'OpenMP'} @@ -231,14 +231,14 @@ def test_dict_to_task(self): 'pre_exec' : ['bar'], 'executable': 'buz', 'arguments' : ['baz', 'fiz'], - 'cpu_reqs' : {'processes' : 1, - 'process_type' : None, - 'threads_per_process': 1, - 'thread_type' : None}, - 'gpu_reqs' : {'processes' : 0, - 'process_type' : None, - 'threads_per_process': 0, - 'thread_type' : None}} + 'cpu_reqs' : {'cpu_processes' : 1, + 'cpu_process_type': None, + 'cpu_threads' : 1, + 'cpu_thread_type' : None}, + 'gpu_reqs' : {'gpu_processes' : 0, + 'gpu_process_type': None, + 'gpu_threads' : 0, + 'gpu_thread_type' : None}} t = Task(from_dict=d) for k,v in d.items(): @@ -249,14 +249,14 @@ def test_dict_to_task(self): 
'pre_exec' : ['bar'], 'executable': 'buz', 'arguments' : ['baz', 'fiz'], - 'cpu_reqs' : {'processes' : 1, - 'process_type' : None, - 'threads_per_process': 1, - 'thread_type' : None}, - 'gpu_reqs' : {'processes' : 0, - 'process_type' : None, - 'threads_per_process': 0, - 'thread_type' : None}} + 'cpu_reqs' : {'cpu_processes' : 1, + 'cpu_process_type': None, + 'cpu_threads' : 1, + 'cpu_thread_type' : None}, + 'gpu_reqs' : {'gpu_processes' : 0, + 'gpu_process_type': None, + 'gpu_threads' : 0, + 'gpu_thread_type' : None}} t = Task() t.from_dict(d) @@ -282,6 +282,138 @@ def test_executable(self, mocked_init): with self.assertRaises(ree.TypeError): task.executable = ['test_exec'] + # -------------------------------------------------------------------------- + # + @mock.patch.object(Task, '__init__', return_value=None) + def test_name(self, mocked_init): + + task = Task() + task._uid = 'test' + task._name = 'test_name' + self.assertEqual(task.name, 'test_name') + + task.name = 'task.0000' + self.assertEqual(task._name, 'task.0000') + + with self.assertRaises(ree.TypeError): + task.name = 0 + + with self.assertRaises(ree.ValueError): + task.name = 'task,0000' + + + # -------------------------------------------------------------------------- + # + @mock.patch.object(Task, '__init__', return_value=None) + def test_state_history(self, mocked_init): + + task = Task() + task._uid = 'test' + task._state_history = [states.INITIAL] + self.assertEqual(task.state_history, [states.INITIAL]) + + task.state_history = [states.SCHEDULED] + self.assertEqual(task._state_history, [states.SCHEDULED]) + + with self.assertRaises(ree.TypeError): + task.state_history = states.SCHEDULING + + with self.assertRaises(ree.ValueError): + task.state_history = ['EXECUTING'] + + # -------------------------------------------------------------------------- + # + @mock.patch.object(Task, '__init__', return_value=None) + def test_pre_exec(self, mocked_init): + + task = Task() + task._pre_exec = ['module 
load mymodule'] + self.assertEqual(task.pre_exec, ['module load mymodule']) + + task.pre_exec = ['module load mymodule2'] + self.assertEqual(task._pre_exec, ['module load mymodule2']) + with self.assertRaises(ree.TypeError): + task.pre_exec = 'module load mymodule' + + + # -------------------------------------------------------------------------- + # + @mock.patch.object(Task, '__init__', return_value=None) + def test_arguments(self, mocked_init): + + task = Task() + task._arguments = ['module load mymodule'] + self.assertEqual(task.arguments, ['module load mymodule']) + + task.arguments = ['module load mymodule2'] + self.assertEqual(task._arguments, ['module load mymodule2']) + with self.assertRaises(ree.TypeError): + task.arguments = 'module load mymodule' + + + # -------------------------------------------------------------------------- + # + @mock.patch.object(Task, '__init__', return_value=None) + def test_sandbox(self, mocked_init): + + task = Task() + task._sandbox = '/path/to/a/sandbox' + self.assertEqual(task.sandbox, '/path/to/a/sandbox') + + task.sandbox = '/path_to_a_sandbox' + self.assertEqual(task._sandbox, '/path_to_a_sandbox') + with self.assertRaises(ree.TypeError): + task.sandbox = [] + + # -------------------------------------------------------------------------- + # + @mock.patch.object(Task, '__init__', return_value=None) + def test_post_exec(self, mocked_init): + + task = Task() + task._post_exec = ['module load mymodule'] + self.assertEqual(task.post_exec, ['module load mymodule']) + + task.post_exec = ['module load mymodule2'] + self.assertEqual(task._post_exec, ['module load mymodule2']) + with self.assertRaises(ree.TypeError): + task.post_exec = 'module load mymodule' + + + # -------------------------------------------------------------------------- + # + @mock.patch.object(Task, '__init__', return_value=None) + def test_tag(self, mocked_init): + + task = Task() + task._tags = {'colocate':'tasks'} + self.assertEqual(task.tag, 
{'colocate':'tasks'}) + + task.tag = 'task.tag' + self.assertEqual(task._tags, {'colocate': 'task.tag'}) + + with self.assertRaises(ree.TypeError): + task.tag = {'colocate':'tasks'} + + + # -------------------------------------------------------------------------- + # + @mock.patch.object(Task, '__init__', return_value=None) + def test_tags(self, mocked_init): + + task = Task() + task._tags = {'colocate':'tasks'} + self.assertEqual(task.tags, {'colocate':'tasks'}) + + task.tags = {'colocate':'task'} + self.assertEqual(task._tags, {'colocate':'task'}) + + with self.assertRaises(ree.TypeError): + task.tags = 'task' + + with self.assertRaises(ree.TypeError): + task.tags = {'key':'task'} + # -------------------------------------------------------------------------- # @mock.patch.object(Task, '__init__', return_value=None) @@ -321,7 +453,7 @@ def test_task_to_dict(self, mocked_init): t._stdout = 'Hello World' t._stderr = 'Hello World' t._exit_code = 0 - t._tag = None + t._tags = None t._path = 'some_path' t._p_pipeline = dict() t._p_pipeline['uid'] = 'pipe.0000' @@ -360,7 +492,7 @@ def test_task_to_dict(self, mocked_init): 'stderr': 'Hello World', 'exit_code': 0, 'path': 'some_path', - 'tag': None, + 'tags': None, 'rts_uid': 'unit.0000', 'parent_stage': {'name': 'stage.0000', 'uid': 'stage.0000'}, diff --git a/tests/test_component/test_tmgr_rp.py b/tests/test_component/test_tmgr_rp.py index 7a22d50ef..f8e0efd84 100644 --- a/tests/test_component/test_tmgr_rp.py +++ b/tests/test_component/test_tmgr_rp.py @@ -49,7 +49,6 @@ def test_init(self, mocked_generate_id, mocked_getcwd, mocked_Logger, tmgr = RPTmgr('test_tmgr', ['pending_queues'], ['completed_queues'], rmgr, rmq_params) self.assertIsNone(tmgr._rts_runner) - self.assertEqual(tmgr._rmq_ping_interval, 10) # -------------------------------------------------------------------------- # @@ -71,7 +70,6 @@ def test_start_manager(self, mocked_init, mocked_Logger, mocked_Profiler): tmgr._completed_queue = ['completed_queues'] 
tmgr._tmgr = _tmgr_side_effect - tmgr._tmgr_terminate = None tmgr._tmgr_process = None tmgr.start_manager() diff --git a/tests/test_component/test_tproc_rp.py b/tests/test_component/test_tproc_rp.py index 6aa7fba43..6ea3ccfd1 100755 --- a/tests/test_component/test_tproc_rp.py +++ b/tests/test_component/test_tproc_rp.py @@ -12,8 +12,8 @@ from radical.entk.execman.rp.task_processor import resolve_tags from radical.entk.execman.rp.task_processor import get_input_list_from_task from radical.entk.execman.rp.task_processor import get_output_list_from_task -from radical.entk.execman.rp.task_processor import create_cud_from_task -from radical.entk.execman.rp.task_processor import create_task_from_cu +from radical.entk.execman.rp.task_processor import create_td_from_task +from radical.entk.execman.rp.task_processor import create_task_from_rp try: import mock @@ -98,68 +98,76 @@ def test_resolve_tags(self, mocked_Logger): pipeline_name = 'p1' stage_name = 's1' - t1_name = 't1' + task = mock.Mock() + task.uid = 'task.0000' + task.tags = {'colocate': task.uid} + task2 = mock.Mock() + task2.uid = 'task.0001' + task2.tags = None t2_name = 't2' placeholders = { pipeline_name: { stage_name: { - t1_name: { + task.uid: { 'path' : '/home/vivek/t1', - 'rts_uid': 'unit.0002' + 'uid': 'unit.0002' }, t2_name: { 'path' : '/home/vivek/t2', - 'rts_uid': 'unit.0003' + 'uid': 'unit.0003' } } } } - self.assertEqual(resolve_tags(tag=t1_name, + + + self.assertEqual(resolve_tags(task=task, parent_pipeline_name=pipeline_name, - placeholders=placeholders), 'unit.0002') + placeholders=placeholders), + 'unit.0002') - with self.assertRaises(ree.EnTKError): - resolve_tags(tag='t3', parent_pipeline_name=pipeline_name, - placeholders=placeholders) + self.assertEqual(resolve_tags(task=task2, parent_pipeline_name=pipeline_name, + placeholders=placeholders), 'task.0001') # ------------------------------------------------------------------------------ # - @mock.patch('radical.pilot.ComputeUnitDescription') + 
@mock.patch('radical.pilot.TaskDescription') @mock.patch('radical.utils.Logger') @mock.patch.object(radical.entk.execman.rp.task_processor, 'get_output_list_from_task', return_value='outputs') @mock.patch.object(radical.entk.execman.rp.task_processor, 'resolve_arguments', return_value='test_args') + @mock.patch.object(radical.entk.execman.rp.task_processor, 'resolve_tags', return_value='test_tag') @mock.patch.object(radical.entk.execman.rp.task_processor, 'get_input_list_from_task', return_value='inputs') - def test_create_cud_from_task(self, mocked_ComputeUnitDescription, + def test_create_td_from_task(self, mocked_TaskDescription, mocked_Logger, mocked_get_input_list_from_task, mocked_get_output_list_from_task, - mocked_resolve_arguments): - - mocked_ComputeUnitDescription.name = None - mocked_ComputeUnitDescription.pre_exec = None - mocked_ComputeUnitDescription.executable = None - mocked_ComputeUnitDescription.arguments = None - mocked_ComputeUnitDescription.sandbox = None - mocked_ComputeUnitDescription.post_exec = None - mocked_ComputeUnitDescription.tag = None - mocked_ComputeUnitDescription.cpu_processes = None - mocked_ComputeUnitDescription.cpu_threads = None - mocked_ComputeUnitDescription.cpu_process_type = None - mocked_ComputeUnitDescription.cpu_thread_type = None - mocked_ComputeUnitDescription.gpu_processes = None - mocked_ComputeUnitDescription.gpu_threads = None - mocked_ComputeUnitDescription.gpu_process_type = None - mocked_ComputeUnitDescription.gpu_thread_type = None - mocked_ComputeUnitDescription.lfs_per_process = None - mocked_ComputeUnitDescription.stdout = None - mocked_ComputeUnitDescription.stderr = None - mocked_ComputeUnitDescription.input_staging = None - mocked_ComputeUnitDescription.output_staging = None + mocked_resolve_arguments, mocked_resolve_tags): + + mocked_TaskDescription.name = None + mocked_TaskDescription.pre_exec = None + mocked_TaskDescription.executable = None + mocked_TaskDescription.arguments = None + 
mocked_TaskDescription.sandbox = None + mocked_TaskDescription.post_exec = None + mocked_TaskDescription.tag = None + mocked_TaskDescription.cpu_processes = None + mocked_TaskDescription.cpu_threads = None + mocked_TaskDescription.cpu_process_type = None + mocked_TaskDescription.cpu_thread_type = None + mocked_TaskDescription.gpu_processes = None + mocked_TaskDescription.gpu_threads = None + mocked_TaskDescription.gpu_process_type = None + mocked_TaskDescription.gpu_thread_type = None + mocked_TaskDescription.lfs_per_process = None + mocked_TaskDescription.stdout = None + mocked_TaskDescription.stderr = None + mocked_TaskDescription.input_staging = None + mocked_TaskDescription.output_staging = None task = mock.Mock() task.uid = 'task.0000' - task.name = 'task.0000' + task.name = 'task.name' task.parent_stage = {'uid' : 'stage.0000', 'name' : 'stage.0000'} task.parent_pipeline = {'uid' : 'pipe.0000', @@ -177,39 +185,39 @@ def test_create_cud_from_task(self, mocked_ComputeUnitDescription, 'gpu_threads': 6, 'gpu_process_type': 'POSIX', 'gpu_thread_type': None} - task.tag = None + task.tags = None task.lfs_per_process = 235 task.stderr = 'stderr' task.stdout = 'stdout' - test_cud = create_cud_from_task(task, None) - self.assertEqual(test_cud.name, 'task.0000,task.0000,stage.0000,stage.0000,pipe.0000,pipe.0000') - self.assertEqual(test_cud.pre_exec, 'post_exec') - self.assertEqual(test_cud.executable, '/bin/date') - self.assertEqual(test_cud.arguments, 'test_args') - self.assertEqual(test_cud.sandbox, 'unit.0000') - self.assertEqual(test_cud.post_exec, '') - self.assertEqual(test_cud.cpu_processes, 5) - self.assertEqual(test_cud.cpu_threads, 6) - self.assertEqual(test_cud.cpu_process_type, 'POSIX') - self.assertIsNone(test_cud.cpu_thread_type) - self.assertEqual(test_cud.gpu_processes, 5) - self.assertEqual(test_cud.gpu_threads, 6) - self.assertEqual(test_cud.gpu_process_type, 'POSIX') - self.assertIsNone(test_cud.gpu_thread_type) - 
self.assertEqual(test_cud.lfs_per_process, 235) - self.assertEqual(test_cud.stdout, 'stdout') - self.assertEqual(test_cud.stderr, 'stderr') - self.assertEqual(test_cud.input_staging, 'inputs') - self.assertEqual(test_cud.output_staging, 'outputs') - + test_td = create_td_from_task(task, None) + self.assertEqual(test_td.name, 'task.0000,task.name,stage.0000,stage.0000,pipe.0000,pipe.0000') + self.assertEqual(test_td.pre_exec, 'post_exec') + self.assertEqual(test_td.executable, '/bin/date') + self.assertEqual(test_td.arguments, 'test_args') + self.assertEqual(test_td.sandbox, 'unit.0000') + self.assertEqual(test_td.post_exec, '') + self.assertEqual(test_td.cpu_processes, 5) + self.assertEqual(test_td.cpu_threads, 6) + self.assertEqual(test_td.cpu_process_type, 'POSIX') + self.assertIsNone(test_td.cpu_thread_type) + self.assertEqual(test_td.gpu_processes, 5) + self.assertEqual(test_td.gpu_threads, 6) + self.assertEqual(test_td.gpu_process_type, 'POSIX') + self.assertIsNone(test_td.gpu_thread_type) + self.assertEqual(test_td.lfs_per_process, 235) + self.assertEqual(test_td.stdout, 'stdout') + self.assertEqual(test_td.stderr, 'stderr') + self.assertEqual(test_td.input_staging, 'inputs') + self.assertEqual(test_td.output_staging, 'outputs') + self.assertEqual(test_td.tag, 'test_tag') # ------------------------------------------------------------------------------ # @mock.patch('radical.entk.Task') @mock.patch('radical.utils.Logger') - def test_create_task_from_cu(self, mocked_Task, mocked_Logger): + def test_create_task_from_rp(self, mocked_Task, mocked_Logger): test_cud = mock.Mock() test_cud.name = 'task.0000,task.0000,stage.0000,stage.0000,pipe.0000,pipe.0000' test_cud.pre_exec = 'post_exec' @@ -241,7 +249,7 @@ def test_create_task_from_cu(self, mocked_Task, mocked_Logger): mocked_Task.path = None mocked_Task.rts_uid = None - task = create_task_from_cu(test_cud, None) + task = create_task_from_rp(test_cud, None) self.assertEqual(task.uid, 'task.0000') 
self.assertEqual(task.name, 'task.0000') self.assertEqual(task.parent_stage, {'uid': 'stage.0000', 'name': 'stage.0000'}) @@ -286,15 +294,15 @@ def test_issue_271(self, mocked_Task, mocked_Logger): mocked_Task.path = None mocked_Task.rts_uid = None - task = create_task_from_cu(test_cud, None) + task = create_task_from_rp(test_cud, None) self.assertEqual(task.exit_code, 0) test_cud.state = 'FAILED' - task = create_task_from_cu(test_cud, None) + task = create_task_from_rp(test_cud, None) self.assertEqual(task.exit_code, 1) test_cud.state = 'EXECUTING' - task = create_task_from_cu(test_cud, None) + task = create_task_from_rp(test_cud, None) self.assertIsNone(task.exit_code) # ------------------------------------------------------------------------------ @@ -372,8 +380,7 @@ def test_get_output_list_from_task(self, mocked_Logger): # @mock.patch('radical.pilot.ComputeUnitDescription') @mock.patch('radical.utils.Logger') - def test_issue_259(self, mocked_ComputeUnitDescription, - mocked_Logger): + def test_issue_259(self, mocked_ComputeUnitDescription, mocked_Logger): mocked_ComputeUnitDescription.name = None mocked_ComputeUnitDescription.pre_exec = None @@ -396,21 +403,21 @@ def test_issue_259(self, mocked_ComputeUnitDescription, mocked_ComputeUnitDescription.input_staging = None mocked_ComputeUnitDescription.output_staging = None - pipeline_name = 'p1' - stage_name = 's1' - t1_name = 't1' - t2_name = 't2' + pipeline_name = 'pipe.0000' + stage_name = 'stage.0000' + t1_name = 'task.0000' + t2_name = 'task.0001' placeholders = { pipeline_name: { stage_name: { t1_name: { 'path' : '/home/vivek/t1', - 'rts_uid': 'unit.0002' + 'uid': 'unit.0002' }, t2_name: { 'path' : '/home/vivek/t2', - 'rts_uid': 'unit.0003' + 'uid': 'unit.0003' } } } @@ -443,7 +450,7 @@ def test_issue_259(self, mocked_ComputeUnitDescription, 'gpu_threads': 6, 'gpu_process_type': 'POSIX', 'gpu_thread_type': None} - task.tag = None + task.tags = None task.lfs_per_process = 235 task.stderr = 'stderr' @@ -479,7 
+486,7 @@ def test_issue_259(self, mocked_ComputeUnitDescription, task.copy_output_data = ['test_file > $SHARED/test_file'] task.move_output_data = ['test_file > $SHARED/test_file'] - test_cud = create_cud_from_task(task, placeholders) + test_cud = create_td_from_task(task, placeholders) self.assertEqual(test_cud.name, 'task.0000,task.0000,stage.0000,stage.0000,pipe.0000,pipe.0000') self.assertEqual(test_cud.pre_exec, 'post_exec') self.assertEqual(test_cud.executable, '/bin/date') diff --git a/tests/test_integration/test_tmgr_rp/test_tproc_rp.py b/tests/test_integration/test_tmgr_rp/test_tproc_rp.py new file mode 100755 index 000000000..fadf02230 --- /dev/null +++ b/tests/test_integration/test_tmgr_rp/test_tproc_rp.py @@ -0,0 +1,86 @@ +# pylint: disable=protected-access, unused-argument +# pylint: disable=no-value-for-parameter + +from unittest import TestCase + +from radical.entk.execman.rp.task_processor import create_td_from_task +from radical.entk import Task + + +class TestBase(TestCase): + + # ------------------------------------------------------------------------------ + # + def test_create_td_from_task(self): + + pipeline_name = 'pipe.0000' + stage_name = 'stage.0000' + t1_name = 'task.0000' + t2_name = 'task.0001' + + placeholders = { + pipeline_name: { + stage_name: { + t1_name: { + 'path' : '/home/vivek/t1', + 'uid': 'task.0000' + }, + t2_name: { + 'path' : '/home/vivek/t2', + 'uid': 'task.0003' + } + } + } + } + + task = Task() + task.uid = 'task.0000' + task.name = 'task.0000' + task.parent_stage = {'uid' : 'stage.0000', + 'name' : 'stage.0000'} + task.parent_pipeline = {'uid' : 'pipe.0000', + 'name' : 'pipe.0000'} + task.pre_exec = ['post_exec'] + task.executable = '/bin/date' + task.arguments = ['test_args'] + task.sandbox = 'unit.0000' + task.post_exec = [''] + task.cpu_reqs = {'cpu_processes': 5, + 'cpu_threads': 6, + 'cpu_process_type': 'MPI', + 'cpu_thread_type': None} + task.gpu_reqs = {'gpu_processes': 5, + 'gpu_threads': 6, + 
'gpu_process_type': None, + 'gpu_thread_type': None} + + task.lfs_per_process = 235 + task.stderr = 'stderr' + task.stdout = 'stdout' + + test_td = create_td_from_task(task, placeholders) + self.assertEqual(test_td.name, 'task.0000,task.0000,stage.0000,stage.0000,pipe.0000,pipe.0000') + self.assertEqual(test_td.pre_exec, ['post_exec']) + self.assertEqual(test_td.executable, '/bin/date') + self.assertEqual(test_td.arguments, ['test_args']) + self.assertEqual(test_td.sandbox, 'unit.0000') + self.assertEqual(test_td.post_exec, ['']) + self.assertEqual(test_td.cpu_processes, 5) + self.assertEqual(test_td.cpu_threads, 6) + self.assertEqual(test_td.cpu_process_type, 'MPI') + self.assertIsNone(test_td.cpu_thread_type) + self.assertEqual(test_td.gpu_processes, 5) + self.assertEqual(test_td.gpu_threads, 6) + self.assertEqual(test_td.gpu_process_type, None) + self.assertIsNone(test_td.gpu_thread_type) + self.assertEqual(test_td.lfs_per_process, 235) + self.assertEqual(test_td.stdout, 'stdout') + self.assertEqual(test_td.stderr, 'stderr') + self.assertEqual(test_td.input_staging, []) + self.assertEqual(test_td.output_staging, []) + self.assertEqual(test_td.tag, 'task.0000') + + task.tags = {'colocate': 'task.0001'} + test_td = create_td_from_task(task, placeholders) + self.assertEqual(test_td.tag, 'task.0003') + diff --git a/tests/test_issues/test_issue_324.py b/tests/test_issues/test_issue_324.py new file mode 100644 index 000000000..0b217ff8d --- /dev/null +++ b/tests/test_issues/test_issue_324.py @@ -0,0 +1,49 @@ +from radical.entk import Task, states + + +def test_issue_271(): + ''' + Test if task.from_dict accepts executable as a str or list + ''' + + d = {'uid': 're.Task.0000', + 'name': 't1', + 'state': states.DONE, + 'state_history': [states.INITIAL, states.DONE], + 'pre_exec': [], + 'executable': 'sleep', + 'arguments': [], + 'post_exec': [], + 'cpu_reqs': {'processes': 1, + 'process_type': None, + 'threads_per_process': 1, + 'thread_type': None + }, + 'gpu_reqs': 
{'processes': 0, + 'process_type': None, + 'threads_per_process': 0, + 'thread_type': None + }, + 'lfs_per_process': 1024, + 'upload_input_data': [], + 'copy_input_data': [], + 'link_input_data': [], + 'move_input_data': [], + 'copy_output_data': [], + 'move_output_data': [], + 'download_output_data': [], + 'stdout': 'out', + 'stderr': 'err', + 'exit_code': 555, + 'path': 'here/it/is', + 'tags': {'colocate': 'task.0010'}, + 'parent_stage': {'uid': 's1', 'name': 'stage1'}, + 'parent_pipeline': {'uid': 'p1', 'name': 'pipe1'}} + + t = Task() + t.from_dict(d) + + d['executable'] = 'sleep' + + t = Task() + t.from_dict(d)