Skip to content

Commit

Permalink
fix-squash: Better push/pop methods - using only true atomic operatio…
Browse files Browse the repository at this point in the history
…ns for handling the active items counter, remove pointless thresholding - the writer should never overtake the reader with a full buffer if operations are handled correctly.
  • Loading branch information
Imaniac230 committed Mar 5, 2024
1 parent e6e2978 commit ad556dd
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 146 deletions.
47 changes: 11 additions & 36 deletions ouster-ros/src/thread_safe_ring_buffer.h
Expand Up @@ -25,8 +25,6 @@ class ThreadSafeRingBuffer {
active_items_count(0),
write_idx(SIZE_MAX),
read_idx(SIZE_MAX),
dropped_reads(0),
should_always_drop_reads(true),
new_data_lock(mutex, std::defer_lock),
free_space_lock(mutex, std::defer_lock) {}

Expand Down Expand Up @@ -72,7 +70,6 @@ class ThreadSafeRingBuffer {
*/
template <class BufferWriteFn>
void write(BufferWriteFn&& buffer_write) {
should_always_drop_reads = false;
free_space_lock.lock();
free_space_condition.wait(free_space_lock, [this] { return !full(); });
free_space_lock.unlock();
Expand All @@ -85,7 +82,6 @@ class ThreadSafeRingBuffer {
*/
template <class BufferWriteFn>
void write_overwrite(BufferWriteFn&& buffer_write) {
should_always_drop_reads = true;
perform_write(buffer_write);
}

Expand All @@ -95,7 +91,6 @@ class ThreadSafeRingBuffer {
*/
template <class BufferWriteFn>
void write_nonblock(BufferWriteFn&& buffer_write) {
should_always_drop_reads = false;
if (!full()) perform_write(buffer_write);
}

Expand Down Expand Up @@ -153,16 +148,6 @@ class ThreadSafeRingBuffer {
*/
void reset_read_idx() { read_idx = SIZE_MAX; }

/**
* Gets the max_allowed_read_drops value.
* @return The statically set max allowed number of reading drops.
* @remarks
* Should be mostly used by tests.
*/
static constexpr uint32_t get_max_allowed_read_drops() {
return MAX_ALLOWED_READ_DROPS;
}

private:
/**
* Performs the actual sequence of operations for writing.
Expand All @@ -182,23 +167,15 @@ class ThreadSafeRingBuffer {
* @param buffer_read
* @remarks
* If this function attempts to read using an index currently held by the
* writer, it will not perform the operations. However, if allowed, it will
* not keep dropping more than the MAX_ALLOWED_READ_DROPS, after which a
* single read is performed regardless.
* writer, it will not perform the operations.
*/
template <typename BufferReadFn>
void perform_read(BufferReadFn&& buffer_read) {
if ((incremented_with_capacity(read_idx.load()) == write_idx.load())
&& (should_always_drop_reads.load() ||
(dropped_reads.load() < MAX_ALLOWED_READ_DROPS))) {
++dropped_reads;
return;
if (incremented_with_capacity(read_idx.load()) != write_idx.load()) {
buffer_read(&buffer[increment_with_capacity(read_idx) * item_size]);
pop();
free_space_condition.notify_one();
}

dropped_reads = 0;
buffer_read(&buffer[increment_with_capacity(read_idx) * item_size]);
pop();
free_space_condition.notify_one();
}

/**
Expand Down Expand Up @@ -229,19 +206,20 @@ class ThreadSafeRingBuffer {
* buffer capacity.
*/
void push() {
active_items_count = std::min(active_items_count.load() + 1, capacity());
size_t overflow = capacity() + 1;
++active_items_count;
active_items_count.compare_exchange_strong(overflow, capacity());
}

/**
* Atomically decrements the buffer active elements count, clamping at zero.
*/
void pop() {
active_items_count = static_cast<size_t>(std::max(
static_cast<int>(active_items_count.load() - 1), 0));
size_t overflow = SIZE_MAX;
--active_items_count;
active_items_count.compare_exchange_strong(overflow, 0);
}

static constexpr uint32_t MAX_ALLOWED_READ_DROPS = UINT16_MAX * 6;

std::vector<uint8_t> buffer;

const size_t item_size;
Expand All @@ -251,9 +229,6 @@ class ThreadSafeRingBuffer {
std::atomic_size_t write_idx;
std::atomic_size_t read_idx;

std::atomic_uint32_t dropped_reads;
std::atomic_bool should_always_drop_reads;

std::mutex mutex;
std::condition_variable new_data_condition;
std::unique_lock<std::mutex> new_data_lock;
Expand Down
110 changes: 0 additions & 110 deletions ouster-ros/test/ring_buffer_test.cpp
Expand Up @@ -51,10 +51,6 @@ class ThreadSafeRingBufferTest : public ::testing::Test {

void reset_reading() { buffer->reset_read_idx(); }

[[nodiscard]] uint32_t max_dropped_reads() const {
return buffer->get_max_allowed_read_drops();
}

std::unique_ptr<ThreadSafeRingBuffer> buffer;
};

Expand Down Expand Up @@ -639,112 +635,6 @@ TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferNonblockingThrottling) {
EXPECT_FALSE(buffer->full());
}

TEST_F(ThreadSafeRingBufferTest, GracefulReadingBlockingWithTimeout) {

static constexpr int TOTAL_ITEMS = 10; // total items to process
const std::vector<std::string> source = rand_vector_str(TOTAL_ITEMS, ITEM_SIZE);
std::vector<std::string> target = known_vector_str(TOTAL_ITEMS, "0000");

EXPECT_TRUE(buffer->empty());
EXPECT_FALSE(buffer->full());

std::thread producer([this, &source]() {
for (int i = 0; i < TOTAL_ITEMS; ++i) {
buffer->write_nonblock([i, &source](uint8_t* buffer){
std::memcpy(buffer, &source[i][0], ITEM_SIZE);
});
}

//We're not resetting the writing index on purpose.
});

// wait for 1 second before starting the consumer thread
// allowing sufficient time for the producer thread to be
// completely done
std::this_thread::sleep_for(1s);
std::thread consumer([this, &target]() {
unsigned idx = 0;
for (unsigned i = 0; i < ITEM_COUNT + max_dropped_reads(); ++i) {
buffer->read_timeout([&idx, &target](uint8_t* buffer){
std::memcpy(&target[idx++][0], buffer, ITEM_SIZE);
}, 1s);
}
});

producer.join();
consumer.join();

// The final writing index remained at ITEM_COUNT - 1, so the consumer will
// keep dropping reads until it reaches the maximum dropping threshold, and
// the final item will eventually be filled.
for (int i = 0; i < ITEM_COUNT; ++i) {
std::cout << "source " << source[i] << ", target " << target[i] << std::endl;
EXPECT_EQ(target[i], source[i]);
}
// Since the buffer can only hold upto ITEM_COUNT items, the buffer is completely
// read out. The remaining target items should be empty.
for (int i = ITEM_COUNT + 1; i < TOTAL_ITEMS; ++i) {
std::cout << "source " << source[i] << ", target " << target[i] << std::endl;
EXPECT_EQ(target[i], "0000");
}

EXPECT_TRUE(buffer->empty());
EXPECT_FALSE(buffer->full());
}

TEST_F(ThreadSafeRingBufferTest, GracefulReadingNonblocking) {

static constexpr int TOTAL_ITEMS = 10; // total items to process
const std::vector<std::string> source = rand_vector_str(TOTAL_ITEMS, ITEM_SIZE);
std::vector<std::string> target = known_vector_str(TOTAL_ITEMS, "0000");

EXPECT_TRUE(buffer->empty());
EXPECT_FALSE(buffer->full());

std::thread producer([this, &source]() {
for (int i = 0; i < TOTAL_ITEMS; ++i) {
buffer->write_nonblock([i, &source](uint8_t* buffer){
std::memcpy(buffer, &source[i][0], ITEM_SIZE);
});
}

//We're not resetting the writing index on purpose.
});

// wait for 1 second before starting the consumer thread
// allowing sufficient time for the producer thread to be
// completely done
std::this_thread::sleep_for(1s);
std::thread consumer([this, &target]() {
unsigned idx = 0;
for (unsigned i = 0; i < ITEM_COUNT + max_dropped_reads(); ++i) {
buffer->read_nonblock([&idx, &target](uint8_t* buffer){
std::memcpy(&target[idx++][0], buffer, ITEM_SIZE);
});
}
});

producer.join();
consumer.join();

// The final writing index remained at ITEM_COUNT - 1, so the consumer will
// keep dropping reads until it reaches the maximum dropping threshold, and
// the final item will eventually be filled.
for (int i = 0; i < ITEM_COUNT; ++i) {
std::cout << "source " << source[i] << ", target " << target[i] << std::endl;
EXPECT_EQ(target[i], source[i]);
}
// Since the buffer can only hold upto ITEM_COUNT items, the buffer is completely
// read out. The remaining target items should be empty.
for (int i = ITEM_COUNT + 1; i < TOTAL_ITEMS; ++i) {
std::cout << "source " << source[i] << ", target " << target[i] << std::endl;
EXPECT_EQ(target[i], "0000");
}

EXPECT_TRUE(buffer->empty());
EXPECT_FALSE(buffer->full());
}

int main(int argc, char** argv)
{
testing::InitGoogleTest(&argc, argv);
Expand Down

0 comments on commit ad556dd

Please sign in to comment.