Skip to content

akarnokd/async-enumerable

Repository files navigation

async-enumerable

codecov.io Maven Central

Prototype Java 9 library based on the asynchronous enumerable concept (where moveNext() returns a task to compose over).

Gradle

compile "com.github.akarnokd:async-enumerable:0.6.0"

Getting started

The main entry point is the hu.akarnokd.asyncenum.AsyncEnumerable interface with its static factory methods similar to RxJava:

AsyncEnumerable<Integer> source = AsyncEnumerable.range(1, 10);

AsyncEnumerable<String> strings = AsyncEnumerable.fromArray("a", "b", "c", "d");

AsyncEnumerable<T> is a deferred cold source, which can be synchronous or asynchronous, the enumerator() has to be called to receive another interface, hu.akarnokd.asyncenum.AsyncEnumerator to be "iterated" over.

AsyncEnumerator<Integer> enumerator = source.enumerator();

The AsyncEnumerator<T> defines two methods, moveNext() and current(). Calling moveNext() will instruct the source to produce the next value but instead of returning a false or true immediately, the method returns a java.util.concurrent.CompletionStage<Boolean> that is completed with true if a value is ready and false if no more values to be expected. In the true case, one can read the current value via current().

(Cancelling a sequence is currently partially supported via the cancel() method on AsyncEnumerator, but it feels too much Reactive Streams and not like the pre-existing cancellation support in other async-enumerable libraries.)

CompletionStage<Boolean> stage = enumerator.moveNext();

stage.whenComplete((hasValue, error) -> {
    if (error != null) {
        error.printStackTrace();
        return;
    }
    
    if (hasValue) {
        System.out.println(enumerator.current());
    } else {
        System.out.println("Empty source!");
    }
})

Note that calling moveNext() or current() during the time the CompletionStage hasn't been terminated is an undefined behavior. Calling moveNext after the previous CompletionStage returned false or an exception is also undefined behavior.

Therefore, consuming multiple values via a plain for loop doesn't work; one has to call moveNext when the previous CompletionStage completed with true in a recursively looking pattern. Since some AsyncEnumerable chains can be synchronous, this leads to StackOverflowError if not handled properly.

For this purpose, the forEach() instance method on AsyncEnumerable is available, but given an AsyncEnumerator, the following consumption pattern can be employed:

final class EnumeratorConsumer<T> extends AtomicInteger implements BiConsumer<Boolean, Throwable> {
    
    final AsyncEnumerator<T> enumerator;
    
    public EnumeratorConsumer(AsyncEnumerator<T> enumerator) {
        this.enumerator = enumerator;
    }
    
    @Override
    public void accept(Boolean hasNext, Throwable error) {
        if (error != null) {
            // handle error case
            return;
        }
        if (hasNext) {
            T value = enumerator.current();
            // handle current value
            moveNext();   
        } else {
            // handle no more values
        }
    }
    
    public void moveNext() {
        if (getAndIncrement() == 0) {
            do {
                enumerator.moveNext().whenComplete(this);
            } while (decrementAndGet() != 0);
        }
    }
}

new EnumeratorConsumer(source.enumerator()).moveNext();

This is practically the same queue-drain or trampolining logic used throughout RxJava. It is recommended though to use the combinators and operators of AsyncEnumerable instead as working with a sequence of CompletionStage continuations, especially when there are multiple active sequences involved as complications.

About

Prototype Java 9 library based on the asynchronous enumerable concept (where moveNext() returns a task to compose over).

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published