Skip to content

Commit

Permalink
Implement Lock and LockManager
Browse files Browse the repository at this point in the history
Refs: #416
  • Loading branch information
tshemsedinov committed Mar 28, 2019
1 parent 161590c commit 465e50b
Showing 1 changed file with 94 additions and 33 deletions.
127 changes: 94 additions & 33 deletions lib/locks.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const { Worker, isMainThread, parentPort } = require('worker_threads');

const threads = new Set();
const resources = new Map();

const LOCKED = 0;
const UNLOCKED = 1;
Expand All @@ -17,62 +18,95 @@ const sendMessage = message => {
}
};

class Lock {
constructor(name, options, callback) {
if (typeof options === 'function') {
callback = options;
options = {};
}
const { mode, ifAvailable, steal } = options;
this.name = name;
this.mode = mode || 'exclusive';
this.ifAvailable = ifAvailable || false;
this.steal = steal || false;
this.callback = callback;
}
}

class Mutex {
constructor(resourceName, shared, initial = false) {
this.resourceName = resourceName;
this.lock = new Int32Array(shared, 0, 1);
if (initial) Atomics.store(this.lock, 0, UNLOCKED);
constructor(resourceName, buffer, initial = false) {
this.name = resourceName;
this.flag = new Int32Array(buffer, 0, 1);
if (initial) Atomics.store(this.flag, 0, UNLOCKED);
this.owner = false;
this.trying = false;
this.callback = null;
this.queue = [];
this.current = null;
}

enter(callback) {
this.callback = callback;
enter(lock) {
this.queue.push(lock);
this.trying = true;
this.tryEnter();
return this.tryEnter();
}

tryEnter() {
if (!this.callback) return;
const prev = Atomics.exchange(this.lock, 0, LOCKED);
if (prev === UNLOCKED) {
this.owner = true;
this.trying = false;
this.callback(this).then(() => {
this.leave();
});
this.callback = null;
}
if (this.queue.length === 0) return;
const prev = Atomics.exchange(this.flag, 0, LOCKED);
if (prev === LOCKED) return;
this.owner = true;
this.trying = false;
const lock = this.queue.shift();
this.current = lock;
return lock.callback(lock).then(() => {
this.leave();
});
}

enterIfAvailable(lock) {
if (this.owner) return lock.callback();
const prev = Atomics.exchange(this.flag, 0, LOCKED);
if (prev === LOCKED) return lock.callback();
this.owner = true;
this.trying = false;
this.current = lock;
return lock.callback(lock).then(() => {
this.leave();
});
}

leave() {
if (!this.owner) return;
Atomics.store(this.lock, 0, UNLOCKED);
Atomics.store(this.flag, 0, UNLOCKED);
this.owner = false;
sendMessage({ kind: 'leave', resourceName: this.resourceName });
this.current = null;
sendMessage({ kind: 'leave', resourceName: this.name });
this.tryEnter();
}
}

const resources = new Map();

const request = (resourceName, callback) => {
let lock = resources.get(resourceName);
if (!lock) {
const request = (resourceName, options, callback) => {
const lock = new Lock(resourceName, options, callback);
let mutex = resources.get(resourceName);
if (!mutex) {
const buffer = new SharedArrayBuffer(4);
lock = new Mutex(resourceName, buffer, true);
resources.set(resourceName, lock);
mutex = new Mutex(resourceName, buffer, true);
resources.set(resourceName, mutex);
sendMessage({ kind: 'create', resourceName, buffer });
}
lock.enter(callback);
return lock;
if (lock.ifAvailable) return mutex.enterIfAvailable(lock);
return mutex.enter(lock);
};

const receiveMessage = message => {
const { kind, resourceName, buffer } = message;
if (kind === 'create') {
const lock = new Mutex(resourceName, buffer);
resources.set(resourceName, lock);
const mutex = new Mutex(resourceName, buffer);
resources.set(resourceName, mutex);
} else if (kind === 'leave') {
for (const mutex of resources) {
if (mutex.trying) mutex.tryEnter();
}
}
};

Expand All @@ -96,6 +130,33 @@ class Thread {
}
}

const locks = { resources, request, sendMessage, receiveMessage, Thread };
class LockManagerSnapshot {
constructor() {
const held = [];
const pending = [];
this.held = held;
this.pending = pending;

for (const mutex of resources) {
if (mutex.queue.length > 0) {
pending.push(...mutex.queue);
}
if (mutex.current) {
held.push(mutex.current);
}
}
}
}

class LockManager {
constructor() {
this.request = request;
this.Thread = Thread;
}
query() {
const snapshot = new LockManagerSnapshot();
return Promise.resolve(snapshot);
}
}

module.exports = { locks };
module.exports = { locks: new LockManager() };

0 comments on commit 465e50b

Please sign in to comment.