Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add start stop can trace #465

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/whats_new/version_ongoing.rst
@@ -1,2 +1,8 @@
Version ongoing
---------------

PCAN Connector
^^^^^^^^^^^^^^

Trace file can be stopped and started by the function ``stop_pcan_trace`` and ``start_pcan_trace``
to create logfiles on the fly.
76 changes: 67 additions & 9 deletions src/pykiso/lib/connectors/cc_pcan_can/cc_pcan_can.py
Expand Up @@ -18,7 +18,6 @@
.. currentmodule:: cc_pcan_can

"""

import logging
import os
import shutil
Expand Down Expand Up @@ -167,7 +166,8 @@ def __init__(
self.boottime_epoch = boottime_epoch
self._initialize_trace()
self.merge_trc_logs = merge_trc_logs

self.trc_file_names: dict[Path, str | None] = {}
yannpoupon marked this conversation as resolved.
Show resolved Hide resolved
self.trace_running = False
if bus_error_warning_filter:
logging.getLogger("can.pcan").addFilter(PcanFilter())

Expand Down Expand Up @@ -285,7 +285,12 @@ def _pcan_configure_trace(self) -> None:
PCANBasic.PCAN_TRACE_STATUS,
PCANBasic.PCAN_PARAMETER_ON,
)
self.trace_running = True
log.internal_info("Trace activated")
# Add the file name and trace path to rename them later
if self.trc_file_names.get(self.trace_path, None) is None:
self.trc_file_names[self.trace_path] = []
self.trc_file_names[self.trace_path].append(self.trace_name)
self.trc_count += 1
except RuntimeError:
log.error(f"Logging for {self.channel} not activated")
Expand Down Expand Up @@ -415,19 +420,27 @@ def _extract_header(trace: Path) -> str:

def _merge_trc(self) -> None:
"""Merge all traces file in one and fix potential inconsistencies."""
if isinstance(self.trace_path, str):
self.trace_path = Path(self.trace_path)

# Get the latest trace files corresponding to the number of traces created
list_of_traces = sorted(self.trace_path.glob("*.trc"), key=os.path.getmtime)[-self.trc_count :]
list_of_traces = []
first_trace_name = None
first_trace_path = None

for trace_path, trace_file_names in self.trc_file_names.items():
if isinstance(trace_path, str):
trace_path = Path(trace_path)
if first_trace_path is None:
first_trace_name = trace_file_names[0]
first_trace_path = trace_path
# Get all the trace files created in a list
list_of_traces.extend(sorted(trace_path.glob("*.trc"), key=os.path.getmtime)[-len(trace_file_names) :])

try:
if self.trace_name is None:
if first_trace_name is None:
# If a log file is not provided, merge everything into the first created trace
result_trace = list_of_traces[0]
else:
# Otherwise create a separate file
result_trace = Path(self.trace_path / self.trace_name)
result_trace = Path(first_trace_path / first_trace_name)
# replace the first trace with the result trace in the trace list
# to ensure that all traces except the merged trace are deleted in _read_trace_messages
list_of_traces[0] = Path(shutil.move(str(list_of_traces[0]), str(result_trace)))
Expand Down Expand Up @@ -505,7 +518,52 @@ def _remove_offset(messages: List[TypedMessage], offset: float) -> List[TypedMes
msg.timestamp = msg.timestamp + offset
return messages

def _rename_trc(self):
"""Rename the trace file created if a name has been specified"""
for trace_path, trace_file_names in self.trc_file_names.items():
list_of_traces = sorted(trace_path.glob("*.trc"), key=os.path.getmtime)[-len(trace_file_names) :]
for index, file_name in enumerate(trace_file_names):
if file_name is not None:
list_of_traces[index].rename(trace_path / file_name)

def shutdown(self) -> None:
"""Destructor method."""
if self.logging_activated and self.merge_trc_logs:
if not self.logging_activated:
return
if self.merge_trc_logs:
self._merge_trc()
else:
self._rename_trc()

def stop_pcan_trace(self):
"""
Stops the PCAN trace if it is currently running.
:return: None
"""
if not self.trace_running:
log.warning("Trace is already stopped")
return

pcan_channel = getattr(PCANBasic, self.channel)
self._pcan_set_value(
pcan_channel,
PCANBasic.PCAN_TRACE_STATUS,
PCANBasic.PCAN_PARAMETER_OFF,
)
self.trace_running = False

def start_pcan_trace(self, trace_path: Optional[str] = None, trace_size: int = 10) -> None:
"""Start the PCAN trace, the trace file will be renamed after the pcan trace will be stopped.

:param trace_path: Trace path where the trace should be written if None is given it will use
the trace path defined for the last trace created, defaults to None
:param trace_size: maximum size of the trace (in MB), defaults to 10
"""
if self.trace_running:
log.warning("Trace is already started")
return

self.trace_size = trace_size or self.trace_size
self.trace_path = Path(trace_path) if trace_path else self.trace_path
self._initialize_trace()
self._pcan_configure_trace()