Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

This change ads sudo to group calls #2052

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
86 changes: 86 additions & 0 deletions fabric/group.py
Expand Up @@ -107,6 +107,31 @@ def run(self, *args, **kwargs):
# exception just being the signal that Shit Broke?
raise NotImplementedError

def sudo(self, *args, **kwargs):
"""
Executes `.Connection.sudo` on all member `Connections <.Connection>`.

:returns: a `.GroupResult`.

.. versionadded:: 2.0
"""
# TODO: probably best to suck it up & match actual run() sig?
# TODO: how to change method of execution across contents? subclass,
# kwargs, additional methods, inject an executor? Doing subclass for
# now, but not 100% sure it's the best route.
# TODO: also need way to deal with duplicate connections (see THOUGHTS)
# TODO: and errors - probably FailureSet? How to handle other,
# regular, non Failure, exceptions though? Still need an aggregate
# exception type either way, whether it is FailureSet or what...
# TODO: OTOH, users may well want to be able to operate on the hosts
# that did not fail (esp if failure % is low) so we really _do_ want
# something like a result object mixing success and failure, or maybe a
# golang style two-tuple of successes and failures?
# TODO: or keep going w/ a "return or except", but the object is
# largely similar (if not identical) in both situations, with the
# exception just being the signal that Shit Broke?
raise NotImplementedError

# TODO: how to handle sudo? Probably just an inner worker method that takes
# the method name to actually call (run, sudo, etc)?

Expand Down Expand Up @@ -153,12 +178,30 @@ def run(self, *args, **kwargs):
raise GroupException(results)
return results

def sudo(self, *args, **kwargs):
results = GroupResult()
excepted = False
for cxn in self:
try:
results[cxn] = cxn.sudo(*args, **kwargs)
except Exception as e:
results[cxn] = e
excepted = True
if excepted:
raise GroupException(results)
return results


def thread_worker(cxn, queue, args, kwargs):
result = cxn.run(*args, **kwargs)
# TODO: namedtuple or attrs object?
queue.put((cxn, result))

def thread_worker_sudo(cxn, queue, args, kwargs):
result = cxn.sudo(*args, **kwargs)
# TODO: namedtuple or attrs object?
queue.put((cxn, result))


class ThreadingGroup(Group):
"""
Expand Down Expand Up @@ -210,6 +253,49 @@ def run(self, *args, **kwargs):
raise GroupException(results)
return results

def sudo(self, *args, **kwargs):
results = GroupResult()
queue = Queue()
threads = []
for cxn in self:
my_kwargs = dict(cxn=cxn, queue=queue, args=args, kwargs=kwargs)
thread = ExceptionHandlingThread(
target=thread_worker_sudo, kwargs=my_kwargs
)
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
# TODO: configurable join timeout
# TODO: (in sudo's version) configurability around interactive
# prompting resulting in an exception instead, as in v1
thread.join()
# Get non-exception results from queue
while not queue.empty():
# TODO: io-sleep? shouldn't matter if all threads are now joined
cxn, result = queue.get(block=False)
# TODO: outstanding musings about how exactly aggregate results
# ought to ideally operate...heterogenous obj like this, multiple
# objs, ??
results[cxn] = result
# Get exceptions from the threads themselves.
# TODO: in a non-thread setup, this would differ, e.g.:
# - a queue if using multiprocessing
# - some other state-passing mechanism if using e.g. coroutines
# - ???
excepted = False
for thread in threads:
wrapper = thread.exception()
if wrapper is not None:
# Outer kwargs is Thread instantiation kwargs, inner is kwargs
# passed to thread target/body.
cxn = wrapper.kwargs["kwargs"]["cxn"]
results[cxn] = wrapper.value
excepted = True
if excepted:
raise GroupException(results)
return results


class GroupResult(dict):
"""
Expand Down