Skip to content

Commit

Permalink
ENH: Rolling window with step size (pandas-devGH-15354)
Browse files Browse the repository at this point in the history
  • Loading branch information
rtpsw committed Feb 1, 2022
1 parent c64fbce commit ccc94a9
Show file tree
Hide file tree
Showing 8 changed files with 294 additions and 96 deletions.
3 changes: 2 additions & 1 deletion pandas/_libs/window/indexers.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ def calculate_variable_window_bounds(
min_periods,
center: bool,
closed: str | None,
step: int | None,
index: np.ndarray, # const int64_t[:]
) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.int64]]: ...
) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.int64], npt.NDArray[np.int64]]: ...
12 changes: 9 additions & 3 deletions pandas/_libs/window/indexers.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def calculate_variable_window_bounds(
object min_periods, # unused but here to match get_window_bounds signature
bint center,
str closed,
int64_t step,
const int64_t[:] index
):
"""
Expand All @@ -38,17 +39,20 @@ def calculate_variable_window_bounds(
closed : str
string of side of the window that should be closed
step : int64
Spacing between windows
index : ndarray[int64]
time series index to roll over
Returns
-------
(ndarray[int64], ndarray[int64])
(ndarray[int64], ndarray[int64], ndarray[int64])
"""
cdef:
bint left_closed = False
bint right_closed = False
ndarray[int64_t, ndim=1] start, end
ndarray[int64_t, ndim=1] start, end, ref
int64_t start_bound, end_bound, index_growth_sign = 1
Py_ssize_t i, j

Expand Down Expand Up @@ -143,4 +147,6 @@ def calculate_variable_window_bounds(
# right endpoint is open
if not right_closed and not center:
end[i] -= 1
return start, end
ref = (None if step is None or step == 1
else np.arange(0, num_values, step, dtype='int64'))
return start[::step], end[::step], ref
3 changes: 3 additions & 0 deletions pandas/core/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -11263,6 +11263,7 @@ def rolling(
on: str | None = None,
axis: Axis = 0,
closed: str | None = None,
step: int | None = None,
method: str = "single",
):
axis = self._get_axis_number(axis)
Expand All @@ -11277,6 +11278,7 @@ def rolling(
on=on,
axis=axis,
closed=closed,
step=step,
method=method,
)

Expand All @@ -11289,6 +11291,7 @@ def rolling(
on=on,
axis=axis,
closed=closed,
step=step,
method=method,
)

Expand Down
146 changes: 114 additions & 32 deletions pandas/core/indexers/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@
center passed from the top level rolling API
closed : str, default None
closed passed from the top level rolling API
step : int, default None
step passed from the top level rolling API
win_type : str, default None
win_type passed from the top level rolling API
Returns
-------
A tuple of ndarray[int64]s, indicating the boundaries of each
window
A tuple of ndarray[int64]s:
start : array of start boundaries
end : array of end boundaries
ref : array of window reference locations, or None indicating all if step is None or 1
"""


Expand All @@ -55,6 +59,16 @@ def __init__(
for key, value in kwargs.items():
setattr(self, key, value)

def _get_default_ref(self, num_values: int = 0, step: int | None = None):
"""
Returns the default window reference locations.
"""
return (
None
if step is None or step == 1
else np.arange(0, num_values, step, dtype="int64")
)

@Appender(get_window_bounds_doc)
def get_window_bounds(
self,
Expand All @@ -66,9 +80,23 @@ def get_window_bounds(

raise NotImplementedError

@Appender(get_window_bounds_doc)
def get_window_bounds2(
self,
num_values: int = 0,
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

start, end = self.get_window_bounds(num_values, min_periods, center, closed)
ref = self._get_default_ref(num_values, step)
return start[::step], end[::step], ref

class FixedWindowIndexer(BaseIndexer):
"""Creates window boundaries that are of fixed length."""

class BaseIndexer2(BaseIndexer):
"""Base class for window bounds calculations with step optimization."""

@Appender(get_window_bounds_doc)
def get_window_bounds(
Expand All @@ -79,12 +107,43 @@ def get_window_bounds(
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:

start, end, ref = self.get_window_bounds2(
num_values, min_periods, center, closed
)
return start, end

@Appender(get_window_bounds_doc)
def get_window_bounds2(
self,
num_values: int = 0,
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

raise NotImplementedError


class FixedWindowIndexer(BaseIndexer2):
"""Creates window boundaries that are of fixed length."""

@Appender(get_window_bounds_doc)
def get_window_bounds2(
self,
num_values: int = 0,
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

if center:
offset = (self.window_size - 1) // 2
else:
offset = 0

end = np.arange(1 + offset, num_values + 1 + offset, dtype="int64")
end = np.arange(1 + offset, num_values + 1 + offset, step, dtype="int64")
start = end - self.window_size
if closed in ["left", "both"]:
start -= 1
Expand All @@ -94,20 +153,22 @@ def get_window_bounds(
end = np.clip(end, 0, num_values)
start = np.clip(start, 0, num_values)

return start, end
ref = self._get_default_ref(num_values, step)
return start, end, ref


class VariableWindowIndexer(BaseIndexer):
class VariableWindowIndexer(BaseIndexer2):
"""Creates window boundaries that are of variable length, namely for time series."""

@Appender(get_window_bounds_doc)
def get_window_bounds(
def get_window_bounds2(
self,
num_values: int = 0,
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

# error: Argument 4 to "calculate_variable_window_bounds" has incompatible
# type "Optional[bool]"; expected "bool"
Expand All @@ -119,6 +180,7 @@ def get_window_bounds(
min_periods,
center, # type: ignore[arg-type]
closed,
step if step is not None else 1,
self.index_array, # type: ignore[arg-type]
)

Expand Down Expand Up @@ -205,25 +267,28 @@ def get_window_bounds(
return start, end


class ExpandingIndexer(BaseIndexer):
class ExpandingIndexer(BaseIndexer2):
"""Calculate expanding window bounds, mimicking df.expanding()"""

@Appender(get_window_bounds_doc)
def get_window_bounds(
def get_window_bounds2(
self,
num_values: int = 0,
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

return (
np.zeros(num_values, dtype=np.int64),
np.arange(1, num_values + 1, dtype=np.int64),
)
if step is None:
step = 1
end = np.arange(1, num_values + 1, step, dtype=np.int64)
start = np.zeros(len(end), dtype=np.int64)
ref = self._get_default_ref(num_values, step)
return start, end, ref


class FixedForwardWindowIndexer(BaseIndexer):
class FixedForwardWindowIndexer(BaseIndexer2):
"""
Creates window boundaries for fixed-length windows that include the
current row.
Expand All @@ -250,30 +315,34 @@ class FixedForwardWindowIndexer(BaseIndexer):
"""

@Appender(get_window_bounds_doc)
def get_window_bounds(
def get_window_bounds2(
self,
num_values: int = 0,
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

if center:
raise ValueError("Forward-looking windows can't have center=True")
if closed is not None:
raise ValueError(
"Forward-looking windows don't support setting the closed argument"
)
if step is None:
step = 1

start = np.arange(num_values, dtype="int64")
start = np.arange(0, num_values, step, dtype="int64")
end = start + self.window_size
if self.window_size:
end[-self.window_size :] = num_values
end = np.clip(end, 0, num_values)

return start, end
ref = self._get_default_ref(num_values, step)
return start, end, ref


class GroupbyIndexer(BaseIndexer):
class GroupbyIndexer(BaseIndexer2):
"""Calculate bounds to compute groupby rolling, mimicking df.groupby().rolling()"""

def __init__(
Expand Down Expand Up @@ -313,18 +382,21 @@ def __init__(
)

@Appender(get_window_bounds_doc)
def get_window_bounds(
def get_window_bounds2(
self,
num_values: int = 0,
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:
# 1) For each group, get the indices that belong to the group
# 2) Use the indices to calculate the start & end bounds of the window
# 3) Append the window bounds in group order
start_arrays = []
end_arrays = []
ref_arrays = []
empty = np.array([], dtype=np.int64)
window_indices_start = 0
for key, indices in self.groupby_indices.items():
index_array: np.ndarray | None
Expand All @@ -338,11 +410,12 @@ def get_window_bounds(
window_size=self.window_size,
**self.indexer_kwargs,
)
start, end = indexer.get_window_bounds(
len(indices), min_periods, center, closed
start, end, ref = indexer.get_window_bounds2(
len(indices), min_periods, center, closed, step
)
start = start.astype(np.int64)
end = end.astype(np.int64)
ref = None if ref is None else ref.astype(np.int64)
assert len(start) == len(
end
), "these should be equal in length from get_window_bounds"
Expand All @@ -358,21 +431,30 @@ def get_window_bounds(
)
start_arrays.append(window_indices.take(ensure_platform_int(start)))
end_arrays.append(window_indices.take(ensure_platform_int(end)))
ref_arrays.append(
empty if ref is None else window_indices.take(ensure_platform_int(ref))
)
start = np.concatenate(start_arrays)
end = np.concatenate(end_arrays)
return start, end
ref = None if step is None or step == 1 else np.concatenate(ref_arrays)
return start, end, ref


class ExponentialMovingWindowIndexer(BaseIndexer):
class ExponentialMovingWindowIndexer(BaseIndexer2):
"""Calculate ewm window bounds (the entire window)"""

@Appender(get_window_bounds_doc)
def get_window_bounds(
def get_window_bounds2(
self,
num_values: int = 0,
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

return np.array([0], dtype=np.int64), np.array([num_values], dtype=np.int64)
return (
np.array([0], dtype=np.int64),
np.array([num_values], dtype=np.int64),
None,
)

0 comments on commit ccc94a9

Please sign in to comment.