Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set connection status to ERROR when closed due to protocol error (#126) #127

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions packages/rsocket-core/src/RSocketMachine.js
Expand Up @@ -598,8 +598,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
};

_handleConnectionError(error: Error): void {
this._handleError(error);
this._connection.close();
this._connection.close(error);
const errorHandler = this._errorHandler;
if (errorHandler) {
errorHandler(error);
Expand Down
21 changes: 13 additions & 8 deletions packages/rsocket-core/src/RSocketResumableTransport.js
Expand Up @@ -156,8 +156,8 @@ export default class RSocketResumableTransport implements DuplexConnection {
this._statusSubscribers = new Set();
}

close(): void {
this._close();
close(error?: Error): void {
ada-waffles marked this conversation as resolved.
Show resolved Hide resolved
this._close(error);
}

connect(): void {
Expand Down Expand Up @@ -275,13 +275,18 @@ export default class RSocketResumableTransport implements DuplexConnection {
if (this._isTerminated()) {
return;
}
if (error) {
this._setConnectionStatus({error, kind: 'ERROR'});
} else {
this._setConnectionStatus(CONNECTION_STATUS.CLOSED);
}

const status = error ? {error, kind: 'ERROR'} : CONNECTION_STATUS.CLOSED;
this._setConnectionStatus(status);
ada-waffles marked this conversation as resolved.
Show resolved Hide resolved

const receivers = this._receivers;
receivers.forEach(r => r.onComplete());
receivers.forEach(subscriber => {
if (error) {
subscriber.onError(error);
ada-waffles marked this conversation as resolved.
Show resolved Hide resolved
} else {
subscriber.onComplete();
}
});
receivers.clear();

const senders = this._senders;
Expand Down
4 changes: 2 additions & 2 deletions packages/rsocket-core/src/ReassemblyDuplexConnection.js
Expand Up @@ -37,8 +37,8 @@ export class ReassemblyDuplexConnection implements DuplexConnection {
.lift(actual => new ReassemblySubscriber(actual));
}

close(): void {
this._source.close();
close(error?: Error): void {
this._source.close(error);
}

connect(): void {
Expand Down
8 changes: 6 additions & 2 deletions packages/rsocket-core/src/__mocks__/MockDuplexConnection.js
Expand Up @@ -28,8 +28,12 @@ export function genMockConnection() {
let closed = false;

const connection = {
close: jest.fn(() => {
connection.mock.close();
close: jest.fn(error => {
if (error) {
connection.mock.closeWithError(error);
} else {
connection.mock.close();
}
}),
connect: jest.fn(),
connectionStatus: jest.fn(() => status),
Expand Down
2 changes: 1 addition & 1 deletion packages/rsocket-core/src/__tests__/RSocketClient-test.js
Expand Up @@ -310,7 +310,7 @@ describe('RSocketClient', () => {
expect(errors.values().next().value).toEqual(
`No keep-alive acks for ${keepAliveTimeout} millis`,
);
expect(status.kind).toEqual('CLOSED');
expect(status.kind).toEqual('ERROR');

jest.advanceTimersByTime(keepAliveTimeout);
});
Expand Down
Expand Up @@ -687,4 +687,59 @@ describe('RSocketResumableTransport', () => {
expect(currentTransport.sendOne.mock.calls.length).toBe(0);
});
});

describe('post-connect() APIs', () => {
beforeEach(() => {
resumableTransport.connect();
currentTransport.mock.connect();
});

describe('close()', () => {
describe('given an error', () => {
it('closes the transport', () => {
resumableTransport.close(new Error());
expect(currentTransport.close.mock.calls.length).toBe(1);
});

it('sets the status to ERROR with the given error', () => {
const error = new Error();
resumableTransport.close(error);
expect(resumableStatus.kind).toBe('ERROR');
expect(resumableStatus.error).toBe(error);
});

it('calls receive.onError with the given error', () => {
const onError = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
resumableTransport.receive().subscribe({onError, onSubscribe});
const error = new Error();
resumableTransport.close(error);
expect(onError.mock.calls.length).toBe(1);
expect(onError.mock.calls[0][0]).toBe(error);
});
});

describe('not given an error', () => {
it('closes the transport', () => {
resumableTransport.close();
expect(currentTransport.close.mock.calls.length).toBe(1);
});

it('sets the status to CLOSED', () => {
resumableTransport.close();
expect(resumableStatus.kind).toBe('CLOSED');
});

it('calls receive.onComplete', () => {
const onComplete = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
resumableTransport.receive().subscribe({onComplete, onSubscribe});
resumableTransport.close();
expect(onComplete.mock.calls.length).toBe(1);
});
});
});
});
});
4 changes: 2 additions & 2 deletions packages/rsocket-tcp-client/src/RSocketTcpClient.js
Expand Up @@ -60,8 +60,8 @@ export class RSocketTcpConnection implements DuplexConnection {
}
}

close(): void {
this._close();
close(error?: Error): void {
this._close(error);
}

connect(): void {
Expand Down
71 changes: 52 additions & 19 deletions packages/rsocket-tcp-client/src/__tests__/RSocketTcpClient-test.js
Expand Up @@ -98,29 +98,62 @@ describe('RSocketTcpClient', () => {
});

describe('close()', () => {
it('closes the socket', () => {
client.close();
expect(socket.end.mock.calls.length).toBe(1);
});
describe('given an error', () => {
it('closes the socket', () => {
client.close(new Error());
expect(socket.end.mock.calls.length).toBe(1);
});

it('sets the status to CLOSED', () => {
let status;
client.connectionStatus().subscribe({
onNext: _status => (status = _status),
onSubscribe: subscription =>
subscription.request(Number.MAX_SAFE_INTEGER),
it('sets the status to ERROR with the given error', () => {
let status;
client.connectionStatus().subscribe({
onNext: _status => (status = _status),
onSubscribe: subscription =>
subscription.request(Number.MAX_SAFE_INTEGER),
});
const error = new Error();
client.close(error);
expect(status.kind).toBe('ERROR');
expect(status.error).toBe(error);
});

it('calls receive.onError with the given error', () => {
const onError = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
client.receive().subscribe({onError, onSubscribe});
const error = new Error();
client.close(error);
expect(onError.mock.calls.length).toBe(1);
expect(onError.mock.calls[0][0]).toBe(error);
});
client.close();
expect(status.kind).toBe('CLOSED');
});

it('calls receive.onComplete', () => {
const onComplete = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
client.receive().subscribe({onComplete, onSubscribe});
client.close();
expect(onComplete.mock.calls.length).toBe(1);
describe('not given an error', () => {
it('closes the socket', () => {
client.close();
expect(socket.end.mock.calls.length).toBe(1);
});

it('sets the status to CLOSED', () => {
let status;
client.connectionStatus().subscribe({
onNext: _status => (status = _status),
onSubscribe: subscription =>
subscription.request(Number.MAX_SAFE_INTEGER),
});
client.close();
expect(status.kind).toBe('CLOSED');
});

it('calls receive.onComplete', () => {
const onComplete = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
client.receive().subscribe({onComplete, onSubscribe});
client.close();
expect(onComplete.mock.calls.length).toBe(1);
});
});
});

Expand Down
7 changes: 4 additions & 3 deletions packages/rsocket-types/src/ReactiveSocketTypes.js
Expand Up @@ -118,10 +118,11 @@ export interface DuplexConnection {
receive(): Flowable<Frame>,

/**
* Close the underlying connection, emitting `onComplete` on the receive()
* Publisher.
* Close the underlying connection, optionally providing an error as reason.
* If an error is passed, emits `onError` on the receive() Publisher.
* If no error is passed, emits `onComplete` on the receive() Publisher.
*/
close(): void,
close(error?: Error): void,

/**
* Open the underlying connection. Throws if the connection is already in
Expand Down
Expand Up @@ -62,8 +62,8 @@ export default class RSocketWebSocketClient implements DuplexConnection {
this._statusSubscribers = new Set();
}

close(): void {
this._close();
close(error?: Error): void {
this._close(error);
}

connect(): void {
Expand Down
Expand Up @@ -93,29 +93,62 @@ describe('RSocketWebSocketClient', () => {
});

describe('close()', () => {
it('closes the socket', () => {
client.close();
expect(socket.close.mock.calls.length).toBe(1);
});
describe('given an error', () => {
it('closes the socket', () => {
client.close(new Error());
expect(socket.close.mock.calls.length).toBe(1);
});

it('sets the status to CLOSED', () => {
let status;
client.connectionStatus().subscribe({
onNext: _status => (status = _status),
onSubscribe: subscription =>
subscription.request(Number.MAX_SAFE_INTEGER),
it('sets the status to ERROR with the given error', () => {
let status;
client.connectionStatus().subscribe({
onNext: _status => (status = _status),
onSubscribe: subscription =>
subscription.request(Number.MAX_SAFE_INTEGER),
});
const error = new Error();
client.close(error);
expect(status.kind).toBe('ERROR');
expect(status.error).toBe(error);
});

it('calls receive.onError with the given error', () => {
const onError = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
client.receive().subscribe({onError, onSubscribe});
const error = new Error();
client.close(error);
expect(onError.mock.calls.length).toBe(1);
expect(onError.mock.calls[0][0]).toBe(error);
});
client.close();
expect(status.kind).toBe('CLOSED');
});

it('calls receive.onComplete', () => {
const onComplete = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
client.receive().subscribe({onComplete, onSubscribe});
client.close();
expect(onComplete.mock.calls.length).toBe(1);
describe('not given an error', () => {
it('closes the socket', () => {
client.close();
expect(socket.close.mock.calls.length).toBe(1);
});

it('sets the status to CLOSED', () => {
let status;
client.connectionStatus().subscribe({
onNext: _status => (status = _status),
onSubscribe: subscription =>
subscription.request(Number.MAX_SAFE_INTEGER),
});
client.close();
expect(status.kind).toBe('CLOSED');
});

it('calls receive.onComplete', () => {
const onComplete = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
client.receive().subscribe({onComplete, onSubscribe});
client.close();
expect(onComplete.mock.calls.length).toBe(1);
});
});
});

Expand Down
Expand Up @@ -237,8 +237,13 @@ class WSDuplexConnection implements DuplexConnection {
});
}

close(): void {
this._socket.emit('close');
close(error?: Error): void {
if (error) {
this._socket.emit('error', error);
} else {
this._socket.emit('close');
}

this._socket.close();
}

Expand Down
Expand Up @@ -74,10 +74,18 @@ describe('RSocketWebSocketServer', () => {
expect(status.error).toBe(error);
});

it('returns CLOSED if explicitly closed', () => {
it('returns CLOSED if explicitly closed with no error', () => {
connection.receive().subscribe(() => {});
connection.close();
expect(status.kind).toBe('CLOSED');
});

it('returns ERROR if explicitly closed with an error', () => {
connection.receive().subscribe(() => {});
const error = new Error();
connection.close(error);
expect(status.kind).toBe('ERROR');
expect(status.error).toBe(error);
});
});
});