Skip to content

Commit

Permalink
feat: add a close() method to PubSub, and a flush() method to Topic/P…
Browse files Browse the repository at this point in the history
…ublisher (#916)

* docs: fix a typo in a comment

* feat: allow manually closing the server connections in the PubSub object

* feat: add flush() method for Topic objects

* tests: add tests for new flush() and close() methods

* build: update github checkout action to v2 to fix spurious retry errors

* fix: set isOpen to false before trying to close it so that all usage will stop
  • Loading branch information
feywind committed Mar 26, 2020
1 parent a57458d commit 4097995
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 13 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
matrix:
node: [8, 10, 12, 13]
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v2
- uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node }}
Expand All @@ -30,7 +30,7 @@ jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v2
- uses: actions/setup-node@v1
with:
node-version: 12
Expand All @@ -39,7 +39,7 @@ jobs:
docs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v2
- uses: actions/setup-node@v1
with:
node-version: 12
Expand All @@ -48,7 +48,7 @@ jobs:
coverage:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v2
- uses: actions/setup-node@v1
with:
node-version: 12
Expand Down
36 changes: 32 additions & 4 deletions src/publisher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
* limitations under the License.
*/

import {promisifyAll} from '@google-cloud/promisify';
import {promisify, promisifyAll} from '@google-cloud/promisify';
import * as extend from 'extend';
import {CallOptions} from 'google-gax';

import {MessageBatch, BatchPublishOptions} from './message-batch';
import {Queue, OrderedQueue} from './message-queues';
import {BatchPublishOptions} from './message-batch';
import {Queue, OrderedQueue, PublishDone} from './message-queues';
import {Topic} from '../topic';
import {RequestCallback} from '../pubsub';
import {RequestCallback, EmptyCallback, EmptyResponse} from '../pubsub';
import {google} from '../../proto/pubsub';
import {defaultOptions} from '../default-options';

Expand Down Expand Up @@ -72,6 +72,34 @@ export class Publisher {
this.queue = new Queue(this);
this.orderedQueues = new Map();
}

flush(): Promise<void>;
flush(callback: EmptyCallback): void;
/**
* Immediately sends all remaining queued data. This is mostly useful
* if you are planning to call close() on the PubSub object that holds
* the server connections.
*
* @private
*
* @param {EmptyCallback} [callback] Callback function.
* @returns {Promise<EmptyResponse>}
*/
flush(callback?: EmptyCallback): Promise<void> | void {
const definedCallback = callback ? callback : () => {};

const publishes = [promisify(this.queue.publish)()];
Array.from(this.orderedQueues.values()).forEach(q =>
publishes.push(promisify(q.publish)())
);
const allPublishes = Promise.all(publishes);

allPublishes
.then(() => {
definedCallback(null);
})
.catch(definedCallback);
}
publish(data: Buffer, attributes?: Attributes): Promise<string>;
publish(data: Buffer, callback: PublishCallback): void;
publish(
Expand Down
9 changes: 6 additions & 3 deletions src/publisher/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ export class Queue extends MessageQueue {
/**
* Cancels any pending publishes and calls _publish immediately.
*/
publish(): void {
publish(callback?: PublishDone): void {
const {messages, callbacks} = this.batch;

this.batch = new MessageBatch(this.batchOptions);
Expand All @@ -141,7 +141,7 @@ export class Queue extends MessageQueue {
delete this.pending;
}

this._publish(messages, callbacks);
this._publish(messages, callbacks, callback);
}
}

Expand Down Expand Up @@ -259,7 +259,8 @@ export class OrderedQueue extends MessageQueue {
*
* @fires OrderedQueue#drain
*/
publish(): void {
publish(callback?: PublishDone): void {
const definedCallback = callback || (() => {});
this.inFlight = true;

if (this.pending) {
Expand All @@ -274,10 +275,12 @@ export class OrderedQueue extends MessageQueue {

if (err) {
this.handlePublishFailure(err);
definedCallback(err);
} else if (this.batches.length) {
this.beginNextPublish();
} else {
this.emit('drain');
definedCallback(null);
}
});
}
Expand Down
60 changes: 59 additions & 1 deletion src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ export class PubSub {
getTopicsStream = paginator.streamify('getTopics') as () => ObjectStream<
Topic
>;
isOpen = true;

constructor(options?: ClientConfig) {
options = options || {};
Expand Down Expand Up @@ -281,6 +282,33 @@ export class PubSub {
}
}

close(): Promise<void>;
close(callback: EmptyCallback): void;
/**
* Closes out this object, releasing any server connections. Note that once
* you close a PubSub object, it may not be used again. Any pending operations
* (e.g. queued publish messages) will fail. If you have topic or subscription
* objects that may have pending operations, you should call close() on those
* first if you want any pending messages to be delivered correctly. The
* PubSub class doesn't track those.
*
* @callback EmptyCallback
* @returns {Promise<void>}
*/
close(callback?: EmptyCallback): Promise<void> | void {
const definedCallback = callback || (() => {});
if (this.isOpen) {
this.isOpen = false;
this.closeAllClients_()
.then(() => {
definedCallback(null);
})
.catch(definedCallback);
} else {
definedCallback(null);
}
}

createSubscription(
topic: Topic | string,
name: string,
Expand Down Expand Up @@ -941,6 +969,23 @@ export class PubSub {

return gaxClient;
}
/**
* Close all open client objects.
*
* @private
*
* @returns {Promise}
*/
async closeAllClients_(): Promise<void> {
const promises = [];
for (const clientConfig of Object.keys(this.api)) {
const gaxClient = this.api[clientConfig];
promises.push(gaxClient.close());
delete this.api[clientConfig];
}

await Promise.all(promises);
}
/**
* Funnel all API requests through this method, to be sure we have a project
* ID.
Expand All @@ -954,6 +999,19 @@ export class PubSub {
* @param {function} [callback] The callback function.
*/
request<T, R = void>(config: RequestConfig, callback: RequestCallback<T, R>) {
// This prevents further requests, in case any publishers were hanging around.
if (!this.isOpen) {
const statusObject = {
code: 0,
details: 'Cannot use a closed PubSub object.',
metadata: null,
};
const err = new Error(statusObject.details);
Object.assign(err, statusObject);
callback(err as ServiceError);
return;
}

this.getClient_(config, (err, client) => {
if (err) {
callback(err as ServiceError);
Expand Down Expand Up @@ -1137,7 +1195,7 @@ export class PubSub {

/*! Developer Documentation
*
* These methods can be agto-paginated.
* These methods can be auto-paginated.
*/
paginator.extend(PubSub, ['getSnapshots', 'getSubscriptions', 'getTopics']);

Expand Down
16 changes: 16 additions & 0 deletions src/topic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,22 @@ export class Topic {
this.iam = new IAM(pubsub, this.name);
}

flush(): Promise<void>;
flush(callback: EmptyCallback): void;
/**
* Immediately sends all remaining queued data. This is mostly useful
* if you are planning to call close() on the PubSub object that holds
* the server connections.
*
* @param {EmptyCallback} [callback] Callback function.
* @returns {Promise<EmptyResponse>}
*/
flush(callback?: EmptyCallback): Promise<void> | void {
// It doesn't matter here if callback is undefined; the Publisher
// flush() will handle it.
this.publisher.flush(callback!);
}

create(gaxOpts?: CallOptions): Promise<CreateTopicResponse>;
create(callback: CreateTopicCallback): void;
create(gaxOpts: CallOptions, callback: CreateTopicCallback): void;
Expand Down
49 changes: 49 additions & 0 deletions test/publisher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class FakeQueue extends EventEmitter {
this.publisher = publisher;
}
add(message: p.PubsubMessage, callback: p.PublishCallback): void {}
publish(callback: (err: Error | null) => void) {}
}

class FakeOrderedQueue extends FakeQueue {
Expand All @@ -57,6 +58,7 @@ class FakeOrderedQueue extends FakeQueue {
this.orderingKey = key;
}
resumePublishing(): void {}
publish(callback: (err: Error | null) => void) {}
}

describe('Publisher', () => {
Expand Down Expand Up @@ -239,6 +241,34 @@ describe('Publisher', () => {

assert.strictEqual(publisher.orderedQueues.size, 0);
});

it('should drain any ordered queues on flush', done => {
// We have to stub out the regular queue as well, so that the flush() operation finishes.
sandbox
.stub(FakeQueue.prototype, 'publish')
.callsFake((callback: (err: Error | null) => void) => {
callback(null);
});

sandbox
.stub(FakeOrderedQueue.prototype, 'publish')
.callsFake((callback: (err: Error | null) => void) => {
const queue = (publisher.orderedQueues.get(
orderingKey
) as unknown) as FakeOrderedQueue;
queue.emit('drain');
callback(null);
});

publisher.orderedQueues.clear();
publisher.publishMessage(fakeMessage, spy);

publisher.flush(err => {
assert.strictEqual(err, null);
assert.strictEqual(publisher.orderedQueues.size, 0);
done();
});
});
});
});

Expand Down Expand Up @@ -315,4 +345,23 @@ describe('Publisher', () => {
assert.strictEqual(publisher.settings.batching!.maxMessages, 1000);
});
});

describe('flush', () => {
// The ordered queue drain test is above with the ordered queue tests.
it('should drain the main publish queue', done => {
sandbox.stub(publisher.queue, 'publish').callsFake(cb => {
if (cb) {
cb(null);
}
});
publisher.flush(err => {
assert.strictEqual(err, null);
assert.strictEqual(
!publisher.queue.batch || publisher.queue.batch.messages.length === 0,
true
);
done();
});
});
});
});

0 comments on commit 4097995

Please sign in to comment.