Merge pull request #658 from radical-cybertools/feature/darshan
automate dataflow annotation
mtitov committed Feb 8, 2024
2 parents 562cc0a + 3820213 commit 0e334c4
Showing 2 changed files with 179 additions and 13 deletions.
83 changes: 83 additions & 0 deletions examples/misc/darshan_enabled.py
@@ -0,0 +1,83 @@
#!/usr/bin/env python3

import os

import radical.entk as re
import radical.pilot as rp

from radical.entk.utils.provenance import enable_darshan, get_provenance_graph

RESOURCE_DESCRIPTION = {
    # https://radicalpilot.readthedocs.io/en/stable/supported/polaris.html
    'resource': 'anl.polaris',
    'project' : 'RECUP',
    'queue'   : 'debug',
    'cpus'    : 32,
    'walltime': 15
}

os.environ['RADICAL_LOG_LVL'] = 'DEBUG'
os.environ['RADICAL_REPORT'] = 'TRUE'


# pylint: disable=anomalous-backslash-in-string
def get_stages():

    # hello-RP task
    task_00 = re.Task({
        'executable': 'radical-pilot-hello.sh',
        'arguments' : [10],
        'cpu_reqs'  : {'cpu_processes'  : 1,
                       'cpu_threads'    : 4,
                       'cpu_thread_type': rp.OpenMP}
    })

    # R/W data
    output_01 = 'output_01.dat'
    task_01 = re.Task({
        'executable'       : '/bin/sh',
        'arguments'        : ['-c', f'cat input.dat | wc > {output_01}'],
        'upload_input_data': ['/etc/passwd > input.dat'],
        'copy_output_data' : [f'{output_01} > $SHARED/{output_01}']
    })

    stage_0 = re.Stage()
    stage_0.add_tasks([task_00, task_01])

    # R/W data and task depends on the task from the previous stage
    task_10 = re.Task({
        'executable'     : '/bin/sh',
        'arguments'      : ['-c',
                            f"sed -r 's/\s+//g' {output_01} " +  # noqa: W605
                            '| grep -o . | sort | uniq -c > output_10.dat'],
        'copy_input_data': [f'$SHARED/{output_01} > {output_01}']
    })

    stage_1 = re.Stage()
    stage_1.add_tasks([task_10])

    return [stage_0, stage_1]


def main():
    pipeline = re.Pipeline()
    pipeline.add_stages(get_stages())
    workflow = [pipeline]

    enable_darshan(pipelines=workflow,
                   modules=['e4s/22.08/PrgEnv-gnu',
                            'darshan-runtime',
                            'darshan-util'])

    amgr = re.AppManager()
    amgr.resource_desc = RESOURCE_DESCRIPTION
    amgr.workflow = workflow
    amgr.run()

    print(get_provenance_graph(pipelines=workflow,
                               output_file='entk_provenance.json'))


if __name__ == '__main__':
    main()
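
For orientation (an editorial note, not part of this commit): given the example above and get_provenance_graph below, the resulting entk_provenance.json is a nested mapping keyed by pipeline, stage, and task UIDs, where each task carries the Darshan-derived inputs/outputs annotation. A rough sketch of that structure, with hypothetical UIDs and shortened paths:

    # illustrative only -- UIDs and absolute paths are hypothetical
    {
        'pipeline.0000': {
            'stage.0000': {
                'task.0000': {'inputs' : [],
                              'outputs': []},
                'task.0001': {'inputs' : ['.../input.dat'],
                              'outputs': ['.../output_01.dat']}
            },
            'stage.0001': {
                'task.0002': {'inputs' : ['.../output_01.dat'],
                              'outputs': ['.../output_10.dat']}
            }
        }
    }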

109 changes: 96 additions & 13 deletions src/radical/entk/utils/provenance.py
@@ -1,36 +1,119 @@

__copyright__ = 'Copyright 2024, The RADICAL-Cybertools Team'
__license__ = 'MIT'

import glob

from typing import Optional, Dict, List, Union

import radical.utils as ru

from .. import Pipeline, Task

_darshan_env = None


# ------------------------------------------------------------------------------
#
def enable_darshan(pipelines: List[Pipeline],
                   darshan_runtime_root: Optional[str] = None,
                   modules: Optional[List[str]] = None) -> None:

    if darshan_runtime_root:
        if not darshan_runtime_root.startswith('/'):
            raise RuntimeError('Path for the darshan installation '
                               'should be an absolute path '
                               f'(provided path: {darshan_runtime_root})')
    else:
        darshan_runtime_root = '$DARSHAN_RUNTIME_ROOT'

    global _darshan_env

    darshan_activation_cmds = []
    for module in modules or []:
        darshan_activation_cmds.append(f'module load {module}')
    _darshan_env = ru.env_prep(pre_exec_cached=darshan_activation_cmds)

    for pipeline in pipelines:
        for stage in pipeline.stages:
            for task in stage.tasks:

                darshan_log_dir = '${RP_TASK_SANDBOX}/${RP_TASK_ID}_darshan'
                darshan_enable  = (f'LD_PRELOAD="{darshan_runtime_root}'
                                   '/lib/libdarshan.so" ')

                if task.cpu_reqs.cpu_processes == 1:
                    darshan_enable += 'DARSHAN_ENABLE_NONMPI=1 '

                task.executable  = darshan_enable + task.executable
                task.pre_launch += [f'mkdir -p {darshan_log_dir}']
                task.pre_exec.extend(
                    darshan_activation_cmds +
                    [f'export DARSHAN_LOG_DIR_PATH={darshan_log_dir}'])


# ------------------------------------------------------------------------------
#
def get_parsed_data(log: str, target_counters: Union[str, List[str]]) -> set:

    data = set()

    grep_patterns = '-e ' + ' -e '.join(ru.as_list(target_counters))
    parser_cmd    = (f'darshan-parser {log} | grep {grep_patterns} | '
                     "awk '{print $5\":\"$6}'")
    out, err, ret = ru.sh_callout(parser_cmd, env=_darshan_env, shell=True)
    if ret:
        print(f'[ERROR] Darshan not able to parse "{log}": {err}')
    else:
        for o in out.split('\n'):
            if not o:
                continue
            value, file = o.split(':')
            try   : value = int(value)
            except ValueError: value = 0
            if value > 0 and file.startswith('/'):
                data.add(file)

    return data


# ------------------------------------------------------------------------------
#
def annotate_task_with_darshan(task: Task) -> None:

    inputs  = set()
    outputs = set()

    for log in glob.glob(f'{task.path}/{task.uid}_darshan/*'):

        inputs.update(get_parsed_data(log, ['POSIX_BYTES_READ', 'STDIO_OPENS']))
        outputs.update(get_parsed_data(log, 'POSIX_BYTES_WRITTEN'))

    arguments = ' '.join(task.arguments)
    if '>' in arguments:
        outputs.add(arguments.split('>')[1].split(';')[0].strip())

    task.annotate(inputs=sorted(inputs), outputs=sorted(outputs))


# ------------------------------------------------------------------------------
#
def get_provenance_graph(pipelines: Union[Pipeline, List[Pipeline]],
                         output_file: Optional[str] = None) -> Dict:
    """
    Using UIDs of all entities to build a workflow provenance graph.
    """

    graph = {}

    for pipeline in ru.as_list(pipelines):
        graph[pipeline.uid] = {}

        for stage in pipeline.stages:
            graph[pipeline.uid][stage.uid] = {}

            for task in stage.tasks:
                annotate_task_with_darshan(task)
                graph[pipeline.uid][stage.uid][task.uid] = \
                    task.annotations.as_dict()

    if output_file:
        if not output_file.endswith('.json'):
… (remainder of the file not shown in this diff)
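
A minimal usage sketch of the new API (an editorial illustration, not part of this commit; the resource description and the '/opt/darshan' prefix are hypothetical). It shows the alternative to the module-based activation used in the example: pointing enable_darshan at an absolute Darshan installation prefix, which the function requires to start with '/':

    #!/usr/bin/env python3

    import radical.entk as re

    from radical.entk.utils.provenance import enable_darshan, get_provenance_graph

    pipeline = re.Pipeline()
    stage    = re.Stage()
    stage.add_tasks([re.Task({'executable': '/bin/sh',
                              'arguments' : ['-c', 'wc -c /etc/passwd > out.dat']})])
    pipeline.add_stages(stage)
    workflow = [pipeline]

    # instead of environment modules, pass an absolute Darshan prefix
    # (relative paths raise RuntimeError); '/opt/darshan' is hypothetical
    enable_darshan(pipelines=workflow, darshan_runtime_root='/opt/darshan')

    amgr = re.AppManager()
    amgr.resource_desc = {'resource': 'local.localhost',   # hypothetical target
                          'cpus'    : 4,
                          'walltime': 10}
    amgr.workflow = workflow
    amgr.run()

    print(get_provenance_graph(pipelines=workflow,
                               output_file='entk_provenance.json'))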