Skip to content

Commit

Permalink
Merge pull request #45 from Sudha247/named_pools2
Browse files Browse the repository at this point in the history
Add named pools
  • Loading branch information
kayceesrk committed Sep 30, 2021
2 parents 85569fb + 340773a commit bd7de32
Show file tree
Hide file tree
Showing 13 changed files with 119 additions and 45 deletions.
77 changes: 63 additions & 14 deletions lib/task.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ type task_msg =
Task : 'a task * 'a promise -> task_msg
| Quit : task_msg

type pool =
{domains : unit Domain.t array;
task_chan : task_msg Multi_channel.t}
type pool_data = {
domains : unit Domain.t array;
task_chan : task_msg Multi_channel.t;
name: string option
}

type pool = pool_data option Atomic.t

let do_task f p =
try
Expand All @@ -22,7 +26,15 @@ let do_task f p =
| TasksActive -> raise e
| _ -> ()

let setup_pool ~num_additional_domains =
let named_pools = Hashtbl.create 8

let named_pools_mutex = Mutex.create ()

let setup_pool ?name ~num_additional_domains () =
if num_additional_domains < 0 then
raise (Invalid_argument
"Task.setup_pool: num_additional_domains must be at least 0")
else
let task_chan = Multi_channel.make (num_additional_domains+1) in
let rec worker () =
match Multi_channel.recv task_chan with
Expand All @@ -32,19 +44,34 @@ let setup_pool ~num_additional_domains =
worker ()
in
let domains = Array.init num_additional_domains (fun _ -> Domain.spawn worker) in
{domains; task_chan}
let p = Atomic.make (Some {domains; task_chan; name}) in
begin match name with
| None -> ()
| Some x ->
Mutex.lock named_pools_mutex;
Hashtbl.add named_pools x p;
Mutex.unlock named_pools_mutex
end;
p

let get_pool_data p =
match Atomic.get p with
| None -> raise (Invalid_argument "pool already torn down")
| Some p -> p

let async pool task =
let pd = get_pool_data pool in
let p = Atomic.make None in
Multi_channel.send pool.task_chan (Task(task,p));
Multi_channel.send pd.task_chan (Task(task,p));
p

let rec await pool promise =
let pd = get_pool_data pool in
match Atomic.get promise with
| None ->
begin
try
match Multi_channel.recv_poll pool.task_chan with
match Multi_channel.recv_poll pd.task_chan with
| Task (t, p) -> do_task t p
| Quit -> raise TasksActive
with
Expand All @@ -55,16 +82,37 @@ let rec await pool promise =
| Some (Error e) -> raise e

let teardown_pool pool =
for _i=1 to Array.length pool.domains do
Multi_channel.send pool.task_chan Quit
let pd = get_pool_data pool in
for _i=1 to Array.length pd.domains do
Multi_channel.send pd.task_chan Quit
done;
Multi_channel.clear_local_state ();
Array.iter Domain.join pool.domains
Array.iter Domain.join pd.domains;
(* Remove the pool from the table *)
begin match pd.name with
| None -> ()
| Some n ->
Mutex.lock named_pools_mutex;
Hashtbl.remove named_pools n;
Mutex.unlock named_pools_mutex
end;
Atomic.set pool None

let lookup_pool name =
Mutex.lock named_pools_mutex;
let p = Hashtbl.find_opt named_pools name in
Mutex.unlock named_pools_mutex;
p

let get_num_domains pool =
let pd = get_pool_data pool in
Array.length pd.domains + 1

let parallel_for_reduce ?(chunk_size=0) ~start ~finish ~body pool reduce_fun init =
let pd = get_pool_data pool in
let chunk_size = if chunk_size > 0 then chunk_size
else begin
let n_domains = (Array.length pool.domains) + 1 in
let n_domains = (Array.length pd.domains) + 1 in
let n_tasks = finish - start + 1 in
if n_domains = 1 then n_tasks
else max 1 (n_tasks/(8*n_domains))
Expand All @@ -88,9 +136,10 @@ let parallel_for_reduce ?(chunk_size=0) ~start ~finish ~body pool reduce_fun ini
reduce_fun init (work start finish)

let parallel_for ?(chunk_size=0) ~start ~finish ~body pool =
let pd = get_pool_data pool in
let chunk_size = if chunk_size > 0 then chunk_size
else begin
let n_domains = (Array.length pool.domains) + 1 in
let n_domains = (Array.length pd.domains) + 1 in
let n_tasks = finish - start + 1 in
if n_domains = 1 then n_tasks
else max 1 (n_tasks/(8*n_domains))
Expand All @@ -109,7 +158,7 @@ let parallel_for ?(chunk_size=0) ~start ~finish ~body pool =
work pool body start finish

let parallel_scan pool op elements =

let pd = get_pool_data pool in
let scan_part op elements prefix_sum start finish =
assert (Array.length elements > (finish - start));
for i = (start + 1) to finish do
Expand All @@ -123,7 +172,7 @@ let parallel_scan pool op elements =
done
in
let n = Array.length elements in
let p = (Array.length pool.domains) + 1 in
let p = (Array.length pd.domains) + 1 in
let prefix_s = Array.copy elements in

parallel_for pool ~chunk_size:1 ~start:0 ~finish:(p - 1)
Expand Down
14 changes: 12 additions & 2 deletions lib/task.mli
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,26 @@ type 'a promise
type pool
(** Type of task pool *)

val setup_pool : num_additional_domains:int -> pool
val setup_pool : ?name:string -> num_additional_domains:int -> unit -> pool
(** Sets up a task execution pool with [num_additional_domains + 1] domains
* including the current domain *)
* including the current domain. If [name] is provided, the pool is mapped to
* [name] which can be looked up later with [lookup_pool name].
* Raises [Invalid_argumet] when [num_additional_domains] is less than 0. *)

exception TasksActive

val teardown_pool : pool -> unit
(** Tears down the task execution pool.
* Raises [TasksActive] exception if any tasks are currently active. *)

val lookup_pool : string -> pool option
(** [lookup_pool name] returns [Some pool] if [pool] is associated to [name] or
* returns [None] if no value is associated to it. *)

val get_num_domains : pool -> int
(** [get_num_domains pool] returns the total number of domains in [pool]
* including the parent domain. *)

val async : pool -> 'a task -> 'a promise
(** [async p t] runs the task [t] asynchronously in the pool [p]. The function
* returns a promise [r] in which the result of the task [t] will be stored.
Expand Down
2 changes: 1 addition & 1 deletion test/LU_decomposition_multicore.ml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ let lup pool (a0 : float array) =
a

let () =
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let a = parallel_create pool
(fun _ _ -> (Random.State.float (Domain.DLS.get k) 100.0) +. 1.0 ) in
let lu = lup pool a in
Expand Down
2 changes: 1 addition & 1 deletion test/enumerate_par.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ let n = try int_of_string Sys.argv.(2) with _ -> 100
module T = Domainslib.Task

let _ =
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
T.parallel_for p ~start:0 ~finish:(n-1) ~chunk_size:16 ~body:(fun i ->
print_string @@ Printf.sprintf "[%d] %d\n%!" (Domain.self () :> int) i);
T.teardown_pool p
2 changes: 1 addition & 1 deletion test/fib_par.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ let rec fib_par pool n =
T.await pool a + T.await pool b

let main =
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let res = fib_par pool n in
T.teardown_pool pool;
Printf.printf "fib(%d) = %d\n" n res
2 changes: 1 addition & 1 deletion test/game_of_life_multicore.ml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ let rec repeat pool n =
| _-> next pool; repeat pool (n-1)

let ()=
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
print !rg;
repeat pool n_times;
print !rg;
Expand Down
2 changes: 1 addition & 1 deletion test/prefix_sum.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ let gen n = Array.make n 1 (*(fun _ -> Random.int n)*)
let prefix_sum pool = T.parallel_scan pool (+)

let _ =
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let arr = gen n in
let t = Unix.gettimeofday() in
let _ = prefix_sum pool arr in
Expand Down
2 changes: 1 addition & 1 deletion test/spectralnorm2_multicore.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ let eval_AtA_times_u pool u v =
eval_A_times_u pool u w; eval_At_times_u pool w v

let () =
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let u = Array.make n 1.0 and v = Array.make n 0.0 in
for _i = 0 to 9 do
eval_AtA_times_u pool u v; eval_AtA_times_u pool v u
Expand Down
6 changes: 3 additions & 3 deletions test/sum_par.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module T = Domainslib.Task

let _ =
(* use parallel_for_reduce *)
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let sum =
T.parallel_for_reduce p (+) 0 ~chunk_size:(n/(4*num_domains)) ~start:0
~finish:(n-1) ~body:(fun _i -> 1)
Expand All @@ -16,7 +16,7 @@ let _ =

let _ =
(* explictly use empty pool and default chunk_size *)
let p = T.setup_pool ~num_additional_domains:0 in
let p = T.setup_pool ~num_additional_domains:0 () in
let sum = Atomic.make 0 in
T.parallel_for p ~start:0 ~finish:(n-1)
~body:(fun _i -> ignore (Atomic.fetch_and_add sum 1));
Expand All @@ -27,7 +27,7 @@ let _ =

let _ =
(* configured num_domains and default chunk_size *)
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let sum = Atomic.make 0 in
T.parallel_for p ~start:0 ~finish:(n-1)
~body:(fun _i -> ignore (Atomic.fetch_and_add sum 1));
Expand Down
2 changes: 1 addition & 1 deletion test/summed_area_table.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ let calc_table pool mat =
let _ =
let m = Array.make_matrix size size 1 (*Array.init size (fun _ -> Array.init size (fun _ -> Random.int size))*)
in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let _ = calc_table pool m in

(* for i = 0 to size-1 do
Expand Down
2 changes: 1 addition & 1 deletion test/task_exn.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module T = Domainslib.Task
exception E

let _ =
let pool = T.setup_pool ~num_additional_domains:3 in
let pool = T.setup_pool ~num_additional_domains:3 () in

let p1 = T.async pool (fun () ->
let p2 = T.async pool (fun () -> raise E) in
Expand Down
2 changes: 1 addition & 1 deletion test/task_throughput.ml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ end
let _ =
Printf.printf "n_iterations: %d n_units: %d n_domains: %d\n"
n_iterations n_tasks n_domains;
let pool = T.setup_pool ~num_additional_domains:(n_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(n_domains - 1) () in

let hist = TimingHist.make 5 25 in
for _ = 1 to n_iterations do
Expand Down
49 changes: 32 additions & 17 deletions test/test_task.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,35 @@ let prefix_sum pool = fun () ->


let () =
let pool = Task.setup_pool ~num_additional_domains:3 in
modify_arr pool 0 ();
modify_arr pool 25 ();
modify_arr pool 100 ();
inc_ctr pool 0 ();
inc_ctr pool 16 ();
inc_ctr pool 32 ();
inc_ctr pool 1000 ();
sum_sequence pool 0 0 ();
sum_sequence pool 10 10 ();
sum_sequence pool 1 0 ();
sum_sequence pool 1 10 ();
sum_sequence pool 100 10 ();
sum_sequence pool 100 100 ();
prefix_sum pool ();
Task.teardown_pool pool;
print_endline "ok"
let pool1 = Task.setup_pool ~num_additional_domains:2 ~name:"pool1" () in
let pool2 = Task.setup_pool ~num_additional_domains:2 ~name:"pool2" () in
let p1 = Option.get @@ Task.lookup_pool "pool1" in
modify_arr pool1 0 ();
modify_arr pool1 25 ();
modify_arr pool1 100 ();
inc_ctr p1 0 ();
inc_ctr p1 16 ();
inc_ctr p1 32 ();
inc_ctr p1 1000 ();
let p2 = Option.get @@ Task.lookup_pool "pool2" in
sum_sequence pool2 0 0 ();
sum_sequence pool2 10 10 ();
sum_sequence pool2 1 0 ();
sum_sequence p2 1 10 ();
sum_sequence p2 100 10 ();
sum_sequence p2 100 100 ();
prefix_sum p2 ();
Task.teardown_pool pool1;
Task.teardown_pool pool2;

try
sum_sequence pool2 0 0 ();
assert false
with Invalid_argument _ -> ();

assert (Task.lookup_pool "pool1" = None);

try
let _ = Task.setup_pool ~num_additional_domains:(-1) () in ()
with Invalid_argument _ -> ();
print_endline "ok"

0 comments on commit bd7de32

Please sign in to comment.