How to force fully remove the remaining data. #361
Comments
This should not happen. Can you share example code that reproduces the issue?
The code cannot be made public, and it is tangled up with a lot of business logic.
The following code shows my use of concurrent queue.

    struct BiggerTraits : public moodycamel::ConcurrentQueueDefaultTraits
    {
        static const size_t BLOCK_SIZE = 256;
    };

    int NUM = 5;
    class DataLabel;
    extern computerData(DataLabel data);

    class Show {
    public:
        Show() {
            // create concurrent queue and token
            for (int i = 0; i < NUM; ++i) {
                mQueue.emplace_back(moodycamel::ConcurrentQueue<DataLabel, BiggerTraits >(1024*1024));
                mPtok.emplace_back(moodycamel::ProducerToken(mQueue.back()));
            }
            // consume threads
            for (int i = 0; i < NUM; ++i) {
                mThread.emplace_back(std::thread(&Show::dequeueData, this, i));
            }
            std::thread(&Show::enqueueData, this);
        }

        void dequeueData(int threadId) {
            while (mRuning) {
                //sleep(1);
                vector<DataLabel> items(32);
                int num = mQueue[threadId].try_dequeue_bulk_from_producer(mPtok[threadId], items.begin(), 32);
                if (num == 0) {
                    LOG(BaseLog, "fetch nothing from Queue %d", threadId);
                    usleep(1000000);
                    continue;
                }
                LOG(BaseLog, "fetch %d data from Queue %d", num, threadId);
                for (int pos = 0; pos < num; ++pos) {
                    dosomething(threadId, items[pos]);
                }
            }
        }

        void enqueueData() {
            // create and computer data
            int index = randvalue % NUM;
            DataLabel dataLabel(index);
            computerData(dataLabel);
            assert(mQueue[index].enqueue(mPtok[index], dataLabel));
        }

        bool mRuning;
        std::vector<moodycamel::ConcurrentQueue<DataLabel, BiggerTraits > > mQueue;
        std::vector<moodycamel::ProducerToken> mPtok;
        ////thread
        std::vector<std::thread> mThread;
    };
Nothing stands out as problematic with this code (besides the call to |
I kept my own counters while using concurrentqueue, and occasionally the remaining data could not be taken out of the queue.
The details:
After no more data is enqueued, a small amount of data remains in the concurrentqueue and cannot be dequeued.
I use try_dequeue_bulk_from_producer() to dequeue.
The reference log is as follows:
enqueue_count:6131058 dequeue_count:6131047 left:11
size_approx : 11
enqueue_count and dequeue_count are two separate variables, and each is accumulated by a single thread, so the counts are accurate.
The number returned by size_approx() is also consistent with them.
Please tell me how to solve this. How can I forcibly remove the remaining data?
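
For reference, the "left" figure in the log is simply enqueue_count minus dequeue_count (6131058 - 6131047 = 11). Below is a minimal sketch of that bookkeeping together with one way a consumer can decide when it is safe to stop. It rests on assumptions: `StandInQueue` is a hypothetical mutex-based stand-in so the example builds with the standard library alone, not moodycamel::ConcurrentQueue, and `run_and_drain`, `Counts` and `producer_done` are illustrative names that do not appear in the code above. The idea shown is that the consumer reads a "producer finished" flag before each dequeue attempt and exits only when the flag was already set and the attempt still returned nothing.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a single-producer queue (NOT the library's implementation).
class StandInQueue {
public:
    void enqueue(int value) {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push_back(value);
    }

    // Copies up to `max` items into `out` and returns how many were taken.
    std::size_t try_dequeue_bulk(std::vector<int>& out, std::size_t max) {
        std::lock_guard<std::mutex> lock(mutex_);
        std::size_t taken = 0;
        while (taken < max && !items_.empty()) {
            out[taken++] = items_.front();
            items_.pop_front();
        }
        return taken;
    }

private:
    std::mutex mutex_;
    std::deque<int> items_;
};

struct Counts {
    long enqueued;
    long dequeued;
};

// Runs one producer and one consumer, then reports both counters.
Counts run_and_drain(long total) {
    assert(total >= 0);
    StandInQueue queue;
    std::atomic<bool> producer_done{false};
    std::atomic<long> enqueued{0};
    long dequeued = 0;  // written only by the consumer thread, read after join()

    std::thread producer([&] {
        for (long i = 0; i < total; ++i) {
            queue.enqueue(static_cast<int>(i));
            enqueued.fetch_add(1, std::memory_order_relaxed);
        }
        // Publish "no more data" only after the last enqueue has completed.
        producer_done.store(true, std::memory_order_release);
    });

    std::thread consumer([&] {
        std::vector<int> items(32);
        for (;;) {
            // Read the flag BEFORE the attempt: if it was already set and the
            // attempt still finds nothing, no item can arrive later.
            const bool done = producer_done.load(std::memory_order_acquire);
            const std::size_t n = queue.try_dequeue_bulk(items, items.size());
            dequeued += static_cast<long>(n);
            if (n == 0) {
                if (done) break;
                std::this_thread::yield();
            }
        }
    });

    producer.join();
    consumer.join();
    return Counts{enqueued.load(), dequeued};
}
```

Calling `run_and_drain(10000)` and comparing the two fields of the result gives the same kind of check as the log above. Whether the same exit condition is appropriate for the real queue depends on how the actual shutdown flag (`mRuning` in the posted code) is set relative to the last enqueue, which the posted code does not show.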