Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Queue.js #233

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
160 changes: 85 additions & 75 deletions lib/protocol/Connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,30 +46,28 @@ var MAX_AVAILABLE_SIZE = MAX_PACKET_SIZE -
module.exports = Connection;

util.inherits(Connection, EventEmitter);

function Connection(settings) {
EventEmitter.call(this);

var self = this;
// public
this.connectOptions = new part.ConnectOptions();
this.clientContextOptions = new part.ClientContextOptions();
this.protocolVersion = undefined;
// private
this._clientInfo = new ClientInfo();
for(var key in settings) {
if(key.toUpperCase().startsWith("SESSIONVARIABLE:")) {
for (var key in settings) {
if (key.toUpperCase().startsWith("SESSIONVARIABLE:")) {
var sv_key = key.substring(key.indexOf(":") + 1);
var sv_value = settings[key];
if(sv_key && sv_key.length > 0 && sv_value && sv_value.length > 0) {
if (sv_key && sv_key.length > 0 && sv_value && sv_value.length > 0) {
this._clientInfo.setProperty(sv_key, sv_value);
}
delete settings[key];
}
}
this._settings = settings || {};
this._socket = undefined;
this._queue = new util.Queue().pause();
this._socketWriteQueue = [];
this._state = new ConnectionState();
this._statementContext = undefined;
this._transaction = new Transaction();
Expand Down Expand Up @@ -149,7 +147,7 @@ Object.defineProperties(Connection.prototype, {
case MessageType.DISCONNECT:
return 'disconnecting';
default:
// do nothing
// do nothing
}
if (this._state.sessionId === -1) {
return 'disconnected';
Expand Down Expand Up @@ -229,28 +227,41 @@ Connection.prototype._addListeners = function _addListeners(socket) {

// register listerners on socket
function ondata(chunk) {
if (!packet.data) {
self._socketWriteQueue.shift()
self._socketWriteNext()
}

packet.push(chunk);
if (packet.isReady()) {
const packetCount = packet.header.packetCount;
if (self._state.sessionId !== packet.header.sessionId) {
self._state.sessionId = packet.header.sessionId;
self._state.packetCount = -1;
}
var buffer = packet.getData();
packet.clear();
var cb = self._state.receive;
var cb = self._state.receives[packetCount];
self._state.receive = undefined;
self._state.messageType = undefined;
self._state.receives[packetCount] = undefined;
self.receive(buffer, cb);
}
}
socket.on('data', ondata);

function onerror(err) {
var cb = self._state && self._state.receive;
if (cb) {
self._state.receive = null; // a callback should be called only once
cb(err);
} else if (self.listeners('error').length) {
let called = false
for (const packetCount in self._state.receives) {
const cb = self._state.receives[packetCount]
if (cb) {
self._state.receives[packetCount] = undefined
called = true
cb(err)
}
}
if (called) { return }
if (self.listeners('error').length) {
self.emit('error', err);
} else {
debug('onerror', err);
Expand Down Expand Up @@ -293,6 +304,7 @@ Connection.prototype._clearQueue = function _clearQueue(err) {
Connection.prototype.send = function send(message, receive) {
if (this._statementContext) {
message.unshift(PartKind.STATEMENT_CONTEXT, this._statementContext.getOptions());
this._statementContext = undefined
}
if (this._clientInfo.shouldSend(message.type)) {
message.add(PartKind.CLIENT_INFO, this._clientInfo.getUpdatedProperties());
Expand All @@ -301,35 +313,57 @@ Connection.prototype.send = function send(message, receive) {
debug('send', message);
trace('REQUEST', message);

var size = MAX_PACKET_SIZE - PACKET_HEADER_LENGTH;
var buffer = message.toBuffer(size);
var packet = new Buffer(PACKET_HEADER_LENGTH + buffer.length);
buffer.copy(packet, PACKET_HEADER_LENGTH);
const buffers = message.toBuffer();
var bufferLength = PACKET_HEADER_LENGTH + buffers.byteLength

var packet = Buffer.allocUnsafe(bufferLength);

var state = this._state;
state.messageType = message.type;

// state.messageType = message.type;
state.receive = receive;

// Increase packet count
state.packetCount++;
const packetCount = state.packetCount;

state.receives[packetCount] = receive;

// Session identifier
bignum.writeUInt64LE(packet, state.sessionId, 0);
// Packet sequence number in this session
// Packets with the same sequence number belong to one request / reply pair
packet.writeUInt32LE(state.packetCount, 8);
packet.writeUInt32LE(packetCount, 8);
// Used space in this packet
packet.writeUInt32LE(buffer.length, 12);
packet.writeUInt32LE(bufferLength - PACKET_HEADER_LENGTH, 12);
// Total space in this packet
packet.writeUInt32LE(size, 16);
packet.writeUInt32LE(bufferLength, 16);
// Number of segments in this packet
packet.writeUInt16LE(1, 20);
// Filler
packet.fill(0x00, 22, PACKET_HEADER_LENGTH);
// Write request packet to socket
if (this._socket) {
this._socket.write(packet);

let offset = PACKET_HEADER_LENGTH
let remaining = bufferLength - PACKET_HEADER_LENGTH

message.updateBuffer(buffers, remaining)
for (let i = 0; i < buffers.length; i++) {
const buffer = buffers[i]
buffer.copy(packet, offset)
offset += buffer.length
}

this._socketWriteQueue.push(packet)
if (this._socketWriteQueue.length === 1) {
this._socketWriteNext()
}
};

Connection.prototype._socketWriteNext = function _socketWriteNext() {
if (this._socketWriteQueue.length === 0) return
const drain = this._socketWriteQueue[0]
this._socket.write(drain)
}

Connection.prototype.getClientInfo = function getClientInfo() {
return this._clientInfo;
Expand Down Expand Up @@ -390,15 +424,13 @@ Connection.prototype.receive = function receive(buffer, cb) {
if (error && error.fatal) {
this.destroy(error);
}
if(cb) {
if (cb) {
cb(error, reply);
}
};

Connection.prototype.enqueue = function enqueue(task, cb) {
var queueable;

if (!this._socket || !this._queue || this.readyState === 'closed') {
if (!this._socket || this.readyState === 'closed') {
var err = new Error('Connection closed');
err.code = 'EHDBCLOSE';
if (cb) {
Expand All @@ -408,18 +440,15 @@ Connection.prototype.enqueue = function enqueue(task, cb) {
}
}
if (util.isFunction(task)) {
queueable = this._queue.createTask(task, cb);
queueable.name = task.name;
} else if (util.isObject(task)) {
return task(cb)
}
if (util.isObject(task)) {
if (task instanceof request.Segment) {
queueable = this._queue.createTask(this.send.bind(this, task), cb);
queueable.name = MessageTypeName[task.type];
} else if (util.isFunction(task.run)) {
queueable = task;
return this.send(task, cb)
}
if (util.isFunction(task.run)) {
return task.run()
}
}
if (queueable) {
this._queue.push(queueable);
}
};

Expand All @@ -428,26 +457,26 @@ Connection.prototype._createAuthenticationManager = auth.createManager;
Connection.prototype.connect = function connect(options, cb) {
var self = this;
var manager;
for(var key in options) {
if(key.toUpperCase().startsWith("SESSIONVARIABLE:")) {
for (var key in options) {
if (key.toUpperCase().startsWith("SESSIONVARIABLE:")) {
var sv_key = key.substring(key.indexOf(":") + 1);
var sv_value = options[key];
if(sv_key && sv_key.length > 0 && sv_value && sv_value.length > 0) {
if (sv_key && sv_key.length > 0 && sv_value && sv_value.length > 0) {
this._clientInfo.setProperty(sv_key, sv_value);
}
delete options[key];
}
}
this.connectOptions.setOptions([{
name : common.ConnectOption.OS_USER,
value : this._clientInfo.getUser()
name: common.ConnectOption.OS_USER,
value: this._clientInfo.getUser()
}]);
this.clientContextOptions.setOptions([{
name : common.ClientContextOption.CLIENT_APPLICATION_PROGRAM,
value : this._clientInfo.getApplication()
name: common.ClientContextOption.CLIENT_APPLICATION_PROGRAM,
value: this._clientInfo.getApplication()
}]);
if(options["disableCloudRedirect"] == true) {
this._redirectType = common.RedirectType.REDIRECTION_DISABLED;
if (options["disableCloudRedirect"] == true) {
this._redirectType = common.RedirectType.REDIRECTION_DISABLED;
}
try {
manager = this._createAuthenticationManager(options);
Expand All @@ -469,15 +498,14 @@ Connection.prototype.connect = function connect(options, cb) {
if (manager.sessionCookie) {
self._settings.sessionCookie = manager.sessionCookie;
}
self._queue.resume();
cb(null, reply);
}

function authReceive(err, reply) {
if (err) {
return cb(err, reply);
}
manager.initialize(reply.authentication, function(err) {
manager.initialize(reply.authentication, function (err) {
if (err) return cb(err);
var redirectOptions = []
if (typeof self._initialHost === 'undefined') {
Expand Down Expand Up @@ -536,7 +564,7 @@ Connection.prototype.connect = function connect(options, cb) {
useCesu8: self.useCesu8
}

if(this._redirectType == common.RedirectType.REDIRECTION_NONE) {
if (this._redirectType == common.RedirectType.REDIRECTION_NONE) {
authOptions.dbConnectInfo = true;
}

Expand All @@ -551,14 +579,7 @@ Connection.prototype.disconnect = function disconnect(cb) {
cb(err, reply);
}

function enqueueDisconnect() {
self.enqueue(request.disconnect(), done);
}

if (this.isIdle()) {
return enqueueDisconnect();
}
this._queue.once('drain', enqueueDisconnect);
return this.send(request.disconnect(), done)
};

Connection.prototype.executeDirect = function executeDirect(options, cb) {
Expand Down Expand Up @@ -634,14 +655,14 @@ Connection.prototype.rollback = function rollback(options, cb) {
this.enqueue(request.rollback(options), cb);
};

// The function doesn't use the queue. It's used before the queue starts running
// The function doesn't use the queue. It's used before the queue starts running
Connection.prototype.fetchDbConnectInfo = function (options, cb) {
if (this.readyState == 'closed') {
var err = new Error('Connection unexpectedly closed');
err.code = 'EHDBCLOSE';
return cb(err)
}
this.send(request.dbConnectInfo(options), function(err, reply) {
this.send(request.dbConnectInfo(options), function (err, reply) {
if (err) {
return cb(err);
}
Expand All @@ -659,19 +680,11 @@ Connection.prototype._closeSilently = function _closeSilently() {
};

Connection.prototype.close = function close() {
var self = this;

function closeConnection() {
debug('close');
self.destroy();
}
if (this.readyState === 'closed') {
return;
}
if (this.isIdle()) {
return closeConnection();
}
this._queue.once('drain', closeConnection);
debug('close');
this.destroy();
};

Connection.prototype.destroy = function destroy(err) {
Expand All @@ -680,10 +693,6 @@ Connection.prototype.destroy = function destroy(err) {
}
};

Connection.prototype.isIdle = function isIdle() {
return this._queue.empty && !this._queue.busy;
};

Connection.prototype.setAutoCommit = function setAutoCommit(autoCommit) {
this._transaction.autoCommit = autoCommit;
};
Expand Down Expand Up @@ -711,6 +720,7 @@ function ConnectionState() {
this.packetCount = -1;
this.messageType = undefined;
this.receive = undefined;
this.receives = {};
}

function Version(major, minor) {
Expand Down Expand Up @@ -738,6 +748,6 @@ InitializationReply.read = function readInitializationReply(buffer) {
return new InitializationReply(productVersion, protocolVersion);
};

var initializationRequestBuffer = new Buffer([
var initializationRequestBuffer = Buffer.from([
0xff, 0xff, 0xff, 0xff, 4, 20, 0, 4, 1, 0, 0, 1, 1, 1
]);
3 changes: 1 addition & 2 deletions lib/protocol/ExecuteTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,11 @@ ExecuteTask.create = function createExecuteTask(connection, options, cb) {
return new ExecuteTask(connection, options, cb);
};

ExecuteTask.prototype.run = function run(next) {
ExecuteTask.prototype.run = function run() {
var self = this;

function done(err) {
self.end(err);
next();
}

function finalize(err) {
Expand Down
9 changes: 6 additions & 3 deletions lib/protocol/request/Part.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ function Part(options) {
this.buffer = undefined;
}

Part.prototype.getLength = function () {
return PART_HEADER_LENGTH + util.alignLength(this.buffer.length, 8)
}

Part.prototype.toBuffer = function toBuffer(size) {
Part.prototype.toBuffer = function toBuffer(remaining) {
var byteLength = util.alignLength(this.buffer.length, 8);
var buffer = new Buffer(PART_HEADER_LENGTH + byteLength);
var buffer = Buffer.allocUnsafe(PART_HEADER_LENGTH + byteLength);
// Part kind, specifies nature of part data
buffer.writeInt8(this.kind, 0);
// Further attributes of part
Expand All @@ -43,7 +46,7 @@ Part.prototype.toBuffer = function toBuffer(size) {
// Length of part buffer in bytes
buffer.writeInt32LE(this.buffer.length, 8);
// Length in packet remaining without this part.
buffer.writeInt32LE(size, 12);
buffer.writeInt32LE(remaining, 12);
this.buffer.copy(buffer, PART_HEADER_LENGTH);
if (this.buffer.length < byteLength) {
buffer.fill(0x00, PART_HEADER_LENGTH + this.buffer.length);
Expand Down