Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configurable packet size maximum #234

Merged
merged 2 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 26 additions & 6 deletions lib/protocol/Connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ var debug = util.debuglog('hdb');
var trace = util.tracelog();

var EMPTY_BUFFER = common.EMPTY_BUFFER;
var MAX_PACKET_SIZE = common.MAX_PACKET_SIZE;
var DEFAULT_PACKET_SIZE = common.DEFAULT_PACKET_SIZE;
var MINIMUM_PACKET_SIZE = common.MINIMUM_PACKET_SIZE;
var MAXIMUM_PACKET_SIZE = common.MAXIMUM_PACKET_SIZE;
var PACKET_HEADER_LENGTH = common.PACKET_HEADER_LENGTH;
var SEGMENT_HEADER_LENGTH = common.SEGMENT_HEADER_LENGTH;
var PART_HEADER_LENGTH = common.PART_HEADER_LENGTH;
var MAX_AVAILABLE_SIZE = MAX_PACKET_SIZE -
PACKET_HEADER_LENGTH - SEGMENT_HEADER_LENGTH - PART_HEADER_LENGTH;

module.exports = Connection;

Expand Down Expand Up @@ -161,6 +161,22 @@ Object.defineProperties(Connection.prototype, {
get: function shouldUseCesu8() {
return (this._settings.useCesu8 === true);
}
},
packetSize: {
get: function getPacketSize() {
let size = this._settings.packetSize || DEFAULT_PACKET_SIZE;
size = Math.min(size, MAXIMUM_PACKET_SIZE);
size = Math.max(size, MINIMUM_PACKET_SIZE);
return size;
}
},
packetSizeLimit: {
get: function getPacketSizeLimit() {
let limit = this._settings.packetSizeLimit || this.packetSize;
limit = Math.min(limit, MAXIMUM_PACKET_SIZE);
limit = Math.max(limit, this.packetSize);
return limit;
}
}
});

Expand Down Expand Up @@ -301,8 +317,11 @@ Connection.prototype.send = function send(message, receive) {
debug('send', message);
trace('REQUEST', message);

var size = MAX_PACKET_SIZE - PACKET_HEADER_LENGTH;
var size = this.packetSizeLimit - PACKET_HEADER_LENGTH;
var buffer = message.toBuffer(size);
if(buffer.length > size) {
return receive(new Error('Packet size limit exceeded'));
}
var packet = new Buffer(PACKET_HEADER_LENGTH + buffer.length);
buffer.copy(packet, PACKET_HEADER_LENGTH);

Expand Down Expand Up @@ -346,8 +365,9 @@ Connection.prototype.setStatementContext = function setStatementContext(options)
}
};

Connection.prototype.getAvailableSize = function getAvailableSize() {
var availableSize = MAX_AVAILABLE_SIZE;
Connection.prototype.getAvailableSize = function getAvailableSize(forLobs = false) {
var totalSize = forLobs ? this.packetSize : this.packetSizeLimit;
var availableSize = totalSize - PACKET_HEADER_LENGTH - SEGMENT_HEADER_LENGTH - PART_HEADER_LENGTH;
if (this._statementContext) {
availableSize -= this._statementContext.size;
}
Expand Down
20 changes: 13 additions & 7 deletions lib/protocol/ExecuteTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ ExecuteTask.prototype.pushReply = function pushReply(reply) {
}
};

ExecuteTask.prototype.getParameters = function getParameters(availableSize, cb) {
ExecuteTask.prototype.getParameters = function getParameters(availableSize, availableSizeForLOBs, cb) {
var self = this;
var bytesWritten = 0;
var row = 0;
Expand Down Expand Up @@ -176,24 +176,30 @@ ExecuteTask.prototype.getParameters = function getParameters(availableSize, cb)
}
}
var remainingSize = availableSize - bytesWritten;
var remainingSizeForLOBs = availableSizeForLOBs - bytesWritten;
if (self.writer.length > remainingSize) {
if (self.autoCommit) {
self.needFinializeTransaction = true;
self.autoCommit = false;
}
return cb(null, args);
if(args.length === 0) {
self.needFinializeTransaction = false;
return cb(new Error('Failed to set parameters, maximum packet size exceeded.'));
} else {
return cb(null, args);
}
}
self.writer.getParameters(remainingSize, handleParameters);
self.writer.getParameters(remainingSizeForLOBs, handleParameters);
}

next();
};

ExecuteTask.prototype.sendExecute = function sendExecute(cb) {
var self = this;
var availableSize = self.connection.getAvailableSize();
availableSize -= STATEMENT_ID_PART_LENGTH;
self.getParameters(availableSize, function send(err, parameters) {
var availableSize = self.connection.getAvailableSize(false) - STATEMENT_ID_PART_LENGTH;
var availableSizeForLOBs = self.connection.getAvailableSize(true) - STATEMENT_ID_PART_LENGTH;
self.getParameters(availableSize, availableSizeForLOBs, function send(err, parameters) {
if (err) {
return cb(err);
}
Expand All @@ -210,7 +216,7 @@ ExecuteTask.prototype.sendExecute = function sendExecute(cb) {

ExecuteTask.prototype.sendWriteLobRequest = function sendWriteLobRequest(cb) {
var self = this;
var availableSize = self.connection.getAvailableSize();
var availableSize = self.connection.getAvailableSize(true);
self.writer.getWriteLobRequest(availableSize, function send(err, buffer) {
if (err) {
return cb(err);
Expand Down
2 changes: 1 addition & 1 deletion lib/protocol/Statement.js
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,4 @@ function isInputParameter(metadata) {

function isOutputParameter(metadata) {
return metadata.ioType === IoType.OUTPUT || metadata.ioType === IoType.IN_OUT;
}
}
18 changes: 9 additions & 9 deletions lib/protocol/Writer.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ function storeErrorOnStream (err) {
}

Writer.prototype.finializeParameters = function finializeParameters(
bytesRemaining, cb) {
bytesRemainingForLOBs, cb) {
var self = this;
var stream, header;
this._streamErrorListeners = [];
Expand Down Expand Up @@ -155,14 +155,14 @@ Writer.prototype.finializeParameters = function finializeParameters(

function onreadable() {
/* jshint validthis:true */
var chunk = this.read(bytesRemaining);
var chunk = this.read(bytesRemainingForLOBs);
if (chunk === null) {
chunk = this.read();
}
if (chunk === null) {
return;
}
if (chunk.length > bytesRemaining) {
if (chunk.length > bytesRemainingForLOBs) {
cleanup();
return cb(createReadStreamError());
}
Expand All @@ -172,9 +172,9 @@ Writer.prototype.finializeParameters = function finializeParameters(
header.writeInt32LE(length, 2);
// push chunk
self.push(chunk);
bytesRemaining -= chunk.length;
bytesRemainingForLOBs -= chunk.length;
// stop appending if there is no remaining space
if (bytesRemaining === 0) {
if (bytesRemainingForLOBs === 0) {
cleanup();
// finalize lob if the stream has already ended
// because of cleanup we don't get end event in this case
Expand All @@ -188,7 +188,7 @@ Writer.prototype.finializeParameters = function finializeParameters(
}

function next() {
if (!self._lobs.length || bytesRemaining <= 0) {
if (!self._lobs.length || bytesRemainingForLOBs <= 0) {
return cb(null);
}
// set readable stream
Expand All @@ -210,7 +210,7 @@ Writer.prototype.finializeParameters = function finializeParameters(
next();
};

Writer.prototype.getParameters = function getParameters(bytesAvailable, cb) {
Writer.prototype.getParameters = function getParameters(bytesAvailableForLOBs, cb) {
var self = this;

function done(err) {
Expand All @@ -223,8 +223,8 @@ Writer.prototype.getParameters = function getParameters(bytesAvailable, cb) {
cb(null, buffer);
});
}
var bytesRemaining = bytesAvailable - this._bytesWritten;
this.finializeParameters(bytesRemaining, done);
var bytesRemainingForLOBs = bytesAvailableForLOBs - this._bytesWritten;
this.finializeParameters(bytesRemainingForLOBs, done);
};

Writer.prototype.finalizeWriteLobRequest = function finalizeWriteLobRequest(
Expand Down
4 changes: 3 additions & 1 deletion lib/protocol/common/Constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ module.exports = {
PACKET_HEADER_LENGTH: 32,
SEGMENT_HEADER_LENGTH: 24,
PART_HEADER_LENGTH: 16,
MAX_PACKET_SIZE: Math.pow(2, 17),
DEFAULT_PACKET_SIZE: Math.pow(2, 17),
MAXIMUM_PACKET_SIZE: Math.pow(2, 30) - 1,
MINIMUM_PACKET_SIZE: Math.pow(2, 16),
MAX_RESULT_SET_SIZE: Math.pow(2, 20),
EMPTY_BUFFER: new Buffer(0),
DATA_LENGTH_MAX1BYTE_LENGTH: 245,
Expand Down
4 changes: 2 additions & 2 deletions lib/protocol/reply/Segment.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ var common = require('../common');
var FunctionCode = common.FunctionCode;
var SegmentKind = common.SegmentKind;
var PartKindName = common.PartKindName;
var MAX_PACKET_SIZE = common.MAX_PACKET_SIZE;
var PACKET_HEADER_LENGTH = common.PACKET_HEADER_LENGTH;
var DEFAULT_SEGMENT_SIZE = common.DEFAULT_PACKET_SIZE - PACKET_HEADER_LENGTH;
var SEGMENT_HEADER_LENGTH = common.SEGMENT_HEADER_LENGTH;

module.exports = Segment;
Expand Down Expand Up @@ -82,7 +82,7 @@ function readSegment(buffer, offset) {
}

Segment.prototype.toBuffer = function toBuffer(size) {
size = size || (MAX_PACKET_SIZE - PACKET_HEADER_LENGTH);
size = size || DEFAULT_SEGMENT_SIZE;
var remainingSize = size - SEGMENT_HEADER_LENGTH;
var length = SEGMENT_HEADER_LENGTH;
var buffers = this.parts.map(function getPartBuffer(part) {
Expand Down
6 changes: 3 additions & 3 deletions lib/protocol/request/Segment.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ var common = require('../common');
var Part = require('./Part');
var MessageType = common.MessageType;
var SegmentKind = common.SegmentKind;

var MAX_SEGMENT_SIZE = common.MAX_PACKET_SIZE - common.PACKET_HEADER_LENGTH;
var PACKET_HEADER_LENGTH = common.PACKET_HEADER_LENGTH;
var DEFAULT_SEGMENT_SIZE = common.DEFAULT_PACKET_SIZE - PACKET_HEADER_LENGTH;
var SEGMENT_HEADER_LENGTH = common.SEGMENT_HEADER_LENGTH;

module.exports = Segment;
Expand Down Expand Up @@ -73,7 +73,7 @@ Segment.prototype.add = function add(kind, args) {
};

Segment.prototype.toBuffer = function toBuffer(size) {
size = size || MAX_SEGMENT_SIZE;
size = size || DEFAULT_SEGMENT_SIZE;
var remainingSize = size - SEGMENT_HEADER_LENGTH;
var length = SEGMENT_HEADER_LENGTH;
var buffers = [];
Expand Down
68 changes: 64 additions & 4 deletions test/lib.Connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,72 @@ describe('Lib', function () {
});

it('should get the available message buffer size', function () {
var connection = createConnection();
var maxAvailableSize = connection.getAvailableSize();
connection._statementContext = {
var totalHeaderLength = 32 + 24 + 16; // PACKET_HEADER_LENGTH + SEGMENT_HEADER_LENGTH + PART_HEADER_LENGTH
var packetSizeMin = Math.pow(2, 16);
var packetSizeMax = Math.pow(2, 30) - 1;
var packetSizeDefault = Math.pow(2, 17);
var c1 = createConnection();
c1._statementContext = {
size: 32
};
connection.getAvailableSize().should.equal(maxAvailableSize - 32);
c1.getAvailableSize().should.equal(packetSizeDefault - totalHeaderLength - 32);
c1.getAvailableSize(true).should.equal(packetSizeDefault - totalHeaderLength - 32);

// packetSize defined, packetSizeLimit undefined
var ps = Math.pow(2, 18);
var psl = undefined;
var c2 = createConnection({packetSize : ps, packetSizeLimit : psl})
c2.getAvailableSize(false).should.equal(ps - totalHeaderLength);
c2.getAvailableSize(true).should.equal(ps - totalHeaderLength);

// packetSize undefined, packetSizeLimit defined
ps = undefined
psl = Math.pow(2, 19);
var c3 = createConnection({packetSize : ps, packetSizeLimit : psl})
c3.getAvailableSize(false).should.equal(psl - totalHeaderLength);
c3.getAvailableSize(true).should.equal(packetSizeDefault - totalHeaderLength);

// packetSizeLimit is larger than packetSize
ps = Math.pow(2, 20);
psl = Math.pow(2, 21);
var c4 = createConnection({packetSize : ps, packetSizeLimit : psl})
c4.getAvailableSize(false).should.equal(psl - totalHeaderLength);
c4.getAvailableSize(true).should.equal(ps - totalHeaderLength);

// packetSizeLimit is smaller than packetSize
ps = Math.pow(2, 20);
psl = Math.pow(2, 19);
var c4 = createConnection({packetSize : ps, packetSizeLimit : psl})
c4.getAvailableSize(false).should.equal(ps - totalHeaderLength);
c4.getAvailableSize(true).should.equal(ps - totalHeaderLength);

// packetSize below minimum
ps = Math.pow(2, 10);
psl = Math.pow(2, 20);
var c5 = createConnection({packetSize : ps, packetSizeLimit : psl})
c5.getAvailableSize(false).should.equal(psl - totalHeaderLength);
c5.getAvailableSize(true).should.equal(packetSizeMin - totalHeaderLength);

// packetSizeLimit above maximum
ps = Math.pow(2, 20);
psl = Math.pow(2, 31);
var c6 = createConnection({packetSize : ps, packetSizeLimit : psl})
c6.getAvailableSize(false).should.equal(packetSizeMax - totalHeaderLength);
c6.getAvailableSize(true).should.equal(ps - totalHeaderLength);

// packetSize and packetSizeLimit below minimum
ps = Math.pow(2, 10);
psl = Math.pow(2, 9);
var c7 = createConnection({packetSize : ps, packetSizeLimit : psl})
c7.getAvailableSize(false).should.equal(packetSizeMin - totalHeaderLength);
c7.getAvailableSize(true).should.equal(packetSizeMin - totalHeaderLength);

// packetSize and packetSizeLimit above maximum
ps = Math.pow(2, 32);
psl = Math.pow(2, 31);
var c8 = createConnection({packetSize : ps, packetSizeLimit : psl})
c8.getAvailableSize(false).should.equal(packetSizeMax - totalHeaderLength);
c8.getAvailableSize(true).should.equal(packetSizeMax - totalHeaderLength);
});

it('should parse a reply', function () {
Expand Down