Skip to content

Commit

Permalink
[PFS 227] nil out entire branch (#9944)
Browse files Browse the repository at this point in the history
This PR nils out the branch field of commits in responses. This is done in
the gRPC server interceptors.
It is motivated by the need to reduce confusion about the relationship between Commits
and Branches.

Most of the changes in this PR fix tests. Over 90% of the failing tests
failed because of nil pointer dereferences (the branch field is now nil). The common fix is
to resolve a commit by its ID instead of by its branch.
  • Loading branch information
Zhang-Muyang committed May 2, 2024
1 parent 7fc0314 commit 933aac0
Show file tree
Hide file tree
Showing 33 changed files with 1,733 additions and 7,939 deletions.
8,709 changes: 1,148 additions & 7,561 deletions MODULE.bazel.lock

Large diffs are not rendered by default.

16 changes: 7 additions & 9 deletions python-sdk/tests/test_pfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def test_start_commit(client: TestClient, default_project: bool):
repo = client.new_repo(default_project)
branch = pfs.Branch(repo=repo, name="master")
commit = client.pfs.start_commit(branch=branch)
assert commit.branch.repo.name == repo.name
assert commit.repo.name == repo.name

# cannot start new commit before the previous one is finished
with pytest.raises(grpc.RpcError, match=r"parent commit .* has not been finished"):
Expand All @@ -139,7 +139,7 @@ def test_start_with_parent(client: TestClient, default_project: bool):
client.pfs.finish_commit(commit=commit1)

commit2 = client.pfs.start_commit(parent=commit1, branch=branch)
assert commit2.branch.repo.name == repo.name
assert commit2.repo.name == repo.name

@staticmethod
def test_start_commit_fork(client: TestClient, default_project: bool):
Expand All @@ -151,8 +151,7 @@ def test_start_commit_fork(client: TestClient, default_project: bool):
client.pfs.finish_commit(commit=commit1)

commit2 = client.pfs.start_commit(parent=commit1, branch=patch_branch)
assert commit2.branch.name == patch_branch.name
assert commit2.branch.repo.name == repo.name
assert commit2.repo.name == repo.name

assert client.pfs.branch_exists(master_branch)
assert client.pfs.branch_exists(patch_branch)
Expand Down Expand Up @@ -198,8 +197,8 @@ def test_inspect_commit(client: TestClient, default_project: bool):
commit=commit, wait=pfs.CommitState.FINISHED
)
assert commit_info.finished
assert commit_info.commit.branch.name == branch.name
assert commit_info.commit.branch.repo.name == repo.name
assert commit_info.commit.branch.name == ""
assert commit_info.commit.repo.name == repo.name

@staticmethod
def test_squash_commit(client: TestClient, default_project: bool):
Expand Down Expand Up @@ -258,8 +257,7 @@ def test_subscribe_commit(client: TestClient, default_project: bool):
_generated_commit = next(commit_generator)
generated_commit = next(commit_generator)
assert generated_commit.commit.id == commit.id
assert generated_commit.commit.branch.repo.name == repo.name
assert generated_commit.commit.branch.name == "master"
assert generated_commit.commit.repo.name == repo.name

@staticmethod
def test_list_commit(client: TestClient):
Expand Down Expand Up @@ -384,7 +382,7 @@ def test_delete_file(client: TestClient, default_project: bool):

with client.pfs.commit(branch=branch) as commit2:
commit2.delete_file(path="/file1.dat")
assert not client.pfs.path_exists(file=pfs.File.from_uri(f"{commit2}:/file1.dat"))
assert not client.pfs.path_exists(file=pfs.File(commit=commit2, path="/file1.dat"))

@staticmethod
def test_walk_file(client: TestClient, default_project: bool):
Expand Down
11 changes: 7 additions & 4 deletions src/internal/client/pfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@ func NewBranch(projectName, repoName, branchName string) *pfs.Branch {

// NewCommit creates a pfs.Commit in the given project, repo & branch.
func NewCommit(projectName, repoName, branchName, commitID string) *pfs.Commit {
return &pfs.Commit{
Repo: NewRepo(projectName, repoName),
Id: commitID,
Branch: NewBranch(projectName, repoName, branchName),
commit := &pfs.Commit{
Repo: NewRepo(projectName, repoName),
Id: commitID,
}
if branchName != "" {
commit.Branch = NewBranch(projectName, repoName, branchName)
}
return commit
}

// NewFile creates a pfs.File.
Expand Down
1 change: 1 addition & 0 deletions src/internal/middleware/validation/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/pachyderm/pachyderm/v2/src/internal/middleware/validation",
visibility = ["//src:__subpackages__"],
deps = [
"//src/internal/errors",
"//src/internal/log",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
Expand Down
31 changes: 27 additions & 4 deletions src/internal/middleware/validation/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package validation
import (
"context"
"fmt"

"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/log"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand All @@ -17,7 +17,11 @@ type validatable interface {
ValidateAll() error
}

func UnaryServerInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
// branchNillable is satisfied by any message exposing a NilBranch method.
// The interceptors in this file type-assert outgoing messages against it and
// call NilBranch on those that match.
type branchNillable interface {
NilBranch()
}

func UnaryServerInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
if r, ok := req.(validatable); ok {
if err := r.ValidateAll(); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "validate request: %v", err)
Expand All @@ -29,13 +33,22 @@ func UnaryServerInterceptor(ctx context.Context, req any, info *grpc.UnaryServer
} else {
log.DPanic(ctx, "no validation routine on request message", zap.String("type", fmt.Sprintf("%T", req)))
}
return handler(ctx, req)

resp, err := handler(ctx, req)
if err != nil {
return nil, err
}
if b, ok := resp.(branchNillable); ok {
b.NilBranch()
}
return resp, nil
}

var _ grpc.UnaryServerInterceptor = UnaryServerInterceptor

// streamWrapper embeds a grpc.ServerStream so that messages received and sent
// through it can be intercepted.
type streamWrapper struct {
grpc.ServerStream
// IsServerStream is copied from grpc.StreamServerInfo.IsServerStream by
// StreamServerInterceptor; SendMsg only nils branches when it is true.
IsServerStream bool
}

var _ grpc.ServerStream = new(streamWrapper)
Expand All @@ -55,8 +68,18 @@ func (w *streamWrapper) RecvMsg(m any) error {
return nil
}

// SendMsg nils out the branch fields of m when m implements branchNillable and
// the wrapped stream was flagged as a server stream, then forwards m to the
// underlying grpc.ServerStream. Any error from the underlying SendMsg is
// returned wrapped with a stack trace.
func (w *streamWrapper) SendMsg(m any) error {
	// IsServerStream comes from grpc.StreamServerInfo (see
	// StreamServerInterceptor). Branches are stripped only when it is set, so
	// that this behavior is not applied to client streams.
	if nillable, ok := m.(branchNillable); ok && w.IsServerStream {
		nillable.NilBranch()
	}
	sendErr := w.ServerStream.SendMsg(m)
	return errors.EnsureStack(sendErr)
}

func StreamServerInterceptor(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return handler(srv, &streamWrapper{ServerStream: stream})
return handler(srv, &streamWrapper{ServerStream: stream, IsServerStream: info.IsServerStream})
}

var _ grpc.StreamServerInterceptor = StreamServerInterceptor
5 changes: 4 additions & 1 deletion src/internal/pfsload/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ func (pc *pachClient) WithCreateFileSetClient(ctx context.Context, cb func(clien
func (pc *pachClient) AddFileSet(ctx context.Context, commit *pfs.Commit, filesetID string) error {
project := commit.Repo.Project.GetName()
repo := commit.Repo.Name
branch := commit.Branch.Name
branch := ""
if commit.Branch != nil {
branch = commit.Branch.Name
}
return client.AddFileSet(ctx, pc.pfs, project, repo, branch, commit.Id, filesetID)
}

Expand Down
8 changes: 6 additions & 2 deletions src/internal/pfssync/cache_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ func (cc *CacheClient) GetFileTAR(commit *pfs.Commit, path string) (io.ReadClose
if c, ok := cc.get(key); ok {
return cc.APIClient.GetFileTAR(c, path)
}
id, err := cc.APIClient.GetFileSet(commit.Repo.Project.GetName(), commit.Repo.Name, commit.Branch.Name, commit.Id)
branch := ""
if commit.Branch != nil {
branch = commit.Branch.Name
}
id, err := cc.APIClient.GetFileSet(commit.Repo.Project.GetName(), commit.Repo.Name, branch, commit.Id)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -94,7 +98,7 @@ func (ccfsc *cacheCreateFileSetClient) CopyFile(dst string, src *pfs.File, opts
newSrc.Commit = c
return errors.EnsureStack(ccfsc.ModifyFile.CopyFile(dst, newSrc, opts...))
}
id, err := ccfsc.APIClient.GetFileSet(src.Commit.Repo.Project.GetName(), src.Commit.Repo.Name, src.Commit.Branch.Name, src.Commit.Id)
id, err := ccfsc.APIClient.GetFileSet(src.Commit.Repo.Project.GetName(), src.Commit.Repo.Name, "", src.Commit.Id)
if err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion src/internal/pfsutil/pfsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,9 @@ import (
)

func MetaCommit(commit *pfs.Commit) *pfs.Commit {
return client.NewSystemRepo(commit.Repo.Project.GetName(), commit.Repo.Name, pfs.MetaRepoType).NewCommit(commit.Branch.Name, commit.Id)
branch := ""
if commit.Branch != nil {
branch = commit.Branch.Name
}
return client.NewSystemRepo(commit.Repo.Project.GetName(), commit.Repo.Name, pfs.MetaRepoType).NewCommit(branch, commit.Id)
}
63 changes: 63 additions & 0 deletions src/pfs/pfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,69 @@ func (c *Commit) AccessRepo() *Repo {
return c.GetBranch().GetRepo()
}

// NilBranch sets the commit's Branch field to nil. Calling it on a nil *Commit
// is a no-op.
func (c *Commit) NilBranch() {
	if c == nil {
		return
	}
	c.Branch = nil
}

// NilBranch nils the Branch of both ci.Commit and ci.ParentCommit. Calling it
// on a nil *CommitInfo is a no-op.
//
// NOTE(review): only Commit and ParentCommit are visited; if CommitInfo holds
// other Commit-valued fields, their Branch is left untouched — confirm that
// this is intended.
func (ci *CommitInfo) NilBranch() {
	if ci == nil {
		return
	}
	self, parent := ci.Commit, ci.ParentCommit
	self.NilBranch()
	parent.NilBranch()
}

// NilBranch nils the Branch of the commit this file refers to. Calling it on a
// nil *File is a no-op.
func (f *File) NilBranch() {
	if f == nil {
		return
	}
	f.Commit.NilBranch()
}

// NilBranch delegates to fi.File.NilBranch. Calling it on a nil *FileInfo is a
// no-op.
func (fi *FileInfo) NilBranch() {
	if fi == nil {
		return
	}
	fi.File.NilBranch()
}

// NilBranch delegates to NilBranch on both the old and the new file of the
// diff. Calling it on a nil *DiffFileResponse is a no-op.
func (dfr *DiffFileResponse) NilBranch() {
	if dfr == nil {
		return
	}
	before, after := dfr.OldFile, dfr.NewFile
	before.NilBranch()
	after.NilBranch()
}

// NilBranch delegates to cf.Src.NilBranch. Calling it on a nil *CopyFile is a
// no-op.
func (cf *CopyFile) NilBranch() {
	if cf == nil {
		return
	}
	cf.Src.NilBranch()
}

// NilBranch nils the Branch of the branch's head commit (bi.Head). Calling it
// on a nil *BranchInfo is a no-op.
func (bi *BranchInfo) NilBranch() {
	if bi == nil {
		return
	}
	bi.Head.NilBranch()
}

// NilBranch calls NilBranch on every entry of csi.Commits, in order. Calling
// it on a nil *CommitSetInfo is a no-op.
func (csi *CommitSetInfo) NilBranch() {
	if csi == nil {
		return
	}
	for i := range csi.Commits {
		csi.Commits[i].NilBranch()
	}
}

// NilBranch nils the Branch of whichever commit is carried in fcr.Result: the
// found commit or the last searched commit. Any other Result value is left
// untouched. Calling it on a nil receiver, or with a nil Result, is a no-op.
func (fcr *FindCommitsResponse) NilBranch() {
	if fcr == nil || fcr.Result == nil {
		return
	}
	switch result := fcr.Result.(type) {
	case *FindCommitsResponse_FoundCommit:
		result.FoundCommit.NilBranch()
	case *FindCommitsResponse_LastSearchedCommit:
		result.LastSearchedCommit.NilBranch()
	}
}

func (b *Branch) NewCommit(id string) *Commit {
return &Commit{
Branch: proto.Clone(b).(*Branch),
Expand Down
12 changes: 12 additions & 0 deletions src/pfs/pfs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ import (
"google.golang.org/protobuf/testing/protocmp"
)

func TestCommit_NilBranch(t *testing.T) {
var b1 = &Branch{Name: "dummy"}
var c1 = &Commit{Branch: b1}
c1.NilBranch()
require.Nil(t, c1.Branch)

var b2 = &Branch{Name: ""}
var c2 = &Commit{Branch: b2}
c2.NilBranch()
require.Nil(t, c2.Branch)
}

func TestProject_ValidateName(t *testing.T) {
var p = &Project{Name: "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF"}
err := p.ValidateName()
Expand Down
22 changes: 11 additions & 11 deletions src/server/auth/server/testing/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestSuperAdminRWO(t *testing.T) {
require.NoError(t, err)
err = aliceClient.PutFile(commit, "/file", strings.NewReader("test data"))
require.NoError(t, err)
require.NoError(t, aliceClient.FinishCommit(pfs.DefaultProjectName, repoName, commit.Branch.Name, commit.Id))
require.NoError(t, aliceClient.FinishCommit(pfs.DefaultProjectName, repoName, "", commit.Id))

// bob cannot read from the repo
buf := &bytes.Buffer{}
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestSuperAdminRWO(t *testing.T) {
// bob can write to the repo
commit, err = bobClient.StartCommit(pfs.DefaultProjectName, repoName, "master")
require.NoError(t, err)
require.NoError(t, bobClient.FinishCommit(pfs.DefaultProjectName, repoName, commit.Branch.Name, commit.Id))
require.NoError(t, bobClient.FinishCommit(pfs.DefaultProjectName, repoName, "", commit.Id))
require.Equal(t, 2, tu.CommitCnt(t, aliceClient, repo)) // check that a new commit was created

// bob can update the repo's ACL
Expand Down Expand Up @@ -208,7 +208,7 @@ func TestFSAdminRWO(t *testing.T) {
require.NoError(t, err)
err = aliceClient.PutFile(commit, "/file", strings.NewReader("test data"))
require.NoError(t, err)
require.NoError(t, aliceClient.FinishCommit(pfs.DefaultProjectName, repoName, commit.Branch.Name, commit.Id))
require.NoError(t, aliceClient.FinishCommit(pfs.DefaultProjectName, repoName, "", commit.Id))

// bob cannot read from the repo
buf := &bytes.Buffer{}
Expand Down Expand Up @@ -247,7 +247,7 @@ func TestFSAdminRWO(t *testing.T) {
// bob can write to the repo
commit, err = bobClient.StartCommit(pfs.DefaultProjectName, repoName, "master")
require.NoError(t, err)
require.NoError(t, bobClient.FinishCommit(pfs.DefaultProjectName, repoName, commit.Branch.Name, commit.Id))
require.NoError(t, bobClient.FinishCommit(pfs.DefaultProjectName, repoName, "", commit.Id))
require.Equal(t, 2, tu.CommitCnt(t, aliceClient, repo)) // check that a new commit was created

// bob can update the repo's ACL
Expand Down Expand Up @@ -330,7 +330,7 @@ func TestFSAdminFixBrokenRepo(t *testing.T) {
require.NoError(t, err)
err = aliceClient.PutFile(commit, "/file", strings.NewReader("test data"))
require.NoError(t, err)
require.NoError(t, aliceClient.FinishCommit(pfs.DefaultProjectName, repoName, commit.Branch.Name, commit.Id))
require.NoError(t, aliceClient.FinishCommit(pfs.DefaultProjectName, repoName, "", commit.Id))
require.Equal(t, 1, tu.CommitCnt(t, rootClient, repo)) // check that a new commit was created
}

Expand Down Expand Up @@ -411,7 +411,7 @@ func TestPreActivationPipelinesKeepRunningAfterActivation(t *testing.T) {
require.NoError(t, err)
err = aliceClient.PutFile(commit, "/file1", strings.NewReader("test data"))
require.NoError(t, err)
require.NoError(t, aliceClient.FinishCommit(pfs.DefaultProjectName, repo, commit.Branch.Name, commit.Id))
require.NoError(t, aliceClient.FinishCommit(pfs.DefaultProjectName, repo, "", commit.Id))

// make sure the pipeline runs
require.NoErrorWithinT(t, 60*time.Second, func() error {
Expand Down Expand Up @@ -447,7 +447,7 @@ func TestPreActivationPipelinesKeepRunningAfterActivation(t *testing.T) {
require.NoError(t, err)
err = rootClient.PutFile(commit, "/file2", strings.NewReader("test data"))
require.NoError(t, err)
require.NoError(t, rootClient.FinishCommit(pfs.DefaultProjectName, repo, commit.Branch.Name, commit.Id))
require.NoError(t, rootClient.FinishCommit(pfs.DefaultProjectName, repo, "", commit.Id))

// make sure the pipeline still runs (i.e. it's not running as alice)
require.NoErrorWithinT(t, 60*time.Second, func() error {
Expand Down Expand Up @@ -595,7 +595,7 @@ func TestRobotUserACL(t *testing.T) {
// test that the robot can commit to alice's repo
commit, err = robotClient.StartCommit(pfs.DefaultProjectName, repo2, "master")
require.NoError(t, err)
require.NoError(t, robotClient.FinishCommit(pfs.DefaultProjectName, repo2, commit.Branch.Name, commit.Id))
require.NoError(t, robotClient.FinishCommit(pfs.DefaultProjectName, repo2, "", commit.Id))
}

// TestGroupRoleBinding tests that a group can be added to a role binding
Expand Down Expand Up @@ -623,7 +623,7 @@ func TestGroupRoleBinding(t *testing.T) {
// test that alice can commit to the repo
commit, err := aliceClient.StartCommit(pfs.DefaultProjectName, repo, "master")
require.NoError(t, err)
require.NoError(t, aliceClient.FinishCommit(pfs.DefaultProjectName, repo, commit.Branch.Name, commit.Id))
require.NoError(t, aliceClient.FinishCommit(pfs.DefaultProjectName, repo, "", commit.Id))
}

// TestRobotUserAdmin tests that robot users can
Expand Down Expand Up @@ -669,15 +669,15 @@ func TestRobotUserAdmin(t *testing.T) {
require.NoError(t, robotClient2.CreateRepo(pfs.DefaultProjectName, repo))
commit, err := robotClient.StartCommit(pfs.DefaultProjectName, repo, "master")
require.NoError(t, err) // admin privs means robotUser can commit
require.NoError(t, robotClient.FinishCommit(pfs.DefaultProjectName, repo, commit.Branch.Name, commit.Id))
require.NoError(t, robotClient.FinishCommit(pfs.DefaultProjectName, repo, "", commit.Id))

// robotUser adds alice to the repo, and checks that the ACL is updated
require.Equal(t, tu.BuildBindings(tu.Robot(robotUser2), auth.RepoOwnerRole), tu.GetRepoRoleBinding(robotClient.Ctx(), t, robotClient, pfs.DefaultProjectName, repo))
require.NoError(t, robotClient.ModifyRepoRoleBinding(robotClient.Ctx(), pfs.DefaultProjectName, repo, alice, []string{auth.RepoWriterRole}))
require.Equal(t, tu.BuildBindings(tu.Robot(robotUser2), auth.RepoOwnerRole, alice, auth.RepoWriterRole), tu.GetRepoRoleBinding(robotClient.Ctx(), t, robotClient, pfs.DefaultProjectName, repo))
commit, err = aliceClient.StartCommit(pfs.DefaultProjectName, repo, "master")
require.NoError(t, err)
require.NoError(t, aliceClient.FinishCommit(pfs.DefaultProjectName, repo, commit.Branch.Name, commit.Id))
require.NoError(t, aliceClient.FinishCommit(pfs.DefaultProjectName, repo, "", commit.Id))

_, err = robotClient.Deactivate(robotClient.Ctx(), &auth.DeactivateRequest{})
require.NoError(t, err)
Expand Down

0 comments on commit 933aac0

Please sign in to comment.