/
shell.py
308 lines (263 loc) · 10.7 KB
/
shell.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
__author__ = "Johannes Köster"
__copyright__ = "Copyright 2021, Johannes Köster"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"
import _io
import sys
import os
import subprocess as sp
import inspect
import shutil
import stat
import tempfile
import threading
from snakemake.utils import format, argvquote, cmd_exe_quote, find_bash_on_windows
from snakemake.common import ON_WINDOWS, RULEFUNC_CONTEXT_MARKER
from snakemake.logging import logger
from snakemake.deployment import singularity
from snakemake.deployment.conda import Conda
from snakemake.exceptions import WorkflowError
__author__ = "Johannes Köster"
# Stream handed to Popen when the command's output is not captured.
# Workaround for nosetest: it overwrites sys.stdout in a strange way that does
# not work with Popen, so anything that is not a real TextIOWrapper is replaced
# by None (which makes Popen fall back to its default stdout handling).
STDOUT = sys.stdout if isinstance(sys.stdout, _io.TextIOWrapper) else None
# There is a max length for a command executed as well as a maximum
# length for each argument passed to a command. The latter impacts us
# especially when doing `sh -c 'long script from user'`. On Linux, it's
# hardcoded in the kernel as 32 pages, or 128kB. On OSX it appears to be
# close to `getconf ARG_MAX`, about 253kb.
#
# The threshold used here evaluates to 65535 (16 * 4096 - 1), i.e. just under
# half of the Linux figure quoted above. `shell` compares the length of the
# single-quote-escaped command against it and, when the command is longer,
# writes the command to a temporary script file instead of passing it inline.
MAX_ARG_LEN = 16 * 4096 - 1
class shell:
    """Run a shell command, used like a function: ``shell("cmd {x}", x=...)``.

    ``__new__`` does not return an instance. It formats the command, starts it
    with ``subprocess.Popen`` and returns ``None``, the captured stdout, or a
    generator over stdout lines (see ``__new__``). All configuration
    (executable, prefix, suffix, Windows command prefix) lives in class
    attributes and is therefore shared by every invocation.
    """

    # Extra keyword arguments forwarded to subprocess.Popen; this class only
    # ever stores the key "executable" in it.
    _process_args = {}
    # Text placed before / after every command (separated by a space).
    _process_prefix = ""
    _process_suffix = ""
    # Guards _processes, which maps jobid -> running Popen object.
    _lock = threading.Lock()
    _processes = {}
    # Argument inserted between the executable and the command on Windows
    # when an explicit executable is set (e.g. "-c" for bash).
    _win_command_prefix = ""
    # When true, R_LIBS/PYTHONPATH/PERLLIB/PERL5LIB are removed from the
    # environment of commands that run inside a conda environment.
    conda_block_conflicting_envvars = True

    @classmethod
    def get_executable(cls):
        """Return the configured shell executable, or None if none is set."""
        return cls._process_args.get("executable", None)

    @classmethod
    def check_output(cls, cmd, **kwargs):
        """Run ``cmd`` via ``subprocess.check_output`` and return its output.

        On Windows with an explicit executable the command line is assembled
        by hand (executable, Windows command prefix, quoted command) and run
        without shell mode; otherwise the command is run with ``shell=True``.
        Extra ``kwargs`` are passed through to ``subprocess.check_output``.
        """
        executable = cls.get_executable()
        if ON_WINDOWS and executable:
            cmd = '"{}" {} {}'.format(
                executable, cls._win_command_prefix, argvquote(cmd)
            )
            return sp.check_output(cmd, shell=False, executable=executable, **kwargs)
        else:
            return sp.check_output(cmd, shell=True, executable=executable, **kwargs)

    @classmethod
    def executable(cls, cmd):
        """Set the shell executable used for all subsequent commands.

        Args:
            cmd: Name or path of the shell, or None for the platform default.
                A relative name is resolved to an absolute path via
                ``shutil.which``.

        Raises:
            WorkflowError: if ``cmd`` cannot be found in PATH, or if it is the
                WSL ``bash.exe`` on Windows.
        """
        if cmd and not os.path.isabs(cmd):
            # always enforce absolute path; keep the requested name so that
            # the error message can report it (shutil.which returns None on
            # failure).
            requested = cmd
            cmd = shutil.which(requested)
            if not cmd:
                raise WorkflowError(
                    "Cannot set default shell {} because it "
                    "is not available in your "
                    "PATH.".format(requested)
                )
        if ON_WINDOWS:
            if cmd is None:
                cls._process_prefix = ""
                cls._win_command_prefix = ""
            elif os.path.split(cmd)[-1].lower() in ("bash", "bash.exe"):
                if cmd == r"C:\Windows\System32\bash.exe":
                    raise WorkflowError(
                        "Cannot use WSL bash.exe on Windows. Ensure that you have "
                        "a usable bash.exe availble on your path."
                    )
                cls._process_prefix = "set -euo pipefail; "
                cls._win_command_prefix = "-c"
        elif cmd is not None and os.path.split(cmd)[-1].lower() == "bash":
            # The None guard keeps executable(None) from failing inside
            # os.path.split on non-Windows systems.
            cls._process_prefix = "set -euo pipefail; "
        cls._process_args["executable"] = cmd

    @classmethod
    def prefix(cls, prefix):
        """Set the text prepended to every command (formatted via ``format``)."""
        # NOTE: format() is called with stepout=2 directly from this method;
        # presumably it resolves names in the caller's frame, so do not wrap
        # this call in another helper.
        cls._process_prefix = format(prefix, stepout=2)

    @classmethod
    def suffix(cls, suffix):
        """Set the text appended to every command (formatted via ``format``)."""
        cls._process_suffix = format(suffix, stepout=2)

    @classmethod
    def win_command_prefix(cls, cmd):
        """The command prefix used on Windows when specifying an explicit
        shell executable. This would be "-c" for bash.
        Note that if no explicit executable is set, commands are executed
        with Popen(..., shell=True), which uses COMSPEC on Windows, where this
        is not needed.
        """
        cls._win_command_prefix = cmd

    @classmethod
    def kill(cls, jobid):
        """Kill the process registered under ``jobid``, if any, and forget it."""
        with cls._lock:
            if jobid in cls._processes:
                cls._processes[jobid].kill()
                del cls._processes[jobid]

    @classmethod
    def cleanup(cls):
        """Forget all registered processes (they are not killed)."""
        with cls._lock:
            cls._processes.clear()

    def __new__(
        cls, cmd, *args, iterable=False, read=False, bench_record=None, **kwargs
    ):
        """Format and execute ``cmd``.

        Args:
            cmd: Command template, expanded with ``format(cmd, *args, **kwargs)``.
            *args: Positional values for the template.
            iterable: If true, return a generator yielding stdout lines.
            read: If true, return the complete stdout as a string.
            bench_record: If not None, the wait for the process is wrapped in
                ``snakemake.benchmark.benchmarked`` with this record.
            **kwargs: Template values; they are also merged into the execution
                context (jobid, conda_env, container_img, env_modules,
                shadow_dir, resources, singularity_args, threads, ...).

        Returns:
            A generator if ``iterable``, the captured stdout if ``read``,
            otherwise None.

        Raises:
            KeyError: if ``stepout`` is passed as a keyword argument.
            subprocess.CalledProcessError: if the command exits non-zero.
        """
        if "stepout" in kwargs:
            raise KeyError("Argument stepout is not allowed in shell command.")

        if ON_WINDOWS and not cls.get_executable():
            # If bash is not used on Windows quoting must be handled in a special way
            kwargs["quote_func"] = cmd_exe_quote

        # Must be called directly from here: stepout=2 is relative to this frame.
        cmd = format(cmd, *args, stepout=2, **kwargs)

        stdout = sp.PIPE if iterable or read else STDOUT

        close_fds = sys.platform != "win32"

        func_context = inspect.currentframe().f_back.f_locals

        if func_context.get(RULEFUNC_CONTEXT_MARKER):
            # If this comes from a rule, we expect certain information to be passed
            # implicitly via the rule func context, which is added here.
            # Work on a copy so the caller's locals mapping is never modified.
            context = dict(func_context)
        else:
            # Otherwise, context is just filled via kwargs.
            context = dict()
        # add kwargs to context (taking precedence over the locals of the caller)
        context.update(kwargs)

        jobid = context.get("jobid")
        if not context.get("is_shell"):
            logger.shellcmd(cmd)

        conda_env = context.get("conda_env", None)
        conda_base_path = context.get("conda_base_path", None)
        container_img = context.get("container_img", None)
        env_modules = context.get("env_modules", None)
        shadow_dir = context.get("shadow_dir", None)
        resources = context.get("resources", {})
        singularity_args = context.get("singularity_args", "")
        threads = context.get("threads", 1)

        cmd = " ".join((cls._process_prefix, cmd, cls._process_suffix)).strip()

        if env_modules:
            cmd = env_modules.shellcmd(cmd)
            logger.info("Activating environment modules: {}".format(env_modules))

        if conda_env:
            if ON_WINDOWS and not cls.get_executable():
                # If we use cmd.exe directly on Windows we need to prepend batch activation script.
                cmd = Conda(container_img, prefix_path=conda_base_path).shellcmd_win(
                    conda_env, cmd
                )
            else:
                cmd = Conda(container_img, prefix_path=conda_base_path).shellcmd(
                    conda_env, cmd
                )

        tmpdir = None
        # Length of the command once single quotes are escaped and it is
        # wrapped in a pair of quotes; too long commands go through a script.
        if len(cmd.replace("'", r"'\''")) + 2 > MAX_ARG_LEN:
            # mkdtemp does not create its parent directory.
            os.makedirs(".snakemake", exist_ok=True)
            tmpdir = tempfile.mkdtemp(dir=".snakemake", prefix="shell_tmp.")
            script = os.path.join(os.path.abspath(tmpdir), "script.sh")
            with open(script, "w") as script_fd:
                print(cmd, file=script_fd)
            os.chmod(script, os.stat(script).st_mode | stat.S_IXUSR | stat.S_IRUSR)
            cmd = '"{}" "{}"'.format(cls.get_executable() or "/bin/sh", script)

        if container_img:
            cmd = singularity.shellcmd(
                container_img,
                cmd,
                singularity_args,
                envvars=None,
                # get_executable() tolerates a class on which no executable
                # has been configured yet.
                shell_executable=cls.get_executable(),
                container_workdir=shadow_dir,
                is_python_script=context.get("is_python_script", False),
            )
            logger.info("Activating singularity image {}".format(container_img))
        if conda_env:
            logger.info("Activating conda environment: {}".format(conda_env))

        tmpdir_resource = resources.get("tmpdir", None)

        # environment variable lists for linear algebra libraries taken from:
        # https://stackoverflow.com/a/53224849/2352071
        # https://github.com/xianyi/OpenBLAS/tree/59243d49ab8e958bb3872f16a7c0ef8c04067c0a#setting-the-number-of-threads-using-environment-variables
        envvars = dict(os.environ)
        threads = str(threads)
        envvars["OMP_NUM_THREADS"] = threads
        envvars["GOTO_NUM_THREADS"] = threads
        envvars["OPENBLAS_NUM_THREADS"] = threads
        envvars["MKL_NUM_THREADS"] = threads
        envvars["VECLIB_MAXIMUM_THREADS"] = threads
        envvars["NUMEXPR_NUM_THREADS"] = threads

        if tmpdir_resource:
            envvars["TMPDIR"] = tmpdir_resource
            envvars["TMP"] = tmpdir_resource
            envvars["TEMPDIR"] = tmpdir_resource
            envvars["TEMP"] = tmpdir_resource

        if conda_env and cls.conda_block_conflicting_envvars:
            # remove envvars that conflict with conda
            for var in ["R_LIBS", "PYTHONPATH", "PERLLIB", "PERL5LIB"]:
                envvars.pop(var, None)

        use_shell = True
        if ON_WINDOWS and cls.get_executable():
            # If executable is set on Windows shell mode can not be used
            # and the executable should be prepended the command together
            # with a command prefix (e.g. -c for bash).
            use_shell = False
            cmd = '"{}" {} {}'.format(
                cls.get_executable(), cls._win_command_prefix, argvquote(cmd)
            )

        proc = sp.Popen(
            cmd,
            bufsize=-1,
            shell=use_shell,
            stdout=stdout,
            universal_newlines=iterable or read or None,
            close_fds=close_fds,
            **cls._process_args,
            env=envvars,
        )

        if jobid is not None:
            with cls._lock:
                cls._processes[jobid] = proc

        ret = None
        if iterable:
            # The generator waits for the process and removes tmpdir itself;
            # the _processes entry is not removed on this path.
            return cls.iter_stdout(proc, cmd, tmpdir)
        if read:
            ret = proc.stdout.read()
        if bench_record is not None:
            from snakemake.benchmark import benchmarked

            with benchmarked(proc.pid, bench_record):
                retcode = proc.wait()
        else:
            retcode = proc.wait()

        if tmpdir:
            shutil.rmtree(tmpdir)

        if jobid is not None:
            with cls._lock:
                # kill() or cleanup() may already have dropped the entry
                # while we were waiting, so do not insist on its presence.
                cls._processes.pop(jobid, None)

        if retcode:
            raise sp.CalledProcessError(retcode, cmd)
        return ret

    @staticmethod
    def iter_stdout(proc, cmd, tmpdir):
        """Yield the stdout lines of ``proc`` without their trailing newline.

        After the output is exhausted the process is waited for, ``tmpdir``
        (if given) is removed, and ``subprocess.CalledProcessError`` is raised
        when the exit code is non-zero.
        """
        for line in proc.stdout:
            # Only strip an actual newline: the final line may lack one, and
            # blindly cutting the last character would truncate it.
            yield line[:-1] if line.endswith("\n") else line
        retcode = proc.wait()
        if tmpdir:
            shutil.rmtree(tmpdir)
        if retcode:
            raise sp.CalledProcessError(retcode, cmd)
# Choose the default shell at import time: bash on POSIX systems, with sh as
# the fallback, and the platform default (no explicit executable) on Windows.
if os.name == "posix":
    if shutil.which("bash"):
        shell.executable("bash")
    else:
        logger.warning(
            "Cannot set bash as default shell because it is not "
            "available in your PATH. Falling back to sh."
        )
        if shutil.which("sh"):
            shell.executable("sh")
        else:
            # Neither shell was found; leave the executable unset.
            logger.warning(
                "Cannot fall back to sh since it seems to be not "
                "available on this system. Using whatever is "
                "defined as default."
            )
elif ON_WINDOWS:
    shell.executable(None)