Skip to content

Commit

Permalink
#1 support localized message handling for bi-directional streams
Browse files Browse the repository at this point in the history
implements sendWithCallback so that a local function can handle a
message sent over a bi-directional stream. also, expose onError to the
client (rather than hard-end the websocket) to give the client better
control over how to handle errors
  • Loading branch information
0x6a77 committed Jan 20, 2020
1 parent cd3cae9 commit 06be553
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 4 deletions.
3 changes: 2 additions & 1 deletion client/grpc-web/package.json
Expand Up @@ -8,7 +8,8 @@
"scripts": {
"clean": "rm -rf dist",
"postbootstrap": "npm run lib:build",
"lib:build": "npm run clean && webpack"
"lib:build": "npm run clean && webpack",
"lib:build-dev": "npm run clean && webpack -d --mode development"
},
"publishConfig": {
"access": "public"
Expand Down
18 changes: 15 additions & 3 deletions client/grpc-web/src/client.ts
Expand Up @@ -19,12 +19,14 @@ export interface ClientRpcOptions extends RpcOptions {
export interface Client<TRequest extends ProtobufMessage, TResponse extends ProtobufMessage> {
start(metadata?: Metadata.ConstructorArg): void;
send(message: TRequest): void;
sendWithCallback(message: TRequest, callback: (message: TResponse) => void): void;
finishSend(): void;
close(): void;

onHeaders(callback: (headers: Metadata) => void): void;
onMessage(callback: (message: TResponse) => void): void;
onEnd(callback: (code: Code, message: string, trailers: Metadata) => void): void;
onError(callback: (code: Code, message: string, trailers: Metadata) => void): void;
}

export function client<TRequest extends ProtobufMessage, TResponse extends ProtobufMessage, M extends MethodDefinition<TRequest, TResponse>>(methodDescriptor: M, props: ClientRpcOptions): Client<TRequest, TResponse> {
Expand All @@ -44,6 +46,7 @@ class GrpcClient<TRequest extends ProtobufMessage, TResponse extends ProtobufMes
onHeadersCallbacks: Array<(headers: Metadata) => void> = [];
onMessageCallbacks: Array<(res: TResponse) => void> = [];
onEndCallbacks: Array<(code: Code, message: string, trailers: Metadata) => void> = [];
onErrorCallbacks: Array<(code: Code, message: string, trailers: Metadata) => void> = [];

transport: Transport;
parser = new ChunkParser();
Expand All @@ -67,6 +70,7 @@ class GrpcClient<TRequest extends ProtobufMessage, TResponse extends ProtobufMes
onHeaders: this.onTransportHeaders.bind(this),
onChunk: this.onTransportChunk.bind(this),
onEnd: this.onTransportEnd.bind(this),
onError: this.onTransportEnd.bind(this)
};

if (this.props.transport) {
Expand Down Expand Up @@ -228,9 +232,7 @@ class GrpcClient<TRequest extends ProtobufMessage, TResponse extends ProtobufMes

rawOnError(code: Code, msg: string, trailers: Metadata = new Metadata()) {
this.props.debug && debug("rawOnError", code, msg);
if (this.completed) return;
this.completed = true;
this.onEndCallbacks.forEach(callback => {
this.onErrorCallbacks.forEach(callback => {
if (this.closed) return;
try {
callback(code, msg, trailers);
Expand Down Expand Up @@ -269,6 +271,10 @@ class GrpcClient<TRequest extends ProtobufMessage, TResponse extends ProtobufMes
this.onEndCallbacks.push(callback);
}

onError(callback: (code: Code, message: string, trailers: Metadata) => void) {
this.onErrorCallbacks.push(callback);
}

start(metadata?: Metadata.ConstructorArg) {
if (this.started) {
throw new Error("Client already started - cannot .start()");
Expand Down Expand Up @@ -301,6 +307,12 @@ class GrpcClient<TRequest extends ProtobufMessage, TResponse extends ProtobufMes
this.transport.sendMessage(msgBytes);
}

sendWithCallback(message: TRequest, callback: (message: TResponse) => void) {
this.onMessageCallbacks = [];
this.onMessageCallbacks.push(callback);
this.send(message);
}

finishSend() {
if (!this.started) {
throw new Error("Client not started - .finishSend() must be called before .close()");
Expand Down
1 change: 1 addition & 0 deletions client/grpc-web/src/transports/Transport.ts
Expand Up @@ -27,6 +27,7 @@ export interface TransportOptions {
onHeaders: (headers: Metadata, status: number) => void;
onChunk: (chunkBytes: Uint8Array, flush?: boolean) => void;
onEnd: (err?: Error) => void;
onError: (err?: Error) => void;
}

export interface TransportFactory {
Expand Down
1 change: 1 addition & 0 deletions client/grpc-web/src/transports/websocket/websocket.ts
Expand Up @@ -72,6 +72,7 @@ function websocketRequest(options: TransportOptions): Transport {

ws.onerror = function (error) {
options.debug && debug("websocketRequest.onerror", error);
options.onError();
};

ws.onmessage = function (e) {
Expand Down

0 comments on commit 06be553

Please sign in to comment.