
Inconsistent order of enqueue and try_dequeue #316

Open
YeahhhhLi opened this issue Sep 21, 2022 · 13 comments

Comments

@YeahhhhLi

Two threads:
one thread continuously generates data (roughly every 10 ms) and calls enqueue();
the other thread continuously reads data and processes it. If the queue is empty, it sleeps for 20 ms.

But the data obtained by try_dequeue comes out in the wrong order.

The code looks like this:

// thread 1
void SendMessage(const Message& message) {
  // message 1: id = 1
  // message 2: id = 2
  // message 3: id = 3
  message_queue_.enqueue(message);
}

// thread 2
void ReceiveMessage() {
  Message message;
  while (is_running_ || message_queue_.size_approx() > 0) {
    if (message_queue_.size_approx() == 0) {
      std::this_thread::sleep_for(std::chrono::milliseconds(20));
      continue;
    }
    CHECK(message_queue_.try_dequeue(message));
    // message 1: id = 3
    // message 2: id = 2
    // message 3: id = 1
  }
}

So what is the reason for this phenomenon?

@cameron314
Owner

That shouldn't happen. Please provide a more complete example program. Are you sure only one thread is enqueuing?

@YeahhhhLi
Author

> That shouldn't happen. Please provide a more complete example program. Are you sure only one thread is enqueuing?

We use this queue in a time-driven (10ms interval) module, and the underlying scheduling uses coroutine. I don't know if it is related to this.
I added the debug information and found that the tid of the sent message is indeed different, but only one tid generated message is queued at the same time

@cameron314
Owner

That explains it then -- different threads will be mapped to different sub-queues internally, with no guarantee of any ordering between them (see the README).

Since you seem to have a single logical thread of execution, you can use a producer token when enqueueing, which will guarantee that all elements end up in the same sub-queue.

Also note that since this appears to be an SPSC use case, consider using my ReaderWriterQueue instead, which is far simpler and can result in less overhead.

@YeahhhhLi
Author

> That explains it then -- different threads will be mapped to different sub-queues internally, with no guarantee of any ordering between them (see the README).
>
> Since you seem to have a single logical thread of execution, you can use a producer token when enqueueing, which will guarantee that all elements end up in the same sub-queue.
>
> Also note that since this appears to be an SPSC use case, consider using my ReaderWriterQueue instead, which is far simpler and can result in less overhead.

OK, thx~

@ysj1173886760

ysj1173886760 commented Apr 26, 2023

@cameron314 I have a similar issue here. I was using BlockingConcurrentQueue as an MPSC queue. Multiple coroutines keep enqueueing elements with unique ids, and a single pthread consumer keeps calling wait_dequeue_bulk_timed to retrieve batches of elements and process them.

I found that sometimes, in a highly concurrent environment, I insert items into the queue and never manage to pop them back out. So I wonder whether this could be caused by starvation: in such an environment, if we can't ensure FIFO, new items might keep getting added while old items are never consumed.

Or maybe the queue has lost some data.

I'd really appreciate it if you could help me; this has confused me for many days.

@cameron314
Owner

cameron314 commented Apr 26, 2023

Data loss should not be possible.

Interestingly, starvation is possible, as dequeue is not fair. However, using the version that accepts a consumer token should be fair in this scenario.

Please note that unfortunately, this queue's implementation has MPSC as its worst case for global ordering and performance.

@ysj1173886760

Thank you for answering.

I'm not sure how to dispatch tokens in an MPSC queue to ensure there is no starvation.

Would it be OK if I give each producer a producer token, and the consumer a consumer token? Does the queue guarantee that no producer will be starved?

Also, since I'm using coroutines, the producers are dynamic, i.e. I keep destroying and creating producers. In that case, is it OK if I always allocate a new producer token for each new coroutine, or should I just allocate a producer token for every pthread worker under the hood?

@cameron314
Owner

Probably best to avoid short-lived producer tokens. But using a consumer token will cause the consumer to cycle between the inner producer sub-queues. Without the token, the consumer just picks the first sub-queue that looks "good enough" without checking them all.

@ysj1173886760

It sounds like I don't need producer tokens, only a consumer token?

What happens to the producers if I do add a producer token?

@cameron314
Owner

Right. You can use just producer tokens, just consumer tokens, neither, or both.

Creating/destroying a producer token has significant overhead. Instead of churning producer tokens, consider using a thread-local (dispatcher-local?) producer token, or none at all.

@ysj1173886760

Cool, that helps a lot.

@ysj1173886760

Much appreciated, I will do some experiments and see whether that has solved my problem, thanks a lot.

@ysj1173886760

Adding just a consumer token solved my problem, no starvation anymore!
