Skip to content

Commit

Permalink
Improve Delete Branch message w/ Prov, Triggers (#9610)
Browse files Browse the repository at this point in the history
  • Loading branch information
acohen4 committed Jan 3, 2024
1 parent cc788f3 commit 2e322db
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 16 deletions.
119 changes: 112 additions & 7 deletions src/internal/pfsdb/branches.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,21 +352,53 @@ func UpsertBranch(ctx context.Context, tx *pachsql.Tx, branchInfo *pfs.BranchInf
}

// DeleteBranch deletes a branch.
func DeleteBranch(ctx context.Context, tx *pachsql.Tx, id BranchID, force bool) error {
func DeleteBranch(ctx context.Context, tx *pachsql.Tx, b *BranchInfoWithID, force bool) error {
if !force {
subv, err := GetDirectBranchSubvenance(ctx, tx, b.ID)
if err != nil {
return errors.Wrapf(err, "collect direct subvenance of branch %q", b.BranchInfo.Branch)
}
if len(subv) > 0 {
return errors.Errorf(
"branch %q cannot be deleted because it's in the direct provenance of %v",
b.BranchInfo.Branch, subv,
)
}
triggered, err := GetTriggeredBranches(ctx, tx, b.ID)
if err != nil {
return errors.Wrapf(err, "collect triggered branches for branch %q", b.BranchInfo.Branch)
}
if len(triggered) > 0 {
return errors.Errorf(
"branch %q cannot be deleted because it is triggered by branches %v",
b.BranchInfo.Branch, triggered,
)
}
triggering, err := GetTriggeringBranches(ctx, tx, b.ID)
if err != nil {
return errors.Wrapf(err, "collect triggering branches for branch %q", b.BranchInfo.Branch)
}
if len(triggering) > 0 {
return errors.Errorf(
"branch %q cannot be deleted because it triggers branches %v",
b.BranchInfo.Branch, triggering,
)
}
}
deleteProvQuery := `DELETE FROM pfs.branch_provenance WHERE from_id = $1`
deleteTriggerQuery := `DELETE FROM pfs.branch_triggers WHERE from_branch_id = $1`
if force {
deleteProvQuery = `DELETE FROM pfs.branch_provenance WHERE from_id = $1 OR to_id = $1`
deleteTriggerQuery = `DELETE FROM pfs.branch_triggers WHERE from_branch_id = $1 OR to_branch_id = $1`
}
if _, err := tx.ExecContext(ctx, deleteProvQuery, id); err != nil {
return errors.Wrapf(err, "could not delete branch provenance for branch %d", id)
if _, err := tx.ExecContext(ctx, deleteProvQuery, b.ID); err != nil {
return errors.Wrapf(err, "could not delete branch provenance for branch %d", b.BranchInfo.Branch)
}
if _, err := tx.ExecContext(ctx, deleteTriggerQuery, id); err != nil {
return errors.Wrapf(err, "could not delete branch trigger for branch %d", id)
if _, err := tx.ExecContext(ctx, deleteTriggerQuery, b.ID); err != nil {
return errors.Wrapf(err, "could not delete branch trigger for branch %d", b.BranchInfo.Branch)
}
if _, err := tx.ExecContext(ctx, `DELETE FROM pfs.branches WHERE id = $1`, id); err != nil {
return errors.Wrapf(err, "could not delete branch %d", id)
if _, err := tx.ExecContext(ctx, `DELETE FROM pfs.branches WHERE id = $1`, b.ID); err != nil {
return errors.Wrapf(err, "could not delete branch %d", b.BranchInfo.Branch)
}
return nil
}
Expand Down Expand Up @@ -429,6 +461,30 @@ func GetBranchProvenance(ctx context.Context, ext sqlx.ExtContext, id BranchID)
return branchPbs, nil
}

func GetDirectBranchSubvenance(ctx context.Context, ext sqlx.ExtContext, id BranchID) ([]*pfs.Branch, error) {
var branches []Branch
if err := sqlx.SelectContext(ctx, ext, &branches, `
SELECT
branch.id,
branch.name,
repo.name as "repo.name",
repo.type as "repo.type",
project.name as "repo.project.name"
FROM pfs.branch_provenance bp
JOIN pfs.branches branch ON bp.from_id = branch.id
JOIN pfs.repos repo ON branch.repo_id = repo.id
JOIN core.projects project ON repo.project_id = project.id
WHERE bp.to_id = $1
`, id); err != nil {
return nil, errors.Wrap(err, "could not get direct branch subvenance")
}
var branchPbs []*pfs.Branch
for _, branch := range branches {
branchPbs = append(branchPbs, branch.Pb())
}
return branchPbs, nil
}

// GetBranchSubvenance returns the full subvenance of a branch, i.e. all branches that either directly or transitively depend on it.
func GetBranchSubvenance(ctx context.Context, ext sqlx.ExtContext, id BranchID) ([]*pfs.Branch, error) {
var branches []Branch
Expand Down Expand Up @@ -474,7 +530,56 @@ func CreateDirectBranchProvenance(ctx context.Context, ext sqlx.ExtContext, from
return nil
}

// GetTriggeredBranches lists all the branches that are directly triggered by a branch
func GetTriggeredBranches(ctx context.Context, ext sqlx.ExtContext, bid BranchID) ([]*pfs.Branch, error) {
var branches []Branch
q := `SELECT branch.id,
branch.name,
repo.name as "repo.name",
repo.type as "repo.type",
project.name as "repo.project.name"
FROM pfs.branches branch
JOIN pfs.repos repo ON branch.repo_id = repo.id
JOIN core.projects project ON repo.project_id = project.id
JOIN pfs.branch_triggers trigger ON trigger.to_branch_id = $1
WHERE branch.id = trigger.from_branch_id
`
if err := sqlx.SelectContext(ctx, ext, &branches, q, bid); err != nil {
return nil, errors.Wrap(err, "could not get triggered branches")
}
var branchPbs []*pfs.Branch
for _, branch := range branches {
branchPbs = append(branchPbs, branch.Pb())
}
return branchPbs, nil
}

// GetTriggeringBranches lists all the branches that would directly trigger a branch
func GetTriggeringBranches(ctx context.Context, ext sqlx.ExtContext, bid BranchID) ([]*pfs.Branch, error) {
var branches []Branch
q := `SELECT branch.id,
branch.name,
repo.name as "repo.name",
repo.type as "repo.type",
project.name as "repo.project.name"
FROM pfs.branches branch
JOIN pfs.repos repo ON branch.repo_id = repo.id
JOIN core.projects project ON repo.project_id = project.id
JOIN pfs.branch_triggers trigger ON trigger.from_branch_id = $1
WHERE branch.id = trigger.to_branch_id
`
if err := sqlx.SelectContext(ctx, ext, &branches, q, bid); err != nil {
return nil, errors.Wrap(err, "could not get triggering branches")
}
var branchPbs []*pfs.Branch
for _, branch := range branches {
branchPbs = append(branchPbs, branch.Pb())
}
return branchPbs, nil
}

func GetBranchTrigger(ctx context.Context, ext sqlx.ExtContext, from BranchID) (*pfs.Trigger, error) {
// TODO: should this handle more than one trigger?
trigger := BranchTrigger{}
if err := sqlx.GetContext(ctx, ext, &trigger, `
SELECT
Expand Down
31 changes: 25 additions & 6 deletions src/internal/pfsdb/branches_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pfsdb_test

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -422,16 +423,23 @@ func TestBranchDelete(t *testing.T) {
_, err := pfsdb.GetBranchID(ctx, tx, branchBInfo.Branch)
require.NoError(t, err)
})
withFailedTx(t, ctx, db, func(ctx context.Context, tx *pachsql.Tx) {
withTx(t, ctx, db, func(ctx context.Context, tx *pachsql.Tx) {
// Delete branch should fail because there exists branches that depend on it.
branchID, err := pfsdb.GetBranchID(ctx, tx, branchBInfo.Branch)
require.NoError(t, err)
require.ErrorContains(t, pfsdb.DeleteBranch(ctx, tx, branchID, false /* force */), "violates foreign key constraint")
branchInfoWithID := &pfsdb.BranchInfoWithID{ID: branchID, BranchInfo: branchBInfo}
err = pfsdb.DeleteBranch(ctx, tx, branchInfoWithID, false /* force */)
matchErr := fmt.Sprintf("branch %q cannot be deleted because it's in the direct provenance of %v",
branchBInfo.Branch,
[]*pfs.Branch{branchCInfo.Branch},
)
require.Equal(t, matchErr, err.Error())
})
withTx(t, ctx, db, func(ctx context.Context, tx *pachsql.Tx) {
branchID, err := pfsdb.GetBranchID(ctx, tx, branchBInfo.Branch)
require.NoError(t, err)
require.NoError(t, pfsdb.DeleteBranch(ctx, tx, branchID, true /* force */))
branchInfoWithID := &pfsdb.BranchInfoWithID{ID: branchID, BranchInfo: branchBInfo}
require.NoError(t, pfsdb.DeleteBranch(ctx, tx, branchInfoWithID, true /* force */))
_, err = pfsdb.GetBranchInfo(ctx, tx, branchID)
require.True(t, errors.As(err, &pfsdb.BranchNotFoundError{}))
// Verify that BranchA no longer has BranchB in its subvenance
Expand Down Expand Up @@ -518,12 +526,23 @@ func TestBranchTrigger(t *testing.T) {
require.NoError(t, err)
})
// Try to delete the staging branch, which should fail because master depends on it for triggering.
withFailedTx(t, ctx, db, func(ctx context.Context, tx *pachsql.Tx) {
require.ErrorContains(t, pfsdb.DeleteBranch(ctx, tx, stagingBranchID, false /* force */), "violates foreign key constraint")
withTx(t, ctx, db, func(ctx context.Context, tx *pachsql.Tx) {
stagingBranchInfo, err := pfsdb.GetBranchInfo(ctx, tx, stagingBranchID)
require.NoError(t, err)
masterBranchInfo, err := pfsdb.GetBranchInfo(ctx, tx, masterBranchID)
require.NoError(t, err)
err = pfsdb.DeleteBranch(ctx, tx,
&pfsdb.BranchInfoWithID{ID: stagingBranchID, BranchInfo: stagingBranchInfo},
false /* force */)
require.YesError(t, err)
msg := fmt.Sprintf("%q cannot be deleted because it is triggered by branches %v", stagingBranchInfo.Branch, []*pfs.Branch{masterBranchInfo.Branch})
require.ErrorContains(t, err, msg)
})
// Delete with force should work.
withTx(t, ctx, db, func(ctx context.Context, tx *pachsql.Tx) {
require.NoError(t, pfsdb.DeleteBranch(ctx, tx, stagingBranchID, true /* force */))
stagingBranchInfo, err := pfsdb.GetBranchInfo(ctx, tx, stagingBranchID)
require.NoError(t, err)
require.NoError(t, pfsdb.DeleteBranch(ctx, tx, &pfsdb.BranchInfoWithID{ID: stagingBranchID, BranchInfo: stagingBranchInfo}, true /* force */))
})
})
}
4 changes: 2 additions & 2 deletions src/server/pfs/server/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func (d *driver) deleteRepoInfo(ctx context.Context, txnCtx *txncontext.Transact
// against certain corruption situations where the RepoInfo doesn't
// exist in postgres but branches do.
err = pfsdb.ForEachBranch(ctx, txnCtx.SqlTx, &pfs.Branch{Repo: ri.Repo}, func(branchInfoWithID pfsdb.BranchInfoWithID) error {
return pfsdb.DeleteBranch(ctx, txnCtx.SqlTx, branchInfoWithID.ID, force)
return pfsdb.DeleteBranch(ctx, txnCtx.SqlTx, &branchInfoWithID, force)
}, pfsdb.OrderByBranchColumn{Column: pfsdb.BranchColumnID, Order: pfsdb.SortOrderAsc})
if err != nil {
return errors.Wrap(err, "delete repo info")
Expand Down Expand Up @@ -1981,7 +1981,7 @@ func (d *driver) deleteBranch(ctx context.Context, txnCtx *txncontext.Transactio
if err != nil {
return errors.Wrapf(err, "get branch %q", branch.Key())
}
return pfsdb.DeleteBranch(ctx, txnCtx.SqlTx, branchInfoWithID.ID, force)
return pfsdb.DeleteBranch(ctx, txnCtx.SqlTx, branchInfoWithID, force)
}

func (d *driver) deleteAll(ctx context.Context) error {
Expand Down
18 changes: 17 additions & 1 deletion src/server/pfs/server/testing/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,22 @@ func TestToggleBranchProvenance(t *testing.T) {
require.Equal(t, inCommitInfo.Commit.Id, inMasterCommitInfo.Commit.Id)
}

func TestDeleteBranchWithProvenance(t *testing.T) {
ctx := pctx.TestContext(t)
env := realenv.NewRealEnv(ctx, t, dockertestenv.NewTestDBConfig(t).PachConfigOption)
require.NoError(t, env.PachClient.CreateRepo(pfs.DefaultProjectName, "in"))
require.NoError(t, env.PachClient.CreateRepo(pfs.DefaultProjectName, "out"))
require.NoError(t, env.PachClient.CreateBranch(pfs.DefaultProjectName, "out", "master", "", "", []*pfs.Branch{client.NewBranch(pfs.DefaultProjectName, "in", "master")}))
err := env.PachClient.DeleteBranch(pfs.DefaultProjectName, "in", "master", false)
require.YesError(t, err)
matchErr := fmt.Sprintf("branch %q cannot be deleted because it's in the direct provenance of %v",
client.NewBranch(pfs.DefaultProjectName, "in", "master"),
[]*pfs.Branch{client.NewBranch(pfs.DefaultProjectName, "out", "master")},
)
require.Equal(t, matchErr, err.Error())
require.NoError(t, env.PachClient.DeleteBranch(pfs.DefaultProjectName, "in", "master", true))
}

func TestRecreateBranchProvenance(t *testing.T) {
ctx := pctx.TestContext(t)
env := realenv.NewRealEnv(ctx, t, dockertestenv.NewTestDBConfig(t).PachConfigOption)
Expand Down Expand Up @@ -6431,7 +6447,7 @@ OpLoop:
branch := inputBranches[i]
err = env.PachClient.DeleteBranch(pfs.DefaultProjectName, branch.Repo.Name, branch.Name, false)
// don't fail if the error was just that it couldn't delete the branch without breaking subvenance
if err != nil && !strings.Contains(err.Error(), `delete on table "branches" violates foreign key constraint "branch_provenance_to_id_fkey" on table "branch_provenance`) {
if err != nil && !strings.Contains(err.Error(), fmt.Sprintf("branch %q cannot be deleted because it's in the direct provenance of", branch)) {
require.NoError(t, err)
}
inputBranches = append(inputBranches[:i], inputBranches[i+1:]...)
Expand Down

0 comments on commit 2e322db

Please sign in to comment.