Skip to content

Commit

Permalink
Repalce xarray wrapper with new version that use apply_ufunc along ti…
Browse files Browse the repository at this point in the history
…me dimension (#89)

* Added completely new xarray wrapper
* Update whatsnew
* Example processing notebook is working again
  • Loading branch information
cchwala committed Aug 11, 2021
1 parent 020ac87 commit c56cc9e
Show file tree
Hide file tree
Showing 8 changed files with 598 additions and 123 deletions.
8 changes: 8 additions & 0 deletions docs/whats-new.rst
Expand Up @@ -5,9 +5,17 @@ What's New
v0.3.x
------

Enhancements
~~~~~~~~~~~~
* Added xarray-wrapper for WAA Leijnse and updated WAA example notebook (by cchwala
in PR #82)
* xarray wrapper now uses `xr.apply_ufunc` to apply processing functions along time
dimension, instead of looping over the `channel_id` dimension. This should be a lot
more flexible. (by cchwala in PR #89)

Bug fixes
~~~~~~~~~
* Fixed problem with xarray_wrapper for calc_R_from_A (by cchwala in PR #89)

v0.3.2
------
Expand Down
510 changes: 460 additions & 50 deletions notebooks/Basic CML processing workflow.ipynb

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pycomlink/processing/baseline.py
Expand Up @@ -4,15 +4,15 @@

from numba import jit

from .xarray_wrapper import xarray_loop_vars_over_dim
from .xarray_wrapper import xarray_apply_along_time_dim


################################################
# Functions for setting the RSL baseline level #
################################################


@xarray_loop_vars_over_dim(vars_to_loop=["trsl", "wet"], loop_dim="channel_id")
@xarray_apply_along_time_dim()
def baseline_constant(trsl, wet, n_average_last_dry=1):
"""
Build baseline with constant level during a `wet` period
Expand Down
4 changes: 2 additions & 2 deletions pycomlink/processing/k_R_relation.py
@@ -1,14 +1,14 @@
from __future__ import division
import numpy as np

from .xarray_wrapper import xarray_loop_vars_over_dim
from .xarray_wrapper import xarray_apply_along_time_dim


############################################
# Functions for k-R power law calculations #
############################################

@xarray_loop_vars_over_dim(vars_to_loop=["A", "f_GHz"], loop_dim="channel_id")
@xarray_apply_along_time_dim()
def calc_R_from_A(A, L_km, f_GHz=None, a=None, b=None, pol="H", R_min=0.1):
"""Calculate rain rate from attenuation using the A-R Relationship
Expand Down
8 changes: 3 additions & 5 deletions pycomlink/processing/wet_antenna.py
Expand Up @@ -17,7 +17,7 @@
from numba import jit

from pycomlink.processing import k_R_relation
from .xarray_wrapper import xarray_loop_vars_over_dim
from .xarray_wrapper import xarray_apply_along_time_dim


########################################
Expand Down Expand Up @@ -63,9 +63,7 @@ def _numba_waa_schleiss_2013(rsl, baseline, wet, waa_max, delta_t, tau):
return waa


@xarray_loop_vars_over_dim(
vars_to_loop=["rsl", "baseline", "wet"], loop_dim="channel_id"
)
@xarray_apply_along_time_dim()
def waa_schleiss_2013(rsl, baseline, wet, waa_max, delta_t, tau):
"""Calculate WAA according to Schleiss et al 2013
Expand Down Expand Up @@ -111,7 +109,7 @@ def waa_schleiss_2013(rsl, baseline, wet, waa_max, delta_t, tau):

return waa

@xarray_loop_vars_over_dim(vars_to_loop=["A_obs", "f_Hz"], loop_dim="channel_id")
@xarray_apply_along_time_dim()
def waa_leijnse_2008_from_A_obs(
A_obs,
f_Hz,
Expand Down
117 changes: 58 additions & 59 deletions pycomlink/processing/xarray_wrapper.py
@@ -1,78 +1,77 @@
from functools import wraps
from collections import OrderedDict
import inspect
import numpy as np
import xarray as xr


def xarray_loop_vars_over_dim(vars_to_loop, loop_dim):
def xarray_apply_along_time_dim():
"""
A decorator to feed CML data with mutiple channels as xarray.DataArrays to CML processing functions
A decorator to apply CML processing function along the time dimension of DataArrays
Parameters
----------
vars_to_loop: list of strings
List of the names of the variables used as kwargs in the decorated function
which should have a dimension `loop_dim` for which the decorated function is
then applied individually to each item when looping over `loop_dim`.
loop_dim: basestring
Name of the dimension which all variables in `vars_to_loop` must have in common
and which will be looped over to apply the decorated function.
Examples
--------
Here is an example for how this decorator is used for the WAA Schleiss function::
@xarray_loop_vars_over_dim(vars_to_loop=["rsl", "baseline", "wet"], loop_dim="channel_id")
def waa_schleiss_2013(rsl, baseline, wet, waa_max, delta_t, tau):
# function body...
Here, `delta_t` and `tau` are not CML data xarray.DataArrays and hence do not
have to be looped over.
This will work if the decorated function takes 1D numpy arrays, which hold the
CML time series data, as arguments. Additional argument are also handled.
"""

def decorator(func):
@wraps(func)
def inner(*args, **kwargs):
# TODO: Maybe check if all args or kwargs are the same type
# The case with numpy array as arg
if len(args) > 0 and isinstance(args[0], np.ndarray):
return func(*args, **kwargs)
# The case with numpy array as kwarg
if len(kwargs.keys()) > 0 and isinstance(
kwargs[vars_to_loop[0]], np.ndarray
):
return func(*args, **kwargs)
# The dummy case where nothing is passed. This is just to get the
# functions error message here instead of continuing to the loop below
if len(args) == 0 and len(kwargs) == 0:
return func(*args, **kwargs)
new_args_dict = _get_new_args_dict(func=func, args=args, kwargs=kwargs)

loop_dim_id_list = list(
np.atleast_1d(kwargs[vars_to_loop[0]][loop_dim].values)
)
if len(loop_dim_id_list) > 1:
kwargs_vars_to_loop = {var: kwargs.pop(var) for var in vars_to_loop}
data_list = []
for loop_dim_id in loop_dim_id_list:
for var in vars_to_loop:
kwargs[var] = kwargs_vars_to_loop[var].sel(
{loop_dim: loop_dim_id}
).values
data_list.append(func(**kwargs))
return xr.DataArray(
data=np.stack(data_list),
dims=(loop_dim, "time"),
coords={
loop_dim: kwargs_vars_to_loop[vars_to_loop[0]][loop_dim].values,
"time": kwargs_vars_to_loop[vars_to_loop[0]].time,
},
)
# Check if any arg has a time dim. Also build the input_core_dims
# list which will be required below and in our case will contain either
# ['time'] or [] because we do only care about applying along the time
# dim if the arg has one. Note that input_core_dims has to have the same
# number of entries as func has args.
input_core_dims = []
found_time_dim = False
for arg in new_args_dict.values():
try:
if "time" in arg.dims:
found_time_dim = True
input_core_dims.append(["time"])
else:
input_core_dims.append([])
except AttributeError:
input_core_dims.append([])

# If now arg has a `time` dim, then we just call the function as it would
# be called without the wrapper.
if not found_time_dim:
return func(*args, **kwargs)
else:
return xr.DataArray(
data=func(**kwargs),
dims=("time"),
coords={"time": kwargs[vars_to_loop[0]].time},
return xr.apply_ufunc(
func,
*list(new_args_dict.values()),
input_core_dims=input_core_dims,
output_core_dims=[["time"]],
vectorize=True,
)

return inner

return decorator


def _get_new_args_dict(func, args, kwargs):
"""Build one dict from args, kwargs and function default args
The function signature is used to build one joint dict from args and kwargs and
additional from the default arguments found in the function signature. The order
of the args in this dict is the order of the args in the function signature and
hence the list of args can be used in cases where we can only supply *args, but
we have to work with a mixture of args, kwargs and default args as in
xarray.apply_ufunc in the xarray wrapper.
"""
new_args_dict = OrderedDict()
for i, (arg, parameter) in enumerate(inspect.signature(func).parameters.items()):
if i < len(args):
new_args_dict[arg] = args[i]
elif arg in kwargs.keys():
new_args_dict[arg] = kwargs[arg]
else:
new_args_dict[arg] = parameter.default

return new_args_dict
40 changes: 35 additions & 5 deletions pycomlink/tests/test_xarray_workflow.py
Expand Up @@ -63,7 +63,9 @@ def test_waa_schleiss_kwarg():
delta_t=1,
tau=15,
)
np.testing.assert_almost_equal(waa_np, waa_da.isel(channel_id=channel_id).values)
np.testing.assert_almost_equal(
waa_np, waa_da.isel(channel_id=channel_id).values
)


def test_waa_leijnse_kwarg():
Expand All @@ -76,8 +78,8 @@ def test_waa_leijnse_kwarg():
wet=cml.wet,
)

cml['A'] = cml.trsl - cml.baseline
cml['A'] = cml.A.where((cml.A.isnull().values | (cml.A.values >= 0)), 0)
cml["A"] = cml.trsl - cml.baseline
cml["A"] = cml.A.where((cml.A.isnull().values | (cml.A.values >= 0)), 0)

waa_da = pycml.processing.wet_antenna.waa_leijnse_2008_from_A_obs(
A_obs=cml.A,
Expand All @@ -93,15 +95,43 @@ def test_waa_leijnse_kwarg():
for channel_id in range(len(cml.channel_id)):
waa_np = pycml.processing.wet_antenna.waa_leijnse_2008_from_A_obs(
A_obs=cml.A.isel(channel_id=channel_id).values,
f_Hz=cml.frequency.isel(channel_id=channel_id).values * 1e9,
f_Hz=cml.frequency.isel(channel_id=channel_id).values * 1e9,
L_km=cml.length,
T_K=293.0,
gamma=2.06e-05,
delta=0.24,
n_antenna=(1.73 + 0.014j),
l_antenna=0.001,
)
np.testing.assert_almost_equal(waa_np, waa_da.isel(channel_id=channel_id).values)
np.testing.assert_almost_equal(
waa_np, waa_da.isel(channel_id=channel_id).values
)


def test_calc_R_from_A():
cml = init_data()
cml["wet"] = cml.trsl.rolling(time=60, center=True).std()

# call baseline function using kwargs
cml["baseline"] = pycml.processing.baseline.baseline_constant(
trsl=cml.trsl,
wet=cml.wet,
)

cml["A"] = cml.trsl - cml.baseline
cml["A"] = cml.A.where((cml.A.isnull().values | (cml.A.values >= 0)), 0)

cml["R"] = pycml.processing.k_R_relation.calc_R_from_A(
A=cml.A, L_km=cml.length, f_GHz=cml.frequency
)

for channel_id in range(len(cml.channel_id)):
R_np = pycml.processing.k_R_relation.calc_R_from_A(
A=cml.isel(channel_id=channel_id).A.values,
L_km=cml.isel(channel_id=channel_id).length.values,
f_GHz=cml.isel(channel_id=channel_id).frequency,
)
np.testing.assert_almost_equal(R_np, cml.R.isel(channel_id=channel_id).values)


# TODO Add test for using positional args
30 changes: 30 additions & 0 deletions pycomlink/tests/test_xarray_wrapper.py
@@ -0,0 +1,30 @@
from unittest import TestCase
from collections import OrderedDict
import pycomlink.processing.xarray_wrapper


def test_get_new_args_dict():
def foo_func(a, b, c=None, d=1):
return None

expected = OrderedDict(
[
("a", 123),
("b", "foo"),
("c", "bar"),
("d", 1),
]
)
result = pycomlink.processing.xarray_wrapper._get_new_args_dict(
func=foo_func, args=[123, "foo", "bar", 1], kwargs={}
)
TestCase().assertDictEqual(expected, result)

result = pycomlink.processing.xarray_wrapper._get_new_args_dict(
func=foo_func,
args=[
123,
],
kwargs={"b": "foo", "c": "bar"},
)
TestCase().assertDictEqual(expected, result)

0 comments on commit c56cc9e

Please sign in to comment.