diff --git a/autowiring/CoreContext.h b/autowiring/CoreContext.h index ccc8ee633..635842cef 100644 --- a/autowiring/CoreContext.h +++ b/autowiring/CoreContext.h @@ -22,7 +22,6 @@ #include "MemoEntry.h" #include "once.h" #include "result_or_default.h" -#include "ThreadPool.h" #include "TypeRegistry.h" #include "TypeUnifier.h" @@ -49,10 +48,6 @@ class CoreContextT; template class JunctionBox; -namespace autowiring { - class ThreadPool; -} - /// \file /// CoreContext definitions. @@ -231,10 +226,6 @@ class CoreContext: // Actual core threads: std::list m_threads; - // The thread pool used by this context. By default, a context inherits the thread pool of - // its parent, and the global context gets the system thread pool. - std::shared_ptr m_threadPool; - // The start token for the thread pool, if one exists std::shared_ptr m_startToken; @@ -1066,49 +1057,6 @@ class CoreContext: ); } - /// - /// Assigns the thread pool handler for this context - /// - /// - /// If the context is currently running, the thread pool will automatically be started. The pool's - /// start token and shared pointer is reset automatically when the context is torn down. If the - /// context has already been shut down (IE, IsShutdown returns true), this method has no effect. - /// - /// Dispatchers that have been attached to the current thread pool will not be transitioned to the - /// new pool. Changing the thread pool may cause the previously assigned thread pool to be stopped. - /// This will cause it to complete all work assigned to it and release resources associated with - /// processing. If there are no other handles to the pool, it may potentially destroy itself. - /// - /// It is an error to pass nullptr to this method. - /// - void SetThreadPool(const std::shared_ptr& threadPool); - - /// - /// Returns the current thread pool - /// - /// - /// If the context has been shut down, (IE, IsShutdown returns true), this method returns nullptr. Calling - /// ThreadPool::Start on the returned shared pointer will not cause dispatchers pended to this context to - /// be executed. To do this, invoke CoreContext::Initiate - /// - std::shared_ptr GetThreadPool(void) const; - - /// - /// Submits the specified lambda to this context's ThreadPool for processing - /// - /// True if the job has been submitted for execution - /// - /// The passed thunk will not be executed if the current context has already stopped. - /// - template - bool operator+=(Fx&& fx) { - auto pool = GetThreadPool(); - return - pool ? - pool->Submit(std::make_unique>(std::forward(fx))) : - false; - } - /// /// Adds a post-attachment listener in this context for a particular autowired member. /// There is no guarantee for the context in which the listener will be called. diff --git a/autowiring/SystemThreadPoolStl.h b/autowiring/SystemThreadPoolStl.h index 6b289f1bf..de42ff94d 100644 --- a/autowiring/SystemThreadPoolStl.h +++ b/autowiring/SystemThreadPoolStl.h @@ -2,7 +2,7 @@ #pragma once #include "DispatchQueue.h" #include "SystemThreadPool.h" -#include +#include #include namespace autowiring { diff --git a/autowiring/ThreadPool.h b/autowiring/ThreadPool.h index 91e869829..c429ddec3 100644 --- a/autowiring/ThreadPool.h +++ b/autowiring/ThreadPool.h @@ -1,8 +1,9 @@ // Copyright (C) 2012-2015 Leap Motion, Inc. All rights reserved. #pragma once +#include "DispatchThunk.h" #include -#include #include +#include MEMORY_HEADER class DispatchQueue; class DispatchThunkBase; @@ -89,6 +90,18 @@ class ThreadPool: /// be submitted for execution. /// virtual bool Submit(std::unique_ptr&& thunk) = 0; + + /// + /// Submits the specified lambda to this context's ThreadPool for processing + /// + /// True if the job has been submitted for execution + /// + /// The passed thunk will not be executed if the current context has already stopped. + /// + template + bool operator+=(Fx&& fx) { + return Submit(std::make_unique>(std::forward(fx))); + } }; } diff --git a/src/autowiring/CoreContext.cpp b/src/autowiring/CoreContext.cpp index cf69be3f3..c0633efe8 100644 --- a/src/autowiring/CoreContext.cpp +++ b/src/autowiring/CoreContext.cpp @@ -12,7 +12,6 @@ #include "NullPool.h" #include "SystemThreadPool.h" #include "thread_specific_ptr.h" -#include "ThreadPool.h" #include #include @@ -58,8 +57,7 @@ CoreContext::CoreContext(const std::shared_ptr& pParent, t_childLis m_backReference(backReference), m_sigilType(sigilType), m_stateBlock(std::make_shared(pParent ? pParent->m_stateBlock : nullptr)), - m_junctionBoxManager(new JunctionBoxManager), - m_threadPool(std::make_shared()) + m_junctionBoxManager(new JunctionBoxManager) {} CoreContext::~CoreContext(void) { @@ -448,48 +446,10 @@ void CoreContext::Initiate(void) { // Now we can recover the first thread that will need to be started auto beginning = m_threads.begin(); - - // Start our threads before starting any child contexts: - std::shared_ptr threadPool; - auto nullPool = std::dynamic_pointer_cast(m_threadPool); - if (nullPool) { - // Decide which pool will become our current thread pool. Global context is the final case, - // which defaults to the system thread pool - if (!nullPool->GetSuccessor()) - nullPool->SetSuccessor(m_pParent ? m_pParent->GetThreadPool() : SystemThreadPool::New()); - - // Trigger null pool destruction at this point: - m_threadPool = nullPool->MoveDispatchersToSuccessor(); - } - - // The default case should not generally occur, but if it were the case that the null pool were - // updated before the context was initiated, then we would have no work to do as no successors - // exist to be moved. In that case, simply take a record of the current thread pool for the - // call to Start that follows the unlock. - threadPool = m_threadPool; lk.unlock(); onInitiated(); m_stateBlock->m_stateChanged.notify_all(); - // Start the thread pool out of the lock, and then update our start token if our thread pool - // reference has not changed. The next pool could potentially be nullptr if the parent is going - // down while we are going up. - if (threadPool) { - // Initiate - auto startToken = threadPool->Start(); - - // Transfer all dispatchers from the null pool to the new thread pool: - std::lock_guard lk(m_stateBlock->m_lock); - - // If the thread pool was updated while we were trying to start the pool we observed earlier, - // then allow our token to expire and do not do any other work. Whomever caused the thread - // pool pointer to be updated would also have seen that the context is currently started, - // and would have updated both the thread pool pointer and the start token at the same time. - if (m_threadPool == threadPool) - // Swap, not assign; we don't want teardown to happen while synchronized - std::swap(m_startToken, startToken); - } - if (beginning != m_threads.end()) { auto outstanding = m_stateBlock->IncrementOutstandingThreadCount(shared_from_this()); for (auto q = beginning; q != m_threads.end(); ++q) @@ -552,7 +512,6 @@ void CoreContext::SignalShutdown(bool wait, ShutdownMode shutdownMode) { // Thread pool token and pool pointer std::shared_ptr startToken; - std::shared_ptr threadPool; // Tear down all the children, evict thread pool: { @@ -560,8 +519,6 @@ void CoreContext::SignalShutdown(bool wait, ShutdownMode shutdownMode) { startToken = std::move(m_startToken); m_startToken.reset(); - threadPool = std::move(m_threadPool); - m_threadPool.reset(); // Fill strong lock series in order to ensure proper teardown interleave: childrenInterleave.reserve(m_children.size()); @@ -717,48 +674,6 @@ void CoreContext::BuildCurrentState(void) { } } -void CoreContext::SetThreadPool(const std::shared_ptr& threadPool) { - if (!threadPool) - throw std::invalid_argument("A context cannot be given a null thread pool"); - - std::shared_ptr priorThreadPool; - { - std::lock_guard lk(m_stateBlock->m_lock); - if (IsShutdown()) - // Nothing to do, context already down - return; - - if (!IsRunning()) { - // Just set up the forwarding thread pool - auto nullPool = std::dynamic_pointer_cast(m_threadPool); - if (!nullPool) - throw autowiring_error("Internal error, null pool was deassigned even though the context has not been started"); - priorThreadPool = nullPool->GetSuccessor(); - nullPool->SetSuccessor(threadPool); - return; - } - - priorThreadPool = m_threadPool; - m_threadPool = threadPool; - } - - // We are presently running. We need to start the pool, and then attempt to - // update our token - auto startToken = threadPool->Start(); - std::lock_guard lk(m_stateBlock->m_lock); - if (m_threadPool != threadPool) - // Thread pool was updated by someone else, let them complete their operation - return; - - // Update our start token and return. Swap, not move; we don't want to risk - // calling destructors while synchronized. - std::swap(m_startToken, startToken); -} - -std::shared_ptr CoreContext::GetThreadPool(void) const { - return (std::lock_guard)m_stateBlock->m_lock, m_threadPool; -} - void CoreContext::Dump(std::ostream& os) const { std::lock_guard lk(m_stateBlock->m_lock); diff --git a/src/autowiring/test/ThreadPoolTest.cpp b/src/autowiring/test/ThreadPoolTest.cpp index 920a43ac2..2029a9d28 100644 --- a/src/autowiring/test/ThreadPoolTest.cpp +++ b/src/autowiring/test/ThreadPoolTest.cpp @@ -11,107 +11,28 @@ #include "SystemThreadPoolWinLH.hpp" #endif +template class ThreadPoolTest: public testing::Test -{}; - -TEST_F(ThreadPoolTest, SimpleSubmission) { - AutoCurrentContext ctxt; - ctxt->Initiate(); - - // Simple validation - auto pool = ctxt->GetThreadPool(); - ASSERT_NE(nullptr, pool.get()) << "Pool can never be null on an initiated context"; - ASSERT_EQ(nullptr, dynamic_cast(pool.get())) << "After context initiation, the pool should not be a null pool"; - ASSERT_TRUE(pool->IsStarted()) << "Pool was not started when the enclosing context was initiated"; - - // Submit a job and then block for its completion. Use a promise to ensure that - // the job is being executed in some other thread context. - auto p = std::make_shared>(); - - *ctxt += [&] { - p->set_value(); - }; - - auto rs = p->get_future(); - ASSERT_EQ(std::future_status::ready, rs.wait_for(std::chrono::seconds(5))) << "Thread pool lambda was not dispatched in a timely fashion"; -} - -TEST_F(ThreadPoolTest, PendBeforeContextStart) { - AutoCurrentContext ctxt; - - // Pend - auto barr = std::make_shared>(); - *ctxt += [barr] { barr->set_value(); }; - ASSERT_EQ(2UL, barr.use_count()) << "Lambda was not correctly captured by an uninitiated context"; - - std::future f = barr->get_future(); - ASSERT_EQ(std::future_status::timeout, f.wait_for(std::chrono::milliseconds(1))) << "A pended lambda was completed before the owning context was intiated"; - - ctxt->Initiate(); - ASSERT_EQ(std::future_status::ready, f.wait_for(std::chrono::seconds(5))) << "A lambda did not complete in a timely fashion after its context was started"; - - // Terminate, verify that we don't capture any more lambdas: - ctxt->SignalShutdown(); - ASSERT_EQ(nullptr, ctxt->GetThreadPool()) << "Thread pool was still present on a terminated context"; - ASSERT_FALSE(*ctxt += [barr] {}) << "Lambda append operation incorrectly evaluated to true"; - ASSERT_TRUE(barr.unique()) << "Lambda was incorrectly captured by a context that was already terminated"; -} - -TEST_F(ThreadPoolTest, ManualThreadPoolBehavior) { - // Make the manual pool that will be used for this test: - auto pool = std::make_shared(); - - // Launch a thread that will join the pool: - std::promise> val; - auto launch = std::async( - std::launch::async, - [pool, &val] { - auto token = pool->PrepareJoin(); - val.set_value(token); - pool->Join(token); - } - ); - - auto valFuture = val.get_future(); - ASSERT_EQ(std::future_status::ready, valFuture.wait_for(std::chrono::seconds(5))) << "Join thread took too much time to start up"; - auto token = valFuture.get(); - - // Set up the context - AutoCurrentContext ctxt; - ctxt->SetThreadPool(pool); - ctxt->Initiate(); - - // Pend some lambdas to be executed by our worker thread: - std::promise hitDone; - std::atomic hitCount{10}; - for (size_t i = hitCount; i--; ) - *ctxt += [&] { - if (!--hitCount) - hitDone.set_value(); - }; - - // Wait for everything to get hit: - auto hitDoneFuture = hitDone.get_future(); - ASSERT_EQ(std::future_status::ready, hitDoneFuture.wait_for(std::chrono::seconds(5))) << "Manual pool did not dispatch lambdas in a timely fashion"; +{ +public: + ThreadPoolTest(void) { + m_pool->SuggestThreadPoolSize(2); + token = m_pool->Start(); + } - // Verify that cancellation of the token causes the manual thread to back out - token->Leave(); - ASSERT_EQ(std::future_status::ready, launch.wait_for(std::chrono::seconds(5))) << "Token cancellation did not correctly release a single waiting thread"; -} + void TearDown(void) { + token.reset(); + } -template -class SystemThreadPoolTest: - public testing::Test -{}; + std::shared_ptr m_pool = std::make_shared(); + std::shared_ptr token; +}; -TYPED_TEST_CASE_P(SystemThreadPoolTest); +TYPED_TEST_CASE_P(ThreadPoolTest); -TYPED_TEST_P(SystemThreadPoolTest, PoolOverload) { +TYPED_TEST_P(ThreadPoolTest, PoolOverload) { AutoCurrentContext ctxt; - auto pool = std::make_shared(); - ctxt->SetThreadPool(pool); - pool->SuggestThreadPoolSize(2); ctxt->Initiate(); size_t cap = 1000; @@ -119,16 +40,16 @@ TYPED_TEST_P(SystemThreadPoolTest, PoolOverload) { auto p = std::make_shared>(); for (size_t i = cap; i--;) - *ctxt += [=] { - if (!--*ctr) - p->set_value(); - }; + *this->m_pool += [=] { + if (!--*ctr) + p->set_value(); + }; auto rs = p->get_future(); ASSERT_EQ(std::future_status::ready, rs.wait_for(std::chrono::seconds(5))) << "Pool saturation did not complete in a timely fashion"; } -REGISTER_TYPED_TEST_CASE_P(SystemThreadPoolTest, PoolOverload); +REGISTER_TYPED_TEST_CASE_P(ThreadPoolTest, PoolOverload); typedef ::testing::Types< #ifdef _MSC_VER @@ -141,4 +62,4 @@ typedef ::testing::Types< autowiring::SystemThreadPoolStl > t_testTypes; -INSTANTIATE_TYPED_TEST_CASE_P(My, SystemThreadPoolTest, t_testTypes); +INSTANTIATE_TYPED_TEST_CASE_P(My, ThreadPoolTest, t_testTypes);