Skip to content

Commit

Permalink
refactor: use typed GAPIC methods (#849)
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian committed Jan 9, 2020
1 parent af3196f commit 69bd69a
Show file tree
Hide file tree
Showing 18 changed files with 334 additions and 325 deletions.
44 changes: 18 additions & 26 deletions dev/conformance/runner.ts
Expand Up @@ -15,7 +15,6 @@
const duplexify = require('duplexify');

import {expect} from 'chai';
import {CallOptions} from 'google-gax';
import * as path from 'path';
import * as protobufjs from 'protobufjs';
import * as through2 from 'through2';
Expand All @@ -36,10 +35,12 @@ import {
import {fieldsFromJson} from '../src/convert';
import {DocumentChangeType} from '../src/document-change';
import {QualifiedResourcePath} from '../src/path';
import {UnaryMethod} from '../src/types';
import {isObject} from '../src/util';
import {
ApiOverride,
createInstance as createInstanceHelper,
response,
} from '../test/util/helpers';

import api = proto.google.firestore.v1;
Expand Down Expand Up @@ -248,32 +249,23 @@ const convertProto = {
};

/** Request handler for _commit. */
function commitHandler(spec: ConformanceProto) {
return (
request: api.ICommitRequest,
options: CallOptions,
callback: (
err: Error | null | undefined,
resp?: api.ICommitResponse
) => void
) => {
try {
const actualCommit = COMMIT_REQUEST_TYPE.fromObject(request);
const expectedCommit = COMMIT_REQUEST_TYPE.fromObject(spec.request);
expect(actualCommit).to.deep.equal(expectedCommit);
const res: api.IWriteResponse = {
commitTime: {},
writeResults: [],
};
for (let i = 1; i <= request.writes!.length; ++i) {
res.writeResults!.push({
updateTime: {},
});
}
callback(null, res);
} catch (err) {
callback(err);
function commitHandler(
spec: ConformanceProto
): UnaryMethod<api.ICommitRequest, api.ICommitResponse> {
return request => {
const actualCommit = COMMIT_REQUEST_TYPE.fromObject(request);
const expectedCommit = COMMIT_REQUEST_TYPE.fromObject(spec.request);
expect(actualCommit).to.deep.equal(expectedCommit);
const res: api.ICommitResponse = {
commitTime: {},
writeResults: [],
};
for (let i = 1; i <= request.writes!.length; ++i) {
res.writeResults!.push({
updateTime: {},
});
}
return response(res);
};
}

Expand Down
118 changes: 60 additions & 58 deletions dev/src/index.ts
Expand Up @@ -41,7 +41,15 @@ import {DocumentReference} from './reference';
import {Serializer} from './serializer';
import {Timestamp} from './timestamp';
import {parseGetAllArguments, Transaction} from './transaction';
import {ApiMapValue, GapicClient, ReadOptions, Settings} from './types';
import {
ApiMapValue,
FirestoreStreamingMethod,
FirestoreUnaryMethod,
GapicClient,
ReadOptions,
Settings,
UnaryMethod,
} from './types';
import {Deferred, isPermanentRpcError, requestTag} from './util';
import {
validateBoolean,
Expand Down Expand Up @@ -392,7 +400,7 @@ export class Firestore {
this._settings.maxIdleChannels === undefined
? DEFAULT_MAX_IDLE_CHANNELS
: this._settings.maxIdleChannels;
this._clientPool = new ClientPool(
this._clientPool = new ClientPool<GapicClient>(
MAX_CONCURRENT_REQUESTS_PER_CLIENT,
maxIdleChannels,
/* clientFactory= */ () => {
Expand All @@ -409,7 +417,7 @@ export class Firestore {
logger('Firestore', null, 'Initialized Firestore GAPIC Client');
return client;
},
/* clientDestructor= */ (client: GapicClient) => client.close()
/* clientDestructor= */ client => client.close()
);

logger('Firestore', null, 'Initialized Firestore');
Expand Down Expand Up @@ -956,7 +964,7 @@ export class Firestore {
const self = this;

return self
.requestStream('batchGetDocuments', 'unidirectional', request, requestTag)
.requestStream('batchGetDocuments', request, requestTag)
.then(stream => {
return new Promise<DocumentSnapshot[]>((resolve, reject) => {
stream
Expand Down Expand Up @@ -1056,29 +1064,25 @@ export class Firestore {
this._settingsFrozen = true;

if (this._projectId === undefined) {
this._projectId = await this._clientPool.run(requestTag, gapicClient => {
return new Promise((resolve, reject) => {
gapicClient.getProjectId((err: Error, projectId: string) => {
if (err) {
logger(
'Firestore._detectProjectId',
null,
'Failed to detect project ID: %s',
err
);
reject(err);
} else {
logger(
'Firestore._detectProjectId',
null,
'Detected project ID: %s',
projectId
);
resolve(projectId);
}
});
});
});
try {
this._projectId = await this._clientPool.run(requestTag, gapicClient =>
gapicClient.getProjectId()
);
logger(
'Firestore.initializeIfNeeded',
null,
'Detected project ID: %s',
this._projectId
);
} catch (err) {
logger(
'Firestore.initializeIfNeeded',
null,
'Failed to detect project ID: %s',
err
);
return Promise.reject(err);
}
}
}

Expand Down Expand Up @@ -1263,7 +1267,7 @@ export class Firestore {

/**
* A funnel for all non-streaming API requests, assigning a project ID where
* necessary within the request options.
* necessary within the request options.
*
* @private
* @param methodName Name of the Veneer API endpoint that takes a request
Expand All @@ -1272,32 +1276,32 @@ export class Firestore {
* @param requestTag A unique client-assigned identifier for this request.
* @returns A Promise with the request result.
*/
request<T>(methodName: string, request: {}, requestTag: string): Promise<T> {
request<Req, Resp>(
methodName: FirestoreUnaryMethod,
request: Req,
requestTag: string
): Promise<Resp> {
const callOptions = this.createCallOptions();

return this._clientPool.run(requestTag, gapicClient => {
return new Promise((resolve, reject) => {
return this._clientPool.run(requestTag, async gapicClient => {
try {
logger('Firestore.request', requestTag, 'Sending request: %j', request);
gapicClient[methodName](
request,
callOptions,
(err: GoogleError, result: T) => {
if (err) {
logger('Firestore.request', requestTag, 'Received error:', err);
reject(err);
} else {
logger(
'Firestore.request',
requestTag,
'Received response: %j',
result
);
this._lastSuccessfulRequest = new Date().getTime();
resolve(result);
}
}
const [result] = await (gapicClient[methodName] as UnaryMethod<
Req,
Resp
>)(request, callOptions);
logger(
'Firestore.request',
requestTag,
'Received response: %j',
result
);
});
this._lastSuccessfulRequest = new Date().getTime();
return result;
} catch (err) {
logger('Firestore.request', requestTag, 'Received error:', err);
return Promise.reject(err);
}
});
}

Expand All @@ -1311,19 +1315,18 @@ export class Firestore {
* @private
* @param methodName Name of the streaming Veneer API endpoint that
* takes a request and GAX options.
* @param mode Whether this a unidirectional or bidirectional call.
* @param request The Protobuf request to send.
* @param requestTag A unique client-assigned identifier for this request.
* @returns A Promise with the resulting read-only stream.
*/
requestStream(
methodName: string,
mode: 'unidirectional' | 'bidirectional',
methodName: FirestoreStreamingMethod,
request: {},
requestTag: string
): Promise<Duplex> {
const callOptions = this.createCallOptions();

const bidrectional = methodName === 'listen';
const result = new Deferred<Duplex>();

this._clientPool.run(requestTag, gapicClient => {
Expand All @@ -1339,10 +1342,9 @@ export class Firestore {
'Sending request: %j',
request
);
const stream: Duplex =
mode === 'unidirectional'
? gapicClient[methodName](request, callOptions)
: gapicClient[methodName](callOptions);
const stream = bidrectional
? gapicClient[methodName](callOptions)
: gapicClient[methodName](request, callOptions);
const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.requestStream',
Expand All @@ -1362,7 +1364,7 @@ export class Firestore {
const resultStream = await this._initializeStream(
stream,
requestTag,
mode === 'bidirectional' ? request : undefined
bidrectional ? request : undefined
);

resultStream.on('end', () => stream.end());
Expand Down
14 changes: 11 additions & 3 deletions dev/src/reference.ts
Expand Up @@ -293,7 +293,11 @@ export class DocumentReference implements Serializable {
pageSize: Math.pow(2, 16) - 1,
};
return this._firestore
.request<string[]>('listCollectionIds', request, tag)
.request<api.IListCollectionIdsRequest, string[]>(
'listCollectionIds',
request,
tag
)
.then(collectionIds => {
const collections: CollectionReference[] = [];

Expand Down Expand Up @@ -1829,7 +1833,7 @@ export class Query {
this.firestore.initializeIfNeeded(tag).then(() => {
const request = this.toProto(transactionId);
this._firestore
.requestStream('runQuery', 'unidirectional', request, tag)
.requestStream('runQuery', request, tag)
.then(backendStream => {
backendStream.on('error', err => {
logger(
Expand Down Expand Up @@ -2060,7 +2064,11 @@ export class CollectionReference extends Query {
};

return this.firestore
.request<api.IDocument[]>('listDocuments', request, tag)
.request<api.IListDocumentsRequest, api.IDocument[]>(
'listDocuments',
request,
tag
)
.then(documents => {
// Note that the backend already orders these documents by name,
// so we do not need to manually sort them.
Expand Down
2 changes: 1 addition & 1 deletion dev/src/transaction.ts
Expand Up @@ -363,7 +363,7 @@ export class Transaction {
}

return this._firestore
.request<api.IBeginTransactionResponse>(
.request<api.IBeginTransactionRequest, api.IBeginTransactionResponse>(
'beginTransaction',
request,
this._requestTag
Expand Down
62 changes: 59 additions & 3 deletions dev/src/types.ts
Expand Up @@ -14,6 +14,9 @@
* limitations under the License.
*/

import {CallOptions} from 'google-gax';
import {Duplex} from 'stream';

import {google} from '../protos/firestore_v1_proto_api';
import {FieldPath} from './path';
import {Timestamp} from './timestamp';
Expand All @@ -27,9 +30,62 @@ export interface ApiMapValue {
[k: string]: google.firestore.v1.IValue;
}

// We don't have type information for the JavaScript GapicClient.
// tslint:disable-next-line:no-any
export type GapicClient = any;
/**
* The subset of methods we use from FirestoreClient.
*
* We don't depend on the actual Gapic client to avoid loading the GAX stack at
* module initialization time.
*/
export interface GapicClient {
getProjectId(): Promise<string>;
beginTransaction(
request: api.IBeginTransactionRequest,
options?: CallOptions
): Promise<[api.IBeginTransactionResponse, unknown, unknown]>;
commit(
request: api.ICommitRequest,
options?: CallOptions
): Promise<[api.ICommitResponse, unknown, unknown]>;
rollback(
request: api.IRollbackRequest,
options?: CallOptions
): Promise<[google.protobuf.IEmpty, unknown, unknown]>;
batchGetDocuments(
request?: api.IBatchGetDocumentsRequest,
options?: CallOptions
): Duplex;
runQuery(request?: api.IRunQueryRequest, options?: CallOptions): Duplex;
listDocuments(
request: api.IListDocumentsRequest,
options?: CallOptions
): Promise<[api.IDocument[], unknown, unknown]>;
listCollectionIds(
request: api.IListCollectionIdsRequest,
options?: CallOptions
): Promise<[string[], unknown, unknown]>;
listen(options?: CallOptions): Duplex;
close(): Promise<void>;
}

/** Request/response methods used in the Firestore SDK. */
export type FirestoreUnaryMethod =
| 'listDocuments'
| 'listCollectionIds'
| 'rollback'
| 'beginTransaction'
| 'commit';

/** Streaming methods used in the Firestore SDK. */
export type FirestoreStreamingMethod =
| 'listen'
| 'runQuery'
| 'batchGetDocuments';

/** Type signature for the unary methods in the GAPIC layer. */
export type UnaryMethod<Req, Resp> = (
request: Req,
callOptions: CallOptions
) => Promise<[Resp, unknown, unknown]>;

// We don't have type information for the npm package
// `functional-red-black-tree`.
Expand Down

0 comments on commit 69bd69a

Please sign in to comment.