Skip to content

Commit

Permalink
Merge pull request #955 from leapmotion/feature-rundown
Browse files Browse the repository at this point in the history
Add Rundown behavior to DispatchQueue
  • Loading branch information
yeswalrus committed May 23, 2016
2 parents 3a64e9f + f2410e5 commit c8fdffb
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 36 deletions.
11 changes: 3 additions & 8 deletions src/autowiring/CoreThread.cpp
Expand Up @@ -11,15 +11,10 @@ CoreThread::CoreThread(const char* pName):
CoreThread::~CoreThread(void){}

void CoreThread::DoRunLoopCleanup(std::shared_ptr<CoreContext>&& ctxt, std::shared_ptr<CoreObject>&& refTracker) {
try {
// If we are asked to rundown while we still have elements in our dispatch queue,
// we must try to process them:
// Kill everything in the dispatch queue and also run it down
{
CurrentContextPusher pshr(ctxt);
DispatchAllEvents();
}
catch(...) {
// We failed to run down the dispatch queue gracefully, we now need to abort it
Abort();
Rundown();
}

// Handoff to base class:
Expand Down
82 changes: 54 additions & 28 deletions src/autowiring/DispatchQueue.cpp
Expand Up @@ -29,6 +29,56 @@ DispatchQueue::~DispatchQueue(void) {
}
}

void DispatchQueue::ClearQueueInternal(bool executeDispatchers) {
// Do not permit any more lambdas to be pended to our queue
DispatchThunkBase* pHead;
{
std::priority_queue<autowiring::DispatchThunkDelayed> delayedQueue;
std::lock_guard<std::mutex> lk(m_dispatchLock);
onAborted();
m_dispatchCap = 0;
pHead = m_pHead;
m_pHead = nullptr;
m_pTail = nullptr;
delayedQueue = std::move(m_delayedQueue);
}

// Execute dispatchers if asked to do so
if(executeDispatchers)
while(pHead)
try {
auto next = pHead->m_pFlink;
(*pHead)();
delete pHead;
pHead = next;

// Need to update this as we go along due to the requirements of rundown behavior
m_count--;
}
catch (dispatch_aborted_exception&) {
// Silently ignore, as per documentation
} catch(...) {
// Stop executing dispatchers, nothing we can do here
break;
}

// Destroy everything else. Do so in an unsynchronized context in order to prevent reentrancy.
size_t nTraversed = 0;
for (auto cur = pHead; cur;) {
auto next = cur->m_pFlink;
delete cur;
cur = next;
nTraversed++;
}

// Decrement the count by the number of entries we actually traversed. Abort may potentially
// be called from a lambda function, so assigning this value directly to zero would be an error.
m_count -= nTraversed;

// Wake up anyone who is still waiting:
m_queueUpdated.notify_all();
}

bool DispatchQueue::PromoteReadyDispatchersUnsafe(void) {
// Move all ready elements out of the delayed queue and into the dispatch queue:
size_t nInitial = m_delayedQueue.size();
Expand Down Expand Up @@ -97,35 +147,11 @@ void DispatchQueue::TryDispatchEventUnsafe(std::unique_lock<std::mutex>& lk) {
}

void DispatchQueue::Abort(void) {
// Do not permit any more lambdas to be pended to our queue
DispatchThunkBase* pHead;
{
std::priority_queue<autowiring::DispatchThunkDelayed> delayedQueue;
std::lock_guard<std::mutex> lk(m_dispatchLock);
onAborted();
m_dispatchCap = 0;
pHead = m_pHead;
m_pHead = nullptr;
m_pTail = nullptr;
delayedQueue = std::move(m_delayedQueue);
}

// Destroy the whole dispatch queue. Do so in an unsynchronized context in order to prevent
// reentrancy.
size_t nTraversed = 0;
for (auto cur = pHead; cur;) {
auto next = cur->m_pFlink;
delete cur;
cur = next;
nTraversed++;
}

// Decrement the count by the number of entries we actually traversed. Abort may potentially
// be called from a lambda function, so assigning this value directly to zero would be an error.
m_count -= nTraversed;
ClearQueueInternal(false);
}

// Wake up anyone who is still waiting:
m_queueUpdated.notify_all();
void DispatchQueue::Rundown(void) {
ClearQueueInternal(true);
}

bool DispatchQueue::Cancel(void) {
Expand Down
22 changes: 22 additions & 0 deletions src/autowiring/DispatchQueue.h
Expand Up @@ -114,6 +114,9 @@ class DispatchQueue {
/// </summary>
void SetDispatcherCap(size_t dispatchCap) { m_dispatchCap = dispatchCap; }

// Internal implementation for abort/rundown
void ClearQueueInternal(bool executeDispatchers);

public:
/// <returns>
/// True if there are curerntly any dispatchers ready for execution--IE, DispatchEvent would return true
Expand Down Expand Up @@ -144,6 +147,25 @@ class DispatchQueue {
/// </remarks>
void Abort(void);

/// <summary>
/// Graceful version of Abort
/// </summary>
/// <remarks>
/// In a synchronized context, all attached lambdas are guaranteed to be called when this function returns.
/// No guarantees are made in an unsynchronized context.
///
/// Any delayed dispatchers that are ready at the time of the call will be invoked. All oter delayed
/// dispatchers will be aborted.
///
/// If a dispatcher throws any exception other than dispatch_aborted_exception, the remaining dispatchers
/// will be aborted.
///
/// This method may be safely called from within a dispatcher.
///
/// This method is idempotent.
/// </remarks>
void Rundown(void);

/// <summary>
/// Causes the very first lambda on the dispatch queue to be deleted without running it
/// </summary>
Expand Down
15 changes: 15 additions & 0 deletions src/autowiring/test/DispatchQueueTest.cpp
Expand Up @@ -344,3 +344,18 @@ TEST_F(DispatchQueueTest, DelayedAbort) {
dq.Abort();
ASSERT_TRUE(v.unique()) << "A delayed dispatcher was leaked after a call to Abort";
}

TEST_F(DispatchQueueTest, Rundown) {
auto called = std::make_shared<bool>(false);
auto notCalled = std::make_shared<bool>(false);

DispatchQueue dq;
dq += [called] { *called = true; };
dq.Rundown();
ASSERT_TRUE(*called) << "Dispatcher was not invoked during rundown as expected";
ASSERT_TRUE(called.unique()) << "Rundown dispatcher was leaked";

dq += [notCalled] { *notCalled = true; };
ASSERT_FALSE(*notCalled) << "Dispatcher was incorrectly invoked during rundown";
ASSERT_TRUE(notCalled.unique()) << "Rejected dispatcher was leaked";
}

0 comments on commit c8fdffb

Please sign in to comment.