Commit fd72fb7

Adcp clean (#408)
* cleaning - add filter for dataframe
* Update adcp_parser.py
* Update packagetest.yaml
* Create test_adcp_parser.py
* Update test_adcp_parser.py (remove mocked tests)
* cleanup
* more cleanup
* Update test_adcp_parser.py
* Update test_adcp_parser.py
* Update test_adcp_parser.py
* Update test_adcp_parser.py
* Update test_adcp_parser.py (remove depth calc tests)
* Update test_adcp_parser.py
* Update test_adcp_parser.py (remove - try again later)
1 parent e743e32 commit fd72fb7

File tree

.github/workflows/packagetest.yaml
src/EcoFOCIpy/io/adcp_parser.py
src/EcoFOCIpy/io/wpak_parser.py
src/EcoFOCIpy/math/cleaning.py
tests/io/test_adcp_parser.py
tests/io/test_wetlabs_parser.py
tests/math/test_cleaning.py

7 files changed: +153 -67 lines

.github/workflows/packagetest.yaml

Lines changed: 1 addition & 1 deletion
@@ -25,5 +25,5 @@ jobs:
         python -m pip install -e . --no-deps --force-reinstall
     - name: Test with pytest
       run: |
-        pip install flake8 pytest pytest-cov
+        pip install flake8 pytest pytest-cov pytest-mock
         pytest -v -s --junitxml=junit/test-results.xml --cov --cov-report=xml --cov-report=html
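
Note: pytest-mock is added to the CI install line; it contributes the `mocker` fixture (a thin wrapper around unittest.mock). A minimal sketch of how a test can use it, with a purely illustrative test name and patch target (not part of this commit):

    # Minimal sketch of the pytest-mock `mocker` fixture; names are illustrative only.
    import pathlib

    def test_mocker_fixture_example(mocker):
        # Replace Path.exists on the class so any Path reports it exists.
        mocker.patch.object(pathlib.Path, "exists", return_value=True)
        assert pathlib.Path("/no/such/file").exists()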

src/EcoFOCIpy/io/adcp_parser.py

Lines changed: 79 additions & 28 deletions
@@ -17,6 +17,7 @@
 try:
     import EcoFOCIpy.math.geomag.geomag.geomag as geomag
     import EcoFOCIpy.math.geotools as geotools
+
     ECOFOCIPY_AVAILABLE = True
 except ImportError:
     ECOFOCIPY_AVAILABLE = False
@@ -30,7 +31,9 @@ class adcp(object):
     apply magnetic declination corrections, and calculate depth information for bins.
     """

-    def __init__(self, serial_no: str, deployment_dir: Optional[Union[str, Path]] = None):
+    def __init__(
+        self, serial_no: str, deployment_dir: Optional[Union[str, Path]] = None
+    ):
         """
         Initializes the ADCP parser.

@@ -48,14 +51,18 @@ def __init__(self, serial_no: str, deployment_dir: Optional[Union[str, Path]] =
         self.ein_df: Optional[pd.DataFrame] = None
         self.scal_df: Optional[pd.DataFrame] = None

-    def _get_filepath(self, extension: str, file_path: Optional[Union[str, Path]]) -> Path:
+    def _get_filepath(
+        self, extension: str, file_path: Optional[Union[str, Path]]
+    ) -> Path:
         """Constructs the full file path or validates an existing one."""
         if file_path:
             p = Path(file_path)
         elif self.deployment_dir:
             p = self.deployment_dir / f"{self.serial_no}{extension}"
         else:
-            raise ValueError("Must provide either a deployment directory or a direct file path.")
+            raise ValueError(
+                "Must provide either a deployment directory or a direct file path."
+            )

         if not p.exists():
             raise FileNotFoundError(f"The specified ADCP file does not exist: {p}")
@@ -88,20 +95,34 @@ def _load_data_file(
             header=None,
             names=column_names,
         )
-        df["date_time"] = pd.to_datetime(df["date"] + " " + df["time"], format="%y/%m/%d %H:%M:%S")
+        df["date_time"] = pd.to_datetime(
+            df["date"] + " " + df["time"], format="%y/%m/%d %H:%M:%S"
+        )

         if datetime_index:
             df = df.set_index("date_time").drop(columns=["date", "time"])

         return df

-    def load_vel_file(self, file_path: Optional[Union[str, Path]] = None, datetime_index: bool = True) -> pd.DataFrame:
+    def load_vel_file(
+        self, file_path: Optional[Union[str, Path]] = None, datetime_index: bool = True
+    ) -> pd.DataFrame:
         """Loads a .VEL (velocity) file."""
-        cols = ["date", "time", "bin", "u_curr_comp", "v_curr_comp", "w_curr_comp", "w_curr_comp_err"]
+        cols = [
+            "date",
+            "time",
+            "bin",
+            "u_curr_comp",
+            "v_curr_comp",
+            "w_curr_comp",
+            "w_curr_comp_err",
+        ]
         self.vel_df = self._load_data_file(".VEL", cols, file_path, datetime_index)
         return self.vel_df

-    def load_pg_file(self, file_path: Optional[Union[str, Path]] = None, datetime_index: bool = True) -> pd.DataFrame:
+    def load_pg_file(
+        self, file_path: Optional[Union[str, Path]] = None, datetime_index: bool = True
+    ) -> pd.DataFrame:
         """
         Loads a .PG (Percent Good) file.

@@ -111,23 +132,48 @@ def load_pg_file(self, file_path: Optional[Union[str, Path]] = None, datetime_in
         3) Percentage of measurements where more than one beam was bad.
         4) Percentage of measurements with four-beam solutions (useful for QC).
         """
-        cols = ["date", "time", "bin", "pg3beam-good", "pgtransf-good", "pg1beam-bad", "pg4beam-good"]
+        cols = [
+            "date",
+            "time",
+            "bin",
+            "pg3beam-good",
+            "pgtransf-good",
+            "pg1beam-bad",
+            "pg4beam-good",
+        ]
         self.pg_df = self._load_data_file(".PG", cols, file_path, datetime_index)
         return self.pg_df

-    def load_ein_file(self, file_path: Optional[Union[str, Path]] = None, datetime_index: bool = True) -> pd.DataFrame:
+    def load_ein_file(
+        self, file_path: Optional[Union[str, Path]] = None, datetime_index: bool = True
+    ) -> pd.DataFrame:
         """Loads an .EIN (Echo Intensity) file."""
         cols = ["date", "time", "bin", "agc1", "agc2", "agc3", "agc4"]
         self.ein_df = self._load_data_file(".EIN", cols, file_path, datetime_index)
         return self.ein_df

-    def load_scal_file(self, file_path: Optional[Union[str, Path]] = None, datetime_index: bool = True) -> pd.DataFrame:
+    def load_scal_file(
+        self, file_path: Optional[Union[str, Path]] = None, datetime_index: bool = True
+    ) -> pd.DataFrame:
         """Loads a .SCA (Scalar) file."""
-        cols = ["date", "time", "unknown", "temperature", "heading", "pitch", "roll", "heading_stdev", "pitch_stdev", "roll_stdev"]
+        cols = [
+            "date",
+            "time",
+            "unknown",
+            "temperature",
+            "heading",
+            "pitch",
+            "roll",
+            "heading_stdev",
+            "pitch_stdev",
+            "roll_stdev",
+        ]
         self.scal_df = self._load_data_file(".SCA", cols, file_path, datetime_index)
         return self.scal_df

-    def load_rpt_file(self, file_path: Optional[Union[str, Path]] = None) -> Tuple[List[str], Dict[str, float]]:
+    def load_rpt_file(
+        self, file_path: Optional[Union[str, Path]] = None
+    ) -> Tuple[List[str], Dict[str, float]]:
         """
         Loads a .RPT (Report) file to extract instrument setup parameters.

@@ -138,7 +184,7 @@ def load_rpt_file(self, file_path: Optional[Union[str, Path]] = None) -> Tuple[L
             Tuple[List[str], Dict[str, float]]: A tuple containing the raw lines of the
             report file and a dictionary of extracted setup parameters.
         """
-        full_path = self._get_filepath('.RPT', file_path)
+        full_path = self._get_filepath(".RPT", file_path)

         lines = full_path.read_text().splitlines()

@@ -147,15 +193,17 @@ def load_rpt_file(self, file_path: Optional[Union[str, Path]] = None) -> Tuple[L
             if not parts:
                 continue
             if "Bin length" in line:
-                self.setup['bin_length'] = float(parts[2])
+                self.setup["bin_length"] = float(parts[2])
             elif "Distance" in line:
-                self.setup['distance_to_first_bin'] = float(parts[4])
+                self.setup["distance_to_first_bin"] = float(parts[4])
             elif "Number of bins" in line:
-                self.setup['num_of_bins'] = int(parts[3])
+                self.setup["num_of_bins"] = int(parts[3])

         return lines, self.setup

-    def mag_dec_corr(self, lat: float, lon_w: float, deployment_date: pd.Timestamp) -> float:
+    def mag_dec_corr(
+        self, lat: float, lon_w: float, deployment_date: pd.Timestamp
+    ) -> float:
         """
         Calculates and applies magnetic declination correction to velocity data.

@@ -175,20 +223,23 @@ def mag_dec_corr(self, lat: float, lon_w: float, deployment_date: pd.Timestamp)
             ValueError: If the velocity data (`vel_df`) has not been loaded.
         """
         if not ECOFOCIPY_AVAILABLE:
-            raise ImportError("EcoFOCIpy is required for magnetic declination correction but is not installed.")
+            raise ImportError(
+                "EcoFOCIpy is required for magnetic declination correction but is not installed."
+            )
         if self.vel_df is None:
-            raise ValueError("Velocity data must be loaded before applying magnetic correction.")
+            raise ValueError(
+                "Velocity data must be loaded before applying magnetic correction."
+            )

         t = geomag.GeoMag()
-        declination = t.GeoMag(lat, -1 * lon_w, time=deployment_date).declination
+        declination = t.GeoMag(lat, lon_w, time=deployment_date).dec

         u_rotated, v_rotated = geotools.rotate_coord(
-            self.vel_df['u_curr_comp'],
-            self.vel_df['v_curr_comp'],
-            declination)
+            self.vel_df["u_curr_comp"], self.vel_df["v_curr_comp"], declination
+        )

-        self.vel_df['u_curr_comp'] = u_rotated
-        self.vel_df['v_curr_comp'] = v_rotated
+        self.vel_df["u_curr_comp"] = u_rotated
+        self.vel_df["v_curr_comp"] = v_rotated

         return declination

@@ -199,7 +250,7 @@ def bins2depth(self, inst_depth: float = None):
         Args:
             inst_depth (float, optional): Deployment Depth of Instrument.
         """
-        start = inst_depth - self.setup['distance']
-        stop = start - self.setup['numofbins']*self.setup['bin_length']
+        start = inst_depth - self.setup["distance_to_first_bin"]
+        stop = start - self.setup["num_of_bins"] * self.setup["bin_length"]

-        return np.arange(start, stop, -1*self.setup['bin_length'])
+        return np.arange(start, stop, -1 * self.setup["bin_length"])
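
For orientation, a rough usage sketch of the refactored parser; the serial number, directory, coordinates, deployment date, and instrument depth below are invented placeholders:

    # Hypothetical values throughout; requires the deployment's .VEL/.RPT files on disk.
    import pandas as pd
    from EcoFOCIpy.io.adcp_parser import adcp

    parser = adcp(serial_no="12345", deployment_dir="/path/to/deployment")
    vel_df = parser.load_vel_file()    # .VEL -> DataFrame indexed by date_time
    _, setup = parser.load_rpt_file()  # fills bin_length, distance_to_first_bin, num_of_bins

    # Rotate u/v components by the site's magnetic declination (velocity must be loaded first)
    declination = parser.mag_dec_corr(
        lat=57.5, lon_w=164.0, deployment_date=pd.Timestamp("2023-09-15")
    )

    # Depth (m) of each bin, given the instrument's deployment depth
    bin_depths = parser.bins2depth(inst_depth=70.0)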

src/EcoFOCIpy/io/wpak_parser.py

Lines changed: 2 additions & 1 deletion
@@ -1,6 +1,7 @@
-import pandas as pd
 from typing import Optional

+import pandas as pd
+

 class wpak(object):
     r"""

src/EcoFOCIpy/math/cleaning.py

Lines changed: 70 additions & 36 deletions
@@ -1,4 +1,7 @@
-#modified from GliderTools.py
+import numpy as np
+import pandas as pd
+import xarray as xr
+

 def outlier_bounds_std(arr, multiplier=3):
     r"""
@@ -19,25 +22,23 @@ def outlier_bounds_std(arr, multiplier=3):

     Returns
     -------
-    arr : array | xarray.DataArray
-        A data object where values outside the limits are masked.
-        Metdata will be preserved if the original input array is xr.DataArray
-
+    xr.Dataset
+        A new xarray Dataset with the filtered variable and a corresponding
+        quality control (QC) variable.
+        QC flags:
+        0: Good data
+        1: Data at beginning/end of series (not processed by rolling filter)
+        4: Data flagged as outlier and replaced with NaN (before interpolation)
+        8: Data that was originally an outlier (flag 4) and subsequently interpolated
     """

-    from numpy import array, nan, nanmean, nanstd
-
-    arr = array(arr)
-
-    mean = nanmean(arr)
-    std = nanstd(arr)
-
+    arr = np.array(arr)
+    mean = np.nanmean(arr)
+    std = np.nanstd(arr)
     ll = mean - std * multiplier
     ul = mean + std * multiplier
-
     mask = (arr < ll) | (arr > ul)
-    arr[mask] = nan
-
+    arr[mask] = np.nan
     return arr


@@ -68,19 +69,14 @@ def outlier_bounds_iqr(arr, multiplier=1.5):


     """
-    from numpy import array, nan, nanpercentile
-
-    arr = array(arr)

-    q1, q3 = nanpercentile(arr, [25, 75])
+    arr = np.array(arr)
+    q1, q3 = np.nanpercentile(arr, [25, 75])
     iqr = q3 - q1
-
     ll = q1 - iqr * multiplier
     ul = q3 + iqr * multiplier
-
     mask = (arr < ll) | (arr > ul)
-    arr[mask] = nan
-
+    arr[mask] = np.nan
     return arr

 def rolling_outlier_std(tdf, var_choice=None, timebase=1, stddev=1, interp_fill_timebase='1h'):
@@ -92,28 +88,66 @@ def rolling_outlier_std(tdf, var_choice=None, timebase=1, stddev=1, interp_fill_
         timebase (int, optional): [time window value to be applied to rolling]. Defaults to 1.
         stddev (int, optionl): [standard deviation threshold to use to filter data out]. Defaults to 1.
     """
-    from numpy import isnan
-    from xarray import zeros_like

     xdf = tdf.copy()
     r = (xdf[var_choice] - xdf.rolling(time=timebase, center=True).median()[var_choice])
-    rc = outlier_bounds_std(r,stddev)
+    rc = outlier_bounds_std(r, stddev)

-    xdf[var_choice] = (xdf[var_choice].where(~isnan(rc))).interpolate_na(dim='time',max_gap=interp_fill_timebase)
+    xdf[var_choice] = (xdf[var_choice].where(~np.isnan(rc))).interpolate_na(dim='time', max_gap=interp_fill_timebase)

-    #mask values that are filtered out by algorithm
-    xdf[var_choice+'_QC'] = zeros_like(xdf[var_choice])
-    xdf[var_choice+'_QC'] = xdf[var_choice].where((~isnan(rc)),4)
-    xdf[var_choice+'_QC'].values[xdf[var_choice+'_QC'].values !=4] = 0
+    # QC flag: 4 for outlier, 0 for good, 8 for interpolated, 1 for edge
+    xdf[var_choice + '_QC'] = xr.zeros_like(xdf[var_choice])
+    xdf[var_choice + '_QC'] = xdf[var_choice].where((~np.isnan(rc)), 4)
+    xdf[var_choice + '_QC'].values[xdf[var_choice + '_QC'].values != 4] = 0

-    #mask values that are interpolated back in
-    mask = (tdf[var_choice].where(~isnan(rc)))
-    mask_QC = xdf[var_choice].where(((xdf[var_choice] == mask) | isnan(xdf[var_choice])),8)
-    xdf[var_choice+'_QC'].values[(mask_QC.values) == 8] = 8
+    mask = (tdf[var_choice].where(~np.isnan(rc)))
+    mask_QC = xdf[var_choice].where(((xdf[var_choice] == mask) | np.isnan(xdf[var_choice])), 8)
+    xdf[var_choice + '_QC'].values[(mask_QC.values) == 8] = 8

     xdf[var_choice][:round(timebase/2)+1] = tdf[var_choice][:round(timebase/2)+1]
     xdf[var_choice+'_QC'][:round(timebase/2)+1] = tdf[var_choice][:round(timebase/2)+1]*0 +1
     xdf[var_choice][-1*round(timebase/2)-1:] = tdf[var_choice][-1*round(timebase/2)-1:]
     xdf[var_choice+'_QC'][-1*round(timebase/2)-1:] = tdf[var_choice][-1*round(timebase/2)-1:]*0 +1

-    return xdf
+    return xdf
+
+
+def rolling_outlier_pd(
+    df: pd.DataFrame, column: str = 'salinity', window_size: str = '7D', num_std_dev: int = 5
+) -> pd.DataFrame:
+    """
+    Apply a rolling standard deviation filter to a column of a pandas DataFrame.
+    Outliers are set to NaN.
+
+    Parameters
+    ----------
+    df : pd.DataFrame
+        Input DataFrame containing the column to filter.
+    column : str, default='salinity'
+        Name of the column to filter.
+    window_size : str, default='7D'
+        Size of the rolling window (e.g., '7D' for 7 days).
+    num_std_dev : int, default=5
+        Number of standard deviations for outlier threshold.
+
+    Returns
+    -------
+    pd.DataFrame
+        DataFrame with outliers replaced by NaN in the specified column.
+    """
+    df_filtered = df.copy()
+    mean_col = f'{column}_rolling_mean'
+    std_col = f'{column}_rolling_std'
+    upper_col = f'{column}_upper_bound'
+    lower_col = f'{column}_lower_bound'
+
+    df_filtered[mean_col] = df_filtered[column].rolling(window=window_size).mean()
+    df_filtered[std_col] = df_filtered[column].rolling(window=window_size).std()
+    df_filtered[upper_col] = df_filtered[mean_col] + num_std_dev * df_filtered[std_col]
+    df_filtered[lower_col] = df_filtered[mean_col] - num_std_dev * df_filtered[std_col]
+
+    outlier_mask = (df_filtered[column] > df_filtered[upper_col]) | (df_filtered[column] < df_filtered[lower_col])
+    df_filtered.loc[outlier_mask, column] = np.nan
+
+    df_filtered = df_filtered.drop(columns=[mean_col, std_col, upper_col, lower_col])
+    return df_filtered
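
A short sketch of the new pandas-based filter on synthetic data; the series, spike, window, and threshold are invented for illustration:

    # Synthetic example for rolling_outlier_pd; all values are made up.
    import numpy as np
    import pandas as pd
    from EcoFOCIpy.math.cleaning import rolling_outlier_pd

    idx = pd.date_range("2024-01-01", periods=500, freq="h")
    df = pd.DataFrame(
        {"salinity": np.random.default_rng(0).normal(32.0, 0.05, 500)}, index=idx
    )
    df.iloc[100, 0] = 45.0  # plant an obvious spike

    # A DatetimeIndex is required for the time-based '7D' rolling window.
    clean = rolling_outlier_pd(df, column="salinity", window_size="7D", num_std_dev=5)
    print(clean["salinity"].isna().sum())  # the planted spike is now NaN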

tests/io/test_adcp_parser.py

Whitespace-only changes.

tests/io/test_wetlabs_parser.py

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 import numpy as np
 import pandas as pd
 import pytest
-from EcoFOCIpy.io.wetlabs_parser import wetlabs # Import your class
+from EcoFOCIpy.io.wetlabs_parser import wetlabs  # Import your class


 # Define a fixture to create mock data files for tests

tests/math/test_cleaning.py

Whitespace-only changes.
