Skip to content

Commit

Permalink
feat: add start stop can trace (#465)
Browse files Browse the repository at this point in the history
* feat(cc_pcan_can): add start and stop methods for pcan trace
  • Loading branch information
yannpoupon committed Mar 26, 2024
1 parent 0351834 commit baa9320
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 55 deletions.
6 changes: 6 additions & 0 deletions docs/whats_new/version_ongoing.rst
@@ -1,2 +1,8 @@
Version ongoing
---------------

PCAN Connector
^^^^^^^^^^^^^^

Trace file can be stopped and started by the function ``stop_pcan_trace`` and ``start_pcan_trace``
to create logfiles on the fly.
76 changes: 67 additions & 9 deletions src/pykiso/lib/connectors/cc_pcan_can/cc_pcan_can.py
Expand Up @@ -18,7 +18,6 @@
.. currentmodule:: cc_pcan_can
"""

import logging
import os
import shutil
Expand Down Expand Up @@ -167,7 +166,8 @@ def __init__(
self.boottime_epoch = boottime_epoch
self._initialize_trace()
self.merge_trc_logs = merge_trc_logs

self._trc_file_names: dict[Path, str | None] = {}
self.trace_running = False
if bus_error_warning_filter:
logging.getLogger("can.pcan").addFilter(PcanFilter())

Expand Down Expand Up @@ -285,7 +285,12 @@ def _pcan_configure_trace(self) -> None:
PCANBasic.PCAN_TRACE_STATUS,
PCANBasic.PCAN_PARAMETER_ON,
)
self.trace_running = True
log.internal_info("Trace activated")
# Add the file name and trace path to rename them later
if self._trc_file_names.get(self.trace_path, None) is None:
self._trc_file_names[self.trace_path] = []
self._trc_file_names[self.trace_path].append(self.trace_name)
self.trc_count += 1
except RuntimeError:
log.error(f"Logging for {self.channel} not activated")
Expand Down Expand Up @@ -415,19 +420,27 @@ def _extract_header(trace: Path) -> str:

def _merge_trc(self) -> None:
"""Merge all traces file in one and fix potential inconsistencies."""
if isinstance(self.trace_path, str):
self.trace_path = Path(self.trace_path)

# Get the latest trace files corresponding to the number of traces created
list_of_traces = sorted(self.trace_path.glob("*.trc"), key=os.path.getmtime)[-self.trc_count :]
list_of_traces = []
first_trace_name = None
first_trace_path = None

for trace_path, trace_file_names in self._trc_file_names.items():
if isinstance(trace_path, str):
trace_path = Path(trace_path)
if first_trace_path is None:
first_trace_name = trace_file_names[0]
first_trace_path = trace_path
# Get all the trace files created in a list
list_of_traces.extend(sorted(trace_path.glob("*.trc"), key=os.path.getmtime)[-len(trace_file_names) :])

try:
if self.trace_name is None:
if first_trace_name is None:
# If a log file is not provided, merge everything into the first created trace
result_trace = list_of_traces[0]
else:
# Otherwise create a separate file
result_trace = Path(self.trace_path / self.trace_name)
result_trace = Path(first_trace_path / first_trace_name)
# replace the first trace with the result trace in the trace list
# to ensure that all traces except the merged trace are deleted in _read_trace_messages
list_of_traces[0] = Path(shutil.move(str(list_of_traces[0]), str(result_trace)))
Expand Down Expand Up @@ -505,7 +518,52 @@ def _remove_offset(messages: List[TypedMessage], offset: float) -> List[TypedMes
msg.timestamp = msg.timestamp + offset
return messages

def _rename_trc(self):
"""Rename the trace file created if a name has been specified"""
for trace_path, trace_file_names in self._trc_file_names.items():
list_of_traces = sorted(trace_path.glob("*.trc"), key=os.path.getmtime)[-len(trace_file_names) :]
for index, file_name in enumerate(trace_file_names):
if file_name is not None:
list_of_traces[index].rename(trace_path / file_name)

def shutdown(self) -> None:
"""Destructor method."""
if self.logging_activated and self.merge_trc_logs:
if not self.logging_activated:
return
if self.merge_trc_logs:
self._merge_trc()
else:
self._rename_trc()

def stop_pcan_trace(self):
"""
Stops the PCAN trace if it is currently running.
:return: None
"""
if not self.trace_running:
log.warning("Trace is already stopped")
return

pcan_channel = getattr(PCANBasic, self.channel)
self._pcan_set_value(
pcan_channel,
PCANBasic.PCAN_TRACE_STATUS,
PCANBasic.PCAN_PARAMETER_OFF,
)
self.trace_running = False

def start_pcan_trace(self, trace_path: Optional[str] = None, trace_size: int = 10) -> None:
"""Start the PCAN trace, the trace file will be renamed after the pcan trace will be stopped.
:param trace_path: Trace path where the trace should be written if None is given it will use
the trace path defined for the last trace created, defaults to None
:param trace_size: maximum size of the trace (in MB), defaults to 10
"""
if self.trace_running:
log.warning("Trace is already started")
return

self.trace_size = trace_size or self.trace_size
self.trace_path = Path(trace_path) if trace_path else self.trace_path
self._initialize_trace()
self._pcan_configure_trace()

0 comments on commit baa9320

Please sign in to comment.