Skip to content

Commit

Permalink
Introduce support for custom callbacks, various fixes and improvements (
Browse files Browse the repository at this point in the history
#67)

* Draft base callback

* Add cb param in manager

* Draft base callback

* Add cb param in manager

* Support parallel runner

* Re-introduce support for parent cb init

* Merge changes in utils

* Small fixes

* fix ctrld by parent flag for callbacks

* Improve parameters support of the callbacks (#64)

---------

Co-authored-by: pagmatt <mattpagg@gmail.com>
Co-authored-by: Andrea Lacava <lacava.a@northeastern.edu>

---------

Co-authored-by: Andrea Lacava <thecave003@gmail.com>
Co-authored-by: Andrea Lacava <lacava.a@northeastern.edu>
  • Loading branch information
3 people committed May 5, 2023
1 parent 101eb56 commit 1aaa051
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 15 deletions.
4 changes: 2 additions & 2 deletions sem/__init__.py
Expand Up @@ -4,11 +4,11 @@
from .lptrunner import LptRunner
from .gridrunner import BUILD_GRID_PARAMS, SIMULATION_GRID_PARAMS
from .database import DatabaseManager
from .utils import list_param_combinations, automatic_parser, stdout_automatic_parser, only_load_some_files
from .utils import list_param_combinations, automatic_parser, stdout_automatic_parser, only_load_some_files, CallbackBase
from .cli import cli

__all__ = ('CampaignManager', 'SimulationRunner', 'ParallelRunner', 'LptRunner',
'DatabaseManager', 'list_param_combinations', 'automatic_parser',
'only_load_some_files')
'only_load_some_files', 'CallbackBase')

name = 'sem'
20 changes: 16 additions & 4 deletions sem/manager.py
Expand Up @@ -290,7 +290,7 @@ def check_and_fill_parameters(self, param_list, needs_rngrun):
# Simulation running #
######################

def run_simulations(self, param_list, show_progress=True, stop_on_errors=True):
def run_simulations(self, param_list, show_progress=True, callbacks: list = [], stop_on_errors=True):
"""
Run several simulations specified by a list of parameter combinations.
Expand All @@ -305,6 +305,10 @@ def run_simulations(self, param_list, show_progress=True, stop_on_errors=True):
can be either a string or a number).
show_progress (bool): whether or not to show a progress bar with
percentage and expected remaining time.
callbacks (list): list of objects extending CallbackBase to be
triggered during the run.
stop_on_errors (bool): whether or not to stop the execution of the simulations
if an error occurs.
"""

# Make sure we have a runner to run simulations with.
Expand All @@ -315,7 +319,7 @@ def run_simulations(self, param_list, show_progress=True, stop_on_errors=True):
" for this CampaignManager.")

# Return if the list is empty
if param_list == []:
if not param_list:
return

self.check_and_fill_parameters(param_list, needs_rngrun=True)
Expand All @@ -339,6 +343,7 @@ def run_simulations(self, param_list, show_progress=True, stop_on_errors=True):
# computation is performed on this line.
results = self.runner.run_simulations(param_list,
self.db.get_data_dir(),
callbacks=callbacks,
stop_on_errors=stop_on_errors)

# Wrap the result generator in the progress bar generator.
Expand Down Expand Up @@ -377,8 +382,7 @@ def run_and_save_results(self, result_generator, batch_results=True):
self.db.insert_results(results_batch)
self.db.write_to_disk()

def get_missing_simulations(self, param_list, runs=None,
with_time_estimate=False):
def get_missing_simulations(self, param_list, runs=None, with_time_estimate=False):
"""
Return a list of the simulations among the required ones that are not
available in the database.
Expand All @@ -389,6 +393,7 @@ def get_missing_simulations(self, param_list, runs=None,
runs (int): an integer representing how many repetitions are wanted
for each parameter combination, None if the dictionaries in
param_list already feature the desired RngRun value.
with_time_estimate (bool): whether to also compute and return an estimate of the time required to run the missing simulations.
"""

params_to_simulate = []
Expand Down Expand Up @@ -451,6 +456,7 @@ def get_missing_simulations(self, param_list, runs=None,

def run_missing_simulations(self, param_list, runs=None,
condition_checking_function=None,
callbacks=[],
stop_on_errors=True):
"""
Run the simulations from the parameter list that are not yet available
Expand All @@ -470,6 +476,10 @@ def run_missing_simulations(self, param_list, runs=None,
runs (int): the number of runs to perform for each parameter
combination. This parameter is only allowed if the param_list
specification doesn't feature an 'RngRun' key already.
callbacks (list): list of objects extending CallbackBase to be
triggered during the run.
stop_on_errors (bool): whether or not to stop the execution of the simulations
if an error occurs.
"""
# Expand the parameter specification
param_list = list_param_combinations(param_list)
Expand Down Expand Up @@ -503,10 +513,12 @@ def run_missing_simulations(self, param_list, runs=None,
self.get_missing_simulations(param_list,
runs,
with_time_estimate=True),
callbacks=callbacks,
stop_on_errors=stop_on_errors)
else:
self.run_simulations(
self.get_missing_simulations(param_list, runs),
callbacks=callbacks,
stop_on_errors=stop_on_errors)

#####################
Expand Down
25 changes: 22 additions & 3 deletions sem/parallelrunner.py
@@ -1,28 +1,46 @@
from .runner import SimulationRunner
from multiprocessing import Pool

from .utils import CallbackBase
from multiprocessing.pool import ThreadPool as Pool
# We use ThreadPool to share the process memory among the different simulations to enable the use of callbacks.
# This may be improved eventually using a grain-fined solution that checks the presence or not of callbacks

class ParallelRunner(SimulationRunner):

"""
A Runner which can perform simulations in parallel on the current machine.
"""
data_folder: str = None
stop_on_errors: bool = False
callbacks: [CallbackBase] = []

def run_simulations(self, parameter_list, data_folder, stop_on_errors=False):
def run_simulations(self, parameter_list, data_folder, callbacks: list = None, stop_on_errors=False):
    """
    Run multiple simulations in parallel.

    Args:
        parameter_list (list): list of parameter combinations to simulate.
        data_folder (str): folder in which to create output folders.
        callbacks (list): list of CallbackBase objects to be triggered
            during the run, or None for no callbacks.
        stop_on_errors (bool): whether the whole campaign has to stop when a
            simulation reports an error.
    """

    if callbacks is not None:
        for cb in callbacks:
            # Announce the total number of runs up front, then mark the
            # callback as parent-controlled so that the per-run base runner
            # does not emit duplicate simulation start/end events.
            cb.on_simulation_start(len(parameter_list))
            cb.controlled_by_parent = True

    # Stash the run settings on the instance: launch_simulation is invoked
    # by the pool workers and cannot receive extra arguments through
    # imap_unordered.
    self.data_folder = data_folder
    self.stop_on_errors = stop_on_errors
    self.callbacks = callbacks
    with Pool(processes=self.max_parallel_processes) as pool:
        for result in pool.imap_unordered(self.launch_simulation,
                                          parameter_list):
            yield result

    if callbacks is not None:
        for cb in callbacks:
            cb.on_simulation_end()

def launch_simulation(self, parameter):
"""
Launch a single simulation, using SimulationRunner's facilities.
Expand All @@ -35,4 +53,5 @@ def launch_simulation(self, parameter):
"""
return next(SimulationRunner.run_simulations(self, [parameter],
self.data_folder,
callbacks=self.callbacks,
stop_on_errors=self.stop_on_errors))
36 changes: 31 additions & 5 deletions sem/runner.py
Expand Up @@ -8,6 +8,7 @@
import sys
from importlib.machinery import SourceFileLoader
import types
from .utils import CallbackBase

from tqdm import tqdm

Expand Down Expand Up @@ -72,7 +73,7 @@ def __init__(self, path, script, optimized=True, skip_configuration=False,
build_status_fname = ".lock-ns3_%s_build" % sys.platform
build_status_path = os.path.join(path, build_status_fname)
else:
build_status_fname = "build-status.py"
build_status_fname = "build.py"
if optimized:
build_status_path = os.path.join(path,
'build/optimized/build-status.py')
Expand Down Expand Up @@ -289,17 +290,25 @@ def get_available_parameters(self):
# Simulation running #
######################

def run_simulations(self, parameter_list, data_folder, stop_on_errors=False):
def run_simulations(self, parameter_list, data_folder, callbacks: [CallbackBase] = None, stop_on_errors=False):
"""
Run several simulations using a certain combination of parameters.
Yields results as simulations are completed.
Yield results as simulations are completed.
Args:
parameter_list (list): list of parameter combinations to simulate.
data_folder (str): folder in which to save subfolders containing
simulation output.
callbacks (list): list of callbacks to be triggered
stop_on_errors (bool): if true, when a simulation outputs an error the whole campaign will be stopped
"""

# Log simulation start if not already done by parent class
if callbacks is not None:
for cb in callbacks:
if not cb.is_controlled_by_parent():
cb.on_simulation_start(len(list(enumerate(parameter_list))))

for _, parameter in enumerate(parameter_list):

Expand All @@ -314,13 +323,19 @@ def run_simulations(self, parameter_list, data_folder, stop_on_errors=False):
parameter.items()]

# Run from dedicated temporary folder
current_result['meta']['id'] = str(uuid.uuid4())
sim_uuid = str(uuid.uuid4())
current_result['meta']['id'] = sim_uuid
temp_dir = os.path.join(data_folder, current_result['meta']['id'])
os.makedirs(temp_dir)

start = time.time() # Time execution
stdout_file_path = os.path.join(temp_dir, 'stdout')
stderr_file_path = os.path.join(temp_dir, 'stderr')

if callbacks is not None:
for cb in callbacks:
cb.on_run_start(parameter, sim_uuid)

with open(stdout_file_path, 'w') as stdout_file, open(
stderr_file_path, 'w') as stderr_file:
return_code = subprocess.call(command, cwd=temp_dir,
Expand All @@ -329,7 +344,12 @@ def run_simulations(self, parameter_list, data_folder, stop_on_errors=False):
stderr=stderr_file)
end = time.time() # Time execution

if callbacks is not None:
for cb in callbacks:
cb.on_run_end(sim_uuid, return_code, end - start)

if return_code != 0:

with open(stdout_file_path, 'r') as stdout_file, open(
stderr_file_path, 'r') as stderr_file:
complete_command = sem.utils.get_command_from_result(self.script, current_result)
Expand All @@ -349,9 +369,15 @@ def run_simulations(self, parameter_list, data_folder, stop_on_errors=False):
complete_command_debug))
if stop_on_errors:
raise Exception(error_message)
print(error_message)
print(error_message)

current_result['meta']['elapsed_time'] = end-start
current_result['meta']['exitcode'] = return_code

yield current_result

# Log simulation end if not already done by parent class
if callbacks is not None:
for cb in callbacks:
if not cb.is_controlled_by_parent():
cb.on_simulation_end()
84 changes: 84 additions & 0 deletions sem/utils.py
Expand Up @@ -4,6 +4,8 @@
import warnings
from itertools import product
from functools import wraps
from abc import ABC, abstractmethod
from typing import Dict, Any

import matplotlib.pyplot as plt
import numpy as np
Expand Down Expand Up @@ -305,6 +307,88 @@ def compute_sensitivity_analysis(
return salib_analyze_function(problem, results)


class CallbackBase(ABC):
    """
    Base class for SEM callbacks.

    Subclasses implement the abstract ``_on_*`` hooks; the public ``on_*``
    counterparts perform the shared bookkeeping (run counters, simulation
    times) and then delegate to the hooks.

    :param verbose: Verbosity level: 0 for no output, 1 for info messages,
        2 for debug messages
    """

    def __init__(self, verbose: int = 0):
        super().__init__()
        # Whether a parent runner (e.g., a parallel runner) drives the
        # simulation-level events for this callback.
        self.controlled_by_parent: bool = False
        # Counters of completed runs, split by outcome.
        self.n_runs_over: int = 0
        self.n_runs_over_no_errors: int = 0
        self.n_runs_over_errors: int = 0
        # Total number of runs announced via on_simulation_start.
        self.n_runs_total: int = 0
        # Wall-clock duration of each completed run.
        self.run_sim_times: list = []
        self.verbose = verbose

    def init_callback(self, controlled_by_parent: bool) -> None:
        """
        Initialize the callback.

        Args:
            controlled_by_parent (bool): whether a parent runner manages the
                simulation-level start/end events for this callback.
        """
        self.controlled_by_parent = controlled_by_parent

    def is_controlled_by_parent(self) -> bool:
        """
        Whether this runner is aware of all simulations (false)
        or it has been triggered by a multithread runner and is thus
        aware of a subset of all runs only (true).
        """
        return self.controlled_by_parent

    def on_simulation_start(self, n_runs_total: int) -> None:
        """
        Record the total number of runs and trigger the start hook.

        Args:
            n_runs_total (int): total number of runs in this campaign.
        """
        self.n_runs_total = n_runs_total
        self._on_simulation_start()

    @abstractmethod
    def _on_simulation_start(self) -> None:
        """Hook called once, before any run starts."""
        pass

    def on_run_start(self, configuration, sim_uuid) -> None:
        """
        Trigger the run-start hook.

        Args:
            configuration (dict): dictionary representing the combination of parameters simulated in this specific
            sim_uuid (str): unique identifier string for the simulation. This value is used to name the result folder,
            and it is referenced in the result JSON file.
        """
        self._on_run_start(configuration, sim_uuid)

    @abstractmethod
    def _on_run_start(self, configuration: dict, sim_uuid: str) -> None:
        """Hook called right before each run is launched."""
        pass

    @abstractmethod
    def _on_run_end(self, sim_uuid: str, return_code: int, sim_time: int) -> bool:
        """
        Hook called right after each run finishes.

        :return: False to signal that the run should be treated as failed,
            True otherwise.
        """
        return True

    def on_run_end(self, sim_uuid: str, return_code: int, sim_time: int) -> bool:
        """
        Update the run counters and trigger the run-end hook.

        This method will be called when each simulation run finishes.

        Args:
            sim_uuid (str): unique identifier string for the simulation.
            return_code (int): exit code of the simulation process.
            sim_time (int): wall-clock duration of the run.

        :return: the value returned by the subclass hook (False means the
            run has failed).
        """
        self.n_runs_over += 1
        self.run_sim_times.append(sim_time)
        if return_code == 0:
            self.n_runs_over_no_errors += 1
        else:
            self.n_runs_over_errors += 1

        return self._on_run_end(sim_uuid, return_code, sim_time)

    def on_simulation_end(self) -> None:
        """Trigger the end hook, once all runs are over."""
        self._on_simulation_end()

    @abstractmethod
    def _on_simulation_end(self) -> None:
        """Hook called once, after all runs are over."""
        pass

# def interactive_plot(campaign, param_ranges, result_parsing_function, x_axis,
# runs=None):
# # Average over RngRuns if param_ranges does not contain RngRun
Expand Down
40 changes: 39 additions & 1 deletion tests/test_utils.py
@@ -1,4 +1,4 @@
from sem import list_param_combinations, automatic_parser, stdout_automatic_parser
from sem import list_param_combinations, automatic_parser, stdout_automatic_parser, CallbackBase, CampaignManager
import json
import numpy as np
from operator import getitem
Expand Down Expand Up @@ -101,3 +101,41 @@ def test_automatic_parser(result):
[6, 7, 8, 9, 10]])
assert parsed['stderr'] == []


class TestCallback(CallbackBase):
    """Callback that records every hook invocation into a transcript string."""

    # Prevent pytest from trying to collect this class as a test
    __test__ = False

    def __init__(self):
        super().__init__(verbose=2)
        self.output = ''

    def _log(self, message: str) -> None:
        # Append one line to the accumulated transcript.
        self.output += message

    def _on_simulation_start(self) -> None:
        self._log('Starting the simulations!\n')

    def _on_simulation_end(self) -> None:
        self._log('Simulations are over!\n')

    def _on_run_start(self, configuration: dict, sim_uuid: str) -> None:
        self._log('Start single run!\n')

    def _on_run_end(self, sim_uuid: str, return_code: int, sim_time: int) -> bool:
        self._log(f'Run ended! {return_code}\n')
        return True


def test_callback(ns_3_compiled, config, parameter_combination):
    """Check that a callback sees the full lifecycle of a small campaign."""
    callback = TestCallback()
    n_runs = 10

    # Expected transcript: one simulation-start line, a start/end pair per
    # run (all runs exit with code 0), and one simulation-end line.
    per_run = f'Start single run!\nRun ended! {0}\n'
    expected_output = (
        'Starting the simulations!\n'
        + per_run * n_runs
        + 'Simulations are over!\n'
    )

    campaign = CampaignManager.new(ns_3_compiled, config['script'],
                                   config['campaign_dir'],
                                   runner_type='SimulationRunner',
                                   overwrite=True)
    parameter_combination.update({'RngRun': list(range(n_runs))})
    campaign.run_missing_simulations(param_list=[parameter_combination],
                                     callbacks=[callback])
    assert callback.output == expected_output

0 comments on commit 1aaa051

Please sign in to comment.