Skip to content

Commit

Permalink
fix(close): ensure in-flight messages are drained (#952) (#980)
Browse files Browse the repository at this point in the history
* fix(close): ensure in-flight messages are drained (#952)

Track in-flight requests and add an `onDrain` deferred that resolves when all in-flight requests are done.

Co-authored-by: Megan Potter <57276408+feywind@users.noreply.github.com>
Co-authored-by: Benjamin E. Coe <bencoe@google.com>

* chore: update equal to strictEqual for linter changes

Co-authored-by: Jeff Hansen <jeffijoe@hotmail.com>
Co-authored-by: Benjamin E. Coe <bencoe@google.com>
  • Loading branch information
3 people committed Apr 24, 2020
1 parent ea80276 commit 4731535
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 3 deletions.
19 changes: 19 additions & 0 deletions src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,17 @@ export class BatchError extends Error implements ServiceError {
*/
export abstract class MessageQueue {
numPendingRequests: number;
numInFlightRequests: number;
protected _onFlush?: defer.DeferredPromise<void>;
protected _onDrain?: defer.DeferredPromise<void>;
protected _options!: BatchOptions;
protected _requests: QueuedMessages;
protected _subscriber: Subscriber;
protected _timer?: NodeJS.Timer;
protected abstract _sendBatch(batch: QueuedMessages): Promise<void>;
constructor(sub: Subscriber, options = {} as BatchOptions) {
this.numPendingRequests = 0;
this.numInFlightRequests = 0;
this._requests = [];
this._subscriber = sub;

Expand All @@ -110,6 +113,7 @@ export abstract class MessageQueue {

this._requests.push([ackId, deadline]);
this.numPendingRequests += 1;
this.numInFlightRequests += 1;

if (this._requests.length >= maxMessages!) {
this.flush();
Expand Down Expand Up @@ -141,9 +145,15 @@ export abstract class MessageQueue {
this._subscriber.emit('error', e);
}

this.numInFlightRequests -= batchSize;
if (deferred) {
deferred.resolve();
}

if (this.numInFlightRequests <= 0 && this._onDrain) {
this._onDrain.resolve();
delete this._onDrain;
}
}
/**
* Returns a promise that resolves after the next flush occurs.
Expand All @@ -157,6 +167,15 @@ export abstract class MessageQueue {
}
return this._onFlush.promise;
}
/**
* Returns a promise that resolves when all in-flight messages have settled.
*/
onDrain(): Promise<void> {
if (!this._onDrain) {
this._onDrain = defer();
}
return this._onDrain.promise;
}
/**
* Set the batching options.
*
Expand Down
8 changes: 8 additions & 0 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,14 @@ export class Subscriber extends EventEmitter {
this._modAcks.flush();
}

if (this._acks.numInFlightRequests) {
promises.push(this._acks.onDrain());
}

if (this._modAcks.numInFlightRequests) {
promises.push(this._modAcks.onDrain());
}

await Promise.all(promises);
}
}
2 changes: 1 addition & 1 deletion system-test/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ describe('pubsub', () => {
subscription.on('error', done);
subscription.on('message', message => {
// If we get the default message from before() then this fails.
assert.equal(message.data.toString(), testText);
assert.strictEqual(message.data.toString(), testText);
message.ack();
subscription.close(done);
});
Expand Down
49 changes: 47 additions & 2 deletions test/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {Metadata, ServiceError} from '@grpc/grpc-js';
import * as proxyquire from 'proxyquire';
import * as sinon from 'sinon';
import * as uuid from 'uuid';
import defer = require('p-defer');

import * as messageTypes from '../src/message-queues';
import {BatchError} from '../src/message-queues';
Expand Down Expand Up @@ -75,14 +76,14 @@ describe('MessageQueues', () => {
// tslint:disable-next-line variable-name
let ModAckQueue: typeof messageTypes.ModAckQueue;

type QueuedMessages = Array<[string, number?]>;

before(() => {
const queues = proxyquire('../src/message-queues.js', {});

AckQueue = queues.AckQueue;
ModAckQueue = queues.ModAckQueue;

type QueuedMessages = Array<[string, number?]>;

MessageQueue = class MessageQueue extends queues.MessageQueue {
batches = [] as QueuedMessages[];
protected async _sendBatch(batch: QueuedMessages): Promise<void> {
Expand Down Expand Up @@ -209,6 +210,35 @@ describe('MessageQueues', () => {
setImmediate(() => messageQueue.flush());
return promise;
});

it('should resolve onDrain only after all in-flight messages have been flushed', async () => {
  // Event log used to assert the exact ordering of send vs. drain.
  const log: string[] = [];
  // Gate that holds the stubbed _sendBatch open, simulating a slow RPC.
  const sendDone = defer();
  sandbox.stub(messageQueue, '_sendBatch').callsFake(async () => {
    log.push('send:start');
    await sendDone.promise;
    log.push('send:end');
  });

  const message = new FakeMessage();
  const deadline = 10;
  // onDrain() taken BEFORE the flush must still wait for the in-flight batch.
  const onDrainBeforeFlush = messageQueue
    .onDrain()
    .then(() => log.push('drain1'));
  messageQueue.add(message as Message, deadline);
  messageQueue.flush();
  // The batch has started sending but is blocked on sendDone — no drain yet.
  assert.deepStrictEqual(log, ['send:start']);
  // Release the in-flight batch; both drain promises should now resolve.
  sendDone.resolve();
  await messageQueue.onDrain().then(() => log.push('drain2'));
  await onDrainBeforeFlush;

  // Drain callbacks must fire only after the send fully completed.
  assert.deepStrictEqual(log, [
    'send:start',
    'send:end',
    'drain1',
    'drain2',
  ]);
});
});

describe('onFlush', () => {
Expand All @@ -226,6 +256,21 @@ describe('MessageQueues', () => {
});
});

describe('onDrain', () => {
  it('should create a promise', () => {
    assert(messageQueue.onDrain() instanceof Promise);
  });

  it('should re-use existing promises', () => {
    const first = messageQueue.onDrain();
    const second = messageQueue.onDrain();

    assert.strictEqual(first, second);
  });
});

describe('setOptions', () => {
it('should default maxMessages to 3000', () => {
const stub = sandbox.stub(messageQueue, 'flush');
Expand Down
17 changes: 17 additions & 0 deletions test/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,15 @@ class FakeLeaseManager extends EventEmitter {
// Minimal stand-in for a real message queue: records its options and exposes
// the counters/async hooks that Subscriber interacts with, as no-ops.
class FakeQueue {
  numPendingRequests = 0;
  numInFlightRequests = 0;
  maxMilliseconds = 100;
  options: BatchOptions;
  constructor(sub: s.Subscriber, options: BatchOptions) {
    this.options = options;
  }
  add(message: s.Message, deadline?: number): void {}
  async flush(): Promise<void> {}
  async onFlush(): Promise<void> {}
  async onDrain(): Promise<void> {}
}

class FakeAckQueue extends FakeQueue {
Expand Down Expand Up @@ -386,6 +388,7 @@ describe('Subscriber', () => {

sandbox.stub(ackQueue, 'flush').rejects();
sandbox.stub(ackQueue, 'onFlush').rejects();
sandbox.stub(ackQueue, 'onDrain').rejects();

const modAckQueue: FakeModAckQueue = stubs.get('modAckQueue');

Expand All @@ -394,6 +397,20 @@ describe('Subscriber', () => {

return subscriber.close();
});

it('should wait for in-flight messages to drain', async () => {
  const ackQueue: FakeAckQueue = stubs.get('ackQueue');
  const modAckQueue: FakeModAckQueue = stubs.get('modAckQueue');
  // Stub both drain hooks so close() can be observed awaiting them.
  const ackDrainStub = sandbox.stub(ackQueue, 'onDrain').resolves();
  const modAckDrainStub = sandbox.stub(modAckQueue, 'onDrain').resolves();

  // Pretend each queue still has one request in flight.
  ackQueue.numInFlightRequests = 1;
  modAckQueue.numInFlightRequests = 1;
  await subscriber.close();

  assert.strictEqual(ackDrainStub.callCount, 1);
  assert.strictEqual(modAckDrainStub.callCount, 1);
});
});
});

Expand Down

0 comments on commit 4731535

Please sign in to comment.