Add support for multi-byte characters to split, join and replace streams
Sami Turcotte committed Dec 4, 2018
1 parent 48d261d commit 9af673f
Showing 2 changed files with 132 additions and 28 deletions.
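
The change replaces per-chunk Buffer#toString() decoding with Node's StringDecoder, which buffers an incomplete utf8 byte sequence until its remaining bytes arrive. A minimal sketch of the difference, using only the built-in string_decoder module (illustrative, not taken from the diff):

import { StringDecoder } from "string_decoder";

// "ö" is two bytes in utf8 (0xC3 0xB6); deliver it in two one-byte chunks.
const bytes = Buffer.from("ö");
const first = bytes.slice(0, 1);
const second = bytes.slice(1, 2);

// Decoding each chunk on its own yields replacement characters.
console.log(first.toString("utf8") + second.toString("utf8")); // "��"

// StringDecoder withholds the incomplete lead byte and emits "ö" once the second byte arrives.
const decoder = new StringDecoder("utf8");
console.log(decoder.write(first) + decoder.write(second)); // "ö"
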
85 changes: 82 additions & 3 deletions src/index.spec.ts
@@ -479,7 +479,7 @@ test.cb("split() splits chunks using the specified separator", t => {
let i = 0;
source
.pipe(split("|"))
.on("data", part => {
.on("data", (part: string) => {
expect(part).to.equal(expectedParts[i]);
t.pass();
i++;
@@ -496,7 +496,7 @@ test.cb("split() splits chunks using the specified separator", t => {
});

test.cb(
"split() splits utf-8 encoded buffers using the specified separator",
"split() splits utf8 encoded buffers using the specified separator",
t => {
t.plan(3);
const expectedElements = ["a", "b", "c"];
@@ -519,6 +519,30 @@ test.cb(
},
);

test.cb(
"split() splits utf8 encoded buffers with multi-byte characters using the specified separator",
t => {
t.plan(3);
const expectedElements = ["一", "一", "一"];
let i = 0;
const through = split(",");
const buf = Buffer.from("一,一,一"); // "一" (code point U+4E00) is a multi-byte utf8 character
through
.on("data", element => {
expect(element).to.equal(expectedElements[i]);
i++;
t.pass();
})
.on("error", t.end)
.on("end", t.end);

for (let j = 0; j < buf.length; ++j) {
through.write(buf.slice(j, j + 1));
}
through.end();
},
);

test.cb("join() joins chunks using the specified separator", t => {
t.plan(9);
const source = new Readable({ objectMode: true });
@@ -542,6 +566,35 @@ test.cb("join() joins chunks using the specified separator", t => {
source.push(null);
});

test.cb(
"join() joins chunks using the specified separator without breaking up multi-byte characters " +
"spanning multiple chunks",
t => {
t.plan(5);
const source = new Readable({ objectMode: true });
const expectedParts = ["ø", "|", "ö", "|", "一"];
let i = 0;
source
.pipe(join("|"))
.on("data", part => {
expect(part).to.equal(expectedParts[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);

source.push(Buffer.from("ø").slice(0, 1)); // 2-byte character spanning two chunks
source.push(Buffer.from("ø").slice(1, 2));
source.push(Buffer.from("ö").slice(0, 1)); // 2-byte character spanning two chunks
source.push(Buffer.from("ö").slice(1, 2));
source.push(Buffer.from("一").slice(0, 1)); // 3-byte character spanning three chunks
source.push(Buffer.from("一").slice(1, 2));
source.push(Buffer.from("一").slice(2, 3));
source.push(null);
},
);

test.cb(
"replace() replaces occurrences of the given string in the streamed elements with the specified " +
"replacement string",
@@ -592,6 +645,32 @@ test.cb(
},
);

test.cb(
"replace() replaces occurrences of the given multi-byte character even if it spans multiple chunks",
t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const expectedElements = ["ø", "O", "a"];
let i = 0;
source
.pipe(replace("ö", "O"))
.on("data", part => {
expect(part).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);

source.push(Buffer.from("ø").slice(0, 1)); // 2-byte character spanning two chunks
source.push(Buffer.from("ø").slice(1, 2));
source.push(Buffer.from("ö").slice(0, 1)); // 2-byte character spanning two chunks
source.push(Buffer.from("ö").slice(1, 2));
source.push("a");
source.push(null);
},
);

test.cb("parse() parses the streamed elements as JSON", t => {
t.plan(3);
const source = new Readable({ objectMode: true });
@@ -623,7 +702,7 @@ test.cb("parse() emits errors on invalid JSON", t => {
.on("end", t.end);

source.push("{}");
source.push("");
source.push({});
source.push([]);
source.push(null);
});
75 changes: 50 additions & 25 deletions src/index.ts
@@ -1,5 +1,6 @@
import { Transform, Readable, Writable, Duplex } from "stream";
import { ChildProcess } from "child_process";
import { StringDecoder } from "string_decoder";

export interface ThroughOptions {
objectMode?: boolean;
@@ -8,6 +9,9 @@ export interface TransformOptions {
readableObjectMode?: boolean;
writableObjectMode?: boolean;
}
export interface WithEncoding {
encoding: string;
}

/**
* Convert an array into a Readable stream of its elements
@@ -200,19 +204,22 @@ export function reduce<T, R>(
/**
* Return a ReadWrite stream that splits streamed chunks using the given separator
* @param separator Separator to split by, defaulting to "\n"
* @param options
* @param options.encoding Encoding written chunks are assumed to use
*/
export function split(
separator: string | RegExp = "\n",
options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
let buffered: string = "";
let buffered = "";
const decoder = new StringDecoder(options.encoding);

return new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(chunk: string | Buffer, encoding, callback) {
const asString =
chunk instanceof Buffer ? chunk.toString(encoding) : chunk;
transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
const splitted = asString.split(separator);
if (buffered.length > 0 && splitted.length > 1) {
if (splitted.length > 1) {
splitted[0] = buffered.concat(splitted[0]);
buffered = "";
}
@@ -221,28 +228,35 @@ export function split(
callback();
},
flush(callback) {
callback(undefined, buffered);
callback(undefined, buffered + decoder.end());
},
});
}
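
For illustration only (not part of the commit), a minimal usage sketch of the updated split(), with the import path assumed; even though each byte of "一" is written separately, whole parts are emitted:

import { split } from "./index"; // import path assumed

const splitter = split(",", { encoding: "utf8" });
splitter.on("data", (part: string) => console.log(part)); // logs "一" three times

const bytes = Buffer.from("一,一,一");
for (let i = 0; i < bytes.length; i++) {
    splitter.write(bytes.slice(i, i + 1)); // one byte at a time
}
splitter.end();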

/**
* Return a ReadWrite stream that joins streamed chunks using the given separator
* @param separator Separator to join with
* @param options
* @param options.encoding Encoding written chunks are assumed to use
*/
export function join(separator: string): NodeJS.ReadWriteStream {
export function join(
separator: string,
options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
let isFirstChunk = true;
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
async transform(chunk: string | Buffer, encoding, callback) {
const asString =
chunk instanceof Buffer ? chunk.toString(encoding) : chunk;
if (!isFirstChunk) {
this.push(separator);
async transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
// Take care not to break up multi-byte characters spanning multiple chunks
if (asString !== "" || chunk.length === 0) {
if (!isFirstChunk) {
this.push(separator);
}
this.push(asString);
isFirstChunk = false;
}
this.push(asString);
isFirstChunk = false;
callback();
},
});
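
Likewise for join(), an illustrative sketch (import path assumed): the separator is only emitted between complete characters, never between the bytes of a single character:

import { join } from "./index"; // import path assumed

const joiner = join("|", { encoding: "utf8" });
let joined = "";
joiner.on("data", (part: string) => (joined += part));
joiner.on("end", () => console.log(joined)); // "ø|一"

const bytes = Buffer.concat([Buffer.from("ø"), Buffer.from("一")]);
for (let i = 0; i < bytes.length; i++) {
    joiner.write(bytes.slice(i, i + 1)); // one byte at a time
}
joiner.end();
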
@@ -253,33 +267,44 @@ export function join(separator: string): NodeJS.ReadWriteStream {
* the streamed chunks with the specified replacement string
* @param searchValue Search string to use
* @param replaceValue Replacement string to use
* @param options
* @param options.encoding Encoding written chunks are assumed to use
*/
export function replace(
searchValue: string | RegExp,
replaceValue: string,
options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(chunk: string | Buffer, encoding, callback) {
const asString =
chunk instanceof Buffer ? chunk.toString(encoding) : chunk;
callback(undefined, asString.replace(searchValue, replaceValue));
transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
// Take care not to break up multi-byte characters spanning multiple chunks
if (asString !== "" || chunk.length === 0) {
callback(
undefined,
asString.replace(searchValue, replaceValue),
);
} else {
callback();
}
},
});
}
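
An illustrative sketch for replace() (import path assumed): the multi-byte search value is matched even when its bytes are split across writes:

import { replace } from "./index"; // import path assumed

const replacer = replace("一", "1", { encoding: "utf8" });
replacer.on("data", (part: string) => console.log(part)); // logs "a", then "1b"

const bytes = Buffer.from("a一b"); // 5 bytes: "一" occupies 3 of them
replacer.write(bytes.slice(0, 2)); // "a" plus the first byte of "一"
replacer.write(bytes.slice(2, 5)); // the remaining bytes of "一", then "b"
replacer.end();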

/**
* Return a ReadWrite stream that parses the streamed chunks as JSON
* Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
* must be a fully defined JSON string.
*/
export function parse(): NodeJS.ReadWriteStream {
const decoder = new StringDecoder("utf8"); // JSON must be utf8
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
async transform(chunk: string | Buffer, encoding, callback) {
async transform(chunk: Buffer, encoding, callback) {
try {
const asString =
chunk instanceof Buffer ? chunk.toString(encoding) : chunk;
const asString = decoder.write(chunk);
// Using await causes parsing errors to be emitted
callback(undefined, await JSON.parse(asString));
} catch (err) {
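
An illustrative sketch for the updated parse() (import path assumed): each written chunk must still be a complete JSON text, but it may now arrive as a raw utf8 buffer:

import { parse } from "./index"; // import path assumed

const parser = parse();
parser.on("data", (obj: unknown) => console.log(obj)); // logs { a: 1 }, then [ 1, 2, 3 ]
parser.on("error", (err: Error) => console.error(err));

parser.write(Buffer.from('{"a":1}'));
parser.write(Buffer.from('[1,2,3]'));
parser.end();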
