Skip to content

Commit

Permalink
River: Beginnings of shared memory-based IPC implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
byteduck committed Apr 20, 2024
1 parent 3635f35 commit c4a8977
Show file tree
Hide file tree
Showing 3 changed files with 225 additions and 1 deletion.
2 changes: 1 addition & 1 deletion libraries/libriver/CMakeLists.txt
@@ -1,3 +1,3 @@
SET(SOURCES BusConnection.cpp BusServer.cpp Endpoint.cpp packet.cpp)
SET(SOURCES BusConnection.cpp BusServer.cpp Endpoint.cpp IPCBuffer.cpp packet.cpp)
MAKE_LIBRARY(libriver)
TARGET_LINK_LIBRARIES(libriver libduck)
152 changes: 152 additions & 0 deletions libraries/libriver/IPCBuffer.cpp
@@ -0,0 +1,152 @@
/* SPDX-License-Identifier: GPL-3.0-or-later */
/* Copyright © 2016-2024 Byteduck */

#include "IPCBuffer.h"
#include <sys/futex.h>

using namespace Duck;
using namespace River;

IPCBuffer::IPCBuffer(Duck::Ptr<Duck::SharedBuffer> buffer):
m_buffer(std::move(buffer)),
m_header(m_buffer->ptr<Header>()),
m_data(m_buffer->ptr<uint8_t>() + sizeof(Header)),
m_data_size(m_buffer->size() - sizeof(Header))
{
assert(m_header->magic == IPC_MAGIC);
}

Duck::ResultRet<Duck::Ptr<IPCBuffer>> IPCBuffer::alloc(std::string name, size_t buffer_size) {
if (buffer_size <= sizeof(Header))
return Result {"Invalid buffer size"};

auto buf = TRY(Duck::SharedBuffer::alloc(buffer_size, std::move(name)));
auto* hdr = buf->ptr<Header>();
hdr->magic = IPC_MAGIC;
hdr->read_futex = 0;
hdr->write_futex = 0;
hdr->read = 0;
hdr->write = 0;
return Duck::Ptr<IPCBuffer>(new IPCBuffer(buf));
}

Duck::ResultRet<Duck::Ptr<IPCBufferReceiver>> IPCBufferReceiver::attach(Duck::Ptr<Duck::SharedBuffer> buffer) {
if (buffer->size() <= sizeof(Header))
return Result {"Invalid buffer size"};
if (buffer->ptr<Header>()->magic != IPC_MAGIC)
return Result {"Invalid magic"};
return Duck::Ptr<IPCBufferReceiver>(new IPCBufferReceiver(std::move(buffer)));
}

Result IPCBufferReceiver::recv(const ReadCallback& callback, bool blocking) {
size_t msg_size;
auto read = m_header->read.load(); // Since this is in shared memory, we save it to the stack to prevent malicious screwery

// Acquire read futex
if(blocking)
futex_wait(&m_header->read_futex);
else if(!futex_trywait(&m_header->read_futex))
return {NO_MESSAGE};

while(true) {
// Ensure read head is valid
if (read > m_data_size) {
return {INVALID_BUFFER_STATE};
} else if (read + sizeof(Message) > m_data_size) {
// If reading a message header would go past the end of the buffer, that means we need to wrap around.
m_header->read = 0;
read = 0;
}

// Read into output buffer
msg_size = *((size_t*) (m_data + read));

// Size of -1 means that we ran out of space when writing the last packet and wrapped around to the beginning
if (msg_size == -1) {
m_header->read = 0;
read = 0;
} else {
break;
}
}

// Call callback with message. Check buffer size twice to account for overflows
if(read > m_data_size || (read + msg_size + sizeof(Message)) > m_data_size)
return {INVALID_BUFFER_STATE};

callback(m_data + read + sizeof(Message), msg_size);

// Move read head
m_header->read = (read + msg_size + sizeof(Message)) % m_data_size;
m_header->unread.fetch_sub(msg_size + sizeof(Message), std::memory_order::memory_order_release);
futex_signal(&m_header->write_futex);

return Result::SUCCESS;
}

Duck::ResultRet<Duck::Ptr<IPCBufferSender>> IPCBufferSender::attach(Ptr<SharedBuffer> buffer) {
if (buffer->size() <= sizeof(Header))
return Result {"Invalid buffer size"};
if (buffer->ptr<Header>()->magic != IPC_MAGIC)
return Result {"Invalid magic"};
return Duck::Ptr<IPCBufferSender>(new IPCBufferSender(std::move(buffer)));
}

Duck::Result IPCBufferSender::send(size_t size, const WriteCallback& callback, bool blocking) {
if (size > m_data_size - sizeof(Message))
return {MESSAGE_TOO_LARGE};

// Set the write futex to 0 just in case we need it
__atomic_store_n(&m_header->write_futex, 0, __ATOMIC_SEQ_CST);

auto write = m_header->write.load();

auto can_write = [&write, size, this] () -> bool {
const size_t read = m_header->read.load();
if (read == m_header->write)
return m_header->unread.load() == 0;
else if (read > write)
return write + size + sizeof(Message) < read;
else
return (write + size + sizeof(Message) <= m_data_size) || (read >= size + sizeof(Message));
};

while (!can_write()) {
if (!blocking)
return {NO_BUFFER_SPACE};
futex_wait(&m_header->write_futex);
}

if (write + sizeof(Message) > m_data_size) {
// If writing a message header would overflow, just wrap around to 0. Reader will know to follow.
write = 0;
} else if (write + size + sizeof(Message) > m_data_size) {
// If writing the header is possible but writing the whole message is not, we need to write a -1 to signify this
((Message*) (m_data + write))->size = -1;
write = 0;
}

// Write the message header and message
auto* message = (Message*) (m_data + write);
message->size = size;
callback(message->data);

// Update write head
write = write + size + sizeof(Message);
assert(write <= m_data_size);
if (write == m_data_size)
write = 0;
m_header->write = write;

// Signal reader futex
m_header->unread.fetch_add(size + sizeof(Message), std::memory_order::memory_order_release);
futex_signal(&m_header->read_futex);

return Result::SUCCESS;
}

Duck::Result IPCBufferSender::send(size_t size, const void* val, bool blocking) {
return send(size, [val, size] (uint8_t* buf) {
memcpy(buf, val, size);
}, blocking);
}
72 changes: 72 additions & 0 deletions libraries/libriver/IPCBuffer.h
@@ -0,0 +1,72 @@
/* SPDX-License-Identifier: GPL-3.0-or-later */
/* Copyright © 2016-2024 Byteduck */

#pragma once

#include <libduck/SharedBuffer.h>
#include <functional>
#include <atomic>

namespace River {
class IPCBuffer {
public:
struct Message {
size_t size;
uint8_t data[];
};

enum ResultCode {
NO_MESSAGE,
INVALID_BUFFER_STATE,
NO_BUFFER_SPACE,
MESSAGE_TOO_LARGE
};

static constexpr size_t default_buffer_size = PAGE_SIZE * 8;

static Duck::ResultRet<Duck::Ptr<IPCBuffer>> alloc(std::string name, size_t buffer_size = default_buffer_size);

Duck::Ptr<Duck::SharedBuffer> buffer() const { return m_buffer; }

protected:
IPCBuffer(Duck::Ptr<Duck::SharedBuffer> buffer);

static constexpr int IPC_MAGIC = 0x42069;
struct Header {
int magic;
int read_futex;
int write_futex;
std::atomic<size_t> unread, read, write;
};

Duck::Ptr<Duck::SharedBuffer> m_buffer;
uint8_t* m_data;
size_t m_data_size;
Header* m_header;
};

class IPCBufferReceiver: public IPCBuffer {
public:
static Duck::ResultRet<Duck::Ptr<IPCBufferReceiver>> attach(Duck::Ptr<Duck::SharedBuffer> buffer);

using ReadCallback = std::function<void(const uint8_t*, size_t)>;
Duck::Result recv(const ReadCallback& callback, bool blocking = true);

private:
IPCBufferReceiver(Duck::Ptr<Duck::SharedBuffer> buffer): IPCBuffer(std::move(buffer)) {};
};

class IPCBufferSender: public IPCBuffer {
public:

static Duck::ResultRet<Duck::Ptr<IPCBufferSender>> attach(Duck::Ptr<Duck::SharedBuffer> buffer);

using WriteCallback = std::function<void(uint8_t*)>;
Duck::Result send(size_t size, const WriteCallback& callback, bool blocking = true);
Duck::Result send(size_t size, const void* val, bool blocking = true);

private:

IPCBufferSender(Duck::Ptr<Duck::SharedBuffer> buffer): IPCBuffer(std::move(buffer)) {};
};
}

0 comments on commit c4a8977

Please sign in to comment.