Skip to content

Commit

Permalink
Implementing a more flexibel data ingest for a_b() (#141)
Browse files Browse the repository at this point in the history
* implementing a more flexibel data ingest for a_b()
* add new test for a multi-dimensional f_GHz as xr.DataArray
  • Loading branch information
maxmargraf committed Nov 10, 2023
1 parent 86c64a9 commit 975242b
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 35 deletions.
121 changes: 87 additions & 34 deletions pycomlink/processing/k_R_relation.py
@@ -1,5 +1,7 @@
from __future__ import division
import numpy as np
import xarray as xr
from scipy.interpolate import interp1d

from .xarray_wrapper import xarray_apply_along_time_dim

Expand Down Expand Up @@ -32,13 +34,16 @@ def calc_R_from_A(
Path-integrated attenuation of microwave link signal
L_km : float
Length of the link in km
f_GHz : float, optional
f_GHz : float, np.array, or xr.DataArray optional
Frequency in GHz. If provided together with `pol`, it will be used to
derive the parameters a and b for the k-R power law.
pol : string, optional
Polarization, that is either 'H' for horizontal or 'V' for vertical. Has
to be provided together with `f_GHz`. It will be used to derive the
parameters a and b for the k-R power law.
pol : string, np.array or xr.DataArray optional
Polarization, that is either 'horizontal' for horizontal or 'vertical'
for vertical. 'H', 'h' and 'Horizontal' as well as 'V', 'v' and 'Vertical'
are also allowed. Has to be provided together with `f_GHz`. It will be
used to derive the parameters a and b for the k-R power law. Must have
same shape as f_GHz or be a str. If it is a str, it will be expanded to
the shape of f_GHz.
a : float, optional
Parameter of A-R relationship
b : float, optional
Expand All @@ -65,15 +70,24 @@ def calc_R_from_A(
"""



# Make sure that we only continue if a correct combination of optional args is used
if (f_GHz is not None) and (pol is not None) and (a is None) and (b is None):
# f_GHz and pol must be np.arrays within this function before fed to
# a_b(), otherwise a_b() can return a xr.DataArray with non-matching
# dimensions in certain cases. That interferes with our xarray-wrapper
# decorator.
f_GHz = np.atleast_1d(f_GHz).astype(float)
pol = np.atleast_1d(pol)
a, b = a_b(f_GHz, pol=pol, approx_type=a_b_approximation)
elif (a is not None) and (b is not None) and (f_GHz is None) and (pol is None):
# in this case we use `a` and `b` from args
pass
else:
raise ValueError(
"Either `f_GHz` and `pol` or `a` and `b` have to be passed. Any other combination is not allowed."
"Either `f_GHz` and `pol` or `a` and `b` have to be passed. "
"Any other combination is not allowed."
)

A = np.atleast_1d(A).astype(float)
Expand Down Expand Up @@ -152,14 +166,17 @@ def a_b(f_GHz, pol, approx_type="ITU_2005"):
Parameters
----------
f_GHz : int, float or np.array of these
Frequency of the microwave link in GHz
pol : str
Polarization of the microwave link
f_GHz : int, float, np.array or xr.DataArray
Frequency of the microwave link(s) in GHz.
pol : str, np.array or xr.DataArray
Polarization, that is either 'horizontal' for horizontal or 'vertical'
for vertical. 'H', 'h' and 'Horizontal' as well as 'V', 'v' and 'Vertical'
are also allowed. Must have same shape as f_GHz or be a str. If it is a
str, it will be expanded to the shape of f_GHz.
approx_type : str, optional
Approximation type (the default is 'ITU_2005', which implies parameter
approximation using a table recommanded by ITU in 2005. An older version
of 2003 is available via 'ITU_2003'.)
Approximation type (the default is 'ITU_2005', which implies parameter
approximation using a table recommanded by ITU in 2005. An older version
of 2003 is available via 'ITU_2003'.)
Returns
-------
Expand All @@ -183,32 +200,67 @@ def a_b(f_GHz, pol, approx_type="ITU_2005"):
prediction methods", International Telecommunication Union, P.838-2 (04/2003) P.838-3 (03/2005)
"""
from scipy.interpolate import interp1d
if isinstance(f_GHz, xr.DataArray):
return_xarray = True
f_GHz_coords = f_GHz.coords
else:
return_xarray = False

f_GHz = xr.DataArray(f_GHz)

if isinstance(pol, str):
pol = xr.full_like(f_GHz, pol, dtype=object)
pol = xr.DataArray(pol)

if f_GHz.shape != pol.shape:
raise ValueError("Frequency and polarization must have identical shape.")

f_GHz = np.atleast_1d(f_GHz)
pol = np.atleast_1d(pol)

f_GHz = np.asarray(f_GHz)
pol_v_str_variants = ['v','V','vertical','Vertical']
pol_h_str_variants = ['h','H','horizontal','Horizontal']

if f_GHz.min() < 1 or f_GHz.max() > 100:
raise ValueError("Frequency must be between 1 Ghz and 100 GHz.")
if not np.isin(pol,pol_v_str_variants+pol_h_str_variants).all():
raise ValueError("Polarization must be V, v, Vertical, vertical, H,"
"Horizontal or horizontal.")
# select ITU table
if approx_type == "ITU_2003":
ITU_table = ITU_table_2003.copy()
elif approx_type == "ITU_2005":
ITU_table = ITU_table_2005.copy()
else:
# select ITU table
if approx_type == "ITU_2003":
ITU_table = ITU_table_2003.copy()
elif approx_type == "ITU_2005":
ITU_table = ITU_table_2005.copy()
else:
raise ValueError("Approximation type not available.")

if pol == "V" or pol == "v" or pol == 'Vertical' or pol == "vertical":
f_a = interp1d(ITU_table[0, :], ITU_table[2, :], kind="cubic")
f_b = interp1d(ITU_table[0, :], ITU_table[4, :], kind="cubic")
elif pol == "H" or pol == "h" or pol == "Horizontal" or pol == "horizontal":
f_a = interp1d(ITU_table[0, :], ITU_table[1, :], kind="cubic")
f_b = interp1d(ITU_table[0, :], ITU_table[3, :], kind="cubic")
else:
raise ValueError("Polarization must be V, v, Vertical, vertical, H,"
"Horizontal or horizontal.")
a = f_a(f_GHz)
b = f_b(f_GHz)
raise ValueError("Approximation type not available.")

interp_a_v = interp1d(ITU_table[0, :], ITU_table[2, :], kind="cubic")
interp_b_v = interp1d(ITU_table[0, :], ITU_table[4, :], kind="cubic")
interp_a_h = interp1d(ITU_table[0, :], ITU_table[1, :], kind="cubic")
interp_b_h = interp1d(ITU_table[0, :], ITU_table[3, :], kind="cubic")

a_v = interp_a_v(f_GHz)
b_v = interp_b_v(f_GHz)
a_h = interp_a_h(f_GHz)
b_h = interp_b_h(f_GHz)

a = np.full_like(f_GHz, fill_value=np.nan, dtype=float)
b = np.full_like(f_GHz, fill_value=np.nan, dtype=float)

pol_mask_v = np.isin(pol, pol_v_str_variants)
pol_mask_h = np.isin(pol, pol_h_str_variants)

a[pol_mask_h] = a_h[pol_mask_h]
a[pol_mask_v] = a_v[pol_mask_v]
b[pol_mask_h] = b_h[pol_mask_h]
b[pol_mask_v] = b_v[pol_mask_v]

# If f_GHz is a xr.DataArray a and b should be also returned as xr.DataArray
# with identical coordinates
if return_xarray:
a = xr.DataArray(a, coords=f_GHz_coords)
b = xr.DataArray(b, coords=f_GHz_coords)

return a, b


Expand Down Expand Up @@ -337,3 +389,4 @@ def a_b(f_GHz, pol, approx_type="ITU_2005"):
)

# fmt: on

96 changes: 95 additions & 1 deletion pycomlink/tests/test_k_R_relation.py
Expand Up @@ -2,6 +2,7 @@

import numpy as np
from numpy.testing import assert_almost_equal
import xarray as xr

from pycomlink.processing import k_R_relation

Expand All @@ -24,7 +25,7 @@ def test_with_float(self):
assert_almost_equal(0.2403, calculated_a)
assert_almost_equal(0.9485, calculated_b)

def test_with_array(self):
def test_with_f_GHz_array(self):
f_GHz = np.arange(1, 100, 0.5)

pol = "V"
Expand Down Expand Up @@ -56,6 +57,92 @@ def test_with_array(self):
expected_b,
calculated_b,
)
def test_with_f_GHz_pol_array(self):
f_GHz = np.arange(1, 100, 0.5)
pol = np.repeat("V", len(f_GHz))
a, b = k_R_relation.a_b(f_GHz, pol, approx_type="ITU_2005")
calculated_a = a[2::40]
calculated_b = b[2::40]
expected_a = np.array([0.0000998, 0.117, 0.4712, 0.8889, 1.1915])
expected_b = np.array([0.949, 0.97, 0.8296, 0.7424, 0.6988])
assert_almost_equal(
expected_a,
calculated_a,
)
assert_almost_equal(
expected_b,
calculated_b,
)

pol = np.repeat("H", len(f_GHz))
a, b = k_R_relation.a_b(f_GHz, pol, approx_type="ITU_2005")
calculated_a = a[2::40]
calculated_b = b[2::40]
expected_a = np.array([0.0000847, 0.1155, 0.4865, 0.8974, 1.1946])
expected_b = np.array([1.0664, 1.0329, 0.8539, 0.7586, 0.7077])
assert_almost_equal(
expected_a,
calculated_a,
)
assert_almost_equal(
expected_b,
calculated_b,
)

def test_with_f_GHz_pol_xarray(self):
f_GHz = xr.DataArray(np.arange(1, 100, 0.5))
pol = "V"
a, b = k_R_relation.a_b(f_GHz, pol, approx_type="ITU_2005")
calculated_a = a[2::40]
calculated_b = b[2::40]
expected_a = np.array([0.0000998, 0.117, 0.4712, 0.8889, 1.1915])
expected_b = np.array([0.949, 0.97, 0.8296, 0.7424, 0.6988])
assert_almost_equal(
expected_a,
calculated_a,
)
assert_almost_equal(
expected_b,
calculated_b,
)

pol = xr.DataArray(np.repeat("H", len(f_GHz)))
a, b = k_R_relation.a_b(f_GHz, pol, approx_type="ITU_2005")
calculated_a = a[2::40]
calculated_b = b[2::40]
expected_a = np.array([0.0000847, 0.1155, 0.4865, 0.8974, 1.1946])
expected_b = np.array([1.0664, 1.0329, 0.8539, 0.7586, 0.7077])
assert_almost_equal(
expected_a,
calculated_a,
)
assert_almost_equal(
expected_b,
calculated_b,
)
def test_with_f_GHz_mulidim_xarray(self):
A = 5
L_km = 5
A, Z = np.array(["A", "Z"]).view("int32")
cml_ids = np.random.randint(low=A, high=Z, size=198 * 4,
dtype="int32").view(f"U{4}")
f_GHz = xr.DataArray(
data=[np.arange(1, 100, 0.5)],
dims=['sublin_id', 'cml_id'],
coords=dict(
sublink_id='sublink_1',
cml_id=cml_ids, ))
pol = "H"
expected_a, expected_b = k_R_relation.a_b(f_GHz, pol, approx_type="ITU_2005")

assert_almost_equal(
expected_a.sum(),
128.67885799,
)
assert_almost_equal(
expected_b.sum(),
175.58906864,
)

def test_interpolation(self):
f_GHz = np.array([1.7, 28.9, 82.1])
Expand Down Expand Up @@ -110,6 +197,7 @@ def test_raises(self):
k_R_relation.a_b(30, "H", approx_type="ITU_2000")



class Test_calc_R_from_A(unittest.TestCase):
def test_with_int(self):
A = 5
Expand Down Expand Up @@ -155,6 +243,11 @@ def test_raise_with_wrong_args(self):
A=2, L_km=42, f_GHz=10, pol="H", a=1
)

with self.assertRaises(ValueError):
# pol and F_GHz as xr.DataArray but with different size
calculated_R = k_R_relation.calc_R_from_A(
A=2, L_km=42, f_GHz=xr.DataArray([10,10]), pol=xr.DataArray("H"))

def test_different_power_law_approximation_types(self):
A = 5
L_km = 5
Expand Down Expand Up @@ -205,3 +298,4 @@ def test_with_array(self):
expected_R = np.array([0.0, 47.24635411, 71.08341151])
calculated_R = k_R_relation.calc_R_from_A(A[:3], L_km, f_GHz, pol)
assert_almost_equal(expected_R, calculated_R)

0 comments on commit 975242b

Please sign in to comment.