From 975242b8d1a0788a981cc6e7106737ed91a6be49 Mon Sep 17 00:00:00 2001 From: Max Graf <44392812+maxmargraf@users.noreply.github.com> Date: Fri, 10 Nov 2023 11:29:13 +0100 Subject: [PATCH] Implementing a more flexibel data ingest for a_b() (#141) * implementing a more flexibel data ingest for a_b() * add new test for a multi-dimensional f_GHz as xr.DataArray --- pycomlink/processing/k_R_relation.py | 121 +++++++++++++++++++-------- pycomlink/tests/test_k_R_relation.py | 96 ++++++++++++++++++++- 2 files changed, 182 insertions(+), 35 deletions(-) diff --git a/pycomlink/processing/k_R_relation.py b/pycomlink/processing/k_R_relation.py index bb0fe4f..60c8c9a 100644 --- a/pycomlink/processing/k_R_relation.py +++ b/pycomlink/processing/k_R_relation.py @@ -1,5 +1,7 @@ from __future__ import division import numpy as np +import xarray as xr +from scipy.interpolate import interp1d from .xarray_wrapper import xarray_apply_along_time_dim @@ -32,13 +34,16 @@ def calc_R_from_A( Path-integrated attenuation of microwave link signal L_km : float Length of the link in km - f_GHz : float, optional + f_GHz : float, np.array, or xr.DataArray optional Frequency in GHz. If provided together with `pol`, it will be used to derive the parameters a and b for the k-R power law. - pol : string, optional - Polarization, that is either 'H' for horizontal or 'V' for vertical. Has - to be provided together with `f_GHz`. It will be used to derive the - parameters a and b for the k-R power law. + pol : string, np.array or xr.DataArray optional + Polarization, that is either 'horizontal' for horizontal or 'vertical' + for vertical. 'H', 'h' and 'Horizontal' as well as 'V', 'v' and 'Vertical' + are also allowed. Has to be provided together with `f_GHz`. It will be + used to derive the parameters a and b for the k-R power law. Must have + same shape as f_GHz or be a str. If it is a str, it will be expanded to + the shape of f_GHz. a : float, optional Parameter of A-R relationship b : float, optional @@ -65,15 +70,24 @@ def calc_R_from_A( """ + + # Make sure that we only continue if a correct combination of optional args is used if (f_GHz is not None) and (pol is not None) and (a is None) and (b is None): + # f_GHz and pol must be np.arrays within this function before fed to + # a_b(), otherwise a_b() can return a xr.DataArray with non-matching + # dimensions in certain cases. That interferes with our xarray-wrapper + # decorator. + f_GHz = np.atleast_1d(f_GHz).astype(float) + pol = np.atleast_1d(pol) a, b = a_b(f_GHz, pol=pol, approx_type=a_b_approximation) elif (a is not None) and (b is not None) and (f_GHz is None) and (pol is None): # in this case we use `a` and `b` from args pass else: raise ValueError( - "Either `f_GHz` and `pol` or `a` and `b` have to be passed. Any other combination is not allowed." + "Either `f_GHz` and `pol` or `a` and `b` have to be passed. " + "Any other combination is not allowed." ) A = np.atleast_1d(A).astype(float) @@ -152,14 +166,17 @@ def a_b(f_GHz, pol, approx_type="ITU_2005"): Parameters ---------- - f_GHz : int, float or np.array of these - Frequency of the microwave link in GHz - pol : str - Polarization of the microwave link + f_GHz : int, float, np.array or xr.DataArray + Frequency of the microwave link(s) in GHz. + pol : str, np.array or xr.DataArray + Polarization, that is either 'horizontal' for horizontal or 'vertical' + for vertical. 'H', 'h' and 'Horizontal' as well as 'V', 'v' and 'Vertical' + are also allowed. Must have same shape as f_GHz or be a str. If it is a + str, it will be expanded to the shape of f_GHz. approx_type : str, optional - Approximation type (the default is 'ITU_2005', which implies parameter - approximation using a table recommanded by ITU in 2005. An older version - of 2003 is available via 'ITU_2003'.) + Approximation type (the default is 'ITU_2005', which implies parameter + approximation using a table recommanded by ITU in 2005. An older version + of 2003 is available via 'ITU_2003'.) Returns ------- @@ -183,32 +200,67 @@ def a_b(f_GHz, pol, approx_type="ITU_2005"): prediction methods", International Telecommunication Union, P.838-2 (04/2003) P.838-3 (03/2005) """ - from scipy.interpolate import interp1d + if isinstance(f_GHz, xr.DataArray): + return_xarray = True + f_GHz_coords = f_GHz.coords + else: + return_xarray = False + + f_GHz = xr.DataArray(f_GHz) + + if isinstance(pol, str): + pol = xr.full_like(f_GHz, pol, dtype=object) + pol = xr.DataArray(pol) + + if f_GHz.shape != pol.shape: + raise ValueError("Frequency and polarization must have identical shape.") + + f_GHz = np.atleast_1d(f_GHz) + pol = np.atleast_1d(pol) - f_GHz = np.asarray(f_GHz) + pol_v_str_variants = ['v','V','vertical','Vertical'] + pol_h_str_variants = ['h','H','horizontal','Horizontal'] if f_GHz.min() < 1 or f_GHz.max() > 100: raise ValueError("Frequency must be between 1 Ghz and 100 GHz.") + if not np.isin(pol,pol_v_str_variants+pol_h_str_variants).all(): + raise ValueError("Polarization must be V, v, Vertical, vertical, H," + "Horizontal or horizontal.") + # select ITU table + if approx_type == "ITU_2003": + ITU_table = ITU_table_2003.copy() + elif approx_type == "ITU_2005": + ITU_table = ITU_table_2005.copy() else: - # select ITU table - if approx_type == "ITU_2003": - ITU_table = ITU_table_2003.copy() - elif approx_type == "ITU_2005": - ITU_table = ITU_table_2005.copy() - else: - raise ValueError("Approximation type not available.") - - if pol == "V" or pol == "v" or pol == 'Vertical' or pol == "vertical": - f_a = interp1d(ITU_table[0, :], ITU_table[2, :], kind="cubic") - f_b = interp1d(ITU_table[0, :], ITU_table[4, :], kind="cubic") - elif pol == "H" or pol == "h" or pol == "Horizontal" or pol == "horizontal": - f_a = interp1d(ITU_table[0, :], ITU_table[1, :], kind="cubic") - f_b = interp1d(ITU_table[0, :], ITU_table[3, :], kind="cubic") - else: - raise ValueError("Polarization must be V, v, Vertical, vertical, H," - "Horizontal or horizontal.") - a = f_a(f_GHz) - b = f_b(f_GHz) + raise ValueError("Approximation type not available.") + + interp_a_v = interp1d(ITU_table[0, :], ITU_table[2, :], kind="cubic") + interp_b_v = interp1d(ITU_table[0, :], ITU_table[4, :], kind="cubic") + interp_a_h = interp1d(ITU_table[0, :], ITU_table[1, :], kind="cubic") + interp_b_h = interp1d(ITU_table[0, :], ITU_table[3, :], kind="cubic") + + a_v = interp_a_v(f_GHz) + b_v = interp_b_v(f_GHz) + a_h = interp_a_h(f_GHz) + b_h = interp_b_h(f_GHz) + + a = np.full_like(f_GHz, fill_value=np.nan, dtype=float) + b = np.full_like(f_GHz, fill_value=np.nan, dtype=float) + + pol_mask_v = np.isin(pol, pol_v_str_variants) + pol_mask_h = np.isin(pol, pol_h_str_variants) + + a[pol_mask_h] = a_h[pol_mask_h] + a[pol_mask_v] = a_v[pol_mask_v] + b[pol_mask_h] = b_h[pol_mask_h] + b[pol_mask_v] = b_v[pol_mask_v] + + # If f_GHz is a xr.DataArray a and b should be also returned as xr.DataArray + # with identical coordinates + if return_xarray: + a = xr.DataArray(a, coords=f_GHz_coords) + b = xr.DataArray(b, coords=f_GHz_coords) + return a, b @@ -337,3 +389,4 @@ def a_b(f_GHz, pol, approx_type="ITU_2005"): ) # fmt: on + diff --git a/pycomlink/tests/test_k_R_relation.py b/pycomlink/tests/test_k_R_relation.py index ab0f8a7..fa44a9e 100644 --- a/pycomlink/tests/test_k_R_relation.py +++ b/pycomlink/tests/test_k_R_relation.py @@ -2,6 +2,7 @@ import numpy as np from numpy.testing import assert_almost_equal +import xarray as xr from pycomlink.processing import k_R_relation @@ -24,7 +25,7 @@ def test_with_float(self): assert_almost_equal(0.2403, calculated_a) assert_almost_equal(0.9485, calculated_b) - def test_with_array(self): + def test_with_f_GHz_array(self): f_GHz = np.arange(1, 100, 0.5) pol = "V" @@ -56,6 +57,92 @@ def test_with_array(self): expected_b, calculated_b, ) + def test_with_f_GHz_pol_array(self): + f_GHz = np.arange(1, 100, 0.5) + pol = np.repeat("V", len(f_GHz)) + a, b = k_R_relation.a_b(f_GHz, pol, approx_type="ITU_2005") + calculated_a = a[2::40] + calculated_b = b[2::40] + expected_a = np.array([0.0000998, 0.117, 0.4712, 0.8889, 1.1915]) + expected_b = np.array([0.949, 0.97, 0.8296, 0.7424, 0.6988]) + assert_almost_equal( + expected_a, + calculated_a, + ) + assert_almost_equal( + expected_b, + calculated_b, + ) + + pol = np.repeat("H", len(f_GHz)) + a, b = k_R_relation.a_b(f_GHz, pol, approx_type="ITU_2005") + calculated_a = a[2::40] + calculated_b = b[2::40] + expected_a = np.array([0.0000847, 0.1155, 0.4865, 0.8974, 1.1946]) + expected_b = np.array([1.0664, 1.0329, 0.8539, 0.7586, 0.7077]) + assert_almost_equal( + expected_a, + calculated_a, + ) + assert_almost_equal( + expected_b, + calculated_b, + ) + + def test_with_f_GHz_pol_xarray(self): + f_GHz = xr.DataArray(np.arange(1, 100, 0.5)) + pol = "V" + a, b = k_R_relation.a_b(f_GHz, pol, approx_type="ITU_2005") + calculated_a = a[2::40] + calculated_b = b[2::40] + expected_a = np.array([0.0000998, 0.117, 0.4712, 0.8889, 1.1915]) + expected_b = np.array([0.949, 0.97, 0.8296, 0.7424, 0.6988]) + assert_almost_equal( + expected_a, + calculated_a, + ) + assert_almost_equal( + expected_b, + calculated_b, + ) + + pol = xr.DataArray(np.repeat("H", len(f_GHz))) + a, b = k_R_relation.a_b(f_GHz, pol, approx_type="ITU_2005") + calculated_a = a[2::40] + calculated_b = b[2::40] + expected_a = np.array([0.0000847, 0.1155, 0.4865, 0.8974, 1.1946]) + expected_b = np.array([1.0664, 1.0329, 0.8539, 0.7586, 0.7077]) + assert_almost_equal( + expected_a, + calculated_a, + ) + assert_almost_equal( + expected_b, + calculated_b, + ) + def test_with_f_GHz_mulidim_xarray(self): + A = 5 + L_km = 5 + A, Z = np.array(["A", "Z"]).view("int32") + cml_ids = np.random.randint(low=A, high=Z, size=198 * 4, + dtype="int32").view(f"U{4}") + f_GHz = xr.DataArray( + data=[np.arange(1, 100, 0.5)], + dims=['sublin_id', 'cml_id'], + coords=dict( + sublink_id='sublink_1', + cml_id=cml_ids, )) + pol = "H" + expected_a, expected_b = k_R_relation.a_b(f_GHz, pol, approx_type="ITU_2005") + + assert_almost_equal( + expected_a.sum(), + 128.67885799, + ) + assert_almost_equal( + expected_b.sum(), + 175.58906864, + ) def test_interpolation(self): f_GHz = np.array([1.7, 28.9, 82.1]) @@ -110,6 +197,7 @@ def test_raises(self): k_R_relation.a_b(30, "H", approx_type="ITU_2000") + class Test_calc_R_from_A(unittest.TestCase): def test_with_int(self): A = 5 @@ -155,6 +243,11 @@ def test_raise_with_wrong_args(self): A=2, L_km=42, f_GHz=10, pol="H", a=1 ) + with self.assertRaises(ValueError): + # pol and F_GHz as xr.DataArray but with different size + calculated_R = k_R_relation.calc_R_from_A( + A=2, L_km=42, f_GHz=xr.DataArray([10,10]), pol=xr.DataArray("H")) + def test_different_power_law_approximation_types(self): A = 5 L_km = 5 @@ -205,3 +298,4 @@ def test_with_array(self): expected_R = np.array([0.0, 47.24635411, 71.08341151]) calculated_R = k_R_relation.calc_R_from_A(A[:3], L_km, f_GHz, pol) assert_almost_equal(expected_R, calculated_R) +