Skip to content

Commit

Permalink
Fix api functions and doc
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli committed Nov 23, 2020
1 parent 9d10a44 commit 0d3e55a
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 60 deletions.
16 changes: 11 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,21 @@ streamed into an observer.
Observable -> Operator -> Operator -> Operator -> Observer

Aioreactive contains many of the same operators as you know from RxPY.
Our goal is not to implement them all, but to have the most essential
Our goal is not to implement them all, but to provide the most essential
ones.

* **concat** -- Concatenates two or more observables.
* **choose** -- Filters and/or transforms the observable.
* **choose_asnc** -- Asynchronously filters and/or transforms the observable.
* **debounce** -- Throttles an observable.
* **delay** -- delays the items within an observable.
* **distinct_until_changed** -- an observable with continuously distinct values.
* **filter** -- filters an observable.
* **filteri** -- filters an observable with index.
* **flat_map** -- transforms an observable into a stream of observables and flattens the resulting observable.
* **flat_map_latest** -- transforms an observable into a stream of
observables and flattens the resulting observable by producing values
from the latest observable.
* **from_iterable** -- Create an observable from an (async) iterable.
* **subscribe** -- Subscribes an observer to an observable. Returns a subscription.
* **map** -- transforms an observable.
Expand Down Expand Up @@ -284,7 +290,7 @@ async def test_slice_special():

# Fluent and chained programming style

An alternative to pipelining is to use classic and fluent method
An alternative to pipelining is to use the classic and fluent method
chaining as we know from [ReactiveX](http://reactivex.io).

An `AsyncObservable` created from class methods such as
Expand Down Expand Up @@ -322,9 +328,9 @@ async def test_observable_simple_pipe():
Aioreactive also provides a virtual time event loop
(`VirtualTimeEventLoop`) that enables you to write asyncio unit-tests
that run in virtual time. Virtual time means that time is emulated, so
tests run as quickly as possible even if they sleep or awaits long lived
tests run as quickly as possible even if they sleep or awaits long-lived
operations. A test using virtual time still gives the same result as it
would have done if it had been run in real time.
would have done if it had been run in real-time.

For example the following test still gives the correct result even if it
takes 0 seconds to run:
Expand All @@ -351,7 +357,7 @@ async def test_call_later():
assert result == [2, 3, 1]
```

The `aioreactive.testing` module provides a test `AsyncSubject` that may
The aioreactive testing module provides a test `AsyncSubject` that may
delay sending values, and a test `AsyncTestObserver` that records all
events. These two classes helps you with testing in virtual time.

Expand Down
124 changes: 122 additions & 2 deletions aioreactive/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,38 @@ def single(cls, value: TSource) -> "AsyncRx[TSource]":
def as_async_observable(self) -> AsyncObservable[TSource]:
return AsyncAnonymousObservable(self.subscribe_async)

def choose(self, chooser: Callable[[TSource], Option[TSource]]) -> AsyncObservable[TSource]:
"""Choose.
Applies the given function to each element of the stream and returns
the stream comprised of the results for each element where the
function returns Some with some value.
Args:
chooser: A function to transform or filter the stream
by returning `Some(value)` or `Nothing`.
Returns:
The filtered and/or transformed stream.
"""
return AsyncRx(pipe(self, choose(chooser)))

def choose_async(self, chooser: Callable[[TSource], Awaitable[Option[TSource]]]) -> AsyncObservable[TSource]:
"""Choose async.
Applies the given async function to each element of the stream and
returns the stream comprised of the results for each element where
the function returns Some with some value.
Args:
chooser: A function to transform or filter the stream
asynchronously by returning `Some(value)` or `Nothing`.
Returns:
The filtered and transformed stream.
"""
return AsyncRx(pipe(self, choose_async(chooser)))

def combine_latest(self, other: TOther) -> "AsyncRx[Tuple[TSource, TOther]]":
from .combine import combine_latest

Expand Down Expand Up @@ -161,10 +193,40 @@ def distinct_until_changed(self) -> AsyncObservable[TSource]:
return AsyncRx(distinct_until_changed(self))

def filter(self, predicate: Callable[[TSource], bool]) -> "AsyncRx[TSource]":
"""Filter stream.
Filters the elements of an observable sequence based on a predicate.
Returns an observable sequence that contains elements from the input
sequence that satisfy the condition.
Args:
predicate:
A function to filter the stream by returning `True` to
keep the item, or `False` to filter and remove the item.
Returns:
The filtered stream.
"""

from .filtering import filter as _filter

return AsyncRx(pipe(self, _filter(predicate)))

def filteri(self, predicate: Callable[[TSource, int], bool]) -> AsyncObservable[TSource]:
"""Filter with index.
Filters the elements of an observable sequence based on a predicate
and incorporating the element's index on each element of the source.
Args:
predicate: Function to test each element.
Returns:
An observable sequence that contains elements from the input
sequence that satisfy the condition.
"""
return AsyncRx(pipe(self, filteri(predicate)))

def filter_async(self, predicate: Callable[[TSource], Awaitable[bool]]) -> "AsyncRx[TSource]":
from .filtering import filter_async

Expand Down Expand Up @@ -312,12 +374,40 @@ def as_chained(source: AsyncObservable[TSource]) -> AsyncRx[TSource]:


def choose(chooser: Callable[[TSource], Option[TSource]]) -> Stream[TSource, TSource]:
"""Choose.
Applies the given function to each element of the stream and returns
the stream comprised of the results for each element where the
function returns Some with some value.
Args:
chooser: A function to transform or filter the stream
by returning `Some(value)` or `Nothing`.
Returns:
The filtered and/or transformed stream.
"""

from .filtering import choose

return choose(chooser)


def choose_async(chooser: Callable[[TSource], Awaitable[Option[TSource]]]) -> Stream[TSource, TSource]:
"""Choose async.
Applies the given async function to each element of the stream and
returns the stream comprised of the results for each element where
the function returns Some with some value.
Args:
chooser: An async function to transform or filter the stream
by returning `Some(value)` or `Nothing`.
Returns:
The filtered and/or transformed stream.
"""

from .filtering import choose_async

return choose_async(chooser)
Expand Down Expand Up @@ -404,12 +494,41 @@ def empty() -> "AsyncObservable[TSource]":


def filter(predicate: Callable[[TSource], bool]) -> Callable[[AsyncObservable[TSource]], AsyncObservable[TSource]]:
"""Filter stream.
Filters the elements of an observable sequence based on a predicate.
Returns an observable sequence that contains elements from the input
sequence that satisfy the condition.
Args:
predicate:
A function to filter the stream by returning `True` to
keep the item, or `False` to filter and remove the item.
Returns:
The filtered stream.
"""
from .filtering import filter as _filter

return _filter(predicate)


print(filter)
def filteri(predicate: Callable[[TSource, int], bool]) -> Stream[TSource, TSource]:
"""Filter with index.
Filters the elements of an observable sequence based on a predicate
and incorporating the element's index on each element of the source.
Args:
predicate: Function to test each element.
Returns:
An observable sequence that contains elements from the input
sequence that satisfy the condition.
"""
from .filtering import filteri

return filteri(predicate)


def filter_async(
Expand Down Expand Up @@ -742,7 +861,8 @@ def with_latest_from(other: AsyncObservable[TOther]) -> Stream[TSource, Tuple[TS
"concat_seq" "delay",
"empty",
"filter",
"filteri" "filter_async",
"filteri",
"filter_async",
"from_async",
"from_iterable",
"flat_map",
Expand Down
53 changes: 0 additions & 53 deletions aioreactive/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,6 @@


def choose_async(chooser: Callable[[TSource], Awaitable[Option[TResult]]]) -> Stream[TSource, TResult]:
"""Choose async.
Applies the given async function to each element of the stream and
returns the stream comprised of the results for each element where
the function returns Some with some value.
Args:
chooser (Callable[[TSource], Awaitable[Option[TResult]]]): [description]
Returns:
Stream[TSource, TResult]: [description]
"""

async def handler(next: Callable[[TResult], Awaitable[None]], xs: TSource) -> None:
result = await chooser(xs)
for x in result.to_list():
Expand All @@ -40,19 +27,6 @@ async def handler(next: Callable[[TResult], Awaitable[None]], xs: TSource) -> No


def choose(chooser: Callable[[TSource], Option[TResult]]) -> Stream[TSource, TResult]:
"""Choose.
Applies the given function to each element of the stream and returns
the stream comprised of the results for each element where the
function returns Some with some value.
Args:
chooser (Callable[[TSource], Option[TResult]]): [description]
Returns:
Stream[TSource, TResult]: [description]
"""

def handler(next: Callable[[TResult], Awaitable[None]], xs: TSource) -> Awaitable[None]:
for x in chooser(xs).to_list():
return next(x)
Expand Down Expand Up @@ -83,20 +57,6 @@ async def handler(next: Callable[[TSource], Awaitable[None]], x: TSource):


def filter(predicate: Callable[[TSource], bool]) -> Stream[TSource, TSource]:
"""Filter stream.
Filters the elements of an observable sequence based on a predicate.
Returns an observable sequence that contains elements from the input
sequence that satisfy the condition.
Args:
predicate (Callable[[TSource], bool]): [description]
Returns:
Stream[TSource, TSource]: [description]
"""

def handler(next: Callable[[TSource], Awaitable[None]], x: TSource) -> Awaitable[None]:
if predicate(x):
return next(x)
Expand Down Expand Up @@ -128,19 +88,6 @@ def handler(next: Callable[[Tuple[TSource, ...]], Awaitable[None]], args: Tuple[


def filteri(predicate: Callable[[TSource, int], bool]) -> Stream[TSource, TSource]:
"""Filter with index.
Filters the elements of an observable sequence based on a predicate
and incorporating the element's index on each element of the source.
Args:
predicate: Function to test each element.
Returns:
An observable sequence that contains elements from the input
sequence that satisfy the condition.
"""

return compose(
zip_seq(seq.infinite()),
starfilter(predicate),
Expand Down

0 comments on commit 0d3e55a

Please sign in to comment.