Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds support for flowable to async iterable conversion #104

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .flowconfig
Expand Up @@ -28,4 +28,4 @@ suppress_type=$FlowFixMe
suppress_type=$FixMe

[version]
^0.134.0
^0.136.0
36 changes: 19 additions & 17 deletions package.json
Expand Up @@ -32,41 +32,43 @@
"packages/*"
],
"devDependencies": {
"@babel/cli": "^7.11.6",
"@babel/core": "^7.11.6",
"@babel/cli": "^7.12.1",
"@babel/core": "^7.12.3",
"babel-jest": "^26.6.0",
"babel-eslint": "^10.1.0",
"babel-plugin-minify-replace": "^0.5.0",
"@babel/plugin-transform-async-to-generator": "^7.10.4",
"@babel/plugin-proposal-class-properties": "^7.10.4",
"@babel/plugin-transform-modules-commonjs": "^7.10.4",
"@babel/plugin-transform-flow-strip-types": "^7.10.4",
"@babel/plugin-proposal-object-rest-spread": "^7.11.0",
"@babel/plugin-transform-async-to-generator": "^7.12.1",
"@babel/plugin-proposal-async-generator-functions": "^7.12.1",
"@babel/plugin-proposal-class-properties": "^7.12.1",
"@babel/plugin-transform-modules-commonjs": "^7.12.1",
"@babel/plugin-transform-flow-strip-types": "^7.12.1",
"@babel/plugin-proposal-object-rest-spread": "^7.12.1",
"@babel/plugin-transform-runtime": "^7.11.5",
"@babel/polyfill": "^7.11.5",
"@babel/polyfill": "^7.12.1",
"@babel/runtime": "^7.12.1",
"babel-preset-fbjs": "^3.3.0",
"@babel/runtime": "^7.11.2",
"buffer": "^5.6.0",
"chalk": "^4.1.0",
"cross-env": "^7.0.2",
"eslint": "^7.10.0",
"eslint": "^7.11.0",
"eslint-config-fb-strict": "^26.0.0",
"eslint-plugin-babel": "^5.3.1",
"eslint-plugin-flowtype": "^5.2.0",
"eslint-plugin-jasmine": "^4.1.1",
"eslint-plugin-jsx-a11y": "^6.3.1",
"eslint-plugin-prefer-object-spread": "^1.2.1",
"eslint-plugin-react": "^7.21.2",
"eslint-plugin-react": "^7.21.5",
"eslint-plugin-relay": "^1.8.1",
"eslint-plugin-jest": "^24.0.2",
"fbjs": "^2.0.0",
"fbjs-scripts": "^2.0.0",
"flow-bin": "^0.134.0",
"eslint-plugin-jest": "^24.1.0",
"fbjs": "^3.0.0",
"fbjs-scripts": "^3.0.0",
"flow-bin": "^0.136.0",
"glob": "^7.1.6",
"jest": "^26.4.2",
"jest": "^26.6.0",
"lerna": "^3.22.1",
"object-assign": "^4.1.1",
"prettier": "2.1.2",
"rollup": "^2.28.2",
"rollup": "^2.32.1",
"@rollup/plugin-babel": "^5.2.1",
"@rollup/plugin-commonjs": "^15.1.0",
"@rollup/plugin-node-resolve": "^9.0.0",
Expand Down
14 changes: 8 additions & 6 deletions packages/rsocket-flowable/src/Flowable.js
Expand Up @@ -26,6 +26,7 @@ import type {

import FlowableMapOperator from './FlowableMapOperator';
import FlowableTakeOperator from './FlowableTakeOperator';
import FlowableAsyncIterable from './FlowableAsyncIterable';

import invariant from 'fbjs/lib/invariant';
import warning from 'fbjs/lib/warning';
Expand Down Expand Up @@ -117,6 +118,10 @@ export default class Flowable<T> implements IPublisher<T> {
);
}

toAsyncIterable(prefetch: number = 256): AsyncIterable<T> {
return new FlowableAsyncIterable<T>(this, prefetch);
}

_wrapCallback(callback: (T) => void): IPartialSubscriber<T> {
const max = this._max;
return {
Expand Down Expand Up @@ -255,17 +260,14 @@ class FlowableSubscriber<T> implements ISubscriber<T> {

_request = (n: number) => {
invariant(
Number.isInteger(n) && n >= 1 && n <= this._max,
'Flowable: Expected request value to be an integer with a ' +
'value greater than 0 and less than or equal to %s, got ' +
'`%s`.',
this._max,
Number.isInteger(n) && n >= 1,
'Flowable: Expected request value to be an integer with a value greater than 0, got `%s`.',
n,
);
if (!this._active) {
return;
}
if (n === this._max) {
if (n >= this._max) {
this._pending = this._max;
} else {
this._pending += n;
Expand Down
153 changes: 153 additions & 0 deletions packages/rsocket-flowable/src/FlowableAsyncIterable.js
@@ -0,0 +1,153 @@
/** Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @flow
*/

'use strict';

import type {ISubscriber, ISubscription} from 'rsocket-types';
import Flowable from './Flowable';

// $FlowFixMe
export default class FlowableAsyncIterable<T> implements AsyncIterable<T> {
_source: Flowable<T>;
_prefetch: number;

constructor(source: Flowable<T>, prefetch: number = 256) {
this._source = source;
this._prefetch = prefetch;
}

asyncIterator(): AsyncIterator<T> {
const asyncIteratorSubscriber = new AsyncIteratorSubscriber(this._prefetch);
this._source.subscribe(asyncIteratorSubscriber);
return asyncIteratorSubscriber;
}

// $FlowFixMe
[Symbol.asyncIterator](): AsyncIterator<T> {
return this.asyncIterator();
}
}

// $FlowFixMe
class AsyncIteratorSubscriber<T> implements ISubscriber<T>, AsyncIterator<T> {
_values: T[];
_prefetch: number;
_limit: number;

_subscription: ISubscription;

_produced: number;

_done: boolean;
_error: Error;

_resolve: ?(
result: Promise<IteratorResult<T, void>> | IteratorResult<T, void>,
) => void;
_reject: ?(reason?: any) => void;

constructor(prefetch: number = 256) {
this._prefetch = prefetch;
this._values = [];
this._limit =
prefetch === Number.MAX_SAFE_INTEGER
? Number.MAX_SAFE_INTEGER
: prefetch - (prefetch >> 2);
this._produced = 0;
}

onSubscribe(subscription: ISubscription): void {
this._subscription = subscription;
subscription.request(this._prefetch);
}

onNext(value: T): void {
const resolve = this._resolve;
if (resolve) {
this._resolve = undefined;
this._reject = undefined;

if (++this._produced === this._limit) {
this._produced = 0;
this._subscription.request(this._limit);
}

resolve({done: false, value});
return;
}

this._values.push(value);
}

onComplete(): void {
this._done = true;

const resolve = this._resolve;
if (resolve) {
this._resolve = undefined;
this._reject = undefined;

resolve({done: true});
}
}

onError(error: Error): void {
this._done = true;
this._error = error;

const reject = this._reject;
if (reject) {
this._resolve = undefined;
this._reject = undefined;

reject(error);
}
}

next(): Promise<IteratorResult<T, void>> {
const value = this._values.shift();
if (value) {
if (++this._produced === this._limit) {
this._produced = 0;
this._subscription.request(this._limit);
}

return Promise.resolve({done: false, value});
} else if (this._done) {
if (this._error) {
return Promise.reject(this._error);
} else {
return Promise.resolve({done: true});
}
} else {
return new Promise((resolve, reject) => {
this._resolve = resolve;
this._reject = reject;
});
}
}

return(): Promise<IteratorResult<T, void>> {
this._subscription.cancel();
return Promise.resolve({done: true});
}

// $FlowFixMe
[Symbol.asyncIterator](): AsyncIterator<T> {
return this;
}
}