Skip to content

Commit

Permalink
Web Locks API initial implementation
Browse files Browse the repository at this point in the history
Refs: #416
  • Loading branch information
tshemsedinov committed Mar 28, 2019
1 parent 8ad8ab6 commit aefe306
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 0 deletions.
1 change: 1 addition & 0 deletions .eslintignore
@@ -1 +1,2 @@
dist
locks.js
101 changes: 101 additions & 0 deletions lib/locks.js
@@ -0,0 +1,101 @@
'use strict';

const { Worker, isMainThread, parentPort } = require('worker_threads');

const threads = new Set();

const LOCKED = 0;
const UNLOCKED = 1;

const sendMessage = message => {
if (isMainThread) {
for (const thread of threads) {
thread.worker.postMessage(message);
}
} else {
parentPort.postMessage(message);
}
};

class Mutex {
constructor(resourceName, shared, initial = false) {
this.resourceName = resourceName;
this.lock = new Int32Array(shared, 0, 1);
if (initial) Atomics.store(this.lock, 0, UNLOCKED);
this.owner = false;
this.trying = false;
this.callback = null;
}

enter(callback) {
this.callback = callback;
this.trying = true;
this.tryEnter();
}

tryEnter() {
if (!this.callback) return;
const prev = Atomics.exchange(this.lock, 0, LOCKED);
if (prev === UNLOCKED) {
this.owner = true;
this.trying = false;
this.callback(this).then(() => {
this.leave();
});
this.callback = null;
}
}

leave() {
if (!this.owner) return;
Atomics.store(this.lock, 0, UNLOCKED);
this.owner = false;
sendMessage({ kind: 'leave', resourceName: this.resourceName });
}
}

const resources = new Map();

const request = (resourceName, callback) => {
let lock = resources.get(resourceName);
if (!lock) {
const buffer = new SharedArrayBuffer(4);
lock = new Mutex(resourceName, buffer, true);
resources.set(resourceName, lock);
sendMessage({ kind: 'create', resourceName, buffer });
}
lock.enter(callback);
return lock;
};

const receiveMessage = message => {
const { kind, resourceName, buffer } = message;
if (kind === 'create') {
const lock = new Mutex(resourceName, buffer);
resources.set(resourceName, lock);
}
};

if (!isMainThread) {
parentPort.on('message', receiveMessage);
}

class Thread {
constructor() {
const worker = new Worker(__filename);
this.worker = worker;
threads.add(this);
worker.on('message', message => {
for (const thread of threads) {
if (thread.worker !== worker) {
thread.worker.postMessage(message);
}
}
receiveMessage(message);
});
}
}

const locks = { resources, request, sendMessage, receiveMessage, Thread };

module.exports = { locks };
4 changes: 4 additions & 0 deletions metasync.js
Expand Up @@ -22,5 +22,9 @@ if (nodeVerion >= 10) {
submodules.push(require('./lib/async-iterator'));
}

if (nodeVerion >= 11) {
submodules.push(require('./lib/locks'));
}

const { compose } = submodules[0];
module.exports = Object.assign(compose, ...submodules);

0 comments on commit aefe306

Please sign in to comment.