Skip to content

Commit

Permalink
Merge pull request #794 from spotify/ulzha/task-limit
Browse files Browse the repository at this point in the history
add configurable per-invocation task-limit
  • Loading branch information
Tarrasch committed Feb 26, 2015
2 parents 6797ffa + f51e4f9 commit 0abe27e
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 3 deletions.
8 changes: 8 additions & 0 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,14 @@ worker-ping-interval
Number of seconds to wait between pinging scheduler to let it know
that the worker is still alive. Defaults to 1.0.

worker-task-limit
.. versionadded:: 1.0.25

Maximum number of tasks to schedule per invocation. Upon exceeding it,
the worker will issue a warning and proceed with the workflow obtained
thus far. Prevents incidents due to spamming of the scheduler, usually
accidental. Default: no limit.

worker-timeout
.. versionadded:: 1.0.20

Expand Down
12 changes: 10 additions & 2 deletions luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ class Worker(object):
def __init__(self, scheduler=None, worker_id=None,
worker_processes=1, ping_interval=None, keep_alive=None,
wait_interval=None, max_reschedules=None, count_uniques=None,
worker_timeout=None):
worker_timeout=None, task_limit=None):

if scheduler is None:
scheduler = CentralPlannerScheduler()
Expand Down Expand Up @@ -313,9 +313,13 @@ def __init__(self, scheduler=None, worker_id=None,
self.__max_reschedules = max_reschedules

if worker_timeout is None:
worker_timeout = configuration.get_config().getint('core', 'worker-timeout', 0)
worker_timeout = config.getint('core', 'worker-timeout', 0)
self.__worker_timeout = worker_timeout

if task_limit is None:
task_limit = config.getint('core', 'worker-task-limit', None)
self.__task_limit = task_limit

self._id = worker_id
self._scheduler = scheduler

Expand Down Expand Up @@ -476,6 +480,10 @@ def add(self, task, multiprocess=False):
return self.add_succeeded

def _add(self, task, is_complete):
if self.__task_limit is not None and len(self._scheduled_tasks) >= self.__task_limit:
logger.warning('Will not schedule %s or any dependencies due to exceeded task-limit of %d', task, self.__task_limit)
return

formatted_traceback = None
try:
self._check_complete_value(is_complete)
Expand Down
2 changes: 1 addition & 1 deletion test/cmdline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def test_cmdline_logger(self, setup_mock, warn):

with mock.patch("luigi.configuration.get_config") as getconf:
getconf.return_value.get.side_effect = ConfigParser.NoOptionError(section='foo', option='bar')
getconf.return_value.get_boolean.return_value = True
getconf.return_value.getint.return_value = 0

luigi.interface.setup_interface_logging.call_args_list = []
luigi.run(['SomeTask', '--n', '42', '--local-scheduler', '--no-lock'])
Expand Down
49 changes: 49 additions & 0 deletions test/worker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import mock
from helpers import with_config
from luigi import ExternalTask, RemoteScheduler, Task
from luigi.mock import MockFile, MockFileSystem
from luigi.scheduler import CentralPlannerScheduler
from luigi.worker import Worker
from luigi import six
Expand Down Expand Up @@ -838,5 +839,53 @@ def test_purge_hung_worker_override_timeout_time(self, mock_time):
self.assertEqual(0, len(w._running_tasks))


class ForkBombTask(luigi.Task):
depth = luigi.IntParameter()
breadth = luigi.IntParameter()
p = luigi.Parameter(default=(0, )) # ehm for some weird reason [0] becomes a tuple...?

def output(self):
return MockFile('.'.join(map(str, self.p)))

def run(self):
with self.output().open('w') as f:
f.write('Done!')

def requires(self):
if len(self.p) < self.depth:
for i in range(self.breadth):
yield ForkBombTask(self.depth, self.breadth, self.p + (i, ))


class TaskLimitTest(unittest.TestCase):
def tearDown(self):
MockFileSystem().remove('')

@with_config({'core': {'worker-task-limit': '6'}})
def test_task_limit_exceeded(self):
w = Worker()
t = ForkBombTask(3, 2)
w.add(t)
w.run()
self.assertFalse(t.complete())
leaf_tasks = [ForkBombTask(3, 2, branch) for branch in [(0, 0, 0), (0, 0, 1), (0, 1, 0), (0, 1, 1)]]
self.assertEquals(3, sum(t.complete() for t in leaf_tasks), "should have gracefully completed as much as possible even though the single last leaf didn't get scheduled")

@with_config({'core': {'worker-task-limit': '7'}})
def test_task_limit_not_exceeded(self):
w = Worker()
t = ForkBombTask(3, 2)
w.add(t)
w.run()
self.assertTrue(t.complete())

def test_no_task_limit(self):
w = Worker()
t = ForkBombTask(4, 2)
w.add(t)
w.run()
self.assertTrue(t.complete())


if __name__ == '__main__':
luigi.run()

0 comments on commit 0abe27e

Please sign in to comment.