You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Goal: I want to develop a code that records rosbag and when ever a trigger happens, a snapshot service is called to save the data that is in the cache to a rosbag. The code also should be able to stop the recording and start it again whenever needed (for example, after 5 snapshot calls, save a rosbag and start recording again). I tried to use the Recorder() class from the rosbag2_py package but the code is stuck at self.recorder.record(storage_options, record_options) line and cant be do any other task. What would be a potential solution for developing such code?
This is a sample python code:
from rosbag2_snapshot.srv import Snapshot
import rclpy
from rclpy.node import Node
from std_srvs.srv import SetBool
from rosbag2_py import Recorder, RecordOptions, StorageOptions
class BagRecorderNode(Node):
def __init__(self):
super().__init__('bag_recorder')
self.snapshot_client = self.create_client(Snapshot, 'rosbag2_recorder/snapshot')
self.snapshot_control_service = self.create_service(SetBool, 'rosbag_control', self.handle_rosbag_control_request)
self.recorder = Recorder()
self.recording = False
self.snapshot_counter = 0
self.start_recording('/')
def handle_rosbag_control_request(self, request, response):
self.get_logger().info('Snapshot control requested, calling snapshot service.')
while not self.snapshot_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Snapshot service not available, waiting...')
snapshot_request = Snapshot.Request()
future = self.snapshot_client.call_async(snapshot_request)
rclpy.spin_until_future_complete(self, future)
if future.result() is not None:
response.success = True
self.get_logger().info('Snapshot service call successful')
self.snapshot_counter += 1
if self.snapshot_counter >= 5:
self.stop_recording()
self.get_logger().info('Stopped recording after 5 snapshots')
else:
response.success = False
self.get_logger().info('Failed to call snapshot service')
return response
def start_recording(self, uri):
storage_options = StorageOptions(uri=uri, snapshot_mode=True, max_cache_size=1024*1024*1, storage_id='mcap')
record_options = RecordOptions()
record_options.all = True
record_options.is_discovery_disabled = False
record_options.topics = ['/rosout']
record_options.compression_format = 'zstd'
record_options.compression_mode = 'file'
self.recorder.record(storage_options, record_options)
self.recording = True
def stop_recording(self):
if self.recording:
self.recorder.cancel()
self.recording = False
self.snapshot_counter = 0
self.start_recording('/')
def main(args=None):
rclpy.init(args=args)
node = BagRecorderNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
System info:
Ubuntu 22.04 and ROS2 Humble
The text was updated successfully, but these errors were encountered:
Goal: I want to develop a code that records rosbag and when ever a trigger happens, a snapshot service is called to save the data that is in the cache to a rosbag. The code also should be able to stop the recording and start it again whenever needed (for example, after 5 snapshot calls, save a rosbag and start recording again). I tried to use the Recorder() class from the rosbag2_py package but the code is stuck at
self.recorder.record(storage_options, record_options)
line and cant be do any other task. What would be a potential solution for developing such code?This is a sample python code:
System info:
Ubuntu 22.04 and ROS2 Humble
The text was updated successfully, but these errors were encountered: