Skip to content

Commit

Permalink
Implement async parsers (#1352)
Browse files Browse the repository at this point in the history
  • Loading branch information
msujew committed Feb 14, 2024
1 parent 87715d1 commit e78aeba
Show file tree
Hide file tree
Showing 15 changed files with 674 additions and 5 deletions.
2 changes: 2 additions & 0 deletions packages/langium/src/default-module.ts
Expand Up @@ -34,6 +34,7 @@ import { DefaultCommentProvider } from './documentation/comment-provider.js';
import { LangiumParserErrorMessageProvider } from './parser/langium-parser.js';
import { DefaultAsyncParser } from './parser/async-parser.js';
import { DefaultWorkspaceLock } from './workspace/workspace-lock.js';
import { DefaultHydrator } from './serializer/hydrator.js';

/**
* Context required for creating the default language-specific dependency injection module.
Expand Down Expand Up @@ -75,6 +76,7 @@ export function createDefaultCoreModule(context: DefaultCoreModuleContext): Modu
References: (services) => new DefaultReferences(services)
},
serializer: {
Hydrator: (services) => new DefaultHydrator(services),
JsonSerializer: (services) => new DefaultJsonSerializer(services)
},
validation: {
Expand Down
7 changes: 5 additions & 2 deletions packages/langium/src/grammar/langium-grammar-module.ts
Expand Up @@ -67,10 +67,12 @@ export const LangiumGrammarModule: Module<LangiumGrammarServices, PartialLangium
*
* @param context Shared module context, used to create additional shared modules
* @param sharedModule Existing shared module to inject together with new shared services
* @param module Additional/modified service implementations for the language services
* @returns Shared services enriched with LSP services + Grammar services, per usual
*/
export function createLangiumGrammarServices(context: DefaultSharedModuleContext,
sharedModule?: Module<LangiumSharedServices, PartialLangiumSharedServices>): {
sharedModule?: Module<LangiumSharedServices, PartialLangiumSharedServices>,
module?: Module<LangiumServices, PartialLangiumServices>): {
shared: LangiumSharedServices,
grammar: LangiumGrammarServices
} {
Expand All @@ -82,7 +84,8 @@ export function createLangiumGrammarServices(context: DefaultSharedModuleContext
const grammar = inject(
createDefaultModule({ shared }),
LangiumGrammarGeneratedModule,
LangiumGrammarModule
LangiumGrammarModule,
module
);
addTypeCollectionPhase(shared, grammar);
shared.ServiceRegistry.register(grammar);
Expand Down
1 change: 1 addition & 0 deletions packages/langium/src/node/index.ts
Expand Up @@ -5,3 +5,4 @@
******************************************************************************/

export * from './node-file-system-provider.js';
export * from './worker-thread-async-parser.js';
41 changes: 41 additions & 0 deletions packages/langium/src/node/worker-thread-async-parser.ts
@@ -0,0 +1,41 @@
/******************************************************************************
* Copyright 2023 TypeFox GmbH
* This program and the accompanying materials are made available under the
* terms of the MIT License, which is available in the project root.
******************************************************************************/

import type { LangiumCoreServices } from '../services.js';
import { ParserWorker } from '../parser/async-parser.js';
import { AbstractThreadedAsyncParser } from '../parser/async-parser.js';
import { Worker } from 'node:worker_threads';

export class WorkerThreadAsyncParser extends AbstractThreadedAsyncParser {

protected workerPath: string | (() => string);

constructor(services: LangiumCoreServices, workerPath: string | (() => string)) {
super(services);
this.workerPath = workerPath;
}

protected override createWorker(): ParserWorker {
const path = typeof this.workerPath === 'function' ? this.workerPath() : this.workerPath;
const worker = new Worker(path);
const parserWorker = new WorkerThreadParserWorker(worker);
return parserWorker;
}

}

export class WorkerThreadParserWorker extends ParserWorker {

constructor(worker: Worker) {
super(
(message) => worker.postMessage(message),
cb => worker.on('message', cb),
cb => worker.on('error', cb),
() => worker.terminate()
);
}

}
157 changes: 157 additions & 0 deletions packages/langium/src/parser/async-parser.ts
Expand Up @@ -8,6 +8,10 @@ import type { CancellationToken } from '../utils/cancellation.js';
import type { LangiumCoreServices } from '../services.js';
import type { AstNode } from '../syntax-tree.js';
import type { LangiumParser, ParseResult } from './langium-parser.js';
import type { Hydrator } from '../serializer/hydrator.js';
import type { Event } from '../utils/event.js';
import { Deferred, OperationCancelled } from '../utils/promise-utils.js';
import { Emitter } from '../utils/event.js';

/**
* Async parser that allows to cancel the current parsing process.
Expand Down Expand Up @@ -37,3 +41,156 @@ export class DefaultAsyncParser implements AsyncParser {
return Promise.resolve(this.syncParser.parse<T>(text));
}
}

export abstract class AbstractThreadedAsyncParser implements AsyncParser {

/**
* The thread count determines how many threads are used to parse files in parallel.
* The default value is 8. Decreasing this value increases startup performance, but decreases parallel parsing performance.
*/
protected threadCount = 8;
/**
* The termination delay determines how long the parser waits for a thread to finish after a cancellation request.
* The default value is 200(ms).
*/
protected terminationDelay = 200;
protected workerPool: ParserWorker[] = [];
protected queue: Array<Deferred<ParserWorker>> = [];

protected readonly hydrator: Hydrator;

constructor(services: LangiumCoreServices) {
this.hydrator = services.serializer.Hydrator;
}

protected initializeWorkers(): void {
while (this.workerPool.length < this.threadCount) {
const worker = this.createWorker();
worker.onReady(() => {
if (this.queue.length > 0) {
const deferred = this.queue.shift();
if (deferred) {
worker.lock();
deferred.resolve(worker);
}
}
});
this.workerPool.push(worker);
}
}

async parse<T extends AstNode>(text: string, cancelToken: CancellationToken): Promise<ParseResult<T>> {
const worker = await this.acquireParserWorker(cancelToken);
const deferred = new Deferred<ParseResult<T>>();
let timeout: NodeJS.Timeout | undefined;
// If the cancellation token is requested, we wait for a certain time before terminating the worker.
// Since the cancellation token lives longer than the parsing process, we need to dispose the event listener.
// Otherwise, we might accidentally terminate the worker after the parsing process has finished.
const cancellation = cancelToken.onCancellationRequested(() => {
timeout = setTimeout(() => {
this.terminateWorker(worker);
}, this.terminationDelay);
});
worker.parse(text).then(result => {
result.value = this.hydrator.hydrate(result.value);
deferred.resolve(result as ParseResult<T>);
}).catch(err => {
deferred.reject(err);
}).finally(() => {
cancellation.dispose();
clearTimeout(timeout);
});
return deferred.promise;
}

protected terminateWorker(worker: ParserWorker): void {
worker.terminate();
const index = this.workerPool.indexOf(worker);
if (index >= 0) {
this.workerPool.splice(index, 1);
}
}

protected async acquireParserWorker(cancelToken: CancellationToken): Promise<ParserWorker> {
this.initializeWorkers();
for (const worker of this.workerPool) {
if (worker.ready) {
worker.lock();
return worker;
}
}
const deferred = new Deferred<ParserWorker>();
cancelToken.onCancellationRequested(() => {
const index = this.queue.indexOf(deferred);
if (index >= 0) {
this.queue.splice(index, 1);
}
deferred.reject('OperationCancelled');
});
this.queue.push(deferred);
return deferred.promise;
}

protected abstract createWorker(): ParserWorker;
}

export type WorkerMessagePost = (message: unknown) => void;
export type WorkerMessageCallback = (cb: (message: unknown) => void) => void;

export class ParserWorker {

protected readonly sendMessage: WorkerMessagePost;
protected readonly _terminate: () => void;
protected readonly onReadyEmitter = new Emitter<void>();

protected deferred = new Deferred<ParseResult>();
protected _ready = true;
protected _parsing = false;

get ready(): boolean {
return this._ready;
}

get onReady(): Event<void> {
return this.onReadyEmitter.event;
}

constructor(sendMessage: WorkerMessagePost, onMessage: WorkerMessageCallback, onError: WorkerMessageCallback, terminate: () => void) {
this.sendMessage = sendMessage;
this._terminate = terminate;
onMessage(result => {
const parseResult = result as ParseResult;
this.deferred.resolve(parseResult);
this.unlock();
});
onError(error => {
this.deferred.reject(error);
this.unlock();
});
}

terminate(): void {
this.deferred.reject(OperationCancelled);
this._terminate();
}

lock(): void {
this._ready = false;
}

unlock(): void {
this._parsing = false;
this._ready = true;
this.onReadyEmitter.fire();
}

parse(text: string): Promise<ParseResult> {
if (this._parsing) {
throw new Error('Parser worker is busy');
}
this._parsing = true;
this.deferred = new Deferred();
this.sendMessage(text);
return this.deferred.promise;
}
}

0 comments on commit e78aeba

Please sign in to comment.