Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bigtable/bttest): fix ModifyColumnFamilies to purge data #4096

Merged
merged 5 commits into from Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions bigtable/bttest/inmem.go
Expand Up @@ -228,6 +228,15 @@ func (s *server) ModifyColumnFamilies(ctx context.Context, req *btapb.ModifyColu
return nil, fmt.Errorf("can't delete unknown family %q", mod.Id)
}
delete(tbl.families, mod.Id)

// Purge all data for this column family
tbl.rows.Ascend(func(i btree.Item) bool {
r := i.(*row)
r.mu.Lock()
defer r.mu.Unlock()
delete(r.families, mod.Id)
return true
})
} else if modify := mod.GetUpdate(); modify != nil {
if _, ok := tbl.families[mod.Id]; !ok {
return nil, fmt.Errorf("no such family %q", mod.Id)
Expand Down
68 changes: 68 additions & 0 deletions bigtable/bttest/inmem_test.go
Expand Up @@ -345,6 +345,74 @@ func TestTableRowsConcurrent(t *testing.T) {
}
}

func TestModifyColumnFamilies(t *testing.T) {
s := &server{
tables: make(map[string]*table),
}
ctx := context.Background()
tblInfo, err := populateTable(ctx, s)
if err != nil {
t.Fatal(err)
}

readRows := func(expectChunks, expectCols, expectFams int) {
t.Helper()
mock := &MockReadRowsServer{}
req := &btpb.ReadRowsRequest{TableName: tblInfo.Name}
if err := s.ReadRows(req, mock); err != nil {
t.Fatalf("ReadRows error: %v", err)
}
cols := map[string]bool{}
fams := map[string]bool{}
chunks := 0
for _, r := range mock.responses {
for _, c := range r.Chunks {
chunks++
colName := c.FamilyName.Value + "." + string(c.Qualifier.Value)
cols[colName] = true
fams[c.FamilyName.Value] = true
}
}
if got, want := len(fams), expectFams; got != want {
t.Errorf("col count: got %d, want %d", got, want)
}
if got, want := len(cols), expectCols; got != want {
t.Errorf("col count: got %d, want %d", got, want)
}
if got, want := chunks, expectChunks; got != want {
t.Errorf("chunk count: got %d, want %d", got, want)
}
}

readRows(27, 9, 3)

// Now drop the middle column.
if _, err := s.ModifyColumnFamilies(ctx, &btapb.ModifyColumnFamiliesRequest{
Name: tblInfo.Name,
Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
Id: "cf1",
Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Drop{Drop: true},
}},
}); err != nil {
t.Fatalf("ModifyColumnFamilies error: %v", err)
}

readRows(18, 6, 2)

// adding the column back should not re-create the data.
if _, err := s.ModifyColumnFamilies(ctx, &btapb.ModifyColumnFamiliesRequest{
Name: tblInfo.Name,
Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
Id: "cf1",
Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}},
}},
}); err != nil {
t.Fatalf("ModifyColumnFamilies error: %v", err)
}

readRows(18, 6, 2)
}

func TestDropRowRange(t *testing.T) {
s := &server{
tables: make(map[string]*table),
Expand Down