Skip to content

Commit

Permalink
Merge pull request #848 from leapmotion/dep-threadpool
Browse files Browse the repository at this point in the history
Deprecate `CoreContext` thread pools
  • Loading branch information
Veronica Zheng committed Feb 10, 2016
2 parents 43bd063 + 6e3b667 commit 9eee4b9
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 240 deletions.
52 changes: 0 additions & 52 deletions autowiring/CoreContext.h
Expand Up @@ -22,7 +22,6 @@
#include "MemoEntry.h"
#include "once.h"
#include "result_or_default.h"
#include "ThreadPool.h"
#include "TypeRegistry.h"
#include "TypeUnifier.h"

Expand All @@ -49,10 +48,6 @@ class CoreContextT;
template<typename T>
class JunctionBox;

namespace autowiring {
class ThreadPool;
}

/// \file
/// CoreContext definitions.

Expand Down Expand Up @@ -231,10 +226,6 @@ class CoreContext:
// Actual core threads:
std::list<CoreRunnable*> m_threads;

// The thread pool used by this context. By default, a context inherits the thread pool of
// its parent, and the global context gets the system thread pool.
std::shared_ptr<autowiring::ThreadPool> m_threadPool;

// The start token for the thread pool, if one exists
std::shared_ptr<void> m_startToken;

Expand Down Expand Up @@ -1066,49 +1057,6 @@ class CoreContext:
);
}

/// <summary>
/// Assigns the thread pool handler for this context
/// </summary>
/// <remarks>
/// If the context is currently running, the thread pool will automatically be started. The pool's
/// start token and shared pointer is reset automatically when the context is torn down. If the
/// context has already been shut down (IE, IsShutdown returns true), this method has no effect.
///
/// Dispatchers that have been attached to the current thread pool will not be transitioned to the
/// new pool. Changing the thread pool may cause the previously assigned thread pool to be stopped.
/// This will cause it to complete all work assigned to it and release resources associated with
/// processing. If there are no other handles to the pool, it may potentially destroy itself.
///
/// It is an error to pass nullptr to this method.
/// </remarks>
void SetThreadPool(const std::shared_ptr<autowiring::ThreadPool>& threadPool);

/// <summary>
/// Returns the current thread pool
/// </summary>
/// <remarks>
/// If the context has been shut down, (IE, IsShutdown returns true), this method returns nullptr. Calling
/// ThreadPool::Start on the returned shared pointer will not cause dispatchers pended to this context to
/// be executed. To do this, invoke CoreContext::Initiate
/// </remarks>
std::shared_ptr<autowiring::ThreadPool> GetThreadPool(void) const;

/// <summary>
/// Submits the specified lambda to this context's ThreadPool for processing
/// </summary>
/// <returns>True if the job has been submitted for execution</returns>
/// <remarks>
/// The passed thunk will not be executed if the current context has already stopped.
/// </remarks>
template<class Fx>
bool operator+=(Fx&& fx) {
auto pool = GetThreadPool();
return
pool ?
pool->Submit(std::make_unique<DispatchThunk<Fx>>(std::forward<Fx&&>(fx))) :
false;
}

/// <summary>
/// Adds a post-attachment listener in this context for a particular autowired member.
/// There is no guarantee for the context in which the listener will be called.
Expand Down
2 changes: 1 addition & 1 deletion autowiring/SystemThreadPoolStl.h
Expand Up @@ -2,7 +2,7 @@
#pragma once
#include "DispatchQueue.h"
#include "SystemThreadPool.h"
#include <thread>
#include <thread>
#include <vector>

namespace autowiring {
Expand Down
15 changes: 14 additions & 1 deletion autowiring/ThreadPool.h
@@ -1,8 +1,9 @@
// Copyright (C) 2012-2015 Leap Motion, Inc. All rights reserved.
#pragma once
#include "DispatchThunk.h"
#include <atomic>
#include <memory>
#include <mutex>
#include MEMORY_HEADER

class DispatchQueue;
class DispatchThunkBase;
Expand Down Expand Up @@ -89,6 +90,18 @@ class ThreadPool:
/// be submitted for execution.
/// </remarks>
virtual bool Submit(std::unique_ptr<DispatchThunkBase>&& thunk) = 0;

/// <summary>
/// Submits the specified lambda to this context's ThreadPool for processing
/// </summary>
/// <returns>True if the job has been submitted for execution</returns>
/// <remarks>
/// The passed thunk will not be executed if the current context has already stopped.
/// </remarks>
template<class Fx>
bool operator+=(Fx&& fx) {
return Submit(std::make_unique<DispatchThunk<Fx>>(std::forward<Fx&&>(fx)));
}
};

}
87 changes: 1 addition & 86 deletions src/autowiring/CoreContext.cpp
Expand Up @@ -12,7 +12,6 @@
#include "NullPool.h"
#include "SystemThreadPool.h"
#include "thread_specific_ptr.h"
#include "ThreadPool.h"
#include <sstream>
#include <stdexcept>

Expand Down Expand Up @@ -58,8 +57,7 @@ CoreContext::CoreContext(const std::shared_ptr<CoreContext>& pParent, t_childLis
m_backReference(backReference),
m_sigilType(sigilType),
m_stateBlock(std::make_shared<CoreContextStateBlock>(pParent ? pParent->m_stateBlock : nullptr)),
m_junctionBoxManager(new JunctionBoxManager),
m_threadPool(std::make_shared<NullPool>())
m_junctionBoxManager(new JunctionBoxManager)
{}

CoreContext::~CoreContext(void) {
Expand Down Expand Up @@ -448,48 +446,10 @@ void CoreContext::Initiate(void) {

// Now we can recover the first thread that will need to be started
auto beginning = m_threads.begin();

// Start our threads before starting any child contexts:
std::shared_ptr<ThreadPool> threadPool;
auto nullPool = std::dynamic_pointer_cast<NullPool>(m_threadPool);
if (nullPool) {
// Decide which pool will become our current thread pool. Global context is the final case,
// which defaults to the system thread pool
if (!nullPool->GetSuccessor())
nullPool->SetSuccessor(m_pParent ? m_pParent->GetThreadPool() : SystemThreadPool::New());

// Trigger null pool destruction at this point:
m_threadPool = nullPool->MoveDispatchersToSuccessor();
}

// The default case should not generally occur, but if it were the case that the null pool were
// updated before the context was initiated, then we would have no work to do as no successors
// exist to be moved. In that case, simply take a record of the current thread pool for the
// call to Start that follows the unlock.
threadPool = m_threadPool;
lk.unlock();
onInitiated();
m_stateBlock->m_stateChanged.notify_all();

// Start the thread pool out of the lock, and then update our start token if our thread pool
// reference has not changed. The next pool could potentially be nullptr if the parent is going
// down while we are going up.
if (threadPool) {
// Initiate
auto startToken = threadPool->Start();

// Transfer all dispatchers from the null pool to the new thread pool:
std::lock_guard<std::mutex> lk(m_stateBlock->m_lock);

// If the thread pool was updated while we were trying to start the pool we observed earlier,
// then allow our token to expire and do not do any other work. Whomever caused the thread
// pool pointer to be updated would also have seen that the context is currently started,
// and would have updated both the thread pool pointer and the start token at the same time.
if (m_threadPool == threadPool)
// Swap, not assign; we don't want teardown to happen while synchronized
std::swap(m_startToken, startToken);
}

if (beginning != m_threads.end()) {
auto outstanding = m_stateBlock->IncrementOutstandingThreadCount(shared_from_this());
for (auto q = beginning; q != m_threads.end(); ++q)
Expand Down Expand Up @@ -552,16 +512,13 @@ void CoreContext::SignalShutdown(bool wait, ShutdownMode shutdownMode) {

// Thread pool token and pool pointer
std::shared_ptr<void> startToken;
std::shared_ptr<ThreadPool> threadPool;

// Tear down all the children, evict thread pool:
{
std::lock_guard<std::mutex> lk(m_stateBlock->m_lock);

startToken = std::move(m_startToken);
m_startToken.reset();
threadPool = std::move(m_threadPool);
m_threadPool.reset();

// Fill strong lock series in order to ensure proper teardown interleave:
childrenInterleave.reserve(m_children.size());
Expand Down Expand Up @@ -717,48 +674,6 @@ void CoreContext::BuildCurrentState(void) {
}
}

void CoreContext::SetThreadPool(const std::shared_ptr<ThreadPool>& threadPool) {
if (!threadPool)
throw std::invalid_argument("A context cannot be given a null thread pool");

std::shared_ptr<ThreadPool> priorThreadPool;
{
std::lock_guard<std::mutex> lk(m_stateBlock->m_lock);
if (IsShutdown())
// Nothing to do, context already down
return;

if (!IsRunning()) {
// Just set up the forwarding thread pool
auto nullPool = std::dynamic_pointer_cast<NullPool>(m_threadPool);
if (!nullPool)
throw autowiring_error("Internal error, null pool was deassigned even though the context has not been started");
priorThreadPool = nullPool->GetSuccessor();
nullPool->SetSuccessor(threadPool);
return;
}

priorThreadPool = m_threadPool;
m_threadPool = threadPool;
}

// We are presently running. We need to start the pool, and then attempt to
// update our token
auto startToken = threadPool->Start();
std::lock_guard<std::mutex> lk(m_stateBlock->m_lock);
if (m_threadPool != threadPool)
// Thread pool was updated by someone else, let them complete their operation
return;

// Update our start token and return. Swap, not move; we don't want to risk
// calling destructors while synchronized.
std::swap(m_startToken, startToken);
}

std::shared_ptr<ThreadPool> CoreContext::GetThreadPool(void) const {
return (std::lock_guard<std::mutex>)m_stateBlock->m_lock, m_threadPool;
}

void CoreContext::Dump(std::ostream& os) const {
std::lock_guard<std::mutex> lk(m_stateBlock->m_lock);

Expand Down

0 comments on commit 9eee4b9

Please sign in to comment.