Skip to content

Commit

Permalink
Merge pull request #44 from dbrattli/fix-example
Browse files Browse the repository at this point in the history
Fix split example
  • Loading branch information
dbrattli committed Jan 18, 2024
2 parents 51608bd + dd43379 commit c65b019
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ repos:
- id: ruff-format
args: [--check]
repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.1.11
rev: v0.1.13
- hooks:
- id: pyright
name: pyright
Expand Down
6 changes: 2 additions & 4 deletions aioreactive/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,16 +314,14 @@ def merge(self, other: AsyncObservable[_TSource]) -> AsyncRx[_TSource]:
AsyncRx.create,
)

def reduce(
self, accumulator: Callable[[_TResult, _TSource], _TResult], initial: _TResult
) -> "AsyncRx[_TResult]":
def reduce(self, accumulator: Callable[[_TResult, _TSource], _TResult], initial: _TResult) -> AsyncRx[_TResult]:
return pipe(self, reduce(accumulator, initial), AsyncRx[_TResult])

def reduce_async(
self,
accumulator: Callable[[_TResult, _TSource], Awaitable[_TResult]],
initial: _TResult,
) -> "AsyncRx[_TResult]":
) -> AsyncRx[_TResult]:
return pipe(self, reduce_async(accumulator, initial), AsyncRx[_TResult])

def skip(self, count: int) -> AsyncObservable[_TSource]:
Expand Down
34 changes: 2 additions & 32 deletions aioreactive/notification.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from collections.abc import Awaitable, Callable, Iterable
from collections.abc import Awaitable, Callable
from enum import Enum
from typing import Any, Generic, TypeVar, get_origin
from typing import Any, Generic, TypeVar

from .types import AsyncObserver

Expand Down Expand Up @@ -59,15 +59,6 @@ async def accept(
async def accept_observer(self, obv: AsyncObserver[_TSource]) -> None:
await obv.asend(self.value)

def __match__(self, pattern: Any) -> Iterable[_TSource]:
origin: Any = get_origin(pattern)
try:
if isinstance(self, origin or pattern):
return [self.value]
except TypeError:
pass
return []

def __eq__(self, other: Any) -> bool:
if isinstance(other, OnNext):
return self.value == other.value # type: ignore
Expand Down Expand Up @@ -98,15 +89,6 @@ async def accept(
async def accept_observer(self, obv: AsyncObserver[_TSource]) -> None:
await obv.athrow(self.exception)

def __match__(self, pattern: Any) -> Iterable[Exception]:
origin: Any = get_origin(pattern)
try:
if isinstance(self, origin or pattern):
return [self.exception]
except TypeError:
pass
return []

def __eq__(self, other: Any) -> bool:
if isinstance(other, OnError):
return self.exception == other.exception
Expand Down Expand Up @@ -137,18 +119,6 @@ async def accept(
async def accept_observer(self, obv: AsyncObserver[_TSource]) -> None:
await obv.aclose()

def __match__(self, pattern: Any) -> Iterable[bool]:
if self is pattern:
return [True]

origin: Any = get_origin(pattern)
try:
if isinstance(self, origin or pattern):
return [True]
except TypeError:
pass
return []

def __eq__(self, other: Any) -> bool:
if isinstance(other, OnCompleted):
return True
Expand Down
3 changes: 3 additions & 0 deletions examples/streams/split.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ async def mysink(value: int):
await odds.subscribe_async(rx.AsyncAnonymousObserver(mysink))
await evens.subscribe_async(rx.AsyncAnonymousObserver(mysink))

# Wait to avoid the program exiting before the streams are finished.
await asyncio.sleep(1)


if __name__ == "__main__":
asyncio.run(main())
8 changes: 4 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions tests/test_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async def test_reduce():
values = list(map(lambda t: t[1], observer.values))
assert values == [
OnNext(10),
OnCompleted,
OnCompleted(),
]


Expand All @@ -55,5 +55,5 @@ async def test_reduce_async():
values = list(map(lambda t: t[1], observer.values))
assert values == [
OnNext(10),
OnCompleted,
OnCompleted(),
]

0 comments on commit c65b019

Please sign in to comment.