Skip to content

Commit

Permalink
Make sure Context does not contain an active session when it shouldn't.
Browse files Browse the repository at this point in the history
Referemce kassonlab#214

* Add `gmx.core.has_feature` binding to `gmxapi::Version::hasFeature`
* Check whether workaround is necessary for bug kassonlab#214
* Reassign a new `gmx.core.Context` to `gmx.context.Context._api_object`
  if bugfix not reported present in libgmxapi
* Avoid circular reference by only allowing WorkSpec to hold weak ref
  to context.
* Update and expand testing
  • Loading branch information
eirrgang committed Dec 24, 2018
1 parent 4e4719d commit 6ece65a
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 16 deletions.
25 changes: 19 additions & 6 deletions src/gmx/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from gmx import exceptions
from gmx import logging
from gmx import status
import gmx.core as gmxapi


# Module-level logger
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -71,7 +73,6 @@ def _md(context, element):
Returns:
A Director that the Context can use in launching the Session.
"""
import gmx.core
class Builder(object):
"""Translate md work element to a node in the session's DAG."""
def __init__(self, element):
Expand Down Expand Up @@ -123,17 +124,17 @@ def launch(rank=None):
# altered input.
_, temp_filename = tempfile.mkstemp(suffix='.tpr')
logger.debug('Updating input. Using temp file {}'.format(temp_filename))
gmx.core.copy_tprfile(source=infile[rank],
gmxapi.copy_tprfile(source=infile[rank],
destination=temp_filename,
end_time=self.runtime_params['end_time'])
tpr_file = temp_filename
else:
tpr_file = infile[rank]

logger.info('Loading TPR file: {}'.format(tpr_file))
system = gmx.core.from_tpr(tpr_file)
system = gmxapi.from_tpr(tpr_file)
dag.nodes[name]['system'] = system
mdargs = gmx.core.MDArgs()
mdargs = gmxapi.MDArgs()
mdargs.set(self.runtime_params)
# Workaround to give access to plugin potentials used in a context.
pycontext = element.workspec._context
Expand Down Expand Up @@ -663,7 +664,6 @@ def __init__(self, work=None, workdir_list=None, communicator=None):

# self.__context_array = list([Context(work_element) for work_element in work])
from gmx.workflow import WorkSpec
import gmx.core

# Until better Session abstraction exists at the Python level, a
# _session_communicator attribute will be added to and removed from the
Expand Down Expand Up @@ -722,7 +722,7 @@ def __init__(self, work=None, workdir_list=None, communicator=None):
# This setter must be called after the operations map has been populated.
self.work = work

self._api_object = gmx.core.Context()
self._api_object = gmxapi.Context()

@property
def work(self):
Expand Down Expand Up @@ -995,7 +995,9 @@ def __enter__(self):
if not runner is None:
runners.append(runner)
closers.append(graph.nodes[name]['close'])

# Get a session object to return. It must simply provide a `run()` function.
context = self # Part of workaround for bug gmxapi-214
class Session(object):
def __init__(self, runners, closers):
self.runners = list(runners)
Expand All @@ -1011,22 +1013,33 @@ def run(self):
to_be_deleted.insert(0, i)
for i in to_be_deleted:
del self.runners[i]
return True

def close(self):
for close in self.closers:
logger.debug("Closing node: {}".format(close))
close()
# Workaround for bug gmxapi-214
if not gmxapi.has_feature('0.0.7-bugfix-https://github.com/kassonlab/gmxapi/issues/214'):
context._api_object = gmxapi.Context()


self._session = Session(runners, closers)
else:
logger.info("Context rank {} has no work to do".format(self.rank))

context = self # Part of workaround for bug gmxapi-214
class NullSession(object):
def run(self):
logger.info("Running null session on rank {}.".format(self.rank))
return status.Status()
def close(self):
logger.info("Closing null session.")
# Workaround for bug gmxapi-214
if not gmxapi.has_feature('0.0.7-bugfix-https://github.com/kassonlab/gmxapi/issues/214'):
context._api_object = gmxapi.Context()
return

self._session = NullSession()
self._session.rank = self.rank

Expand Down
4 changes: 4 additions & 0 deletions src/gmx/core/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "tprfile.h"
#include "gmxapi/status.h"
#include "gmxapi/version.h"

#include "pybind11/pybind11.h"

Expand Down Expand Up @@ -69,6 +70,9 @@ PYBIND11_MODULE(core, m) {
m.doc() = docstring;

// Export core bindings

m.def("has_feature", &gmxapi::Version::hasFeature, "Check the gmxapi library for a named feature.");

py::class_< ::gmxapi::Status > gmx_status(m, "Status", "Holds status for API operations.");


Expand Down
92 changes: 83 additions & 9 deletions src/gmx/test/test_pymd.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,28 +111,99 @@ def test_simpleSimulation(caplog):
md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1)
gmx.run(md)

@pytest.mark.usefixtures("cleandir")
@pytest.mark.filterwarnings("ignore:Using or importing the ABCs from 'collections'")
@pytest.mark.usefixtures("caplog")
def test_idempotence1(caplog):
"""Confirm that a work graph can be run repeatedly, even after completed.
Use gmx.run and avoid extra references held by user code.
"""
md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1)
gmx.run(md)
gmx.run(md)
gmx.run(md)
md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1)
gmx.run(md)
md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1)
gmx.run(md)

@pytest.mark.usefixtures("cleandir")
@pytest.mark.filterwarnings("ignore:Using or importing the ABCs from 'collections'")
@pytest.mark.usefixtures("caplog")
def test_idempotence2(caplog):
"""Confirm that a work graph can be run repeatedly, even after completed.
Interact with Context more directly.
Check that more unpredictable references held by user are still safe.
"""
md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1)
with gmx.get_context(md) as session:
session.run()

context = gmx.get_context(md)
with context as session:
session.run()

context = gmx.context.Context()
context.work = md
with context as session:
session.run()

@pytest.mark.usefixtures("cleandir")
def test_modifiedInput(caplog):
"""Load a work specification with a single TPR file and updated params."""
md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1, end_time='0.02')
with gmx.context.ParallelArrayContext(md) as session:
context = gmx.get_context(md)
with context as session:
session.run()
md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1, end_time='0.03')
context.work = md
with context as session:
session.run()
md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1, end_time='0.04')
gmx.run(md)

@pytest.mark.usefixtures("cleandir")
@pytest.mark.usefixtures("caplog")
@withmpi_only
def test_array_context(caplog):
def test_plugin_no_ensemble(caplog):
# Test attachment of external code
md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1)
context = gmx.context.ParallelArrayContext(md)
with context as session:
session.run()

# Create a WorkElement for the potential
#potential = gmx.core.TestModule()
potential_element = gmx.workflow.WorkElement(namespace="testing", operation="create_test")
potential_element.name = "test_module"
before = md.workspec.elements[md.name]
md.add_dependency(potential_element)
assert potential_element.name in md.workspec.elements
assert potential_element.workspec is md.workspec
after = md.workspec.elements[md.name]
assert not before is after

# Workaround for https://github.com/kassonlab/gmxapi/issues/42
# We can't add an operation to a context that doesn't exist yet, but we can't
# add a work graph with an operation that is not defined in a context.
context = gmx.get_context()
context.add_operation(potential_element.namespace, potential_element.operation, my_plugin)
context.work = md

with warnings.catch_warnings():
# Swallow warning about wide MPI context
warnings.simplefilter("ignore")
with context as session:
if context.rank == 0:
print(context.work)
session.run()


@pytest.mark.usefixtures("cleandir")
@pytest.mark.usefixtures("caplog")
@withmpi_only
def test_plugin(caplog):
# Test attachment of external code
md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1)
def test_plugin_with_ensemble(caplog):
# Test in ensemble.
md = gmx.workflow.from_tpr([tpr_filename, tpr_filename], threads_per_rank=1)

# Create a WorkElement for the potential
#potential = gmx.core.TestModule()
Expand All @@ -145,7 +216,10 @@ def test_plugin(caplog):
after = md.workspec.elements[md.name]
assert not before is after

context = gmx.context.ParallelArrayContext()
# Workaround for https://github.com/kassonlab/gmxapi/issues/42
# We can't add an operation to a context that doesn't exist yet, but we can't
# add a work graph with an operation that is not defined in a context.
context = gmx.get_context()
context.add_operation(potential_element.namespace, potential_element.operation, my_plugin)
context.work = md

Expand Down
16 changes: 15 additions & 1 deletion src/gmx/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from __future__ import unicode_literals

import warnings
import weakref

from gmx import exceptions
from gmx import logging
Expand Down Expand Up @@ -124,7 +125,20 @@ class WorkSpec(object):
def __init__(self):
self.version = workspec_version
self.elements = dict()
self._context = None
self.__context_weak_ref = None

@property
def _context(self):
referent = None
if self.__context_weak_ref is not None:
referent = self.__context_weak_ref()
return referent

@_context.setter
def _context(self, context):
# We're moving towards having the context own the work, so the work should
# not own the context.
self.__context_weak_ref = weakref.ref(context)

def _chase_deps(self, source_set, name_list):
"""Helper to recursively generate dependencies before dependents.
Expand Down

0 comments on commit 6ece65a

Please sign in to comment.