Merge pull request #553 from radical-cybertools/devel
Release 1.6.0
lee212 committed Feb 16, 2021
2 parents 1d78c94 + c1cc422 commit 00a27cc
Showing 19 changed files with 749 additions and 412 deletions.
1 change: 1 addition & 0 deletions .codecov.yml
@@ -6,4 +6,5 @@ coverage:
         threshold: 1%
         paths:
           - "src"
+        base: "pr"
     patch: off
15 changes: 11 additions & 4 deletions .github/workflows/python-app.yml
@@ -36,21 +36,28 @@ jobs:
         pip install hypothesis
         pip install coverage
         pip install codecov
+        pip install coveralls
         pip install pytest
         pip install pytest-xdist
         pip install pytest-timeout
-        pip install timeout_decorator
+        pip install timeout-decorator
     - name: Test with pytest
       env:
         RMQ_HOSTNAME: localhost
         RMQ_PORT: ${{ job.services.rabbitmq.ports[5672] }} # get randomly assigned published port
         RMQ_USERNAME: guest
         RMQ_PASSWORD: guest
+        LOC: /opt/hostedtoolcache/Python/3.6.12/x64/lib/python3.6/site-packages
       run: |
-        LOC=/opt/hostedtoolcache/Python/3.6.12/x64/lib/python3.6/site-packages
-        coverage run --include=$LOC/radical/entk/* -m pytest -ra --timeout=600 -vvv --showlocals tests/test_component tests/test_utils/ tests/test_integration
+        coverage run --include=$LOC/radical/entk/* -m pytest -ra --timeout=600 -vvv --showlocals tests/test_component tests/test_utils/ tests/test_integration
     - name: Codecov
-      uses: codecov/codecov-action@v1.0.15
+      env:
+        CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
+      run: |
+        coverage combine; \
+        coverage xml; \
+        coverage report; \
+        curl -s https://codecov.io/bash | bash
   flake8:
     runs-on: ubuntu-latest
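
The test step above drives coverage.py from the shell; for reference, a rough equivalent through coverage.py's Python API (the include pattern and test path below are illustrative placeholders, not the workflow's exact values):

import coverage
import pytest

# Restrict measurement to the installed radical.entk package, as the CI
# step's --include option does.
cov = coverage.Coverage(include=['*/radical/entk/*'])
cov.start()
pytest.main(['-ra', '--timeout=600', 'tests/test_component'])
cov.stop()
cov.save()
cov.xml_report()   # same artifact as `coverage xml`
cov.report()       # same summary as `coverage report`
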
4 changes: 2 additions & 2 deletions .github/workflows/python-publish.yml
@@ -19,7 +19,7 @@ jobs:
     - name: Set up Python
       uses: actions/setup-python@v2
       with:
-        python-version: '3.x'
+        python-version: '3.6'
     - name: Install dependencies
       run: |
         python -m pip install --upgrade pip
@@ -29,5 +29,5 @@
       env:
         TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
         TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
       run: |
-        python setup.py sdist bdist_wheel
+        python setup.py sdist
         twine upload dist/*
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
-1.5.12
+1.6.0
175 changes: 89 additions & 86 deletions examples/misc/lfs_tagging_dd.py
@@ -1,102 +1,105 @@
 #!/usr/bin/env python
 
 from radical.entk import Pipeline, Stage, Task, AppManager
-from radical.entk.exceptions import *
 import os
-import sys
-import argparse
-
-hostname = os.environ.get('RMQ_HOSTNAME','localhost')
-port = int(os.environ.get('RMQ_PORT',5672))
-
-
-def get_pipeline(shared_fs=False, size=1):
-
-    p = Pipeline()
-    p.name = 'p'
-
-    n = 4
-
-    s1 = Stage()
-    s1.name = 's1'
-    for x in range(n):
-        t = Task()
-        t.name = 't%s'%x
-
-        # dd if=/dev/random bs=<byte size of a chunk> count=<number of chunks> of=<output file name>
-
-        t.executable = 'dd'
-
-        if not shared_fs:
-            t.arguments = ['if=/dev/urandom','bs=%sM'%size, 'count=1', 'of=$NODE_LFS_PATH/s1_t%s.txt'%x]
-        else:
-            t.arguments = ['if=/dev/urandom','bs=%sM'%size, 'count=1', 'of=/home/vivek91/s1_t%s.txt'%x]
-
-        t.cpu_reqs['processes'] = 1
-        t.cpu_reqs['threads_per_process'] = 24
-        t.cpu_reqs['thread_type'] = ''
-        t.cpu_reqs['process_type'] = ''
-        t.lfs_per_process = 1024
-
-        s1.add_tasks(t)
-
-    p.add_stages(s1)
-
-    s2 = Stage()
-    s2.name = 's2'
+if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
+    os.environ['RADICAL_ENTK_REPORT'] = 'True'
+
+# No need to change/set any variables if you installed RabbitMQ as a system
+# process. If you are running RabbitMQ in a Docker container or on a dedicated
+# virtual machine, set the variables "RMQ_HOSTNAME" and "RMQ_PORT" in the shell
+# environment in which you are running this script.
+hostname = os.environ.get('RMQ_HOSTNAME', 'localhost')
+port     = int(os.environ.get('RMQ_PORT', '5672'))
+username = os.environ.get('RMQ_USERNAME')
+password = os.environ.get('RMQ_PASSWORD')
+
+
+# Each task of this example prints the hostname of the node on which it is
+# executed. Tagged tasks should print the same hostname.
+def get_pipeline(n=2):
+    '''
+    We create a pipeline with three stages, each with one task. The tasks of
+    the second and third stages are tagged to execute on the same compute node
+    on which the first stage's task executed.
+    '''
+
+    pipelines = list()
     for x in range(n):
-        t = Task()
-        t.executable = 'dd'
-
-        if not shared_fs:
-            t.arguments = ['if=$NODE_LFS_PATH/s1_t%s.txt'%x,'bs=%sM'%size, 'count=1', 'of=$NODE_LFS_PATH/s2_t%s.txt'%x]
-        else:
-            t.arguments = ['if=/home/vivek91/s1_t%s.txt'%x,'bs=%sM'%size, 'count=1', 'of=/home/vivek91/s2_t%s.txt'%x]
-
-        t.cpu_reqs['processes'] = 1
-        t.cpu_reqs['threads_per_process'] = 24
-        t.cpu_reqs['thread_type'] = ''
-        t.cpu_reqs['process_type'] = ''
-        t.tag = 't%s'%x
-
-        s2.add_tasks(t)
-
-
-    p.add_stages(s2)
-
-    return p
+
+        pipeline = Pipeline()
+        pipeline.name = 'p.%04d' % x
+
+        stage1 = Stage()
+        stage1.name = 'stage1'
+        # task1 of stage1 will execute on the first available and suitable node.
+        task1 = Task()
+        task1.name = 'task1.%04d' % x
+        task1.executable = 'hostname'
+        # Set enough threads for task1 to get a whole compute node
+        task1.cpu_reqs = {'cpu_processes': 1,
+                          'cpu_threads': 1,
+                          'cpu_process_type': None,
+                          'cpu_thread_type': None}
+        task1.lfs_per_process = 10
+        stage1.add_tasks(task1)
+
+        pipeline.add_stages(stage1)
+
+        stage2 = Stage()
+        stage2.name = 'stage2'
+
+        task2 = Task()
+        task2.name = 'task2.%04d' % x
+        task2.executable = 'hostname'
+        task2.cpu_reqs = {'cpu_processes': 1,
+                          'cpu_threads': 1,
+                          'cpu_process_type': None,
+                          'cpu_thread_type': None}
+        # We use the ID of task1 as the tag of task2. In this way, task2 will
+        # execute on the same node on which task1 executed.
+        task2.tags = {'colocate': task1.uid}
+        task2.lfs_per_process = 10
+        stage2.add_tasks(task2)
+
+        pipeline.add_stages(stage2)
+
+        stage3 = Stage()
+        stage3.name = 'stage3'
+
+        task3 = Task()
+        task3.name = 'task3.%04d' % x
+        task3.executable = 'hostname'
+        task3.cpu_reqs = {'cpu_processes': 1,
+                          'cpu_threads': 1,
+                          'cpu_process_type': None,
+                          'cpu_thread_type': None}
+        task3.lfs_per_process = 10
+        # We use the ID of task1 as the tag of task3. In this way, task3 will
+        # execute on the same node on which task1 and task2 executed.
+        task3.tags = {'colocate': task1.uid}
+        stage3.add_tasks(task3)
+
+        pipeline.add_stages(stage3)
+        pipelines.append(pipeline)
+
+    return pipelines
 
 
 if __name__ == '__main__':
 
-    args = argparse.ArgumentParser()
-    args.add_argument('sharedfs')
-    args.add_argument('size')
-
-    args = args.parse_args()
-    if args.sharedfs == 'shared':
-        shared_fs = True
-    else:
-        shared_fs = False
-    size = args.size
-
-    print('SharedFS: ', shared_fs, size)
-
-    os.environ['RADICAL_PILOT_DBURL'] = 'mongodb://entk:entk123@ds159631.mlab.com:59631/da-lfs-test'
-
+    # Request at least two compute nodes
     res_dict = {
-        'resource' : 'xsede.comet',
-        'walltime' : 30,
-        'cpus'     : 120,
-        'project'  : 'unc100'
-      # 'project'  : 'gk4',
-      # 'queue'    : 'high'
+        'resource' : 'local.localhost',
+        'walltime' : 20,
+        'cpus'     : 2,
     }
 
-    appman = AppManager(hostname=hostname, port=port)
+    appman = AppManager(hostname=hostname, port=port, username=username,
+                        password=password)
    appman.resource_desc = res_dict
 
-    p = get_pipeline(shared_fs=shared_fs, size=size)
-    appman.workflow = [p]
+    # Select n to be >= the number of available compute nodes.
+    p = get_pipeline(n=2)
+    appman.workflow = set(p)
     appman.run()
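
The core of the rewritten example is the 'colocate' tag. A minimal standalone sketch of the same pattern, assuming radical.entk 1.6.0 (the executable is a placeholder):

from radical.entk import Pipeline, Stage, Task

p  = Pipeline()

s1 = Stage()
t1 = Task()
t1.executable = 'hostname'
s1.add_tasks(t1)
p.add_stages(s1)

s2 = Stage()
t2 = Task()
t2.executable = 'hostname'
# Reusing t1's uid as the colocate tag asks the runtime to schedule t2
# on the node where t1 ran.
t2.tags = {'colocate': t1.uid}
s2.add_tasks(t2)
p.add_stages(s2)

Since stages run in order, t1 completes before t2 is scheduled, so the tag can be resolved to a concrete node.
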
4 changes: 3 additions & 1 deletion src/radical/entk/__init__.py
@@ -1,4 +1,4 @@
-
+# pylint: disable=unused-argument
 # ------------------------------------------------------------------------------
 #
 from .pipeline import Pipeline
@@ -17,10 +17,12 @@
 import requests as req
 from packaging.version import parse as parse_version
 
+
 def custom_formatwarning(msg, *args, **kwargs):
     # ignore everything except the message
     return str(msg) + '\n'
 
+
 warnings.formatwarning = custom_formatwarning
 
 version_short, version_detail, version_base, version_branch, \
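
The formatter registered above reduces each warning to its bare message. A self-contained sketch of the behavior it sets up (the warning text is illustrative):

import warnings

def custom_formatwarning(msg, *args, **kwargs):
    # ignore everything except the message
    return str(msg) + '\n'

warnings.formatwarning = custom_formatwarning
# Prints only the message text, without file, line number, or category:
warnings.warn('a newer radical.entk release is available')
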
6 changes: 4 additions & 2 deletions src/radical/entk/execman/mock/task_manager.py
@@ -244,13 +244,15 @@ def _process_tasks(self, task_queue, rmgr, rmq_conn_params):
                 bulk_tasks.append(task)
 
                 self._advance(task, 'Task', states.SUBMITTING,
-                              mq_channel, '%s-tmgr-to-sync' % self._sid)
+                              mq_channel, rmq_conn_params,
+                              '%s-tmgr-to-sync' % self._sid)
 
             # this mock RTS immediately completes all tasks
             for task in bulk_tasks:
 
                 self._advance(task, 'Task', states.COMPLETED,
-                              mq_channel, '%s-cb-to-sync' % self._sid)
+                              mq_channel, rmq_conn_params,
+                              '%s-cb-to-sync' % self._sid)
 
                 task_as_dict = json.dumps(task.to_dict())
                 mq_channel.basic_publish(
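
Both _advance calls now receive rmq_conn_params alongside the open channel, which lets the state-update path open its own RabbitMQ connection when needed. A sketch of the kind of parameters object being threaded through, assuming pika (the RabbitMQ client EnTK builds on); host, port, and credentials are placeholders:

import pika

credentials     = pika.PlainCredentials('guest', 'guest')
rmq_conn_params = pika.ConnectionParameters(host='localhost', port=5672,
                                            credentials=credentials)

# Any component holding the parameters can open a fresh connection:
connection = pika.BlockingConnection(rmq_conn_params)
channel    = connection.channel()
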
4 changes: 2 additions & 2 deletions src/radical/entk/execman/rp/resource_manager.py
@@ -162,8 +162,8 @@ def _pilot_state_cb(pilot, state):
                      'job_name'        : self._job_name
                     }
 
-        # Create Compute Pilot with validated resource description
-        pdesc = rp.ComputePilotDescription(pd_init)
+        # Create Pilot with validated resource description
+        pdesc = rp.PilotDescription(pd_init)
         self._prof.prof('rreq created', uid=self._uid)
 
         # Launch the pilot
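
This change tracks RADICAL-Pilot's rename of ComputePilotDescription to PilotDescription. A minimal sketch of the new spelling (the resource label and sizes are placeholders):

import radical.pilot as rp

pd_init = {'resource': 'local.localhost',
           'runtime' : 20,          # minutes
           'cores'   : 2}

# Newer RADICAL-Pilot releases drop the 'Compute' prefix:
pdesc = rp.PilotDescription(pd_init)
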
