Skip to content

Commit

Permalink
refactor: use grpc from google-gax (#994)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexander-fenster committed May 8, 2020
1 parent 2e973a5 commit d3517ee
Show file tree
Hide file tree
Showing 13 changed files with 128 additions and 132 deletions.
15 changes: 12 additions & 3 deletions bin/benchwrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

const grpc = require('grpc');
const {grpc} = require('google-gax');
const protoLoader = require('@grpc/proto-loader');
const {PubSub} = require('../build/src');

Expand Down Expand Up @@ -67,5 +67,14 @@ server.addService(pubsubBenchWrapper['PubsubBenchWrapper']['service'], {
Recv: recv,
});
console.log(`starting on localhost:${argv.port}`);
server.bind(`0.0.0.0:${argv.port}`, grpc.ServerCredentials.createInsecure());
server.start();
server.bindAsync(
`0.0.0.0:${argv.port}`,
grpc.ServerCredentials.createInsecure(),
err => {
if (err) {
throw err;
} else {
server.start();
}
}
);
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
"protobufjs": "^6.8.1"
},
"devDependencies": {
"@grpc/proto-loader": "^0.5.2",
"@grpc/proto-loader": "^0.5.4",
"@types/execa": "^0.9.0",
"@types/extend": "^3.0.0",
"@types/lodash.snakecase": "^4.1.6",
Expand All @@ -79,7 +79,6 @@
"c8": "^7.0.0",
"codecov": "^3.0.0",
"execa": "^4.0.0",
"grpc": "^1.24.0",
"gts": "^2.0.0",
"jsdoc": "^3.6.2",
"jsdoc-fresh": "^1.0.1",
Expand Down
7 changes: 3 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,9 @@ export {

if (process.env.DEBUG_GRPC) {
console.info('gRPC logging set to verbose');
// eslint-disable-next-line
const {setLogger, setLogVerbosity, logVerbosity} = require('@grpc/grpc-js');
setLogger(console);
setLogVerbosity(logVerbosity.DEBUG);
const grpc = require('google-gax').grpc;
grpc.setLogger(console);
grpc.setLogVerbosity(grpc.logVerbosity.DEBUG);
}
import * as protos from '../protos/protos';
export {protos};
12 changes: 5 additions & 7 deletions src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
* limitations under the License.
*/

import {CallOptions} from 'google-gax';
// eslint-disable-next-line node/no-extraneous-import
import {Metadata, ServiceError, status} from '@grpc/grpc-js';
import {CallOptions, grpc} from 'google-gax';
import defer = require('p-defer');

import {Message, Subscriber} from './subscriber';
Expand All @@ -37,12 +35,12 @@ export interface BatchOptions {
* @param {string} message The error message.
* @param {ServiceError} err The grpc service error.
*/
export class BatchError extends Error implements ServiceError {
export class BatchError extends Error implements grpc.ServiceError {
ackIds: string[];
code: status;
code: grpc.status;
details: string;
metadata: Metadata;
constructor(err: ServiceError, ackIds: string[], rpc: string) {
metadata: grpc.Metadata;
constructor(err: grpc.ServiceError, ackIds: string[], rpc: string) {
super(
`Failed to "${rpc}" for ${ackIds.length} message(s). Reason: ${
process.env.DEBUG_GRPC ? err.stack : err.message
Expand Down
39 changes: 17 additions & 22 deletions src/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,7 @@
*/

import {promisify} from '@google-cloud/promisify';
import {ClientStub} from 'google-gax';
import {
ClientDuplexStream,
Metadata,
ServiceError,
status,
StatusObject,
// eslint-disable-next-line node/no-extraneous-import
} from '@grpc/grpc-js';
import {ClientStub, grpc} from 'google-gax';
import * as isStreamEnded from 'is-stream-ended';
import {PassThrough} from 'stream';

Expand Down Expand Up @@ -59,7 +51,10 @@ interface StreamState {

type StreamingPullRequest = google.pubsub.v1.IStreamingPullRequest;
type PullResponse = google.pubsub.v1.IPullResponse;
type PullStream = ClientDuplexStream<StreamingPullRequest, PullResponse> & {
type PullStream = grpc.ClientDuplexStream<
StreamingPullRequest,
PullResponse
> & {
_readableState: StreamState;
};

Expand All @@ -70,11 +65,11 @@ type PullStream = ClientDuplexStream<StreamingPullRequest, PullResponse> & {
*
* @param {object} status The gRPC status object.
*/
export class StatusError extends Error implements ServiceError {
code: status;
export class StatusError extends Error implements grpc.ServiceError {
code: grpc.status;
details: string;
metadata: Metadata;
constructor(status: StatusObject) {
metadata: grpc.Metadata;
constructor(status: grpc.StatusObject) {
super(status.details);
this.code = status.code;
this.details = status.details;
Expand All @@ -89,21 +84,21 @@ export class StatusError extends Error implements ServiceError {
*
* @param {Error} err The original error.
*/
export class ChannelError extends Error implements ServiceError {
code: status;
export class ChannelError extends Error implements grpc.ServiceError {
code: grpc.status;
details: string;
metadata: Metadata;
metadata: grpc.Metadata;
constructor(err: Error) {
super(
`Failed to connect to channel. Reason: ${
process.env.DEBUG_GRPC ? err.stack : err.message
}`
);
this.code = err.message.includes('deadline')
? status.DEADLINE_EXCEEDED
: status.UNKNOWN;
? grpc.status.DEADLINE_EXCEEDED
: grpc.status.UNKNOWN;
this.details = err.message;
this.metadata = new Metadata();
this.metadata = new grpc.Metadata();
}
}

Expand Down Expand Up @@ -287,7 +282,7 @@ export class MessageStream extends PassThrough {
* @param {Duplex} stream The ended stream.
* @param {object} status The stream status.
*/
private _onEnd(stream: PullStream, status: StatusObject): void {
private _onEnd(stream: PullStream, status: grpc.StatusObject): void {
this._removeStream(stream);

if (this._fillHandle) {
Expand Down Expand Up @@ -331,7 +326,7 @@ export class MessageStream extends PassThrough {
* @param {stream} stream The stream that was closed.
* @param {object} status The status message stating why it was closed.
*/
private _onStatus(stream: PullStream, status: StatusObject): void {
private _onStatus(stream: PullStream, status: grpc.StatusObject): void {
if (this.destroyed) {
return;
}
Expand Down
22 changes: 10 additions & 12 deletions src/publisher/publish-error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,45 +13,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// eslint-disable-next-line node/no-extraneous-import
import {ServiceError, Metadata, status} from '@grpc/grpc-js';
import {grpc} from 'google-gax';

/**
* Exception to be thrown during failed ordered publish.
*
* @class
* @extends Error
*/
export class PublishError extends Error implements ServiceError {
code: status;
export class PublishError extends Error implements grpc.ServiceError {
code: grpc.status;
details: string;
metadata: Metadata;
metadata: grpc.Metadata;
orderingKey: string;
error: ServiceError;
constructor(key: string, err: ServiceError) {
error: grpc.ServiceError;
constructor(key: string, err: grpc.ServiceError) {
super(`Unable to publish for key "${key}". Reason: ${err.message}`);

/**
* The gRPC status code.
* The gRPC grpc.status code.
*
* @name PublishError#code
* @type {number}
*/
this.code = err.code;

/**
* The gRPC status details.
* The gRPC grpc.status details.
*
* @name PublishError#details
* @type {string}
*/
this.details = err.details;

/**
* The gRPC metadata object.
* The gRPC grpc.Metadata object.
*
* @name PublishError#metadata
* @name PublishError#grpc.Metadata
* @type {object}
*/
this.metadata = err.metadata;
Expand Down
18 changes: 7 additions & 11 deletions src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ import {promisifyAll} from '@google-cloud/promisify';
import * as extend from 'extend';
import {GoogleAuth} from 'google-auth-library';
import * as gax from 'google-gax';
// eslint-disable-next-line node/no-extraneous-import
import * as grpc from '@grpc/grpc-js';
// eslint-disable-next-line node/no-extraneous-import
import {ServiceError, ChannelCredentials} from '@grpc/grpc-js';

// eslint-disable-next-line @typescript-eslint/no-var-requires
const PKG = require('../../package.json');
Expand Down Expand Up @@ -64,7 +60,7 @@ export interface ClientConfig extends gax.GrpcClientOptions {
apiEndpoint?: string;
servicePath?: string;
port?: string | number;
sslCreds?: ChannelCredentials;
sslCreds?: gax.grpc.ChannelCredentials;
}

export interface PageOptions {
Expand Down Expand Up @@ -132,7 +128,7 @@ export interface RequestConfig extends GetClientConfig {

export interface ResourceCallback<Resource, Response> {
(
err: ServiceError | null,
err: gax.grpc.ServiceError | null,
resource?: Resource | null,
response?: Response | null
): void;
Expand All @@ -143,12 +139,12 @@ export type RequestCallback<T, R = void> = R extends void
: PagedCallback<T, R>;

export interface NormalCallback<TResponse> {
(err: ServiceError | null, res?: TResponse | null): void;
(err: gax.grpc.ServiceError | null, res?: TResponse | null): void;
}

export interface PagedCallback<Item, Response> {
(
err: ServiceError | null,
err: gax.grpc.ServiceError | null,
results?: Item[] | null,
nextQuery?: {} | null,
response?: Response | null
Expand Down Expand Up @@ -559,7 +555,7 @@ export class PubSub {
return;
}

const grpcInstance = this.options.grpc || grpc;
const grpcInstance = this.options.grpc || gax.grpc;
const baseUrl = apiEndpoint || process.env.PUBSUB_EMULATOR_HOST;
const leadingProtocol = new RegExp('^https*://');
const trailingSlashes = new RegExp('/*$');
Expand Down Expand Up @@ -1009,13 +1005,13 @@ export class PubSub {
};
const err = new Error(statusObject.details);
Object.assign(err, statusObject);
callback(err as ServiceError);
callback(err as gax.grpc.ServiceError);
return;
}

this.getClient_(config, (err, client) => {
if (err) {
callback(err as ServiceError);
callback(err as gax.grpc.ServiceError);
return;
}
let reqOpts = extend(true, {}, config.reqOpts);
Expand Down
30 changes: 16 additions & 14 deletions src/pull-retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// eslint-disable-next-line node/no-extraneous-import
import {StatusObject, status} from '@grpc/grpc-js';
import {grpc} from 'google-gax';

/*!
* retryable status codes
* retryable grpc.status codes
*/
export const RETRY_CODES: status[] = [
status.DEADLINE_EXCEEDED,
status.RESOURCE_EXHAUSTED,
status.ABORTED,
status.INTERNAL,
status.UNAVAILABLE,
export const RETRY_CODES: grpc.status[] = [
grpc.status.DEADLINE_EXCEEDED,
grpc.status.RESOURCE_EXHAUSTED,
grpc.status.ABORTED,
grpc.status.INTERNAL,
grpc.status.UNAVAILABLE,
];

/**
Expand All @@ -51,7 +50,7 @@ export class PullRetry {
return Math.pow(2, this.failures) * 1000 + Math.floor(Math.random() * 1000);
}
/**
* Determines if a request status should be retried.
* Determines if a request grpc.status should be retried.
*
* Deadlines behave kind of unexpectedly on streams, rather than using it as
* an indicator of when to give up trying to connect, it actually dictates
Expand All @@ -60,18 +59,21 @@ export class PullRetry {
* the server closing the stream or if we timed out waiting for a connection.
*
* @private
* @param {object} status The request status.
* @param {object} grpc.status The request grpc.status.
* @returns {boolean}
*/
retry(err: StatusObject): boolean {
if (err.code === status.OK || err.code === status.DEADLINE_EXCEEDED) {
retry(err: grpc.StatusObject): boolean {
if (
err.code === grpc.status.OK ||
err.code === grpc.status.DEADLINE_EXCEEDED
) {
this.failures = 0;
} else {
this.failures += 1;
}

if (
err.code === status.UNAVAILABLE &&
err.code === grpc.status.UNAVAILABLE &&
err.details &&
err.details.match(/Server shutdownNow invoked/)
) {
Expand Down
12 changes: 5 additions & 7 deletions test/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
import * as assert from 'assert';
import {describe, it, before, beforeEach, afterEach} from 'mocha';
import {EventEmitter} from 'events';
import {CallOptions} from 'google-gax';
// eslint-disable-next-line node/no-extraneous-import
import {Metadata, ServiceError} from '@grpc/grpc-js';
import {CallOptions, grpc} from 'google-gax';
import * as proxyquire from 'proxyquire';
import * as sinon from 'sinon';
import * as uuid from 'uuid';
Expand Down Expand Up @@ -373,9 +371,9 @@ describe('MessageQueues', () => {

const ackIds = messages.map(message => message.ackId);

const fakeError = new Error('Err.') as ServiceError;
const fakeError = new Error('Err.') as grpc.ServiceError;
fakeError.code = 2;
fakeError.metadata = new Metadata();
fakeError.metadata = new grpc.Metadata();

const expectedMessage =
'Failed to "acknowledge" for 3 message(s). Reason: Err.';
Expand Down Expand Up @@ -498,9 +496,9 @@ describe('MessageQueues', () => {

const ackIds = messages.map(message => message.ackId);

const fakeError = new Error('Err.') as ServiceError;
const fakeError = new Error('Err.') as grpc.ServiceError;
fakeError.code = 2;
fakeError.metadata = new Metadata();
fakeError.metadata = new grpc.Metadata();

const expectedMessage =
'Failed to "modifyAckDeadline" for 3 message(s). Reason: Err.';
Expand Down

0 comments on commit d3517ee

Please sign in to comment.