Asynchronous (concurrent-safe) predicates #325

Open
arcanis opened this issue Jun 21, 2022 · 8 comments
Labels: concurrent module, question, request

Comments


arcanis commented Jun 21, 2022

In Yarn, we offer a way to inspect information about the project dependencies via Prolog. At the moment this process is completely synchronous, but I'm considering adding some new predicates, one of which would let you obtain the latest release available for a given dependency range, which requires querying the registry. While this is reasonably fast in parallel (~10 s for all dependencies in an average project), it's much slower when done sequentially (~2 min).

Do you see a way we could parallelize some parts of the execution? Perhaps with a "fork point" predicate or syntax: on reaching it, the interpreter would compute all the answers up to that point, fork them, continue execution in a separate thread for each one, and coalesce all those result sets into a single one?
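A minimal TypeScript sketch of the "fork point" semantics described above, independent of any Prolog engine: take every answer computed so far, run the rest of the query for each one concurrently, and concatenate the per-branch result sets in their original order. `forkJoin` and its `continuation` callback are hypothetical names, and each branch is assumed to return all of its answers at once as a promise.

```typescript
// Hypothetical fork-and-coalesce helper; not part of Tau Prolog or Yarn.
async function forkJoin<A, R>(
  answers: readonly A[],
  continuation: (answer: A) => Promise<R[]>,
): Promise<R[]> {
  // Fork: start the continuation for every answer before awaiting any of them,
  // so slow I/O (e.g. registry requests) overlaps.
  const branches = answers.map((answer) => continuation(answer));
  // Promise.all keeps the input order, so the coalesced results follow the order
  // a sequential, backtracking evaluation would have produced.
  const resultSets = await Promise.all(branches);
  // Coalesce: concatenate the per-branch result sets into a single one.
  return ([] as R[]).concat(...resultSets);
}

const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Usage: two answers whose branches finish in reverse order.
void forkJoin(["lodash", "react"], async (name) => {
  await delay(name === "lodash" ? 20 : 5);
  return [`${name}@latest`, `${name}@next`];
}).then((results) => console.log(results.join(", ")));
// lodash@latest, lodash@next, react@latest, react@next
```

Because every branch is started before the first `await`, the total time is roughly that of the slowest branch rather than the sum of all branches.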

jariazavalverde (Collaborator) commented

The set_timeout/3 predicate from the os module lets you run goals "in parallel" (as long as the goals perform asynchronous tasks):

:- use_module(library(os)).

top :-
    set_timeout(0, (sleep(3000), write(a)), _),
    set_timeout(0, (sleep(2000), write(b)), _),
    set_timeout(0, (sleep(1000), write(c)), _).
?- top.
true.
cba

If you don't need to collect the answers, this predicate should suffice. But for what you're asking, it would be a good idea to add a promises module to the promises package. I will look at it this week.

arcanis (Author) commented Jun 22, 2022

One thing I tried was to make my custom module predicate return a promise identifier, which could be passed to an await/2 predicate to actually block on the promise. It worked, but it wasn't very convenient to work with; parallelizing the work required something like this:

:-

% This `findall` call forces evaluation of all the `suggested_package_range` answers
% before execution blocks on the subsequent `await`
findall(Promise, suggested_package_range(Cwd, Name, Range, Promise), PromiseList),

% Then we turn the result list back into individual answers
member(Promise, PromiseList),

% And finally we can await each of them individually (they are all running in the background,
% so blocking the execution isn't a problem)
await(Promise, Latest).

It would be nice to have a meta-predicate abstracting this pattern, so that it would become:

parallel(suggested_package_range(Cwd, Ident, Range, Latest), Latest)
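The "promise identifier" approach from this comment can be sketched in TypeScript as a registry that maps opaque ids to promises that are already running in the background. This is a hypothetical illustration: `PromiseRegistry`, `register`, and `wait` are made-up names standing in for the custom predicate and await/2, not Yarn's or Tau Prolog's actual code.

```typescript
// Hypothetical sketch: the custom predicate starts the work and returns an opaque id,
// and await/2 later blocks on the promise behind that id.
class PromiseRegistry<T> {
  private nextId = 0;
  private readonly pending = new Map<number, Promise<T>>();

  // Custom predicate side: the request is already running; only the id is handed back.
  register(promise: Promise<T>): number {
    const id = this.nextId++;
    this.pending.set(id, promise);
    return id;
  }

  // await/2 side: wait for the promise behind `id`, then forget it.
  // (A real implementation would also need to handle promises that reject
  // before anyone waits on them.)
  async wait(id: number): Promise<T> {
    const promise = this.pending.get(id);
    if (promise === undefined) {
      throw new Error(`unknown promise id ${id}`);
    }
    this.pending.delete(id);
    return promise;
  }
}

// Usage: both requests are registered (and running) before either is waited on.
const registry = new PromiseRegistry<string>();
const slowId = registry.register(new Promise<string>((resolve) => setTimeout(() => resolve("slow"), 20)));
const fastId = registry.register(Promise.resolve("fast"));
void Promise.all([registry.wait(slowId), registry.wait(fastId)]).then((values) =>
  console.log(values.join(", ")), // slow, fast
);
```

The parallelism comes entirely from the order of calls: if every id is obtained first (the `findall/3` step) and only then waited on, all requests overlap; calling `wait` right after each `register` would serialize them again.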

jariazavalverde (Collaborator) commented Jun 22, 2022

I just added the basic predicates for the library:

  • future(?Template, +Goal, -Future)
  • await(+Future, ?Result)
  • future_all(+ListOfFutures, -Future)
  • future_any(+ListOfFutures, -Future)

Example: http://tau-prolog.org/sandbox/I6q4S1lY

:- use_module(library(os)).
:- use_module(library(concurrent)).

top(X) :-
    future(X, (sleep(3000), X = a), F1),
    future(X, (sleep(1000), X = b), F2),
    future(X, (sleep(1000), X = c), F3),
    get_time(T0),
    future_all([F1,F2,F3], F),
    await(F, X),
    get_time(T1),
    Time is T1 - T0,
    write(time(Time)).
?- top(X).
X = [a,b,c].
time(3003.0)

If the suggested_package_range/4 predicate returns futures on backtracking:

?- findall(Future, suggested_package_range(Cwd, Name, Range, Future), FutureList),
   future_all(FutureList, Future),
   await(Future, Latest).

arcanis (Author) commented Jun 23, 2022

Thanks for looking into this! I tried it with Yarn and it's a nice start. Here are my observations:

  • The concurrent module doesn't wrap its pl extensions in an extend function (unlike the promises module), so it doesn't work on Node without modifications.

  • Calling link.toJavaScript crashes on futures. The alternative is to check whether we're dealing with a future and handle that case separately, but that gets more complicated for lists of futures (or deeper structures); perhaps turning them into null or the string "<Promise>" would be better.

  • I expected future_all(FutureList, FutureAll), await(FutureAll, Result) to put a list of resolved values inside Result, but it seems to generate one answer for each value in the array (as if there were an implicit member(Result, ResultList)). Is that expected?
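The second observation suggests degrading futures to a placeholder rather than crashing, which has to be done recursively to cover lists of futures and deeper nesting. A sketch of that idea over a hypothetical, simplified term model (the `Term` type and `toJavaScriptSafe` function are illustrations, not Tau Prolog's `pl.type` classes or its actual `toJavaScript` implementation):

```typescript
// Hypothetical, simplified term model; Tau Prolog's real pl.type classes differ.
type Term =
  | { kind: "atom"; name: string }
  | { kind: "number"; value: number }
  | { kind: "list"; items: Term[] }
  | { kind: "future" };

type JsValue = string | number | null | JsValue[];

// Converts a term to a JavaScript value. Futures, at any depth, become
// `placeholder` (null by default, or e.g. "<Promise>") instead of throwing.
function toJavaScriptSafe(term: Term, placeholder: JsValue = null): JsValue {
  switch (term.kind) {
    case "atom":
      return term.name;
    case "number":
      return term.value;
    case "list":
      // Recurse so futures nested inside lists (or lists of lists) are handled too.
      return term.items.map((item) => toJavaScriptSafe(item, placeholder));
    case "future":
      return placeholder;
  }
}

const example: Term = {
  kind: "list",
  items: [{ kind: "atom", name: "lodash" }, { kind: "list", items: [{ kind: "future" }] }],
};
console.log(JSON.stringify(toJavaScriptSafe(example, "<Promise>"))); // ["lodash",["<Promise>"]]
```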

In terms of performance, I must be doing something wrong. To give you an idea, my latest experiment (yarnpkg/berry#4566) involved creating a special operator (|). When the operator is encountered, all the answers to its left side are retrieved, then the right side is evaluated against each answer (in a new thread). With this workflow, the following command (which you can test by cloning my PR and running it inside the repo) takes ~12 s on my machine:

$ time yarn constraints query --json "workspace_has_dependency(Cwd, Ident, Range, _) | suggested_package_range(Cwd, Ident, Range, Future), await(Future, Latest)" | wc -l

599
12.22s user 1.79s system 111% cpu 12.577 total

On the other hand, using findall and future_all, I get much worse performance (more than a minute):

$ time yarn constraints query --json "workspace_has_dependency(Cwd, Ident, Range, _), findall(Future, suggested_package_range(Cwd, Ident, Range, Future), FutureList), future_all(FutureList, FutureAll), await(FutureAll, Latest)" | wc -l

599
16.40s user 2.77s system 27% cpu 1:09.18 total

To show that the execution didn't run in parallel: the naive command (where we don't parallelize anything and just await each future immediately) takes about as long:

$ time yarn constraints query --json "workspace_has_dependency(Cwd, Ident, Range, _), suggested_package_range(Cwd, Ident, Range, Future), await(Future, Latest)" | wc -l

599
14.09s user 2.03s system 25% cpu 1:04.26 total

It's almost as if the findall had no effect - do you have any idea what I might be doing wrong? This is my implementation for suggested_package_range:

https://github.com/yarnpkg/berry/blob/d051f2e55e6a1b3131d3fa7a8c3bdf8be78b3f8d/packages/plugin-constraints/sources/tauModule.ts#L47-L89

jariazavalverde (Collaborator) commented Jun 23, 2022

Oh, I think I misunderstood your predicates.

If suggested_package_range/4 provides an answer for each answer to workspace_has_dependency/4, you should do something like this:

?- findall(Future, (
       workspace_has_dependency(Cwd, Ident, Range, _),
       suggested_package_range(Cwd, Ident, Range, Future)
   ), FutureList),
   future_all(FutureList, FutureAll),
   await(FutureAll, Latest).
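A TypeScript sketch of why the placement of workspace_has_dependency/4 matters. With the generator outside `findall/3`, each `findall` collects a single future, which is awaited before backtracking to the next dependency, so requests never overlap; with both goals inside, every future exists before the first await. `makeFetcher` is a hypothetical stand-in for the registry request that only records how many requests run at once, and `Promise.all` plays the role of future_all/2 + await/2.

```typescript
type Fetch = (ident: string) => Promise<string>;

interface Stats {
  inFlight: number;
  maxInFlight: number;
}

// Hypothetical stand-in for the registry request behind suggested_package_range/4.
// It only records how many requests overlap.
function makeFetcher(stats: Stats): Fetch {
  return async (ident) => {
    stats.inFlight++;
    stats.maxInFlight = Math.max(stats.maxInFlight, stats.inFlight);
    await new Promise<void>((resolve) => setTimeout(resolve, 10));
    stats.inFlight--;
    return `${ident}@latest`;
  };
}

// Slow shape: workspace_has_dependency/4 outside findall/3, so each findall collects
// one future, and it is awaited before backtracking to the next dependency.
async function perDependency(idents: string[], fetchLatest: Fetch): Promise<string[]> {
  const results: string[] = [];
  for (const ident of idents) {
    const batch = await Promise.all([fetchLatest(ident)]); // a list of one future
    results.push(...batch);
  }
  return results;
}

// Fast shape: both goals inside findall/3, so every future exists before the first await.
function allDependencies(idents: string[], fetchLatest: Fetch): Promise<string[]> {
  return Promise.all(idents.map((ident) => fetchLatest(ident)));
}

// Runs one shape over three dependencies and reports the peak number of overlapping requests.
async function maxConcurrency(
  run: (idents: string[], fetchLatest: Fetch) => Promise<string[]>,
): Promise<number> {
  const stats: Stats = { inFlight: 0, maxInFlight: 0 };
  await run(["a", "b", "c"], makeFetcher(stats));
  return stats.maxInFlight;
}

void (async () => {
  console.log(`per dependency: ${await maxConcurrency(perDependency)}`); // per dependency: 1
  console.log(`all at once: ${await maxConcurrency(allDependencies)}`); // all at once: 3
})();
```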

Furthermore, you don't need to directly use pl.type.Future in the implementation of suggested_package_range/4. Just make an async predicate that pushes a new state on the stack with the response when available. Then you will be able to create the future from Prolog:

[`suggested_package_range/4`]: (thread, point, atom) => {
  const [workspaceCwdAtom, packageIdentAtom, packageRangeAtom, suggestedRangeVar] = atom.args;

  if (!isAtom(workspaceCwdAtom) || !isAtom(packageIdentAtom) || !isAtom(packageRangeAtom) || !isVariable(suggestedRangeVar)) {
    thread.throw_error(pl.error.instantiation(atom.indicator));
    return undefined;
  }

  const promise = Promise.resolve().then(async () => {
    const project = getProject(thread);
    const workspace = project.getWorkspaceByCwd(workspaceCwdAtom.id as any);
    const cache = await getCache(thread);

    const ident = structUtils.parseIdent(packageIdentAtom.id);
    const range = packageRangeAtom.id;

    let updated: Descriptor | null;
    try {
      updated = await suggestUtils.fetchDescriptorFrom(ident, range, {
        project,
        cache,
        workspace
      });
    } catch {
      updated = null;
    }

    return updated?.range;
  });

  promise.then(result => {
    thread.prepend([new pl.type.State(
      point.goal.replace(new pl.type.Term(`=`, [suggestedRangeVar, new pl.type.Term(String(result))])),
      point.substitution,
      point
    )]);
    thread.again();
  }, error => {
    thread.throw_error(new pl.type.Term(String(error)));
    thread.again();
  });

  return true;
}

?- findall(Future, (
       workspace_has_dependency(Cwd, Ident, Range, _),
       future(X, suggested_package_range(Cwd, Ident, Range, X), Future)
   ), FutureList),
   future_all(FutureList, FutureAll),
   await(FutureAll, Latest).

The concurrent module doesn't wrap its pl extensions into an extend function (unlike what the promises module does), so it doesn't work on Node without modifications

Fixed.

Calling link.toJavaScript crashes on future - the alternative is to check that we're working with anything but a future and handle this case separately, but that's more complicated for lists of futures (or even deeper); perhaps turning them into null or the string "<Promise>" would be better

I will look at it.

I expected future_all(FutureList, FutureAll), await(FutureAll, Result) to put a list of resolved values inside Result, but it seems to generate one answer for each value in the array (as if there was an implicit member(Result, ResultList)). Is it expected?

future_all/2 takes a list of futures and returns a single future that resolves to a list of the results of the input futures, similar to Promise.all. I thought that suggested_package_range/4 itself generated multiple answers on backtracking, but that is not the case.

jariazavalverde (Collaborator) commented Jun 23, 2022

If the Promise object exists, Tau Prolog futures are translated to JavaScript promises, and JavaScript promises can be translated to Tau Prolog futures.

Example (from JavaScript):

var p = new Promise((resolve, reject) => setTimeout(() => resolve("done"), 10000));
var q = new Promise((resolve, reject) => setTimeout(() => reject("error"), 10000));
?- get_prop(p, FutureP), await(FutureP, Value).
FutureP = <future>(done), Value = done. % after up to 10 seconds (depending on when p was created)

?- get_prop(q, FutureQ), await(FutureQ, Value).
% uncaught exception: error

Example (to JavaScript):

...
session.query("future(X, (sleep(10000), X = done), F).");
session.answer(answer => answer.lookup("F").toJavaScript().then(x => console.log(x))); // done (after 10 seconds)

jariazavalverde added the concurrent module label Jun 24, 2022
arcanis (Author) commented Jun 27, 2022

If suggested_package_range/4 provides an answer for each answer to workspace_has_dependency/4, you should do something like this:

?- findall(Future, (
       workspace_has_dependency(Cwd, Ident, Range, _),
       suggested_package_range(Cwd, Ident, Range, Future)
   ), FutureList),
   future_all(FutureList, FutureAll),
   await(FutureAll, Latest).

Oh, I didn't realize the workspace_has_dependency call had to be inside the findall goal, although that makes sense (I guess that otherwise the interpreter was spawning one separate future for each package resolution request, essentially nullifying the parallelism).

However, moving it inside findall prevents me from getting the values of Cwd, Ident, and Range. Do you see a way I could retrieve them? I tried replacing findall with bagof, but then the execution is slow again.

I think I'd need a predicate that would store the answers into a future opaque object, then another that would await this future and unpack the answers 🤔

jariazavalverde (Collaborator) commented

You can add Cwd, Ident and Range to the template of the future/3 call (you must modify suggested_package_range/4 as indicated in the previous message):

?- findall(Future, (
       workspace_has_dependency(Cwd, Ident, Range, _),
       future(package(Cwd, Ident, Range, X), suggested_package_range(Cwd, Ident, Range, X), Future)
   ), FutureList),
   future_all(FutureList, FutureAll),
   await(FutureAll, Latest).

Now, Latest is a list of terms in the form package(_, _, _, _) that contains all the info.
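In JavaScript terms, widening the template is like having each promise resolve to a record that carries its own inputs, so the pairing between a dependency and its suggested range survives the collection step without relying on bindings outside `findall/3`. A sketch under that analogy; `Dependency`, `PackageInfo`, `suggestAll`, and the injected `fetchLatest` lookup are hypothetical names:

```typescript
// Hypothetical types mirroring the package(Cwd, Ident, Range, Latest) template.
interface Dependency {
  cwd: string;
  ident: string;
  range: string;
}

interface PackageInfo extends Dependency {
  latest: string | null;
}

// Each promise resolves to the whole record (inputs plus result), like a future
// whose template is package(Cwd, Ident, Range, X) rather than just X.
function suggestAll(
  deps: readonly Dependency[],
  fetchLatest: (ident: string, range: string) => Promise<string | null>,
): Promise<PackageInfo[]> {
  const futures = deps.map(async (dep): Promise<PackageInfo> => ({
    ...dep,
    latest: await fetchLatest(dep.ident, dep.range),
  }));
  // Like future_all/2 + await/2: a single list, in input order.
  return Promise.all(futures);
}

// Usage with a fake lookup in place of the registry.
void suggestAll([{ cwd: ".", ident: "example-pkg", range: "^1.0.0" }], async () => "1.2.3").then((infos) =>
  console.log(JSON.stringify(infos)),
);
// [{"cwd":".","ident":"example-pkg","range":"^1.0.0","latest":"1.2.3"}]
```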
