Skip to content

Commit

Permalink
This commit is a start to address step sizes in rolling windows (pand…
Browse files Browse the repository at this point in the history
…as-dev#15354)

and a hint to how to handle iterating windows (pandas-dev#11704)
  • Loading branch information
anthonytw committed May 13, 2020
1 parent 9a741d3 commit 574a199
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 76 deletions.
2 changes: 1 addition & 1 deletion pandas/_libs/window/aggregations.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def roll_sum_variable(ndarray[float64_t] values, ndarray[int64_t] start,
cdef:
float64_t sum_x = 0
int64_t s, e
int64_t nobs = 0, i, j, N = len(values)
int64_t nobs = 0, i, j, N = len(start)
ndarray[float64_t] output
bint is_monotonic_bounds

Expand Down
125 changes: 77 additions & 48 deletions pandas/_libs/window/indexers.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ from numpy cimport ndarray, int64_t
def calculate_variable_window_bounds(
int64_t num_values,
int64_t window_size,
object min_periods, # unused but here to match get_window_bounds signature
object step_size_obj,
object min_periods_obj,
object center, # unused but here to match get_window_bounds signature
object closed,
const int64_t[:] index
Expand All @@ -25,8 +26,11 @@ def calculate_variable_window_bounds(
window_size : int64
window size calculated from the offset
step_size : Optional[int], default None
the window step size
min_periods : object
ignored, exists for compatibility
Minimum data points in each window.
center : object
ignored, exists for compatibility
Expand All @@ -42,68 +46,93 @@ def calculate_variable_window_bounds(
(ndarray[int64], ndarray[int64])
"""
cdef:
bint left_closed = False
bint right_closed = False
int index_growth_sign = 1
bint left_open = False
bint right_open = False
int idx_scalar = 1
ndarray[int64_t, ndim=1] start, end
int64_t start_bound, end_bound
int64_t step_size, min_periods
int64_t index_i, index_si, index_ei,
int64_t index_window_i, index_step_i
int64_t index_window_max, index_step_max
int64_t window_i = 0
int64_t next_index_si = 0
int64_t next_index_ei = 0
Py_ssize_t i, j

# if windows is variable, default is 'right', otherwise default is 'both'
if closed is None:
closed = 'right' if index is not None else 'both'
closed = 'left'

if closed in ['right', 'both']:
right_closed = True
if closed not in ['right', 'both']:
right_open = True

if closed in ['left', 'both']:
left_closed = True
if closed not in ['left', 'both']:
left_open = True

# Assume index is monotonic increasing or decreasing. If decreasing (WHY??) negate values.
if index[num_values - 1] < index[0]:
index_growth_sign = -1
idx_scalar = -1

# Minimum "observations".
min_periods = min_periods_obj if min_periods_obj is not None else 0
step_size = step_size_obj if step_size_obj is not None else 1

start = np.empty(num_values, dtype='int64')
start.fill(-1)
end = np.empty(num_values, dtype='int64')
end.fill(-1)

start[0] = 0
if num_values < 1:
return start, end

# Indexing into indices: index_si index_ei (index start/end)
# Indexing into start/end arrays: window_i
# This will find closed intervals [start, end]

# right endpoint is closed
if right_closed:
end[0] = 1
# right endpoint is open
else:
end[0] = 0
window_i = 0
next_index_si = 0
next_index_ei = 0

with nogil:
while next_index_ei < num_values:
index_si = next_index_si

# start is start of slice interval (including)
# end is end of slice interval (not including)
for i in range(1, num_values):
end_bound = index[i]
start_bound = index[i] - index_growth_sign * window_size

# left endpoint is closed
if left_closed:
start_bound -= 1

# advance the start bound until we are
# within the constraint
start[i] = i
for j in range(start[i - 1], i):
if (index[j] - start_bound) * index_growth_sign > 0:
start[i] = j
start[window_i] = index_si

index_window_max = index[index_si] + idx_scalar*(window_size - 1)
index_step_max = index[index_si] + idx_scalar*(step_size - 1)

# Find end of step.
index_step_i = num_values - 1
for index_i in range(index_si + 1, num_values):
# Outside of step?
if idx_scalar*index[index_i] > idx_scalar*index_step_max:
index_step_i = index_i - 1
break

# end bound is previous end
# or current index
if (index[end[i - 1]] - end_bound) * index_growth_sign <= 0:
end[i] = i + 1
else:
end[i] = end[i - 1]

# right endpoint is open
if not right_closed:
end[i] -= 1
return start, end
# Find end of window.
index_window_i = num_values - 1
for index_i in range(next_index_ei + 1, num_values):
# Outside of window?
if idx_scalar*index[index_i] > idx_scalar*index_window_max:
index_window_i = index_i - 1
break

next_index_si = index_step_i + 1
next_index_ei = next_index_si if next_index_si > index_window_i + 1 else index_window_i + 1

end[window_i] = index_window_i
window_i += 1

# Remove excess slots.
valid_idx = (start >= 0) & (start <= end)

# And windows without enough data.
if min_periods is not None:
valid_idx &= (end - start + 1) >= min_periods

# Update open boundaries.
if left_open:
start -= 1
if right_open:
end += 1

return start[valid_idx], end[valid_idx]
3 changes: 3 additions & 0 deletions pandas/core/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -10396,6 +10396,7 @@ def rolling(
on=None,
axis=0,
closed=None,
step=None,
):
axis = self._get_axis_number(axis)

Expand All @@ -10409,6 +10410,7 @@ def rolling(
on=on,
axis=axis,
closed=closed,
step=step,
)

return Rolling(
Expand All @@ -10420,6 +10422,7 @@ def rolling(
on=on,
axis=axis,
closed=closed,
step=step,
)

cls.rolling = rolling
Expand Down
68 changes: 56 additions & 12 deletions pandas/core/window/indexers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
number of values that will be aggregated over
window_size : int, default 0
the number of rows in a window
step_size : int, default 1
the window step size
min_periods : int, default None
min_periods passed from the top level rolling API
center : bool, default None
Expand All @@ -35,7 +37,11 @@ class BaseIndexer:
"""Base class for window bounds calculations."""

def __init__(
self, index_array: Optional[np.ndarray] = None, window_size: int = 0, **kwargs,
self,
index_array: Optional[np.ndarray] = None,
window_size: int = 0,
step_size: Optional[int] = None,
**kwargs,
):
"""
Parameters
Expand All @@ -45,6 +51,8 @@ def __init__(
"""
self.index_array = index_array
self.window_size = window_size
self.step_size = step_size

# Set user defined kwargs as attributes that can be used in get_window_bounds
for key, value in kwargs.items():
setattr(self, key, value)
Expand Down Expand Up @@ -73,17 +81,52 @@ def get_window_bounds(
closed: Optional[str] = None,
) -> Tuple[np.ndarray, np.ndarray]:

start_s = np.zeros(self.window_size, dtype="int64")
start_e = (
np.arange(self.window_size, num_values, dtype="int64")
- self.window_size
+ 1
)
start = np.concatenate([start_s, start_e])[:num_values]
if self.step_size is not None:
"""
Proposed new behavior. Ignores partially filled windows, which don't really
make sense with fixed (index) width windows. Alignment assumes either
centered (`center` = True) or left-aligned (`center` = False). `align`
parameter should probably replace `center` with left, right, and center
options.
"""

# Compute intervals in semi-closed form [start, end)
loffset = self.step_size // 2 if center else 0
start = np.arange(
loffset,
num_values - self.window_size + 1,
self.step_size,
dtype="int64")
end = start + self.window_size

# Open/close interval appropriately.
if closed is None:
closed = 'right'

if closed in ['right', 'both']:
# Close right side of interval.
end -= 1

if closed not in ['left', 'both']:
# Open left side of interval.
start -= 1

else:
"""
Maintained to reproduce old behavior. Unclear if this should remain.
"""
start_s = np.zeros(self.window_size, dtype="int64")
start_e = (
np.arange(self.window_size, num_values, dtype="int64")
- self.window_size
+ 1
)
start = np.concatenate([start_s, start_e])[:num_values]

end_s = np.arange(self.window_size, dtype="int64") + 1
end_e = start_e + self.window_size
end = np.concatenate([end_s, end_e])[:num_values]

end_s = np.arange(self.window_size, dtype="int64") + 1
end_e = start_e + self.window_size
end = np.concatenate([end_s, end_e])[:num_values]
return start, end


Expand All @@ -100,7 +143,8 @@ def get_window_bounds(
) -> Tuple[np.ndarray, np.ndarray]:

return calculate_variable_window_bounds(
num_values, self.window_size, min_periods, center, closed, self.index_array,
num_values, self.window_size, self.step_size,
min_periods, center, closed, self.index_array,
)


Expand Down

1 comment on commit 574a199

@anthonytw
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • NOTE: Poorly tested, not production ready. I expect this to get rejected; it's largely for discussion.

This pull request illustrates how step sizes in rolling windows may be handled (pandas-dev#15354) and hints at an easy way to iterate through windows (pandas-dev#11704). However, it will require a bit of manual rewriting to get working; see the comments in pandas-dev#15354.

Please sign in to comment.