Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(close): ensure in-flight messages are drained #952

Merged
merged 4 commits into from
Apr 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,17 @@ export class BatchError extends Error implements ServiceError {
*/
export abstract class MessageQueue {
numPendingRequests: number;
numInFlightRequests: number;
protected _onFlush?: defer.DeferredPromise<void>;
protected _onDrain?: defer.DeferredPromise<void>;
protected _options!: BatchOptions;
protected _requests: QueuedMessages;
protected _subscriber: Subscriber;
protected _timer?: NodeJS.Timer;
protected abstract _sendBatch(batch: QueuedMessages): Promise<void>;
constructor(sub: Subscriber, options = {} as BatchOptions) {
this.numPendingRequests = 0;
this.numInFlightRequests = 0;
this._requests = [];
this._subscriber = sub;

Expand All @@ -111,6 +114,7 @@ export abstract class MessageQueue {

this._requests.push([ackId, deadline]);
this.numPendingRequests += 1;
this.numInFlightRequests += 1;

if (this._requests.length >= maxMessages!) {
this.flush();
Expand Down Expand Up @@ -142,9 +146,15 @@ export abstract class MessageQueue {
this._subscriber.emit('error', e);
}

this.numInFlightRequests -= batchSize;
if (deferred) {
deferred.resolve();
}

if (this.numInFlightRequests <= 0 && this._onDrain) {
this._onDrain.resolve();
delete this._onDrain;
}
}
/**
* Returns a promise that resolves after the next flush occurs.
Expand All @@ -158,6 +168,15 @@ export abstract class MessageQueue {
}
return this._onFlush.promise;
}
/**
* Returns a promise that resolves when all in-flight messages have settled.
*/
onDrain(): Promise<void> {
if (!this._onDrain) {
this._onDrain = defer();
}
return this._onDrain.promise;
}
/**
* Set the batching options.
*
Expand Down
8 changes: 8 additions & 0 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,14 @@ export class Subscriber extends EventEmitter {
this._modAcks.flush();
}

if (this._acks.numInFlightRequests) {
promises.push(this._acks.onDrain());
}

if (this._modAcks.numInFlightRequests) {
promises.push(this._modAcks.onDrain());
}

await Promise.all(promises);
}
}
49 changes: 47 additions & 2 deletions test/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {Metadata, ServiceError} from '@grpc/grpc-js';
import * as proxyquire from 'proxyquire';
import * as sinon from 'sinon';
import * as uuid from 'uuid';
import defer = require('p-defer');

import * as messageTypes from '../src/message-queues';
import {BatchError} from '../src/message-queues';
Expand Down Expand Up @@ -76,14 +77,14 @@ describe('MessageQueues', () => {
// tslint:disable-next-line variable-name
let ModAckQueue: typeof messageTypes.ModAckQueue;

type QueuedMessages = Array<[string, number?]>;

before(() => {
const queues = proxyquire('../src/message-queues.js', {});

AckQueue = queues.AckQueue;
ModAckQueue = queues.ModAckQueue;

type QueuedMessages = Array<[string, number?]>;

MessageQueue = class MessageQueue extends queues.MessageQueue {
batches = [] as QueuedMessages[];
protected async _sendBatch(batch: QueuedMessages): Promise<void> {
Expand Down Expand Up @@ -210,6 +211,35 @@ describe('MessageQueues', () => {
setImmediate(() => messageQueue.flush());
return promise;
});

it('should resolve onDrain only after all in-flight messages have been flushed', async () => {
const log: string[] = [];
const sendDone = defer();
sandbox.stub(messageQueue, '_sendBatch').callsFake(async () => {
log.push('send:start');
await sendDone.promise;
log.push('send:end');
});

const message = new FakeMessage();
const deadline = 10;
const onDrainBeforeFlush = messageQueue
.onDrain()
.then(() => log.push('drain1'));
messageQueue.add(message as Message, deadline);
messageQueue.flush();
assert.deepStrictEqual(log, ['send:start']);
sendDone.resolve();
await messageQueue.onDrain().then(() => log.push('drain2'));
await onDrainBeforeFlush;

assert.deepStrictEqual(log, [
'send:start',
'send:end',
'drain1',
'drain2',
]);
});
});

describe('onFlush', () => {
Expand All @@ -227,6 +257,21 @@ describe('MessageQueues', () => {
});
});

describe('onDrain', () => {
it('should create a promise', () => {
const promise = messageQueue.onDrain();

assert(promise instanceof Promise);
});

it('should re-use existing promises', () => {
const promise1 = messageQueue.onDrain();
const promise2 = messageQueue.onDrain();

assert.strictEqual(promise1, promise2);
});
});

describe('setOptions', () => {
it('should default maxMessages to 3000', () => {
const stub = sandbox.stub(messageQueue, 'flush');
Expand Down
17 changes: 17 additions & 0 deletions test/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,15 @@ class FakeLeaseManager extends EventEmitter {
class FakeQueue {
options: BatchOptions;
numPendingRequests = 0;
numInFlightRequests = 0;
maxMilliseconds = 100;
constructor(sub: s.Subscriber, options: BatchOptions) {
this.options = options;
}
add(message: s.Message, deadline?: number): void {}
async flush(): Promise<void> {}
async onFlush(): Promise<void> {}
async onDrain(): Promise<void> {}
}

class FakeAckQueue extends FakeQueue {
Expand Down Expand Up @@ -380,6 +382,7 @@ describe('Subscriber', () => {

sandbox.stub(ackQueue, 'flush').rejects();
sandbox.stub(ackQueue, 'onFlush').rejects();
sandbox.stub(ackQueue, 'onDrain').rejects();

const modAckQueue: FakeModAckQueue = stubs.get('modAckQueue');

Expand All @@ -388,6 +391,20 @@ describe('Subscriber', () => {

return subscriber.close();
});

it('should wait for in-flight messages to drain', async () => {
const ackQueue: FakeAckQueue = stubs.get('ackQueue');
const modAckQueue: FakeModAckQueue = stubs.get('modAckQueue');
const ackOnDrain = sandbox.stub(ackQueue, 'onDrain').resolves();
const modAckOnDrain = sandbox.stub(modAckQueue, 'onDrain').resolves();

ackQueue.numInFlightRequests = 1;
modAckQueue.numInFlightRequests = 1;
await subscriber.close();

assert.strictEqual(ackOnDrain.callCount, 1);
assert.strictEqual(modAckOnDrain.callCount, 1);
});
});
});

Expand Down