Skip to content

Commit

Permalink
Merge pull request #258 from Codeacious/jrm_parallel_store_trond
Browse files Browse the repository at this point in the history
Implemented #221, called tronstore
  • Loading branch information
dnephin committed Aug 13, 2013
2 parents a0eaeed + a905d0b commit 804c332
Show file tree
Hide file tree
Showing 31 changed files with 2,705 additions and 112 deletions.
13 changes: 13 additions & 0 deletions docs/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,18 @@ State Persistence
The number of save calls to buffer before writing the state. Defaults to 1,
which is no buffering.

**db_store_method**
The method to use for saving state information to a SQL database. Only used if store_type is sql.

Valid options are:
**json** - uses the `simplejson` module.

**msgpack** - uses the `msgpack` module, from the msgpack-python package (tested with version 0.3.0).

**pickle** - uses the `cPickle` module. be careful with this one, as pickle is Turing complete.

**yaml** - uses the `yaml` module, from the PyYaml package (tested with version 3.10).


Example::

Expand All @@ -223,6 +235,7 @@ Example::
name: local_sqlite
connection_details: "sqlite:///dest_state.db"
buffer_size: 1 # No buffer
db_store_method: json


.. _action_runners:
Expand Down
3 changes: 3 additions & 0 deletions docs/man_tronview.rst
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ Options
``-s, --save``
Save server and color options to client config file (~/.tron)

``--namespace``
Only show jobs and services from the specified namespace


States
----------
Expand Down
37 changes: 36 additions & 1 deletion tests/core/job_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,17 @@ def test_restore_state(self):
job_runs = [mock.Mock(), mock.Mock()]
state_data = ({'enabled': False, 'run_ids': [1, 2]}, run_data)

with mock.patch.object(self.job.job_runs, 'restore_state', return_value=job_runs):
with contextlib.nested(
mock.patch.object(self.job.job_runs, 'restore_state', return_value=job_runs),
mock.patch.object(self.job.job_runs, 'get_run_numbers', return_value=state_data[0]['run_ids'])
):
self.job.restore_state(state_data)

assert not self.job.enabled
calls = [mock.call(job_runs[i]) for i in xrange(len(job_runs))]
self.job.watcher.watch.assert_has_calls(calls)
calls = [mock.call(job_runs[i], jobrun.JobRun.NOTIFY_DONE) for i in xrange(len(job_runs))]
self.job_scheduler.watch.assert_has_calls(calls)
assert_equal(self.job.job_state.state_data, state_data[0])
self.job.job_runs.restore_state.assert_called_once_with(
sorted(run_data, key=lambda data: data['run_num'], reverse=True),
Expand All @@ -117,6 +122,7 @@ def test_restore_state(self):
self.job.context,
self.job.node_pool
)
self.job.job_runs.get_run_numbers.assert_called_once_with()
self.job.job_scheduler.restore_state.assert_called_once_with()
self.job.event.ok.assert_called_with('restored')

Expand Down Expand Up @@ -500,6 +506,35 @@ def mock_eventloop(self):
def teardown_job(self):
event.EventManager.reset()

def test_restore_state_scheduled(self):
mock_scheduled = [mock.Mock(), mock.Mock()]
with contextlib.nested(
mock.patch.object(self.job_scheduler.job_runs, 'get_scheduled',
return_value=iter(mock_scheduled)),
mock.patch.object(self.job_scheduler, 'schedule'),
mock.patch.object(self.job_scheduler, '_set_callback')
) as (get_patch, sched_patch, back_patch):
self.job_scheduler.restore_state()
get_patch.assert_called_once_with()
calls = [mock.call(m) for m in mock_scheduled]
back_patch.assert_has_calls(calls)
sched_patch.assert_called_once_with()

def test_restore_state_queued(self):
queued = mock.Mock()
with contextlib.nested(
mock.patch.object(self.job_scheduler.job_runs, 'get_scheduled',
return_value=iter([])),
mock.patch.object(self.job_scheduler.job_runs, 'get_first_queued',
return_value=queued),
mock.patch.object(self.job_scheduler, 'schedule'),
mock.patch.object(job.eventloop, 'call_later')
) as (get_patch, queue_patch, sched_patch, later_patch):
self.job_scheduler.restore_state()
get_patch.assert_called_once_with()
later_patch.assert_called_once_with(0, self.job_scheduler.run_job, queued, run_queued=True)
sched_patch.assert_called_once_with()

def test_schedule(self):
with mock.patch.object(self.job_scheduler.job_state, 'is_enabled',
new=True):
Expand Down
1 change: 1 addition & 0 deletions tests/mcp_reconfigure_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def teardown_mcp(self):
event.EventManager.reset()
filehandler.OutputPath(self.test_dir).delete()
filehandler.FileHandleManager.reset()
self.mcp.state_watcher.shutdown()

def reconfigure(self):
config = {schema.MASTER_NAMESPACE: self._get_config(1, self.test_dir)}
Expand Down
16 changes: 8 additions & 8 deletions tests/mcp_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ class MasterControlProgramTestCase(TestCase):
def setup_mcp(self):
self.working_dir = tempfile.mkdtemp()
self.config_path = tempfile.mkdtemp()
self.mcp = mcp.MasterControlProgram(
with mock.patch('tron.serialize.runstate.statemanager.StateChangeWatcher', autospec=True):
self.mcp = mcp.MasterControlProgram(
self.working_dir, self.config_path)
self.mcp.state_watcher = mock.create_autospec(
statemanager.StateChangeWatcher)

@teardown
def teardown_mcp(self):
Expand Down Expand Up @@ -134,11 +133,12 @@ class MasterControlProgramRestoreStateTestCase(TestCase):
def setup_mcp(self):
self.working_dir = tempfile.mkdtemp()
self.config_path = tempfile.mkdtemp()
self.mcp = mcp.MasterControlProgram(
self.working_dir, self.config_path)
self.mcp.jobs = mock.create_autospec(job.JobCollection)
self.mcp.services = mock.create_autospec(service.ServiceCollection)
self.mcp.state_watcher = mock.create_autospec(statemanager.StateChangeWatcher)
with mock.patch('tron.serialize.runstate.statemanager.StateChangeWatcher', autospec=True):
self.mcp = mcp.MasterControlProgram(
self.working_dir, self.config_path)
self.mcp.jobs = mock.create_autospec(job.JobCollection)
self.mcp.services = mock.create_autospec(service.ServiceCollection)
self.mcp.state_watcher = mock.create_autospec(statemanager.StateChangeWatcher)

@teardown
def teardown_mcp(self):
Expand Down
110 changes: 75 additions & 35 deletions tests/serialize/runstate/statemanager_test.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,15 @@
import os
import mock
import contextlib
from testify import TestCase, assert_equal, setup, run

from tests.assertions import assert_raises
from tests.testingutils import autospec_method
from tron.config import schema
from tron.serialize import runstate
from tron.serialize.runstate.shelvestore import ShelveStateStore
from tron.serialize.runstate.statemanager import PersistentStateManager, StateChangeWatcher
from tron.serialize.runstate.statemanager import StateSaveBuffer
from tron.serialize.runstate.statemanager import StateMetadata
from tron.serialize.runstate.statemanager import PersistenceStoreError
from tron.serialize.runstate.statemanager import VersionMismatchError
from tron.serialize.runstate.statemanager import PersistenceManagerFactory


class PersistenceManagerFactoryTestCase(TestCase):

def test_from_config_shelve(self):
thefilename = 'thefilename'
config = schema.ConfigState(
store_type='shelve', name=thefilename, buffer_size=0,
connection_details=None)
manager = PersistenceManagerFactory.from_config(config)
store = manager._impl
assert_equal(store.filename, config.name)
assert isinstance(store, ShelveStateStore)
os.unlink(thefilename)


class StateMetadataTestCase(TestCase):
Expand Down Expand Up @@ -73,13 +56,19 @@ class PersistentStateManagerTestCase(TestCase):

@setup
def setup_manager(self):
self.store = mock.Mock()
self.store.build_key.side_effect = lambda t, i: '%s%s' % (t, i)
self.buffer = StateSaveBuffer(1)
self.manager = PersistentStateManager(self.store, self.buffer)
with mock.patch('tron.serialize.runstate.statemanager.ParallelStore', autospec=True) \
as self.store_patch:
self.store = self.store_patch.return_value
self.build_patch = mock.Mock(side_effect=lambda t, i: '%s%s' % (t, i))
self.store_patch.return_value.configure_mock(build_key=self.build_patch)
self.buffer = StateSaveBuffer(1)
self.manager = PersistentStateManager()
self.manager._buffer = self.buffer

def test__init__(self):
assert_equal(self.manager._impl, self.store)
self.store_patch.assert_called_once_with()
self.build_patch.assert_called_once_with(runstate.MCP_STATE, StateMetadata.name)
assert_equal(self.manager.metadata_key, self.manager._impl.build_key(runstate.MCP_STATE, StateMetadata.name))

def test_keys_for_items(self):
names = ['namea', 'nameb']
Expand Down Expand Up @@ -137,14 +126,43 @@ def test_disabled_nested(self):
pass
assert not self.manager.enabled

def test_update_config_success(self):
new_config = mock.Mock(buffer_size=5)
self.store.load_config.configure_mock(return_value=True)
with contextlib.nested(
mock.patch.object(self.manager, '_save_from_buffer'),
mock.patch('tron.serialize.runstate.statemanager.StateSaveBuffer', autospec=True)
) as (save_patch, buffer_patch):
assert_equal(self.manager.update_from_config(new_config), True)
save_patch.assert_called_once_with()
self.store.load_config.assert_called_once_with(new_config)
buffer_patch.assert_called_once_with(new_config.buffer_size)

def test_update_config_failure(self):
new_config = mock.Mock(buffer_size=5)
self.store.load_config.configure_mock(return_value=False)
with contextlib.nested(
mock.patch.object(self.manager, '_save_from_buffer'),
mock.patch('tron.serialize.runstate.statemanager.StateSaveBuffer', autospec=True)
) as (save_patch, buffer_patch):
assert_equal(self.manager.update_from_config(new_config), False)
save_patch.assert_called_once_with()
self.store.load_config.assert_called_once_with(new_config)
assert not buffer_patch.called


class StateChangeWatcherTestCase(TestCase):

@setup
def setup_watcher(self):
self.watcher = StateChangeWatcher()
self.state_manager = mock.create_autospec(PersistentStateManager)
self.watcher.state_manager = self.state_manager
with mock.patch('tron.serialize.runstate.statemanager.PersistentStateManager', autospec=True) \
as self.persistence_patch:
self.watcher = StateChangeWatcher()
self.state_manager = mock.create_autospec(PersistentStateManager)
self.watcher.state_manager = self.state_manager

def test__init__(self):
self.persistence_patch.assert_called_once_with()

def test_update_from_config_no_change(self):
self.watcher.config = state_config = mock.Mock()
Expand All @@ -153,17 +171,39 @@ def test_update_from_config_no_change(self):
assert_equal(self.watcher.state_manager, self.state_manager)
assert not self.watcher.shutdown.mock_calls

@mock.patch('tron.serialize.runstate.statemanager.PersistenceManagerFactory',
autospec=True)
def test_update_from_config_changed(self, mock_factory):
state_config = mock.Mock()
autospec_method(self.watcher.shutdown)
def test_update_from_config_success(self):
state_config = mock.Mock(store_type="shelve")
assert self.watcher.update_from_config(state_config)
assert_equal(self.watcher.config, state_config)
self.watcher.shutdown.assert_called_with()
assert_equal(self.watcher.state_manager,
mock_factory.from_config.return_value)
mock_factory.from_config.assert_called_with(state_config)
self.state_manager.update_from_config.assert_called_once_with(state_config)

def test_update_from_config_failure_same_config(self):
state_config = self.watcher.config
assert not self.watcher.update_from_config(state_config)
assert_equal(self.watcher.config, state_config)
assert not self.state_manager.update_from_config.called

def test_update_from_config_failure_from_state_manager(self):
self.state_manager.update_from_config.configure_mock(return_value=False)
state_config = self.watcher.config
fake_config = mock.Mock(store_type="shelve")
assert not self.watcher.update_from_config(fake_config)
assert_equal(self.watcher.config, state_config)
self.state_manager.update_from_config.assert_called_once_with(fake_config)

def test_update_from_config_failure_bad_store_type(self):
state_config = self.watcher.config
fake_config = mock.Mock(store_type="hue_hue_hue")
assert_raises(PersistenceStoreError, self.watcher.update_from_config, fake_config)
assert_equal(self.watcher.config, state_config)
assert not self.state_manager.update_from_config.called

def test_update_from_config_failure_bad_db_type(self):
state_config = self.watcher.config
fake_config = mock.Mock(store_type="sql", store_method="make_it_rain")
assert_raises(PersistenceStoreError, self.watcher.update_from_config, fake_config)
assert_equal(self.watcher.config, state_config)
assert not self.state_manager.update_from_config.called

def test_save_job(self):
mock_job = mock.Mock()
Expand Down
Empty file.

0 comments on commit 804c332

Please sign in to comment.