Skip to content

Commit

Permalink
feat(bigtable): implement LabelFilter (#2665)
Browse files Browse the repository at this point in the history
This allows cells to be labeled during filtering.

Closes #1700 

Co-authored-by: Chris Cotter <cjcotter@google.com>
  • Loading branch information
Gurov Ilya and tritone committed Aug 12, 2020
1 parent 92575e8 commit 3da4284
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 46 deletions.
12 changes: 12 additions & 0 deletions bigtable/filter.go
Expand Up @@ -144,6 +144,18 @@ func (lnf latestNFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_CellsPerColumnLimitFilter{CellsPerColumnLimitFilter: int32(lnf)}}
}

// LabelFilter returns a filter that applies the
// given label to all cells in the output row.
func LabelFilter(label string) Filter { return labelFilter(label) }

type labelFilter string

func (lf labelFilter) String() string { return fmt.Sprintf("apply_label(%s)", string(lf)) }

func (lf labelFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_ApplyLabelTransformer{ApplyLabelTransformer: string(lf)}}
}

// StripValueFilter returns a filter that replaces each value with the empty string.
func StripValueFilter() Filter { return stripValueFilter{} }

Expand Down
18 changes: 16 additions & 2 deletions bigtable/integration_test.go
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"math"
"math/rand"
"reflect"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -792,7 +793,8 @@ func TestIntegration_Read(t *testing.T) {

// We do the read, grab all the cells, turn them into "<row>-<col>-<val>",
// and join with a comma.
want string
want string
wantLabels []string
}{
{
desc: "read all, unfiltered",
Expand Down Expand Up @@ -880,6 +882,14 @@ func TestIntegration_Read(t *testing.T) {
limit: LimitRows(2),
want: "gwashington-jadams-1,jadams-tjefferson-1",
},
{
desc: "apply labels to the result rows",
rr: RowRange{},
filter: LabelFilter("test-label"),
limit: LimitRows(2),
want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1",
wantLabels: []string{"test-label", "test-label", "test-label"},
},
{
desc: "read all, strip values",
rr: RowRange{},
Expand Down Expand Up @@ -968,10 +978,11 @@ func TestIntegration_Read(t *testing.T) {
if test.limit != nil {
opts = append(opts, test.limit)
}
var elt []string
var elt, labels []string
err := table.ReadRows(ctx, test.rr, func(r Row) bool {
for _, ris := range r {
for _, ri := range ris {
labels = append(labels, ri.Labels...)
elt = append(elt, formatReadItem(ri))
}
}
Expand All @@ -983,6 +994,9 @@ func TestIntegration_Read(t *testing.T) {
if got := strings.Join(elt, ","); got != test.want {
t.Fatalf("got %q\nwant %q", got, test.want)
}
if got, want := labels, test.wantLabels; !reflect.DeepEqual(got, want) {
t.Fatalf("got %q\nwant %q", got, want)
}
})
}
}
Expand Down
22 changes: 14 additions & 8 deletions bigtable/reader.go
Expand Up @@ -43,6 +43,7 @@ type ReadItem struct {
Row, Column string
Timestamp Timestamp
Value []byte
Labels []string
}

// The current state of the read rows state machine.
Expand All @@ -57,14 +58,15 @@ const (
// chunkReader handles cell chunks from the read rows response and combines
// them into full Rows.
type chunkReader struct {
state rrState
curKey []byte
curFam string
curQual []byte
curTS int64
curVal []byte
curRow Row
lastKey string
state rrState
curKey []byte
curLabels []string
curFam string
curQual []byte
curTS int64
curVal []byte
curRow Row
lastKey string
}

// newChunkReader returns a new chunkReader for handling read rows responses.
Expand Down Expand Up @@ -138,13 +140,15 @@ func (cr *chunkReader) handleCellValue(cc *btpb.ReadRowsResponse_CellChunk) Row
// ValueSize is specified so expect a split value of ValueSize bytes
if cr.curVal == nil {
cr.curVal = make([]byte, 0, cc.ValueSize)
cr.curLabels = cc.Labels
}
cr.curVal = append(cr.curVal, cc.Value...)
cr.state = cellInProgress
} else {
// This cell is either the complete value or the last chunk of a split
if cr.curVal == nil {
cr.curVal = cc.Value
cr.curLabels = cc.Labels
} else {
cr.curVal = append(cr.curVal, cc.Value...)
}
Expand All @@ -165,9 +169,11 @@ func (cr *chunkReader) finishCell() {
Column: string(cr.curFam) + ":" + string(cr.curQual),
Timestamp: Timestamp(cr.curTS),
Value: cr.curVal,
Labels: cr.curLabels,
}
cr.curRow[cr.curFam] = append(cr.curRow[cr.curFam], ri)
cr.curVal = nil
cr.curLabels = nil
}

func (cr *chunkReader) commitRow() Row {
Expand Down
100 changes: 64 additions & 36 deletions bigtable/reader_test.go
Expand Up @@ -37,7 +37,7 @@ func TestSingleCell(t *testing.T) {
cr := newChunkReader()

// All in one cell
row, err := cr.Process(cc("rk", "fm", "col", 1, "value", 0, true))
row, err := cr.Process(cc("rk", "fm", "col", 1, "value", 0, true, []string{}))
if err != nil {
t.Fatalf("Processing chunk: %v", err)
}
Expand All @@ -47,7 +47,7 @@ func TestSingleCell(t *testing.T) {
if len(row["fm"]) != 1 {
t.Fatalf("Family name length mismatch %d, %d", 1, len(row["fm"]))
}
want := []ReadItem{ri("rk", "fm", "col", 1, "value")}
want := []ReadItem{ri("rk", "fm", "col", 1, "value", []string{})}
if !testutil.Equal(row["fm"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm"], want)
}
Expand All @@ -59,11 +59,11 @@ func TestSingleCell(t *testing.T) {
func TestMultipleCells(t *testing.T) {
cr := newChunkReader()

mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false))
mustProcess(t, cr, cc("rs", "fm1", "col1", 1, "val2", 0, false))
mustProcess(t, cr, cc("rs", "fm1", "col2", 0, "val3", 0, false))
mustProcess(t, cr, cc("rs", "fm2", "col1", 0, "val4", 0, false))
row, err := cr.Process(cc("rs", "fm2", "col2", 1, "extralongval5", 0, true))
mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false, []string{}))
mustProcess(t, cr, cc("rs", "fm1", "col1", 1, "val2", 0, false, []string{}))
mustProcess(t, cr, cc("rs", "fm1", "col2", 0, "val3", 0, false, []string{}))
mustProcess(t, cr, cc("rs", "fm2", "col1", 0, "val4", 0, false, []string{}))
row, err := cr.Process(cc("rs", "fm2", "col2", 1, "extralongval5", 0, true, []string{}))
if err != nil {
t.Fatalf("Processing chunk: %v", err)
}
Expand All @@ -72,16 +72,16 @@ func TestMultipleCells(t *testing.T) {
}

want := []ReadItem{
ri("rs", "fm1", "col1", 0, "val1"),
ri("rs", "fm1", "col1", 1, "val2"),
ri("rs", "fm1", "col2", 0, "val3"),
ri("rs", "fm1", "col1", 0, "val1", []string{}),
ri("rs", "fm1", "col1", 1, "val2", []string{}),
ri("rs", "fm1", "col2", 0, "val3", []string{}),
}
if !testutil.Equal(row["fm1"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
}
want = []ReadItem{
ri("rs", "fm2", "col1", 0, "val4"),
ri("rs", "fm2", "col2", 1, "extralongval5"),
ri("rs", "fm2", "col1", 0, "val4", []string{}),
ri("rs", "fm2", "col2", 1, "extralongval5", []string{}),
}
if !testutil.Equal(row["fm2"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm2"], want)
Expand All @@ -94,9 +94,9 @@ func TestMultipleCells(t *testing.T) {
func TestSplitCells(t *testing.T) {
cr := newChunkReader()

mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "hello ", 11, false))
mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "hello ", 11, false, []string{}))
mustProcess(t, cr, ccData("world", 0, false))
row, err := cr.Process(cc("rs", "fm1", "col2", 0, "val2", 0, true))
row, err := cr.Process(cc("rs", "fm1", "col2", 0, "val2", 0, true, []string{}))
if err != nil {
t.Fatalf("Processing chunk: %v", err)
}
Expand All @@ -105,8 +105,8 @@ func TestSplitCells(t *testing.T) {
}

want := []ReadItem{
ri("rs", "fm1", "col1", 0, "hello world"),
ri("rs", "fm1", "col2", 0, "val2"),
ri("rs", "fm1", "col1", 0, "hello world", []string{}),
ri("rs", "fm1", "col2", 0, "val2", []string{}),
}
if !testutil.Equal(row["fm1"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
Expand All @@ -119,20 +119,20 @@ func TestSplitCells(t *testing.T) {
func TestMultipleRows(t *testing.T) {
cr := newChunkReader()

row, err := cr.Process(cc("rs1", "fm1", "col1", 1, "val1", 0, true))
row, err := cr.Process(cc("rs1", "fm1", "col1", 1, "val1", 0, true, []string{}))
if err != nil {
t.Fatalf("Processing chunk: %v", err)
}
want := []ReadItem{ri("rs1", "fm1", "col1", 1, "val1")}
want := []ReadItem{ri("rs1", "fm1", "col1", 1, "val1", []string{})}
if !testutil.Equal(row["fm1"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
}

row, err = cr.Process(cc("rs2", "fm2", "col2", 2, "val2", 0, true))
row, err = cr.Process(cc("rs2", "fm2", "col2", 2, "val2", 0, true, []string{}))
if err != nil {
t.Fatalf("Processing chunk: %v", err)
}
want = []ReadItem{ri("rs2", "fm2", "col2", 2, "val2")}
want = []ReadItem{ri("rs2", "fm2", "col2", 2, "val2", []string{})}
if !testutil.Equal(row["fm2"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm2"], want)
}
Expand All @@ -145,20 +145,20 @@ func TestMultipleRows(t *testing.T) {
func TestBlankQualifier(t *testing.T) {
cr := newChunkReader()

row, err := cr.Process(cc("rs1", "fm1", "", 1, "val1", 0, true))
row, err := cr.Process(cc("rs1", "fm1", "", 1, "val1", 0, true, []string{}))
if err != nil {
t.Fatalf("Processing chunk: %v", err)
}
want := []ReadItem{ri("rs1", "fm1", "", 1, "val1")}
want := []ReadItem{ri("rs1", "fm1", "", 1, "val1", []string{})}
if !testutil.Equal(row["fm1"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
}

row, err = cr.Process(cc("rs2", "fm2", "col2", 2, "val2", 0, true))
row, err = cr.Process(cc("rs2", "fm2", "col2", 2, "val2", 0, true, []string{}))
if err != nil {
t.Fatalf("Processing chunk: %v", err)
}
want = []ReadItem{ri("rs2", "fm2", "col2", 2, "val2")}
want = []ReadItem{ri("rs2", "fm2", "col2", 2, "val2", []string{})}
if !testutil.Equal(row["fm2"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm2"], want)
}
Expand All @@ -168,14 +168,40 @@ func TestBlankQualifier(t *testing.T) {
}
}

func TestLabels(t *testing.T) {
cr := newChunkReader()

mustProcess(t, cr, cc("rs1", "fm1", "col1", 0, "hello ", 11, false, []string{"test-label"}))
row := mustProcess(t, cr, ccData("world", 0, true))
want := []ReadItem{
ri("rs1", "fm1", "col1", 0, "hello world", []string{"test-label"}),
}
if !testutil.Equal(row["fm1"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
}

row, err := cr.Process(cc("rs2", "fm1", "", 1, "val1", 0, true, []string{"test-label2"}))
if err != nil {
t.Fatalf("Processing chunk: %v", err)
}
want = []ReadItem{ri("rs2", "fm1", "", 1, "val1", []string{"test-label2"})}
if !testutil.Equal(row["fm1"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
}

if err := cr.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
}

func TestReset(t *testing.T) {
cr := newChunkReader()
mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false))
mustProcess(t, cr, cc("rs", "fm1", "col1", 1, "val2", 0, false))
mustProcess(t, cr, cc("rs", "fm1", "col2", 0, "val3", 0, false))
mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false, []string{}))
mustProcess(t, cr, cc("rs", "fm1", "col1", 1, "val2", 0, false, []string{}))
mustProcess(t, cr, cc("rs", "fm1", "col2", 0, "val3", 0, false, []string{}))
mustProcess(t, cr, ccReset())
row := mustProcess(t, cr, cc("rs1", "fm1", "col1", 1, "val1", 0, true))
want := []ReadItem{ri("rs1", "fm1", "col1", 1, "val1")}
row := mustProcess(t, cr, cc("rs1", "fm1", "col1", 1, "val1", 0, true, []string{}))
want := []ReadItem{ri("rs1", "fm1", "col1", 1, "val1", []string{})}
if !testutil.Equal(row["fm1"], want) {
t.Fatalf("Reset: got: %v\nwant: %v\n", row["fm1"], want)
}
Expand All @@ -187,8 +213,8 @@ func TestReset(t *testing.T) {
func TestNewFamEmptyQualifier(t *testing.T) {
cr := newChunkReader()

mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false))
_, err := cr.Process(cc(nilStr, "fm2", nilStr, 0, "val2", 0, true))
mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false, []string{}))
_, err := cr.Process(cc(nilStr, "fm2", nilStr, 0, "val2", 0, true, []string{}))
if err == nil {
t.Fatalf("Expected error on second chunk with no qualifier set")
}
Expand Down Expand Up @@ -300,12 +326,12 @@ func toSet(res []TestResult) map[TestResult]bool {
}

// ri returns a ReadItem for the given components
func ri(rk string, fm string, qual string, ts int64, val string) ReadItem {
return ReadItem{Row: rk, Column: fmt.Sprintf("%s:%s", fm, qual), Value: []byte(val), Timestamp: Timestamp(ts)}
func ri(rk string, fm string, qual string, ts int64, val string, labels []string) ReadItem {
return ReadItem{Row: rk, Column: fmt.Sprintf("%s:%s", fm, qual), Value: []byte(val), Timestamp: Timestamp(ts), Labels: labels}
}

// cc returns a CellChunk proto
func cc(rk string, fm string, qual string, ts int64, val string, size int32, commit bool) *btspb.ReadRowsResponse_CellChunk {
func cc(rk string, fm string, qual string, ts int64, val string, size int32, commit bool, labels []string) *btspb.ReadRowsResponse_CellChunk {
// The components of the cell key are wrapped and can be null or empty
var rkWrapper []byte
if rk == nilStr {
Expand Down Expand Up @@ -335,12 +361,14 @@ func cc(rk string, fm string, qual string, ts int64, val string, size int32, com
TimestampMicros: ts,
Value: []byte(val),
ValueSize: size,
RowStatus: &btspb.ReadRowsResponse_CellChunk_CommitRow{CommitRow: commit}}
RowStatus: &btspb.ReadRowsResponse_CellChunk_CommitRow{CommitRow: commit},
Labels: labels,
}
}

// ccData returns a CellChunk with only a value and size
func ccData(val string, size int32, commit bool) *btspb.ReadRowsResponse_CellChunk {
return cc(nilStr, nilStr, nilStr, 0, val, size, commit)
return cc(nilStr, nilStr, nilStr, 0, val, size, commit, []string{})
}

// ccReset returns a CellChunk with RestRow set to true
Expand Down

0 comments on commit 3da4284

Please sign in to comment.