Skip to content

Commit

Permalink
Skip useless tasks in the worker to improve fast scrolling with scann…
Browse files Browse the repository at this point in the history
…ed books (bug 1866296)

When a page rendering is cancelled, a task is sent to the worker but before it's executed
the rendering task is: the cancel task is more or less useless in this case.
So in using the fact that draining the message queue has a higher priority
than draining the event one, it's possible to get all the current tasks, hence
it's possible to cancel some tasks which are before a cancel task.
  • Loading branch information
calixteman committed Dec 12, 2023
1 parent 76e3e52 commit a85b8bc
Showing 1 changed file with 112 additions and 57 deletions.
169 changes: 112 additions & 57 deletions src/shared/message_handler.js
Expand Up @@ -70,6 +70,14 @@ function wrapReason(reason) {
}

class MessageHandler {
#cancelledStreamIds = new Set();

#executorRunning = false;

#isPostponed = false;

#queue = [];

constructor(sourceName, targetName, comObj) {
this.sourceName = sourceName;
this.targetName = targetName;
Expand All @@ -81,71 +89,116 @@ class MessageHandler {
this.callbackCapabilities = Object.create(null);
this.actionHandler = Object.create(null);

this._onComObjOnMessage = event => {
const data = event.data;
if (data.targetName !== this.sourceName) {
return;
}
if (data.stream) {
this.#processStreamMessage(data);
return;
}
if (data.callback) {
const callbackId = data.callbackId;
const capability = this.callbackCapabilities[callbackId];
if (!capability) {
throw new Error(`Cannot resolve callback ${callbackId}`);
this._onComObjOnMessage = ({ data }) => {
if (data.targetName === this.sourceName) {
// The meesages in the worker queue are processed with a
// higher priority than the tasks in the event queue.
// So, postponing the task execution, will ensure that the message
// queue is drained.
// If at some point we've a cancelled task (e.g. GetOperatorList),
// we're able to skip the task execution with the same pageId.
this.#queue.push(data);
this.#isPostponed ||= data.action === "GetOperatorList";
if (data.stream === StreamKind.CANCEL) {
this.#cancelledStreamIds.add(data.streamId);
}
delete this.callbackCapabilities[callbackId];

if (data.callback === CallbackKind.DATA) {
capability.resolve(data.data);
} else if (data.callback === CallbackKind.ERROR) {
capability.reject(wrapReason(data.reason));
} else {
throw new Error("Unexpected callback case");
if (!this.#executorRunning) {
this.#executorRunning = true;
this.#postponeExecution();
}
return;
}
const action = this.actionHandler[data.action];
if (!action) {
throw new Error(`Unknown action from worker: ${data.action}`);
};
comObj.addEventListener("message", this._onComObjOnMessage);
}

#postponeExecution() {
if (this.#isPostponed) {
setTimeout(this.#executor.bind(this), 0);
} else {
this.#executor();
}
}

#executor() {
if (this.#queue.length === 0) {
this.#cancelledStreamIds.clear();
this.#executorRunning = false;
return;
}

const data = this.#queue.shift();
const { stream, streamId } = data;

if (stream) {
if (
stream === StreamKind.CANCEL ||
!this.#cancelledStreamIds.has(streamId)
) {
this.#processStreamMessage(data);
}
if (data.callbackId) {
const cbSourceName = this.sourceName;
const cbTargetName = data.sourceName;
this.#postponeExecution();
return;
}

new Promise(function (resolve) {
resolve(action(data.data));
}).then(
function (result) {
comObj.postMessage({
sourceName: cbSourceName,
targetName: cbTargetName,
callback: CallbackKind.DATA,
callbackId: data.callbackId,
data: result,
});
},
function (reason) {
comObj.postMessage({
sourceName: cbSourceName,
targetName: cbTargetName,
callback: CallbackKind.ERROR,
callbackId: data.callbackId,
reason: wrapReason(reason),
});
}
);
return;
if (streamId && this.#cancelledStreamIds.has(streamId)) {
this.#postponeExecution();
return;
}

if (data.callback) {
const callbackId = data.callbackId;
const capability = this.callbackCapabilities[callbackId];
if (!capability) {
throw new Error(`Cannot resolve callback ${callbackId}`);
}
if (data.streamId) {
this.#createStreamSink(data);
return;
delete this.callbackCapabilities[callbackId];

if (data.callback === CallbackKind.DATA) {
capability.resolve(data.data);
} else if (data.callback === CallbackKind.ERROR) {
capability.reject(wrapReason(data.reason));
} else {
throw new Error("Unexpected callback case");
}
this.#postponeExecution();
return;
}
const action = this.actionHandler[data.action];
if (!action) {
throw new Error(`Unknown action from worker: ${data.action}`);
}
if (data.callbackId) {
const cbSourceName = this.sourceName;
const cbTargetName = data.sourceName;

new Promise(function (resolve) {
resolve(action(data.data));
}).then(
result => {
this.comObj.postMessage({
sourceName: cbSourceName,
targetName: cbTargetName,
callback: CallbackKind.DATA,
callbackId: data.callbackId,
data: result,
});
},
reason => {
this.comObj.postMessage({
sourceName: cbSourceName,
targetName: cbTargetName,
callback: CallbackKind.ERROR,
callbackId: data.callbackId,
reason: wrapReason(reason),
});
}
);
} else if (data.streamId) {
this.#createStreamSink(data);
} else {
action(data.data);
};
comObj.addEventListener("message", this._onComObjOnMessage);
}
this.#postponeExecution();
}

on(actionName, handler) {
Expand Down Expand Up @@ -224,6 +277,7 @@ class MessageHandler {
sourceName = this.sourceName,
targetName = this.targetName,
comObj = this.comObj;
const pageId = data?.pageId;

return new ReadableStream(
{
Expand Down Expand Up @@ -276,6 +330,7 @@ class MessageHandler {
targetName,
stream: StreamKind.CANCEL,
streamId,
pageId,
reason: wrapReason(reason),
});
// Return Promise to signal success or failure.
Expand Down

0 comments on commit a85b8bc

Please sign in to comment.