Skip to content

Commit

Permalink
feat: add function parseStream (#16)
Browse files Browse the repository at this point in the history
* wip: parsing stream

* wip: first example with generator

* chore: allow to define CHUNK_SIZE in tests

* chore: fix parseStream chunk size

* wip: allow to join chunks

* wip: try to slice the data

* fix: throw error if no closing tag

* feat: add parseStream

* chore: test with node 18

* fix: only test parseStream with node 18 and greater
  • Loading branch information
lpatiny committed May 25, 2022
1 parent 21e3152 commit 554e133
Show file tree
Hide file tree
Showing 10 changed files with 413 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/nodejs.yml
Expand Up @@ -10,5 +10,5 @@ jobs:
nodejs:
uses: zakodium/workflows/.github/workflows/nodejs.yml@nodejs-v1
with:
node-version-matrix: '[14, 16]'
node-version-matrix: '[14, 16, 18]'
lint-check-types: true
4 changes: 3 additions & 1 deletion .gitignore
Expand Up @@ -119,4 +119,6 @@ dist

lib
lib-esm
big.xml
big.xml

script/medline.xml
8 changes: 4 additions & 4 deletions package.json
Expand Up @@ -39,18 +39,18 @@
"homepage": "https://github.com/cheminfo/arraybuffer-xml-parser#readme",
"devDependencies": {
"@types/he": "^1.1.2",
"@types/jest": "^27.5.0",
"@types/jest": "^27.5.1",
"cheminfo-build": "^1.1.11",
"eslint": "^8.15.0",
"eslint": "^8.16.0",
"eslint-config-cheminfo-typescript": "^10.4.0",
"he": "^1.2.0",
"iobuffer": "^5.1.0",
"jest": "^28.1.0",
"pako": "^2.0.4",
"prettier": "^2.6.2",
"rimraf": "^3.0.2",
"ts-jest": "^28.0.2",
"typescript": "^4.6.4",
"ts-jest": "^28.0.3",
"typescript": "^4.7.2",
"uint8-base64": "^0.1.1"
},
"dependencies": {
Expand Down
19 changes: 19 additions & 0 deletions script/medline.mjs
@@ -0,0 +1,19 @@
import { parseStream } from '../lib/index.js';
import { open } from 'fs/promises';

/*
To test this script, first build the package with `npm run prepack`.
You also need a (big) MEDLINE file named 'medline.xml' placed next to this script.
*/

/**
 * Stream the 'medline.xml' file located next to this script through
 * `parseStream` and log every `PubmedArticle` entry followed by its
 * zero-based position.
 *
 * The file handle is closed explicitly in a `finally` block:
 * `readableWebStream()` reads the file to completion but does not close the
 * underlying `FileHandle` by itself.
 * @returns {Promise<void>} resolves once every entry has been logged and the
 * file handle has been closed
 */
async function doAll() {
  const file = await open(new URL('medline.xml', import.meta.url), 'r');
  try {
    const stream = file.readableWebStream();
    let i = 0;
    for await (const entry of parseStream(stream, 'PubmedArticle')) {
      console.log(entry);
      console.log(i++);
    }
  } finally {
    // Release the descriptor whether parsing succeeded or threw.
    await file.close();
  }
}

doAll();
63 changes: 63 additions & 0 deletions src/__tests__/parseStream.test.ts
@@ -0,0 +1,63 @@
import { open } from 'fs/promises';
import { join } from 'path';

import { parseStream } from '../parseStream';

// Exercises parseStream on the `assets/sample.xml` fixture, delivered to the
// parser in very small chunks, and checks the emitted `address` entries.
describe('parseStream', () => {
it('simple case', async () => {
// The body only runs when the Node.js major version is 18 or later; on older
// versions this test finishes without making any assertion.
// eslint-disable-next-line jest/no-if
if (Number(process.versions.node.split('.')[0]) >= 18) {
// NOTE(review): this file handle is never closed in the test — confirm whether that is intended.
const file = await open(join(__dirname, 'assets/sample.xml'), 'r');
// Maximal number of bytes per chunk enqueued by the transform below.
const CHUNK_SIZE = 10;
// Re-slices every incoming chunk into pieces of at most CHUNK_SIZE bytes,
// presumably so that entries span several chunks — confirm.
const transformStream = new TransformStream({
start: function start() {}, // required.
transform: async function transform(chunk, controller) {
// NOTE(review): there is no `return` after terminate(), so a null chunk
// still reaches the lines below.
if (chunk === null) controller.terminate();
chunk = new Uint8Array(await chunk);
for (let i = 0; i < chunk.length; i += CHUNK_SIZE) {
controller.enqueue(chunk.slice(i, i + CHUNK_SIZE));
}
},
});

// Collects every entry yielded by parseStream, in order.
const results = [];
//@ts-expect-error feature is too new
const readableStream = file.readableWebStream();
// Look up every `address` element in the re-chunked stream.
for await (let entry of parseStream(
readableStream.pipeThrough(transformStream),
'address',
)) {
results.push(entry);
//console.log(entry);
}
expect(results).toMatchInlineSnapshot(`
Array [
Object {
"buildingNo": 1,
"city": "New York",
"flatNo": 1,
"street": "Park Ave",
},
Object {
"buildingNo": 33,
"city": "Boston",
"flatNo": 24,
"street": "Centre St",
},
Object {
"buildingNo": 1,
"city": "Moscow",
"flatNo": 2,
"street": "Kahovka",
},
Object {
"buildingNo": 3,
"city": "Tula",
"flatNo": 78,
"street": "Lenina",
},
]
`);
}
});
});
1 change: 1 addition & 0 deletions src/index.ts
@@ -1 +1,2 @@
export * from './parse';
export * from './parseStream';
25 changes: 25 additions & 0 deletions src/parseStream.ts
@@ -0,0 +1,25 @@
import {
defaultOptions,
StreamParseOptions,
} from './traversable/defaultOptions';
import { getTraversableGenerator } from './traversable/getTraversableGenerator';
import { traversableToJSON } from './traversableToJSON';

/**
 * Lazily parse an XML document delivered as a web stream.
 *
 * Every traversable entry produced by `getTraversableGenerator` for
 * `lookupTagName` is converted with `traversableToJSON` and yielded.
 * @param readableStream - web stream providing the XML data
 * @param lookupTagName - tag name forwarded to `getTraversableGenerator`
 * @param options - parsing options; they are merged on top of `defaultOptions`
 */
export async function* parseStream(
  readableStream: ReadableStream,
  lookupTagName: string,
  options: StreamParseOptions = {},
) {
  // Caller-supplied values take precedence over the defaults.
  const mergedOptions: StreamParseOptions = { ...defaultOptions, ...options };

  const entries = getTraversableGenerator(
    readableStream,
    lookupTagName,
    mergedOptions,
  );

  for await (const node of entries) {
    yield traversableToJSON(node, mergedOptions);
  }
}
11 changes: 7 additions & 4 deletions src/traversable/closingIndexForOpeningTag.ts
@@ -1,5 +1,11 @@
import { decoder } from './utils/utf8Decoder';

/**
* Search for the corresponding closing tag '>'
* @param data
* @param i
* @returns
*/
export function closingIndexForOpeningTag(
data: Uint8Array,
i: number,
Expand All @@ -25,8 +31,5 @@ export function closingIndexForOpeningTag(
}
endIndex++;
}
return {
data: decoder.decode(data.subarray(i, i + endIndex)),
index: 0,
};
throw new Error('Could not find closing tag');
}
15 changes: 15 additions & 0 deletions src/traversable/defaultOptions.ts
Expand Up @@ -7,6 +7,20 @@ export const decoder = {
return utf8Decoder.decode(array);
},
};

/**
 * Options for stream parsing: every `ParseOptions` field plus two optional
 * size limits.
 */
export interface StreamParseOptions extends ParseOptions {
/**
* Maximal size (in bytes) of a single entry.
* @default 1e7
*/
maxEntrySize?: number;
/**
* Maximal size of the buffer.
* NOTE(review): the unit is presumably bytes, like `maxEntrySize` — confirm.
* @default 2e8
*/
maxBufferSize?: number;
}

export interface ParseOptions {
/**
* should we remove ascii < 32
Expand Down Expand Up @@ -92,6 +106,7 @@ export interface ParseOptions {
*/
stopNodes?: string[];
}

export const defaultOptions: ParseOptions = {
trimValues: true,
attributeNamePrefix: '$',
Expand Down

0 comments on commit 554e133

Please sign in to comment.