Skip to content

Commit

Permalink
Merge pull request #4753 from MichaelEischer/remove-cleanup-handlers
Browse files Browse the repository at this point in the history
Replace cleanup handlers with context based command cancelation
  • Loading branch information
MichaelEischer committed Apr 24, 2024
2 parents 3f9d508 + 484dbb1 commit a7b5e09
Show file tree
Hide file tree
Showing 44 changed files with 350 additions and 299 deletions.
84 changes: 18 additions & 66 deletions cmd/restic/cleanup.go
@@ -1,89 +1,41 @@
package main

import (
"context"
"os"
"os/signal"
"sync"
"syscall"

"github.com/restic/restic/internal/debug"
)

var cleanupHandlers struct {
sync.Mutex
list []func(code int) (int, error)
done bool
ch chan os.Signal
}

func init() {
cleanupHandlers.ch = make(chan os.Signal, 1)
go CleanupHandler(cleanupHandlers.ch)
signal.Notify(cleanupHandlers.ch, syscall.SIGINT, syscall.SIGTERM)
}
func createGlobalContext() context.Context {
ctx, cancel := context.WithCancel(context.Background())

// AddCleanupHandler adds the function f to the list of cleanup handlers so
// that it is executed when all the cleanup handlers are run, e.g. when SIGINT
// is received.
func AddCleanupHandler(f func(code int) (int, error)) {
cleanupHandlers.Lock()
defer cleanupHandlers.Unlock()
ch := make(chan os.Signal, 1)
go cleanupHandler(ch, cancel)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)

// reset the done flag for integration tests
cleanupHandlers.done = false

cleanupHandlers.list = append(cleanupHandlers.list, f)
return ctx
}

// RunCleanupHandlers runs all registered cleanup handlers
func RunCleanupHandlers(code int) int {
cleanupHandlers.Lock()
defer cleanupHandlers.Unlock()

if cleanupHandlers.done {
return code
}
cleanupHandlers.done = true
// cleanupHandler handles the SIGINT and SIGTERM signals.
func cleanupHandler(c <-chan os.Signal, cancel context.CancelFunc) {
s := <-c
debug.Log("signal %v received, cleaning up", s)
Warnf("%ssignal %v received, cleaning up\n", clearLine(0), s)

for _, f := range cleanupHandlers.list {
var err error
code, err = f(code)
if err != nil {
Warnf("error in cleanup handler: %v\n", err)
}
if val, _ := os.LookupEnv("RESTIC_DEBUG_STACKTRACE_SIGINT"); val != "" {
_, _ = os.Stderr.WriteString("\n--- STACKTRACE START ---\n\n")
_, _ = os.Stderr.WriteString(debug.DumpStacktrace())
_, _ = os.Stderr.WriteString("\n--- STACKTRACE END ---\n")
}
cleanupHandlers.list = nil
return code
}

// CleanupHandler handles the SIGINT and SIGTERM signals.
func CleanupHandler(c <-chan os.Signal) {
for s := range c {
debug.Log("signal %v received, cleaning up", s)
Warnf("%ssignal %v received, cleaning up\n", clearLine(0), s)

if val, _ := os.LookupEnv("RESTIC_DEBUG_STACKTRACE_SIGINT"); val != "" {
_, _ = os.Stderr.WriteString("\n--- STACKTRACE START ---\n\n")
_, _ = os.Stderr.WriteString(debug.DumpStacktrace())
_, _ = os.Stderr.WriteString("\n--- STACKTRACE END ---\n")
}

code := 0

if s == syscall.SIGINT || s == syscall.SIGTERM {
code = 130
} else {
code = 1
}

Exit(code)
}
cancel()
}

// Exit runs the cleanup handlers and then terminates the process with the
// given exit code.
// Exit terminates the process with the given exit code.
func Exit(code int) {
code = RunCleanupHandlers(code)
debug.Log("exiting with status code %d", code)
os.Exit(code)
}
25 changes: 19 additions & 6 deletions cmd/restic/cmd_check.go
Expand Up @@ -199,10 +199,7 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
}

cleanup := prepareCheckCache(opts, &gopts)
AddCleanupHandler(func(code int) (int, error) {
cleanup()
return code, nil
})
defer cleanup()

if !gopts.NoLock {
Verbosef("create exclusive lock for repository\n")
Expand All @@ -222,6 +219,9 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
Verbosef("load indexes\n")
bar := newIndexProgress(gopts.Quiet, gopts.JSON)
hints, errs := chkr.LoadIndex(ctx, bar)
if ctx.Err() != nil {
return ctx.Err()
}

errorsFound := false
suggestIndexRebuild := false
Expand Down Expand Up @@ -283,6 +283,9 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
if orphanedPacks > 0 {
Verbosef("%d additional files were found in the repo, which likely contain duplicate data.\nThis is non-critical, you can run `restic prune` to correct this.\n", orphanedPacks)
}
if ctx.Err() != nil {
return ctx.Err()
}

Verbosef("check snapshots, trees and blobs\n")
errChan = make(chan error)
Expand Down Expand Up @@ -316,9 +319,16 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
// Must happen after `errChan` is read from in the above loop to avoid
// deadlocking in the case of errors.
wg.Wait()
if ctx.Err() != nil {
return ctx.Err()
}

if opts.CheckUnused {
for _, id := range chkr.UnusedBlobs(ctx) {
unused, err := chkr.UnusedBlobs(ctx)
if err != nil {
return err
}
for _, id := range unused {
Verbosef("unused blob %v\n", id)
errorsFound = true
}
Expand Down Expand Up @@ -395,10 +405,13 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
doReadData(packs)
}

if ctx.Err() != nil {
return ctx.Err()
}

if errorsFound {
return errors.Fatal("repository contains errors")
}

Verbosef("no errors were found\n")

return nil
Expand Down
7 changes: 5 additions & 2 deletions cmd/restic/cmd_copy.go
Expand Up @@ -53,7 +53,7 @@ func init() {
}

func runCopy(ctx context.Context, opts CopyOptions, gopts GlobalOptions, args []string) error {
secondaryGopts, isFromRepo, err := fillSecondaryGlobalOpts(opts.secondaryRepoOptions, gopts, "destination")
secondaryGopts, isFromRepo, err := fillSecondaryGlobalOpts(ctx, opts.secondaryRepoOptions, gopts, "destination")
if err != nil {
return err
}
Expand Down Expand Up @@ -103,6 +103,9 @@ func runCopy(ctx context.Context, opts CopyOptions, gopts GlobalOptions, args []
// also consider identical snapshot copies
dstSnapshotByOriginal[*sn.ID()] = append(dstSnapshotByOriginal[*sn.ID()], sn)
}
if ctx.Err() != nil {
return ctx.Err()
}

// remember already processed trees across all snapshots
visitedTrees := restic.NewIDSet()
Expand Down Expand Up @@ -147,7 +150,7 @@ func runCopy(ctx context.Context, opts CopyOptions, gopts GlobalOptions, args []
}
Verbosef("snapshot %s saved\n", newID.Str())
}
return nil
return ctx.Err()
}

func similarSnapshots(sna *restic.Snapshot, snb *restic.Snapshot) bool {
Expand Down
17 changes: 13 additions & 4 deletions cmd/restic/cmd_find.go
Expand Up @@ -439,7 +439,10 @@ func (f *Finder) packsToBlobs(ctx context.Context, packs []string) error {

if err != errAllPacksFound {
// try to resolve unknown pack ids from the index
packIDs = f.indexPacksToBlobs(ctx, packIDs)
packIDs, err = f.indexPacksToBlobs(ctx, packIDs)
if err != nil {
return err
}
}

if len(packIDs) > 0 {
Expand All @@ -456,13 +459,13 @@ func (f *Finder) packsToBlobs(ctx context.Context, packs []string) error {
return nil
}

func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struct{}) map[string]struct{} {
func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struct{}) (map[string]struct{}, error) {
wctx, cancel := context.WithCancel(ctx)
defer cancel()

// remember which packs were found in the index
indexPackIDs := make(map[string]struct{})
f.repo.Index().Each(wctx, func(pb restic.PackedBlob) {
err := f.repo.Index().Each(wctx, func(pb restic.PackedBlob) {
idStr := pb.PackID.String()
// keep entry in packIDs as Each() returns individual index entries
matchingID := false
Expand All @@ -481,6 +484,9 @@ func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struc
indexPackIDs[idStr] = struct{}{}
}
})
if err != nil {
return nil, err
}

for id := range indexPackIDs {
delete(packIDs, id)
Expand All @@ -493,7 +499,7 @@ func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struc
}
Warnf("some pack files are missing from the repository, getting their blobs from the repository index: %v\n\n", list)
}
return packIDs
return packIDs, nil
}

func (f *Finder) findObjectPack(id string, t restic.BlobType) {
Expand Down Expand Up @@ -608,6 +614,9 @@ func runFind(ctx context.Context, opts FindOptions, gopts GlobalOptions, args []
for sn := range FindFilteredSnapshots(ctx, snapshotLister, repo, &opts.SnapshotFilter, opts.Snapshots) {
filteredSnapshots = append(filteredSnapshots, sn)
}
if ctx.Err() != nil {
return ctx.Err()
}

sort.Slice(filteredSnapshots, func(i, j int) bool {
return filteredSnapshots[i].Time.Before(filteredSnapshots[j].Time)
Expand Down
7 changes: 7 additions & 0 deletions cmd/restic/cmd_forget.go
Expand Up @@ -188,6 +188,9 @@ func runForget(ctx context.Context, opts ForgetOptions, pruneOptions PruneOption
for sn := range FindFilteredSnapshots(ctx, repo, repo, &opts.SnapshotFilter, args) {
snapshots = append(snapshots, sn)
}
if ctx.Err() != nil {
return ctx.Err()
}

var jsonGroups []*ForgetGroup

Expand Down Expand Up @@ -270,6 +273,10 @@ func runForget(ctx context.Context, opts ForgetOptions, pruneOptions PruneOption
}
}

if ctx.Err() != nil {
return ctx.Err()
}

if len(removeSnIDs) > 0 {
if !opts.DryRun {
bar := printer.NewCounter("files deleted")
Expand Down
4 changes: 2 additions & 2 deletions cmd/restic/cmd_init.go
Expand Up @@ -80,7 +80,7 @@ func runInit(ctx context.Context, opts InitOptions, gopts GlobalOptions, args []
return err
}

gopts.password, err = ReadPasswordTwice(gopts,
gopts.password, err = ReadPasswordTwice(ctx, gopts,
"enter password for new repository: ",
"enter password again: ")
if err != nil {
Expand Down Expand Up @@ -131,7 +131,7 @@ func runInit(ctx context.Context, opts InitOptions, gopts GlobalOptions, args []

func maybeReadChunkerPolynomial(ctx context.Context, opts InitOptions, gopts GlobalOptions) (*chunker.Pol, error) {
if opts.CopyChunkerParameters {
otherGopts, _, err := fillSecondaryGlobalOpts(opts.secondaryRepoOptions, gopts, "secondary")
otherGopts, _, err := fillSecondaryGlobalOpts(ctx, opts.secondaryRepoOptions, gopts, "secondary")
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/restic/cmd_key_add.go
Expand Up @@ -60,7 +60,7 @@ func runKeyAdd(ctx context.Context, gopts GlobalOptions, opts KeyAddOptions, arg
}

func addKey(ctx context.Context, repo *repository.Repository, gopts GlobalOptions, opts KeyAddOptions) error {
pw, err := getNewPassword(gopts, opts.NewPasswordFile)
pw, err := getNewPassword(ctx, gopts, opts.NewPasswordFile)
if err != nil {
return err
}
Expand All @@ -83,7 +83,7 @@ func addKey(ctx context.Context, repo *repository.Repository, gopts GlobalOption
// testKeyNewPassword is used to set a new password during integration testing.
var testKeyNewPassword string

func getNewPassword(gopts GlobalOptions, newPasswordFile string) (string, error) {
func getNewPassword(ctx context.Context, gopts GlobalOptions, newPasswordFile string) (string, error) {
if testKeyNewPassword != "" {
return testKeyNewPassword, nil
}
Expand All @@ -97,7 +97,7 @@ func getNewPassword(gopts GlobalOptions, newPasswordFile string) (string, error)
newopts := gopts
newopts.password = ""

return ReadPasswordTwice(newopts,
return ReadPasswordTwice(ctx, newopts,
"enter new password: ",
"enter password again: ")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/restic/cmd_key_passwd.go
Expand Up @@ -57,7 +57,7 @@ func runKeyPasswd(ctx context.Context, gopts GlobalOptions, opts KeyPasswdOption
}

func changePassword(ctx context.Context, repo *repository.Repository, gopts GlobalOptions, opts KeyPasswdOptions) error {
pw, err := getNewPassword(gopts, opts.NewPasswordFile)
pw, err := getNewPassword(ctx, gopts, opts.NewPasswordFile)
if err != nil {
return err
}
Expand Down
3 changes: 1 addition & 2 deletions cmd/restic/cmd_list.go
Expand Up @@ -59,10 +59,9 @@ func runList(ctx context.Context, gopts GlobalOptions, args []string) error {
if err != nil {
return err
}
idx.Each(ctx, func(blobs restic.PackedBlob) {
return idx.Each(ctx, func(blobs restic.PackedBlob) {
Printf("%v %v\n", blobs.Type, blobs.ID)
})
return nil
})
default:
return errors.Fatal("invalid type")
Expand Down

0 comments on commit a7b5e09

Please sign in to comment.