Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sea 178 test actions loading #118

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion scos_actions/__init__.py
@@ -1 +1 @@
__version__ = "9.0.0"
__version__ = "10.0.0"
2 changes: 0 additions & 2 deletions scos_actions/actions/calibrate_y_factor.py
Expand Up @@ -231,14 +231,12 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
logger.debug(f"cal_params: {cal_params}")
cal_data = dict()
last_cal_datetime = get_datetime_str_now()
clock_rate_lookup_by_sample_rate = []
self.sensor.sensor_calibration = SensorCalibration(
calibration_parameters=cal_params,
calibration_data=cal_data,
calibration_reference=onboard_cal_reference,
file_path=Path(env("ONBOARD_CALIBRATION_FILE")),
last_calibration_datetime=last_cal_datetime,
clock_rate_lookup_by_sample_rate=clock_rate_lookup_by_sample_rate,
sensor_uid=sensor_uid,
)
elif self.sensor.sensor_calibration.file_path == env(
Expand Down
19 changes: 10 additions & 9 deletions scos_actions/actions/interfaces/action.py
Expand Up @@ -59,15 +59,16 @@ def configure_sigan(self, params: dict):

def configure_preselector(self, params: dict):
preselector = self.sensor.preselector
if self.PRESELECTOR_PATH_KEY in params:
path = params[self.PRESELECTOR_PATH_KEY]
logger.debug(f"Setting preselector RF path: {path}")
preselector.set_state(path)
elif self.sensor.has_configurable_preselector:
# Require the RF path to be specified if the sensor has a preselector.
raise ParameterException(
f"No {self.PRESELECTOR_PATH_KEY} value specified in the YAML config."
)
if self.sensor.has_configurable_preselector:
if self.PRESELECTOR_PATH_KEY in params:
path = params[self.PRESELECTOR_PATH_KEY]
logger.debug(f"Setting preselector RF path: {path}")
preselector.set_state(path)
else:
# Require the RF path to be specified if the sensor has a preselector.
raise ParameterException(
f"No {self.PRESELECTOR_PATH_KEY} value specified in the YAML config."
)
else:
# No preselector in use, so do not require an RF path
pass
Expand Down
7 changes: 4 additions & 3 deletions scos_actions/actions/interfaces/measurement_action.py
Expand Up @@ -67,15 +67,16 @@ def get_calibration(self, measurement_result: dict) -> ntia_sensor.Calibration:
noise_figure=round(
measurement_result["applied_calibration"]["noise_figure"], 3
),
temperature=round(
self.sensor.sensor_calibration_data["temperature"], 1
),
reference=measurement_result["reference"],
)
if "compression_point" in measurement_result["applied_calibration"]:
cal_meta.compression_point = measurement_result["applied_calibration"][
"compression_point"
]
if "temperature" in self.sensor.sensor_calibration_data:
cal_meta.temperature = round(
self.sensor.sensor_calibration_data["temperature"], 1
)
return cal_meta

def create_metadata(
Expand Down
34 changes: 0 additions & 34 deletions scos_actions/actions/logger.py

This file was deleted.

4 changes: 2 additions & 2 deletions scos_actions/actions/sync_gps.py
Expand Up @@ -19,12 +19,12 @@ def __init__(self, parameters: dict):
def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
logger.debug("Syncing to GPS")
self.sensor = sensor
dt = self.sensor.gps.get_gps_time()
dt = self.sensor.gps.get_gps_time(self.sensor)
date_cmd = ["date", "-s", "{:}".format(dt.strftime("%Y/%m/%d %H:%M:%S"))]
subprocess.check_output(date_cmd, shell=True)
logger.info(f"Set system time to GPS time {dt.ctime()}")

location = sensor.gps.get_location()
location = sensor.gps.get_location(self.sensor)
if location is None:
raise RuntimeError("Unable to synchronize to GPS")

Expand Down
8 changes: 0 additions & 8 deletions scos_actions/calibration/sensor_calibration.py
Expand Up @@ -25,16 +25,8 @@ class provides an implementation for the update method to allow calibration
"""

last_calibration_datetime: str
clock_rate_lookup_by_sample_rate: List[Dict[str, float]]
sensor_uid: str

def get_clock_rate(self, sample_rate: Union[float, int]) -> Union[float, int]:
"""Find the clock rate (Hz) using the given sample_rate (samples per second)"""
for mapping in self.clock_rate_lookup_by_sample_rate:
if mapping["sample_rate"] == sample_rate:
return mapping["clock_frequency"]
return sample_rate

def update(
self,
params: dict,
Expand Down
1 change: 0 additions & 1 deletion scos_actions/calibration/tests/test_calibration.py
Expand Up @@ -103,7 +103,6 @@ def test_to_and_from_json(self, tmp_path: Path):
"testing",
tmp_path / "testing.json",
"dt_str",
[],
"uid",
)
sensor_cal.to_json()
Expand Down
22 changes: 0 additions & 22 deletions scos_actions/calibration/tests/test_sensor_calibration.py
Expand Up @@ -83,17 +83,6 @@ def setup_calibration_file(self, tmp_path: Path):
cal_data["sensor_uid"] = "SAMPLE_CALIBRATION"
cal_data["calibration_reference"] = "TESTING"

# Add SR/CF lookup table
cal_data["clock_rate_lookup_by_sample_rate"] = []
for sr in self.sample_rates:
cr = sr
while cr <= 40e6:
cr *= 2
cr /= 2
cal_data["clock_rate_lookup_by_sample_rate"].append(
{"sample_rate": int(sr), "clock_frequency": int(cr)}
)

# Create the JSON architecture for the calibration data
cal_data["calibration_data"] = {}
cal_data["calibration_parameters"] = ["sample_rate", "frequency", "gain"]
Expand Down Expand Up @@ -151,7 +140,6 @@ def test_sensor_calibration_dataclass_fields(self):
# Note: does not check field order
assert fields == {
"last_calibration_datetime": str,
"clock_rate_lookup_by_sample_rate": List[Dict[str, float]],
"sensor_uid": str,
}

Expand All @@ -167,13 +155,6 @@ def test_field_validator(self):
[], {}, False, Path(""), datetime.datetime.now(), [], "uid"
)

def test_get_clock_rate(self):
"""Test the get_clock_rate method"""
# Test getting a clock rate by sample rate
assert self.sample_cal.get_clock_rate(10e6) == 40e6
# If there isn't an entry, the sample rate should be returned
assert self.sample_cal.get_clock_rate(-999) == -999

def test_get_calibration_dict_exact_match_lookup(self):
calibration_datetime = get_datetime_str_now()
calibration_params = ["sample_rate", "frequency"]
Expand All @@ -187,7 +168,6 @@ def test_get_calibration_dict_exact_match_lookup(self):
calibration_reference="testing",
file_path=Path(""),
last_calibration_datetime=calibration_datetime,
clock_rate_lookup_by_sample_rate=[],
sensor_uid="TESTING",
)
cal_data = cal.get_calibration_dict({"sample_rate": 100.0, "frequency": 200.0})
Expand All @@ -206,7 +186,6 @@ def test_get_calibration_dict_within_range(self):
calibration_reference="testing",
file_path=Path("test_calibration.json"),
last_calibration_datetime=calibration_datetime,
clock_rate_lookup_by_sample_rate=[],
sensor_uid="TESTING",
)

Expand Down Expand Up @@ -234,7 +213,6 @@ def test_update(self):
calibration_reference="testing",
file_path=test_cal_path,
last_calibration_datetime=calibration_datetime,
clock_rate_lookup_by_sample_rate=[],
sensor_uid="TESTING",
)
action_params = {"sample_rate": 100.0, "frequency": 200.0}
Expand Down
@@ -1,26 +1,22 @@
nasctn_sea_data_product:
name: test_nasctn_sea_data_product
name: test_SEA_CBRS_Measure_Baseline
rf_path: antenna
calibration_adjust: False
# IIR filter settings
iir_apply: True
iir_gpass_dB: 0.1 # Max passband ripple below unity gain
iir_gstop_dB: 40 # Minimum stopband attenuation
iir_pb_edge_Hz: 5e6 # Passband edge frequency
iir_sb_edge_Hz: 5.008e6 # Stopband edge frequency
# Mean/Max FFT settings
fft_size: 175
# FFT settings
nffts: 320e3
fft_window_type: flattop # See scipy.signal.get_window for supported input
# PFP frame
pfp_frame_period_ms: 10
# APD downsampling settings
apd_bin_size_dB: 0.5 # Set to 0 or negative for no downsampling
apd_min_bin_dBm: -180
apd_bin_size_dB: 1.0 # Set to 0 or negative for no downsampling
apd_max_bin_dBm: -30
apd_min_bin_dBm: -180
# Time domain power statistics settings
td_bin_size_ms: 10
# Round all power results to X decimal places
round_to_places: 2
# Sigan Settings
preamp_enable: True
reference_level: -25
Expand Down
Expand Up @@ -17,3 +17,4 @@ stepped_frequency_time_domain_iq:
nskip: 15.36e4
calibration_adjust: False
classification: UNCLASSIFIED
rf_path: antenna
Expand Up @@ -7,3 +7,4 @@ single_frequency_time_domain_iq:
nskip: 15.36e4
calibration_adjust: False
classification: UNCLASSIFIED
rf_path: antenna
Expand Up @@ -8,3 +8,4 @@ single_frequency_fft:
nskip: 15.36e4
calibration_adjust: False
classification: UNCLASSIFIED
rf_path: antenna
1 change: 1 addition & 0 deletions scos_actions/configs/actions/test_survey_iq_action.yml
Expand Up @@ -27,3 +27,4 @@ stepped_frequency_time_domain_iq:
- 10000
nskip: 15.36e4
calibration_adjust: False
rf_path: antenna
33 changes: 18 additions & 15 deletions scos_actions/discover/__init__.py
@@ -1,20 +1,11 @@
from scos_actions.actions import action_classes
from scos_actions.actions.logger import Logger
from scos_actions.actions.monitor_sigan import MonitorSignalAnalyzer
from scos_actions.actions.sync_gps import SyncGps
from scos_actions.discover.yaml import load_from_yaml
from scos_actions.settings import ACTION_DEFINITIONS_DIR
from scos_actions.settings import ACTION_DEFINITIONS_DIR, SIGAN_CLASS, SIGAN_MODULE

actions = {
"logger": Logger(),
}
test_actions = {
"test_sync_gps": SyncGps(parameters={"name": "test_sync_gps"}),
"test_monitor_sigan": MonitorSignalAnalyzer(
parameters={"name": "test_monitor_sigan"}
),
"logger": Logger(),
}
actions = {}
test_actions = {}


def init(
Expand All @@ -31,6 +22,18 @@ def init(
return yaml_actions, yaml_test_actions


yaml_actions, yaml_test_actions = init()
actions.update(yaml_actions)
test_actions.update(yaml_test_actions)
if (
SIGAN_MODULE == "scos_actions.hardware.mocks.mock_sigan"
and SIGAN_CLASS == "MockSignalAnalyzer"
):
yaml_actions, yaml_test_actions = init()
actions.update(yaml_actions)
test_actions.update(
{
"test_sync_gps": SyncGps(parameters={"name": "test_sync_gps"}),
"test_monitor_sigan": MonitorSignalAnalyzer(
parameters={"name": "test_monitor_sigan"}
),
}
)
test_actions.update(yaml_test_actions)
6 changes: 4 additions & 2 deletions scos_actions/hardware/gps_iface.py
Expand Up @@ -3,9 +3,11 @@

class GPSInterface(ABC):
@abstractmethod
def get_location(self, timeout_s=1):
def get_location(
self, sensor: "scos_actions.hardware.sensor.Sensor", timeout_s: float = 1
):
pass

@abstractmethod
def get_gps_time(self):
def get_gps_time(self, sensor: "scos_actions.hardware.sensor.Sensor"):
pass
5 changes: 3 additions & 2 deletions scos_actions/hardware/mocks/mock_gps.py
Expand Up @@ -7,10 +7,11 @@


class MockGPS(GPSInterface):
def get_location(timeout_s=1):

def get_location(self, sensor, timeout_s=1):
logger.warning("Using mock GPS!")
return 39.995118, -105.261572, 1651.0

def get_gps_time(self):
def get_gps_time(self, sensor):
logger.warning("Using mock GPS!")
return datetime.now()
6 changes: 6 additions & 0 deletions scos_actions/hardware/mocks/mock_sigan.py
Expand Up @@ -6,6 +6,7 @@

import numpy as np

from scos_actions import __package__ as SCOS_ACTIONS_NAME
from scos_actions import __version__ as SCOS_ACTIONS_VERSION
from scos_actions.hardware.sigan_iface import SignalAnalyzerInterface
from scos_actions.utils import get_datetime_str_now
Expand Down Expand Up @@ -42,6 +43,7 @@ def __init__(
self._reference_level = -30
self._is_available = True
self._plugin_version = SCOS_ACTIONS_VERSION
self._plugin_name = SCOS_ACTIONS_NAME
self._firmware_version = "1.2.3"
self._api_version = "v1.2.3"

Expand All @@ -60,6 +62,10 @@ def is_available(self):
def plugin_version(self):
return self._plugin_version

@property
def plugin_name(self):
return self._plugin_name

@property
def sample_rate(self):
return self._sample_rate
Expand Down