Skip to content

Commit

Permalink
Send all outgoingMessages$ with the latest available open connection
Browse files Browse the repository at this point in the history
  • Loading branch information
jbmusso committed May 16, 2017
1 parent 24f9803 commit af3cd5f
Showing 1 changed file with 41 additions and 35 deletions.
76 changes: 41 additions & 35 deletions src/GremlinClient.js
Expand Up @@ -17,6 +17,22 @@ const hasCode = (filterCode) => ({ status: { code } }) => code === filterCode;

const isErrorMessage = ({ status: { code }}) => [200, 204, 206].indexOf(code) === -1;

const serializeToBinary = (message, accept) => {
let serializedMessage = accept + JSON.stringify(message);
serializedMessage = unescape(encodeURIComponent(serializedMessage));

// Let's start packing the message into binary
// mimeLength(1) + mimeType Length + serializedMessage Length
let binaryMessage = new Uint8Array(1 + serializedMessage.length);
binaryMessage[0] = accept.length;

for (let i = 0; i < serializedMessage.length; i++) {
binaryMessage[i + 1] = serializedMessage.charCodeAt(i);
}

return binaryMessage;
}

class GremlinClient extends EventEmitter {
constructor(port = 8182, host = 'localhost', options = {}) {
super();
Expand Down Expand Up @@ -49,38 +65,37 @@ class GremlinClient extends EventEmitter {

this.commands = {};

const connection = this.createConnection({
port,
host,
path: this.options.path
});

this.commands$ = new Rx.Subject();
this.commands$.subscribe((command) => {
const { message: { requestId } } = command;
this.commands[requestId] = command
});

this.registerConnection(connection);
}
const connection = this.createConnection({
port,
host,
path: this.options.path
});

createConnection({ port, host, path }) {
return new WebSocketGremlinConnection({ port, host, path });
}
const connections$ = Rx.Observable.create((observer) => observer.next(connection));

registerConnection(connection) {
this.connection = connection;
const open$ = connections$
.flatMap((connection) => Rx.Observable.fromEvent(connection, 'open'));

const open$ = Rx.Observable.fromEvent(connection, 'open');
const error$ = Rx.Observable.fromEvent(connection, 'error');
const incomingMessages$ = Rx.Observable.fromEvent(connection, 'message')
const error$ = connections$
.flatMap((connection) => Rx.Observable.fromEvent(connection, 'error'));

const incomingMessages$ = connections$
.flatMap((connection) => Rx.Observable.fromEvent(connection, 'message'))
.map(({ data }) => {
const buffer = new Buffer(data, 'binary');
const rawMessage = JSON.parse(buffer.toString('utf-8'));

return rawMessage;
});
const close$ = Rx.Observable.fromEvent(connection, 'close');
const close$ = connections$
.flatMap((connection) => Rx.Observable.fromEvent(connection, 'close'));

const canSend$ = Rx.Observable.merge(
open$.map(true),
Expand All @@ -97,11 +112,18 @@ class GremlinClient extends EventEmitter {
close$.subscribe((event) => this.handleDisconnection(event));

const outgoingMessages$ = this.commands$
.map(({ message }) => message)
.pausableBuffered(canSend$);
.map(({ message }) => serializeToBinary(message, this.options.accept))
.pausableBuffered(canSend$)
.combineLatest(connections$);

outgoingMessages$
.subscribe((message) => this.sendMessage(message));
.subscribe(([binaryMessage, connection]) =>
connection.sendMessage(binaryMessage)
);
}

createConnection({ port, host, path }) {
return new WebSocketGremlinConnection({ port, host, path });
}

closeConnection() {
Expand Down Expand Up @@ -190,22 +212,6 @@ class GremlinClient extends EventEmitter {
return message;
};

sendMessage(message) {
let serializedMessage = this.options.accept + JSON.stringify(message);
serializedMessage = unescape(encodeURIComponent(serializedMessage));

// Let's start packing the message into binary
// mimeLength(1) + mimeType Length + serializedMessage Length
let binaryMessage = new Uint8Array(1 + serializedMessage.length);
binaryMessage[0] = this.options.accept.length;

for (let i = 0; i < serializedMessage.length; i++) {
binaryMessage[i + 1] = serializedMessage.charCodeAt(i);
}

this.connection.sendMessage(binaryMessage);
};

/**
* Asynchronously send a script to Gremlin Server for execution and fire
* the provided callback when all results have been fetched.
Expand Down

0 comments on commit af3cd5f

Please sign in to comment.