Skip to content

Commit

Permalink
implementing a more flexibel data ingest for a_b()
Browse files Browse the repository at this point in the history
  • Loading branch information
maxmargraf committed Nov 9, 2023
1 parent 86c64a9 commit 6403b67
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 24 deletions.
85 changes: 62 additions & 23 deletions pycomlink/processing/k_R_relation.py
@@ -1,5 +1,7 @@
from __future__ import division
import numpy as np
import xarray as xr
from scipy.interpolate import interp1d

from .xarray_wrapper import xarray_apply_along_time_dim

Expand Down Expand Up @@ -36,8 +38,8 @@ def calc_R_from_A(
Frequency in GHz. If provided together with `pol`, it will be used to
derive the parameters a and b for the k-R power law.
pol : string, optional
Polarization, that is either 'H' for horizontal or 'V' for vertical. Has
to be provided together with `f_GHz`. It will be used to derive the
Polarization, that is either 'H' for horizontal or 'V' for vertical. Has
to be provided together with `f_GHz`. It will be used to derive the
parameters a and b for the k-R power law.
a : float, optional
Parameter of A-R relationship
Expand Down Expand Up @@ -183,32 +185,68 @@ def a_b(f_GHz, pol, approx_type="ITU_2005"):
prediction methods", International Telecommunication Union, P.838-2 (04/2003) P.838-3 (03/2005)
"""
from scipy.interpolate import interp1d
if isinstance(f_GHz, xr.DataArray):
return_xarray = True
else:
return_xarray = False

f_GHz = xr.DataArray(f_GHz)

if isinstance(pol, str):
pol = np.full_like(f_GHz, pol, dtype=object)
pol = xr.DataArray(pol)

tmp_dims_f_GHz = f_GHz.dims
tmp_dims_pol = pol.dims

f_GHz = np.asarray(f_GHz)
if tmp_dims_f_GHz != tmp_dims_pol:
raise ValueError("Frequency and polarization must have identical "
"dimensions.")

f_GHz = np.atleast_1d(f_GHz)
pol = np.atleast_1d(pol)

pol_v_str_variants = ['v','V','vertical','Vertical']
pol_h_str_variants = ['h','H','horizontal','Horizontal']

if f_GHz.min() < 1 or f_GHz.max() > 100:
raise ValueError("Frequency must be between 1 Ghz and 100 GHz.")
if not np.isin(pol,pol_v_str_variants+pol_h_str_variants).all():
raise ValueError("Polarization must be V, v, Vertical, vertical, H,"
"Horizontal or horizontal.")
# select ITU table
if approx_type == "ITU_2003":
ITU_table = ITU_table_2003.copy()
elif approx_type == "ITU_2005":
ITU_table = ITU_table_2005.copy()
else:
# select ITU table
if approx_type == "ITU_2003":
ITU_table = ITU_table_2003.copy()
elif approx_type == "ITU_2005":
ITU_table = ITU_table_2005.copy()
else:
raise ValueError("Approximation type not available.")

if pol == "V" or pol == "v" or pol == 'Vertical' or pol == "vertical":
f_a = interp1d(ITU_table[0, :], ITU_table[2, :], kind="cubic")
f_b = interp1d(ITU_table[0, :], ITU_table[4, :], kind="cubic")
elif pol == "H" or pol == "h" or pol == "Horizontal" or pol == "horizontal":
f_a = interp1d(ITU_table[0, :], ITU_table[1, :], kind="cubic")
f_b = interp1d(ITU_table[0, :], ITU_table[3, :], kind="cubic")
else:
raise ValueError("Polarization must be V, v, Vertical, vertical, H,"
"Horizontal or horizontal.")
a = f_a(f_GHz)
b = f_b(f_GHz)
raise ValueError("Approximation type not available.")

interp_a_v = interp1d(ITU_table[0, :], ITU_table[2, :], kind="cubic")
interp_b_v = interp1d(ITU_table[0, :], ITU_table[4, :], kind="cubic")
interp_a_h = interp1d(ITU_table[0, :], ITU_table[1, :], kind="cubic")
interp_b_h = interp1d(ITU_table[0, :], ITU_table[3, :], kind="cubic")

a_v = interp_a_v(f_GHz)
b_v = interp_b_v(f_GHz)
a_h = interp_a_h(f_GHz)
b_h = interp_b_h(f_GHz)

a = np.full_like(f_GHz, fill_value=np.nan)
b = np.full_like(f_GHz, fill_value=np.nan)

pol_mask_v = np.isin(pol, pol_v_str_variants)
pol_mask_h = np.isin(pol, pol_h_str_variants)

a[pol_mask_h] = a_h[pol_mask_h]
a[pol_mask_v] = a_v[pol_mask_v]
b[pol_mask_h] = b_h[pol_mask_h]
b[pol_mask_v] = b_v[pol_mask_v]

if return_xarray:
a = xr.DataArray(a, dims=tmp_dims_f_GHz)
b = xr.DataArray(b, dims=tmp_dims_f_GHz)

return a, b


Expand Down Expand Up @@ -337,3 +375,4 @@ def a_b(f_GHz, pol, approx_type="ITU_2005"):
)

# fmt: on

72 changes: 71 additions & 1 deletion pycomlink/tests/test_k_R_relation.py
Expand Up @@ -2,6 +2,7 @@

import numpy as np
from numpy.testing import assert_almost_equal
import xarray as xr

from pycomlink.processing import k_R_relation

Expand All @@ -24,7 +25,7 @@ def test_with_float(self):
assert_almost_equal(0.2403, calculated_a)
assert_almost_equal(0.9485, calculated_b)

def test_with_array(self):
def test_with_f_GHz_array(self):
f_GHz = np.arange(1, 100, 0.5)

pol = "V"
Expand Down Expand Up @@ -56,6 +57,69 @@ def test_with_array(self):
expected_b,
calculated_b,
)
def test_with_f_GHz_pol_array(self):
f_GHz = np.arange(1, 100, 0.5)
pol = np.repeat("V", len(f_GHz))
a, b = k_R_relation.a_b(f_GHz, pol, approx_type="ITU_2005")
calculated_a = a[2::40]
calculated_b = b[2::40]
expected_a = np.array([0.0000998, 0.117, 0.4712, 0.8889, 1.1915])
expected_b = np.array([0.949, 0.97, 0.8296, 0.7424, 0.6988])
assert_almost_equal(
expected_a,
calculated_a,
)
assert_almost_equal(
expected_b,
calculated_b,
)

pol = np.repeat("H", len(f_GHz))
a, b = k_R_relation.a_b(f_GHz, pol, approx_type="ITU_2005")
calculated_a = a[2::40]
calculated_b = b[2::40]
expected_a = np.array([0.0000847, 0.1155, 0.4865, 0.8974, 1.1946])
expected_b = np.array([1.0664, 1.0329, 0.8539, 0.7586, 0.7077])
assert_almost_equal(
expected_a,
calculated_a,
)
assert_almost_equal(
expected_b,
calculated_b,
)

def test_with_f_GHz_pol_xarray(self):
f_GHz = xr.DataArray(np.arange(1, 100, 0.5))
pol = "V"
a, b = k_R_relation.a_b(f_GHz, pol, approx_type="ITU_2005")
calculated_a = a[2::40]
calculated_b = b[2::40]
expected_a = np.array([0.0000998, 0.117, 0.4712, 0.8889, 1.1915])
expected_b = np.array([0.949, 0.97, 0.8296, 0.7424, 0.6988])
assert_almost_equal(
expected_a,
calculated_a,
)
assert_almost_equal(
expected_b,
calculated_b,
)

pol = xr.DataArray(np.repeat("H", len(f_GHz)))
a, b = k_R_relation.a_b(f_GHz, pol, approx_type="ITU_2005")
calculated_a = a[2::40]
calculated_b = b[2::40]
expected_a = np.array([0.0000847, 0.1155, 0.4865, 0.8974, 1.1946])
expected_b = np.array([1.0664, 1.0329, 0.8539, 0.7586, 0.7077])
assert_almost_equal(
expected_a,
calculated_a,
)
assert_almost_equal(
expected_b,
calculated_b,
)

def test_interpolation(self):
f_GHz = np.array([1.7, 28.9, 82.1])
Expand Down Expand Up @@ -155,6 +219,11 @@ def test_raise_with_wrong_args(self):
A=2, L_km=42, f_GHz=10, pol="H", a=1
)

with self.assertRaises(ValueError):
# pol and F_GHz as xr.DataArray but with different size
calculated_R = k_R_relation.calc_R_from_A(
A=2, L_km=42, f_GHz=xr.DataArray([10,10]), pol=xr.DataArray("H"), a=1)

def test_different_power_law_approximation_types(self):
A = 5
L_km = 5
Expand Down Expand Up @@ -205,3 +274,4 @@ def test_with_array(self):
expected_R = np.array([0.0, 47.24635411, 71.08341151])
calculated_R = k_R_relation.calc_R_from_A(A[:3], L_km, f_GHz, pol)
assert_almost_equal(expected_R, calculated_R)

0 comments on commit 6403b67

Please sign in to comment.