-
Notifications
You must be signed in to change notification settings - Fork 13
/
workflow.py
706 lines (588 loc) · 32.1 KB
/
workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
"""
Provide workflow-level utilities and classes
============================================
Single-sim example:
.. code-block:: python
>>> md = gmx.workflow.from_tpr(filename)
>>> gmx.run(md)
>>>
>>> # The above is shorthand for
>>> md = gmx.workflow.from_tpr(filename)
>>> with gmx.get_context(md.workspec) as session:
... session.run()
Array sim example:
.. code-block:: python
>>> md = gmx.workflow.from_tpr([filename1, filename2])
>>> gmx.run(md)
The representation of work and the way it is dispatched are areas of active
development. See also https://github.com/kassonlab/gmxapi/milestone/3
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
__all__ = ['WorkSpec', 'WorkElement']
import warnings
import weakref
from gmx import exceptions
from gmx import logging
from gmx.util import to_string
from gmx.util import to_utf8
# Module-level logger
logger = logging.getLogger(__name__)
logger.info('Importing gmx.workflow')
# Work specification version string.
workspec_version = "gmxapi_workspec_0_1"
logger.info("Using schema version {}.".format(workspec_version))
# module-level constant indicating a workflow implementing parallel array work.
# ARRAY = 0
class GmxMap(dict):
"""Utility/compatibility class to ensure consistent keys.
Allow subscripting to use native str or Python 2 Unicode objects.
Internally, converts all keys to native str for the current interpreter.
"""
def keys(self):
for key in dict.keys(self):
if not isinstance(key, str):
raise exceptions.ApiError('Invalid key type found: {} {}'.format(key, type(key)))
yield key
def __getitem__(self, key):
return super(GmxMap, self).__getitem__(str(key))
def __setitem__(self, key, item):
super(GmxMap, self).__setitem__(str(key), item)
def __delitem__(self, key):
super(GmxMap, self).__delitem__(str(key))
class WorkSpec(object):
"""
Container of workflow elements with data dependency
information and requirements for execution.
An element cannot be added to a WorkSpec if it has dependencies that are not
in the WorkSpec.
Work is added to the specification by passing a WorkElement object to
:py:func:`WorkSpec.add_element()`.
Any dependencies in the WorkElement must already be specified in the target WorkSpec.
When iterated over, a WorkSpec object returns WorkElement objects.
WorkElement objects are yielded in a valid order to keep dependencies
satisfied, but not necessarily the same order in which add_element()
calls were originally made. In other words, the WorkSpec is a directed
acyclic dependency graph, and its iterator returns nodes in an arbitrary
but topologically correct order.
The string representation of a WorkSpec object is a valid JSON serialized data object.
The schema for version 0.1 of the specification allows data structures like
the following.
::
{
'version': 'gmxapi_workspec_0_1',
'elements':
{
'myinput':
{
'namespace': 'gromacs',
'operation': 'load_tpr',
'params': {'input': ['tpr_filename1', 'tpr_filename2']}
},
'mydata':
{
'namespace': 'gmxapi',
'operation': 'open_global_data_with_barrier',
'params': ['data_filename']
},
'mypotential':
{
'namespace': 'myplugin',
'operation': 'create_mdmodule',
'params': {...},
'depends': ['mydata']
},
'mysim':
{
'namespace': 'gmxapi',
'operation': 'md',
'depends': ['myinput', 'mypotential']
}
}
}
The first mapping (``version``) is required as shown. The ``elements`` map
contains uniquely named elements specifying an operation, the operation's
namespace, and parameters and dependencies of the operation for this element.
``depends`` is a sequence of string names of elements that are also in the
work spec. ``params`` is a key-value map with string keys and values that
are valid JSON data. ``namespace`` and ``operation`` are strings that the
:py:class:`Context <gmx.context.Context>` can map to directors it uses to
construct the session. Namespace ``gmxapi`` is reserved for operations
specified by the API. Namespace ``gromacs`` is reserved for operations
implemented as GROMACS adapters (versioned separately from gmxapi). The
period character (".") has special meaning and should not be used in naming
elements, namespaces, or operations.
"""
def __init__(self):
self.version = workspec_version
self.elements = GmxMap()
self.__context_weak_ref = None
@property
def _context(self):
referent = None
if self.__context_weak_ref is not None:
referent = self.__context_weak_ref()
return referent
@_context.setter
def _context(self, context):
# We're moving towards having the context own the work, so the work should
# not own the context.
self.__context_weak_ref = weakref.ref(context)
def _chase_deps(self, source_set, name_list):
"""Helper to recursively generate dependencies before dependents.
Given a set of WorkElement objects and a list of element names, generate WorkElements for
the members of name_list plus their dependencies in an order such that dependencies are
guaranteed to occur before their dependent elements.
For example, to sequence an entire work specification into a reasonable order for instantiation, use
>>> workspec._chase_deps(set(workspec.elements.keys()), list(workspec.elements.keys()))
Note: as a member function of WorkSpec, we have access to the full WorkSpec elements data at all
times, giving us extra flexibility in implementation and arguments.
Args:
sources: a (super)set of element names from the current work spec (will be consumed)
name_list: subset of *sources* to be sequenced
Returns:
Sequence of WorkElement objects drawn from the names in *source_set*
Requires that WorkElements named in *name_list* and any elements on which
they depend are all named in *source_list* and available in the current
work spec.
Note: *source_set* is a reference to an object that is modified arbitrarily.
The caller should not re-use the object after calling _chase_deps().
TODO: Separate out DAG topology operations from here and Context.__enter__()
Our needs are simple enough that we probably don't need an external dependency
like networkx...
"""
# Recursively (depth-first) generate a topologically valid serialized DAG from source_set.
assert isinstance(source_set, set)
# Warning: This is not at all a rigorous check.
# It is hard to check whether this is string-like or list-like in both Py 2.7 and 3.x
if not isinstance(name_list, (list, tuple, set)):
raise exceptions.ValueError('Must disambiguate "name_list" by passing a list or tuple.')
# Make a copy of name_list in case the input reference is being used elsewhere during
# iteration, such as for source_set, which is modified during the loop.
for name in tuple(name_list):
if name in source_set:
source_set.remove(name)
element = WorkElement.deserialize(self.elements[name], name=name, workspec=self)
dependencies = element.depends
# items in element.depends are either element names or ensembles of element names.
for item in dependencies:
if isinstance(item, (list, tuple, set)):
dependency_list = item
else:
if not isinstance(item, str):
raise exceptions.ValueError(
'Dependencies should be a string or sequence of strings. Got {}'.format(type(item)))
dependency_list = [item]
for dependency in dependency_list:
for recursive_dep in self._chase_deps(source_set, (dependency,)):
yield recursive_dep
yield element
else:
# Note: The user is responsible for ensuring that source_set is complete.
# Otherwise, we would need to maintain a list of elements previously yielded.
pass
def __iter__(self):
source_set = set(self.elements.keys())
for element in self._chase_deps(source_set, source_set):
yield element
def __hash__(self):
"""Uniquely identify this work specification.
Allows the spec to be used as a dictionary key in Python. Note that this hash is possibly dependent on the Python
implementation. It is not part of the gmxapi specification and should not be used outside of a single invocation
of a script.
"""
# Hash the serialized elements, concatenated as a single string. Note that the order of elements and their
# contents is not guaranteed, but should be consistent within a script invocation.
return hash(to_string(self.serialize()))
def add_element(self, element):
"""Add an element to a work specification if possible.
Adding an element to a WorkSpec must preserve the validity of the workspec, which involves several checks.
We do not yet check for element uniqueness beyond a string name.
If an element is added that was previously in another WorkSpec, it must first be removed from the
other WorkSpec.
"""
if hasattr(element, "namespace") and hasattr(element, "operation") and hasattr(element, "serialize"):
if not hasattr(element, "name") or element.name is None or len(str(element.name)) < 1:
raise exceptions.UsageError("Only named elements may be added to a WorkSpec.")
if element.name in self.elements:
raise exceptions.UsageError("Elements in WorkSpec must be uniquely identifiable.")
if hasattr(element, "depends"):
for dependency in element.depends:
if not dependency in self.elements:
raise exceptions.UsageError("Element dependencies must already be specified before an Element may be added.")
# Okay, it looks like we have an element we can add
if hasattr(element, "workspec") and element.workspec is not None and element.workspec is not self:
raise exceptions.Error("Element must be removed from its current WorkSpec to be added to this WorkSpec, but element removal is not yet implemented.")
self.elements[element.name] = element.serialize()
element.workspec = self
else:
raise exceptions.ValueError("Provided object does not appear to be compatible with gmx.workflow.WorkElement.")
logger.info("Added element {} to workspec.".format(element.name))
# def remove_element(self, name):
# """Remove named element from work specification.
#
# Does not delete references to WorkElement objects, but WorkElement objects will be moved to a None WorkSpec."""
# To implement, WorkElement attributes should be reworked as properties that dynamically act on the
# workspec reference. Additionally, WorkSpec may have to keep weak references to WorkElements in order
# to reset the WorkElement.workspec strong reference.
# def add(self, spec):
# """
# Merge the provided spec into this one.
#
# We can't easily replace references to ``spec`` with references to the WorkSpec we are merging into, but we can
# steal the work elements out of ``spec`` and leave it empty. We could also set an ``alias`` attribute in it or
# something, but that seems unnecessary. Alternatively, we can set the new and old spec to be equal, but we would
# need an additional abstraction layer to keep them from diverging again. Since client code will retain references
# to the elements in the work spec, we need to be clear about when we are duplicating a WorkSpec versus obtaining
# different references to the same.
#
# This is an implementation detail that can be unresolved and hidden for now. The high-level interface only
# requires that client code can bind different workflow elements together in a sensible way and get expected
# results.
#
# :param spec: WorkSpec to be merged into this one.
# :return:
#
# To do: consider instead a gmx.workflow.merge(workspecA, workspecB) free function that returns a new WorkSpec.
# """
def serialize(self):
"""Serialize the work specification in a form suitable to pass to any Context implementation.
Serialization is performed with the JSON data serialization module.
To simplify unique identification of work specifications, this function will also impose rules for reproducibility.
1. All key-value maps are sorted alphanumerically by their string keys.
2. Strings must consist of valid ASCII characters.
3. Output is a byte sequence of the utf-8 encoded densely formatted JSON document.
Returns:
``unicode`` object in Python 2, ``bytes`` object in Python 3
Output of serialize() should be explicitly converted to a string before passing to a JSON deserializer.
>>> my_object = my_workspec.serialize()
>>> my_data_structure = json.loads(my_object.decode('utf-8'))
>>> # or...
>>> my_data_structure = json.loads(my_object, encoding='utf-8')
"""
import json
# Build the normalized dictionary
dict_representation = {'version': self.version,
'elements': {}
}
for name, element in [(e, json.loads(to_string(self.elements[e]))) for e in sorted(self.elements.keys())]:
dict_representation['elements'][str(name)] = element
serialization = json.dumps(dict_representation, ensure_ascii=True, sort_keys=True, separators=(',',':'))
return serialization.encode('utf-8')
@classmethod
def deserialize(serialized):
import json
workspec = WorkSpec()
dict_representation = json.loads(to_string(serialized))
ver_in = dict_representation['version']
ver_out = workspec.version
if ver_in != ver_out:
message = "Expected work spec version {}. Got work spec version {}.".format(ver_out, ver_in)
raise exceptions.CompatibilityError(message)
for element in dict_representation['elements']:
workspec.elements[element] = dict_representation['elements'][element]
return workspec
def uid(self):
"""Get a unique identifier for this work specification.
Returns:
hash value
Generate a cryptographic hash of this work specification that is guaranteed to match that of another equivalent
work specification. The returned string is a 64-character hexadecimal encoded SHA-256 hash digest of the
serialized WorkSpec.
The definition of equivalence is likely to evolve, but currently means a work spec of the
same version with the same named elements containing the same operations, dependencies, and parameters, as
represented in the serialized version of the work specification. Note that this does not include checks on the
actual contents of input files or anything that does not appear in the work specification directly. Also, the
hash is lossy, so it is remotely conceivable that two specs could have the same hash. The work specs
should be compared before making any expensive decisions based on work spec equivalence, such as with hash(workspec).
Element names probably shouldn't be included in the unique identifying information (so that we can optimize out
duplicated artifacts), but they are. A future API specification may add unique identification to the elements...
"""
# Get an alphanumeric string of the checksum of the serialized work spec. SHA-256 should require about 43 characters
# of base64 to represent, which seems reasonable. We need to replace some of the base64 characters to make them
# filesystem friendly, though. Hexadecimal may be more friendly, but would require 64 characters.
import hashlib
data = to_utf8(self.serialize())
result = hashlib.sha256(data)
return result.hexdigest()
def __str__(self):
"""Generate string representation for str() or print().
The string output should look like the abstract schema for gmxapi_workspec_1_0, but the exact
format is unspecified and may change in future versions.
For consistent JSON output, use WorkSpec.serialize().
"""
import json
string = to_string(self.serialize())
data = json.loads(string)
reserialized = json.dumps(data, indent=4, sort_keys=True)
return str(reserialized)
def __repr__(self):
"""Generate Pythonic representation for repr(workspec)."""
return 'gmx.workflow.WorkSpec()'
# A possible alternative name for WorkElement would be Operator, since there is a one-to-one
# mapping between WorkElements and applications of "operation"s. We need to keep in mind the
# sensible distinction between the WorkElement abstraction and the API objects and DAG nodes.
class WorkElement(object):
"""Encapsulate an element of a work specification."""
def __init__(self, namespace="gmxapi", operation=None, params=None, depends=()):
self._namespace = str(to_string(namespace))
# We can add an operations submodule to validate these. E.g. self.operation = gmx.workflow.operations.normalize(operation)
if operation is not None:
self._operation = str(to_string(operation))
else:
raise exceptions.UsageError("Invalid argument type for operation.")
# Note: Nothing currently prevents attribute updates by assignment after adding the element to a workspec,
# but this protocol will be clarified with https://github.com/kassonlab/gmxapi/issues/92
if params is None:
self.params = GmxMap()
elif isinstance(params, dict):
self.params = GmxMap({to_string(name): params[name] for name in params})
else:
raise exceptions.UsageError("If provided, params must be a dictionary of keyword arguments")
self.depends = []
for d in depends:
if isinstance(d, (list, tuple)):
self.depends.append([str(name) for name in d])
else:
self.depends.append(str(d))
# The Python class for work elements keeps a strong reference to a WorkSpec object containing its description
self._name = None
self._workspec = None
@property
def namespace(self):
assert isinstance(self._namespace, str)
return self._namespace
@property
def operation(self):
assert isinstance(self._operation, str)
return self._operation
@property
def name(self):
assert isinstance(self._name, (str, type(None)))
return self._name
@name.setter
def name(self, new_name):
self._name = str(to_string(new_name))
@property
def workspec(self):
return self._workspec
@workspec.setter
def workspec(self, input):
self._workspec = input
def add_dependency(self, element):
"""Add another element as a dependency.
First move the provided element to the same WorkSpec, if not already here.
Then, add to ``depends`` and update the WorkSpec.
"""
def check_element(element):
if element.workspec is None:
self.workspec.add_element(element)
assert element.workspec is self.workspec
assert element.name in self.workspec.elements
elif element.workspec is not self.workspec:
raise exceptions.ApiError("Element will need to be moved to the same workspec.")
return True
if hasattr(element, 'workspec') and hasattr(element, 'name'):
check_element(element)
self.depends.append(element.name)
else:
assert isinstance(element, (list, tuple))
self.depends.append(tuple([item.name for item in element if check_element(item)]))
self.workspec.elements[self.name] = self.serialize()
def serialize(self):
"""Create a byte sequence representation of the work element.
The WorkElement class exists just to provide convenient handles in Python. The WorkSpec is not actually a
container of WorkElement objects.
Returns:
Byte sequence of utf-8 encoded JSON document. May need to be decoded if needed as a (Unicode) string.
"""
import json
output_dict = {'namespace': self.namespace,
'operation': self.operation,
'params': self.params,
'depends': self.depends
}
serialization = json.dumps(output_dict)
return to_utf8(serialization)
@classmethod
def deserialize(cls, input, name=None, workspec=None):
"""Create a new WorkElement object from a serialized representation.
Arguments:
input: a serialized WorkElement
name: new element name (optional) (deprecated)
workspec: an existing workspec to attach this element to (optional)
When subclasses become distinct, this factory function will need to do additional dispatching to create an object of the correct type.
Alternatively, instead of subclassing, a slightly heavier single class may suffice, or more flexible duck typing might be better.
"""
import json
input_string = to_string(input)
args = json.loads(input_string)
element = cls(namespace=args['namespace'], operation=args['operation'], params=args['params'], depends=args['depends'])
if name is not None:
element.name = name
# This conditional is nested because we can only add named elements to a WorkSpec.
if workspec is not None:
element.workspec = workspec
if element.name not in workspec.elements:
workspec.add_element(element)
return element
class SharedDataElement(WorkElement):
"""Work element with MD-specific extensions.
The schema may not need to be changed, but the API object may be expected to provide additional functionality.
"""
def __init__(self, params, name=None):
"""Create a blank SharedDataElement representation.
It may be appropriate to insist on creating objects of this type via helpers or factories, particularly if
creation requires additional parameters.
"""
self.args = params['args']
self.kwargs = params['kwargs']
super(SharedDataElement, self).__init__(namespace="gmxapi",
operation="global_data",
params={'args': self.args, 'kwargs': self.kwargs})
self.name = name
def get_source_elements(workspec):
"""Get an iterator of the starting nodes in the work spec.
Source elements have no dependencies and can be processed immediately. Elements with dependencies
cannot be processed, instantiated, or added to a work spec until after their dependencies have been.
Args:
workspec : an existing work specification to analyze, such as by a Context implementation preparing to schedule work.
Returns:
iterator of gmx.workflow.WorkElement objects that may be processed without dependencies.
This function is provided in the API to allow flexibility in how source elements are determined.
"""
for name in workspec.elements:
element_data = workspec.elements[name]
element = WorkElement.deserialize(element_data)
if len(element.depends) == 0:
element.name = name
element.workspec = workspec
yield(element)
def from_tpr(input=None, **kwargs):
"""Create a WorkSpec from a (list of) tpr file(s).
Generates a work specification based on the provided simulation input and returns a handle to the
MD simulation element of the workflow. Key word arguments can override simulation behavior from
``input``.
If the MD operation discovers artifacts from a previous simulation that was launched from the same input,
the simulation resumes from the last checkpointed step. If ``append_output`` is set ``False``, existing
artifacts are kept separate from new output with the standard file naming convention,
and new output begins from the last checkpointed step, if any.
Setting ``end_time`` redefines the end point of the simulation trajectory from what was provided in
``input``. It is equivalent to changing the number of steps requested in the MDP (or TPR) input, but
it time is provided as picoseconds instead of a number of time steps.
.. deprecated:: 0.0.7
If ``steps=N`` is provided and N is an integer
greater than or equal to 1, the MD operation advances the trajectory by ``N`` steps, regardless of the number
of simulation steps specified in ``input`` or ``end_time``. For convenience, setting ``steps=None`` does not override
``input``.
Note that when it is not ``None``, ``steps`` takes precedence over ``end_time`` and ``input``, but can still be
superceded by a signal, such as if an MD plugin or other code has a simulation completion condition that occurs
before ``N`` additional steps have run.
Where key word arguments correspond to ``gmx mdrun`` command line options, the corresponding flags are noted below.
Keyword Arguments:
input (str): *Required* string or list of strings giving the filename(s) of simulation input
append_output (bool): Append output for continuous trajectories if True, truncate existing output data if False. (default True)
end_time (float): Specify the final time in the simulation trajectory, overriding input read from TPR.
grid (tuple): Domain decomposition grid divisions (nx, ny, nz). (-dd)
max_hours (float): Terminate after 0.99 times this many hours if simulation is still running. (-maxh)
pme_ranks (int): number of separate ranks to be used for PME electrostatics. (-npme)
pme_threads_per_rank (int): Number of OpenMP threads per PME rank. (-ntomp_pme)
steps (int): Override input files and run for this many steps. (-nsteps; deprecated)
threads (int): Total number of threads to start. (-nt)
threads_per_rank (int): number of OpenMP threads to start per MPI rank. (-ntomp)
tmpi (int): number of thread-MPI ranks to start. (-ntmpi)
Returns:
simulation member of a gmx.workflow.WorkSpec object
Produces a WorkSpec with the following data::
version: gmxapi_workspec_0_1
elements:
tpr_input:
namespace: gromacs
operation: load_tpr
params: {'input': ['tpr_filename1', 'tpr_filename2', ...]}
md_sim:
namespace: gmxapi
operation: md
depends: ['tpr_input']
params: {'kw1': arg1, 'kw2': arg2, ...}
Bugs: version 0.0.6
* There is not a way to programatically check the current step number on disk.
See https://github.com/kassonlab/gmxapi/issues/56 and https://github.com/kassonlab/gmxapi/issues/85
"""
import os
usage = "argument to from_tpr() should be a valid filename or list of filenames, followed by optional key word arguments."
# Normalize to tuple input type.
if isinstance(input, list) or isinstance(input, tuple):
tpr_list = tuple([to_string(element) for element in input])
else:
try:
tpr_list = (to_string(input),)
except:
raise exceptions.UsageError(usage)
# Check for valid filenames
for arg in tpr_list:
if not (os.path.exists(arg) and os.path.isfile(arg)):
arg_path = os.path.abspath(arg)
raise exceptions.UsageError(usage + " Got {}".format(arg_path))
# Note: These are runner parameters, not MD parameters, and should be in the call to gmx.run() instead of here.
# Reference https://github.com/kassonlab/gmxapi/issues/95
params = {}
for arg_key in kwargs:
if arg_key == 'grid' or arg_key == 'dd':
params['grid'] = tuple(kwargs[arg_key])
elif arg_key == 'pme_ranks' or arg_key == 'npme':
params['pme_ranks'] = int(kwargs[arg_key])
elif arg_key == 'threads' or arg_key == 'nt':
params['threads'] = int(kwargs[arg_key])
elif arg_key == 'tmpi' or arg_key == 'ntmpi':
params['tmpi'] = int(kwargs[arg_key])
elif arg_key == 'threads_per_rank' or arg_key == 'ntomp':
params['threads_per_rank'] = int(kwargs[arg_key])
elif arg_key == 'pme_threads_per_rank' or arg_key == 'ntomp_pme':
params['pme_threads_per_rank'] = int(kwargs[arg_key])
elif arg_key == 'steps' or arg_key == 'nsteps':
if kwargs[arg_key] is None:
# None means "don't override the input" which is indicated by a parameter value of -2 in GROMACS 2019
steps = -2
else:
# Otherwise we require steps to be a positive integer
try:
steps = int(kwargs[arg_key])
if steps < 1:
raise exceptions.ValueError('steps to run must be at least 1')
except (TypeError, ValueError) as e:
# steps is not an integer.
raise exceptions.TypeError('"steps" could not be interpreted as an integer.')
# The "nsteps" command line flag will be removed in GROMACS 2020
# and so "steps" is deprecated in gmxapi 0.0.7
warnings.warn("`steps` keyword argument is deprecated. Consider `end_time` instead.",
DeprecationWarning)
params['steps'] = steps
elif arg_key == 'max_hours' or arg_key == 'maxh':
params['max_hours'] = float(kwargs[arg_key])
elif arg_key == 'append_output':
# Try not to encourage confusion with the `mdrun` `-noappend` flag, which would be a confusing double negative if represented as a bool.
params['append_output'] = bool(kwargs[arg_key])
elif arg_key == 'end_time':
params[arg_key] = float(kwargs[arg_key])
else:
raise exceptions.UsageError("Invalid key word argument: {}. {}".format(arg_key, usage))
# Create an empty WorkSpec
workspec = WorkSpec()
# Create and add the Element for the tpr file(s)
inputelement = WorkElement(namespace='gromacs', operation='load_tpr', params={'input': tpr_list})
inputelement.name = 'tpr_input'
if inputelement.name not in workspec.elements:
# Operations such as this need to be replaced with accessors or properties that can check the validity of the WorkSpec
workspec.elements[inputelement.name] = inputelement.serialize()
inputelement.workspec = workspec
# Create and add the simulation element
# We can add smarter handling of the `depends` argument, but it is only critical to check when adding the element
# to a WorkSpec.
mdelement = WorkElement(operation='md', depends=[inputelement.name], params=params)
mdelement.name = 'md_sim'
# Check that the element has not already been added, but that its dependency has.
workspec.add_element(mdelement)
return mdelement