Skip to content

Commit

Permalink
Fix split example
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli committed Nov 22, 2020
1 parent 8d851ec commit 249a582
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 31 deletions.
45 changes: 24 additions & 21 deletions aioreactive/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __getitem__(self, key: Union[slice, int]) -> "AsyncRx[TSource]":
Returne a sliced source stream."""

from .filter import slice as _slice
from .filtering import slice as _slice

if isinstance(key, slice):
start, stop, step = key.start, key.stop, key.step
Expand Down Expand Up @@ -134,17 +134,17 @@ def delay(self, seconds: float) -> "AsyncRx[TSource]":
return AsyncRx(delay(seconds)(self))

def distinct_until_changed(self) -> AsyncObservable[TSource]:
from .filter import distinct_until_changed
from .filtering import distinct_until_changed

return AsyncRx(distinct_until_changed(self))

def filter(self, predicate: Callable[[TSource], bool]) -> "AsyncRx[TSource]":
from .filter import filter
from .filtering import filter as _filter

return AsyncRx(pipe(self, filter(predicate)))
return AsyncRx(pipe(self, _filter(predicate)))

def filter_async(self, predicate: Callable[[TSource], Awaitable[bool]]) -> "AsyncRx[TSource]":
from .filter import filter_async
from .filtering import filter_async

return AsyncRx(pipe(self, filter_async(predicate)))

Expand Down Expand Up @@ -182,7 +182,7 @@ def skip(self, count: int) -> AsyncObservable[TSource]:
Returns:
Stream[TSource, TSource]: [description]
"""
from .filter import skip
from .filtering import skip

return AsyncRx(pipe(self, skip(count)))

Expand All @@ -194,7 +194,7 @@ def starfilter(self, predicate: Callable[..., bool]) -> AsyncObservable[Tuple[TS
An observable sequence that contains elements from the input
sequence that satisfy the condition.
"""
from .filter import starfilter
from .filtering import starfilter

return AsyncRx.create(pipe(self, starfilter(predicate)))

Expand All @@ -220,7 +220,7 @@ def take(self, count: int) -> AsyncObservable[TSource]:
Returns:
Stream[TSource, TSource]: [description]
"""
from .filter import take
from .filtering import take

return AsyncRx(pipe(self, take(count)))

Expand All @@ -236,7 +236,7 @@ def take_last(self, count: int) -> AsyncObservable[TSource]:
Returns:
Stream[TSource, TSource]: [description]
"""
from .filter import take_last
from .filtering import take_last

return AsyncRx(pipe(self, take_last(count)))

Expand All @@ -252,7 +252,7 @@ def take_until(self, other: AsyncObservable[TResult]) -> AsyncObservable[TSource
Returns:
Stream[TSource, TSource]: [description]
"""
from .filter import take_until
from .filtering import take_until

return AsyncRx(pipe(self, take_until(other)))

Expand All @@ -276,13 +276,13 @@ def as_chained(source: AsyncObservable[TSource]) -> AsyncRx[TSource]:


def choose(chooser: Callable[[TSource], Option[TSource]]) -> Stream[TSource, TSource]:
from .filter import choose
from .filtering import choose

return choose(chooser)


def choose_async(chooser: Callable[[TSource], Awaitable[Option[TSource]]]) -> Stream[TSource, TSource]:
from .filter import choose_async
from .filtering import choose_async

return choose_async(chooser)

Expand Down Expand Up @@ -354,7 +354,7 @@ def delay(seconds: float) -> Stream[TSource, TSource]:


def distinct_until_changed(source: AsyncObservable[TSource]) -> AsyncObservable[TSource]:
from .filter import distinct_until_changed
from .filtering import distinct_until_changed

return distinct_until_changed(source)

Expand All @@ -366,15 +366,18 @@ def empty() -> "AsyncObservable[TSource]":


def filter(predicate: Callable[[TSource], bool]) -> Callable[[AsyncObservable[TSource]], AsyncObservable[TSource]]:
from .filter import filter
from .filtering import filter as _filter

return filter(predicate)
return _filter(predicate)


print(filter)


def filter_async(
predicate: Callable[[TSource], Awaitable[bool]]
) -> Callable[[AsyncObservable[TSource]], AsyncObservable[TSource]]:
from .filter import filter_async
from .filtering import filter_async

return filter_async(predicate)

Expand Down Expand Up @@ -552,7 +555,7 @@ def skip(count: int) -> Stream[TSource, TSource]:
Returns:
Stream[TSource, TSource]: [description]
"""
from .filter import skip
from .filtering import skip

return skip(count)

Expand All @@ -565,7 +568,7 @@ def starfilter(predicate: Callable[..., bool]) -> Stream[TSource, Tuple[Any, ...
An observable sequence that contains elements from the input
sequence that satisfy the condition.
"""
from .filter import starfilter
from .filtering import starfilter

return starfilter(predicate)

Expand Down Expand Up @@ -599,7 +602,7 @@ def take(count: int) -> Stream[TSource, TSource]:
Returns:
Stream[TSource, TSource]: [description]
"""
from .filter import take
from .filtering import take

return take(count)

Expand All @@ -616,7 +619,7 @@ def take_last(count: int) -> Stream[TSource, TSource]:
Returns:
Stream[TSource, TSource]: [description]
"""
from .filter import take_last
from .filtering import take_last

return take_last(count)

Expand All @@ -633,7 +636,7 @@ def take_until(other: AsyncObservable[TResult]) -> Stream[TSource, TSource]:
Returns:
Stream[TSource, TSource]: [description]
"""
from .filter import take_until
from .filtering import take_until

return take_until(other)

Expand Down
File renamed without changes.
19 changes: 9 additions & 10 deletions examples/streams/split.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,25 @@
"""Example to show how to split a stream into two substreams."""
import asyncio

from aioreactive.core import subscribe, AsyncAnonymousObserver

from aioreactive.core import AsyncObservable, Operators as op
import aioreactive as rx
from expression.core import pipe


async def main():
xs = AsyncObservable.from_iterable(range(10))
xs = rx.from_iterable(range(10))

# Split into odds and evens
odds = xs | op.filter(lambda x: x % 2 == 1)
evens = xs | op.filter(lambda x: x % 2 == 0)
evens = pipe(xs, rx.filter(lambda x: x % 2 == 0))
odds = pipe(xs, rx.filter(lambda x: x % 2 == 1))

async def mysink(value):
async def mysink(value: int):
print(value)

await subscribe(odds, AsyncAnonymousObserver(mysink))
await subscribe(evens, AsyncAnonymousObserver(mysink))
await odds.subscribe_async(rx.AsyncAnonymousObserver(mysink))
await evens.subscribe_async(rx.AsyncAnonymousObserver(mysink))


if __name__ == '__main__':
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

0 comments on commit 249a582

Please sign in to comment.