The ReactiveX for Python (RxPY)

A library for composing asynchronous and event-based programs using observable collections and query operator functions in Python

ReactiveX for Python v4

For v3.X please go to the v3 branch.

ReactiveX for Python v4.x runs on Python 3.7 or above. To install:

pip3 install reactivex

About ReactiveX

ReactiveX for Python (RxPY) is a library for composing asynchronous and event-based programs using observable sequences and pipable query operators in Python. Using Rx, developers represent asynchronous data streams with Observables, query asynchronous data streams using operators, and parameterize concurrency in data/event streams using Schedulers.

import reactivex as rx
from reactivex import operators as ops

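# Emit five strings, then complete.
source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

composed = source.pipe(
    ops.map(lambda s: len(s)),     # map each string to its length
    ops.filter(lambda i: i >= 5),  # keep lengths of 5 or more
)
# Prints "Received 5" three times, then "Received 7".
composed.subscribe(lambda value: print("Received {0}".format(value)))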

Learning ReactiveX

Read the documentation to learn the principles of ReactiveX and get the complete reference of the available operators.

If you need to migrate code from RxPY v1.x or v3.x, read the migration section.

There is also a list of third-party documentation available here.

Community

If you have any questions or suggestions, join the conversation on GitHub Discussions.

Differences from .NET and RxJS

ReactiveX for Python is a fairly complete implementation of Rx, with more than 120 operators and over 1300 passing unit tests. RxPY is mostly a direct port of RxJS, but also borrows a bit from Rx.NET and RxJava in terms of threading and blocking operators.

ReactiveX for Python follows PEP 8, so all function and method names are in snake_case, i.e. lowercase with words separated by underscores as necessary to improve readability.

Thus .NET code such as:

var group = source.GroupBy(i => i % 3);

needs to be written with an underscore in Python:

group = source.pipe(ops.group_by(lambda i: i % 3))

With ReactiveX for Python, use keyword arguments instead of positional arguments when an operator has multiple optional arguments. RxPY does not try to detect which optional argument a positional value was meant for.

Development

This project is managed using Poetry. Code is formatted using Black and isort, and statically type checked using pyright and mypy.

If you want to take advantage of the default VSCode integration, then first configure Poetry to make its virtual environment in the repository:

poetry config virtualenvs.in-project true

After cloning the repository, activate the tooling:

poetry install
poetry run pre-commit install

Run unit tests:

poetry run pytest

Run code checks (manually):

poetry run pre-commit run --all-files