From 622937f07435d0a6136c15ce2a83e2610cc2eb00 Mon Sep 17 00:00:00 2001
From: Kornel
Date: Sat, 20 Apr 2024 13:26:17 +0100
Subject: [PATCH] WIP

---
 rmp/Cargo.toml           |   5 +-
 rmp/src/decode/future.rs | 174 +++++++++++++++++++++++++++++++++++++++
 rmp/src/decode/mod.rs    |   5 ++
 3 files changed, 183 insertions(+), 1 deletion(-)
 create mode 100644 rmp/src/decode/future.rs

diff --git a/rmp/Cargo.toml b/rmp/Cargo.toml
index 4c101d2..017cacc 100644
--- a/rmp/Cargo.toml
+++ b/rmp/Cargo.toml
@@ -13,12 +13,15 @@ edition = "2021"
 
 [dependencies]
 byteorder = { version = "1.4.2", default-features = false }
+bytes = { version = "1.6.0", default-features = false, features = ["std"], optional = true }
+futures = { version = "0.3.30", optional = true, default-features = false, features = ["std"] }
 num-traits = { version = "0.2.14", default-features = false }
 # This is macro_only ;)
 paste = "1.0"
 
 [features]
-default = ["std"]
+default = ["std", "async"]
+async = ["std", "dep:futures", "dep:bytes"]
 std = ["byteorder/std", "num-traits/std"]
 
 [dev-dependencies]
diff --git a/rmp/src/decode/future.rs b/rmp/src/decode/future.rs
new file mode 100644
index 0000000..95aff10
--- /dev/null
+++ b/rmp/src/decode/future.rs
@@ -0,0 +1,174 @@
+use std::io::{self, Read};
+
+use alloc::collections::VecDeque;
+use bytes::{Buf, Bytes};
+use super::{LenError, MessageLen};
+
+/// Collect chunks of bytes until a full message is found
+pub struct MessageBuffer {
+    /// Once parsing fails, it's impossible to recover
+    fatal_error: bool,
+    /// A rope for all incoming data
+    chunks: VecDeque<Bytes>,
+    /// Chunks are parsed lazily
+    chunks_parsed_num: usize,
+    /// Cumulative byte size of `chunks[..chunks_parsed_num]`
+    chunks_parsed_byte_len: usize,
+    /// msgpack parser
+    msg_len: MessageLen,
+}
+
+/// Result after buffering chunk of data
+pub enum MaybeMessage {
+    /// Found a complete message
+    ///
+    /// The message is split into pieces
+    Message(MessageChunks),
+    /// Message not complete yet. Read this many bytes.
+    MoreBytes(usize),
+}
+
+/// This keeps individual `Bytes` pieces to avoid reallocating memory
+///
+/// Use `into_inner` to process them manually, or use `MessageChunks` as `io::Read`
+pub struct MessageChunks(VecDeque<Bytes>);
+
+impl MessageChunks {
+    /// Get the underlying `Bytes`
+    pub fn into_inner(self) -> VecDeque<Bytes> {
+        self.0
+    }
+}
+
+/// The exact `IntoIter` type may change in the future
+impl IntoIterator for MessageChunks {
+    type IntoIter = <VecDeque<Bytes> as IntoIterator>::IntoIter;
+    type Item = Bytes;
+    fn into_iter(self) -> Self::IntoIter {
+        self.0.into_iter()
+    }
+}
+
+impl Read for MessageChunks {
+    /// Copy bytes from the front chunk into `out_buf`, dropping chunks once they are drained
+    fn read(&mut self, out_buf: &mut [u8]) -> io::Result<usize> {
+        while let Some(bytes) = self.0.get_mut(0) {
+            let ch = bytes.chunk();
+            if ch.is_empty() {
+                self.0.pop_front();
+                continue;
+            }
+            let chunk_len = ch.len();
+            let read_len = out_buf.len().min(chunk_len);
+            out_buf[..read_len].copy_from_slice(&ch[..read_len]);
+            if read_len == chunk_len {
+                self.0.pop_front();
+            } else {
+                // Advance the stored `Bytes`; advancing the temporary `&[u8]` from `chunk()`
+                // would leave the rope untouched and the next `read` would repeat these bytes.
+                bytes.advance(read_len);
+            }
+            return Ok(read_len);
+        }
+        Ok(0)
+    }
+
+    /// Append all remaining chunks to `buf`, reserving the total length up front
+    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
+        let len = self.0.iter().map(|ch| ch.remaining()).sum();
+        buf.try_reserve_exact(len).map_err(|_| io::ErrorKind::OutOfMemory)?;
+        for c in self.0.drain(..) {
+            buf.extend_from_slice(c.chunk());
+        }
+        Ok(len)
+    }
+}
+
+impl MessageBuffer {
+    /// Create an empty buffer
+    #[inline(always)]
+    pub fn new() -> Self {
+        Self {
+            fatal_error: false,
+            chunks_parsed_num: 0,
+            chunks_parsed_byte_len: 0,
+            chunks: VecDeque::new(),
+            msg_len: MessageLen::new(), // TODO: limits
+        }
+    }
+
+    /// Parse chunks added with `push_bytes` etc., and dequeue chunks of complete msgpack messages
+    ///
+    /// Yields `MaybeMessage::Message` for every complete message, `MaybeMessage::MoreBytes`
+    /// when the buffered data ends mid-message, and `Err(())` once on a parse error
+    /// (after which the iterator yields nothing, since the error is fatal).
+    pub fn poll_messages(&mut self) -> impl Iterator<Item = Result<MaybeMessage, ()>> + '_ {
+        std::iter::from_fn(move || {
+            while self.chunks_parsed_num < self.chunks.len() && !self.fatal_error {
+                let bytes = &mut self.chunks[self.chunks_parsed_num];
+                self.chunks_parsed_num += 1;
+                self.chunks_parsed_byte_len += bytes.len();
+
+                match self.msg_len.incremental_len(bytes.as_ref()) {
+                    Ok(message_len) => {
+                        self.msg_len.reset();
+
+                        // NOTE(review): assumes `message_len` counts from the start of the message
+                        let unused_bytes = self.chunks_parsed_byte_len.saturating_sub(message_len);
+                        let remainder = bytes.split_off(bytes.len() - unused_bytes);
+
+                        // includes the `bytes` cut
+                        let message_data = self.chunks.drain(..self.chunks_parsed_num).collect::<VecDeque<_>>();
+
+                        self.chunks_parsed_byte_len = 0;
+                        self.chunks_parsed_num = 0;
+                        self.chunks.push_front(remainder);
+
+                        debug_assert!(message_data.iter().all(|b| b.remaining() == b.len()));
+                        // Without `return` the finished message was silently dropped
+                        return Some(Ok(MaybeMessage::Message(MessageChunks(message_data))));
+                    },
+                    Err(LenError::Truncated(new_len)) => {
+                        if self.chunks_parsed_num >= self.chunks.len() {
+                            let wants_more = new_len.get().saturating_sub(self.chunks_parsed_byte_len);
+                            return Some(Ok(MaybeMessage::MoreBytes(wants_more)));
+                        }
+                    },
+                    Err(LenError::ParseError) => {
+                        self.fatal_error = true;
+                        return Some(Err(()));
+                    },
+                }
+            }
+            None
+        })
+    }
+
+    /// Buffer more data
+    pub fn push_bytes(&mut self, mut bytes: Bytes) {
+        // bytes are stateful, and later `io::Read` will use that
+        if bytes.remaining() != bytes.len() {
+            bytes = bytes.slice(..);
+        }
+        self.chunks.push_back(bytes);
+    }
+
+    /// Buffer more data
+    #[inline]
+    pub fn push_vec(&mut self, bytes: Vec<u8>) {
+        self.push_bytes(bytes.into());
+    }
+
+    /// Buffer more data
+    #[inline]
+    pub fn copy_from_slice(&mut self, bytes: &[u8]) {
+        // `From<&[u8]> for Bytes` only exists for `'static` slices, so copy explicitly
+        self.push_bytes(Bytes::copy_from_slice(bytes));
+    }
+
+    /// Recover buffered data
+    pub fn into_bytes(self) -> Vec<Bytes> {
+        self.chunks.into()
+    }
+}
+
diff --git a/rmp/src/decode/mod.rs b/rmp/src/decode/mod.rs
index aa55f7d..1402335 100644
--- a/rmp/src/decode/mod.rs
+++ b/rmp/src/decode/mod.rs
@@ -20,6 +20,11 @@ mod est;
 #[cfg(feature = "std")]
 pub use est::{MessageLen, LenError};
 
+#[cfg(feature = "async")]
+mod future;
+#[cfg(feature = "async")]
+pub use future::{MaybeMessage, MessageBuffer, MessageChunks};
+
 pub use self::dec::{read_f32, read_f64};
 pub use self::ext::{
     read_ext_meta, read_fixext1, read_fixext16, read_fixext2, read_fixext4, read_fixext8, ExtMeta,