Skip to content

Commit

Permalink
Merge pull request #59 from NTIA/calibrate_to_antenna
Browse files Browse the repository at this point in the history
Remove calibrations and retries from signal analyzers
  • Loading branch information
jhazentia committed Mar 27, 2024
2 parents 0d146b4 + cdd9f18 commit 35732cc
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 148 deletions.
7 changes: 6 additions & 1 deletion README.md
Expand Up @@ -86,10 +86,15 @@ file:
cp env.template ./env
```

1. In the newly-created `env` file, set the `BASE_IMAGE`:
1. In the newly-created `env` file, set the following environment variables:

```text
DEVICE_MODEL=RSA507A # Or 'RSA306B', 'RSA517A', etc.
# These are the same for all supported Tektronix RSA devices:
BASE_IMAGE=ghcr.io/ntia/scos-tekrsa/tekrsa_usb:latest
USB_DEVICE=Tektronix
SIGAN_CLASS=TekRSASigan
SIGAN_MODULE=scos_tekrsa.hardware.tekrsa_sigan
```

1. Get environment variables:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Expand Up @@ -41,7 +41,7 @@ classifiers = [
dependencies = [
"environs>=9.5.0",
"tekrsa-api-wrap>=1.3.2",
"scos_actions @ git+https://github.com/NTIA/scos-actions@8.0.1",
"scos_actions @ git+https://github.com/NTIA/scos-actions@9.0.0",
]

[project.optional-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion src/scos_tekrsa/__init__.py
@@ -1 +1 @@
__version__ = "5.0.2"
__version__ = "6.0.0"
@@ -1,5 +1,6 @@
y_factor_cal:
name: SEA_CBRS_Calibrate_Baseline
reference_point: "noise source output"
# Preselector configuration
cal_source_idx: 0
temp_sensor_idx: 2
Expand Down
16 changes: 1 addition & 15 deletions src/scos_tekrsa/hardware/mocks/rsa_block.py
Expand Up @@ -6,11 +6,6 @@

rng = np.random.default_rng()

# For testing IQ capture retry on failure, this parameter controls the
# number of times that the mocked IQSTREAM_Tempfile_NoConfig() will fail
# when run consecutively, before working.
TIMES_TO_FAIL = 3

# Mock Signal Analyzer Constants
DEVICE_NOMENCLATURE = "MOCK RSA507A"
MIN_CENTER_FREQ = 9e3
Expand All @@ -30,8 +25,6 @@
class MockRSA:
def __init__(self, randomize_values=False):
# Simulate returning less than requested num samples
self.times_to_fail = TIMES_TO_FAIL
self.times_failed = 0
self.randomize_values = randomize_values

# Initialize parameters
Expand Down Expand Up @@ -107,10 +100,7 @@ def IQSTREAM_Tempfile_NoConfig(self, dur_msec, return_status):
# Get n_samp from dur_msec
n_samp = int((dur_msec / 1000) * self.IQSTREAM_GetAcqParameters()[1])

if self.times_failed < self.times_to_fail:
self.times_failed += 1
iq = np.ones(0, dtype=np.complex64)
elif self.randomize_values:
if self.randomize_values:
i = rng.normal(0.5, 0.5, n_samp)
q = rng.normal(0.5, 0.5, n_samp)
rand_iq = np.empty(n_samp, dtype=np.complex64)
Expand All @@ -127,10 +117,6 @@ def IQSTREAM_Tempfile_NoConfig(self, dur_msec, return_status):
def IQSTREAM_Acquire(self, dur_msec, return_status):
return self.IQSTREAM_Tempfile_NoConfig(dur_msec, return_status)

def set_times_to_fail(self, n):
self.times_to_fail = n
self.times_failed = 0

def DEVICE_GetFWVersion(self):
return "mock_rsa"

Expand Down
133 changes: 42 additions & 91 deletions src/scos_tekrsa/hardware/tekrsa_sigan.py
Expand Up @@ -4,7 +4,6 @@

from its_preselector.web_relay import WebRelay
from scos_actions import utils
from scos_actions.calibration.calibration import Calibration
from scos_actions.hardware.sigan_iface import SignalAnalyzerInterface

import scos_tekrsa.hardware.tekrsa_constants as rsa_constants
Expand All @@ -20,13 +19,11 @@
class TekRSASigan(SignalAnalyzerInterface):
def __init__(
self,
sensor_cal: Calibration = None,
sigan_cal: Calibration = None,
switches: Optional[Dict[str, WebRelay]] = None,
):

try:
super().__init__(sensor_cal, sigan_cal, switches)
super().__init__(switches)
logger.debug("Initializing Tektronix RSA Signal Analyzer")
self._plugin_version = SCOS_TEKRSA_VERSION

Expand All @@ -49,8 +46,6 @@ def __init__(
self.max_frequency = None
self.min_frequency = None

self.sensor_calibration_data = None
self.sigan_calibration_data = None
self._capture_time = None
self._reference_level = None
self._frequency = None
Expand Down Expand Up @@ -280,8 +275,6 @@ def acquire_time_domain_samples(
self,
num_samples: int,
num_samples_skip: int = 0,
retries: int = 5,
cal_adjust: bool = True,
):
"""Acquire specific number of time-domain IQ samples."""
with sigan_lock:
Expand All @@ -295,27 +288,6 @@ def acquire_time_domain_samples(
nskip = int(num_samples_skip) # Requested number of samples to skip
nsamps = nsamps_req + nskip # Total number of samples to collect

if cal_adjust:
# Get calibration data for acquisition
if not (settings.RUNNING_TESTS or settings.MOCK_SIGAN):
cal_params = self.sensor_calibration.calibration_parameters
else:
# Make it work for mock sigan/testing. Just match frequency.
cal_params = [vars(self)["_frequency"]]
try:
cal_args = [vars(self)[f"_{p}"] for p in cal_params]
except KeyError:
raise Exception(
"One or more required cal parameters is not a valid sigan setting."
)
logger.debug(f"Matched calibration params: {cal_args}")
self.recompute_sensor_calibration_data(cal_args)
# Compute the linear gain
db_gain = self.sensor_calibration_data["gain"]
linear_gain = 10.0 ** (db_gain / 20.0)
else:
linear_gain = 1

# Determine correct time length (round up, integer ms)
durationMsec = int(1000 * (nsamps / self.sample_rate)) + (
1000 * nsamps % self.sample_rate > 0
Expand All @@ -331,65 +303,44 @@ def acquire_time_domain_samples(
logger.debug(
f"acquire_time_domain_samples starting, num_samples = {nsamps}"
)
logger.debug(f"Number of retries = {retries}")

max_retries = retries

while True:
self._capture_time = utils.get_datetime_str_now()
data, status = self.rsa.IQSTREAM_Tempfile_NoConfig(durationMsec, True)
data = data[nskip : nskip + nsamps_req] # Remove extra samples, if any
data_len = len(data)

logger.debug(f"IQ Stream status: {status}")

# Check status string for overload / data loss
self.overload = False
if "Input overrange" in status:
self.overload = True
logger.debug("IQ stream: ADC overrange event occurred.")

if "data loss" in status or "discontinuity" in status: # Invalid data
if retries > 0:
logger.info(
f"Data loss occurred during IQ streaming. Retrying {retries} more times."
)
retries -= 1
continue
else:
err = "Data loss occurred with no retries remaining."
err += f" (tried {max_retries} times.)"
raise RuntimeError(err)
elif (
not data_len == nsamps_req
): # Invalid data: incorrect number of samples
if retries > 0:
msg = f"RSA error: requested {nsamps_req + nskip} samples, but got {data_len}."
logger.debug(msg)
logger.debug(f"Retrying {retries} more times.")
retries -= 1
continue
else:
err = "Failed to acquire correct number of samples "
err += f"{max_retries} times in a row."
raise RuntimeError(err)
else:
logger.debug(
f"IQ stream: successfully acquired {data_len} samples."
)
# Scale data to RF power and return
logger.debug(f"Applying gain of {linear_gain}")
data /= linear_gain

measurement_result = {
"data": data,
"overload": self.overload,
"frequency": self.frequency,
"reference_level": self.reference_level,
"sample_rate": self.rsa.IQSTREAM_GetAcqParameters()[1],
"capture_time": self._capture_time,
}
if self._model not in ["RSA306B", "RSA306"]:
measurement_result["attenuation"] = self.attenuation
measurement_result["preamp_enable"] = self.preamp_enable
return measurement_result

self._capture_time = utils.get_datetime_str_now()

data, status = self.rsa.IQSTREAM_Tempfile_NoConfig(durationMsec, True)

data = data[nskip : nskip + nsamps_req] # Remove extra samples, if any
data_len = len(data)

logger.debug(f"IQ Stream status: {status}")

# Check status string for overload / data loss
self.overload = False
if "Input overrange" in status:
self.overload = True
logger.debug("IQ stream: ADC overrange event occurred.")

if "data loss" in status or "discontinuity" in status: # Invalid data
msg = "Data loss occurred during IQ streaming"
logger.debug(msg)
raise RuntimeError(msg)
elif (
not data_len == nsamps_req
): # Invalid data: incorrect number of samples
msg = f"RSA error: requested {nsamps_req + nskip} samples, but got {data_len}."
logger.debug(msg)
raise RuntimeError(msg)
else:
logger.debug(f"IQ stream: successfully acquired {data_len} samples.")

measurement_result = {
"data": data,
"overload": self.overload,
"frequency": self.frequency,
"reference_level": self.reference_level,
"sample_rate": self.rsa.IQSTREAM_GetAcqParameters()[1],
"capture_time": self._capture_time,
}
if self._model not in ["RSA306B", "RSA306"]:
measurement_result["attenuation"] = self.attenuation
measurement_result["preamp_enable"] = self.preamp_enable
return measurement_result
46 changes: 7 additions & 39 deletions tests/test_tekrsa_sigan.py
Expand Up @@ -12,13 +12,12 @@
MAX_IQ_BW,
MIN_CENTER_FREQ,
MIN_IQ_BW,
TIMES_TO_FAIL,
)
from scos_tekrsa.hardware.tekrsa_sigan import TekRSASigan


class TestTekRSA:
# Ensure we write the test cal file and use mocks
# Ensure we use mock TekRSA
setup_complete = False

@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -156,34 +155,11 @@ def test_preamp_enable(self):
assert self.rx.preamp_enable is None
setattr(self.rx, "model", old_dev_name)

def test_acquire_samples_retry(self):
# Not enough retries = acquisition should fail
# The mocked IQ capture function will fail the first
# TIMES_TO_FAIL times it is called consecutively.

# With retries=0, IQ capture should fail TIMES_TO_FAIL times
for i in range(TIMES_TO_FAIL):
with pytest.raises(RuntimeError):
_ = self.rx.acquire_time_domain_samples(
100, retries=0, cal_adjust=False
)

# With retries>TIMES_TO_FAIL, IQ capture should succeed
# In this case, IQ capture fails TIMES_TO_FAIL times within
# acquire_time_domain_samples, which handles the retry logic until
# the IQ acquisition succeeds.
self.rx.rsa.set_times_to_fail(TIMES_TO_FAIL) # Reset times_failed
_ = self.rx.acquire_time_domain_samples(
100, retries=TIMES_TO_FAIL + 1, cal_adjust=False
)

def test_acquire_samples(self):
setattr(self.rx, "iq_bandwidth", max(self.CORRECT_ALLOWED_BW))

# Test non-data measurement result components
r = self.rx.acquire_time_domain_samples(
int(self.rx.iq_bandwidth * 0.001), cal_adjust=False
)
r = self.rx.acquire_time_domain_samples(int(self.rx.iq_bandwidth * 0.001))
assert r["frequency"] == self.rx.frequency
assert r["overload"] == False
assert r["reference_level"] == self.rx.reference_level
Expand All @@ -195,9 +171,7 @@ def test_acquire_samples(self):
# Attenuation/preamp keys should not exist for RSA30X
old_dev_name = self.rx.model
setattr(self.rx, "model", "RSA306B")
r = self.rx.acquire_time_domain_samples(
int(self.rx.iq_bandwidth * 0.001), cal_adjust=False
)
r = self.rx.acquire_time_domain_samples(int(self.rx.iq_bandwidth * 0.001))
with pytest.raises(KeyError):
_ = r["attenuation"]
with pytest.raises(KeyError):
Expand All @@ -207,25 +181,19 @@ def test_acquire_samples(self):
# Acquire n_samps resulting in integer number of milliseconds
for duration_ms in [1, 2, 3, 7, 10]:
n_samps = int(self.rx.iq_bandwidth * duration_ms * 0.001)
result = self.rx.acquire_time_domain_samples(n_samps, cal_adjust=False)
result = self.rx.acquire_time_domain_samples(n_samps)
assert len(result["data"]) == n_samps

# Acquire n_samps resulting in non-integer milliseconds
for duration_ms in [1.1, 2.02, 3.3, 7.007, 10.05]:
n_samps = int(self.rx.iq_bandwidth * duration_ms * 0.001)
result = self.rx.acquire_time_domain_samples(n_samps, cal_adjust=False)
result = self.rx.acquire_time_domain_samples(n_samps)
assert len(result["data"]) == n_samps

# Calibration data is not loaded, cal_adjust should fail
with pytest.raises(Exception):
_ = self.rx.acquire_time_domain_samples(100)

# Non-integer n_samps should fail
with pytest.raises(ValueError):
_ = self.rx.acquire_time_domain_samples(1.01, cal_adjust=False)
_ = self.rx.acquire_time_domain_samples(1.01)

# Test with skipping samples
r = self.rx.acquire_time_domain_samples(
int(self.rx.iq_bandwidth * 0.001), 100, cal_adjust=False
)
r = self.rx.acquire_time_domain_samples(int(self.rx.iq_bandwidth * 0.001), 100)
assert len(r["data"]) == int(self.rx.iq_bandwidth * 0.001)

0 comments on commit 35732cc

Please sign in to comment.