
How to force fully remove the remaining data. #361

Open
runmouse opened this issue Sep 26, 2023 · 4 comments

Comments

@runmouse

I collected my own statistics while using concurrentqueue, and occasionally ran into a problem where the remaining data could not be taken out of the queue.
In detail:
after no more data is enqueued, a small amount of data remains in the concurrentqueue and cannot be dequeued.
I use try_dequeue_bulk_from_producer() for the dequeue operation.

The reference log is as follows:
enqueue_count:6131058 dequeue_count:6131047 left:11
size_approx : 11

Because enqueue_count and dequeue_count are two separate variables, each accumulated by a single thread, the counts are accurate.
The number returned by size_approx() is consistent with them.

Please tell me how to resolve this situation. How can I forcibly remove the remaining data?

@cameron314
Owner

This should not happen. Can you share example code that reproduces the issue?

@runmouse
Author

The code cannot be made public, and it is tangled up with a lot of business logic.
I am trying to use try_dequeue_from_producer following this example:
https://github.com/cameron314/concurrentqueue/blob/master/samples.md#pump-until-empty

@runmouse
Author

runmouse commented Oct 8, 2023

> This should not happen. Can you share example code that reproduces the issue?

The following code shows my use of concurrent queue.
There is only one writing thread; there are NUM queues and NUM reading threads. Each reading thread reads only its corresponding queue, while the writing thread writes data to all queues.

struct BiggerTraits : public moodycamel::ConcurrentQueueDefaultTraits
{
    static const size_t BLOCK_SIZE = 256;
};


int NUM = 5;

class DataLabel;
extern void computerData(DataLabel data);

class Show {
public:
    Show() {
        // create concurrent queues and tokens
        for (int i = 0; i < NUM; ++i) {
            mQueue.emplace_back(moodycamel::ConcurrentQueue<DataLabel, BiggerTraits>(1024 * 1024));
            mPtok.emplace_back(moodycamel::ProducerToken(mQueue.back()));
        }
        // consumer threads
        for (int i = 0; i < NUM; ++i) {
            mThread.emplace_back(std::thread(&Show::dequeueData, this, i));
        }
        // producer thread (stored, so it is not an unjoined temporary)
        mThread.emplace_back(std::thread(&Show::enqueueData, this));
    }

    void dequeueData(int threadId) {
        while (mRunning) {
            //sleep(1);
            std::vector<DataLabel> items(32);
            size_t num = mQueue[threadId].try_dequeue_bulk_from_producer(mPtok[threadId], items.begin(), 32);
            if (num == 0) {
                LOG(BaseLog, "fetch nothing from Queue %d", threadId);
                usleep(1000000);
                continue;
            }
            LOG(BaseLog, "fetch %zu data from Queue %d", num, threadId);
            for (size_t pos = 0; pos < num; ++pos) {
                dosomething(threadId, items[pos]);
            }
        }
    }

    void enqueueData() {
        // create and compute data
        int index = randvalue % NUM;
        DataLabel dataLabel(index);
        computerData(dataLabel);
        assert(mQueue[index].enqueue(mPtok[index], dataLabel));
    }

    bool mRunning = true;
    std::vector<moodycamel::ConcurrentQueue<DataLabel, BiggerTraits>> mQueue;
    std::vector<moodycamel::ProducerToken> mPtok;
    // threads
    std::vector<std::thread> mThread;
};

@cameron314
Copy link
Owner

Nothing stands out as problematic with this code (besides the call to enqueue within an assert). Can you add a main to this example that reproduces the issue?
