Skip to content

Commit

Permalink
Merge pull request #301 from whitevegagabriel/simplify-event-loop-copy
Browse files Browse the repository at this point in the history
Remove unncecesary steps for injecting Python event loop
  • Loading branch information
barbibulle committed Oct 2, 2023
2 parents 80db9e2 + 50fd221 commit a1b55b9
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 35 deletions.
17 changes: 9 additions & 8 deletions rust/src/cli/l2cap/client_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
/// TCP client to connect.
/// When the L2CAP CoC channel is closed, the TCP connection is closed as well.
use crate::cli::l2cap::{
proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, run_future_with_current_task_locals,
BridgeData,
inject_py_event_loop, proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, BridgeData,
};
use bumble::wrapper::{
device::{Connection, Device},
Expand Down Expand Up @@ -85,22 +84,24 @@ pub async fn start(args: &Args, device: &mut Device) -> PyResult<()> {
let mtu = args.mtu;
let mps = args.mps;
let ble_connection = Arc::new(Mutex::new(ble_connection));
// Ensure Python event loop is available to l2cap `disconnect`
let _ = run_future_with_current_task_locals(async move {
// spawn thread to handle incoming tcp connections
tokio::spawn(inject_py_event_loop(async move {
while let Ok((tcp_stream, addr)) = listener.accept().await {
let ble_connection = ble_connection.clone();
let _ = run_future_with_current_task_locals(proxy_data_between_tcp_and_l2cap(
// spawn thread to handle this specific tcp connection
if let Ok(future) = inject_py_event_loop(proxy_data_between_tcp_and_l2cap(
ble_connection,
tcp_stream,
addr,
psm,
max_credits,
mtu,
mps,
));
)) {
tokio::spawn(future);
}
}
Ok(())
});
})?);
Ok(())
}

Expand Down
23 changes: 7 additions & 16 deletions rust/src/cli/l2cap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::L2cap;
use anyhow::anyhow;
use bumble::wrapper::{device::Device, l2cap::LeConnectionOrientedChannel, transport::Transport};
use owo_colors::{colors::css::Orange, OwoColorize};
use pyo3::{PyObject, PyResult, Python};
use pyo3::{PyResult, Python};
use std::{future::Future, path::PathBuf, sync::Arc};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
Expand Down Expand Up @@ -170,21 +170,12 @@ async fn proxy_tcp_rx_to_l2cap_tx(
}
}

/// Copies the current thread's TaskLocals into a Python "awaitable" and encapsulates it in a Rust
/// future, running it as a Python Task.
/// `TaskLocals` stores the current event loop, and allows the user to copy the current Python
/// context if necessary. In this case, the python event loop is used when calling `disconnect` on
/// an l2cap connection, or else the call will fail.
pub fn run_future_with_current_task_locals<F>(
fut: F,
) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send>
/// Copies the current thread's Python even loop (contained in `TaskLocals`) into the given future.
/// Useful when sending work to another thread that calls Python code which calls `get_running_loop()`.
pub fn inject_py_event_loop<F, R>(fut: F) -> PyResult<impl Future<Output = R>>
where
F: Future<Output = PyResult<()>> + Send + 'static,
F: Future<Output = R> + Send + 'static,
{
Python::with_gil(|py| {
let locals = pyo3_asyncio::tokio::get_current_locals(py)?;
let future = pyo3_asyncio::tokio::scope(locals.clone(), fut);
pyo3_asyncio::tokio::future_into_py_with_locals(py, locals, future)
.and_then(pyo3_asyncio::tokio::into_future)
})
let locals = Python::with_gil(pyo3_asyncio::tokio::get_current_locals)?;
Ok(pyo3_asyncio::tokio::scope(locals, fut))
}
19 changes: 8 additions & 11 deletions rust/src/cli/l2cap/server_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
/// When the L2CAP CoC channel is closed, the bridge disconnects the TCP socket
/// and waits for a new L2CAP CoC channel to be connected.
/// When the TCP connection is closed by the TCP server, the L2CAP connection is closed as well.
use crate::cli::l2cap::{
proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, run_future_with_current_task_locals,
BridgeData,
};
use crate::cli::l2cap::{proxy_l2cap_rx_to_tcp_tx, proxy_tcp_rx_to_l2cap_tx, BridgeData};
use bumble::wrapper::{device::Device, hci::HciConstant, l2cap::LeConnectionOrientedChannel};
use futures::executor::block_on;
use owo_colors::OwoColorize;
Expand All @@ -49,19 +46,19 @@ pub async fn start(args: &Args, device: &mut Device) -> PyResult<()> {
let port = args.tcp_port;
device.register_l2cap_channel_server(
args.psm,
move |_py, l2cap_channel| {
move |py, l2cap_channel| {
let channel_info = l2cap_channel
.debug_string()
.unwrap_or_else(|e| format!("failed to get l2cap channel info ({e})"));
println!("{} {channel_info}", "*** L2CAP channel:".cyan());

let host = host.clone();
// Ensure Python event loop is available to l2cap `disconnect`
let _ = run_future_with_current_task_locals(proxy_data_between_l2cap_and_tcp(
l2cap_channel,
host,
port,
));
// Handles setting up a tokio runtime that runs this future to completion while also
// containing the necessary context vars.
pyo3_asyncio::tokio::future_into_py(
py,
proxy_data_between_l2cap_and_tcp(l2cap_channel, host, port),
)?;
Ok(())
},
args.max_credits,
Expand Down

0 comments on commit a1b55b9

Please sign in to comment.