generated from rl-institut/rli_template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Snakefile
179 lines (156 loc) · 6.01 KB
/
Snakefile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# Snakemake workflow: per-scenario pipeline of
# preprocess -> infer -> optimize -> postprocess, plus dispatch plotting,
# cross-scenario result joining and CPU-time analysis.
import os

# --- Path configuration ---
# '{scenario}' and '{experiment}' are Snakemake wildcards, filled in per rule.
scenario_yml = "scenarios/{scenario}.yml"  # per-scenario settings file
raw_dir = "data/In/v0.06"  # versioned raw input data
preprocessed_dir = "results/{scenario}/01_preprocessed"
optimized_dir = "results/{scenario}/02_optimized"
postprocessed_dir = "results/{scenario}/03_postprocessed"
plotted_dir_dispatch = "results/{scenario}/04_plotted/dispatch"
results_template = "flexmex_config/output_template/v0.07/Template"  # output template for postprocessing
log_dir = "results/{scenario}"  # logs and benchmark files live next to the scenario's results
results_joined_dir = "results/{experiment}"  # joined results over all scenarios of one experiment
# Set oemof.tabular sub-paths
preprocessed_data = os.path.join(preprocessed_dir, "data")  # datapackage payload directory
inferred_datapackage = os.path.join(preprocessed_dir, "datapackage.json")  # datapackage metadata file
# Default target: postprocessed results for every scenario found in 'scenarios/'.
rule all:
    # Test: snakemake -npr
    # Run: snakemake -j1
    message:
        "Run entire analysis."
    input:
        # Read-in a complete list of scenarios (all the files found by 'scenario_yml'):
        # one wildcard value per file in 'scenarios/', extension stripped.
        expand(
            postprocessed_dir,
            scenario=[os.path.splitext(d)[0] for d in os.listdir(os.path.dirname(scenario_yml))]
        )
# Step 1: turn raw input data into the scenario's preprocessed data directory
# (results/{scenario}/01_preprocessed/data).
rule preprocess:
    message:
        "Preprocess input data for scenario '{wildcards.scenario}'."
    input:
        raw=raw_dir,
        scenario_yml=scenario_yml,
        script="scripts/preprocessing.py", # re-run if updated
    output:
        directory(preprocessed_data)
    params:
        log=log_dir  # passed to the script as its log destination
    benchmark:
        os.path.join(log_dir, "benchmark-preprocess.log")
    shell:
        "python scripts/preprocessing.py {input.scenario_yml} {input.raw} {output} {params.log}"
# Step 2: create the datapackage.json metadata for the preprocessed data.
rule infer:
    message:
        "Infer meta-data from preprocessed data for scenario '{wildcards.scenario}'."
    input:
        preprocessed_data, # for Snakemake monitoring only
        scenario_yml=scenario_yml,
        script="scripts/infer.py", # re-run if updated
    output:
        inferred_datapackage
    params:
        # tabular's infer_metadata() expects the datapackage base dir as input:
        preprocessed_dir=preprocessed_dir,
    benchmark:
        os.path.join(log_dir, "benchmark-infer.log")
    shell:
        # Note: the output path is implied by the script; Snakemake only tracks it.
        "python scripts/infer.py {input.scenario_yml} {params.preprocessed_dir}"
# Step 3: run the optimization on the inferred datapackage.
rule optimize:
    message:
        "Optimize scenario '{wildcards.scenario}'."
    input:
        preprocessed_data, # for Snakemake monitoring only
        inferred_datapackage, # for Snakemake monitoring only
        scenario_yml=scenario_yml,
        script="scripts/optimization.py" # re-run if updated
    output:
        directory(optimized_dir)
    params:
        # oemoflex's optimize() expects the datapackage base dir as input:
        preprocessed_dir=preprocessed_dir,
        log=log_dir,
    benchmark:
        os.path.join(log_dir, "benchmark-optimize.log")
    shell:
        "python scripts/optimization.py {input.scenario_yml} {params.preprocessed_dir}"
        " {output} {params.log}"
# Step 4: map optimization results into the FlexMex output template and record
# the solver time.
rule postprocess:
    message:
        "Postprocess results for scenario '{wildcards.scenario}'."
    input:
        preprocessed_data, # for Snakemake monitoring only
        scenario_yml=scenario_yml,
        optimized=optimized_dir,
        results_template=results_template,
        script="scripts/postprocessing.py" # re-run if updated
    output:
        data=directory(postprocessed_dir),
        solver_time=os.path.join(log_dir, "solver_time.csv")
    params:
        # Not necessarily as input, whole pipeline must be re-run anyway if this changes:
        raw=raw_dir,
        # postprocessing load_elements() expects the datapackage base dir as input:
        preprocessed_dir=preprocessed_dir,
        log=log_dir,
    benchmark:
        os.path.join(log_dir, "benchmark-postprocess.log")
    shell:
        "python scripts/postprocessing.py {input.scenario_yml}"
        " {params.raw} {params.preprocessed_dir}"
        " {input.optimized} {input.results_template}"
        " {output.data} {params.log}"
# Optional step: render dispatch plots from the postprocessed results.
# Fixed for consistency with the other rules: added a 'message' directive and
# the script as a tracked input ("re-run if updated" convention). The data
# input is now named so '{input.data}' excludes the script from the command.
rule plot_dispatch:
    message:
        "Plot dispatch for scenario '{wildcards.scenario}'."
    input:
        data=postprocessed_dir,
        script="scripts/plot_dispatch.py", # re-run if updated
    output:
        directory(plotted_dir_dispatch)
    shell:
        "python scripts/plot_dispatch.py {input.data} {output}"
def processed_scenarios(wildcards):
    """Return scenario names under 'results/' belonging to this experiment
    that already contain postprocessed data.
    """
    prefix = wildcards.experiment

    def has_postprocessed(name):
        # 'postprocessed_dir' carries the '{scenario}' wildcard; fill it in
        # and check whether the postprocessing output directory exists.
        return os.path.isdir(postprocessed_dir.format(scenario=name))

    return [
        name
        for name in os.listdir("results")
        if name.startswith(prefix)   # only FlexMex1_ or FlexMex2_ directories
        and name != prefix           # exclude the joined-results output dir itself
        and has_postprocessed(name)  # only postprocessed scenarios
    ]
def postprocessed_paths(wildcards):
    """Map each completed scenario of this experiment to its postprocessed
    results path ('scenario' is the wildcard in 'postprocessed_dir').
    """
    scenarios = processed_scenarios(wildcards)
    return expand(postprocessed_dir, scenario=scenarios)
# Aggregation: join the postprocessed results of all finished scenarios of one
# experiment (FlexMex1 or FlexMex2) into a single results directory.
rule join_results:
    message:
        "Join results."
    wildcard_constraints:
        # restrict '{experiment}' so this rule never matches scenario dirs
        experiment="(FlexMex1|FlexMex2)"
    input:
        script="scripts/join_results.py" # re-run if updated
    output:
        directory(results_joined_dir)
    params:
        # Only use existing scenario runs as an input (function call)
        # As 'params' to prevent preceding steps from being run
        scenario_paths=postprocessed_paths,
        scenarios=processed_scenarios,
    script:
        "scripts/join_results.py"
# Reporting: combine the per-step benchmark logs and the solver time into one
# CPU-time analysis CSV per scenario.
rule analyze_cputime:
    message:
        "Time measurement output."
    input:
        os.path.join(log_dir, "benchmark-preprocess.log"), # for Snakemake monitoring only
        os.path.join(log_dir, "benchmark-infer.log"), # for Snakemake monitoring only
        os.path.join(log_dir, "benchmark-optimize.log"), # for Snakemake monitoring only
        os.path.join(log_dir, "benchmark-postprocess.log"), # for Snakemake monitoring only
        os.path.join(log_dir, "solver_time.csv"),
        script="scripts/analyze_cputime.py" # re-run if updated
    output:
        os.path.join(log_dir, "cpu_time_analysis.csv")
    params:
        input_dir=log_dir,  # script scans this dir rather than taking each file
    shell:
        "python scripts/analyze_cputime.py {wildcards.scenario}"
        " {params.input_dir}"
        " {output}"