Skip to content

Commit

Permalink
chore(io): Add a cross-platform unidirectional pipe implementation (#…
Browse files Browse the repository at this point in the history
…22522)

Currently useful for `deno test` and internal tests, but could
potentially be exposed at a later time as a `Deno` API.
  • Loading branch information
mmastrac committed Feb 22, 2024
1 parent d29fb91 commit 27579f6
Show file tree
Hide file tree
Showing 6 changed files with 425 additions and 7 deletions.
12 changes: 7 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Expand Up @@ -129,7 +129,7 @@ monch = "=0.5.0"
notify = "=5.0.0"
num-bigint = { version = "0.4", features = ["rand"] }
once_cell = "1.17.1"
os_pipe = "=1.1.4"
os_pipe = { version = "=1.1.5", features = ["io_safety"] }
p224 = { version = "0.13.0", features = ["ecdh"] }
p256 = { version = "0.13.2", features = ["ecdh"] }
p384 = { version = "0.13.0", features = ["ecdh"] }
Expand Down Expand Up @@ -165,7 +165,7 @@ tar = "=0.4.40"
tempfile = "3.4.0"
termcolor = "1.1.3"
thiserror = "1.0.40"
tokio = { version = "1.28.1", features = ["full"] }
tokio = { version = "1.36.0", features = ["full"] }
tokio-metrics = { version = "0.3.0", features = ["rt"] }
tokio-util = "0.7.4"
tower-lsp = { version = "=0.20.0", features = ["proposed"] }
Expand Down
4 changes: 4 additions & 0 deletions ext/io/Cargo.toml
Expand Up @@ -21,5 +21,9 @@ fs3.workspace = true
once_cell.workspace = true
tokio.workspace = true

[target.'cfg(not(windows))'.dependencies]
os_pipe.workspace = true

[target.'cfg(windows)'.dependencies]
winapi = { workspace = true, features = ["winbase", "processenv"] }
rand.workspace = true
9 changes: 9 additions & 0 deletions ext/io/lib.rs
Expand Up @@ -50,6 +50,15 @@ use winapi::um::processenv::GetStdHandle;
use winapi::um::winbase;

pub mod fs;
mod pipe;
#[cfg(windows)]
mod winpipe;

pub use pipe::pipe;
pub use pipe::AsyncPipeRead;
pub use pipe::AsyncPipeWrite;
pub use pipe::PipeRead;
pub use pipe::PipeWrite;

// Store the stdio fd/handles in global statics in order to keep them
// alive for the duration of the application since the last handle/fd
Expand Down
288 changes: 288 additions & 0 deletions ext/io/pipe.rs
@@ -0,0 +1,288 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use std::io;
use std::pin::Pin;

// The synchronous read end of a unidirectional pipe.
pub struct PipeRead {
file: std::fs::File,
}

// The asynchronous read end of a unidirectional pipe.
pub struct AsyncPipeRead {
#[cfg(windows)]
/// We use a `ChildStdout` here as it's a much better fit for a Windows named pipe on Windows. We
/// might also be able to use `tokio::net::windows::named_pipe::NamedPipeClient` in the future
/// if those can be created from raw handles down the road.
read: tokio::process::ChildStdout,
#[cfg(not(windows))]
read: tokio::net::unix::pipe::Receiver,
}

// The synchronous write end of a unidirectional pipe.
pub struct PipeWrite {
file: std::fs::File,
}

// The asynchronous write end of a unidirectional pipe.
pub struct AsyncPipeWrite {
#[cfg(windows)]
/// We use a `ChildStdin` here as it's a much better fit for a Windows named pipe on Windows. We
/// might also be able to use `tokio::net::windows::named_pipe::NamedPipeClient` in the future
/// if those can be created from raw handles down the road.
write: tokio::process::ChildStdin,
#[cfg(not(windows))]
write: tokio::net::unix::pipe::Sender,
}

impl PipeRead {
#[cfg(windows)]
pub fn into_async(self) -> AsyncPipeRead {
let owned: std::os::windows::io::OwnedHandle = self.file.into();
let stdout = std::process::ChildStdout::from(owned);
AsyncPipeRead {
read: tokio::process::ChildStdout::from_std(stdout).unwrap(),
}
}
#[cfg(not(windows))]
pub fn into_async(self) -> AsyncPipeRead {
AsyncPipeRead {
read: tokio::net::unix::pipe::Receiver::from_file(self.file).unwrap(),
}
}
}

impl AsyncPipeRead {
#[cfg(windows)]
pub fn into_sync(self) -> PipeRead {
let owned = self.read.into_owned_handle().unwrap();
PipeRead { file: owned.into() }
}
#[cfg(not(windows))]
pub fn into_sync(self) -> PipeRead {
let file = self.read.into_nonblocking_fd().unwrap().into();
PipeRead { file }
}
}

impl std::io::Read for PipeRead {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.file.read(buf)
}

fn read_vectored(
&mut self,
bufs: &mut [io::IoSliceMut<'_>],
) -> io::Result<usize> {
self.file.read_vectored(bufs)
}
}

impl tokio::io::AsyncRead for AsyncPipeRead {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().read).poll_read(cx, buf)
}
}

impl PipeWrite {
#[cfg(windows)]
pub fn into_async(self) -> AsyncPipeWrite {
let owned: std::os::windows::io::OwnedHandle = self.file.into();
let stdin = std::process::ChildStdin::from(owned);
AsyncPipeWrite {
write: tokio::process::ChildStdin::from_std(stdin).unwrap(),
}
}
#[cfg(not(windows))]
pub fn into_async(self) -> AsyncPipeWrite {
AsyncPipeWrite {
write: tokio::net::unix::pipe::Sender::from_file(self.file).unwrap(),
}
}
}

impl AsyncPipeWrite {
#[cfg(windows)]
pub fn into_sync(self) -> PipeWrite {
let owned = self.write.into_owned_handle().unwrap();
PipeWrite { file: owned.into() }
}
#[cfg(not(windows))]
pub fn into_sync(self) -> PipeWrite {
let file = self.write.into_nonblocking_fd().unwrap().into();
PipeWrite { file }
}
}

impl std::io::Write for PipeWrite {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.file.write(buf)
}

fn flush(&mut self) -> io::Result<()> {
self.file.flush()
}

fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
self.file.write_vectored(bufs)
}
}

impl tokio::io::AsyncWrite for AsyncPipeWrite {
#[inline(always)]
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, io::Error>> {
Pin::new(&mut self.get_mut().write).poll_write(cx, buf)
}

#[inline(always)]
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), io::Error>> {
Pin::new(&mut self.get_mut().write).poll_flush(cx)
}

#[inline(always)]
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), io::Error>> {
Pin::new(&mut self.get_mut().write).poll_shutdown(cx)
}

#[inline(always)]
fn is_write_vectored(&self) -> bool {
self.write.is_write_vectored()
}

#[inline(always)]
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> std::task::Poll<Result<usize, io::Error>> {
Pin::new(&mut self.get_mut().write).poll_write_vectored(cx, bufs)
}
}

/// Create a unidirectional pipe pair that starts off as a pair of synchronous file handles,
/// but either side may be promoted to an async-capable reader/writer.
///
/// On Windows, we use a named pipe because that's the only way to get reliable async I/O
/// support. On Unix platforms, we use the `os_pipe` library, which uses `pipe2` under the hood
/// (or `pipe` on OSX).
pub fn pipe() -> io::Result<(PipeRead, PipeWrite)> {
pipe_impl()
}

/// Creates a unidirectional pipe on top of a named pipe (which is technically bidirectional).
#[cfg(windows)]
pub fn pipe_impl() -> io::Result<(PipeRead, PipeWrite)> {
// SAFETY: We're careful with handles here
unsafe {
use std::os::windows::io::FromRawHandle;
use std::os::windows::io::OwnedHandle;
let (server, client) = crate::winpipe::create_named_pipe()?;
let read = std::fs::File::from(OwnedHandle::from_raw_handle(client));
let write = std::fs::File::from(OwnedHandle::from_raw_handle(server));
Ok((PipeRead { file: read }, PipeWrite { file: write }))
}
}

/// Creates a unidirectional pipe for unix platforms.
#[cfg(not(windows))]
pub fn pipe_impl() -> io::Result<(PipeRead, PipeWrite)> {
use std::os::unix::io::OwnedFd;
let (read, write) = os_pipe::pipe()?;
let read = std::fs::File::from(Into::<OwnedFd>::into(read));
let write = std::fs::File::from(Into::<OwnedFd>::into(write));
Ok((PipeRead { file: read }, PipeWrite { file: write }))
}

#[cfg(test)]
mod tests {
use super::*;

use std::io::Read;
use std::io::Write;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;

#[test]
fn test_pipe() {
let (mut read, mut write) = pipe().unwrap();
// Write to the server and read from the client
write.write_all(b"hello").unwrap();
let mut buf: [u8; 5] = Default::default();
read.read_exact(&mut buf).unwrap();
assert_eq!(&buf, b"hello");
}

#[tokio::test]
async fn test_async_pipe() {
let (read, write) = pipe().unwrap();
let mut read = read.into_async();
let mut write = write.into_async();

write.write_all(b"hello").await.unwrap();
let mut buf: [u8; 5] = Default::default();
read.read_exact(&mut buf).await.unwrap();
assert_eq!(&buf, b"hello");
}

/// Test a round-trip through async mode and back.
#[tokio::test]
async fn test_pipe_transmute() {
let (mut read, mut write) = pipe().unwrap();

// Sync
write.write_all(b"hello").unwrap();
let mut buf: [u8; 5] = Default::default();
read.read_exact(&mut buf).unwrap();
assert_eq!(&buf, b"hello");

let mut read = read.into_async();
let mut write = write.into_async();

// Async
write.write_all(b"hello").await.unwrap();
let mut buf: [u8; 5] = Default::default();
read.read_exact(&mut buf).await.unwrap();
assert_eq!(&buf, b"hello");

let mut read = read.into_sync();
let mut write = write.into_sync();

// Sync
write.write_all(b"hello").unwrap();
let mut buf: [u8; 5] = Default::default();
read.read_exact(&mut buf).unwrap();
assert_eq!(&buf, b"hello");
}

#[tokio::test]
async fn test_async_pipe_is_nonblocking() {
let (read, write) = pipe().unwrap();
let mut read = read.into_async();
let mut write = write.into_async();

let a = tokio::spawn(async move {
let mut buf: [u8; 5] = Default::default();
read.read_exact(&mut buf).await.unwrap();
assert_eq!(&buf, b"hello");
});
let b = tokio::spawn(async move {
write.write_all(b"hello").await.unwrap();
});

a.await.unwrap();
b.await.unwrap();
}
}

0 comments on commit 27579f6

Please sign in to comment.