Skip to content

Commit

Permalink
Update to eio.0.7 and catch IO errors
Browse files Browse the repository at this point in the history
  • Loading branch information
patricoferris committed Jan 10, 2023
1 parent 1b0e377 commit ecfabf4
Show file tree
Hide file tree
Showing 50 changed files with 316 additions and 315 deletions.
21 changes: 11 additions & 10 deletions src/irmin-chunk/irmin_chunk.ml
Expand Up @@ -151,9 +151,7 @@ struct
| Chunk.Index i ->
List.fold_left
(fun acc key ->
match CA.find t.db key with
| None -> acc
| Some v -> aux acc v)
match CA.find t.db key with None -> acc | Some v -> aux acc v)
acc i
in
aux [] root |> List.rev
Expand All @@ -178,7 +176,9 @@ struct
else List.length l
in
match list_partition n l with
| [ i ] -> AO.add t.db key (index t i); key
| [ i ] ->
AO.add t.db key (index t i);
key
| l -> Fiber.List.map (fun i -> CA.add t.db (index t i)) l |> aux)
in
aux l
Expand Down Expand Up @@ -212,16 +212,18 @@ struct
let k' = H.hash (pre_hash_value v) in
if equal_key k k' then ()
else
Fmt.kstr failwith "corrupted value: got %a, expecting %a"
pp_key k' pp_key k
Fmt.kstr failwith "corrupted value: got %a, expecting %a" pp_key k' pp_key
k

let find t key =
match find_leaves t key with
| None -> None
| Some bufs -> (
let buf = String.concat "" bufs in
match value_of_bin_string buf with
| Ok va -> check_hash key va; Some va
| Ok va ->
check_hash key va;
Some va
| Error _ -> None)

let list_range ~init ~stop ~step =
Expand All @@ -232,10 +234,9 @@ struct

let unsafe_add_buffer t key buf =
let len = String.length buf in
if len <= t.max_data then begin
if len <= t.max_data then (
AO.add t.db key (data t buf);
[%log.debug "add -> %a (no split)" pp_key key]
end
[%log.debug "add -> %a (no split)" pp_key key])
else
let offs = list_range ~init:0 ~stop:len ~step:t.max_data in
let aux off =
Expand Down
2 changes: 1 addition & 1 deletion src/irmin-containers/linked_log.ml
Expand Up @@ -89,7 +89,7 @@ module type S = sig
type cursor

val get_cursor : path:Store.path -> Store.t -> cursor
val read : num_items:int -> cursor -> (value list * cursor)
val read : num_items:int -> cursor -> value list * cursor
end

module Make
Expand Down
2 changes: 1 addition & 1 deletion src/irmin-containers/linked_log.mli
Expand Up @@ -33,7 +33,7 @@ module type S = sig
val get_cursor : path:Store.path -> Store.t -> cursor
(** Create a new cursor over the log entires at the given path *)

val read : num_items:int -> cursor -> (value list * cursor)
val read : num_items:int -> cursor -> value list * cursor
(** Read at most [num_items] entries from the cursor. If the number specified
is greater than the number of log entries from the cursor, the log is read
till the end. If the input cursor has already reached the end, then an
Expand Down
4 changes: 1 addition & 3 deletions src/irmin-containers/lww_register.ml
Expand Up @@ -41,9 +41,7 @@ module type S = sig
type value

val read : path:Store.path -> Store.t -> value option

val write :
?info:Store.Info.f -> path:Store.path -> Store.t -> value -> unit
val write : ?info:Store.Info.f -> path:Store.path -> Store.t -> value -> unit
end

module Make (Backend : Irmin.KV_maker) (T : Time.S) (V : Irmin.Type.S) = struct
Expand Down
3 changes: 1 addition & 2 deletions src/irmin-containers/lww_register.mli
Expand Up @@ -34,8 +34,7 @@ module type S = sig
val read : path:Store.path -> Store.t -> value option
(** Reads the value from the register. Returns [None] if no value is written *)

val write :
?info:Store.Info.f -> path:Store.path -> Store.t -> value -> unit
val write : ?info:Store.Info.f -> path:Store.path -> Store.t -> value -> unit
(** Writes the provided value to the register *)
end

Expand Down
22 changes: 13 additions & 9 deletions src/irmin-fs/irmin_fs.ml
Expand Up @@ -307,11 +307,11 @@ module Obj = struct
let file_of_key k =
let pre = String.with_range k ~len:2 in
let suf = String.with_range k ~first:2 in
let ( / ) = Filename.concat in
let ( / ) = Filename.concat in
"objects" / pre / suf

let key_of_file path =
let ( / ) = Filename.concat in
let ( / ) = Filename.concat in
let path = string_chop_prefix ~prefix:("objects" / "") path in
let path = String.cuts ~sep:Filename.dir_sep path in
let path = String.concat ~sep:"" path in
Expand Down Expand Up @@ -372,7 +372,8 @@ module IO_mem = struct

let rec_files (_, dir) =
Hashtbl.fold
(fun ((_, k) as v) _ acc -> if String.is_prefix ~affix:dir k then v :: acc else acc)
(fun ((_, k) as v) _ acc ->
if String.is_prefix ~affix:dir k then v :: acc else acc)
t.files []

let file_exists file = Hashtbl.mem t.files file
Expand Down Expand Up @@ -427,9 +428,12 @@ let run (fs : Fs.dir Path.t) fn =
Switch.run @@ fun sw ->
Irmin.Backend.Watch.set_watch_switch sw;
let open Effect.Deep in
try_with fn () {
effc = fun (type a) (e : a Effect.t) ->
match e with
| Irmin.Backend.Conf.Env.Fs -> Some (fun (k : (a, _) continuation) -> continue k fs)
| _ -> None
}
try_with fn ()
{
effc =
(fun (type a) (e : a Effect.t) ->
match e with
| Irmin.Backend.Conf.Env.Fs ->
Some (fun (k : (a, _) continuation) -> continue k fs)
| _ -> None);
}
2 changes: 1 addition & 1 deletion src/irmin-fs/irmin_fs.mli
Expand Up @@ -109,4 +109,4 @@ module IO_mem : sig
val set_listen_hook : unit -> unit
end

val run : Eio.Fs.dir Eio.Path.t -> (unit -> 'a) -> 'a
val run : Eio.Fs.dir Eio.Path.t -> (unit -> 'a) -> 'a
118 changes: 62 additions & 56 deletions src/irmin-fs/unix/eio_pool.ml
Expand Up @@ -18,39 +18,45 @@ type 'a t = {
list : 'a Queue.t;
(* Available pool members. *)
waiters : ('a, exn) result Promise.u Stream.t;
(* Promise resolvers waiting for a free member. *)
(* Promise resolvers waiting for a free member. *)
}

let create m ?(validate = fun _ -> true) ?(check = fun _ f -> f true) ?(dispose = fun _ -> ()) create =
{ max = m;
create = create;
validate = validate;
check = check;
dispose = dispose;
let create m ?(validate = fun _ -> true) ?(check = fun _ f -> f true)
?(dispose = fun _ -> ()) create =
{
max = m;
create;
validate;
check;
dispose;
cleared = ref (ref false);
count = 0;
list = Queue.create ();
waiters = Stream.create m }
waiters = Stream.create m;
}
(* Create a pool member. *)
let create_member p =
try
(* Must be done before p.create to prevent other resolvers from
creating new members if the limit is reached. *)
p.count <- p.count + 1;
p.create ()
(* Must be done before p.create to prevent other resolvers from
creating new members if the limit is reached. *)
p.count <- p.count + 1;
p.create ()
with exn ->
(* Creation failed, so don't increment count. *)
p.count <- p.count - 1;
raise exn
(* Creation failed, so don't increment count. *)
p.count <- p.count - 1;
raise exn
(* Release a pool member. *)
let release p c =
match Stream.take_nonblocking p.waiters with
| Some wakener ->
(* A promise resolver is waiting, give it the pool member. *)
Promise.resolve_ok wakener c
(* A promise resolver is waiting, give it the pool member. *)
Promise.resolve_ok wakener c
| None ->
(* No one is waiting, queue it. *)
Queue.push c p.list
(* No one is waiting, queue it. *)
Queue.push c p.list
(* Dispose of a pool member. *)
let dispose p c =
p.dispose c;
Expand All @@ -61,81 +67,80 @@ let dispose p c =
let replace_disposed p =
match Stream.take_nonblocking p.waiters with
| None ->
(* No one is waiting, do not create a new member to avoid
losing an error if creation fails. *)
()
| Some wakener ->
match p.create () with
| c -> Promise.resolve_ok wakener c
| exception exn ->
(* Creation failed, notify the waiter of the failure. *)
Promise.resolve_error wakener exn
(* No one is waiting, do not create a new member to avoid
losing an error if creation fails. *)
()
| Some wakener -> (
match p.create () with
| c -> Promise.resolve_ok wakener c
| exception exn ->
(* Creation failed, notify the waiter of the failure. *)
Promise.resolve_error wakener exn)
(* Verify a member is still valid before using it. *)
let validate_and_return p c =
match p.validate c with
| true -> c
| false ->
(* Remove this member and create a new one. *)
dispose p c;
create_member p
| exception e ->
(* Validation failed: create a new member if at least one
resolver is waiting. *)
dispose p c;
replace_disposed p;
raise e
| true -> c
| false ->
(* Remove this member and create a new one. *)
dispose p c;
create_member p
| exception e ->
(* Validation failed: create a new member if at least one
resolver is waiting. *)
dispose p c;
replace_disposed p;
raise e
(* Acquire a pool member. *)
let acquire p =
if Queue.is_empty p.list then
(* No more available member. *)
if p.count < p.max then
if Queue.is_empty p.list then (
if (* No more available member. *)
p.count < p.max then
(* Limit not reached: create a new one. *)
create_member p
else
(* Limit reached: wait for a free one. *)
let promise, resolver = Promise.create () in
let promise, resolver = Promise.create () in
Stream.add p.waiters resolver;
validate_and_return p (Promise.await_exn promise)
(* (Lwt.add_task_r [@ocaml.warning "-3"]) p.waiters >>= validate_and_return p *)
(* (Lwt.add_task_r [@ocaml.warning "-3"]) p.waiters >>= validate_and_return p *))
else
(* Take the first free member and validate it. *)
let c = Queue.take p.list in
validate_and_return p c
(* Release a member when use resulted in failed promise if the member
is still valid. *)
let check_and_release p c cleared =
let ok = ref false in
p.check c (fun result -> ok := result);
if cleared || not !ok then (
if cleared || not !ok then
(* Element is not ok or the pool was cleared - dispose of it *)
dispose p c
)
else (
(* Element is ok - release it back to the pool *)
else (* Element is ok - release it back to the pool *)
release p c
)
let use p f =
let c = acquire p in
(* Capture the current cleared state so we can see if it changes while this
element is in use *)
let cleared = !(p.cleared) in
let promise () =
try f c with
| e ->
try f c
with e ->
check_and_release p c !cleared;
raise e
in
let r = promise () in
if !cleared then (
(* p was cleared while promise was resolving - dispose of this element *)
dispose p c;
r
)
r)
else (
release p c;
r
)
r)
let clear p =
let elements = Queue.fold (fun l element -> element :: l) [] p.list in
Queue.clear p.list;
Expand All @@ -144,4 +149,5 @@ let clear p =
old_cleared := true;
p.cleared := ref false;
List.iter (dispose p) elements
let wait_queue_length p = Stream.length p.waiters
let wait_queue_length p = Stream.length p.waiters

0 comments on commit ecfabf4

Please sign in to comment.