Skip to content

Commit

Permalink
fix(bigtable/bttest): fix ModifyColumnFamilies to purge data (#4096)
Browse files Browse the repository at this point in the history
- Adds TestModifyColumnFamilies to test that it works

Tested vs. real bigtable.
  • Loading branch information
dragonsinth committed Jun 4, 2021
1 parent fc6f86e commit 2095028
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 0 deletions.
9 changes: 9 additions & 0 deletions bigtable/bttest/inmem.go
Expand Up @@ -228,6 +228,15 @@ func (s *server) ModifyColumnFamilies(ctx context.Context, req *btapb.ModifyColu
return nil, fmt.Errorf("can't delete unknown family %q", mod.Id)
}
delete(tbl.families, mod.Id)

// Purge all data for this column family
tbl.rows.Ascend(func(i btree.Item) bool {
r := i.(*row)
r.mu.Lock()
defer r.mu.Unlock()
delete(r.families, mod.Id)
return true
})
} else if modify := mod.GetUpdate(); modify != nil {
if _, ok := tbl.families[mod.Id]; !ok {
return nil, fmt.Errorf("no such family %q", mod.Id)
Expand Down
68 changes: 68 additions & 0 deletions bigtable/bttest/inmem_test.go
Expand Up @@ -345,6 +345,74 @@ func TestTableRowsConcurrent(t *testing.T) {
}
}

func TestModifyColumnFamilies(t *testing.T) {
s := &server{
tables: make(map[string]*table),
}
ctx := context.Background()
tblInfo, err := populateTable(ctx, s)
if err != nil {
t.Fatal(err)
}

readRows := func(expectChunks, expectCols, expectFams int) {
t.Helper()
mock := &MockReadRowsServer{}
req := &btpb.ReadRowsRequest{TableName: tblInfo.Name}
if err := s.ReadRows(req, mock); err != nil {
t.Fatalf("ReadRows error: %v", err)
}
cols := map[string]bool{}
fams := map[string]bool{}
chunks := 0
for _, r := range mock.responses {
for _, c := range r.Chunks {
chunks++
colName := c.FamilyName.Value + "." + string(c.Qualifier.Value)
cols[colName] = true
fams[c.FamilyName.Value] = true
}
}
if got, want := len(fams), expectFams; got != want {
t.Errorf("col count: got %d, want %d", got, want)
}
if got, want := len(cols), expectCols; got != want {
t.Errorf("col count: got %d, want %d", got, want)
}
if got, want := chunks, expectChunks; got != want {
t.Errorf("chunk count: got %d, want %d", got, want)
}
}

readRows(27, 9, 3)

// Now drop the middle column.
if _, err := s.ModifyColumnFamilies(ctx, &btapb.ModifyColumnFamiliesRequest{
Name: tblInfo.Name,
Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
Id: "cf1",
Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Drop{Drop: true},
}},
}); err != nil {
t.Fatalf("ModifyColumnFamilies error: %v", err)
}

readRows(18, 6, 2)

// adding the column back should not re-create the data.
if _, err := s.ModifyColumnFamilies(ctx, &btapb.ModifyColumnFamiliesRequest{
Name: tblInfo.Name,
Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
Id: "cf1",
Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}},
}},
}); err != nil {
t.Fatalf("ModifyColumnFamilies error: %v", err)
}

readRows(18, 6, 2)
}

func TestDropRowRange(t *testing.T) {
s := &server{
tables: make(map[string]*table),
Expand Down

0 comments on commit 2095028

Please sign in to comment.