Merge pull request #661 from radical-cybertools/fix/darshan
Fix/darshan
mtitov committed Mar 20, 2024
2 parents dec1451 + 56c8485 commit 9d05579
Showing 9 changed files with 430 additions and 179 deletions.
2 changes: 1 addition & 1 deletion bin/radical-entk-provenance
@@ -6,7 +6,7 @@ __license__ = 'MIT'
import argparse
import sys

from radical.entk.utils.provenance import extract_provenance_graph
from radical.entk.tools import extract_provenance_graph


# ------------------------------------------------------------------------------
43 changes: 27 additions & 16 deletions examples/misc/darshan_enabled.py
@@ -5,7 +5,10 @@
import radical.entk as re
import radical.pilot as rp

from radical.entk.utils.provenance import enable_darshan, get_provenance_graph
from radical.entk.tools import (cache_darshan_env,
with_darshan,
enable_darshan,
get_provenance_graph)

RESOURCE_DESCRIPTION = {
# https://radicalpilot.readthedocs.io/en/stable/supported/polaris.html
@@ -19,9 +22,11 @@
os.environ['RADICAL_LOG_LVL'] = 'DEBUG'
os.environ['RADICAL_REPORT'] = 'TRUE'

TASK_01_OUTPUT = 'output_01.dat'


# pylint: disable=anomalous-backslash-in-string
def get_stages():
def get_stage_0():

# hello-RP task
task_00 = re.Task({
Expand All @@ -33,42 +38,48 @@ def get_stages():
})

# R/W data
output_01 = 'output_01.dat'
task_01 = re.Task({
'executable' : '/bin/sh',
'arguments' : ['-c', f'cat input.dat | wc > {output_01}'],
'arguments' : ['-c', f'cat input.dat | wc > {TASK_01_OUTPUT}'],
'upload_input_data': ['/etc/passwd > input.dat'],
'copy_output_data' : [f'{output_01} > $SHARED/{output_01}']
'copy_output_data' : [f'{TASK_01_OUTPUT} > $SHARED/{TASK_01_OUTPUT}']
})

stage_0 = re.Stage()
stage_0.add_tasks([task_00, task_01])
# --- enable Darshan for task "task_01" only
stage_0.add_tasks([task_00, enable_darshan(task_01)])
return stage_0


# --- enable Darshan for the whole "stage_1" using decorator
@with_darshan
def get_stage_1():

# R/W data and task depends on the task from the previous stage
task_10 = re.Task({
'executable' : '/bin/sh',
'arguments' : ['-c',
f"sed -r 's/\s+//g' {output_01} " + # noqa: W605
f"sed -r 's/\s+//g' {TASK_01_OUTPUT} " + # noqa: W605
'| grep -o . | sort | uniq -c > output_10.dat'],
'copy_input_data': [f'$SHARED/{output_01} > {output_01}']
'copy_input_data': [f'$SHARED/{TASK_01_OUTPUT} > {TASK_01_OUTPUT}']
})

stage_1 = re.Stage()
stage_1.add_tasks([task_10])

return [stage_0, stage_1]
return stage_1


def main():

cache_darshan_env(darshan_runtime_root='$DARSHAN_RUNTIME_ROOT',
modules=['e4s/22.08/PrgEnv-gnu',
'darshan-runtime',
'darshan-util'])

pipeline = re.Pipeline()
pipeline.add_stages(get_stages())
pipeline.add_stages([get_stage_0(), get_stage_1()])
workflow = [pipeline]

enable_darshan(pipelines=workflow,
modules=['e4s/22.08/PrgEnv-gnu',
'darshan-runtime',
'darshan-util'])

amgr = re.AppManager()
amgr.resource_desc = RESOURCE_DESCRIPTION
amgr.workflow = workflow
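(The remainder of main() is truncated in this hunk. As a minimal sketch, not part of the commit, such a run would typically conclude by executing the workflow and dumping the Darshan-annotated provenance graph; the output file name is illustrative.)

    amgr.run()

    # collect Darshan annotations from all tasks and write the graph to disk
    get_provenance_graph(pipelines=workflow,
                         output_file='entk_provenance.json')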
8 changes: 8 additions & 0 deletions src/radical/entk/appman/wfprocessor.py
@@ -338,6 +338,14 @@ def _update_dequeued_task(self, deq_task):
if task.uid != deq_task.uid:
continue

            # due to the possibility of a race condition with
            # AppManager._update_task(), we ensure that the task
            # attributes "path" and "rts_uid" are set.
if not task.path and deq_task.path:
task.path = str(deq_task.path)
if not task.rts_uid and deq_task.rts_uid:
task.rts_uid = str(deq_task.rts_uid)

# If there is no exit code, we assume success
# We are only concerned about state of task and not
# deq_task
2 changes: 2 additions & 0 deletions src/radical/entk/execman/rp/task_manager.py
@@ -255,12 +255,14 @@ def task_state_cb(rp_task, state):

task = create_task_from_rp(rp_task, self._log, self._prof)

# to AppManager
self._advance(task, 'Task', states.COMPLETED, 'cb-to-sync')

load_placeholder(task)

tdict = task.as_dict()

# to WFprocessor
self._zmq_queue['put'].put(qname='completed', msgs=[tdict])
self._log.info('Pushed task %s with state %s to completed',
task.uid, task.state)
6 changes: 6 additions & 0 deletions src/radical/entk/tools/__init__.py
@@ -0,0 +1,6 @@

from .darshan import cache_darshan_env, with_darshan, enable_darshan
from .darshan import annotate_task_with_darshan

from .provenance import extract_provenance_graph, get_provenance_graph
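The new radical.entk.tools package is now the single import point for the Darshan and provenance helpers; the bin/radical-entk-provenance change above shows the corresponding migration. A sketch of the same change for user scripts (illustrative, not part of the commit):

# before
from radical.entk.utils.provenance import get_provenance_graph
# after
from radical.entk.tools import get_provenance_graph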

176 changes: 176 additions & 0 deletions src/radical/entk/tools/darshan.py
@@ -0,0 +1,176 @@

__copyright__ = 'Copyright 2024, The RADICAL-Cybertools Team'
__license__ = 'MIT'

import glob

from typing import Optional, Dict, List, Union

import radical.utils as ru

from .. import Pipeline, Stage, Task

DARSHAN_LOG_DIR = '%(sandbox)s/darshan_logs'

_darshan_activation_cmds = None
_darshan_env = None
_darshan_runtime_root = None


# ------------------------------------------------------------------------------
#
def cache_darshan_env(darshan_runtime_root: Optional[str] = None,
modules: Optional[List[str]] = None,
env: Optional[Dict[str, str]] = None) -> None:

global _darshan_runtime_root

if _darshan_runtime_root is None:
if (darshan_runtime_root and
not darshan_runtime_root.startswith('$') and
not darshan_runtime_root.startswith('/')):
            raise RuntimeError('Darshan root directory should be set with '
                               'either an env variable or an absolute path '
                               f'(provided path: {darshan_runtime_root})')
_darshan_runtime_root = darshan_runtime_root or '$DARSHAN_RUNTIME_ROOT'

global _darshan_activation_cmds
global _darshan_env

if _darshan_activation_cmds is None:

_darshan_activation_cmds = []
for module in modules or []:
_darshan_activation_cmds.append(f'module load {module}')
for k, v in (env or {}).items():
_darshan_activation_cmds.append(f'export {k.upper()}="{v}"')

_darshan_env = ru.env_prep(pre_exec_cached=_darshan_activation_cmds)


# ------------------------------------------------------------------------------
# decorator to enable Darshan for a function generating a Pipeline, Stage or Task
def with_darshan(func,
darshan_runtime_root: Optional[str] = None,
modules: Optional[List[str]] = None,
env: Optional[Dict[str, str]] = None):
def wrapper(*args, **kwargs):
return enable_darshan(func(*args, **kwargs),
darshan_runtime_root=darshan_runtime_root,
modules=modules,
env=env)
return wrapper


# ------------------------------------------------------------------------------
#
def enable_darshan(pst: Union[Pipeline, Stage, Task, List[Pipeline]],
darshan_runtime_root: Optional[str] = None,
modules: Optional[List[str]] = None,
env: Optional[Dict[str, str]] = None
) -> Union[Pipeline, Stage, Task]:

if not pst:
raise ValueError('PST object is not provided')
elif isinstance(pst, list):
if not isinstance(pst[0], Pipeline):
raise TypeError('List of Pipelines is not provided')
elif not isinstance(pst, (Pipeline, Stage, Task)):
raise TypeError('Non PST object provided')

cache_darshan_env(darshan_runtime_root, modules, env)

def _enable_darshan(src_task: Task):

if not src_task.executable:
return
elif 'libdarshan.so' in src_task.executable:
# Darshan is already enabled
return

darshan_log_dir = DARSHAN_LOG_DIR % {'sandbox': '${RP_TASK_SANDBOX}'}
darshan_enable = (f'LD_PRELOAD="{_darshan_runtime_root}'
'/lib/libdarshan.so" ')

if src_task.cpu_reqs.cpu_processes == 1:
darshan_enable += 'DARSHAN_ENABLE_NONMPI=1 '

src_task.executable = darshan_enable + src_task.executable
src_task.pre_launch += [f'mkdir -p {darshan_log_dir}']
src_task.pre_exec.extend(
_darshan_activation_cmds +
[f'export DARSHAN_LOG_DIR_PATH={darshan_log_dir}'])

if isinstance(pst, list):
for pipeline in pst:
for stage in pipeline.stages:
for task in stage.tasks:
_enable_darshan(task)

elif isinstance(pst, Pipeline):
for stage in pst.stages:
for task in stage.tasks:
_enable_darshan(task)

elif isinstance(pst, Stage):
for task in pst.tasks:
_enable_darshan(task)

elif isinstance(pst, Task):
_enable_darshan(pst)

return pst


# ------------------------------------------------------------------------------
#
def get_parsed_data(log: str, target_counters: Union[str, List[str]]) -> set:

data = set()

if not target_counters:
return data

grep_patterns = '-e ' + ' -e '.join(ru.as_list(target_counters))
parser_cmd = (f'darshan-parser {log} | grep {grep_patterns} | '
"awk '{print $5\":\"$6}'")
out, err, ret = ru.sh_callout(parser_cmd, env=_darshan_env, shell=True)
if ret:
print(f'[ERROR] Darshan not able to parse "{log}": {err}')
else:
for line in out.split('\n'):
if not line:
continue
value, file = line.split(':', 1)
try: value = int(value)
except ValueError: value = 0
if value > 0 and file.startswith('/'):
data.add(file)

return data


# ------------------------------------------------------------------------------
#
def annotate_task_with_darshan(task: Task) -> None:

inputs = set()
outputs = set()

for log in glob.glob((DARSHAN_LOG_DIR % {'sandbox': task.path}) + '/*'):

inputs.update(get_parsed_data(log, ['POSIX_BYTES_READ', 'STDIO_OPENS']))
outputs.update(get_parsed_data(log, 'POSIX_BYTES_WRITTEN'))

arguments = ' '.join(task.arguments)
if '>' in arguments:
output = arguments.split('>')[1].split(';')[0].strip()
if not output.startswith('/') and not output.startswith('$'):
output = task.path + '/' + output
outputs.add(output)

task.annotate(inputs=sorted(inputs), outputs=sorted(outputs))


# ------------------------------------------------------------------------------
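To make the behavior of enable_darshan() concrete, here is a minimal sketch, not part of the commit, of what it does to a single-process task description; the module name is an assumption for the target machine:

import radical.entk as re
from radical.entk.tools import enable_darshan

task = re.Task({'executable': '/bin/sh',
                'arguments' : ['-c', 'cat input.dat | wc > out.dat']})
enable_darshan(task, modules=['darshan-runtime'])

# resulting description (single-process task, default Darshan root):
#   executable : 'LD_PRELOAD="$DARSHAN_RUNTIME_ROOT/lib/libdarshan.so" '
#                'DARSHAN_ENABLE_NONMPI=1 /bin/sh'
#   pre_launch : ['mkdir -p ${RP_TASK_SANDBOX}/darshan_logs']
#   pre_exec   : ['module load darshan-runtime',
#                 'export DARSHAN_LOG_DIR_PATH=${RP_TASK_SANDBOX}/darshan_logs']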

77 changes: 77 additions & 0 deletions src/radical/entk/tools/provenance.py
@@ -0,0 +1,77 @@

__copyright__ = 'Copyright 2024, The RADICAL-Cybertools Team'
__license__ = 'MIT'

from typing import Optional, Dict, List, Union

import radical.utils as ru

from .. import Pipeline

from .darshan import annotate_task_with_darshan


# ------------------------------------------------------------------------------
#
def get_provenance_graph(pipelines: Union[Pipeline, List[Pipeline]],
output_file: Optional[str] = None) -> Dict:
"""
Using UIDs of all entities to build a workflow provenance graph.
"""

graph = {}

for pipeline in ru.as_list(pipelines):
graph[pipeline.uid] = {}
for stage in pipeline.stages:
graph[pipeline.uid][stage.uid] = {}
for task in stage.tasks:
annotate_task_with_darshan(task)
graph[pipeline.uid][stage.uid][task.uid] = \
task.annotations.as_dict()

if output_file:
if not output_file.endswith('.json'):
output_file += '.json'
ru.write_json(graph, output_file)

return graph


# ------------------------------------------------------------------------------
#
def extract_provenance_graph(session_json: str,
output_file: Optional[str] = None) -> Dict:
"""
Using session JSON file to build a workflow provenance graph.
"""

session_entities = ru.read_json(session_json)

if not session_entities.get('task'):
raise ValueError('No task entities in provided session')

graph = {}

for task in session_entities['task']:
task_uid, _, stage_uid, _, pipeline_uid, _ = task['name'].split(',')
graph.\
setdefault(pipeline_uid, {}).\
setdefault(stage_uid, {}).\
setdefault(task_uid,
task['description']['metadata'].get('data') or {})

    for pipeline_uid in graph:
        for stage_uid in graph[pipeline_uid]:
            # order the task entries within each stage by task UID
            # for deterministic output
            graph[pipeline_uid][stage_uid] = dict(
                sorted(graph[pipeline_uid][stage_uid].items()))

if output_file:
if not output_file.endswith('.json'):
output_file += '.json'
ru.write_json(graph, output_file)

return graph


# ------------------------------------------------------------------------------
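After a run completes, the same graph can be rebuilt offline from the session JSON, either via the bin/radical-entk-provenance script above or directly from Python. A minimal sketch; the session path and UIDs are placeholders:

from radical.entk.tools import extract_provenance_graph

# graph maps pipeline UID -> stage UID -> task UID -> annotation data,
# e.g. {'pipeline.0000': {'stage.0000': {'task.0000': {'inputs' : [...],
#                                                      'outputs': [...]}}}}
graph = extract_provenance_graph(session_json='path/to/entk_session.json',
                                 output_file='provenance.json')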
