/
DispatchQueue.cpp
415 lines (345 loc) · 12.2 KB
/
DispatchQueue.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
// Copyright (C) 2012-2015 Leap Motion, Inc. All rights reserved.
#include "stdafx.h"
#include "DispatchQueue.h"
#include "at_exit.h"
#include <assert.h>
using namespace autowiring;
// Constructs a queue with no explicit dispatch cap beyond the member's default
DispatchQueue::DispatchQueue(void) {}

// Constructs a queue that caps the number of pended dispatchers at dispatchCap.
// NOTE(review): enforcement of m_dispatchCap is not visible in this file; presumably
// it is applied at pend time in the header -- confirm.
DispatchQueue::DispatchQueue(size_t dispatchCap):
  m_dispatchCap(dispatchCap)
{}
// Move constructor: adopts the abort handler and cap, then splices over the source's
// pended dispatchers.
DispatchQueue::DispatchQueue(DispatchQueue&& q):
  onAborted(std::move(q.onAborted)),
  m_dispatchCap(q.m_dispatchCap)
{
  // Only splice if the source queue was not already aborted -- throughout this file a
  // truthy onAborted indicates the abort signal has fired, in which case the source's
  // dispatchers must not be adopted.
  if (!onAborted)
    *this += std::move(q);
}
DispatchQueue::~DispatchQueue(void) {
  // Tear down the linked list of pended thunks without invoking any of them; running
  // dispatchers during teardown would be unsafe.
  DispatchThunkBase* pNext = m_pHead;
  while (pNext) {
    DispatchThunkBase* pVictim = pNext;
    pNext = pVictim->m_pFlink;
    delete pVictim;
  }
}
// Tears down the queue: raises the abort signal, prevents further pends, and either
// executes or discards all currently-pended dispatchers.
//
// @param executeDispatchers  When true, run each pended dispatcher before destroying it
//                            (rundown); when false, destroy everything unexecuted (abort).
void DispatchQueue::ClearQueueInternal(bool executeDispatchers) {
  // Do not permit any more lambdas to be pended to our queue
  DispatchThunkBase* pHead;
  {
    // Declared before the lock on purpose: on scope exit the lock is destroyed first,
    // so the delayed thunks captured here are destroyed outside of synchronization.
    std::priority_queue<autowiring::DispatchThunkDelayed> delayedQueue;
    std::lock_guard<std::mutex> lk(m_dispatchLock);

    // Raise the aborted signal while the lock is held
    onAborted();

    // A zero cap prevents any further dispatchers from being pended
    m_dispatchCap = 0;

    // Detach the ready list and the delayed queue for out-of-lock handling:
    pHead = m_pHead;
    m_pHead = nullptr;
    m_pTail = nullptr;
    delayedQueue = std::move(m_delayedQueue);
  }

  // Execute dispatchers if asked to do so
  if (executeDispatchers)
    while (pHead) {
      auto next = pHead->m_pFlink;
      try {
        (*pHead)();
      }
      catch (dispatch_aborted_exception&) {
        // Silently ignore, as per documentation.  The failing dispatcher is still
        // skipped below; the prior version left pHead unadvanced here, which re-ran
        // the throwing thunk forever.
      }
      catch (...) {
        // Stop executing dispatchers, nothing we can do here.  The current thunk has
        // not been deleted; the cleanup loop below will destroy it and account for it.
        break;
      }
      delete pHead;
      pHead = next;
      // Need to update this as we go along due to the requirements of rundown behavior
      m_count--;
    }

  // Destroy everything else.  Do so in an unsynchronized context in order to prevent reentrancy.
  size_t nTraversed = 0;
  for (auto cur = pHead; cur;) {
    auto next = cur->m_pFlink;
    delete cur;
    cur = next;
    nTraversed++;
  }

  // Decrement the count by the number of entries we actually traversed.  Abort may potentially
  // be called from a lambda function, so assigning this value directly to zero would be an error.
  m_count -= nTraversed;

  // Wake up anyone who is still waiting:
  m_queueUpdated.notify_all();
}
bool DispatchQueue::PromoteReadyDispatchersUnsafe(void) {
// Move all ready elements out of the delayed queue and into the dispatch queue:
size_t nInitial = m_delayedQueue.size();
// String together a chain of things that will be made ready:
for (
auto now = std::chrono::steady_clock::now();
!m_delayedQueue.empty() && m_delayedQueue.top().GetReadyTime() < now;
m_delayedQueue.pop()
) {
// Update tail if head is already set, otherwise update head:
auto thunk = m_delayedQueue.top().GetThunk().release();
if (m_pHead)
m_pTail->m_pFlink = thunk;
else
m_pHead = thunk;
m_pTail = thunk;
m_count++;
}
// Something was promoted if the dispatch queue size is different
return nInitial != m_delayedQueue.size();
}
// Pops the front thunk while the lock is held, then executes it with the lock released.
// Caller must hold lk locked on entry; lk is unlocked on return.
void DispatchQueue::DispatchEventUnsafe(std::unique_lock<std::mutex>& lk) {
  // Pull the ready thunk off of the front of the queue and pop it while we hold the lock.
  // Then, we will execute the call while the lock has been released so we do not create
  // deadlocks.  (m_pTail is left stale if the list empties here; consumers in this file
  // test m_pHead before touching m_pTail, so the stale tail is never dereferenced.)
  std::unique_ptr<DispatchThunkBase> thunk(m_pHead);
  m_pHead = thunk->m_pFlink;
  lk.unlock();

  // MakeAtExit yields a scope-exit temporary (see at_exit.h).  Joining it to the thunk
  // invocation with the comma operator keeps the temporary alive until the end of the
  // full expression, so the decrement-and-notify runs after the thunk completes -- even
  // if the thunk throws.
  MakeAtExit([&] {
    if (!--m_count) {
      // Notify that we have hit zero.  The temporary lock_guard acquires and immediately
      // releases the dispatch lock so the notification cannot race a waiter that is
      // between testing its predicate and blocking on the condition variable.
      std::lock_guard<std::mutex>{ *lk.mutex() };
      m_queueUpdated.notify_all();
    }
  }),
  (*thunk)();
}
// Like DispatchEventUnsafe, but if the thunk throws it is re-pended at the front of the
// queue and the exception propagates to the caller.
void DispatchQueue::TryDispatchEventUnsafe(std::unique_lock<std::mutex>& lk) {
  // Pull the ready thunk off of the front of the queue and pop it while we hold the lock.
  // Then, we will execute the call while the lock has been released so we do not create
  // deadlocks.
  DispatchThunkBase* pThunk = m_pHead;
  m_pHead = pThunk->m_pFlink;
  lk.unlock();
  try { (*pThunk)(); }
  catch (...) {
    // Failed to execute thunk, put it back at the front of the queue and rethrow.
    // m_count is deliberately not decremented -- the thunk remains pended.  The lock is
    // reacquired here; the caller's unique_lock releases it again during unwind.
    lk.lock();
    pThunk->m_pFlink = m_pHead;
    m_pHead = pThunk;
    throw;
  }
  if (!--m_count) {
    // Notify that we have hit zero.  The temporary lock_guard acquires and immediately
    // releases the dispatch lock so the notification cannot race a waiter that is
    // between testing its predicate and blocking on the condition variable.
    std::lock_guard<std::mutex>{ *lk.mutex() };
    m_queueUpdated.notify_all();
  }
  delete pThunk;
}
// Aborts the queue: all pended dispatchers are destroyed without being executed.
void DispatchQueue::Abort(void) {
  ClearQueueInternal(false);
}

// Runs the queue down: pended dispatchers are executed before the queue is torn down.
void DispatchQueue::Rundown(void) {
  ClearQueueInternal(true);
}
bool DispatchQueue::Cancel(void) {
// Holds the cancelled thunk, declared here so that we delete it out of the lock
std::unique_ptr<DispatchThunkBase> thunk;
std::lock_guard<std::mutex> lk(m_dispatchLock);
if(m_pHead) {
// Found a ready thunk, run from here:
thunk.reset(m_pHead);
m_pHead = thunk->m_pFlink;
}
else if (!m_delayedQueue.empty()) {
auto& f = m_delayedQueue.top();
thunk = std::move(f.GetThunk());
m_delayedQueue.pop();
}
else
// Nothing to cancel!
return false;
return true;
}
// Forces every thread blocked in WaitForEvent to wake, even though the queue contents
// are unchanged: WaitForEvent's predicate compares its captured version against
// m_version and exits when they differ.
// NOTE(review): m_version is incremented without holding m_dispatchLock -- presumably
// it is an atomic; confirm against the header.
void DispatchQueue::WakeAllWaitingThreads(void) {
  m_version++;
  m_queueUpdated.notify_all();
}
// Blocks until a dispatcher is available and dispatches it, or returns without
// dispatching if woken by WakeAllWaitingThreads.
//
// @throws dispatch_aborted_exception if the queue is aborted before or during the wait
void DispatchQueue::WaitForEvent(void) {
  std::unique_lock<std::mutex> lk(m_dispatchLock);
  if (onAborted)
    throw dispatch_aborted_exception("Dispatch queue was aborted prior to waiting for an event");

  // Unconditional delay.  The version is sampled before waiting so that a subsequent
  // WakeAllWaitingThreads (which bumps m_version) forces the predicate to pass.
  uint64_t version = m_version;
  m_queueUpdated.wait(
    lk,
    [this, version] {
      // Throwing from the predicate propagates out of wait() with the lock held; the
      // caller's unique_lock releases it during unwind.
      if (onAborted)
        throw dispatch_aborted_exception("Dispatch queue was aborted while waiting for an event");
      return
        // We will need to transition out if the delay queue receives any items:
        !this->m_delayedQueue.empty() ||

        // We also transition out if the dispatch queue has any events:
        this->m_pHead ||

        // Or, finally, if the versions don't match
        version != m_version;
    }
  );
  if (m_pHead) {
    // We have an event, we can just hop over to this variant:
    DispatchEventUnsafe(lk);
    return;
  }
  if (!m_delayedQueue.empty())
    // The delay queue has items but the dispatch queue does not, we need to switch
    // to the suggested sleep timeout variant:
    WaitForEventUnsafe(lk, m_delayedQueue.top().GetReadyTime());
}
bool DispatchQueue::WaitForEvent(std::chrono::milliseconds milliseconds) {
return WaitForEvent(std::chrono::steady_clock::now() + milliseconds);
}
// Absolute-deadline variant: waits for a dispatcher until wakeTime.
//
// @return true if an event was dispatched, false if the deadline elapsed first
bool DispatchQueue::WaitForEvent(std::chrono::steady_clock::time_point wakeTime) {
  if (wakeTime == std::chrono::steady_clock::time_point::max()) {
    // Maximal wait--we can optimize by using the zero-arguments version
    WaitForEvent();
    return true;
  }
  std::unique_lock<std::mutex> lk(m_dispatchLock);
  return WaitForEventUnsafe(lk, wakeTime);
}
// Core timed wait.  Caller must hold lk locked on entry.
//
// @param wakeTime  latest instant to wait until; clamped each iteration to the soonest
//                  delayed dispatcher's ready time
// @return true if an event was dispatched, false if the deadline elapsed
// @throws dispatch_aborted_exception if the queue is aborted before or during the wait
bool DispatchQueue::WaitForEventUnsafe(std::unique_lock<std::mutex>& lk, std::chrono::steady_clock::time_point wakeTime) {
  if (onAborted)
    throw dispatch_aborted_exception("Dispatch queue was aborted prior to waiting for an event");
  while (!m_pHead) {
    // Derive a wakeup time using the high precision timer.  The clamp only ever
    // shrinks wakeTime, so reassigning it here is monotone and safe.
    wakeTime = SuggestSoonestWakeupTimeUnsafe(wakeTime);

    // Now we wait, either for the timeout to elapse or for the dispatch queue itself to
    // transition to the "aborted" state.
    std::cv_status status = m_queueUpdated.wait_until(lk, wakeTime);

    // Short-circuit if the queue was aborted
    if (onAborted)
      throw dispatch_aborted_exception("Dispatch queue was aborted while waiting for an event");

    // Pull any delayed dispatchers that matured while we slept into the ready list:
    if (PromoteReadyDispatchersUnsafe())
      // Dispatcher is ready to run!  Exit our loop and dispatch an event
      break;
    if (status == std::cv_status::timeout)
      // Can't proceed, queue is empty and nobody is ready to be run
      return false;

    // Otherwise: spurious or pend-driven wakeup; loop and re-check m_pHead
  }
  DispatchEventUnsafe(lk);
  return true;
}
// Dispatches one ready event, promoting matured delayed dispatchers first if necessary.
//
// @return true if an event was dispatched, false if nothing was ready
bool DispatchQueue::DispatchEvent(void) {
  std::unique_lock<std::mutex> lk(m_dispatchLock);

  // Promotion is only attempted when the ready list is empty; short-circuit evaluation
  // guarantees that ordering, and that behavior is by design.
  const bool bHasWork = m_pHead || PromoteReadyDispatchersUnsafe();
  if (!bHasWork)
    return false;

  DispatchEventUnsafe(lk);
  return true;
}
// Variant of DispatchEvent which re-pends the thunk if it throws.
//
// @return true if an event was dispatched, false if nothing was ready
bool DispatchQueue::TryDispatchEvent(void) {
  std::unique_lock<std::mutex> lk(m_dispatchLock);

  // Promotion only occurs when the ready list is empty, by design
  const bool bHasWork = m_pHead || PromoteReadyDispatchersUnsafe();
  if (!bHasWork)
    return false;

  TryDispatchEventUnsafe(lk);
  return true;
}
// Drains the queue, dispatching events until none remain.
//
// @return the number of events dispatched
int DispatchQueue::DispatchAllEvents(void) {
  int nDispatched = 0;
  for (; DispatchEvent(); nDispatched++)
    ;
  return nDispatched;
}
// Appends an already-constructed thunk to the ready list.  Takes ownership of both the
// lock (handed on to OnPended) and the thunk.
void DispatchQueue::PendExisting(std::unique_lock<std::mutex>&& lk, DispatchThunkBase* thunk) {
  // Count must be separately maintained:
  m_count++;

  // Splice onto the linked list.  An empty-to-nonempty transition is the only case in
  // which waiters could be blocked, so it is the only case requiring notification here.
  if (!m_pHead) {
    m_pHead = thunk;
    m_queueUpdated.notify_all();
  }
  else
    m_pTail->m_pFlink = thunk;
  m_pTail = thunk;

  // Notification as needed; lock ownership transfers to the callee:
  OnPended(std::move(lk));
}
// Timed barrier: pends a marker dispatcher and waits up to timeout for it to run.
//
// @param timeout  zero means a no-wait check of whether the queue is currently empty
// @return true if the barrier was satisfied, false on timeout
// @throws dispatch_aborted_exception if the queue is aborted before or during the wait
bool DispatchQueue::Barrier(std::chrono::nanoseconds timeout) {
  // Do not block or lock in the event of a no-wait check
  if (timeout.count() == 0)
    return m_count == 0;

  // Now lock and double-check:
  std::unique_lock<std::mutex> lk(m_dispatchLock);

  // Short-circuit if dispatching has been aborted
  if (onAborted)
    throw dispatch_aborted_exception("Dispatch queue was aborted before a timed wait was attempted");

  // Pend a dispatcher that flips a shared flag.  The flag is shared_ptr-held so it
  // remains valid even if this frame unwinds via timeout before the dispatcher runs.
  // NOTE(review): a prior comment here claimed the queue size "cannot be 1" due to an
  // earlier non-empty check, but no such check is visible in this function -- confirm.
  auto complete = std::make_shared<bool>(false);
  auto lambda = [complete] { *complete = true; };
  PendExisting(
    std::move(lk),
    new DispatchThunk<decltype(lambda)>(std::move(lambda))
  );

  // PendExisting moved our lock into OnPended, which may have released it; reacquire
  if (!lk.owns_lock())
    lk.lock();

  // Wait until our variable is satisfied, which might be right away:
  bool rv = m_queueUpdated.wait_for(lk, timeout, [&] { return onAborted || *complete; });
  if (onAborted)
    throw dispatch_aborted_exception("Dispatch queue was aborted during a timed wait");
  return rv;
}
// Unbounded barrier: blocks until every dispatcher pended before this call has run.
//
// @throws dispatch_aborted_exception if the queue is aborted while waiting
void DispatchQueue::Barrier(void) {
  // Set up the lambda.  Capturing the stack flag by reference is safe because this
  // function does not return until the flag is set or the queue aborts.
  bool complete = false;
  *this += [&] { complete = true; };

  // Obtain the lock, wait until our variable is satisfied, which might be right away:
  std::unique_lock<std::mutex> lk(m_dispatchLock);
  m_queueUpdated.wait(lk, [&] { return onAborted || complete; });
  if (onAborted)
    // At this point, the dispatch queue MUST be completely run down.  We have no outstanding
    // references to our stack-allocated "complete" variable.  Furthermore, after the abort
    // signal is raised, no further dispatchers are permitted to be run.
    throw dispatch_aborted_exception("Dispatch queue was aborted while a barrier was invoked");
}
// Suggests when a waiter should wake: the earlier of the caller's own limit and the
// ready time of the soonest delayed dispatcher.  Caller must hold the dispatch lock.
std::chrono::steady_clock::time_point
DispatchQueue::SuggestSoonestWakeupTimeUnsafe(std::chrono::steady_clock::time_point latestTime) const {
  // With no delayed dispatchers there is nothing sooner to suggest:
  if (m_delayedQueue.empty())
    return latestTime;

  // Never tell the caller to sleep past the limit of their interest:
  const auto nextReady = m_delayedQueue.top().GetReadyTime();
  return nextReady < latestTime ? nextReady : latestTime;
}
// Splices all of rhs's pended dispatchers--ready and delayed--onto this queue, leaving
// rhs empty.  rhs is assumed not to be concurrently accessed.
void DispatchQueue::operator+=(DispatchQueue&& rhs) {
  std::unique_lock<std::mutex> lk(m_dispatchLock);

  // Graft the incoming ready list onto the end of our own:
  if (!m_pHead)
    m_pHead = rhs.m_pHead;
  else
    m_pTail->m_pFlink = rhs.m_pHead;
  m_pTail = rhs.m_pTail;
  m_count += rhs.m_count;

  // rhs no longer owns any ready entries:
  rhs.m_pHead = nullptr;
  rhs.m_pTail = nullptr;
  rhs.m_count = 0;

  // Transfer ownership of the delayed thunks one at a time:
  while (!rhs.m_delayedQueue.empty()) {
    const auto& next = rhs.m_delayedQueue.top();
    m_delayedQueue.emplace(next.GetReadyTime(), next.GetThunk().release());
    rhs.m_delayedQueue.pop();
  }

  // Notification as needed:
  m_queueUpdated.notify_all();
  OnPended(std::move(lk));
}
// Produces an intermediate expression object that will pend the eventual lambda against
// this queue at the given absolute time.
DispatchQueue::DispatchThunkDelayedExpressionAbs DispatchQueue::operator+=(std::chrono::steady_clock::time_point rhs) {
  return DispatchThunkDelayedExpressionAbs{ this, rhs };
}
// Pends a delayed dispatcher.  Wakes waiters only when the new entry becomes the
// next-to-execute item and the ready queue is empty, so a sleeping thread can shorten
// its timeout.
void DispatchQueue::operator+=(DispatchThunkDelayed&& rhs) {
  // Capture the ready time before rhs is moved into the queue; the previous version
  // read rhs.GetReadyTime() after the move, inspecting a moved-from object.
  const auto readyTime = rhs.GetReadyTime();
  bool shouldNotify;
  {
    std::lock_guard<std::mutex> lk(m_dispatchLock);
    m_delayedQueue.push(std::move(rhs));
    shouldNotify = m_delayedQueue.top().GetReadyTime() == readyTime && !m_count;
  }
  if(shouldNotify)
    // We're becoming the new next-to-execute entity, dispatch queue currently empty, trigger wakeup
    // so our newly pended delay thunk is eventually processed.
    m_queueUpdated.notify_all();
}