Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

thread leaks when using rosrust::api::Ros::new #202

Open
jobafr opened this issue Jun 26, 2023 · 1 comment
Open

thread leaks when using rosrust::api::Ros::new #202

jobafr opened this issue Jun 26, 2023 · 1 comment

Comments

@jobafr
Copy link

jobafr commented Jun 26, 2023

Hi,

I'm writing a watchdog type of thing, which waits for roscore to come online, offers a few services while a roscore is running, keeps querying the run_id parameter periodically to notice ROS restarts/shutdown.

I do the "waiting for roscore" step by calling rosrust::api::Ros::new in a loop until it succeeds. Unfortunately, this spawns lots of threads that never terminate.

I managed to eliminate most of these by implementing Drop for Slave

diff --git a/rosrust/src/api/ros.rs b/rosrust/src/api/ros.rs
index a229efd..2f0e630 100644
--- a/rosrust/src/api/ros.rs
+++ b/rosrust/src/api/ros.rs
@@ -115,30 +115,45 @@ impl Ros {
         }));
 
         let param_cache = Arc::new(Mutex::new(Default::default()));
-        let slave = Slave::new(
-            master_uri,
-            hostname,
-            bind_host,
-            0,
-            &name,
-            Arc::clone(&param_cache),
-            Arc::clone(&shutdown_manager),
-        )?;
-        let master = Master::new(master_uri, &name, slave.uri())?;
-
-        Ok(Ros {
-            master: Arc::new(master),
-            slave: Arc::new(slave),
-            param_cache,
-            hostname: String::from(hostname),
-            bind_address: String::from(bind_host),
-            resolver,
-            name,
-            clock: Arc::new(RealClock::default()),
-            static_subs: Vec::new(),
-            logger,
-            shutdown_manager,
-        })
+
+        let gen_slave_and_master = {
+            let name = name.clone();
+            let shutdown_manager = shutdown_manager.clone();
+            let logger = logger.clone();
+            let param_cache = param_cache.clone();
+            move || {
+            let slave = Slave::new(
+                master_uri,
+                hostname,
+                bind_host,
+                0,
+                &name,
+                Arc::clone(&param_cache),
+                Arc::clone(&shutdown_manager),
+            )?;
+            let master = Master::new(master_uri, &name, slave.uri())?;
+            Ok((slave, master))
+        }};
+
+        match gen_slave_and_master() {
+            Ok((slave, master)) => Ok(Ros {
+                master: Arc::new(master),
+                slave: Arc::new(slave),
+                param_cache,
+                hostname: String::from(hostname),
+                bind_address: String::from(bind_host),
+                resolver,
+                name,
+                clock: Arc::new(RealClock::default()),
+                static_subs: Vec::new(),
+                logger,
+                shutdown_manager,
+            }),
+            Err(e) => {
+                shutdown_manager.shutdown();
+                Err(e)
+            }
+        }
     }
 
     fn map(&mut self, source: &str, destination: &str) -> Result<()> {
diff --git a/rosrust/src/api/slave/mod.rs b/rosrust/src/api/slave/mod.rs
index d99c3ea..38207b2 100644
--- a/rosrust/src/api/slave/mod.rs
+++ b/rosrust/src/api/slave/mod.rs
@@ -180,3 +182,10 @@ impl Slave {
         self.subscriptions.publisher_uris(topic)
     }
 }
+
+
+impl Drop for Slave {
+    fn drop(&mut self) {
+        self.shutdown_tx.send().unwrap();
+    }
+}

… and patching xml-rpc-rs to join the server on drop.

diff --git a/src/server.rs b/src/server.rs
index 964b28a..c5c5d14 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -150,11 +150,20 @@ where
         self.server.server_addr()
     }
 
-    pub fn run(self) {
+/*    pub fn run(self) {
         self.server.run()
-    }
+    }*/
 
     pub fn poll(&self) {
         self.server.poll()
     }
 }
+
+impl<F> Drop for BoundServer<F>
+where
+    F: Send + Sync + 'static + Fn(&rouille::Request) -> rouille::Response,
+{
+    fn drop(&mut self) {
+        self.server.join();
+    }
+}

(which unfortunately breaks the binaries in that crate)

The last remaining leaking thread comes from tcpconnection, which is spawned for the /rosout topic in Ros::new. Due to the interface of TcpListener from std, I didn't see an obvious way to make this cancellable.

I'd appreciate any hints on how to implement proper cleanup for Ros (including the case when the constructor fails), to make it feasible for use cases where the process needs to spawn before, or outlive, roscore.

@jobafr
Copy link
Author

jobafr commented Jun 27, 2023

Got it working with the first suggestion from this stackoverflow post.

I also fixed a TODO in the source (see below), which probably saves a bit CPU as well.

It seems to work, but I'm not entirely sure whether this can trigger a race condition for the Publisher's file descriptor, when repeatedly trying to create Ros objects (It may not be guaranteed that the old listener thread has died when a new Ros instance is spawned, escpecially when spawning multiple Ros instances, e.g. in order to connect to multiple roscores). Edit: libc::shutdown does not make the fd available for reuse (close does), so this should be ok i think.

Patch for rosrust:

diff --git a/rosrust/Cargo.toml b/rosrust/Cargo.toml
index eb9f3ed..6214cd0 100644
--- a/rosrust/Cargo.toml
+++ b/rosrust/Cargo.toml
@@ -18,11 +18,12 @@ hostname = "0.3.1"
 regex = "1.5.4"
 serde = "1.0.127"
 serde_derive = "1.0.127"
-xml-rpc = "0.0.12"
+xml-rpc = {path = "../../xml-rpc-rs"}
 yaml-rust = "0.4.5"
 crossbeam = "0.8.1"
 socket2 = "0.4.1"
 colored = "2.0.0"
+libc = "0.2"
 
 [dependencies.ros_message]
 path = "../ros_message"
diff --git a/rosrust/src/api/ros.rs b/rosrust/src/api/ros.rs
index a229efd..2f0e630 100644
--- a/rosrust/src/api/ros.rs
+++ b/rosrust/src/api/ros.rs
@@ -115,30 +115,45 @@ impl Ros {
         }));
 
         let param_cache = Arc::new(Mutex::new(Default::default()));
-        let slave = Slave::new(
-            master_uri,
-            hostname,
-            bind_host,
-            0,
-            &name,
-            Arc::clone(&param_cache),
-            Arc::clone(&shutdown_manager),
-        )?;
-        let master = Master::new(master_uri, &name, slave.uri())?;
-
-        Ok(Ros {
-            master: Arc::new(master),
-            slave: Arc::new(slave),
-            param_cache,
-            hostname: String::from(hostname),
-            bind_address: String::from(bind_host),
-            resolver,
-            name,
-            clock: Arc::new(RealClock::default()),
-            static_subs: Vec::new(),
-            logger,
-            shutdown_manager,
-        })
+
+        let gen_slave_and_master = {
+            let name = name.clone();
+            let shutdown_manager = shutdown_manager.clone();
+            let logger = logger.clone();
+            let param_cache = param_cache.clone();
+            move || {
+            let slave = Slave::new(
+                master_uri,
+                hostname,
+                bind_host,
+                0,
+                &name,
+                Arc::clone(&param_cache),
+                Arc::clone(&shutdown_manager),
+            )?;
+            let master = Master::new(master_uri, &name, slave.uri())?;
+            Ok((slave, master))
+        }};
+
+        match gen_slave_and_master() {
+            Ok((slave, master)) => Ok(Ros {
+                master: Arc::new(master),
+                slave: Arc::new(slave),
+                param_cache,
+                hostname: String::from(hostname),
+                bind_address: String::from(bind_host),
+                resolver,
+                name,
+                clock: Arc::new(RealClock::default()),
+                static_subs: Vec::new(),
+                logger,
+                shutdown_manager,
+            }),
+            Err(e) => {
+                shutdown_manager.shutdown();
+                Err(e)
+            }
+        }
     }
 
     fn map(&mut self, source: &str, destination: &str) -> Result<()> {
diff --git a/rosrust/src/api/slave/mod.rs b/rosrust/src/api/slave/mod.rs
index d99c3ea..b8ed1f3 100644
--- a/rosrust/src/api/slave/mod.rs
+++ b/rosrust/src/api/slave/mod.rs
@@ -63,9 +63,7 @@ impl Slave {
                     Ok(_) | Err(TryRecvError::Disconnected) => break,
                     Err(TryRecvError::Empty) => {}
                 }
-                bound_handler.poll();
-                // TODO: use a timed out poll once rouille provides it
-                std::thread::sleep(std::time::Duration::from_millis(5));
+                bound_handler.poll_timeout(std::time::Duration::from_millis(500));
             }
             shutdown_manager.shutdown();
         });
@@ -180,3 +178,10 @@ impl Slave {
         self.subscriptions.publisher_uris(topic)
     }
 }
+
+
+impl Drop for Slave {
+    fn drop(&mut self) {
+        self.shutdown_tx.send().unwrap();
+    }
+}
diff --git a/rosrust/src/tcpros/publisher.rs b/rosrust/src/tcpros/publisher.rs
index ef6c842..754de91 100644
--- a/rosrust/src/tcpros/publisher.rs
+++ b/rosrust/src/tcpros/publisher.rs
@@ -10,6 +10,7 @@ use log::error;
 use std::collections::HashMap;
 use std::net::{TcpListener, TcpStream, ToSocketAddrs};
 use std::sync::{atomic, Arc, Mutex};
+use std::os::fd::AsRawFd;
 
 pub struct Publisher {
     subscriptions: DataStream,
@@ -18,11 +19,16 @@ pub struct Publisher {
     last_message: Arc<Mutex<Arc<Vec<u8>>>>,
     queue_size: usize,
     exists: Arc<atomic::AtomicBool>,
+    raw_fd : i32,
+    join_handle: Option<std::thread::JoinHandle<()>>
 }
 
 impl Drop for Publisher {
     fn drop(&mut self) {
+        println!("dropping publisher for {}", &self.topic.name);
         self.exists.store(false, atomic::Ordering::SeqCst);
+        unsafe { libc::shutdown(self.raw_fd, libc::SHUT_RD); }
+        self.join_handle.take().unwrap().join().unwrap();
     }
 }
 
@@ -138,6 +144,7 @@ impl Publisher {
         U: ToSocketAddrs,
     {
         let listener = TcpListener::bind(address)?;
+        let raw_fd = listener.as_raw_fd();
         let socket_address = listener.local_addr()?;
 
         let publisher_exists = Arc::new(atomic::AtomicBool::new(true));
@@ -168,7 +175,7 @@ impl Publisher {
             }
         };
 
-        tcpconnection::iterate(listener, format!("topic '{}'", topic), iterate_handler);
+        let join_handle = Some(tcpconnection::iterate(listener, format!("topic '{}'", topic), iterate_handler));
 
         let topic = Topic {
             name: String::from(topic),
@@ -183,6 +190,8 @@ impl Publisher {
             last_message,
             queue_size,
             exists: publisher_exists,
+            raw_fd,
+            join_handle
         })
     }
 
diff --git a/rosrust/src/tcpros/util/tcpconnection.rs b/rosrust/src/tcpros/util/tcpconnection.rs
index 9cc56d4..a8ff41b 100644
--- a/rosrust/src/tcpros/util/tcpconnection.rs
+++ b/rosrust/src/tcpros/util/tcpconnection.rs
@@ -8,11 +8,11 @@ pub enum Feedback {
     StopAccepting,
 }
 
-pub fn iterate<F>(listener: TcpListener, tag: String, handler: F)
+pub fn iterate<F>(listener: TcpListener, tag: String, handler: F) -> thread::JoinHandle<()>
 where
     F: Fn(TcpStream) -> Feedback + Send + 'static,
 {
-    thread::spawn(move || listener_thread(&listener, &tag, handler));
+    thread::spawn(move || listener_thread(&listener, &tag, handler))
 }
 
 fn listener_thread<F>(connections: &TcpListener, tag: &str, handler: F)
@@ -26,7 +26,12 @@ where
                 Feedback::StopAccepting => break,
             },
             Err(err) => {
-                error!("TCP connection failed at {}: {}", tag, err);
+                if err.kind() == std::io::ErrorKind::InvalidInput {
+                    println!("got EINVAL, quitting listener thread");
+                    return;
+                }
+                error!("TCP connection failed at {}: {}", & tag, &err);
+                println!("listener_thread ({}) error: {}", &tag, &err);
             }
         }
     }

Patch for xml-rpc-rs:

diff --git a/src/server.rs b/src/server.rs
index 964b28a..65e6716 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -150,11 +150,24 @@ where
         self.server.server_addr()
     }
 
-    pub fn run(self) {
+/*    pub fn run(self) {
         self.server.run()
-    }
+    }*/
 
     pub fn poll(&self) {
         self.server.poll()
     }
+
+    pub fn poll_timeout(&self, dur : std::time::Duration) {
+        self.server.poll_timeout(dur)
+    }
+}
+
+impl<F> Drop for BoundServer<F>
+where
+    F: Send + Sync + 'static + Fn(&rouille::Request) -> rouille::Response,
+{
+    fn drop(&mut self) {
+        self.server.join();
+    }
 }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant