Skip to content

Commit

Permalink
Merge pull request #2242 from metanivek/gc_in_lower
Browse files Browse the repository at this point in the history
  • Loading branch information
metanivek committed May 3, 2023
2 parents e1ef987 + 7a53b0b commit 1f046dd
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 39 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Expand Up @@ -4,6 +4,8 @@

- **irmin-pack**
- Fix issue when migrating v2 stores to use lower layer (@metanivek, #2241)
- Fix issue when calling GC for a commit in the lower after migration
(@metanivek, #2242)

## 3.7.0 (2023-04-26)

Expand Down
2 changes: 1 addition & 1 deletion src/irmin-pack/unix/errors.ml
Expand Up @@ -60,7 +60,7 @@ type base_error =
| `Only_minimal_indexing_strategy_allowed
| `Commit_key_is_dangling of string
| `Dangling_key of string
| `Gc_disallowed
| `Gc_disallowed of string
| `Node_or_contents_key_is_indexed of string
| `Gc_process_error of string
| `Corrupted_gc_result_file of string
Expand Down
60 changes: 42 additions & 18 deletions src/irmin-pack/unix/gc.ml
Expand Up @@ -44,6 +44,7 @@ module Make (Args : Gc_args.S) = struct

let v ~root ~lower_root ~new_files_path ~generation ~unlink ~dispatcher ~fm
~contents ~node ~commit commit_key =
let open Result_syntax in
let new_suffix_start_offset, latest_gc_target_offset =
let state : _ Pack_key.state = Pack_key.inspect commit_key in
match state with
Expand All @@ -54,6 +55,29 @@ module Make (Args : Gc_args.S) = struct
(* The caller of this function lifted the key to a direct one. *)
assert false
in
let status = Fm.control fm |> Fm.Control.payload |> fun p -> p.status in
(* Ensure we are calling GC on a commit strictly newer than last GC commit *)
let* () =
match status with
| Gced previous
when Int63.Syntax.(
previous.latest_gc_target_offset >= latest_gc_target_offset) ->
Error
(`Gc_disallowed
(Fmt.str "%a is less than or equal to previous GC offset of %a"
Int63.pp latest_gc_target_offset Int63.pp
previous.latest_gc_target_offset))
| _ -> Ok ()
in
(* Since we can call GC on commits in the lower, ensure we do not provide a
[new_suffix_start_offset] that is older than our current starting offset. *)
let new_suffix_start_offset =
match status with
| Gced { suffix_start_offset; _ }
when Int63.Syntax.(suffix_start_offset >= latest_gc_target_offset) ->
suffix_start_offset
| _ -> new_suffix_start_offset
in
let partial_stats =
let commit_offset = latest_gc_target_offset in
let before_suffix_start_offset =
Expand Down Expand Up @@ -91,24 +115,24 @@ module Make (Args : Gc_args.S) = struct
let partial_stats =
Gc_stats.Main.finish_current_step partial_stats "before finalise"
in

{
root;
generation;
unlink;
new_suffix_start_offset;
task;
promise;
resolver;
dispatcher;
fm;
contents;
node;
commit;
partial_stats;
resulting_stats = None;
latest_gc_target_offset;
}
Ok
{
root;
generation;
unlink;
new_suffix_start_offset;
task;
promise;
resolver;
dispatcher;
fm;
contents;
node;
commit;
partial_stats;
resulting_stats = None;
latest_gc_target_offset;
}

let swap_and_purge t (gc_results : Worker.gc_results) =
let removable_chunk_num = List.length gc_results.removable_chunk_idxs in
Expand Down
2 changes: 1 addition & 1 deletion src/irmin-pack/unix/gc.mli
Expand Up @@ -35,7 +35,7 @@ module Make (Args : Gc_args.S) : sig
node:read Args.Node_store.t ->
commit:read Args.Commit_store.t ->
Args.key ->
t
(t, [> `Gc_disallowed of string ]) result
(** Creates and starts a new GC process. *)

val finalise :
Expand Down
1 change: 1 addition & 0 deletions src/irmin-pack/unix/gc_worker.ml
Expand Up @@ -313,6 +313,7 @@ module Make (Args : Gc_args.S) = struct
"suffix: calculate new values";
let suffix = Fm.suffix fm in
let soff = Dispatcher.soff_of_offset dispatcher new_suffix_start_offset in
assert (Int63.Syntax.(soff >= Int63.zero));
(* Step 6.1. Calculate chunks that we have GCed. *)
let open struct
type chunk = { idx : int; end_suffix_off : int63 }
Expand Down
2 changes: 1 addition & 1 deletion src/irmin-pack/unix/io_errors.ml
Expand Up @@ -61,7 +61,7 @@ module Make (Io : Io.S) : S with module Io = Io = struct
| `Only_minimal_indexing_strategy_allowed
| `Commit_key_is_dangling of string
| `Dangling_key of string
| `Gc_disallowed
| `Gc_disallowed of string
| `Node_or_contents_key_is_indexed of string
| `Gc_process_error of string
| `Corrupted_gc_result_file of string
Expand Down
7 changes: 6 additions & 1 deletion src/irmin-pack/unix/lower.ml
Expand Up @@ -232,7 +232,12 @@ module Make_volume (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
Irmin_pack.Layout.V5.Volume.control_gc_tmp ~generation ~root
in
let control = Irmin_pack.Layout.V5.Volume.control ~root in
Io.move_file ~src:control_tmp ~dst:control
match Io.classify_path control_tmp with
| `File -> Io.move_file ~src:control_tmp ~dst:control
| `No_such_file_or_directory ->
[%log.info "No tmp volume control file to swap. %s" control];
Ok ()
| `Directory | `Other -> assert false

let cleanup ~generation t =
let clean filename =
Expand Down
6 changes: 4 additions & 2 deletions src/irmin-pack/unix/store.ml
Expand Up @@ -250,12 +250,14 @@ module Maker (Config : Conf.S) = struct
let* commit_key = direct_commit_key t commit_key in
let root = Conf.root t.config in
let* () =
if not (is_allowed t) then Error `Gc_disallowed else Ok ()
if not (is_allowed t) then
Error (`Gc_disallowed "Store does not support GC")
else Ok ()
in
let current_generation = File_manager.generation t.fm in
let next_generation = current_generation + 1 in
let lower_root = Conf.lower_root t.config in
let gc =
let* gc =
Gc.v ~root ~lower_root ~generation:next_generation ~unlink
~dispatcher:t.dispatcher ~fm:t.fm ~contents:t.contents
~node:t.node ~commit:t.commit ~new_files_path commit_key
Expand Down
32 changes: 19 additions & 13 deletions test/irmin-pack/test_gc.ml
Expand Up @@ -782,24 +782,25 @@ module Gc_archival = struct
(S.Gc.is_allowed t.repo) false;
S.Repo.close t.repo

(* TODO re-enable when lower migration is implemented *)
let _gc_reachability_old () =
let gc_reachability_old () =
let root = create_v1_test_env () in
let lower_root = create_lower_root () in
[%log.debug "Open v1 store to trigger migration"];
let* t = init ~root ~fresh:false ~lower_root:(Some lower_root) () in
let* main = S.main t.repo in
[%log.debug "Run GC on commit that is now in lower"];
let* head = S.Head.get main in
let* () = start_gc t head in
let check_ex = function
| `Gc_process_error msg ->
msg = "Archival finalization not yet implemented"
| _ -> false
in
let* () =
Alcotest.check_raises_pack_error
"archiving gc should run until finalization on old stores" check_ex
(fun () -> finalise_gc t)
let () =
match Irmin_pack_unix.Pack_key.inspect (S.Commit.key head) with
| Direct { volume_identifier; _ } ->
Alcotest.(check bool)
"after migration, head is in lower"
(Option.is_some volume_identifier)
true
| _ -> assert false
in
let* () = start_gc t head in
let* () = finalise_gc t in
S.Repo.close t.repo

module B = struct
Expand Down Expand Up @@ -841,6 +842,11 @@ module Gc_archival = struct
let* () = B.check_gced t c2 "gced c2" in
let* () = B.check_removed t c3 "gced c3" in
let* () = B.check_gced t c4 "gced c4" in
let* () =
Alcotest.check_raises_pack_error "Cannot GC on commit older than c5"
(function `Gc_disallowed _ -> true | _ -> false)
(fun () -> start_gc t c4)
in
S.Repo.close t.repo

module Gc_common_tests = Gc_common (B)
Expand All @@ -853,7 +859,7 @@ module Gc_archival = struct
gc_availability_old;
tc "Test archiving twice on different volumes"
gc_archival_multiple_volumes;
(* tc "Test reachability on old stores" gc_reachability_old; *)
tc "Test reachability on old stores" gc_reachability_old;
]
@ Gc_common_tests.tests
end
Expand Down
24 changes: 24 additions & 0 deletions test/irmin-pack/test_lower.ml
Expand Up @@ -393,6 +393,29 @@ module Store_tc = struct
let* _ = read_everything repo in
Store.Repo.close repo

let test_migrate_then_gc_in_lower () =
let root, lower_root = fresh_roots () in
(* Create without a lower *)
let* repo = Store.Repo.v (config ~fresh:true root) in
Alcotest.(check int) "volume_num is 0" 0 (count_volumes repo);
let* main = Store.main repo in
let info () = Store.Info.v ~author:"test" Int64.zero in
let* () = Store.set_exn ~info main [ "a" ] "a" in
let* a_commit = Store.Head.get main in
let* () = Store.set_exn ~info main [ "b" ] "b" in
let* () = Store.Repo.close repo in
(* Reopen with a lower to trigger the migration *)
let* repo = Store.Repo.v (config ~lower_root root) in
Alcotest.(check int) "volume_num is 1" 1 (count_volumes repo);
(* [a] is now in the lower but GC should still succeed
Important: we call GC on a commit that is not the latest in
the lower (ie [b]) to ensure its offset is not equal to the start
offset of the upper. *)
let* _ = Store.Gc.start_exn repo (Store.Commit.key a_commit) in
let* _ = Store.Gc.finalise_exn ~wait:true repo in
Store.Repo.close repo

let test_volume_data_locality () =
let root, lower_root = fresh_roots () in
let* repo = Store.Repo.v (config ~fresh:true ~lower_root root) in
Expand Down Expand Up @@ -508,6 +531,7 @@ module Store = struct
quick_tc "migrate v2" test_migrate_v2;
quick_tc "migrate v3" test_migrate_v3;
quick_tc "migrate then gc" test_migrate_then_gc;
quick_tc "migrate then gc in lower" test_migrate_then_gc_in_lower;
quick_tc "test data locality" test_volume_data_locality;
quick_tc "test cleanup" test_cleanup;
]
Expand Down
5 changes: 3 additions & 2 deletions test/irmin-pack/test_upgrade.ml
Expand Up @@ -533,8 +533,9 @@ let gc_rw t =
| From_v2, _ | _, `always ->
let* () =
Alcotest.check_raises_lwt "GC on V2/always"
(Irmin_pack_unix.Errors.Pack_error `Gc_disallowed) (fun () ->
Store.gc repo)
(Irmin_pack_unix.Errors.Pack_error
(`Gc_disallowed "Store does not support GC"))
(fun () -> Store.gc repo)
in
raise Skip_the_rest_of_that_test
| (From_v3 | From_scratch | From_v3_c0_gced), `minimal -> Store.gc repo
Expand Down

0 comments on commit 1f046dd

Please sign in to comment.