Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: irmin-pack.unix: Measure IO activity. #2250

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/irmin-pack/unix/io.ml
Expand Up @@ -158,6 +158,7 @@ module Unix = struct
with Unix.Unix_error (e, s1, s2) -> Error (`Io_misc (e, s1, s2)))

let write_exn t ~off ~len s =
Stats.(incr_io t.path @@ Io.Activity.write len);
if String.length s < len then raise (Errors.Pack_error `Invalid_argument);
match (t.closed, t.readonly) with
| true, _ -> raise Errors.Closed
Expand All @@ -168,7 +169,6 @@ module Unix = struct
usage is safe. *)
let buf = Bytes.unsafe_of_string s in
let () = Util.really_write t.fd off buf 0 len in
Index.Stats.add_write len;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just remembered that the stats here are used as a quick and dirty way to aggregate IO activity for the benchmark summary.

We need to double check the "official" summary printer but you can see how it is used for the bench/irmin-pack tree replay.

pb ~f:(`RM, `RM) "Disk bytes read" (fun s -> s.index.bytes_read);
pb ~f:(`RM, `RM) "Disk bytes written" (fun s -> s.index.bytes_written);
pb ~f:(`RM, `RM) "Disk bytes both" (fun s -> s.index.bytes_both);
`Spacer;
pb "Disk reads" (fun s -> s.index.nb_reads);
pb "Disk writes" (fun s -> s.index.nb_writes);
pb "Disk both" (fun s -> s.index.nb_both);

I am definitely in favor of cleaning this up and making it more clear that these are not "index" stats. Perhaps your new Io stats can help the summary aggregate the index activity as well.

()

let write_string t ~off s =
Expand All @@ -189,12 +189,12 @@ module Unix = struct
with Unix.Unix_error (e, s1, s2) -> Error (`Io_misc (e, s1, s2)))

let read_exn t ~off ~len buf =
Stats.(incr_io t.path @@ Io.Activity.read len);
if len > Bytes.length buf then raise (Errors.Pack_error `Invalid_argument);
match t.closed with
| true -> raise Errors.Closed
| false ->
let nread = Util.really_read t.fd off len buf in
Index.Stats.add_read nread;
if nread <> len then
(* didn't manage to read the desired amount; in this case the interface seems to
require we return `Read_out_of_bounds FIXME check this, because it is unusual
Expand Down Expand Up @@ -224,18 +224,20 @@ module Unix = struct
let buf = Buffer.create 0 in
let len = page_size in
let bytes = Bytes.create len in
let rec aux ~off =
let rec aux ~off count =
let nread =
Syscalls.pread ~fd:t.fd ~fd_offset:off ~buffer:bytes ~buffer_offset:0
~length:len
in
if nread > 0 then (
Index.Stats.add_read nread;
Buffer.add_subbytes buf bytes 0 nread;
if nread = len then aux ~off:Int63.(add off (of_int nread)))
if nread = len then aux ~off:Int63.(add off (of_int nread)) count
else count)
else count
in
try
aux ~off:Int63.zero;
let count = aux ~off:Int63.zero 0 in
Stats.(incr_io t.path @@ Io.Activity.read ~nb:count len);
Ok (Buffer.contents buf)
with Unix.Unix_error (e, s1, s2) -> Error (`Io_misc (e, s1, s2))

Expand Down
75 changes: 75 additions & 0 deletions src/irmin-pack/unix/stats.ml
Expand Up @@ -198,6 +198,77 @@ module File_manager = struct
Metrics.update t (Metrics.Mutate f)
end

module Io = struct
module Activity = struct
type t = {
bytes_read : int;
nb_reads : int;
bytes_written : int;
nb_writes : int;
}
[@@deriving irmin]

let zero =
{ bytes_read = 0; nb_reads = 0; bytes_written = 0; nb_writes = 0 }

let read ?(nb = 1) b =
{ bytes_read = b; nb_reads = nb; bytes_written = 0; nb_writes = 0 }

let write ?(nb = 1) b =
{ bytes_read = 0; nb_reads = 0; bytes_written = b; nb_writes = nb }

let sum2 a b =
{
bytes_read = a.bytes_read + b.bytes_read;
nb_reads = a.nb_reads + b.nb_reads;
bytes_written = a.bytes_written + b.bytes_written;
nb_writes = a.nb_writes + b.nb_writes;
}

let sum ts = Seq.fold_left sum2 zero ts

let diff a b =
{
bytes_read = a.bytes_read - b.bytes_read;
nb_reads = a.nb_reads - b.nb_reads;
bytes_written = a.bytes_written - b.bytes_written;
nb_writes = a.nb_writes - b.nb_writes;
}
end

type path = string

module PathMap = Map.Make (String)

(* Some Repr ceremony *)
module Repr_pathmap = Repr.Of_map (struct
include PathMap

let key_t = Repr.string
end)

type t = Activity.t PathMap.t
type Metrics.origin += Io
type stat = t Metrics.t

let init () =
Metrics.v ~origin:Io ~name:"io_stats" ~initial_state:PathMap.empty
(Repr_pathmap.t Activity.t)

let clear stat =
Metrics.update stat (Metrics.Replace (Fun.const PathMap.empty))

let update stat path activity =
Metrics.update stat
(Metrics.Replace
(PathMap.update path (function
| None -> Some activity
| Some previous_activity ->
Some (Activity.sum2 previous_activity activity))))

let export stat = Metrics.state stat
end

module Latest_gc = struct
include Stats_intf.Latest_gc

Expand Down Expand Up @@ -253,6 +324,7 @@ type t = {
index : Index.stat;
file_manager : File_manager.stat;
latest_gc : Latest_gc.stat;
io : Io.stat;
}

let s =
Expand All @@ -261,13 +333,15 @@ let s =
index = Index.init ();
file_manager = File_manager.init ();
latest_gc = Latest_gc.init ();
io = Io.init ();
}

let reset_stats () =
Pack_store.clear s.pack_store;
Index.clear s.index;
File_manager.clear s.file_manager;
Latest_gc.clear s.latest_gc;
Io.clear s.io;
()

let get () = s
Expand Down Expand Up @@ -301,4 +375,5 @@ let get_offset_stats () =
}

let incr_fm_field field = File_manager.update ~field s.file_manager
let incr_io path activity = Io.update s.io path activity
let report_latest_gc x = Latest_gc.update x s.latest_gc
52 changes: 52 additions & 0 deletions src/irmin-pack/unix/stats_intf.ml
Expand Up @@ -209,6 +209,53 @@ module type Sigs = sig
val export : stat -> t
end

module Io : sig
(** IO statistics *)

module Activity : sig
(** IO activity *)

type t = {
bytes_read : int;
nb_reads : int;
bytes_written : int;
nb_writes : int;
}
[@@deriving irmin]

val zero : t
(** [zero] is an activity record with all fields set to 0. *)

val read : ?nb:int -> int -> t
(** [read b] returns the activity for a read of [b] bytes with [nb] system
calls. If [nb] is not provided it will default to 1 (a single system
call). *)

val write : ?nb:int -> int -> t
(** [write b] returns the activity for a single write of [b] bytes with
[nb] system calls. If [nb] is not provided it will default to 1 (a
single system call). *)

val sum : t Seq.t -> t
(** [sum ts] returns the sum of IO activity for all stats in the sequence
[ts]. *)

val diff : t -> t -> t
(** [diff a b] returns the difference between IO activity, i.e. a - b. *)
end

type path = string

module PathMap : Map.S with type key = path

type t = Activity.t PathMap.t
(** Map from file path to IO activity. *)

type stat

val export : stat -> t
end

module Latest_gc : sig
include
module type of Latest_gc
Expand Down Expand Up @@ -246,6 +293,7 @@ module type Sigs = sig
index : Index.stat;
file_manager : File_manager.stat;
latest_gc : Latest_gc.stat;
io : Io.stat;
}
(** Record type for all statistics that will be collected. There is a single
instance (which we refer to as "the instance" below) which is returned by
Expand Down Expand Up @@ -298,6 +346,10 @@ module type Sigs = sig
(** [incr_fm_field field] increments the chosen stats field for the
{!File_manager} *)

val incr_io : Io.path -> Io.Activity.t -> unit
(** [incr_io path io activity] increments the IO activity counters for [path]
by [activity]. *)

val report_latest_gc : Latest_gc.stats -> unit
(** [report_latest_gc gc_stats] sets [(get ()).latest_gc] to the stats of the
latest successful GC. *)
Expand Down