Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for spanner client resource base routing #780

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
3b5c4de
feat: added method to get endPointUris and related changes
Dec 5, 2019
241fe62
fix: unit test cases was failing
Dec 6, 2019
d4d50b7
pass instanceId externally in each SpannerClient Request
Dec 6, 2019
87fd04a
feat: added flag to check weather resource based routing is enable or…
Dec 6, 2019
1ba9e16
move getInstanceEndPointUris to instance.ts file
Dec 9, 2019
2ac95bf
test: added unit test cases
Dec 10, 2019
f504666
fix: removed end point uris mapping chaching
Dec 11, 2019
cc5e72f
test:added more unit test cases
Dec 11, 2019
dfb8100
provided an option to enable resource based routing to user
Dec 13, 2019
9d4f4af
pass the instance id insted of formmatedName_ in each method
Dec 17, 2019
67f74af
fix: unit test cases
Dec 17, 2019
15f261d
fix: unit test cases
Dec 17, 2019
fe0e1a5
test:added system test
Dec 19, 2019
d46a13c
test:added unit test for instance permission
Dec 30, 2019
c4c12d2
fix:review changes
Dec 31, 2019
aae7311
refactor: simplify and clean up
Dec 31, 2019
0b7e83e
fix: reverse the resource based routing condition
Jan 1, 2020
59cdb4f
test: added unit test for multiple endpointUri
Jan 2, 2020
06fba33
Merge branch 'master' into resource-based-routing
Jan 10, 2020
600d5f1
fix:correct spell
Jan 10, 2020
2c99834
Merge branch 'master' into resource-based-routing
Jan 16, 2020
846cb78
refactor: replace getInstanceEndpointUris with getMetadata
Jan 16, 2020
1e83f37
Merge branch 'master' into resource-based-routing
Jan 22, 2020
57687ba
Merge branch 'master' into resource-based-routing
AVaksman Jan 23, 2020
eefc95d
Merge branch 'master' into resource-based-routing
Jan 24, 2020
4c2f6aa
fix:review changes
Jan 24, 2020
48085fb
fix: unit-test case fail issue
Jan 24, 2020
eb358f3
refactor: added enum to check the permission status
Jan 27, 2020
06dc72d
Merge branch 'master' into resource-based-routing
Jan 28, 2020
e5b02c5
merge:merge from master
Feb 6, 2020
710bc9a
fix: review changes
Feb 6, 2020
66eac38
Merge branch 'master' into resource-based-routing
Feb 10, 2020
a464043
test: added unit test and system-test cases
Feb 12, 2020
f3b976c
Merge branch 'master' into resource-based-routing
Feb 12, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/batch-transaction.ts
Expand Up @@ -20,7 +20,7 @@ import * as extend from 'extend';
import * as is from 'is';
import {Snapshot} from './transaction';
import {google} from '../proto/spanner';
import {Session} from '.';
import {Session, Instance} from '.';

export interface TransactionIdentifier {
session: string | Session;
Expand Down Expand Up @@ -130,6 +130,7 @@ class BatchTransaction extends Snapshot {
{
client: 'SpannerClient',
method: 'partitionQuery',
instanceId: ((this.session.parent.parent as {}) as Instance).id,
reqOpts,
gaxOpts: query.gaxOptions,
},
Expand Down Expand Up @@ -218,6 +219,7 @@ class BatchTransaction extends Snapshot {
{
client: 'SpannerClient',
method: 'partitionRead',
instanceId: ((this.session.parent.parent as {}) as Instance).id,
reqOpts,
gaxOpts: options.gaxOptions,
},
Expand Down
8 changes: 8 additions & 0 deletions src/common.ts
Expand Up @@ -68,3 +68,11 @@ export type PagedRequest<P> = P & {
maxApiCalls?: number;
gaxOptions?: CallOptions;
};

export const PERMISSION_DENIED_WARNING_MESSAGE = `The client library attempted to connect to
an endpoint closer to your Cloud Spanner data but was unable to
do so. The client library will fallback to the API endpoint given
in the client options, which may result in increased latency.
We recommend including the scope
https://www.googleapis.com/auth/spanner.admin
so that the client library can get an instance-specific endpoint and efficiently route requests.`;
3 changes: 3 additions & 0 deletions src/database.ts
Expand Up @@ -360,6 +360,7 @@ class Database extends GrpcServiceObject {
{
client: 'SpannerClient',
method: 'batchCreateSessions',
instanceId: ((this.parent as {}) as Instance).id,
reqOpts,
},
(err, resp) => {
Expand Down Expand Up @@ -602,6 +603,7 @@ class Database extends GrpcServiceObject {
{
client: 'SpannerClient',
method: 'createSession',
instanceId: ((this.parent as {}) as Instance).id,
reqOpts,
gaxOpts,
},
Expand Down Expand Up @@ -1131,6 +1133,7 @@ class Database extends GrpcServiceObject {
{
client: 'SpannerClient',
method: 'listSessions',
instanceId: ((this.parent as {}) as Instance).id,
reqOpts,
gaxOpts,
},
Expand Down
111 changes: 96 additions & 15 deletions src/index.ts
Expand Up @@ -21,7 +21,7 @@ import {PreciseDate} from '@google-cloud/precise-date';
import {replaceProjectIdToken} from '@google-cloud/projectify';
import {promisifyAll} from '@google-cloud/promisify';
import * as extend from 'extend';
import {GoogleAuth, GoogleAuthOptions} from 'google-auth-library';
import {GoogleAuth} from 'google-auth-library';
import * as is from 'is';
import * as path from 'path';
import {common as p} from 'protobufjs';
Expand All @@ -35,13 +35,13 @@ import {SessionPool} from './session-pool';
import {Table} from './table';
import {PartitionedDml, Snapshot, Transaction} from './transaction';
import {GrpcClientOptions} from 'google-gax';
import {ChannelCredentials} from 'grpc';
import {ChannelCredentials, status} from 'grpc';
import {
createGcpApiConfig,
gcpCallInvocationTransformer,
gcpChannelFactoryOverride,
} from 'grpc-gcp';
import {google} from '../proto/spanner';
import {PERMISSION_DENIED_WARNING_MESSAGE} from './common';

const grpc = require('grpc');

Expand All @@ -57,10 +57,12 @@ export interface SpannerOptions extends GrpcClientOptions {
servicePath?: string;
port?: number;
sslCreds?: ChannelCredentials;
enableResourceBasedRouting?: boolean;
}
export interface RequestConfig {
client: string;
method: string;
instanceId?: string;
// tslint:disable-next-line: no-any
reqOpts: any;
gaxOpts?: {};
Expand Down Expand Up @@ -197,7 +199,7 @@ export interface RequestConfig {
* @param {ClientConfig} [options] Configuration options.
*/
class Spanner extends GrpcService {
options: GoogleAuthOptions;
options: SpannerOptions;
auth: GoogleAuth;
clients_: Map<string, {}>;
instances_: Map<string, Instance>;
Expand Down Expand Up @@ -302,7 +304,11 @@ class Spanner extends GrpcService {
this.auth = new GoogleAuth(this.options);
this.clients_ = new Map();
this.instances_ = new Map();

this.options.enableResourceBasedRouting =
typeof options!.enableResourceBasedRouting === 'boolean'
? options.enableResourceBasedRouting
: process.env.GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING ===
'true';
/**
* Get a list of {@link Instance} objects as a readable object stream.
*
Expand Down Expand Up @@ -771,20 +777,95 @@ class Spanner extends GrpcService {
callback(err);
return;
}
const clientName = config.client;
this.getGaxClient_(config, (err, gaxClient) => {
if (err) {
callback(err);
return;
}
let reqOpts = extend(true, {}, config.reqOpts);
reqOpts = replaceProjectIdToken(reqOpts, projectId!);
const requestFn = gaxClient[config.method].bind(
gaxClient,
reqOpts,
config.gaxOpts
);
callback(null, requestFn);
});
});
}

/**
* get GAX client. This will retrieve the end point uris for specific instance and cache the client accordingly.
*
* @private
*
* @param {string} projectId Request config
* @param {object} config Request config
* @param {function} callback Callback function
*/
getGaxClient_(config, callback) {
let clientName = config.client;
if (
clientName !== 'SpannerClient' ||
this.options.apiEndpoint ||
!this.options.enableResourceBasedRouting
) {
if (!this.clients_.has(clientName)) {
this.clients_.set(clientName, new gapic.v1[clientName](this.options));
this.setSpannerClient(clientName, config, this.options);
}
const gaxClient = this.clients_.get(clientName)!;
let reqOpts = extend(true, {}, config.reqOpts);
reqOpts = replaceProjectIdToken(reqOpts, projectId!);
const requestFn = gaxClient[config.method].bind(
gaxClient,
reqOpts,
config.gaxOpts
callback(null, gaxClient);
return;
}

if (!config.instanceId) {
const error = new Error('instanceId is required.');
callback(error);
return;
}
const instanceId = config.instanceId;
clientName = `${clientName}-${instanceId}`;

if (!this.clients_.has(clientName)) {
this.instances_.get(instanceId)!.getMetadata(
{
fieldNames: ['endpointUris'],
},
(err, instance) => {
if (err) {
if (err.code === status.PERMISSION_DENIED) {
process.emitWarning(PERMISSION_DENIED_WARNING_MESSAGE);
this.setSpannerClient(clientName, config, this.options);
callback(null, this.clients_.get(clientName)!);
return;
} else {
callback(err);
return;
}
}
const options = Object.assign({}, this.options);
if ((instance as Instance)!.endpointUris!.length) {
options.apiEndpoint = (instance as Instance)!.endpointUris![0];
}
this.setSpannerClient(clientName, config, options);
callback(null, this.clients_.get(clientName)!);
}
);
callback(null, requestFn);
});
} else {
callback(null, this.clients_.get(clientName)!);
}
}
/**
* Cache the GAX client with given name and configuration.
*
* @private
*
* @param {string} clientName Client name to cache.
* @param {object} config Request config
* @param {object} options Spanner options
*/
setSpannerClient(clientName: string, config, options: SpannerOptions) {
this.clients_.set(clientName, new gapic.v1[config.client](options));
}

/**
Expand Down
4 changes: 4 additions & 0 deletions src/session.ts
Expand Up @@ -36,6 +36,7 @@ import {
} from './database';
import {ServiceObjectConfig, DeleteCallback} from '@google-cloud/common';
import {NormalCallback} from './common';
import {Instance} from '.';
import {ServiceError} from 'grpc';

export type GetSessionResponse = [Session, r.Response];
Expand Down Expand Up @@ -275,6 +276,7 @@ export class Session extends GrpcServiceObject {
{
client: 'SpannerClient',
method: 'deleteSession',
instanceId: ((this.parent.parent as {}) as Instance).id,
skuruppu marked this conversation as resolved.
Show resolved Hide resolved
reqOpts,
},
callback!
Expand Down Expand Up @@ -325,6 +327,7 @@ export class Session extends GrpcServiceObject {
{
client: 'SpannerClient',
method: 'getSession',
instanceId: ((this.parent.parent as {}) as Instance).id,
reqOpts,
},
callback!
Expand Down Expand Up @@ -354,6 +357,7 @@ export class Session extends GrpcServiceObject {
{
client: 'SpannerClient',
method: 'executeSql',
instanceId: ((this.parent.parent as {}) as Instance).id,
reqOpts,
},
callback!
Expand Down
7 changes: 7 additions & 0 deletions src/transaction.ts
Expand Up @@ -36,6 +36,7 @@ import {Key} from './table';
import {SpannerClient as s} from './v1';
import {google as spannerClient} from '../proto/spanner';
import {NormalCallback} from './common';
import {Instance} from '.';

export type Rows = Array<Row | Json>;

Expand Down Expand Up @@ -299,6 +300,7 @@ export class Snapshot extends EventEmitter {
{
client: 'SpannerClient',
method: 'beginTransaction',
instanceId: ((this.session.parent.parent as {}) as Instance).id,
reqOpts,
},
(
Expand Down Expand Up @@ -498,6 +500,7 @@ export class Snapshot extends EventEmitter {
return this.requestStream({
client: 'SpannerClient',
method: 'streamingRead',
instanceId: ((this.session.parent.parent as {}) as Instance).id,
reqOpts: Object.assign({}, reqOpts, {resumeToken}),
gaxOpts: gaxOptions,
});
Expand Down Expand Up @@ -887,6 +890,7 @@ export class Snapshot extends EventEmitter {
return this.requestStream({
client: 'SpannerClient',
method: 'executeStreamingSql',
instanceId: ((this.session.parent.parent as {}) as Instance).id,
reqOpts: Object.assign({}, reqOpts, {resumeToken}),
gaxOpts: gaxOptions,
});
Expand Down Expand Up @@ -1280,6 +1284,7 @@ export class Transaction extends Dml {
{
client: 'SpannerClient',
method: 'executeBatchDml',
instanceId: ((this.session.parent.parent as {}) as Instance).id,
reqOpts,
},
(err: null | ServiceError, resp: s.ExecuteBatchDmlResponse) => {
Expand Down Expand Up @@ -1374,6 +1379,7 @@ export class Transaction extends Dml {
{
client: 'SpannerClient',
method: 'commit',
instanceId: ((this.session.parent.parent as {}) as Instance).id,
reqOpts,
},
(err: null | Error, resp: spannerClient.spanner.v1.ICommitResponse) => {
Expand Down Expand Up @@ -1585,6 +1591,7 @@ export class Transaction extends Dml {
{
client: 'SpannerClient',
method: 'rollback',
instanceId: ((this.session.parent.parent as {}) as Instance).id,
reqOpts,
},
(err: null | ServiceError) => {
Expand Down