Skip to content

Commit

Permalink
feat(r): A new method of piping has been added
Browse files Browse the repository at this point in the history
+ Adds `r` a new method of piping observables. This allows any valid observable input to be passed as the first argument, and
 it will implicitly convert the argument to an Observable and pass it to the function that may be present in the second argument, the
 return value of THAT function is then passed an argument to the next function, and so on
+ Corrects the types of `Observable#pipe` such that anything can be returned at any step of the functional chain, however the first pipeable function must accept an `Observable<T>`.
+ Adds dtslint and runtime tests for `r` and `pipe`.

NOTE: This does NOT deprecate`Observable#pipe`. That will be done in a follow up, and we need to define what timeline we'll take to remove that, as it's an API that is broadly used - could be v9, could be v10, could be never. At this point, this is alpha/beta functionality.

BREAKING CHANGE: `Observable#pipe` now allows any pipeable unary function as an argument, just so long as the types properly compose, this means in some cases it will now return `unknown` instead of `Observable<unknown>` the workaround is just to cast the result for those cases. (or you can break your operator piping into smaller pipe sets)

Related ReactiveX#7203
  • Loading branch information
benlesh committed Mar 23, 2023
1 parent 667e185 commit 4da2e83
Show file tree
Hide file tree
Showing 7 changed files with 269 additions and 69 deletions.
11 changes: 8 additions & 3 deletions spec-dtslint/Observable-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Observable, of, OperatorFunction } from 'rxjs';
import { mapTo } from 'rxjs/operators';
import { Observable, of, OperatorFunction, map, filter } from 'rxjs';

function a<I extends string, O extends string>(input: I, output: O): OperatorFunction<I, O>;
function a<I, O extends string>(output: O): OperatorFunction<I, O>;
Expand Down Expand Up @@ -30,7 +29,7 @@ function a<I, O extends string>(output: O): OperatorFunction<I, O>;
* @param {string} output The `OperatorFunction` output type parameter
*/
function a<I, O extends string>(inputOrOutput: I | O, output?: O): OperatorFunction<I, O> {
return mapTo<I, O>(output === undefined ? inputOrOutput as O : output);
return map(() => output === undefined ? inputOrOutput as O : output);
}

describe('pipe', () => {
Expand Down Expand Up @@ -126,4 +125,10 @@ describe('pipe', () => {
const customOperator = () => <T>(a: Observable<T>) => a;
const o = of('foo').pipe(customOperator()); // $ExpectType Observable<string>
});

it('should infer properly for any reasonable pipe chain', () => {
const o1 = of('foo').pipe(source => source.toString(), s => s.length, n => n + 1); // $ExpectType number
const o2 = of(123).pipe(map(n => n + '?'), source => source.subscribe()); // $ExpectType Subscription
const o3 = of('test').pipe(map(n => n + ':' + n), filter(n => n < 30)); // $ExpectError
})
});
41 changes: 41 additions & 0 deletions spec-dtslint/util/r-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { r, map, of, toArray, filter } from 'rxjs';


it('should infer conversions from ObservableInputs', () => {
const o1 = r([1, 2, 3]); // $ExpectType Observable<number>
const o2 = r(new Set<number>()); // $ExpectType Observable<number>
const o3 = r(new Map<string, number>()); // $ExpectType Observable<[string, number]>
const o4 = r(of(1, 2, 3)); // $ExpectType Observable<number>
const o5 = r(Promise.resolve(1)); // $ExpectType Observable<number>
const o6 = r(Promise.resolve([1, 2, 3])); // $ExpectType Observable<number[]>

function* test() {
yield 1;
yield 2;
yield 3;
}

const o7 = r(test()); // $ExpectType Observable<1 | 2 | 3>

async function* test2() {
yield 1;
yield 2;
yield 3;
}

const o8 = r(test2()); // $ExpectType Observable<1 | 2 | 3>
const o9 = r({}); // $ExpectError
});

it('should compose with pipeable functions, passing an Observable to the first of those functions', () => {
const o1 = r([1, 2, 3], map(n => n + 1)); // $ExpectType Observable<number>
const o2 = r([1, 2, 3], map(n => n + 1), filter(n => n < 3)); // $ExpectType Observable<number>
const o3 = r([1, 2, 3], map(n => n + 1), filter(n => n < 3), toArray()); // $ExpectType Observable<number[]>
const o4 = r([1, 2, 3], map(n => n + 1), filter(n => n < 3), toArray(), map(n => n.length)); // $ExpectType Observable<number>

// Even with unary functions that are not RxJS operators
const o5 = r([1, 2, 3], map(n => n + 1), toArray(), source => Object.keys(source), keys => keys.length); // $ExpectType number

// Maybe as a means of subscription
const o6 = r([1, 2, 3], map(n => n + 1), toArray(), source => source.subscribe()); // $ExpectType Subscription
})
36 changes: 35 additions & 1 deletion spec/util/pipe-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { expect } from 'chai';
import { pipe } from 'rxjs';
import { map, Observable, pipe, r } from 'rxjs';

describe('pipe', () => {
it('should exist', () => {
Expand Down Expand Up @@ -31,3 +31,37 @@ describe('pipe', () => {
expect(c(someObj)).to.equal(someObj);
});
});

describe('r', () => {
it('should work like pipe, convert the first argument to an observable', () => {
const a = [1, 2, 3];
const results: any[] = [];

r(a, map(x => x + 1)).subscribe({
next: value => results.push(value),
complete: () => {
results.push('done');
}
})
expect(results).to.deep.equal([2, 3, 4, 'done'])
});

it('should simply convert the first argument to an observable if it is the only thing provided', () => {
const a = [1, 2, 3];
const results: any[] = [];

r(a).subscribe({
next: value => results.push(value),
complete: () => {
results.push('done');
}
})
expect(results).to.deep.equal([1, 2, 3, 'done'])
});

it('should allow any kind of custom piping', () => {
const a = [1, 2, 3];
const result = r(a, map(x => x + 1), source => source instanceof Observable)
expect(result).to.be.true;
});
});
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export { Subscription } from './internal/Subscription';
export { Subscriber } from './internal/Subscriber';

/* Utils */
export { r } from './internal/util/r';
export { pipe } from './internal/util/pipe';
export { noop } from './internal/util/noop';
export { identity } from './internal/util/identity';
Expand Down
128 changes: 67 additions & 61 deletions src/internal/Observable.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { Subscriber } from './Subscriber';
import { Subscription } from './Subscription';
import { TeardownLogic, OperatorFunction, Subscribable, Observer } from './types';
import { TeardownLogic, UnaryFunction, Subscribable, Observer, OperatorFunction } from './types';
import { observable as Symbol_observable } from './symbol/observable';
import { pipeFromArray } from './util/pipe';

/**
* A representation of any set of values over any amount of time. This is the most basic building block
* of RxJS.
Expand Down Expand Up @@ -256,72 +255,79 @@ export class Observable<T> implements Subscribable<T> {

/* tslint:disable:max-line-length */
pipe(): Observable<T>;
pipe<A>(op1: OperatorFunction<T, A>): Observable<A>;
pipe<A, B>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>): Observable<B>;
pipe<A, B, C>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>): Observable<C>;
pipe<A, B, C, D>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>
): Observable<D>;
pipe<A>(op1: UnaryFunction<Observable<T>, A>): A;
pipe<A, B>(op1: UnaryFunction<Observable<T>, A>, op2: UnaryFunction<A, B>): B;
pipe<A, B, C>(op1: UnaryFunction<Observable<T>, A>, op2: UnaryFunction<A, B>, op3: UnaryFunction<B, C>): C;
pipe<A, B, C, D>(op1: UnaryFunction<Observable<T>, A>, op2: UnaryFunction<A, B>, op3: UnaryFunction<B, C>, op4: UnaryFunction<C, D>): D;
pipe<A, B, C, D, E>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>
): Observable<E>;
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>
): E;
pipe<A, B, C, D, E, F>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>,
op6: OperatorFunction<E, F>
): Observable<F>;
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>
): F;
pipe<A, B, C, D, E, F, G>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>,
op6: OperatorFunction<E, F>,
op7: OperatorFunction<F, G>
): Observable<G>;
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>,
op7: UnaryFunction<F, G>
): G;
pipe<A, B, C, D, E, F, G, H>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>,
op6: OperatorFunction<E, F>,
op7: OperatorFunction<F, G>,
op8: OperatorFunction<G, H>
): Observable<H>;
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>,
op7: UnaryFunction<F, G>,
op8: UnaryFunction<G, H>
): H;
pipe<A, B, C, D, E, F, G, H, I>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>,
op6: OperatorFunction<E, F>,
op7: OperatorFunction<F, G>,
op8: OperatorFunction<G, H>,
op9: OperatorFunction<H, I>
): Observable<I>;
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>,
op7: UnaryFunction<F, G>,
op8: UnaryFunction<G, H>,
op9: UnaryFunction<H, I>
): I;
pipe<A, B, C, D, E, F, G, H, I>(
op1: OperatorFunction<T, A>,
op2: OperatorFunction<A, B>,
op3: OperatorFunction<B, C>,
op4: OperatorFunction<C, D>,
op5: OperatorFunction<D, E>,
op6: OperatorFunction<E, F>,
op7: OperatorFunction<F, G>,
op8: OperatorFunction<G, H>,
op9: OperatorFunction<H, I>,
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>,
op7: UnaryFunction<F, G>,
op8: UnaryFunction<G, H>,
op9: UnaryFunction<H, Observable<I>>,
...operations: OperatorFunction<any, any>[]
): Observable<unknown>;
pipe<A, B, C, D, E, F, G, H, I>(
op1: UnaryFunction<Observable<T>, A>,
op2: UnaryFunction<A, B>,
op3: UnaryFunction<B, C>,
op4: UnaryFunction<C, D>,
op5: UnaryFunction<D, E>,
op6: UnaryFunction<E, F>,
op7: UnaryFunction<F, G>,
op8: UnaryFunction<G, H>,
op9: UnaryFunction<H, I>,
...operations: UnaryFunction<any, any>[]
): unknown;
/* tslint:enable:max-line-length */

/**
Expand All @@ -344,7 +350,7 @@ export class Observable<T> implements Subscribable<T> {
* .subscribe(x => console.log(x));
* ```
*/
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
pipe(...operations: UnaryFunction<any, any>[]): unknown {
return pipeFromArray(operations)(this);
}
}
8 changes: 4 additions & 4 deletions src/internal/util/pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ export function pipe<T, A, B, C, D, E, F, G, H, I>(
* pipe() can be called on one or more functions, each of which can take one argument ("UnaryFunction")
* and uses it to return a value.
* It returns a function that takes one argument, passes it to the first UnaryFunction, and then
* passes the result to the next one, passes that result to the next one, and so on.
* passes the result to the next one, passes that result to the next one, and so on.
*/
export function pipe(...fns: Array<UnaryFunction<any, any>>): UnaryFunction<any, any> {
return pipeFromArray(fns);
}

/** @internal */
export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
export function pipeFromArray(fns: UnaryFunction<unknown, unknown>[]): UnaryFunction<unknown, unknown> {
if (fns.length === 0) {
return identity as UnaryFunction<any, any>;
}
Expand All @@ -89,7 +89,7 @@ export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunct
return fns[0];
}

return function piped(input: T): R {
return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
return function piped(input) {
return fns.reduce((prev, fn) => fn(prev), input);
};
}

0 comments on commit 4da2e83

Please sign in to comment.