Skip to content

Commit

Permalink
Merge pull request #845 from leapmotion/ref-parallel
Browse files Browse the repository at this point in the history
Do not hold lock under notification where possible
  • Loading branch information
Veronica Zheng committed Feb 10, 2016
2 parents daee07b + ed1a64b commit 43bd063
Show file tree
Hide file tree
Showing 17 changed files with 126 additions and 114 deletions.
17 changes: 10 additions & 7 deletions autowiring/DispatchQueue.h
Expand Up @@ -363,9 +363,9 @@ class DispatchQueue {
// Create the thunk first to reduce the amount of time we spend in lock:
auto thunk = new DispatchThunk<_Fx>(std::forward<_Fx>(fx));

std::unique_lock<std::mutex> lk(m_dispatchLock);
m_dispatchLock.lock();
if (m_count >= m_dispatchCap) {
lk.unlock();
m_dispatchLock.unlock();
delete thunk;
return;
}
Expand All @@ -374,15 +374,18 @@ class DispatchQueue {
m_count++;

// Linked list setup:
if (m_pHead)
m_pTail->m_pFlink = thunk;
if (m_pHead) {
m_pTail->m_pFlink = thunk;
m_pTail = thunk;
m_dispatchLock.unlock();
}
else {
m_pHead = thunk;
m_pHead = m_pTail = thunk;
m_dispatchLock.unlock();
m_queueUpdated.notify_all();
}
m_pTail = thunk;

// Notification as needed:
OnPended(std::move(lk));
OnPended(std::unique_lock<std::mutex>{});
}
};
24 changes: 12 additions & 12 deletions src/autonet/AutoNetServerImpl.cpp
Expand Up @@ -24,15 +24,15 @@ AutoNetServerImpl::AutoNetServerImpl(void):

AutoNetServerImpl::AutoNetServerImpl(std::unique_ptr<AutoNetTransport>&& transport) :
m_transport(std::move(transport))
{
{
// Register internal event handlers
AddEventHandler("terminateContext", [this] (int contextID) {
ResolveContextID(contextID)->SignalShutdown();
});

AddEventHandler("injectContextMember", [this] (int contextID, const std::string& typeName){
std::shared_ptr<CoreContext> ctxt = ResolveContextID(contextID)->shared_from_this();

if(m_AllTypes.find(typeName) != m_AllTypes.end()) {
CurrentContextPusher pshr(ctxt);
m_AllTypes[typeName]();
Expand All @@ -42,11 +42,11 @@ AutoNetServerImpl::AutoNetServerImpl(std::unique_ptr<AutoNetTransport>&& transpo
assert(false);
}
});

AddEventHandler("resumeFromBreakpoint", [this] (const std::string& name){
std::lock_guard<std::mutex> lk(m_breakpoint_mutex);

std::lock_guard<std::mutex>{ m_breakpoint_mutex },
m_breakpoints.erase(name);

m_breakpoint_cv.notify_all();
});

Expand All @@ -73,7 +73,7 @@ AutoNetServer* NewAutoNetServerImpl(void) {
// CoreThread overrides
void AutoNetServerImpl::Run(void){
std::cout << "Starting Autonet server..." << std::endl;

// Register ourselves as a handler
m_transport->SetTransportHandler(std::static_pointer_cast<AutoNetServerImpl>(shared_from_this()));

Expand Down Expand Up @@ -110,7 +110,7 @@ void AutoNetServerImpl::OnMessage(AutoNetTransportHandler::connection_hdl hdl, c

std::string msgType = msg["type"].string_value();
Json::array msgArgs = msg["args"].array_items();

// Handle client specific internal events
if (msgType == "subscribe") {
*this += [this, hdl] {
Expand All @@ -130,7 +130,7 @@ void AutoNetServerImpl::OnMessage(AutoNetTransportHandler::connection_hdl hdl, c
for (const auto& a : msgArgs) {
args.push_back(!a.string_value().empty() ? a.string_value() : a.dump());
}

// call all the handlers
for (const auto& handler : this->m_handlers[msgType]) {
handler(args);
Expand Down Expand Up @@ -270,19 +270,19 @@ void AutoNetServerImpl::SendEvent(const std::string& rawEvent, const std::vector
// Prepend '$' to custum event to avoid namespace collitions with internal events
std::string event("$");
event.append(rawEvent);

Json::array jsonArgs;
for (const auto& a : args) {
jsonArgs.push_back(a);
}

*this += [this, event, jsonArgs] {
for(auto hdl : m_Subscribers) {
Json msg = Json::object{
{"type", event},
{"args", jsonArgs}
};

m_transport->Send(hdl, msg.dump());
}
};
Expand Down
38 changes: 23 additions & 15 deletions src/autowiring/AutoPacket.cpp
Expand Up @@ -35,7 +35,7 @@ AutoPacket::~AutoPacket(void) {
std::chrono::high_resolution_clock::now() - m_initTime
)
);

// Mark decorations of successor packets that use decorations
// originating from this packet as unsatisfiable
for (auto& pair : m_decoration_map)
Expand All @@ -44,15 +44,15 @@ AutoPacket::~AutoPacket(void) {

// Needed for the AutoPacketGraph
NotifyTeardownListeners();

// Create vector of all successor packets that will be destroyed
// This prevents recursive AutoPacket destructor calls
std::vector<std::shared_ptr<AutoPacket>> packets;

// Recurse through unique successors, storing them in our vector
for (AutoPacket* current = this; current->m_successor.unique();) {
packets.push_back(current->m_successor);

// Reset and continue to next successor
AutoPacket* prev_current = current;
current = current->m_successor.get();
Expand Down Expand Up @@ -249,14 +249,14 @@ void AutoPacket::MarkUnsatisfiable(const DecorationKey& key) {

void AutoPacket::MarkSuccessorsUnsatisfiable(DecorationKey key) {
std::lock_guard<std::mutex> lk(m_lock);

// Update key and successor
key.tshift++;
auto successor = SuccessorUnsafe();

while (m_decoration_map.count(key)) {
successor->MarkUnsatisfiable(key);

// Update key and successor
key.tshift++;
successor = successor->Successor();
Expand Down Expand Up @@ -661,12 +661,16 @@ bool AutoPacket::Wait(std::condition_variable& cv, const AutoFilterArgument* inp
SignalStub* stub = (SignalStub*)pObj;

// Completed, mark the output as satisfied and update the condition variable
std::lock_guard<std::mutex>(stub->packet.m_lock);
stub->is_satisfied = true;
std::condition_variable* pcv;
{
std::lock_guard<std::mutex>{stub->packet.m_lock};
stub->is_satisfied = true;
pcv = stub->cv;
}

// Only notify while the condition variable is still valid
if (stub->cv)
stub->cv->notify_all();
if (pcv)
pcv->notify_all();
}
)
)
Expand All @@ -676,12 +680,16 @@ bool AutoPacket::Wait(std::condition_variable& cv, const AutoFilterArgument* inp
// decorations. In that case, the satisfaction flag is left in its initial state
AddTeardownListener(
[stub] {
std::lock_guard<std::mutex>(stub->packet.m_lock);
stub->is_complete = true;
std::condition_variable* pcv;
{
std::lock_guard<std::mutex> lk(stub->packet.m_lock);
stub->is_complete = true;
pcv = stub->cv;
}

// Only notify the condition variable if it's still present
if (stub->cv)
stub->cv->notify_all();
if (pcv)
pcv->notify_all();
}
);

Expand Down
20 changes: 10 additions & 10 deletions src/autowiring/AutoPacketFactory.cpp
Expand Up @@ -22,22 +22,22 @@ std::shared_ptr<AutoPacket> AutoPacketFactory::NewPacket(void) {
throw autowiring_error("Attempted to create a packet on an AutoPacketFactory that was already terminated");
if(!IsRunning())
throw autowiring_error("Cannot create a packet until the AutoPacketFactory is started");

std::shared_ptr<AutoPacketInternal> retVal;
bool isFirstPacket;
{
std::lock_guard<std::mutex> lk(m_lock);

// New packet issued
isFirstPacket = !m_packetCount;
++m_packetCount;

// Create a new next packet
retVal = m_nextPacket;
m_nextPacket = retVal->SuccessorInternal();
m_curPacket = retVal;
}

retVal->Initialize(isFirstPacket);
return retVal;
}
Expand All @@ -62,12 +62,12 @@ std::shared_ptr<void> AutoPacketFactory::GetInternalOutstanding(void) {
retVal = std::shared_ptr<void>(
(void*)1,
[this, outstanding] (void*) mutable {
std::lock_guard<std::mutex> lk(m_lock);
m_stateCondition.notify_all();

// Weak pointer will prevent our lambda from being destroyed, so we manually reset
// the outstanding counter in order to force it to be reset here
std::lock_guard<std::mutex>{m_lock},
outstanding.reset();

m_stateCondition.notify_all();
}
);
m_outstandingInternal = retVal;
Expand Down Expand Up @@ -105,7 +105,7 @@ SatCounter* AutoPacketFactory::CreateSatCounterList(void) const {
bool AutoPacketFactory::OnStart(void) {
// Initialize first packet
m_nextPacket = ConstructPacket();

// Wake us up. We're starting now
m_stateCondition.notify_all();
return true;
Expand All @@ -114,12 +114,12 @@ bool AutoPacketFactory::OnStart(void) {
void AutoPacketFactory::OnStop(bool graceful) {
// Queue of local variables to be destroyed when leaving scope
t_autoFilterSet autoFilters;

// Reset next packet, it will never be issued
m_nextPacket.reset();

// Lock destruction precedes local variables
std::lock_guard<std::mutex> lk(m_lock);
std::lock_guard<std::mutex>{m_lock},

// Same story with the AutoFilters
autoFilters.swap(m_autoFilters);
Expand Down
4 changes: 2 additions & 2 deletions src/autowiring/BasicThread.cpp
Expand Up @@ -96,7 +96,7 @@ void BasicThread::DoRunLoopCleanup(std::shared_ptr<CoreContext>&& ctxt, std::sha

// Notify other threads that we are done. At this point, any held references that might still exist
// notification must happen from a synchronized level in order to ensure proper ordering.
std::lock_guard<std::mutex> lk(state->m_lock);
std::lock_guard<std::mutex>{state->m_lock},
state->m_completed = true;
state->m_stateCondition.notify_all();
}
Expand All @@ -114,7 +114,7 @@ void BasicThread::WaitForStateUpdate(const std::function<bool()>& fn) const {
}

void BasicThread::PerformStatusUpdate(const std::function<void()>& fn) const {
std::unique_lock<std::mutex> lk(m_state->m_lock);
std::unique_lock<std::mutex>{m_state->m_lock},
fn();
m_state->m_stateCondition.notify_all();
}
Expand Down
15 changes: 7 additions & 8 deletions src/autowiring/CoreContext.cpp
Expand Up @@ -426,9 +426,6 @@ void CoreContext::Initiate(void) {
return;
}

// State change has taken place, we can signal
m_stateBlock->m_stateChanged.notify_all();

// Now we can add the event receivers we haven't been able to add because the context
// wasn't yet started:
AddEventReceiversUnsafe(std::move(m_delayedEventReceivers));
Expand All @@ -441,6 +438,7 @@ void CoreContext::Initiate(void) {
if (!IsRunning()) {
lk.unlock();
onInitiated();
m_stateBlock->m_stateChanged.notify_all();

// Need to inject a delayed context type so that this context will not be destroyed until
// it has an opportunity to start.
Expand Down Expand Up @@ -471,6 +469,7 @@ void CoreContext::Initiate(void) {
threadPool = m_threadPool;
lk.unlock();
onInitiated();
m_stateBlock->m_stateChanged.notify_all();

// Start the thread pool out of the lock, and then update our start token if our thread pool
// reference has not changed. The next pool could potentially be nullptr if the parent is going
Expand Down Expand Up @@ -539,9 +538,8 @@ void CoreContext::SignalShutdown(bool wait, ShutdownMode shutdownMode) {
firstThreadToStop = m_threads.begin();
if (m_beforeRunning)
++firstThreadToStop;

m_stateBlock->m_stateChanged.notify_all();
}
m_stateBlock->m_stateChanged.notify_all();
onShutdown();

// Teardown interleave assurance--all of these contexts will generally be destroyed
Expand Down Expand Up @@ -1083,12 +1081,12 @@ void CoreContext::TryTransitionChildrenState(void) {
auto q = child->m_threads.begin();
child->m_state = State::Running;

// Child had it's state changed
child->m_stateBlock->m_stateChanged.notify_all();

// Raise the run condition in the child
childLk.unlock();

// Child had it's state changed
child->m_stateBlock->m_stateChanged.notify_all();

auto outstanding = child->m_stateBlock->IncrementOutstandingThreadCount(child);
while (q != child->m_threads.end()) {
(*q)->Start(outstanding);
Expand All @@ -1106,6 +1104,7 @@ void CoreContext::TryTransitionChildrenState(void) {
child->m_state = State::CanRun;

// Child had it's state changed
childLk.unlock();
child->m_stateBlock->m_stateChanged.notify_all();
break;
case State::CanRun:
Expand Down
16 changes: 9 additions & 7 deletions src/autowiring/CoreContextStateBlock.cpp
Expand Up @@ -25,13 +25,15 @@ RunCounter::~RunCounter(void) {
owner.reset();

std::weak_ptr<CoreObject> outstanding;
std::lock_guard<std::mutex> lk(stateBlock->m_lock);

// Unfortunately, this destructor callback is made before weak pointers are
// invalidated, which requires that we manually reset the outstanding count
// We don't want to free memory while holding the lock, so defer
outstanding = std::move(stateBlock->m_outstanding);
stateBlock->m_outstanding.reset();
{
std::lock_guard<std::mutex> lk(stateBlock->m_lock);

// Unfortunately, this destructor callback is made before weak pointers are
// invalidated, which requires that we manually reset the outstanding count
// We don't want to free memory while holding the lock, so defer
outstanding = std::move(stateBlock->m_outstanding);
stateBlock->m_outstanding.reset();
}

// Wake everyone up
stateBlock->m_stateChanged.notify_all();
Expand Down

0 comments on commit 43bd063

Please sign in to comment.