Skip to content

Commit

Permalink
ENH: Rolling window with step size (pandas-devGH-15354)
Browse files Browse the repository at this point in the history
  • Loading branch information
rtpsw committed Jan 31, 2022
1 parent c64fbce commit ff6a898
Show file tree
Hide file tree
Showing 22 changed files with 913 additions and 493 deletions.
3 changes: 2 additions & 1 deletion pandas/_libs/window/indexers.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ def calculate_variable_window_bounds(
min_periods,
center: bool,
closed: str | None,
step: int | None,
index: np.ndarray, # const int64_t[:]
) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.int64]]: ...
) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.int64], npt.NDArray[np.int64]]: ...
12 changes: 9 additions & 3 deletions pandas/_libs/window/indexers.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def calculate_variable_window_bounds(
object min_periods, # unused but here to match get_window_bounds signature
bint center,
str closed,
int64_t step,
const int64_t[:] index
):
"""
Expand All @@ -38,17 +39,20 @@ def calculate_variable_window_bounds(
closed : str
string of side of the window that should be closed
step : int64
Spacing between windows
index : ndarray[int64]
time series index to roll over
Returns
-------
(ndarray[int64], ndarray[int64])
(ndarray[int64], ndarray[int64], ndarray[int64])
"""
cdef:
bint left_closed = False
bint right_closed = False
ndarray[int64_t, ndim=1] start, end
ndarray[int64_t, ndim=1] start, end, ref
int64_t start_bound, end_bound, index_growth_sign = 1
Py_ssize_t i, j

Expand Down Expand Up @@ -143,4 +147,6 @@ def calculate_variable_window_bounds(
# right endpoint is open
if not right_closed and not center:
end[i] -= 1
return start, end
ref = (None if step is None or step == 1
else np.arange(0, num_values, step, dtype='int64'))
return start[::step], end[::step], ref
3 changes: 3 additions & 0 deletions pandas/core/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -11263,6 +11263,7 @@ def rolling(
on: str | None = None,
axis: Axis = 0,
closed: str | None = None,
step: int | None = None,
method: str = "single",
):
axis = self._get_axis_number(axis)
Expand All @@ -11277,6 +11278,7 @@ def rolling(
on=on,
axis=axis,
closed=closed,
step=step,
method=method,
)

Expand All @@ -11289,6 +11291,7 @@ def rolling(
on=on,
axis=axis,
closed=closed,
step=step,
method=method,
)

Expand Down
91 changes: 67 additions & 24 deletions pandas/core/indexers/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@
center passed from the top level rolling API
closed : str, default None
closed passed from the top level rolling API
step : int, default None
step passed from the top level rolling API
win_type : str, default None
win_type passed from the top level rolling API
Returns
-------
A tuple of ndarray[int64]s, indicating the boundaries of each
window
A tuple of ndarray[int64]s:
start : array of start boundaries
end : array of end boundaries
ref : array of window reference locations, or None indicating all
must be None if step is None or 1
"""


Expand All @@ -55,14 +60,25 @@ def __init__(
for key, value in kwargs.items():
setattr(self, key, value)

def _get_default_ref(self, num_values: int = 0, step: int | None = None):
"""
Returns the default window reference locations.
"""
return (
None
if step is None or step == 1
else np.arange(0, num_values, step, dtype="int64")
)

@Appender(get_window_bounds_doc)
def get_window_bounds(
self,
num_values: int = 0,
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

raise NotImplementedError

Expand All @@ -77,14 +93,15 @@ def get_window_bounds(
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

if center:
offset = (self.window_size - 1) // 2
else:
offset = 0

end = np.arange(1 + offset, num_values + 1 + offset, dtype="int64")
end = np.arange(1 + offset, num_values + 1 + offset, step, dtype="int64")
start = end - self.window_size
if closed in ["left", "both"]:
start -= 1
Expand All @@ -94,7 +111,8 @@ def get_window_bounds(
end = np.clip(end, 0, num_values)
start = np.clip(start, 0, num_values)

return start, end
ref = self._get_default_ref(num_values, step)
return start, end, ref


class VariableWindowIndexer(BaseIndexer):
Expand All @@ -107,7 +125,8 @@ def get_window_bounds(
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

# error: Argument 4 to "calculate_variable_window_bounds" has incompatible
# type "Optional[bool]"; expected "bool"
Expand All @@ -119,6 +138,7 @@ def get_window_bounds(
min_periods,
center, # type: ignore[arg-type]
closed,
step if step is not None else 1,
self.index_array, # type: ignore[arg-type]
)

Expand All @@ -145,11 +165,14 @@ def get_window_bounds(
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

# if windows is variable, default is 'right', otherwise default is 'both'
if closed is None:
closed = "right" if self.index is not None else "both"
if step is None:
step = 1

right_closed = closed in ["right", "both"]
left_closed = closed in ["left", "both"]
Expand Down Expand Up @@ -202,7 +225,8 @@ def get_window_bounds(
if not right_closed:
end[i] -= 1

return start, end
ref = self._get_default_ref(num_values, step)
return start[::step], end[::step], ref


class ExpandingIndexer(BaseIndexer):
Expand All @@ -215,12 +239,15 @@ def get_window_bounds(
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

return (
np.zeros(num_values, dtype=np.int64),
np.arange(1, num_values + 1, dtype=np.int64),
)
if step is None:
step = 1
end = np.arange(1, num_values + 1, step, dtype=np.int64)
start = np.zeros(len(end), dtype=np.int64)
ref = self._get_default_ref(num_values, step)
return start, end, ref


class FixedForwardWindowIndexer(BaseIndexer):
Expand Down Expand Up @@ -256,21 +283,25 @@ def get_window_bounds(
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

if center:
raise ValueError("Forward-looking windows can't have center=True")
if closed is not None:
raise ValueError(
"Forward-looking windows don't support setting the closed argument"
)
if step is None:
step = 1

start = np.arange(num_values, dtype="int64")
start = np.arange(num_values, step=step, dtype="int64")
end = start + self.window_size
if self.window_size:
end[-self.window_size :] = num_values
end = np.clip(end, 0, num_values)

return start, end
ref = self._get_default_ref(num_values, step)
return start, end, ref


class GroupbyIndexer(BaseIndexer):
Expand Down Expand Up @@ -319,12 +350,14 @@ def get_window_bounds(
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:
# 1) For each group, get the indices that belong to the group
# 2) Use the indices to calculate the start & end bounds of the window
# 3) Append the window bounds in group order
start_arrays = []
end_arrays = []
ref_arrays = []
window_indices_start = 0
for key, indices in self.groupby_indices.items():
index_array: np.ndarray | None
Expand All @@ -338,11 +371,12 @@ def get_window_bounds(
window_size=self.window_size,
**self.indexer_kwargs,
)
start, end = indexer.get_window_bounds(
len(indices), min_periods, center, closed
start, end, ref = indexer.get_window_bounds(
len(indices), min_periods, center, closed, step
)
start = start.astype(np.int64)
end = end.astype(np.int64)
ref = None if ref is None else ref.astype(np.int64)
assert len(start) == len(
end
), "these should be equal in length from get_window_bounds"
Expand All @@ -358,9 +392,13 @@ def get_window_bounds(
)
start_arrays.append(window_indices.take(ensure_platform_int(start)))
end_arrays.append(window_indices.take(ensure_platform_int(end)))
ref_arrays.append(
None if ref is None else window_indices.take(ensure_platform_int(ref))
)
start = np.concatenate(start_arrays)
end = np.concatenate(end_arrays)
return start, end
ref = None if step is None or step == 1 else np.concatenate(ref_arrays)
return start, end, ref


class ExponentialMovingWindowIndexer(BaseIndexer):
Expand All @@ -373,6 +411,11 @@ def get_window_bounds(
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

return np.array([0], dtype=np.int64), np.array([num_values], dtype=np.int64)
return (
np.array([0], dtype=np.int64),
np.array([num_values], dtype=np.int64),
None,
)
11 changes: 8 additions & 3 deletions pandas/core/window/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ def flex_binary_moment(arg1, arg2, f, pairwise=False):
from pandas import DataFrame

def dataframe_from_int_dict(data, frame_template):
result = DataFrame(data, index=frame_template.index)
result = DataFrame(
data, index=None if len(data) > 0 else frame_template.index
)
if len(result.columns) > 0:
result.columns = frame_template.columns[result.columns]
return result
Expand All @@ -42,13 +44,16 @@ def dataframe_from_int_dict(data, frame_template):
raise ValueError("'arg2' columns are not unique")
X, Y = arg1.align(arg2, join="outer")
X, Y = prep_binary(X, Y)
result_index = X.index
res_columns = arg1.columns.union(arg2.columns)
for col in res_columns:
if col in X and col in Y:
results[col] = f(X[col], Y[col])
return DataFrame(results, index=X.index, columns=res_columns)
result_index = results[col].index
return DataFrame(results, index=result_index, columns=res_columns)
elif pairwise is True:
results = defaultdict(dict)
result_index = arg1.index.union(arg2.index)
for i in range(len(arg1.columns)):
for j in range(len(arg2.columns)):
if j < i and arg2 is arg1:
Expand All @@ -58,10 +63,10 @@ def dataframe_from_int_dict(data, frame_template):
results[i][j] = f(
*prep_binary(arg1.iloc[:, i], arg2.iloc[:, j])
)
result_index = results[i][j].index

from pandas import concat

result_index = arg1.index.union(arg2.index)
if len(result_index):

# construct result frame
Expand Down
8 changes: 5 additions & 3 deletions pandas/core/window/ewm.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ def __init__(
)

def _check_window_bounds(
self, start: np.ndarray, end: np.ndarray, num_vals: int
self, start: np.ndarray, end: np.ndarray, ref: np.ndarray, num_vals: int
) -> None:
# emw algorithms are iterative with each point
# ExponentialMovingWindowIndexer "bounds" are the entire window
Expand Down Expand Up @@ -732,11 +732,12 @@ def cov_func(x, y):
if self.min_periods is not None
else window_indexer.window_size
)
start, end = window_indexer.get_window_bounds(
start, end, ref = window_indexer.get_window_bounds(
num_values=len(x_array),
min_periods=min_periods,
center=self.center,
closed=self.closed,
step=self.step,
)
result = window_aggregations.ewmcov(
x_array,
Expand Down Expand Up @@ -798,11 +799,12 @@ def cov_func(x, y):
if self.min_periods is not None
else window_indexer.window_size
)
start, end = window_indexer.get_window_bounds(
start, end, ref = window_indexer.get_window_bounds(
num_values=len(x_array),
min_periods=min_periods,
center=self.center,
closed=self.closed,
step=self.step,
)

def _cov(X, Y):
Expand Down
4 changes: 2 additions & 2 deletions pandas/core/window/numba_.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ def roll_table(
minimum_periods: int,
*args: Any,
):
result = np.empty(values.shape)
min_periods_mask = np.empty(values.shape)
result = np.empty((len(begin), values.shape[1]))
min_periods_mask = np.empty(result.shape)
for i in numba.prange(len(result)):
start = begin[i]
stop = end[i]
Expand Down

0 comments on commit ff6a898

Please sign in to comment.