fix(bigtable/cbt): remove duplicated test verification
markduffett committed Nov 4, 2021
1 parent 92b8135 commit 77b3f18
Showing 2 changed files with 46 additions and 82 deletions.
5 changes: 3 additions & 2 deletions bigtable/cmd/cbt/cbt.go
@@ -1647,7 +1647,8 @@ func parseCsvHeaders(r *csv.Reader, fams []string) ([]string, []string, error) {
 	}
 	if len(fams) < 2 || fams[1] == "" {
 		return fams, cols, fmt.Errorf("The first data column requires a non-empty column family or column-family flag set")
-	} else if len(fams) < len(cols) {
+	}
+	if len(fams) < len(cols) {
 		ext := make([]string, len(cols)-len(fams))
 		fams = append(fams, ext...)
 	}
@@ -1698,7 +1699,7 @@ func (sr *safeReader) parseAndWrite(ctx context.Context, tbl *bigtable.Table, fa
 			}
 		}
 		if empty {
-			log.Printf("[%d] RowKey [%s] has no mutations, skipping", worker, line[0])
+			log.Printf("[%d] RowKey '%s' has no mutations, skipping", worker, line[0])
 			continue
 		}
 		if line[0] == "" {
123 changes: 43 additions & 80 deletions bigtable/cmd/cbt/cbt_test.go
@@ -18,7 +18,7 @@ import (
 	"bytes"
 	"context"
 	"encoding/csv"
-	"errors"
+	"fmt"
 	"testing"
 	"time"
 
@@ -229,14 +229,13 @@ func TestCsvImporterArgs(t *testing.T) {
 	}
 }
 
-func writeAsCSV(records [][]string) ([]byte, error) {
-	if records == nil || len(records) == 0 {
-		return nil, errors.New("Records cannot be nil or empty")
+func writeAsCSV(data [][]string) ([]byte, error) {
+	if len(data) == 0 {
+		return nil, fmt.Errorf("Data cannot be empty")
 	}
 	var buf bytes.Buffer
 	csvWriter := csv.NewWriter(&buf)
-	err := csvWriter.WriteAll(records)
-	if err != nil {
+	if err := csvWriter.WriteAll(data); err != nil {
 		return nil, err
 	}
 	csvWriter.Flush()
@@ -369,6 +368,38 @@ func setupEmulator(t *testing.T, tables, families []string) (context.Context, *b
 	return ctx, client
 }
 
+func validateData(ctx context.Context, tbl *bigtable.Table, fams, cols []string, rowData [][]string) error {
+	// validate table entries, valMap["rowkey:family:column"] = mutation value
+	valMap := make(map[string]string)
+	for _, row := range rowData {
+		for i, val := range row {
+			if i > 0 && val != "" {
+				valMap[row[0]+":"+fams[i]+":"+cols[i]] = val
+			}
+		}
+	}
+	for _, data := range rowData {
+		if row, err := tbl.ReadRow(ctx, data[0]); err != nil {
+			return err
+		} else {
+			for _, cf := range row {
+				for _, column := range cf {
+					k := data[0] + ":" + string(column.Column)
+					v, ok := valMap[k]
+					if ok && v == string(column.Value) {
+						delete(valMap, k)
+						continue
+					}
+				}
+			}
+		}
+	}
+	if len(valMap) != 0 {
+		return fmt.Errorf("Data didn't match after read, not found %v", valMap)
+	}
+	return nil
+}
+
 func TestCsvParseAndWrite(t *testing.T) {
 	ctx, client := setupEmulator(t, []string{"my-table"}, []string{"my-family", "my-family-2"})
 
@@ -379,7 +410,6 @@ func TestCsvParseAndWrite(t *testing.T) {
 		{"rk-0", "A", "B"},
 		{"rk-1", "", "C"},
 	}
-	matchCount := 3
 
 	byteData, err := writeAsCSV(rowData)
 	if err != nil {
@@ -392,37 +422,8 @@ func TestCsvParseAndWrite(t *testing.T) {
 		t.Fatalf("parseAndWrite() failed unexpectedly")
 	}
 
-	// vaildate table entries
-	colMap := make(map[string][]string)
-	for i := range fams {
-		var col []string
-		for _, r := range rowData {
-			col = append(col, r[i])
-		}
-		colMap[fams[i]+":"+cols[i]] = col
-	}
-	for rowIdx, data := range rowData {
-		if row, err := tbl.ReadRow(ctx, data[0]); err != nil {
-			t.Errorf("Error %s", err)
-		} else {
-			for _, fam := range fams {
-				for _, column := range row[fam] {
-					colId := string(column.Column)
-					col, ok := colMap[colId]
-					if ok {
-						if string(column.Value) == col[rowIdx] {
-							matchCount--
-							continue
-						}
-						t.Errorf("Column data didnt match, colId: %s, got: %s, want %s\n", colId, string(column.Value), col[rowIdx])
-					}
-				}
-			}
-		}
-	}
-
-	if matchCount != 0 {
-		t.Fatalf("Data didn't match after read for %d values", matchCount)
+	if err := validateData(ctx, tbl, fams, cols, rowData); err != nil {
+		t.Fatalf("Read back validation error: %s", err)
 	}
 }
 
@@ -479,14 +480,13 @@ func TestCsvParseAndWriteDuplicateRowkeys(t *testing.T) {
 	} else {
 		for _, cf := range row { // each column family in row
 			for _, column := range cf { // each cf:column, aka each mutation
-				colId := string(column.Column)
-				k := "rk-0:" + colId + ":" + string(column.Value)
+				k := "rk-0:" + string(column.Column) + ":" + string(column.Value)
 				_, ok := valMap[k]
 				if ok {
 					delete(valMap, k)
 					continue
 				}
-				t.Errorf("row data not found, colId: %s\n", colId)
+				t.Errorf("row data not found for %s\n", k)
 			}
 		}
 	}
@@ -502,7 +502,6 @@ func TestCsvToCbt(t *testing.T) {
 		ia           importerArgs
 		csvData      [][]string
 		expectedFams []string
-		matchCount   int
 		dataStartIdx int
 	}{
 		{
@@ -517,7 +516,6 @@ func TestCsvToCbt(t *testing.T) {
 				{"rk-3", "C", ""},
 			},
 			expectedFams: []string{"", "my-family", "my-family"},
-			matchCount:   3,
 			dataStartIdx: 2,
 		},
 		{
@@ -531,7 +529,6 @@ func TestCsvToCbt(t *testing.T) {
 				{"rk-3", "C", "D"},
 			},
 			expectedFams: []string{"", "arg-family", "arg-family"},
-			matchCount:   4,
 			dataStartIdx: 1,
 		},
 		{
@@ -545,7 +542,6 @@ func TestCsvToCbt(t *testing.T) {
 				{"rk-3", "C", "D"},
 			},
 			expectedFams: []string{"", "arg-family", "arg-family"},
-			matchCount:   4,
 			dataStartIdx: 1,
 		},
 		{
@@ -559,7 +555,6 @@ func TestCsvToCbt(t *testing.T) {
 				{"rk-3", "C", "D"},
 			},
 			expectedFams: []string{"", "arg-family", "arg-family"},
-			matchCount:   4,
 			dataStartIdx: 1,
 		},
 	}
@@ -576,40 +571,8 @@ func TestCsvToCbt(t *testing.T) {
 
 		importCSV(ctx, tbl, reader, tc.ia)
 
-		// created lookup map for expected outputs
-		colRow := tc.csvData[tc.dataStartIdx-1]
-		colMap := make(map[string][]string)
-		for i := range tc.expectedFams {
-			var col []string
-			for _, r := range tc.csvData[tc.dataStartIdx:] {
-				col = append(col, r[i])
-			}
-			colMap[tc.expectedFams[i]+":"+colRow[i]] = col
-		}
-
-		// read rows back and validate mutations
-		for rowIdx, data := range tc.csvData[tc.dataStartIdx:] {
-			if row, err := tbl.ReadRow(ctx, data[0]); err != nil {
-				t.Errorf("%s error %s", tc.label, err)
-			} else {
-				for _, cf := range row { // each column family in row
-					for _, column := range cf { // each cf:column, aka each mutation
-						colId := string(column.Column)
-						col, ok := colMap[colId]
-						if ok {
-							if string(column.Value) == col[rowIdx] {
-								tc.matchCount--
-								continue
-							}
-							t.Errorf("%s, column data didnt match, colId: %s, got: %s, want %s\n", tc.label, colId, string(column.Value), col[rowIdx])
-						}
-					}
-				}
-			}
-		}
-
-		if tc.matchCount != 0 {
-			t.Fatalf("%s, data didn't match after read for %d values", tc.label, tc.matchCount)
+		if err := validateData(ctx, tbl, tc.expectedFams, tc.csvData[tc.dataStartIdx-1], tc.csvData[tc.dataStartIdx:]); err != nil {
+			t.Fatalf("Read back validation error: %s", err)
 		}
 	}
 }
