Skip to content

Commit

Permalink
Pay attention to TCP sequence numbers
Browse files Browse the repository at this point in the history
(Untested)
  • Loading branch information
joerivanruth committed Feb 23, 2024
1 parent b30c537 commit 6908092
Showing 1 changed file with 80 additions and 19 deletions.
99 changes: 80 additions & 19 deletions src/pcap/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,10 @@ impl TcpTracker {
return Ok(());
}

let seqno = tcp.sequence_number();

let id = ConnectionId::new(self.conn_ids.next().unwrap());
let upstream = StreamState {
id,
dir: Direction::Upstream,
finished: false,
};
let upstream = StreamState::new(id, Direction::Upstream, seqno.wrapping_add(1));

let ev = MapiEvent::Incoming {
id,
Expand All @@ -94,12 +92,11 @@ impl TcpTracker {
let Some(upstream) = self.streams.get(&flipped) else {
return Ok(());
};

let seqno = tcp.sequence_number();

let id = upstream.id;
let downstream = StreamState {
id,
dir: Direction::Downstream,
finished: false,
};
let downstream = StreamState::new(id, Direction::Downstream, seqno.wrapping_add(1));

let ev = MapiEvent::Connected {
id,
Expand All @@ -124,20 +121,20 @@ impl TcpTracker {
let id = stream.id;
let direction = stream.dir;

let seqno = tcp.sequence_number();
let payload = tcp.payload();
if !payload.is_empty() {
let ev = MapiEvent::Data {
id,
direction,
data: payload.into(),
};
handler(ev)?;
let Some(payload) = stream.reorder(seqno, tcp.fin(), payload) else {
return Ok(());
};

Self::emit_data(id, direction, payload, handler)?;
while let Some(payload) = stream.next_ready() {
Self::emit_data(id, direction, &payload, handler)?;
}

if !tcp.fin() {
if !stream.finished {
return Ok(());
}
stream.finished = true;

let ev = MapiEvent::ShutdownRead { id, direction };
handler(ev)?;
Expand All @@ -152,11 +149,75 @@ impl TcpTracker {

Ok(())
}

fn emit_data(
id: ConnectionId,
direction: Direction,
payload: &[u8],
handler: &mut Handler,
) -> io::Result<()> {
if !payload.is_empty() {
let ev = MapiEvent::Data {
id,
direction,
data: payload.into(),
};
handler(ev)?;
}
Ok(())
}
}

#[derive(Debug)]
struct StreamState {
id: ConnectionId,
dir: Direction,
waiting_for: u32,
waiting: HashMap<u32, (Vec<u8>, bool)>,
finished: bool,
}

impl StreamState {
fn new(id: ConnectionId, dir: Direction, seqno: u32) -> Self {
StreamState {
id,
dir,
waiting_for: seqno,
waiting: Default::default(),
finished: false,
}
}

fn reorder<'a>(&'a mut self, seqno: u32, fin: bool, payload: &'a [u8]) -> Option<&'a [u8]> {
if self.waiting_for == seqno {
return self.yield_payload(payload, fin);
}

// Discard packets we've already seen. Be careful with wraparound.
// Example values: waiting_for = 0x30, seqno_1 = 0x31, seqno_2 = 0x2f.
// delta_1 = 0x01, delta_2 = 0xff.
// delta_1 as i32 = 1, delta_2 as i32 = -1
let delta = seqno.wrapping_sub(self.waiting_for);
if (delta as i32) < 0 {
return None;
}

self.waiting.insert(seqno, (payload.to_owned(), fin));
None
}

fn next_ready(&mut self) -> Option<Vec<u8>> {
if let Some((payload, fin)) = self.waiting.remove(&self.waiting_for) {
self.yield_payload(payload, fin)
} else {
None
}
}

fn yield_payload<T: AsRef<[u8]>>(&mut self, payload: T, fin: bool) -> Option<T> {
self.finished |= fin;
let n = payload.as_ref().len() as u32;
self.waiting_for = self.waiting_for.wrapping_add(n);
Some(payload)
}
}

0 comments on commit 6908092

Please sign in to comment.