Skip to content

Commit

Permalink
Merge branch 'SEA-178_test_actions_loading' of https://github.com/NTI…
Browse files Browse the repository at this point in the history
  • Loading branch information
dboulware committed Apr 26, 2024
2 parents 4525a64 + 7c0454d commit ed7679f
Show file tree
Hide file tree
Showing 21 changed files with 106 additions and 112 deletions.
2 changes: 1 addition & 1 deletion scos_actions/__init__.py
@@ -1 +1 @@
__version__ = "9.0.0"
__version__ = "10.0.0"
2 changes: 0 additions & 2 deletions scos_actions/actions/calibrate_y_factor.py
Expand Up @@ -231,14 +231,12 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
logger.debug(f"cal_params: {cal_params}")
cal_data = dict()
last_cal_datetime = get_datetime_str_now()
clock_rate_lookup_by_sample_rate = []
self.sensor.sensor_calibration = SensorCalibration(
calibration_parameters=cal_params,
calibration_data=cal_data,
calibration_reference=onboard_cal_reference,
file_path=Path(env("ONBOARD_CALIBRATION_FILE")),
last_calibration_datetime=last_cal_datetime,
clock_rate_lookup_by_sample_rate=clock_rate_lookup_by_sample_rate,
sensor_uid=sensor_uid,
)
elif self.sensor.sensor_calibration.file_path == env(
Expand Down
19 changes: 10 additions & 9 deletions scos_actions/actions/interfaces/action.py
Expand Up @@ -59,15 +59,16 @@ def configure_sigan(self, params: dict):

def configure_preselector(self, params: dict):
preselector = self.sensor.preselector
if self.PRESELECTOR_PATH_KEY in params:
path = params[self.PRESELECTOR_PATH_KEY]
logger.debug(f"Setting preselector RF path: {path}")
preselector.set_state(path)
elif self.sensor.has_configurable_preselector:
# Require the RF path to be specified if the sensor has a preselector.
raise ParameterException(
f"No {self.PRESELECTOR_PATH_KEY} value specified in the YAML config."
)
if self.sensor.has_configurable_preselector:
if self.PRESELECTOR_PATH_KEY in params:
path = params[self.PRESELECTOR_PATH_KEY]
logger.debug(f"Setting preselector RF path: {path}")
preselector.set_state(path)
else:
# Require the RF path to be specified if the sensor has a preselector.
raise ParameterException(
f"No {self.PRESELECTOR_PATH_KEY} value specified in the YAML config."
)
else:
# No preselector in use, so do not require an RF path
pass
Expand Down
7 changes: 4 additions & 3 deletions scos_actions/actions/interfaces/measurement_action.py
Expand Up @@ -67,15 +67,16 @@ def get_calibration(self, measurement_result: dict) -> ntia_sensor.Calibration:
noise_figure=round(
measurement_result["applied_calibration"]["noise_figure"], 3
),
temperature=round(
self.sensor.sensor_calibration_data["temperature"], 1
),
reference=measurement_result["reference"],
)
if "compression_point" in measurement_result["applied_calibration"]:
cal_meta.compression_point = measurement_result["applied_calibration"][
"compression_point"
]
if "temperature" in self.sensor.sensor_calibration_data:
cal_meta.temperature = round(
self.sensor.sensor_calibration_data["temperature"], 1
)
return cal_meta

def create_metadata(
Expand Down
34 changes: 0 additions & 34 deletions scos_actions/actions/logger.py

This file was deleted.

4 changes: 2 additions & 2 deletions scos_actions/actions/sync_gps.py
Expand Up @@ -19,12 +19,12 @@ def __init__(self, parameters: dict):
def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
logger.debug("Syncing to GPS")
self.sensor = sensor
dt = self.sensor.gps.get_gps_time()
dt = self.sensor.gps.get_gps_time(self.sensor)
date_cmd = ["date", "-s", "{:}".format(dt.strftime("%Y/%m/%d %H:%M:%S"))]
subprocess.check_output(date_cmd, shell=True)
logger.info(f"Set system time to GPS time {dt.ctime()}")

location = sensor.gps.get_location()
location = sensor.gps.get_location(self.sensor)
if location is None:
raise RuntimeError("Unable to synchronize to GPS")

Expand Down
8 changes: 0 additions & 8 deletions scos_actions/calibration/sensor_calibration.py
Expand Up @@ -25,16 +25,8 @@ class provides an implementation for the update method to allow calibration
"""

last_calibration_datetime: str
clock_rate_lookup_by_sample_rate: List[Dict[str, float]]
sensor_uid: str

def get_clock_rate(self, sample_rate: Union[float, int]) -> Union[float, int]:
"""Find the clock rate (Hz) using the given sample_rate (samples per second)"""
for mapping in self.clock_rate_lookup_by_sample_rate:
if mapping["sample_rate"] == sample_rate:
return mapping["clock_frequency"]
return sample_rate

def update(
self,
params: dict,
Expand Down
1 change: 0 additions & 1 deletion scos_actions/calibration/tests/test_calibration.py
Expand Up @@ -103,7 +103,6 @@ def test_to_and_from_json(self, tmp_path: Path):
"testing",
tmp_path / "testing.json",
"dt_str",
[],
"uid",
)
sensor_cal.to_json()
Expand Down
22 changes: 0 additions & 22 deletions scos_actions/calibration/tests/test_sensor_calibration.py
Expand Up @@ -83,17 +83,6 @@ def setup_calibration_file(self, tmp_path: Path):
cal_data["sensor_uid"] = "SAMPLE_CALIBRATION"
cal_data["calibration_reference"] = "TESTING"

# Add SR/CF lookup table
cal_data["clock_rate_lookup_by_sample_rate"] = []
for sr in self.sample_rates:
cr = sr
while cr <= 40e6:
cr *= 2
cr /= 2
cal_data["clock_rate_lookup_by_sample_rate"].append(
{"sample_rate": int(sr), "clock_frequency": int(cr)}
)

# Create the JSON architecture for the calibration data
cal_data["calibration_data"] = {}
cal_data["calibration_parameters"] = ["sample_rate", "frequency", "gain"]
Expand Down Expand Up @@ -151,7 +140,6 @@ def test_sensor_calibration_dataclass_fields(self):
# Note: does not check field order
assert fields == {
"last_calibration_datetime": str,
"clock_rate_lookup_by_sample_rate": List[Dict[str, float]],
"sensor_uid": str,
}

Expand All @@ -167,13 +155,6 @@ def test_field_validator(self):
[], {}, False, Path(""), datetime.datetime.now(), [], "uid"
)

def test_get_clock_rate(self):
"""Test the get_clock_rate method"""
# Test getting a clock rate by sample rate
assert self.sample_cal.get_clock_rate(10e6) == 40e6
# If there isn't an entry, the sample rate should be returned
assert self.sample_cal.get_clock_rate(-999) == -999

def test_get_calibration_dict_exact_match_lookup(self):
calibration_datetime = get_datetime_str_now()
calibration_params = ["sample_rate", "frequency"]
Expand All @@ -187,7 +168,6 @@ def test_get_calibration_dict_exact_match_lookup(self):
calibration_reference="testing",
file_path=Path(""),
last_calibration_datetime=calibration_datetime,
clock_rate_lookup_by_sample_rate=[],
sensor_uid="TESTING",
)
cal_data = cal.get_calibration_dict({"sample_rate": 100.0, "frequency": 200.0})
Expand All @@ -206,7 +186,6 @@ def test_get_calibration_dict_within_range(self):
calibration_reference="testing",
file_path=Path("test_calibration.json"),
last_calibration_datetime=calibration_datetime,
clock_rate_lookup_by_sample_rate=[],
sensor_uid="TESTING",
)

Expand Down Expand Up @@ -234,7 +213,6 @@ def test_update(self):
calibration_reference="testing",
file_path=test_cal_path,
last_calibration_datetime=calibration_datetime,
clock_rate_lookup_by_sample_rate=[],
sensor_uid="TESTING",
)
action_params = {"sample_rate": 100.0, "frequency": 200.0}
Expand Down
@@ -1,26 +1,22 @@
nasctn_sea_data_product:
name: test_nasctn_sea_data_product
name: test_SEA_CBRS_Measure_Baseline
rf_path: antenna
calibration_adjust: False
# IIR filter settings
iir_apply: True
iir_gpass_dB: 0.1 # Max passband ripple below unity gain
iir_gstop_dB: 40 # Minimum stopband attenuation
iir_pb_edge_Hz: 5e6 # Passband edge frequency
iir_sb_edge_Hz: 5.008e6 # Stopband edge frequency
# Mean/Max FFT settings
fft_size: 175
# FFT settings
nffts: 320e3
fft_window_type: flattop # See scipy.signal.get_window for supported input
# PFP frame
pfp_frame_period_ms: 10
# APD downsampling settings
apd_bin_size_dB: 0.5 # Set to 0 or negative for no downsampling
apd_min_bin_dBm: -180
apd_bin_size_dB: 1.0 # Set to 0 or negative for no downsampling
apd_max_bin_dBm: -30
apd_min_bin_dBm: -180
# Time domain power statistics settings
td_bin_size_ms: 10
# Round all power results to X decimal places
round_to_places: 2
# Sigan Settings
preamp_enable: True
reference_level: -25
Expand Down
Expand Up @@ -17,3 +17,4 @@ stepped_frequency_time_domain_iq:
nskip: 15.36e4
calibration_adjust: False
classification: UNCLASSIFIED
rf_path: antenna
Expand Up @@ -7,3 +7,4 @@ single_frequency_time_domain_iq:
nskip: 15.36e4
calibration_adjust: False
classification: UNCLASSIFIED
rf_path: antenna
Expand Up @@ -8,3 +8,4 @@ single_frequency_fft:
nskip: 15.36e4
calibration_adjust: False
classification: UNCLASSIFIED
rf_path: antenna
1 change: 1 addition & 0 deletions scos_actions/configs/actions/test_survey_iq_action.yml
Expand Up @@ -27,3 +27,4 @@ stepped_frequency_time_domain_iq:
- 10000
nskip: 15.36e4
calibration_adjust: False
rf_path: antenna
33 changes: 18 additions & 15 deletions scos_actions/discover/__init__.py
@@ -1,20 +1,11 @@
from scos_actions.actions import action_classes
from scos_actions.actions.logger import Logger
from scos_actions.actions.monitor_sigan import MonitorSignalAnalyzer
from scos_actions.actions.sync_gps import SyncGps
from scos_actions.discover.yaml import load_from_yaml
from scos_actions.settings import ACTION_DEFINITIONS_DIR
from scos_actions.settings import ACTION_DEFINITIONS_DIR, SIGAN_CLASS, SIGAN_MODULE

actions = {
"logger": Logger(),
}
test_actions = {
"test_sync_gps": SyncGps(parameters={"name": "test_sync_gps"}),
"test_monitor_sigan": MonitorSignalAnalyzer(
parameters={"name": "test_monitor_sigan"}
),
"logger": Logger(),
}
actions = {}
test_actions = {}


def init(
Expand All @@ -31,6 +22,18 @@ def init(
return yaml_actions, yaml_test_actions


yaml_actions, yaml_test_actions = init()
actions.update(yaml_actions)
test_actions.update(yaml_test_actions)
if (
SIGAN_MODULE == "scos_actions.hardware.mocks.mock_sigan"
and SIGAN_CLASS == "MockSignalAnalyzer"
):
yaml_actions, yaml_test_actions = init()
actions.update(yaml_actions)
test_actions.update(
{
"test_sync_gps": SyncGps(parameters={"name": "test_sync_gps"}),
"test_monitor_sigan": MonitorSignalAnalyzer(
parameters={"name": "test_monitor_sigan"}
),
}
)
test_actions.update(yaml_test_actions)
6 changes: 4 additions & 2 deletions scos_actions/hardware/gps_iface.py
Expand Up @@ -3,9 +3,11 @@

class GPSInterface(ABC):
@abstractmethod
def get_location(self, timeout_s=1):
def get_location(
self, sensor: "scos_actions.hardware.sensor.Sensor", timeout_s: float = 1
):
pass

@abstractmethod
def get_gps_time(self):
def get_gps_time(self, sensor: "scos_actions.hardware.sensor.Sensor"):
pass
5 changes: 3 additions & 2 deletions scos_actions/hardware/mocks/mock_gps.py
Expand Up @@ -7,10 +7,11 @@


class MockGPS(GPSInterface):
def get_location(timeout_s=1):

def get_location(self, sensor, timeout_s=1):
logger.warning("Using mock GPS!")
return 39.995118, -105.261572, 1651.0

def get_gps_time(self):
def get_gps_time(self, sensor):
logger.warning("Using mock GPS!")
return datetime.now()
6 changes: 6 additions & 0 deletions scos_actions/hardware/mocks/mock_sigan.py
Expand Up @@ -6,6 +6,7 @@

import numpy as np

from scos_actions import __package__ as SCOS_ACTIONS_NAME
from scos_actions import __version__ as SCOS_ACTIONS_VERSION
from scos_actions.hardware.sigan_iface import SignalAnalyzerInterface
from scos_actions.utils import get_datetime_str_now
Expand Down Expand Up @@ -42,6 +43,7 @@ def __init__(
self._reference_level = -30
self._is_available = True
self._plugin_version = SCOS_ACTIONS_VERSION
self._plugin_name = SCOS_ACTIONS_NAME
self._firmware_version = "1.2.3"
self._api_version = "v1.2.3"

Expand All @@ -60,6 +62,10 @@ def is_available(self):
def plugin_version(self):
return self._plugin_version

@property
def plugin_name(self):
return self._plugin_name

@property
def sample_rate(self):
return self._sample_rate
Expand Down

0 comments on commit ed7679f

Please sign in to comment.