Skip to content

Commit

Permalink
Support proxying OOB signals
Browse files Browse the repository at this point in the history
  • Loading branch information
joerivanruth committed Mar 26, 2024
1 parent c0b3112 commit 2c269e1
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 4 deletions.
4 changes: 1 addition & 3 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ In main.rs we take care of setting everything up.

The `proxy` module takes care of all network IO. It's currently based on [mio],
a nonblocking IO library, because that seemed to be the best way to support
out-of-band messages. That requirement has gone away and mio could now probably
be replaced with a simpler threaded implementation, but that hasn't happened
yet.
out-of-band messages in Rust.

The proxy is not aware of the structure of the communication protocol, it just
forwards bytes. There is one exception, when connecting to a Unix Domain socket,
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ What changed in mapiproxy, per version

## mapiproxy NEXTVERSION - YYYY-MM-DD

- Support proxying [Out-Of-Band (OOB)][OOB] signals.

[OOB]: https://en.wikipedia.org/wiki/Transmission_Control_Protocol#Out-of-band_data

## mapiproxy 0.6.1 - 2024-03-13

Expand Down
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mio = { version = "0.8.11", features = [ "net", "os-ext", "os-poll" ] }
pcap-file = "2.0.0"
slab = "0.4.9"
smallvec = { version = "1.13.1", features = [ "union" ] }
socket2 = "0.5.6"
thiserror = "1.0.57"

[dev-dependencies]
Expand Down
9 changes: 9 additions & 0 deletions src/mapi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ impl State {
format_args!("{receiver} has stopped receiving data, discarding {n} bytes"),
)?;
}

MapiEvent::Oob(id, direction, byte) => {
let sender = direction.sender();
renderer.message(
Some(*id),
Some(*direction),
format_args!("{sender} sent an Out-Of-Band message: {byte}"),
)?;
}
}

Ok(())
Expand Down
9 changes: 9 additions & 0 deletions src/proxy/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ pub enum MapiEvent {
error: io::Error,
immediately: bool,
},

/// Client or server has sent an OOB message.
Oob(ConnectionId, Direction, u8),
}

/// Struct [EventSink] knows what to do with new [MapiEvent]s and
Expand Down Expand Up @@ -246,4 +249,10 @@ impl<'a> ConnectionSink<'a> {
discard,
});
}

/// Emit a [MapiEvent::Oob] event.
pub fn emit_oob_received(&mut self, direction: Direction, byte: u8) {
self.0
.emit_event(MapiEvent::Oob(self.id(), direction, byte))
}
}
64 changes: 64 additions & 0 deletions src/proxy/forward.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
io::{self, ErrorKind, Read, Write},
mem::MaybeUninit,
ops::ControlFlow::{self, Break, Continue},
vec,
};
Expand Down Expand Up @@ -301,6 +302,7 @@ pub struct Copying {
can_read: bool,
can_write: bool,
buffer: Box<[u8; Self::BUFSIZE]>,
oob: Option<u8>,
unsent_data: usize,
free_space: usize,
fix_unix_read: bool,
Expand All @@ -322,6 +324,7 @@ impl Copying {
can_read: true,
can_write: true,
buffer,
oob: None,
unsent_data: 0,
free_space,
fix_unix_read,
Expand Down Expand Up @@ -354,6 +357,10 @@ impl Copying {
}
}

progress |= self
.handle_oob(direction, sink, rd, wr)
.map_err(Error::Oob)?;

let to_write = &self.buffer[self.unsent_data..self.free_space];
if !to_write.is_empty() {
assert!(self.can_write);
Expand Down Expand Up @@ -434,6 +441,63 @@ impl Copying {
fn finished(&self) -> bool {
!self.can_read && !self.can_write
}

fn handle_oob(
&mut self,
direction: Direction,
sink: &mut ConnectionSink,
rd: &mut Registered<MioStream>,
wr: &mut Registered<MioStream>,
) -> io::Result<bool> {
let mut progress = false;

// only try to receive OOB if we don't already have an OOB we're trying to send
if self.oob.is_none() {
if let Some(msg) = rd.attempt(Interest::PRIORITY, Self::recv_oob)? {
progress = true;
self.oob = Some(msg);
sink.emit_oob_received(direction, msg);
}
};

if let Some(msg) = &self.oob {
let wrote = wr.attempt(Interest::WRITABLE, |w| Self::send_oob(w, *msg))?;
if wrote > 0 {
progress = true;
self.oob = None;
}
}

Ok(progress)
}

fn recv_oob(r: &mut MioStream) -> io::Result<Option<u8>> {
r.with_socket2(|sock2| {
let mut buf = [MaybeUninit::uninit()];
let x = sock2.recv_out_of_band(&mut buf);
match x {
Ok(1) => {
let message = unsafe { buf[0].assume_init() };
Ok(Some(message))
}
Ok(0) => Ok(None),
Err(e) if would_block(&e) => Ok(None),
Err(e) if e.kind() == ErrorKind::InvalidInput => Ok(None),
Err(e) => Err(e),
Ok(n) => unreachable!("oob read 1 byte returned {n}"),
}
})
}

fn send_oob(w: &mut MioStream, msg: u8) -> io::Result<usize> {
let ret = w.with_socket2(|sock2| sock2.send_out_of_band(&[msg]));
match ret.as_ref().map_err(|e| e.kind()) {
Ok(1) => Ok(1),
Ok(0) | Err(ErrorKind::WouldBlock | ErrorKind::InvalidInput) => Ok(0),
Ok(n @ 2..) => unreachable!("oob write 1 byte returned {n}"),
Err(_) => ret,
}
}
}

#[derive(Debug)]
Expand Down
5 changes: 4 additions & 1 deletion src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@ pub enum Error {
#[error("None of the servers responded")]
Connect,

#[error("forwarding failed when {doing} {side}: {err}")]
#[error("Forwarding failed when {doing} {side}: {err}")]
Forward {
doing: &'static str,
side: &'static str,
err: io::Error,
},

#[error("could not handle OOB message: {0}")]
Oob(io::Error),

#[error("{0}")]
Other(String),
}
Expand Down
18 changes: 18 additions & 0 deletions src/proxy/network.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::mem::ManuallyDrop;
use std::os::fd::{AsRawFd, FromRawFd};
use std::{
ffi::{OsStr, OsString},
fmt::Display,
Expand Down Expand Up @@ -425,6 +427,22 @@ impl MioStream {
MioStream::Unix(_) => Ok(()),
}
}

pub fn with_socket2<T, F>(&mut self, f: F) -> io::Result<T>
where
F: FnOnce(&mut socket2::Socket) -> io::Result<T>,
{
let fd = match self {
MioStream::Tcp(sock) => sock.as_raw_fd(),
MioStream::Unix(sock) => sock.as_raw_fd(),
};
let sock2 = unsafe {
// SAFETY: it's clear from above that fd is always a socket.
socket2::Socket::from_raw_fd(fd)
};
let mut dont_drop = ManuallyDrop::new(sock2);
f(&mut dont_drop)
}
}

impl io::Write for MioStream {
Expand Down

0 comments on commit 2c269e1

Please sign in to comment.