/
task_blocker.hpp
91 lines (75 loc) · 1.74 KB
/
task_blocker.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#pragma once
#include "monitor.hpp"
#include <chrono>
#include <condition_variable>
#include <deque>
namespace cu
{
/// Throttles callers of push() based on the entries that have been pushed
/// but not yet popped.
///
/// Every push() records a timestamp; every pop() removes the oldest one.
/// push() blocks while either
///   - a queue length limit is set and that many entries are outstanding, or
///   - the oldest outstanding entry was pushed more than `maxLatency` ago.
///
/// All state lives inside a cu::Monitor<Data>; the push() callback receives
/// the monitor's std::unique_lock so it can wait on the condition variable.
class TaskBlocker
{
public:
    /// @param maxLatency      Maximum age of the oldest outstanding entry
    ///                        before further push() calls block.
    /// @param maxQueueLength  Maximum number of outstanding entries;
    ///                        0 disables the length limit.
    explicit TaskBlocker(
        std::chrono::steady_clock::duration maxLatency,
        std::size_t maxQueueLength = 0 )
        : maxLatency_( maxLatency ) // trivially copyable, no std::move needed
        , maxQueueLength_( maxQueueLength )
    {}

    /// Registers a new entry, blocking until the limits allow it.
    ///
    /// The wait predicate is only re-evaluated when the condition variable
    /// wakes up; the only notifications in this class come from pop(). That
    /// is sufficient because the passage of time alone can only make the
    /// latency condition fail, never succeed.
    void push()
    {
        data_( cu::PassUniqueLockTag{},
            [&]( Data & data, std::unique_lock<std::mutex> & lock )
        {
            data.conditionVariable.wait( lock, [&]()
            {
                const bool hasRoom =
                    maxQueueLength_ == 0 ||
                    data.pushTimes.size() < maxQueueLength_;
                if ( !hasRoom )
                    return false;
                // The front entry is the oldest outstanding one.
                return data.pushTimes.empty() ||
                       data.pushTimes.front() >=
                           std::chrono::steady_clock::now() - maxLatency_;
            } );
            data.pushTimes.push_back( std::chrono::steady_clock::now() );
        } );
    }

    /// Removes the oldest outstanding entry and wakes all blocked push()
    /// callers so they re-check the limits.
    ///
    /// A pop() without a matching push() is ignored: std::deque::pop_front()
    /// on an empty container is undefined behaviour.
    void pop()
    {
        data_( []( Data & data )
        {
            if ( data.pushTimes.empty() )
                return; // unmatched pop(): nothing to remove, nobody to wake
            data.pushTimes.pop_front();
            data.conditionVariable.notify_all();
        } );
    }

private:
    struct Data
    {
        /// Push timestamps of outstanding entries, oldest at the front.
        std::deque<std::chrono::steady_clock::time_point> pushTimes;
        /// Notified by pop(); waited on by push().
        std::condition_variable conditionVariable;
    };

    cu::Monitor<Data> data_;
    const std::chrono::steady_clock::duration maxLatency_;
    const std::size_t maxQueueLength_ = 0;
};
/// Scope guard for one TaskBlocker entry: the constructor calls push()
/// (which may block until the blocker's limits allow it) and the destructor
/// calls pop().
///
/// The referenced TaskBlocker must outlive this object.
class TaskBlockerItem
{
public:
    /// Registers an entry with @p taskBlocker via TaskBlocker::push().
    explicit TaskBlockerItem( TaskBlocker & taskBlocker )
        : taskBlocker_( taskBlocker )
    {
        taskBlocker_.push();
    }

    // Each instance stands for exactly one push(). An implicitly generated
    // copy would make the destructor run pop() twice for a single push(),
    // so copying (and with it moving, which would fall back to copying)
    // is disabled.
    TaskBlockerItem( const TaskBlockerItem & ) = delete;
    TaskBlockerItem & operator=( const TaskBlockerItem & ) = delete;

    /// Releases the entry via TaskBlocker::pop().
    ~TaskBlockerItem()
    {
        taskBlocker_.pop();
    }

private:
    TaskBlocker & taskBlocker_;
};
class SharedTaskBlockerItem
{
public:
explicit SharedTaskBlockerItem( TaskBlocker & taskBlocker )
: itemPtr_( std::make_shared<TaskBlockerItem>( taskBlocker ) )
{}
private:
std::shared_ptr<const void> itemPtr_;
};
} // namespace cu