-
Notifications
You must be signed in to change notification settings - Fork 0
/
orchestrator_multimno.py
77 lines (59 loc) · 2.27 KB
/
orchestrator_multimno.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#!/usr/bin/python
"""
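Module that orchestrates MultiMNO pipeline components. A separate spark-submit is launched for
each component in the pipeline, in order; the orchestrator stops at the first component that fails.
Usage:
```
python multimno/orchestrator_multimno.py <pipeline.json>
```
- pipeline.json: Path to a JSON file with the pipeline configuration. It must define
  `general_config_path`, `spark_submit_args` (list of extra spark-submit arguments) and
  `pipeline` (list of steps, each with `component_id` and `component_config_path`).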
"""
import json
import os
import subprocess
import sys
import re
# Application file passed to spark-submit for every pipeline component.
MAIN_PATH = "multimno/main.py"
# Characters rejected in user-supplied spark-submit arguments. Commands are launched as an
# argument list without a shell (subprocess default shell=False), so this is a
# defence-in-depth check rather than the only protection against shell injection.
SPECIAL_CHARS = re.compile(r"[\`~!@#$%^&*()\+{}\[\]|;'\"<>?]")
# Top-level keys every pipeline JSON must define.
REQUIRED_CONFIG_KEYS = ("general_config_path", "spark_submit_args", "pipeline")
# Keys every entry of the "pipeline" list must define.
REQUIRED_STEP_KEYS = ("component_id", "component_config_path")


def spark_args_error(spark_args):
    """Validate the user-supplied spark-submit arguments.

    Args:
        spark_args: Value of ``spark_submit_args`` from the pipeline config.

    Returns:
        An error message describing the first invalid argument, or ``None`` when
        ``spark_args`` is a list of strings that contain none of ``SPECIAL_CHARS``.
    """
    if not isinstance(spark_args, list):
        return f"spark_submit_args must be a list of strings, got: {spark_args!r}"
    for arg in spark_args:
        if not isinstance(arg, str):
            return f"Spark submit argument is not a string: {arg!r}"
        if SPECIAL_CHARS.search(arg):
            return f"Spark submit argument contains special characters: {arg}"
    return None


def pipeline_steps_error(steps):
    """Validate the ``pipeline`` list before any component is launched.

    Args:
        steps: Value of ``pipeline`` from the pipeline config.

    Returns:
        An error message describing the first malformed step, or ``None`` when every
        step is an object containing all of ``REQUIRED_STEP_KEYS``.
    """
    if not isinstance(steps, list):
        return f"pipeline must be a list of steps, got: {steps!r}"
    for index, step in enumerate(steps):
        if not isinstance(step, dict):
            return f"Pipeline step {index} is not an object: {step!r}"
        missing = [key for key in REQUIRED_STEP_KEYS if key not in step]
        if missing:
            return f"Pipeline step {index} is missing required key(s): {', '.join(missing)}"
    return None


def build_spark_submit_command(spark_args, component_id, general_config_path, component_config_path):
    """Return the argument list that launches one pipeline component via spark-submit."""
    return [
        "spark-submit",
        *spark_args,
        MAIN_PATH,
        component_id,
        general_config_path,
        component_config_path,
    ]


def main(argv=None):
    """Run every component of a MultiMNO pipeline, in order, via spark-submit.

    Args:
        argv: Command-line arguments without the program name; defaults to
            ``sys.argv[1:]``. Only the first one (the pipeline JSON path) is used.

    Returns:
        Process exit code: 0 when every component succeeded, 1 on any configuration
        error or as soon as one component fails (later components are not run).
    """
    if argv is None:
        argv = sys.argv[1:]
    if not argv:
        print(f"Usage: python {sys.argv[0]} <pipeline.json>", file=sys.stderr)
        return 1
    pipeline_config_path = argv[0]

    # isfile (not exists) so a directory is reported here instead of crashing in open().
    if not os.path.isfile(pipeline_config_path):
        print(f"Pipeline config path not found: {pipeline_config_path}", file=sys.stderr)
        return 1
    try:
        with open(pipeline_config_path, encoding="utf-8") as config_file:
            pipeline_config = json.load(config_file)
    except (OSError, ValueError) as err:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"Could not read pipeline config {pipeline_config_path}: {err}", file=sys.stderr)
        return 1

    if not isinstance(pipeline_config, dict):
        print("Pipeline config must be a JSON object", file=sys.stderr)
        return 1
    missing = [key for key in REQUIRED_CONFIG_KEYS if key not in pipeline_config]
    if missing:
        print(f"Pipeline config is missing required key(s): {', '.join(missing)}", file=sys.stderr)
        return 1

    general_config_path = pipeline_config["general_config_path"]
    spark_args = pipeline_config["spark_submit_args"]
    steps = pipeline_config["pipeline"]
    # Validate everything up front: a malformed late step must not be discovered
    # only after earlier (possibly long-running) Spark jobs have already executed.
    error = spark_args_error(spark_args) or pipeline_steps_error(steps)
    if error:
        print(error, file=sys.stderr)
        return 1

    # Flush so this line appears before the child processes' output.
    print("Starting pipeline...", flush=True)
    for step in steps:
        component_id = step["component_id"]
        component_config_path = step["component_config_path"]
        command = build_spark_submit_command(
            spark_args, component_id, general_config_path, component_config_path
        )
        try:
            result = subprocess.run(command, check=False)
        except OSError as err:
            # e.g. FileNotFoundError when spark-submit is not on PATH.
            print(
                f"Could not launch spark-submit for component {component_id}: {err}",
                file=sys.stderr,
            )
            return 1
        if result.returncode != 0:
            print(
                "[X] ------ Component Error ------",
                f"Error executing component: {component_id}",
                f"Return code: {result.returncode}",
                f"General config: {general_config_path}",
                f"Component config: {component_config_path}",
                "[X] -----------------------------",
                file=sys.stderr,
                sep=os.linesep,
            )
            return 1
    print("Pipeline finished successfully!")
    return 0


if __name__ == "__main__":
    sys.exit(main())