diff --git a/bigtable/bttest/inmem.go b/bigtable/bttest/inmem.go index 13c7186a283..ceea0e7e006 100644 --- a/bigtable/bttest/inmem.go +++ b/bigtable/bttest/inmem.go @@ -228,6 +228,15 @@ func (s *server) ModifyColumnFamilies(ctx context.Context, req *btapb.ModifyColu return nil, fmt.Errorf("can't delete unknown family %q", mod.Id) } delete(tbl.families, mod.Id) + + // Purge all data for this column family + tbl.rows.Ascend(func(i btree.Item) bool { + r := i.(*row) + r.mu.Lock() + defer r.mu.Unlock() + delete(r.families, mod.Id) + return true + }) } else if modify := mod.GetUpdate(); modify != nil { if _, ok := tbl.families[mod.Id]; !ok { return nil, fmt.Errorf("no such family %q", mod.Id) diff --git a/bigtable/bttest/inmem_test.go b/bigtable/bttest/inmem_test.go index 1d4d5de89f3..e7eb91d2358 100644 --- a/bigtable/bttest/inmem_test.go +++ b/bigtable/bttest/inmem_test.go @@ -345,6 +345,74 @@ func TestTableRowsConcurrent(t *testing.T) { } } +func TestModifyColumnFamilies(t *testing.T) { + s := &server{ + tables: make(map[string]*table), + } + ctx := context.Background() + tblInfo, err := populateTable(ctx, s) + if err != nil { + t.Fatal(err) + } + + readRows := func(expectChunks, expectCols, expectFams int) { + t.Helper() + mock := &MockReadRowsServer{} + req := &btpb.ReadRowsRequest{TableName: tblInfo.Name} + if err := s.ReadRows(req, mock); err != nil { + t.Fatalf("ReadRows error: %v", err) + } + cols := map[string]bool{} + fams := map[string]bool{} + chunks := 0 + for _, r := range mock.responses { + for _, c := range r.Chunks { + chunks++ + colName := c.FamilyName.Value + "." + string(c.Qualifier.Value) + cols[colName] = true + fams[c.FamilyName.Value] = true + } + } + if got, want := len(fams), expectFams; got != want { + t.Errorf("col count: got %d, want %d", got, want) + } + if got, want := len(cols), expectCols; got != want { + t.Errorf("col count: got %d, want %d", got, want) + } + if got, want := chunks, expectChunks; got != want { + t.Errorf("chunk count: got %d, want %d", got, want) + } + } + + readRows(27, 9, 3) + + // Now drop the middle column. + if _, err := s.ModifyColumnFamilies(ctx, &btapb.ModifyColumnFamiliesRequest{ + Name: tblInfo.Name, + Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ + Id: "cf1", + Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Drop{Drop: true}, + }}, + }); err != nil { + t.Fatalf("ModifyColumnFamilies error: %v", err) + } + + readRows(18, 6, 2) + + // adding the column back should not re-create the data. + if _, err := s.ModifyColumnFamilies(ctx, &btapb.ModifyColumnFamiliesRequest{ + Name: tblInfo.Name, + Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ + Id: "cf1", + Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}}, + }}, + }); err != nil { + t.Fatalf("ModifyColumnFamilies error: %v", err) + } + + readRows(18, 6, 2) +} + func TestDropRowRange(t *testing.T) { s := &server{ tables: make(map[string]*table),