diff --git a/bigtable/filter.go b/bigtable/filter.go index f5c8a4d2d3c..bef378b7316 100644 --- a/bigtable/filter.go +++ b/bigtable/filter.go @@ -144,6 +144,18 @@ func (lnf latestNFilter) proto() *btpb.RowFilter { return &btpb.RowFilter{Filter: &btpb.RowFilter_CellsPerColumnLimitFilter{CellsPerColumnLimitFilter: int32(lnf)}} } +// LabelFilter returns a filter that applies the +// given label to all cells in the output row. +func LabelFilter(label string) Filter { return labelFilter(label) } + +type labelFilter string + +func (lf labelFilter) String() string { return fmt.Sprintf("apply_label(%s)", string(lf)) } + +func (lf labelFilter) proto() *btpb.RowFilter { + return &btpb.RowFilter{Filter: &btpb.RowFilter_ApplyLabelTransformer{ApplyLabelTransformer: string(lf)}} +} + // StripValueFilter returns a filter that replaces each value with the empty string. func StripValueFilter() Filter { return stripValueFilter{} } diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index 89b815ecb69..8aec3f07976 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -22,6 +22,7 @@ import ( "fmt" "math" "math/rand" + "reflect" "sort" "strings" "sync" @@ -792,7 +793,8 @@ func TestIntegration_Read(t *testing.T) { // We do the read, grab all the cells, turn them into "--", // and join with a comma. - want string + want string + wantLabels []string }{ { desc: "read all, unfiltered", @@ -880,6 +882,14 @@ func TestIntegration_Read(t *testing.T) { limit: LimitRows(2), want: "gwashington-jadams-1,jadams-tjefferson-1", }, + { + desc: "apply labels to the result rows", + rr: RowRange{}, + filter: LabelFilter("test-label"), + limit: LimitRows(2), + want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1", + wantLabels: []string{"test-label", "test-label", "test-label"}, + }, { desc: "read all, strip values", rr: RowRange{}, @@ -968,10 +978,11 @@ func TestIntegration_Read(t *testing.T) { if test.limit != nil { opts = append(opts, test.limit) } - var elt []string + var elt, labels []string err := table.ReadRows(ctx, test.rr, func(r Row) bool { for _, ris := range r { for _, ri := range ris { + labels = append(labels, ri.Labels...) elt = append(elt, formatReadItem(ri)) } } @@ -983,6 +994,9 @@ func TestIntegration_Read(t *testing.T) { if got := strings.Join(elt, ","); got != test.want { t.Fatalf("got %q\nwant %q", got, test.want) } + if got, want := labels, test.wantLabels; !reflect.DeepEqual(got, want) { + t.Fatalf("got %q\nwant %q", got, want) + } }) } } diff --git a/bigtable/reader.go b/bigtable/reader.go index c84a61e4ee5..64aabc91964 100644 --- a/bigtable/reader.go +++ b/bigtable/reader.go @@ -43,6 +43,7 @@ type ReadItem struct { Row, Column string Timestamp Timestamp Value []byte + Labels []string } // The current state of the read rows state machine. @@ -57,14 +58,15 @@ const ( // chunkReader handles cell chunks from the read rows response and combines // them into full Rows. type chunkReader struct { - state rrState - curKey []byte - curFam string - curQual []byte - curTS int64 - curVal []byte - curRow Row - lastKey string + state rrState + curKey []byte + curLabels []string + curFam string + curQual []byte + curTS int64 + curVal []byte + curRow Row + lastKey string } // newChunkReader returns a new chunkReader for handling read rows responses. @@ -138,6 +140,7 @@ func (cr *chunkReader) handleCellValue(cc *btpb.ReadRowsResponse_CellChunk) Row // ValueSize is specified so expect a split value of ValueSize bytes if cr.curVal == nil { cr.curVal = make([]byte, 0, cc.ValueSize) + cr.curLabels = cc.Labels } cr.curVal = append(cr.curVal, cc.Value...) cr.state = cellInProgress @@ -145,6 +148,7 @@ func (cr *chunkReader) handleCellValue(cc *btpb.ReadRowsResponse_CellChunk) Row // This cell is either the complete value or the last chunk of a split if cr.curVal == nil { cr.curVal = cc.Value + cr.curLabels = cc.Labels } else { cr.curVal = append(cr.curVal, cc.Value...) } @@ -165,9 +169,11 @@ func (cr *chunkReader) finishCell() { Column: string(cr.curFam) + ":" + string(cr.curQual), Timestamp: Timestamp(cr.curTS), Value: cr.curVal, + Labels: cr.curLabels, } cr.curRow[cr.curFam] = append(cr.curRow[cr.curFam], ri) cr.curVal = nil + cr.curLabels = nil } func (cr *chunkReader) commitRow() Row { diff --git a/bigtable/reader_test.go b/bigtable/reader_test.go index 8264dc3e24c..00f453f91db 100644 --- a/bigtable/reader_test.go +++ b/bigtable/reader_test.go @@ -37,7 +37,7 @@ func TestSingleCell(t *testing.T) { cr := newChunkReader() // All in one cell - row, err := cr.Process(cc("rk", "fm", "col", 1, "value", 0, true)) + row, err := cr.Process(cc("rk", "fm", "col", 1, "value", 0, true, []string{})) if err != nil { t.Fatalf("Processing chunk: %v", err) } @@ -47,7 +47,7 @@ func TestSingleCell(t *testing.T) { if len(row["fm"]) != 1 { t.Fatalf("Family name length mismatch %d, %d", 1, len(row["fm"])) } - want := []ReadItem{ri("rk", "fm", "col", 1, "value")} + want := []ReadItem{ri("rk", "fm", "col", 1, "value", []string{})} if !testutil.Equal(row["fm"], want) { t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm"], want) } @@ -59,11 +59,11 @@ func TestSingleCell(t *testing.T) { func TestMultipleCells(t *testing.T) { cr := newChunkReader() - mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false)) - mustProcess(t, cr, cc("rs", "fm1", "col1", 1, "val2", 0, false)) - mustProcess(t, cr, cc("rs", "fm1", "col2", 0, "val3", 0, false)) - mustProcess(t, cr, cc("rs", "fm2", "col1", 0, "val4", 0, false)) - row, err := cr.Process(cc("rs", "fm2", "col2", 1, "extralongval5", 0, true)) + mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false, []string{})) + mustProcess(t, cr, cc("rs", "fm1", "col1", 1, "val2", 0, false, []string{})) + mustProcess(t, cr, cc("rs", "fm1", "col2", 0, "val3", 0, false, []string{})) + mustProcess(t, cr, cc("rs", "fm2", "col1", 0, "val4", 0, false, []string{})) + row, err := cr.Process(cc("rs", "fm2", "col2", 1, "extralongval5", 0, true, []string{})) if err != nil { t.Fatalf("Processing chunk: %v", err) } @@ -72,16 +72,16 @@ func TestMultipleCells(t *testing.T) { } want := []ReadItem{ - ri("rs", "fm1", "col1", 0, "val1"), - ri("rs", "fm1", "col1", 1, "val2"), - ri("rs", "fm1", "col2", 0, "val3"), + ri("rs", "fm1", "col1", 0, "val1", []string{}), + ri("rs", "fm1", "col1", 1, "val2", []string{}), + ri("rs", "fm1", "col2", 0, "val3", []string{}), } if !testutil.Equal(row["fm1"], want) { t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want) } want = []ReadItem{ - ri("rs", "fm2", "col1", 0, "val4"), - ri("rs", "fm2", "col2", 1, "extralongval5"), + ri("rs", "fm2", "col1", 0, "val4", []string{}), + ri("rs", "fm2", "col2", 1, "extralongval5", []string{}), } if !testutil.Equal(row["fm2"], want) { t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm2"], want) @@ -94,9 +94,9 @@ func TestMultipleCells(t *testing.T) { func TestSplitCells(t *testing.T) { cr := newChunkReader() - mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "hello ", 11, false)) + mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "hello ", 11, false, []string{})) mustProcess(t, cr, ccData("world", 0, false)) - row, err := cr.Process(cc("rs", "fm1", "col2", 0, "val2", 0, true)) + row, err := cr.Process(cc("rs", "fm1", "col2", 0, "val2", 0, true, []string{})) if err != nil { t.Fatalf("Processing chunk: %v", err) } @@ -105,8 +105,8 @@ func TestSplitCells(t *testing.T) { } want := []ReadItem{ - ri("rs", "fm1", "col1", 0, "hello world"), - ri("rs", "fm1", "col2", 0, "val2"), + ri("rs", "fm1", "col1", 0, "hello world", []string{}), + ri("rs", "fm1", "col2", 0, "val2", []string{}), } if !testutil.Equal(row["fm1"], want) { t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want) @@ -119,20 +119,20 @@ func TestSplitCells(t *testing.T) { func TestMultipleRows(t *testing.T) { cr := newChunkReader() - row, err := cr.Process(cc("rs1", "fm1", "col1", 1, "val1", 0, true)) + row, err := cr.Process(cc("rs1", "fm1", "col1", 1, "val1", 0, true, []string{})) if err != nil { t.Fatalf("Processing chunk: %v", err) } - want := []ReadItem{ri("rs1", "fm1", "col1", 1, "val1")} + want := []ReadItem{ri("rs1", "fm1", "col1", 1, "val1", []string{})} if !testutil.Equal(row["fm1"], want) { t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want) } - row, err = cr.Process(cc("rs2", "fm2", "col2", 2, "val2", 0, true)) + row, err = cr.Process(cc("rs2", "fm2", "col2", 2, "val2", 0, true, []string{})) if err != nil { t.Fatalf("Processing chunk: %v", err) } - want = []ReadItem{ri("rs2", "fm2", "col2", 2, "val2")} + want = []ReadItem{ri("rs2", "fm2", "col2", 2, "val2", []string{})} if !testutil.Equal(row["fm2"], want) { t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm2"], want) } @@ -145,20 +145,20 @@ func TestMultipleRows(t *testing.T) { func TestBlankQualifier(t *testing.T) { cr := newChunkReader() - row, err := cr.Process(cc("rs1", "fm1", "", 1, "val1", 0, true)) + row, err := cr.Process(cc("rs1", "fm1", "", 1, "val1", 0, true, []string{})) if err != nil { t.Fatalf("Processing chunk: %v", err) } - want := []ReadItem{ri("rs1", "fm1", "", 1, "val1")} + want := []ReadItem{ri("rs1", "fm1", "", 1, "val1", []string{})} if !testutil.Equal(row["fm1"], want) { t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want) } - row, err = cr.Process(cc("rs2", "fm2", "col2", 2, "val2", 0, true)) + row, err = cr.Process(cc("rs2", "fm2", "col2", 2, "val2", 0, true, []string{})) if err != nil { t.Fatalf("Processing chunk: %v", err) } - want = []ReadItem{ri("rs2", "fm2", "col2", 2, "val2")} + want = []ReadItem{ri("rs2", "fm2", "col2", 2, "val2", []string{})} if !testutil.Equal(row["fm2"], want) { t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm2"], want) } @@ -168,14 +168,40 @@ func TestBlankQualifier(t *testing.T) { } } +func TestLabels(t *testing.T) { + cr := newChunkReader() + + mustProcess(t, cr, cc("rs1", "fm1", "col1", 0, "hello ", 11, false, []string{"test-label"})) + row := mustProcess(t, cr, ccData("world", 0, true)) + want := []ReadItem{ + ri("rs1", "fm1", "col1", 0, "hello world", []string{"test-label"}), + } + if !testutil.Equal(row["fm1"], want) { + t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want) + } + + row, err := cr.Process(cc("rs2", "fm1", "", 1, "val1", 0, true, []string{"test-label2"})) + if err != nil { + t.Fatalf("Processing chunk: %v", err) + } + want = []ReadItem{ri("rs2", "fm1", "", 1, "val1", []string{"test-label2"})} + if !testutil.Equal(row["fm1"], want) { + t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want) + } + + if err := cr.Close(); err != nil { + t.Fatalf("Close: %v", err) + } +} + func TestReset(t *testing.T) { cr := newChunkReader() - mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false)) - mustProcess(t, cr, cc("rs", "fm1", "col1", 1, "val2", 0, false)) - mustProcess(t, cr, cc("rs", "fm1", "col2", 0, "val3", 0, false)) + mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false, []string{})) + mustProcess(t, cr, cc("rs", "fm1", "col1", 1, "val2", 0, false, []string{})) + mustProcess(t, cr, cc("rs", "fm1", "col2", 0, "val3", 0, false, []string{})) mustProcess(t, cr, ccReset()) - row := mustProcess(t, cr, cc("rs1", "fm1", "col1", 1, "val1", 0, true)) - want := []ReadItem{ri("rs1", "fm1", "col1", 1, "val1")} + row := mustProcess(t, cr, cc("rs1", "fm1", "col1", 1, "val1", 0, true, []string{})) + want := []ReadItem{ri("rs1", "fm1", "col1", 1, "val1", []string{})} if !testutil.Equal(row["fm1"], want) { t.Fatalf("Reset: got: %v\nwant: %v\n", row["fm1"], want) } @@ -187,8 +213,8 @@ func TestReset(t *testing.T) { func TestNewFamEmptyQualifier(t *testing.T) { cr := newChunkReader() - mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false)) - _, err := cr.Process(cc(nilStr, "fm2", nilStr, 0, "val2", 0, true)) + mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false, []string{})) + _, err := cr.Process(cc(nilStr, "fm2", nilStr, 0, "val2", 0, true, []string{})) if err == nil { t.Fatalf("Expected error on second chunk with no qualifier set") } @@ -300,12 +326,12 @@ func toSet(res []TestResult) map[TestResult]bool { } // ri returns a ReadItem for the given components -func ri(rk string, fm string, qual string, ts int64, val string) ReadItem { - return ReadItem{Row: rk, Column: fmt.Sprintf("%s:%s", fm, qual), Value: []byte(val), Timestamp: Timestamp(ts)} +func ri(rk string, fm string, qual string, ts int64, val string, labels []string) ReadItem { + return ReadItem{Row: rk, Column: fmt.Sprintf("%s:%s", fm, qual), Value: []byte(val), Timestamp: Timestamp(ts), Labels: labels} } // cc returns a CellChunk proto -func cc(rk string, fm string, qual string, ts int64, val string, size int32, commit bool) *btspb.ReadRowsResponse_CellChunk { +func cc(rk string, fm string, qual string, ts int64, val string, size int32, commit bool, labels []string) *btspb.ReadRowsResponse_CellChunk { // The components of the cell key are wrapped and can be null or empty var rkWrapper []byte if rk == nilStr { @@ -335,12 +361,14 @@ func cc(rk string, fm string, qual string, ts int64, val string, size int32, com TimestampMicros: ts, Value: []byte(val), ValueSize: size, - RowStatus: &btspb.ReadRowsResponse_CellChunk_CommitRow{CommitRow: commit}} + RowStatus: &btspb.ReadRowsResponse_CellChunk_CommitRow{CommitRow: commit}, + Labels: labels, + } } // ccData returns a CellChunk with only a value and size func ccData(val string, size int32, commit bool) *btspb.ReadRowsResponse_CellChunk { - return cc(nilStr, nilStr, nilStr, 0, val, size, commit) + return cc(nilStr, nilStr, nilStr, 0, val, size, commit, []string{}) } // ccReset returns a CellChunk with RestRow set to true