
Memory Leak #364

Open
tolgatanriverdi opened this issue Oct 17, 2023 · 6 comments

Comments

@tolgatanriverdi commented Oct 17, 2023

Hi,
Sorry for the previous issue (mistakenly opened with the wrong user).

We are using concurrentqueue in our trading application for transferring data between threads, and we are very pleased with the performance of this queue.
However, there is a significant memory leak problem in the library. We transfer all market data from 1 producer to multiple consumers (approx. 2-3 consumers), but we create 10-15 different queues, and when we do that the memory usage increases by 10 MB every second, so by the end of the day we are consuming at least 200-300 GB of memory.
When we remove the concurrentqueue and replace it with the Intel TBB concurrent queue library, the memory leak disappears.
Here is how we use the concurrentqueue (we also have the same problem with the SPSC queue). I'm also attaching the memory usage of our server while continuously using the concurrentqueue (in the graph, we restarted the app at 16:00 and 17:00).

#pragma once

#include <memory>
#include "IdGenerator.h"
#include "../Globals/Enumarations.h"
#include "../Containers/concurrentqueue.h"

template<class T = char*, class L = moodycamel::ConcurrentQueue<T>>
class Subscriber : private IdGenerator {
public:
	Subscriber() : id(++idGenerater) {
	}
	virtual ~Subscriber(){
	}

	void add(const T& message) {
		messageList.enqueue(message);
	}
	bool tryPop(T& item) {
		return messageList.try_dequeue(item);
	}
	L& getList() {
		return messageList;
	}
	unsigned short getId()const {
		return id;
	}
	void clear()
	{
	}
protected:
	const unsigned short id;
	L messageList;
};

template<class T, class L> using SubscriberPtr = std::shared_ptr<Subscriber<T, L>>;
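For context, a minimal sketch of how such a subscriber might be driven (the message type and the calls below are illustrative, not from the original application, and assume the IdGenerator header from the code above):

// Producer side: one thread pushes market data into the subscriber's queue.
auto sub = std::make_shared<Subscriber<std::string>>();
sub->add("tick: EURUSD 1.0612");

// Consumer side: each consumer thread drains whatever is available.
std::string msg;
while (sub->tryPop(msg)) {
    // process msg
}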
[Attachment: graph of the server's memory usage over the day]
@cameron314 (Owner)
Copying my questions here:

Are the consumers keeping up with the producers? Are there many temporary threads enqueueing elements? Is this reproducible with the latest version of the queue?

Also, 100 MB per second is 360 GB per hour, but only 200 GB are in use at the end of the day. Are you sure this is a leak and not just the peak memory usage? (The queue recycles memory internally but never returns it to libc/the OS until destruction).
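For what it's worth, that last point can be demonstrated in isolation. A minimal sketch, not from this thread; the burst size is arbitrary and resident memory is observed externally (e.g. with top):

#include <iostream>
#include <string>
#include "concurrentqueue.h"

int main() {
    moodycamel::ConcurrentQueue<std::string> q;

    // Burst: forces the queue to allocate many internal blocks.
    for (int i = 0; i < 1000000; ++i)
        q.enqueue("payload " + std::to_string(i));

    // Drain completely: every element is consumed.
    std::string s;
    while (q.try_dequeue(s)) {}

    // The queue is now logically empty (prints 0), but the blocks it
    // allocated are kept for reuse; the process's resident memory stays
    // near its peak until q itself is destroyed.
    std::cout << "size_approx: " << q.size_approx() << std::endl;
    return 0;
}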

@cameron314 (Owner)
My SPSC implementation (ReaderWriterQueue) is completely different. The only thing they have in common is that it also reuses memory internally, releasing it only on destruction. So that could be the reason?

@tolgatanriverdi (Author)
Edited my original post; there were some mistakes in the logic I wanted to describe.
As you can see in the code, this is a subscriber class, and an instance (with the queue inside it) is created every time a subscriber is created. There are generally 15-20 instances in our application, which means there are 15-20 concurrentqueues, each working as single producer, multiple consumer.

@erenkal commented Oct 17, 2023

I have a similar problem.

Here is a sample program; its memory usage increases continuously.

#include <chrono>
#include <cstdlib>
#include <iostream>
#include <string>
#include <thread>
#include "concurrentqueue.h"

int main() {
  moodycamel::ConcurrentQueue<std::string> queue{1000};

  // Producer: enqueues random messages as fast as possible.
  auto producer = [&queue]() {
    while (true) {
      std::string message = "Hello" + std::to_string(rand() % 1000000);
      queue.enqueue(message);
    }
  };

  // Consumer: busy-polls the queue and discards whatever it dequeues.
  auto consumer = [&queue]() {
    while (true) {
      std::string message;
      if (queue.try_dequeue(message)) {
        // std::cout << message << std::endl;
      }
    }
  };

  std::thread producerThread(producer);
  std::thread consumerThread(consumer);

  producerThread.detach();
  consumerThread.detach();

  while (true)
    std::this_thread::sleep_for(std::chrono::seconds(1));

  return 0;
}

@cameron314 (Owner)
@erenkal, there's no backpressure in that example. If enqueueing is even slightly faster than dequeueing on average, the queue's size will grow indefinitely.
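For illustration, one way to add backpressure to that example. A hedged sketch using try_enqueue, which fails instead of allocating when the queue's pre-allocated room is exhausted:

#include <atomic>
#include <cstdlib>
#include <string>
#include <thread>
#include "concurrentqueue.h"

int main() {
    // Capacity hint fixed up front; try_enqueue never allocates beyond
    // what the queue has already reserved internally.
    moodycamel::ConcurrentQueue<std::string> queue{1000};
    std::atomic<bool> done{false};

    // Consumer: busy-drains the queue, as in the example above.
    std::thread consumer([&]() {
        std::string message;
        while (!done.load())
            queue.try_dequeue(message);
    });

    // Producer: when try_enqueue fails (no pre-allocated room left),
    // yield and retry instead of letting the queue grow without bound.
    for (int i = 0; i < 10000000; ++i) {
        std::string message = "Hello" + std::to_string(rand() % 1000000);
        while (!queue.try_enqueue(message))
            std::this_thread::yield();
    }

    done = true;
    consumer.join();
    return 0;
}

This trades producer throughput for a bounded queue; whether that is acceptable depends on the application.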

@cameron314 (Owner) commented Oct 17, 2023
@tolgatanriverdi, what are the queues' sizes before shutting down? And how many different threads call add?
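For the first question, a hypothetical way to check, based on the Subscriber class posted above. The container and function name here are illustrative; size_approx() is part of the ConcurrentQueue API:

#include <iostream>
#include <vector>

// Log each subscriber queue's approximate size, e.g. shortly before
// shutdown or periodically from a monitoring thread. A number that grows
// steadily points at consumers not keeping up; a number near zero points
// at peak-usage/recycling rather than unbounded growth.
template<class T, class L>
void logQueueSizes(const std::vector<SubscriberPtr<T, L>>& subscribers) {
    for (const auto& sub : subscribers) {
        std::cout << "queue " << sub->getId()
                  << ": size_approx = " << sub->getList().size_approx()
                  << std::endl;
    }
}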
