Skip to content

Commit

Permalink
Set connection status to ERROR when closed due to protocol error
Browse files Browse the repository at this point in the history
Amends DuplexConnection#close to accept an optional error indicating that the connection is being closed due to that error.

Updates implementations to handle this error and report it to consumers.

Updates RSocketMachine to pass protocol-level connection errors to close

Adds/updates tests to check for handling this parameter
  • Loading branch information
ada-waffles committed Apr 26, 2021
1 parent a85a4db commit 6254847
Show file tree
Hide file tree
Showing 13 changed files with 206 additions and 63 deletions.
3 changes: 1 addition & 2 deletions packages/rsocket-core/src/RSocketMachine.js
Expand Up @@ -598,8 +598,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
};

_handleConnectionError(error: Error): void {
this._handleError(error);
this._connection.close();
this._connection.close(error);
const errorHandler = this._errorHandler;
if (errorHandler) {
errorHandler(error);
Expand Down
21 changes: 13 additions & 8 deletions packages/rsocket-core/src/RSocketResumableTransport.js
Expand Up @@ -156,8 +156,8 @@ export default class RSocketResumableTransport implements DuplexConnection {
this._statusSubscribers = new Set();
}

close(): void {
this._close();
close(error?: Error): void {
this._close(error);
}

connect(): void {
Expand Down Expand Up @@ -275,13 +275,18 @@ export default class RSocketResumableTransport implements DuplexConnection {
if (this._isTerminated()) {
return;
}
if (error) {
this._setConnectionStatus({error, kind: 'ERROR'});
} else {
this._setConnectionStatus(CONNECTION_STATUS.CLOSED);
}

const status = error ? {error, kind: 'ERROR'} : CONNECTION_STATUS.CLOSED;
this._setConnectionStatus(status);

const receivers = this._receivers;
receivers.forEach(r => r.onComplete());
receivers.forEach(subscriber => {
if (error) {
subscriber.onError(error);
} else {
subscriber.onComplete();
}
});
receivers.clear();

const senders = this._senders;
Expand Down
4 changes: 2 additions & 2 deletions packages/rsocket-core/src/ReassemblyDuplexConnection.js
Expand Up @@ -37,8 +37,8 @@ export class ReassemblyDuplexConnection implements DuplexConnection {
.lift(actual => new ReassemblySubscriber(actual));
}

close(): void {
this._source.close();
close(error?: Error): void {
this._source.close(error);
}

connect(): void {
Expand Down
8 changes: 6 additions & 2 deletions packages/rsocket-core/src/__mocks__/MockDuplexConnection.js
Expand Up @@ -28,8 +28,12 @@ export function genMockConnection() {
let closed = false;

const connection = {
close: jest.fn(() => {
connection.mock.close();
close: jest.fn(error => {
if (error) {
connection.mock.closeWithError(error);
} else {
connection.mock.close();
}
}),
connect: jest.fn(),
connectionStatus: jest.fn(() => status),
Expand Down
2 changes: 1 addition & 1 deletion packages/rsocket-core/src/__tests__/RSocketClient-test.js
Expand Up @@ -310,7 +310,7 @@ describe('RSocketClient', () => {
expect(errors.values().next().value).toEqual(
`No keep-alive acks for ${keepAliveTimeout} millis`,
);
expect(status.kind).toEqual('CLOSED');
expect(status.kind).toEqual('ERROR');

jest.advanceTimersByTime(keepAliveTimeout);
});
Expand Down
Expand Up @@ -687,4 +687,59 @@ describe('RSocketResumableTransport', () => {
expect(currentTransport.sendOne.mock.calls.length).toBe(0);
});
});

describe('post-connect() APIs', () => {
beforeEach(() => {
resumableTransport.connect();
currentTransport.mock.connect();
});

describe('close()', () => {
describe('given an error', () => {
it('closes the transport', () => {
resumableTransport.close(new Error());
expect(currentTransport.close.mock.calls.length).toBe(1);
});

it('sets the status to ERROR with the given error', () => {
const error = new Error();
resumableTransport.close(error);
expect(resumableStatus.kind).toBe('ERROR');
expect(resumableStatus.error).toBe(error);
});

it('calls receive.onError with the given error', () => {
const onError = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
resumableTransport.receive().subscribe({onError, onSubscribe});
const error = new Error();
resumableTransport.close(error);
expect(onError.mock.calls.length).toBe(1);
expect(onError.mock.calls[0][0]).toBe(error);
});
});

describe('not given an error', () => {
it('closes the transport', () => {
resumableTransport.close();
expect(currentTransport.close.mock.calls.length).toBe(1);
});

it('sets the status to CLOSED', () => {
resumableTransport.close();
expect(resumableStatus.kind).toBe('CLOSED');
});

it('calls receive.onComplete', () => {
const onComplete = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
resumableTransport.receive().subscribe({onComplete, onSubscribe});
resumableTransport.close();
expect(onComplete.mock.calls.length).toBe(1);
});
});
});
});
});
4 changes: 2 additions & 2 deletions packages/rsocket-tcp-client/src/RSocketTcpClient.js
Expand Up @@ -60,8 +60,8 @@ export class RSocketTcpConnection implements DuplexConnection {
}
}

close(): void {
this._close();
close(error?: Error): void {
this._close(error);
}

connect(): void {
Expand Down
71 changes: 52 additions & 19 deletions packages/rsocket-tcp-client/src/__tests__/RSocketTcpClient-test.js
Expand Up @@ -98,29 +98,62 @@ describe('RSocketTcpClient', () => {
});

describe('close()', () => {
it('closes the socket', () => {
client.close();
expect(socket.end.mock.calls.length).toBe(1);
});
describe('given an error', () => {
it('closes the socket', () => {
client.close(new Error());
expect(socket.end.mock.calls.length).toBe(1);
});

it('sets the status to CLOSED', () => {
let status;
client.connectionStatus().subscribe({
onNext: _status => (status = _status),
onSubscribe: subscription =>
subscription.request(Number.MAX_SAFE_INTEGER),
it('sets the status to ERROR with the given error', () => {
let status;
client.connectionStatus().subscribe({
onNext: _status => (status = _status),
onSubscribe: subscription =>
subscription.request(Number.MAX_SAFE_INTEGER),
});
const error = new Error();
client.close(error);
expect(status.kind).toBe('ERROR');
expect(status.error).toBe(error);
});

it('calls receive.onError with the given error', () => {
const onError = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
client.receive().subscribe({onError, onSubscribe});
const error = new Error();
client.close(error);
expect(onError.mock.calls.length).toBe(1);
expect(onError.mock.calls[0][0]).toBe(error);
});
client.close();
expect(status.kind).toBe('CLOSED');
});

it('calls receive.onComplete', () => {
const onComplete = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
client.receive().subscribe({onComplete, onSubscribe});
client.close();
expect(onComplete.mock.calls.length).toBe(1);
describe('not given an error', () => {
it('closes the socket', () => {
client.close();
expect(socket.end.mock.calls.length).toBe(1);
});

it('sets the status to CLOSED', () => {
let status;
client.connectionStatus().subscribe({
onNext: _status => (status = _status),
onSubscribe: subscription =>
subscription.request(Number.MAX_SAFE_INTEGER),
});
client.close();
expect(status.kind).toBe('CLOSED');
});

it('calls receive.onComplete', () => {
const onComplete = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
client.receive().subscribe({onComplete, onSubscribe});
client.close();
expect(onComplete.mock.calls.length).toBe(1);
});
});
});

Expand Down
7 changes: 4 additions & 3 deletions packages/rsocket-types/src/ReactiveSocketTypes.js
Expand Up @@ -118,10 +118,11 @@ export interface DuplexConnection {
receive(): Flowable<Frame>,

/**
* Close the underlying connection, emitting `onComplete` on the receive()
* Publisher.
* Close the underlying connection, optionally providing an error as reason.
* If an error is passed, emits `onError` on the receive() Publisher.
* If no error is passed, emits `onComplete` on the receive() Publisher.
*/
close(): void,
close(error?: Error): void,

/**
* Open the underlying connection. Throws if the connection is already in
Expand Down
Expand Up @@ -62,8 +62,8 @@ export default class RSocketWebSocketClient implements DuplexConnection {
this._statusSubscribers = new Set();
}

close(): void {
this._close();
close(error?: Error): void {
this._close(error);
}

connect(): void {
Expand Down
Expand Up @@ -93,29 +93,62 @@ describe('RSocketWebSocketClient', () => {
});

describe('close()', () => {
it('closes the socket', () => {
client.close();
expect(socket.close.mock.calls.length).toBe(1);
});
describe('given an error', () => {
it('closes the socket', () => {
client.close(new Error());
expect(socket.close.mock.calls.length).toBe(1);
});

it('sets the status to CLOSED', () => {
let status;
client.connectionStatus().subscribe({
onNext: _status => (status = _status),
onSubscribe: subscription =>
subscription.request(Number.MAX_SAFE_INTEGER),
it('sets the status to ERROR with the given error', () => {
let status;
client.connectionStatus().subscribe({
onNext: _status => (status = _status),
onSubscribe: subscription =>
subscription.request(Number.MAX_SAFE_INTEGER),
});
const error = new Error();
client.close(error);
expect(status.kind).toBe('ERROR');
expect(status.error).toBe(error);
});

it('calls receive.onError with the given error', () => {
const onError = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
client.receive().subscribe({onError, onSubscribe});
const error = new Error();
client.close(error);
expect(onError.mock.calls.length).toBe(1);
expect(onError.mock.calls[0][0]).toBe(error);
});
client.close();
expect(status.kind).toBe('CLOSED');
});

it('calls receive.onComplete', () => {
const onComplete = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
client.receive().subscribe({onComplete, onSubscribe});
client.close();
expect(onComplete.mock.calls.length).toBe(1);
describe('not given an error', () => {
it('closes the socket', () => {
client.close();
expect(socket.close.mock.calls.length).toBe(1);
});

it('sets the status to CLOSED', () => {
let status;
client.connectionStatus().subscribe({
onNext: _status => (status = _status),
onSubscribe: subscription =>
subscription.request(Number.MAX_SAFE_INTEGER),
});
client.close();
expect(status.kind).toBe('CLOSED');
});

it('calls receive.onComplete', () => {
const onComplete = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
client.receive().subscribe({onComplete, onSubscribe});
client.close();
expect(onComplete.mock.calls.length).toBe(1);
});
});
});

Expand Down
Expand Up @@ -237,8 +237,13 @@ class WSDuplexConnection implements DuplexConnection {
});
}

close(): void {
this._socket.emit('close');
close(error?: Error): void {
if (error) {
this._socket.emit('error', error);
} else {
this._socket.emit('close');
}

this._socket.close();
}

Expand Down
Expand Up @@ -74,10 +74,18 @@ describe('RSocketWebSocketServer', () => {
expect(status.error).toBe(error);
});

it('returns CLOSED if explicitly closed', () => {
it('returns CLOSED if explicitly closed with no error', () => {
connection.receive().subscribe(() => {});
connection.close();
expect(status.kind).toBe('CLOSED');
});

it('returns ERROR if explicitly closed with an error', () => {
connection.receive().subscribe(() => {});
const error = new Error();
connection.close(error);
expect(status.kind).toBe('ERROR');
expect(status.error).toBe(error);
});
});
});

0 comments on commit 6254847

Please sign in to comment.