Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: async adapter #208

Draft
wants to merge 6 commits into
base: 1.0.x-alpha
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 20 additions & 0 deletions packages/rsocket-adapter-async/jest.config.ts
@@ -0,0 +1,20 @@
import type { Config } from "@jest/types";
import { pathsToModuleNameMapper } from "ts-jest/utils";
import { compilerOptions } from "../../tsconfig.json";

const config: Config.InitialOptions = {
preset: "ts-jest",
testRegex: "(\\/__tests__\\/.*|\\.(test|spec))\\.(ts)$",
moduleNameMapper: pathsToModuleNameMapper(compilerOptions.paths, {
// This has to match the baseUrl defined in tsconfig.json.
prefix: "<rootDir>/../../",
}),
modulePathIgnorePatterns: [
"<rootDir>/__tests__/test-utils",
"<rootDir>/__tests__/*.d.ts",
],
collectCoverage: true,
collectCoverageFrom: ["<rootDir>/src/**/*.ts", "!**/node_modules/**"],
};

export default config;
28 changes: 28 additions & 0 deletions packages/rsocket-adapter-async/package.json
@@ -0,0 +1,28 @@
{
"name": "rsocket-adapter-async",
"version": "1.0.0",
"main": "dist/index",
"types": "dist/index",
"files": [
"dist"
],
"scripts": {
"build": "yarn run clean && yarn run compile",
"clean": "rimraf -rf ./dist",
"compile": "tsc -p tsconfig.build.json",
"prepublishOnly": "yarn run build",
"test": "jest"
},
"dependencies": {
"rsocket-composite-metadata": "^1.0.0-alpha.3",
"rsocket-core": "^1.0.0-alpha.3",
"rsocket-messaging": "^1.0.0-alpha.3",
"rsocket-adapter-rxjs": "^1.0.0-alpha.3",
"rxjs": "^7.4.0",
"rxjs-for-await": "^1.0.0"
},
"devDependencies": {
"rimraf": "~3.0.2",
"typescript": "~4.5.2"
}
}
@@ -0,0 +1,95 @@
import SubscribingAsyncIterator from "../lib/SubscribingAsyncIterator";
import { mock } from "jest-mock-extended";
import {
OnExtensionSubscriber,
OnNextSubscriber,
OnTerminalSubscriber,
Requestable,
} from "rsocket-core";
import { Codec } from "rsocket-messaging";
import BufferingForwardingSubscriber from "../lib/BufferingForwardingSubscriber";
import { Buffer } from "buffer";

jest.useFakeTimers();

describe("BufferingForwardingSubscriber", function () {
it("forwards all received onNext calls when received before subscription", async function () {
const mockSubscriber = mock<
OnNextSubscriber & OnTerminalSubscriber & OnExtensionSubscriber
>();
const testObj = new BufferingForwardingSubscriber();

testObj.onNext({ data: Buffer.from("1") }, false);
testObj.onNext({ data: Buffer.from("2") }, false);
testObj.onNext({ data: Buffer.from("3") }, true);

testObj.subscribe(mockSubscriber);

expect(mockSubscriber.onNext).toBeCalledWith(
{ data: Buffer.from("1") },
false
);
expect(mockSubscriber.onNext).toBeCalledWith(
{ data: Buffer.from("2") },
false
);
expect(mockSubscriber.onNext).toBeCalledWith(
{ data: Buffer.from("3") },
true
);
});

it("forwards all received onNext calls when received after subscription", async function () {
const mockSubscriber = mock<
OnNextSubscriber & OnTerminalSubscriber & OnExtensionSubscriber
>();
const testObj = new BufferingForwardingSubscriber();

testObj.subscribe(mockSubscriber);

testObj.onNext({ data: Buffer.from("1") }, false);
testObj.onNext({ data: Buffer.from("2") }, false);
testObj.onNext({ data: Buffer.from("3") }, true);

expect(mockSubscriber.onNext).toBeCalledWith(
{ data: Buffer.from("1") },
false
);
expect(mockSubscriber.onNext).toBeCalledWith(
{ data: Buffer.from("2") },
false
);
expect(mockSubscriber.onNext).toBeCalledWith(
{ data: Buffer.from("3") },
true
);
});

it("forwards all received onNext calls before forwarding subsequent onComplete", async function () {
const mockSubscriber = mock<
OnNextSubscriber & OnTerminalSubscriber & OnExtensionSubscriber
>();
const testObj = new BufferingForwardingSubscriber();

testObj.subscribe(mockSubscriber);

testObj.onNext({ data: Buffer.from("1") }, false);
testObj.onNext({ data: Buffer.from("2") }, false);
testObj.onNext({ data: Buffer.from("3") }, false);
testObj.onComplete();

expect(mockSubscriber.onNext).toBeCalledWith(
{ data: Buffer.from("1") },
false
);
expect(mockSubscriber.onNext).toBeCalledWith(
{ data: Buffer.from("2") },
false
);
expect(mockSubscriber.onNext).toBeCalledWith(
{ data: Buffer.from("3") },
false
);
expect(mockSubscriber.onComplete).toBeCalledWith();
});
});
@@ -0,0 +1,233 @@
import SubscribingAsyncIterator from "../lib/SubscribingAsyncIterator";
import { mock } from "jest-mock-extended";
import { Cancellable, Requestable } from "rsocket-core";
import { Codec } from "rsocket-messaging";

jest.useFakeTimers();

class StringCodec implements Codec<string> {
readonly mimeType: string = "text/plain";

decode(buffer: Buffer): string {
return buffer.toString();
}

encode(entity: string): Buffer {
return Buffer.from(entity);
}
}

describe("SubscribingAsyncIterator", function () {
it("iterates over emitted values", async function () {
let subscriber;
const subscription = mock<Requestable & Cancellable>({
request(requestN: number) {
for (let i = 0; i < requestN; i++) {
setTimeout(() => {
subscriber.onNext(
{
data: Buffer.from(`${i}`),
metadata: undefined,
},
i === requestN - 1
);
});
}
},
});
const requestSpy = jest.spyOn(subscription, "request");

const initialRequestN = 3;
subscriber = new SubscribingAsyncIterator(
subscription,
initialRequestN * 2,
new StringCodec()
);
subscription.request(initialRequestN);

jest.runAllTimers();

const values = [];
for await (const value of subscriber) {
jest.runAllTimers();
values.push(value);
}

expect(values).toStrictEqual(["0", "1", "2"]);
expect(requestSpy).toBeCalledTimes(1);
});

it("iterates over emitted values until onComplete", async function () {
let subscriber;
const subscription = mock<Requestable & Cancellable>({
request(requestN: number) {
for (let i = 0; i < requestN; i++) {
setTimeout(() => {
if (i === requestN - 1) {
subscriber.onComplete();
} else {
subscriber.onNext(
{
data: Buffer.from(`${i}`),
metadata: undefined,
},
false
);
}
});
}
},
});
const requestSpy = jest.spyOn(subscription, "request");

const initialRequestN = 3;
subscriber = new SubscribingAsyncIterator(
subscription,
initialRequestN * 2,
new StringCodec()
);
subscription.request(initialRequestN);

jest.runAllTimers();

const values = [];
for await (const value of subscriber) {
jest.runAllTimers();
values.push(value);
}

expect(values).toStrictEqual(["0", "1"]);
expect(requestSpy).toBeCalledTimes(1);
});

it("cancels when break statement reached", async function () {
let subscriber;
const subscription = mock<Requestable & Cancellable>({
request(requestN: number) {
for (let i = 0; i < requestN; i++) {
setTimeout(() => {
subscriber.onNext(
{
data: Buffer.from(`${i}`),
metadata: undefined,
},
i === requestN - 1
);
});
}
},
});
const requestSpy = jest.spyOn(subscription, "request");
const cancelSpy = jest.spyOn(subscription, "cancel");

const initialRequestN = 10;
subscriber = new SubscribingAsyncIterator(
subscription,
initialRequestN * 2,
new StringCodec()
);
subscription.request(initialRequestN);

jest.runAllTimers();

const values = [];
for await (const value of subscriber) {
if (values.length == 2) {
break;
}
jest.runAllTimers();
values.push(value);
}

expect(values).toStrictEqual(["0", "1"]);
expect(requestSpy).toBeCalledTimes(1);
expect(requestSpy).toBeCalledWith(10);
expect(cancelSpy).toBeCalledTimes(1);
});

it("ends and throws with emitted exception", async function () {
let subscriber;
const expectedError = new Error("test error");
const subscription = mock<Requestable & Cancellable>({
request(requestN: number) {
setTimeout(() => {
subscriber.onError(expectedError);
});
},
});
const requestSpy = jest.spyOn(subscription, "request");

const initialRequestN = 10;
subscriber = new SubscribingAsyncIterator(
subscription,
initialRequestN * 2,
new StringCodec()
);
subscription.request(initialRequestN);

jest.runAllTimers();

const values = [];

let capturedError;
try {
for await (const value of subscriber) {
jest.runAllTimers();
values.push(value);
}
} catch (error) {
capturedError = error;
}

expect(capturedError).toBe(expectedError);
expect(values).toStrictEqual([]);
expect(requestSpy).toBeCalledWith(10);
});

it("cancels on exception processing emitted value", async function () {
let subscriber;
const subscription = mock<Requestable & Cancellable>({
request(requestN: number) {
for (let i = 0; i < requestN; i++) {
setTimeout(() => {
subscriber.onNext(
{
data: Buffer.from(`${i}`),
metadata: undefined,
},
i === requestN - 1
);
});
}
},
});
const requestSpy = jest.spyOn(subscription, "request");
const cancelSpy = jest.spyOn(subscription, "cancel");

const initialRequestN = 10;
subscriber = new SubscribingAsyncIterator(
subscription,
initialRequestN * 2,
new StringCodec()
);
subscription.request(initialRequestN);

jest.runAllTimers();

const values = [];
try {
for await (const value of subscriber) {
if (values.length == 2) {
throw new Error("test error");
}
values.push(value);
jest.runAllTimers();
}
} catch (e) {}

expect(values).toStrictEqual(["0", "1"]);
expect(requestSpy).toBeCalledTimes(1);
expect(requestSpy).toBeCalledWith(10);
expect(cancelSpy).toBeCalledTimes(1);
});
});