From f2410e5cea87076826ebba092aac498e9a272f6e Mon Sep 17 00:00:00 2001 From: Jason Lokerson Date: Fri, 20 May 2016 20:15:00 -0700 Subject: [PATCH] Add Rundown behavior to DispatchQueue This behavior is necessary to get the `CoreThread` teardown behavior to function properly. It is also specifically mentioned in the documentation for `Abort`, so it's probably a good idea that we actually implement it. Also use this behavior in `CoreThread::DoRunLoopCleanup`. There is a race condition that occurs in the current implementation which can arise when a lambda is pended at about the same time as termination happens; this lambda can sometimes be leaked. --- src/autowiring/CoreThread.cpp | 11 +-- src/autowiring/DispatchQueue.cpp | 82 +++++++++++++++-------- src/autowiring/DispatchQueue.h | 22 ++++++ src/autowiring/test/DispatchQueueTest.cpp | 15 +++++ 4 files changed, 94 insertions(+), 36 deletions(-) diff --git a/src/autowiring/CoreThread.cpp b/src/autowiring/CoreThread.cpp index 9388019db..ef27a6f45 100644 --- a/src/autowiring/CoreThread.cpp +++ b/src/autowiring/CoreThread.cpp @@ -11,15 +11,10 @@ CoreThread::CoreThread(const char* pName): CoreThread::~CoreThread(void){} void CoreThread::DoRunLoopCleanup(std::shared_ptr&& ctxt, std::shared_ptr&& refTracker) { - try { - // If we are asked to rundown while we still have elements in our dispatch queue, - // we must try to process them: + // Kill everything in the dispatch queue and also run it down + { CurrentContextPusher pshr(ctxt); - DispatchAllEvents(); - } - catch(...) { - // We failed to run down the dispatch queue gracefully, we now need to abort it - Abort(); + Rundown(); } // Handoff to base class: diff --git a/src/autowiring/DispatchQueue.cpp b/src/autowiring/DispatchQueue.cpp index 41fa2fccd..381409455 100644 --- a/src/autowiring/DispatchQueue.cpp +++ b/src/autowiring/DispatchQueue.cpp @@ -29,6 +29,56 @@ DispatchQueue::~DispatchQueue(void) { } } +void DispatchQueue::ClearQueueInternal(bool executeDispatchers) { + // Do not permit any more lambdas to be pended to our queue + DispatchThunkBase* pHead; + { + std::priority_queue delayedQueue; + std::lock_guard lk(m_dispatchLock); + onAborted(); + m_dispatchCap = 0; + pHead = m_pHead; + m_pHead = nullptr; + m_pTail = nullptr; + delayedQueue = std::move(m_delayedQueue); + } + + // Execute dispatchers if asked to do so + if(executeDispatchers) + while(pHead) + try { + auto next = pHead->m_pFlink; + (*pHead)(); + delete pHead; + pHead = next; + + // Need to update this as we go along due to the requirements of rundown behavior + m_count--; + } + catch (dispatch_aborted_exception&) { + // Silently ignore, as per documentation + } catch(...) { + // Stop executing dispatchers, nothing we can do here + break; + } + + // Destroy everything else. Do so in an unsynchronized context in order to prevent reentrancy. + size_t nTraversed = 0; + for (auto cur = pHead; cur;) { + auto next = cur->m_pFlink; + delete cur; + cur = next; + nTraversed++; + } + + // Decrement the count by the number of entries we actually traversed. Abort may potentially + // be called from a lambda function, so assigning this value directly to zero would be an error. + m_count -= nTraversed; + + // Wake up anyone who is still waiting: + m_queueUpdated.notify_all(); +} + bool DispatchQueue::PromoteReadyDispatchersUnsafe(void) { // Move all ready elements out of the delayed queue and into the dispatch queue: size_t nInitial = m_delayedQueue.size(); @@ -97,35 +147,11 @@ void DispatchQueue::TryDispatchEventUnsafe(std::unique_lock& lk) { } void DispatchQueue::Abort(void) { - // Do not permit any more lambdas to be pended to our queue - DispatchThunkBase* pHead; - { - std::priority_queue delayedQueue; - std::lock_guard lk(m_dispatchLock); - onAborted(); - m_dispatchCap = 0; - pHead = m_pHead; - m_pHead = nullptr; - m_pTail = nullptr; - delayedQueue = std::move(m_delayedQueue); - } - - // Destroy the whole dispatch queue. Do so in an unsynchronized context in order to prevent - // reentrancy. - size_t nTraversed = 0; - for (auto cur = pHead; cur;) { - auto next = cur->m_pFlink; - delete cur; - cur = next; - nTraversed++; - } - - // Decrement the count by the number of entries we actually traversed. Abort may potentially - // be called from a lambda function, so assigning this value directly to zero would be an error. - m_count -= nTraversed; + ClearQueueInternal(false); +} - // Wake up anyone who is still waiting: - m_queueUpdated.notify_all(); +void DispatchQueue::Rundown(void) { + ClearQueueInternal(true); } bool DispatchQueue::Cancel(void) { diff --git a/src/autowiring/DispatchQueue.h b/src/autowiring/DispatchQueue.h index 94f0b8a27..4b7b8fb66 100644 --- a/src/autowiring/DispatchQueue.h +++ b/src/autowiring/DispatchQueue.h @@ -114,6 +114,9 @@ class DispatchQueue { /// void SetDispatcherCap(size_t dispatchCap) { m_dispatchCap = dispatchCap; } + // Internal implementation for abort/rundown + void ClearQueueInternal(bool executeDispatchers); + public: /// /// True if there are curerntly any dispatchers ready for execution--IE, DispatchEvent would return true @@ -144,6 +147,25 @@ class DispatchQueue { /// void Abort(void); + /// + /// Graceful version of Abort + /// + /// + /// In a synchronized context, all attached lambdas are guaranteed to be called when this function returns. + /// No guarantees are made in an unsynchronized context. + /// + /// Any delayed dispatchers that are ready at the time of the call will be invoked. All oter delayed + /// dispatchers will be aborted. + /// + /// If a dispatcher throws any exception other than dispatch_aborted_exception, the remaining dispatchers + /// will be aborted. + /// + /// This method may be safely called from within a dispatcher. + /// + /// This method is idempotent. + /// + void Rundown(void); + /// /// Causes the very first lambda on the dispatch queue to be deleted without running it /// diff --git a/src/autowiring/test/DispatchQueueTest.cpp b/src/autowiring/test/DispatchQueueTest.cpp index 23cb0a470..5a851c60f 100644 --- a/src/autowiring/test/DispatchQueueTest.cpp +++ b/src/autowiring/test/DispatchQueueTest.cpp @@ -344,3 +344,18 @@ TEST_F(DispatchQueueTest, DelayedAbort) { dq.Abort(); ASSERT_TRUE(v.unique()) << "A delayed dispatcher was leaked after a call to Abort"; } + +TEST_F(DispatchQueueTest, Rundown) { + auto called = std::make_shared(false); + auto notCalled = std::make_shared(false); + + DispatchQueue dq; + dq += [called] { *called = true; }; + dq.Rundown(); + ASSERT_TRUE(*called) << "Dispatcher was not invoked during rundown as expected"; + ASSERT_TRUE(called.unique()) << "Rundown dispatcher was leaked"; + + dq += [notCalled] { *notCalled = true; }; + ASSERT_FALSE(*notCalled) << "Dispatcher was incorrectly invoked during rundown"; + ASSERT_TRUE(notCalled.unique()) << "Rejected dispatcher was leaked"; +}