Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement async parsers #1352

Merged
merged 9 commits into from Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/langium/src/default-module.ts
Expand Up @@ -34,6 +34,7 @@ import { DefaultCommentProvider } from './documentation/comment-provider.js';
import { LangiumParserErrorMessageProvider } from './parser/langium-parser.js';
import { DefaultAsyncParser } from './parser/async-parser.js';
import { DefaultWorkspaceLock } from './workspace/workspace-lock.js';
import { DefaultHydrator } from './serializer/hydrator.js';

/**
* Context required for creating the default language-specific dependency injection module.
Expand Down Expand Up @@ -75,6 +76,7 @@ export function createDefaultCoreModule(context: DefaultCoreModuleContext): Modu
References: (services) => new DefaultReferences(services)
},
serializer: {
Hydrator: (services) => new DefaultHydrator(services),
JsonSerializer: (services) => new DefaultJsonSerializer(services)
},
validation: {
Expand Down
7 changes: 5 additions & 2 deletions packages/langium/src/grammar/langium-grammar-module.ts
Expand Up @@ -67,10 +67,12 @@ export const LangiumGrammarModule: Module<LangiumGrammarServices, PartialLangium
*
* @param context Shared module context, used to create additional shared modules
* @param sharedModule Existing shared module to inject together with new shared services
* @param module Additional/modified service implementations for the language services
* @returns Shared services enriched with LSP services + Grammar services, per usual
*/
export function createLangiumGrammarServices(context: DefaultSharedModuleContext,
sharedModule?: Module<LangiumSharedServices, PartialLangiumSharedServices>): {
sharedModule?: Module<LangiumSharedServices, PartialLangiumSharedServices>,
module?: Module<LangiumServices, PartialLangiumServices>): {
shared: LangiumSharedServices,
grammar: LangiumGrammarServices
} {
Expand All @@ -82,7 +84,8 @@ export function createLangiumGrammarServices(context: DefaultSharedModuleContext
const grammar = inject(
createDefaultModule({ shared }),
LangiumGrammarGeneratedModule,
LangiumGrammarModule
LangiumGrammarModule,
module
);
addTypeCollectionPhase(shared, grammar);
shared.ServiceRegistry.register(grammar);
Expand Down
1 change: 1 addition & 0 deletions packages/langium/src/node/index.ts
Expand Up @@ -5,3 +5,4 @@
******************************************************************************/

export * from './node-file-system-provider.js';
export * from './worker-thread-async-parser.js';
41 changes: 41 additions & 0 deletions packages/langium/src/node/worker-thread-async-parser.ts
@@ -0,0 +1,41 @@
/******************************************************************************
* Copyright 2023 TypeFox GmbH
* This program and the accompanying materials are made available under the
* terms of the MIT License, which is available in the project root.
******************************************************************************/

import type { LangiumCoreServices } from '../services.js';
import { ParserWorker } from '../parser/async-parser.js';
import { AbstractThreadedAsyncParser } from '../parser/async-parser.js';
import { Worker } from 'node:worker_threads';

/**
 * Threaded async parser whose workers are `Worker` instances from `node:worker_threads`.
 */
export class WorkerThreadAsyncParser extends AbstractThreadedAsyncParser {

    /**
     * Either the path handed to the `Worker` constructor, or a function producing it.
     * A function is invoked every time a worker is created.
     */
    protected workerPath: string | (() => string);

    /**
     * @param services Forwarded to the base class constructor.
     * @param workerPath Worker script path, or a function that returns it.
     */
    constructor(services: LangiumCoreServices, workerPath: string | (() => string)) {
        super(services);
        this.workerPath = workerPath;
    }

    /**
     * Resolves the configured worker path, starts a worker thread for it and wraps
     * the thread in a `WorkerThreadParserWorker`.
     */
    protected override createWorker(): ParserWorker {
        let scriptPath: string;
        if (typeof this.workerPath === 'function') {
            scriptPath = this.workerPath();
        } else {
            scriptPath = this.workerPath;
        }
        return new WorkerThreadParserWorker(new Worker(scriptPath));
    }

}

/**
 * `ParserWorker` backed by a `node:worker_threads` `Worker`.
 */
export class WorkerThreadParserWorker extends ParserWorker {

    /**
     * @param worker The thread to talk to. Outgoing messages go through `postMessage`,
     * incoming ones are received via the `'message'` and `'error'` events.
     */
    constructor(worker: Worker) {
        super(
            message => {
                worker.postMessage(message);
            },
            listener => {
                worker.on('message', listener);
            },
            listener => {
                worker.on('error', listener);
            },
            () => {
                // The base class ignores the callback's return value, so the promise is not awaited.
                void worker.terminate();
            }
        );
    }

}
157 changes: 157 additions & 0 deletions packages/langium/src/parser/async-parser.ts
Expand Up @@ -8,6 +8,10 @@ import type { CancellationToken } from '../utils/cancellation.js';
import type { LangiumCoreServices } from '../services.js';
import type { AstNode } from '../syntax-tree.js';
import type { LangiumParser, ParseResult } from './langium-parser.js';
import type { Hydrator } from '../serializer/hydrator.js';
import type { Event } from '../utils/event.js';
import { Deferred, OperationCancelled } from '../utils/promise-utils.js';
import { Emitter } from '../utils/event.js';

/**
* Async parser that allows to cancel the current parsing process.
Expand Down Expand Up @@ -37,3 +41,156 @@ export class DefaultAsyncParser implements AsyncParser {
return Promise.resolve(this.syncParser.parse<T>(text));
}
}

/**
 * Base class for async parsers that hand the text over to a pool of background workers.
 *
 * Workers are created through {@link createWorker} the first time a worker is requested.
 * While no worker is ready, requests wait in a first-in-first-out queue and are served
 * as soon as a worker signals that it is ready again.
 */
export abstract class AbstractThreadedAsyncParser implements AsyncParser {

    /**
     * The thread count determines how many threads are used to parse files in parallel.
     * The default value is 8. Decreasing this value increases startup performance, but decreases parallel parsing performance.
     */
    protected threadCount = 8;
    /**
     * The termination delay determines how long the parser waits for a thread to finish after a cancellation request.
     * The default value is 200(ms).
     */
    protected terminationDelay = 200;
    /** Workers created by this parser that have not been removed by {@link terminateWorker}. */
    protected workerPool: ParserWorker[] = [];
    /** Requests waiting for a worker, served in insertion order. */
    protected queue: Array<Deferred<ParserWorker>> = [];

    protected readonly hydrator: Hydrator;

    constructor(services: LangiumCoreServices) {
        this.hydrator = services.serializer.Hydrator;
    }

    /**
     * Fills the pool up to `threadCount` workers. Each new worker hands itself to the
     * oldest queued request whenever it reports that it is ready.
     */
    protected initializeWorkers(): void {
        while (this.workerPool.length < this.threadCount) {
            const worker = this.createWorker();
            worker.onReady(() => {
                const next = this.queue.shift();
                if (next) {
                    // Lock before resolving so that no other request can pick this worker.
                    worker.lock();
                    next.resolve(worker);
                }
            });
            this.workerPool.push(worker);
        }
    }

    /**
     * Parses `text` on a pooled worker and hydrates the returned value.
     *
     * @param text The text to parse.
     * @param cancelToken When cancellation is requested, the worker is given
     * `terminationDelay` milliseconds to finish before it is terminated.
     * @returns The parse result with its `value` replaced by the hydrated value.
     */
    async parse<T extends AstNode>(text: string, cancelToken: CancellationToken): Promise<ParseResult<T>> {
        const worker = await this.acquireParserWorker(cancelToken);
        let timeout: NodeJS.Timeout | undefined;
        // If the cancellation token is requested, we wait for a certain time before terminating the worker.
        // Since the cancellation token lives longer than the parsing process, we need to dispose the event listener.
        // Otherwise, we might accidentally terminate the worker after the parsing process has finished.
        const cancellation = cancelToken.onCancellationRequested(() => {
            timeout = setTimeout(() => {
                this.terminateWorker(worker);
            }, this.terminationDelay);
        });
        try {
            const result = await worker.parse(text);
            result.value = this.hydrator.hydrate(result.value);
            return result as ParseResult<T>;
        } finally {
            // Runs on success, on failure and when `worker.parse` throws synchronously,
            // so neither the listener nor a pending termination timer outlives this call.
            cancellation.dispose();
            clearTimeout(timeout);
        }
    }

    /**
     * Terminates the worker and removes it from the pool.
     * The pool is only refilled by the next call to {@link initializeWorkers}.
     */
    protected terminateWorker(worker: ParserWorker): void {
        worker.terminate();
        const index = this.workerPool.indexOf(worker);
        if (index >= 0) {
            this.workerPool.splice(index, 1);
        }
    }

    /**
     * Returns a locked worker: the first ready worker of the pool, or otherwise the
     * worker that serves this request once it reaches the front of the queue.
     *
     * @param cancelToken Cancelling while the request is queued removes it from the queue.
     * @returns A promise for the locked worker. It rejects with `OperationCancelled`
     * when the request is cancelled while waiting.
     */
    protected async acquireParserWorker(cancelToken: CancellationToken): Promise<ParserWorker> {
        this.initializeWorkers();
        for (const worker of this.workerPool) {
            if (worker.ready) {
                worker.lock();
                return worker;
            }
        }
        const deferred = new Deferred<ParserWorker>();
        const cancellation = cancelToken.onCancellationRequested(() => {
            const index = this.queue.indexOf(deferred);
            if (index >= 0) {
                this.queue.splice(index, 1);
            }
            // Reject with the shared sentinel (not a look-alike string) so the value
            // is identical to the one `ParserWorker.terminate` rejects with.
            deferred.reject(OperationCancelled);
        });
        this.queue.push(deferred);
        // The token may outlive this request, so drop the listener once the request has settled.
        return deferred.promise.finally(() => cancellation.dispose());
    }

    /** Creates a new worker; called whenever the pool needs to be filled. */
    protected abstract createWorker(): ParserWorker;
}

/** Function that sends a message to a worker; `ParserWorker.parse` passes the text to parse through it. */
export type WorkerMessagePost = (message: unknown) => void;
/** Function that registers a listener; `ParserWorker` takes one for incoming messages and one for errors. */
export type WorkerMessageCallback = (cb: (message: unknown) => void) => void;

/**
 * Handle for a single background parser that is driven purely through the callbacks
 * passed to the constructor.
 *
 * A worker starts out ready. `lock()` marks it as taken, and it becomes ready again
 * (firing `onReady`) after a message or an error has been received.
 */
export class ParserWorker {

    protected readonly sendMessage: WorkerMessagePost;
    protected readonly _terminate: () => void;
    protected readonly onReadyEmitter = new Emitter<void>();

    /** Settled by the next incoming message or error; replaced on every `parse` call. */
    protected deferred = new Deferred<ParseResult>();
    protected _ready = true;
    protected _parsing = false;

    /** Whether the worker is currently not locked. */
    get ready(): boolean {
        return this._ready;
    }

    /** Fires every time the worker is unlocked. */
    get onReady(): Event<void> {
        return this.onReadyEmitter.event;
    }

    /**
     * @param sendMessage Sends a message to the worker.
     * @param onMessage Registers the listener for incoming messages; every message is
     * treated as the `ParseResult` of the current parse request.
     * @param onError Registers the listener for errors; an error rejects the current parse request.
     * @param terminate Called by `terminate()` after the current request has been rejected.
     */
    constructor(sendMessage: WorkerMessagePost, onMessage: WorkerMessageCallback, onError: WorkerMessageCallback, terminate: () => void) {
        this.sendMessage = sendMessage;
        this._terminate = terminate;
        onMessage(result => {
            this.deferred.resolve(result as ParseResult);
            this.unlock();
        });
        onError(error => {
            this.deferred.reject(error);
            this.unlock();
        });
    }

    /**
     * Rejects the current parse request with `OperationCancelled` and invokes the terminate callback.
     * The ready and parsing flags are left untouched.
     */
    terminate(): void {
        this.deferred.reject(OperationCancelled);
        this._terminate();
    }

    /** Marks the worker as not ready. */
    lock(): void {
        this._ready = false;
    }

    /** Clears the parsing flag, marks the worker as ready and notifies the `onReady` listeners. */
    unlock(): void {
        this._parsing = false;
        this._ready = true;
        this.onReadyEmitter.fire();
    }

    /**
     * Sends `text` to the worker.
     *
     * @returns A promise that settles with the next message (resolve) or error (reject) received.
     * @throws Error synchronously if a previous parse request has not finished yet.
     */
    parse(text: string): Promise<ParseResult> {
        if (this._parsing) {
            throw new Error('Parser worker is busy');
        }
        this._parsing = true;
        this.deferred = new Deferred();
        this.sendMessage(text);
        return this.deferred.promise;
    }
}