Use reservoir sampling for impostors to save memory #5

Open · wants to merge 7 commits into base: master
164 changes: 164 additions & 0 deletions pylmnn/impostor_sampling.py
@@ -0,0 +1,164 @@
"""Tools to assist in sampling impostors for PyLMNN. """
from abc import ABC, abstractmethod
import numpy as np


class Sampler(ABC):
@abstractmethod
def extend(self, latest_values):
pass

@abstractmethod
def sample(self):
pass


class UniformSampler(Sampler):
    """Post-hoc uniform sample of fixed size from a stream of values.

Parameters
----------
sample_size : int
Number of elements to take as the sample.

random_state : numpy.RandomState
A pseudo random number generator object used for sampling.
"""

def __init__(self, sample_size, random_state):
self.sample_size = sample_size
self.random_state = random_state

self._values = []

def extend(self, latest_values):
"""Adds elements to be sampled from.

Parameters
----------
latest_values : array-like, shape (n_samples,)
The latest part of the stream to sample from.

"""
self._values.extend(latest_values)

def sample(self):
"""Gets the current sample.

Returns
-------
sample : array, shape (sample_size,)
The current sample from the stream

"""
n = len(self._values)
k = self.sample_size
if n > k:
ind = self.random_state.choice(n, k, replace=False)
return np.asarray(self._values, dtype=object)[ind]
Owner: What is the overhead here, when values contains both distances and indices? Maybe there is a more efficient way to sample from multiple arrays? We could also use the knowledge that there are always either 1 or 2 arrays used.

Author: I agree, this is not ideal, and you're right that the root cause is needing to support sampling from parallel arrays of different types (just indices, or indices and distances). I saw this as a bigger problem and didn't address any of it in favor of brevity.
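A hedged sketch of one way to cut that overhead, assuming the impostor data is held as parallel NumPy arrays (the array names here are illustrative, not from the PR): draw the sampled positions once, then index every parallel array with the same position vector instead of packing heterogeneous values into one object array.

```python
import numpy as np

rng = np.random.RandomState(0)

# hypothetical parallel arrays: impostor pair indices and their distances
imp_indices = np.arange(1000)
imp_distances = rng.random_sample(1000)

k = 10
# draw the sampled positions once, then apply them to each parallel array
pos = rng.choice(len(imp_indices), k, replace=False)
sampled_indices = imp_indices[pos]
sampled_distances = imp_distances[pos]
```

This keeps each array in its native dtype and works unchanged whether one or two arrays are in play.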

else:
return self._values


class ReservoirSampler(Sampler):
"""Pseudo-uniform online sample of fixed size from a stream of values.

Parameters
----------
sample_size : int
Number of elements to take as the sample.

random_state : numpy.RandomState
A pseudo random number generator object used for sampling.

References
----------

.. [1] Li, Kim-Hung (4 December 1994).
"Reservoir-Sampling Algorithms of Time Complexity O(n(1+log(N/n)))"
ACM Transactions on Mathematical Software. 20 (4): 481–493.
"""

def __init__(self, sample_size, random_state):
self.sample_size = sample_size
self.random_state = random_state

self._i = 0
self._reservoir = []

u = random_state.random()
self._w = np.exp(np.log(u) / sample_size)

u = random_state.random()
self._next_i = int(sample_size + np.log(u) / np.log(1 - self._w) + 1)

# stores randomly generated values for sampling which we do in batches
# to minimize overhead of getting random numbers
# precomputing and storing these per-sampled-element values is over
# twice as fast as naively generating the random numbers and computing
# them inside the sampling loop
self._rand_replacement = None
self._rand_w_multiplier = None
self._rand_next_i_partial = None
self._rand_ind = sample_size

def extend(self, latest_values):
"""Adds elements to the reservoir sample.

Parameters
----------
latest_values : array-like, shape (n_samples,)
The latest part of the stream to sample from.

"""

offset = self._i
k = self.sample_size
rng = self.random_state

latest_values = list(latest_values) # TODO: This seems unnecessary
Owner: This would make a copy even when not necessary. Also here we have to maybe exploit the fact that there are 1 or 2 arrays.

Author: This is here because the caller might be (is, in the current implementation) sending in an iterable, not necessarily a list, which makes some of the operations a little trickier (e.g. random access, getting the length). I agree there's a better solution, and it's tied up in needing to support parallel np arrays of different types.

remaining_capacity = k - offset

# if reservoir not full, copy from latest_values to fill
if remaining_capacity > 0:
to_copy = min(remaining_capacity, len(latest_values))
self._reservoir.extend(latest_values[:to_copy])

        # NOTE: At this point, the reservoir might still not be full

# general case: sample from latest_values
while self._next_i - offset < len(latest_values):
Owner: I think this while loop can be vectorized. I need to look at this in more depth, but I think a lot of time can be saved.

Author: Yeah, I agree, and this is where things are slower for the reservoir sampler. Related to this is the idea of pre-computing the sample indices in blocks. I could easily do some of this (https://github.com/johny-c/pylmnn/pull/5/files/42d42ab02d310aa4f5a8d7b716ae2bb50396aa16#diff-adeb34b813966f09fdcba22cd080044cf631ad669d04a94a05f971dd51c868b0R131-R139), but my math of random distributions isn't quite good enough to vectorize all of it.

More concisely: I believe it's possible to create a vector of all the necessary elements from the geometric distribution for the given block we're extending from. I just don't know how.
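One hedged observation on that point (an aside, not part of the PR): if the acceptance probability w were held fixed, the skip length floor(log(u) / log(1 - w)) for u ~ Uniform(0, 1) is exactly a geometric variable (number of failures before a success), so a whole block of skips could be drawn in one vectorized call. The remaining difficulty is that Algorithm L updates w multiplicatively after every acceptance, which reintroduces the sequential dependence between skips.

```python
import numpy as np

rng = np.random.RandomState(42)
w = 0.01  # acceptance probability; Algorithm L updates this after each acceptance

# skip length as computed element-by-element in the PR:
# floor(log(u) / log(1 - w)) for u ~ Uniform(0, 1)
u = rng.random_sample(1000)
skips_explicit = np.floor(np.log(u) / np.log(1.0 - w)).astype(np.int64)

# for fixed w this follows the same geometric law (failures before a success),
# so a whole block can be drawn in one call; geometric() counts trials, hence -1
skips_batch = rng.geometric(p=w, size=1000) - 1
```

Both vectors follow the same distribution with mean (1 - w) / w, so the per-block precomputation idea is sound as long as the dependence of w on prior acceptances is handled somehow.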

if self._rand_ind >= k:
# refill random numbers in batches to minimize overhead
# also pre-compute some of the arithmetic to minimize overhead
# doing this takes ReservoirSampler from taking about 5x as
# long as numpy.RandomState.choice to ~2.5x as long
self._rand_replacement = rng.randint(0, k, k)
self._rand_w_multiplier = np.exp(np.log(rng.random(k)) / k)
self._rand_next_i_partial = np.log(rng.random(k))
self._rand_ind = 0

# replace a random item of the reservoir with item i
ind_src = self._next_i - offset
ind_dst = self._rand_replacement[self._rand_ind]
self._reservoir[ind_dst] = latest_values[ind_src]

# book-keeping
self._w *= self._rand_w_multiplier[self._rand_ind]
s = self._rand_next_i_partial[self._rand_ind]
self._next_i = int(self._next_i + 1 + s / np.log(1 - self._w))
self._rand_ind += 1

self._i += len(latest_values)

def sample(self):
"""Gets the current reservoir sample.

Returns
-------
sample : array, shape (sample_size,)
The current sample from the stream

"""

return self._reservoir
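For context on why this saves memory: the core idea of the PR can be sketched with the classic Algorithm R, a simpler (but slower, O(n)-random-draws) relative of the Algorithm L variant implemented above. The helper below is illustrative only, not code from the PR: it keeps a uniform sample of size k while holding only k elements in memory, instead of buffering the whole stream the way UniformSampler does.

```python
import numpy as np

def reservoir_sample(stream, k, rng):
    """Basic Algorithm R: uniform sample of size k using O(k) memory."""
    reservoir = []
    for i, x in enumerate(stream):
        if i < k:
            # fill the reservoir with the first k elements
            reservoir.append(x)
        else:
            # element i survives with probability k / (i + 1)
            j = rng.randint(0, i + 1)
            if j < k:
                reservoir[j] = x
    return reservoir

rng = np.random.RandomState(0)
sample = reservoir_sample(range(100_000), 10, rng)
```

Algorithm L improves on this by drawing the gap to the next accepted element directly, so it only spends random numbers on elements that actually enter the reservoir.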