diff --git a/bigtable/cmd/cbt/cbt.go b/bigtable/cmd/cbt/cbt.go
old mode 100644
new mode 100755
index a68ddc25cc1..e69d4664738
--- a/bigtable/cmd/cbt/cbt.go
+++ b/bigtable/cmd/cbt/cbt.go
@@ -32,6 +32,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"text/tabwriter"
 	"text/template"
 	"time"
@@ -409,6 +410,23 @@ var commands = []struct {
 			"        Example: cbt help createtable",
 		Required: cbtconfig.NoneRequired,
 	},
+	{
+		Name: "import",
+		Desc: "Batch write many rows based on the input file",
+		do:   doImport,
+		Usage: "cbt import <table-id> <input-file> [app-profile=<app-profile-id>] [column-family=<family>] [batch-size=<500>] [workers=<1>]\n" +
+			"  app-profile=<app-profile-id>          The app profile ID to use for the request\n" +
+			"  column-family=<family>                The column family label to use\n" +
+			"  batch-size=<500>                      The max number of rows per batch write request\n" +
+			"  workers=<1>                           The number of worker threads\n\n" +
+			"  Import data from a csv file into an existing cbt table that has the required column families.\n" +
+			"  See for a sample .csv file and formatting.\n" +
+			"  If no column family row is present, use the column-family flag to specify an existing family.\n\n" +
+			"  Examples:\n" +
+			"    cbt import csv-import-table cbt-import-sample.csv\n" +
+			"    cbt import csv-import-table cbt-import-sample.csv app-profile=batch-write-profile column-family=my-family workers=5\n",
+		Required: cbtconfig.ProjectAndInstanceRequired,
+	},
 	{
 		Name: "listinstances",
 		Desc: "List instances in a project",
@@ -1534,6 +1552,187 @@ func doDeleteAppProfile(ctx context.Context, args ...string) {
 	}
 }
 
+type importerArgs struct {
+	appProfile string
+	fam        string
+	sz         int
+	workers    int
+}
+
+// safeReader serializes access to a csv.Reader shared by the import workers.
+type safeReader struct {
+	mu sync.Mutex
+	r  *csv.Reader
+	t  int // total rows written
+}
+
+func doImport(ctx context.Context, args ...string) {
+	ia, err := parseImporterArgs(ctx, args)
+	if err != nil {
+		log.Fatalf("error parsing importer args: %s", err)
+	}
+	f, err := os.Open(args[1])
+	if err != nil {
+		log.Fatalf("couldn't open the csv file: %s", err)
+	}
+
+	tbl := getClient(bigtable.ClientConfig{AppProfile: ia.appProfile}).Open(args[0])
+	r := csv.NewReader(f)
+	importCSV(ctx, tbl, r, ia)
+}
+
+func parseImporterArgs(ctx context.Context, args []string) (importerArgs, error) {
+	var err error
+	ia := importerArgs{
+		fam:     "",
+		sz:      500,
+		workers: 1,
+	}
+	if len(args) < 2 {
+		return ia, fmt.Errorf("usage: cbt import <table-id> <input-file> [app-profile=<app-profile-id>] [column-family=<family>] [batch-size=<500>] [workers=<1>]")
+	}
+	for _, arg := range args[2:] {
+		switch {
+		case strings.HasPrefix(arg, "app-profile="):
+			ia.appProfile = strings.Split(arg, "=")[1]
+		case strings.HasPrefix(arg, "column-family="):
+			ia.fam = strings.Split(arg, "=")[1]
+			if ia.fam == "" {
+				return ia, fmt.Errorf("column-family cannot be ''")
+			}
+		case strings.HasPrefix(arg, "batch-size="):
+			ia.sz, err = strconv.Atoi(strings.Split(arg, "=")[1])
+			if err != nil || ia.sz <= 0 || ia.sz > 100000 {
+				return ia, fmt.Errorf("batch-size must be > 0 and <= 100000")
+			}
+		case strings.HasPrefix(arg, "workers="):
+			ia.workers, err = strconv.Atoi(strings.Split(arg, "=")[1])
+			if err != nil || ia.workers <= 0 {
+				return ia, fmt.Errorf("workers must be > 0, err:%s", err)
+			}
+		}
+	}
+	return ia, nil
+}
+
+func importCSV(ctx context.Context, tbl *bigtable.Table, r *csv.Reader, ia importerArgs) {
+	fams, cols, err := parseCsvHeaders(r, ia.fam)
+	if err != nil {
+		log.Fatalf("error parsing headers: %s", err)
+	}
+	sr := safeReader{r: r}
+	ts := bigtable.Now()
+
+	var wg sync.WaitGroup
+	wg.Add(ia.workers)
+	for i := 0; i < ia.workers; i++ {
+		go func(w int) {
+			defer wg.Done()
+			if e := sr.parseAndWrite(ctx, tbl, fams, cols, ts, ia.sz, w); e != nil {
+				log.Fatalf("error: %s", e)
+			}
+		}(i)
+	}
+	wg.Wait()
+	log.Printf("Done importing %d rows.\n", sr.t)
+}
+
+func parseCsvHeaders(r *csv.Reader, family string) ([]string, []string, error) {
+	var err error
+	var fams, cols []string
+	if family == "" { // no column-family from flag, using first row
+		fams, err = r.Read()
+		if err != nil {
+			return nil, nil, fmt.Errorf("family header reader error:%s", err)
+		}
+	}
+	cols, err = r.Read() // column names are next row
+	if err != nil {
+		return nil, nil, fmt.Errorf("columns header reader error:%s", err)
+	}
+	if len(cols) < 2 { // guard before indexing below; csv.Reader enforces equal field counts per row
+		return fams, cols, fmt.Errorf("at least 2 columns are required (rowkey + data)")
+	}
+	if family != "" {
+		fams = make([]string, len(cols))
+		fams[1] = family
+	}
+	if fams[0] != "" || cols[0] != "" {
+		return fams, cols, fmt.Errorf("the first column must be empty for column-family and column name rows")
+	}
+	if fams[1] == "" || cols[1] == "" {
+		return fams, cols, fmt.Errorf("the second column (first data column) must have values for column family and column name rows if present")
+	}
+	for i := range cols { // fill any blank column families with the previous one
+		if i > 0 && fams[i] == "" {
+			fams[i] = fams[i-1]
+		}
+	}
+	return fams, cols, nil
+}
+
+func batchWrite(ctx context.Context, tbl *bigtable.Table, rk []string, muts []*bigtable.Mutation, worker int) (int, error) {
+	log.Printf("[%d] Writing batch: size: %d, firstRowKey: %s, lastRowKey: %s\n", worker, len(rk), rk[0], rk[len(rk)-1])
+	errors, err := tbl.ApplyBulk(ctx, rk, muts)
+	if err != nil {
+		return 0, fmt.Errorf("applying bulk mutations process error: %v", err)
+	}
+	if errors != nil {
+		return 0, fmt.Errorf("applying bulk mutations had %d errors, first:%v", len(errors), errors[0])
+	}
+	return len(rk), nil
+}
+
+func (sr *safeReader) parseAndWrite(ctx context.Context, tbl *bigtable.Table, fams, cols []string, ts bigtable.Timestamp, max, worker int) error {
+	var rowKey []string
+	var muts []*bigtable.Mutation
+	var c int
+	for {
+		sr.mu.Lock()
+		for len(rowKey) < max {
+			line, err := sr.r.Read()
+			if err == io.EOF {
+				break
+			}
+			if err != nil {
+				log.Fatal(err)
+			}
+			if line[0] == "" {
+				log.Printf("[%d] RowKey not present, skipping line", worker)
+				continue
+			}
+			mut := bigtable.NewMutation()
+			empty := true
+			for i, val := range line {
+				if i > 0 && val != "" {
+					mut.Set(fams[i], cols[i], ts, []byte(val))
+					empty = false
+				}
+			}
+			if empty {
+				log.Printf("[%d] RowKey '%s' has no mutations, skipping", worker, line[0])
+				continue
+			}
+			rowKey = append(rowKey, line[0])
+			muts = append(muts, mut)
+		}
+		if len(rowKey) > 0 {
+			sr.mu.Unlock()
+			n, err := batchWrite(ctx, tbl, rowKey, muts, worker)
+			if err != nil {
+				return err
+			}
+			c += n
+			rowKey = rowKey[:0]
+			muts = muts[:0]
+			continue
+		}
+		sr.t += c
+		sr.mu.Unlock()
+		return nil
+	}
+}
+
 // parseDuration parses a duration string.
 // It is similar to Go's time.ParseDuration, except with a different set of supported units,
 // and only simple formats supported.
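Note on the expected input, since the sample file referenced above is not included here: parseCsvHeaders accepts an optional first row mapping columns to column families, then a row of column names. The rowkey column stays blank in both header rows, and a blank family cell inherits the family to its left. A hypothetical cbt-import-sample.csv consistent with the parser and its tests:

	,my-family,,my-family-2
	,col-1,col-2,col-3
	rk-0,A,B,C
	rk-1,,D,

When column-family=<family> is passed on the command line, the family row is omitted and the file starts with the column-name row.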
diff --git a/bigtable/cmd/cbt/cbt_test.go b/bigtable/cmd/cbt/cbt_test.go
index e7c6ea2cedd..fa8e4013705 100644
--- a/bigtable/cmd/cbt/cbt_test.go
+++ b/bigtable/cmd/cbt/cbt_test.go
@@ -15,12 +15,20 @@
 package main
 
 import (
+	"bytes"
+	"context"
+	"encoding/csv"
+	"fmt"
+	"strings"
 	"testing"
 	"time"
 
 	"cloud.google.com/go/bigtable"
+	"cloud.google.com/go/bigtable/bttest"
 	"cloud.google.com/go/internal/testutil"
 	"github.com/google/go-cmp/cmp"
+	"google.golang.org/api/option"
+	"google.golang.org/grpc"
 )
 
 func TestParseDuration(t *testing.T) {
@@ -171,3 +179,408 @@ func TestParseColumnsFilter(t *testing.T) {
 		}
 	}
 }
+
+// matchesExpectedError checks that err contains want as a substring.
+// It returns "" on a match, else a description of the mismatch.
+func matchesExpectedError(want string, err error) string {
+	if err != nil {
+		got := err.Error()
+		if want == "" || !strings.Contains(got, want) {
+			return fmt.Sprintf("expected error substr:%s, got:%s", want, got)
+		}
+	} else if want != "" {
+		return fmt.Sprintf("expected error substr:%s", want)
+	}
+	return ""
+}
+
+func TestCsvImporterArgs(t *testing.T) {
+	tests := []struct {
+		in  []string
+		out importerArgs
+		err string
+	}{
+		{in: []string{"my-table", "my-file.csv"}, out: importerArgs{"", "", 500, 1}},
+		{in: []string{"my-table", "my-file.csv", "app-profile="}, out: importerArgs{"", "", 500, 1}},
+		{in: []string{"my-table", "my-file.csv", "app-profile=my-ap", "column-family=my-family", "batch-size=100", "workers=20"},
+			out: importerArgs{"my-ap", "my-family", 100, 20}},
+
+		{in: []string{}, err: "usage: cbt import <table-id> <input-file> [app-profile=<app-profile-id>] [column-family=<family>] [batch-size=<500>] [workers=<1>]"},
+		{in: []string{"my-table", "my-file.csv", "column-family="}, err: "column-family cannot be ''"},
+		{in: []string{"my-table", "my-file.csv", "batch-size=-5"}, err: "batch-size must be > 0 and <= 100000"},
+		{in: []string{"my-table", "my-file.csv", "batch-size=5000000"}, err: "batch-size must be > 0 and <= 100000"},
+		{in: []string{"my-table", "my-file.csv", "batch-size=nan"}, err: "batch-size must be > 0 and <= 100000"},
+		{in: []string{"my-table", "my-file.csv", "batch-size="}, err: "batch-size must be > 0 and <= 100000"},
+		{in: []string{"my-table", "my-file.csv", "workers=0"}, err: "workers must be > 0, err:%!s(<nil>)"},
+		{in: []string{"my-table", "my-file.csv", "workers=nan"}, err: "workers must be > 0, err:strconv.Atoi: parsing \"nan\": invalid syntax"},
+		{in: []string{"my-table", "my-file.csv", "workers="}, err: "workers must be > 0, err:strconv.Atoi: parsing \"\": invalid syntax"},
+	}
+	for _, tc := range tests {
+		got, err := parseImporterArgs(context.Background(), tc.in)
+		if e := matchesExpectedError(tc.err, err); e != "" {
+			t.Errorf("%s", e)
+			continue
+		}
+		if tc.err != "" {
+			continue // received expected error, do not check parsed values below
+		}
+		if got.appProfile != tc.out.appProfile ||
+			got.fam != tc.out.fam ||
+			got.sz != tc.out.sz ||
+			got.workers != tc.out.workers {
+			t.Errorf("parseImporterArgs(%q) = %+v, want %+v", tc.in, got, tc.out)
+		}
+	}
+}
+
+func transformToCsvBuffer(data [][]string) ([]byte, error) {
+	if len(data) == 0 {
+		return nil, fmt.Errorf("data cannot be empty")
+	}
+	var buf bytes.Buffer
+	csvWriter := csv.NewWriter(&buf)
+	if err := csvWriter.WriteAll(data); err != nil {
+		return nil, err
+	}
+	csvWriter.Flush()
+	if err := csvWriter.Error(); err != nil {
+		return nil, err
+	}
+	return buf.Bytes(), nil
+}
+
+func TestCsvHeaderParser(t *testing.T) {
+	tests := []struct {
+		label    string
+		iData    [][]string
+		iFam     string
+		oFams    []string
+		oCols    []string
+		nextLine []string
+		err      string
+	}{
+		{label: "extend-family-gap",
+			iData:    [][]string{{"", "my-family", "", "my-family-2"}, {"", "col-1", "col-2", "col-3"}, {"rk-1", "A", "", ""}},
+			iFam:     "",
+			oFams:    []string{"", "my-family", "my-family", "my-family-2"},
+			oCols:    []string{"", "col-1", "col-2", "col-3"},
+			nextLine: []string{"rk-1", "A", "", ""}},
+		{label: "handle-family-arg",
+			iData:    [][]string{{"", "col-1", "col-2"}, {"rk-1", "A", ""}},
+			iFam:     "arg-family",
+			oFams:    []string{"", "arg-family", "arg-family"},
+			oCols:    []string{"", "col-1", "col-2"},
+			nextLine: []string{"rk-1", "A", ""}},
+
+		{label: "eof-header-family",
+			iData: [][]string{{""}},
+			iFam:  "",
+			err:   "family header reader error:EOF"},
+		{label: "eof-header-column",
+			iData: [][]string{{""}, {""}},
+			iFam:  "arg-family",
+			err:   "columns header reader error:EOF"},
+		{label: "rowkey-in-header-row",
+			iData: [][]string{{"ABC", "my-family", ""}},
+			iFam:  "arg-family",
+			err:   "the first column must be empty for column-family and column name rows"},
+		{label: "blank-first-headers",
+			iData: [][]string{{"", "", ""}},
+			iFam:  "arg-family",
+			err:   "the second column (first data column) must have values for column family and column name rows if present"},
+	}
+
+	for _, tc := range tests {
+		// create an in-memory csv-like file
+		byteData, err := transformToCsvBuffer(tc.iData)
+		if err != nil {
+			t.Fatal(err)
+		}
+		reader := csv.NewReader(bytes.NewReader(byteData))
+
+		fams, cols, err := parseCsvHeaders(reader, tc.iFam)
+		if e := matchesExpectedError(tc.err, err); e != "" {
+			t.Errorf("%s %s", tc.label, e)
+			continue
+		}
+		if tc.err != "" {
+			continue // received expected error, do not check parsed values below
+		}
+
+		line, err := reader.Read()
+		if err != nil {
+			t.Errorf("Next line for reader error, got: %q, expect: %q, error:%s", line, tc.nextLine, err)
+			continue
+		}
+		if len(fams) != len(tc.oFams) ||
+			len(cols) != len(tc.oCols) ||
+			len(line) != len(tc.nextLine) {
+			t.Errorf("parseCsvHeaders() returned mismatched sizes, fams: %d, cols:%d, line:%d", len(fams), len(cols), len(line))
+			continue
+		}
+		for i, f := range fams {
+			if f != tc.oFams[i] {
+				t.Errorf("Incorrect column families idx:%d, got: %q, want %q", i, fams[i], tc.oFams[i])
+				continue
+			}
+		}
+		for i, c := range cols {
+			if c != tc.oCols[i] {
+				t.Errorf("Incorrect column names idx:%d, got: %q, want %q", i, cols[i], tc.oCols[i])
+				continue
+			}
+		}
+		for i, v := range line {
+			if v != tc.nextLine[i] {
+				t.Errorf("Incorrect next line idx:%d, got: %q, want %q", i, line[i], tc.nextLine[i])
+				continue
+			}
+		}
+	}
+}
+
+func setupEmulator(t *testing.T, tables, families []string) (context.Context, *bigtable.Client) {
+	srv, err := bttest.NewServer("localhost:0")
+	if err != nil {
+		t.Fatalf("Error starting bttest server: %s", err)
+	}
+
+	ctx := context.Background()
+
+	conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
+	if err != nil {
+		t.Fatalf("grpc.Dial: %s", err)
+	}
+
+	proj, instance := "proj", "instance"
+	adminClient, err := bigtable.NewAdminClient(ctx, proj, instance, option.WithGRPCConn(conn))
+	if err != nil {
+		t.Fatalf("NewAdminClient: %s", err)
+	}
+
+	for _, ta := range tables {
+		if err = adminClient.CreateTable(ctx, ta); err != nil {
+			t.Fatalf("CreateTable: %s", err)
+		}
+		for _, f := range families {
+			if err = adminClient.CreateColumnFamily(ctx, ta, f); err != nil {
+				t.Fatalf("CreateColumnFamily: %s", err)
+			}
+		}
+	}
+
+	client, err := bigtable.NewClient(ctx, proj, instance, option.WithGRPCConn(conn))
+	if err != nil {
t.Fatalf("Error %s", err) + } + + return ctx, client +} + +func validateData(ctx context.Context, tbl *bigtable.Table, fams, cols []string, rowData [][]string) error { + // vaildate table entries, valMap["rowkey:family:column"] = mutation value + valMap := make(map[string]string) + for _, row := range rowData { + for i, val := range row { + if i > 0 && val != "" { + valMap[row[0]+":"+fams[i]+":"+cols[i]] = val + } + } + } + for _, data := range rowData { + row, err := tbl.ReadRow(ctx, data[0]) + if err != nil { + return err + } + for _, cf := range row { + for _, column := range cf { + k := data[0] + ":" + string(column.Column) + v, ok := valMap[k] + if ok && v == string(column.Value) { + delete(valMap, k) + } + } + } + } + if len(valMap) != 0 { + return fmt.Errorf("Data didn't match after read, not found %v", valMap) + } + return nil +} + +func TestCsvParseAndWrite(t *testing.T) { + ctx, client := setupEmulator(t, []string{"my-table"}, []string{"my-family", "my-family-2"}) + + tbl := client.Open("my-table") + fams := []string{"", "my-family", "my-family-2"} + cols := []string{"", "col-1", "col-2"} + rowData := [][]string{ + {"rk-0", "A", "B"}, + {"rk-1", "", "C"}, + } + + byteData, err := transformToCsvBuffer(rowData) + if err != nil { + t.Fatal(err) + } + reader := csv.NewReader(bytes.NewReader(byteData)) + + sr := safeReader{r: reader} + if err = sr.parseAndWrite(ctx, tbl, fams, cols, 1, 1, 1); err != nil { + t.Fatalf("parseAndWrite() failed unexpectedly, error:%s", err) + } + + if err := validateData(ctx, tbl, fams, cols, rowData); err != nil { + t.Fatalf("Read back validation error:%s", err) + } +} + +func TestCsvParseAndWriteBadFamily(t *testing.T) { + ctx, client := setupEmulator(t, []string{"my-table"}, []string{"my-family"}) + + tbl := client.Open("my-table") + fams := []string{"", "my-family", "not-my-family"} + cols := []string{"", "col-1", "col-2"} + rowData := [][]string{ + {"rk-0", "A", "B"}, + {"rk-1", "", "C"}, + } + + byteData, err := transformToCsvBuffer(rowData) + if err != nil { + t.Fatal(err) + } + reader := csv.NewReader(bytes.NewReader(byteData)) + + sr := safeReader{r: reader} + if err = sr.parseAndWrite(ctx, tbl, fams, cols, 1, 1, 1); err == nil { + t.Fatalf("parseAndWrite() should have failed with non-existant column family") + } +} + +func TestCsvParseAndWriteDuplicateRowkeys(t *testing.T) { + ctx, client := setupEmulator(t, []string{"my-table"}, []string{"my-family"}) + + tbl := client.Open("my-table") + fams := []string{"", "my-family", "my-family"} + cols := []string{"", "col-1", "col-2"} + rowData := [][]string{ + {"rk-0", "A", ""}, + {"rk-0", "", "B"}, + {"rk-0", "C", ""}, + } + + byteData, err := transformToCsvBuffer(rowData) + if err != nil { + t.Fatal(err) + } + reader := csv.NewReader(bytes.NewReader(byteData)) + + sr := safeReader{r: reader} + if err = sr.parseAndWrite(ctx, tbl, fams, cols, 1, 1, 1); err != nil { + t.Fatalf("parseAndWrite() should not have failed for duplicate rowkeys: %s", err) + } + + // the "A" not present result is expected, the emulator only keeps 1 version + valMap := map[string]bool{"rk-0:my-family:col-2:B": true, "rk-0:my-family:col-1:C": true} + row, err := tbl.ReadRow(ctx, "rk-0") + if err != nil { + t.Errorf("error %s", err) + } + for _, cf := range row { // each column family in row + for _, column := range cf { // each cf:column, aka each mutation + k := "rk-0:" + string(column.Column) + ":" + string(column.Value) + _, ok := valMap[k] + if ok { + delete(valMap, k) + continue + } + t.Errorf("row data not found for %s\n", k) 
+		}
+	}
+
+	if len(valMap) != 0 {
+		t.Fatalf("values were not present in table: %v", valMap)
+	}
+}
+
+func TestCsvToCbt(t *testing.T) {
+	tests := []struct {
+		label        string
+		ia           importerArgs
+		csvData      [][]string
+		expectedFams []string
+		dataStartIdx int
+	}{
+		{
+			label: "has-column-families",
+			ia:    importerArgs{fam: "", sz: 1, workers: 1},
+			csvData: [][]string{
+				{"", "my-family", ""},
+				{"", "col-1", "col-2"},
+				{"rk-0", "A", ""},
+				{"rk-1", "", "B"},
+				{"rk-2", "", ""},
+				{"rk-3", "C", ""},
+			},
+			expectedFams: []string{"", "my-family", "my-family"},
+			dataStartIdx: 2,
+		},
+		{
+			label: "no-column-families",
+			ia:    importerArgs{fam: "arg-family", sz: 1, workers: 1},
+			csvData: [][]string{
+				{"", "col-1", "col-2"},
+				{"rk-0", "A", ""},
+				{"rk-1", "", "B"},
+				{"rk-2", "", ""},
+				{"rk-3", "C", "D"},
+			},
+			expectedFams: []string{"", "arg-family", "arg-family"},
+			dataStartIdx: 1,
+		},
+		{
+			label: "larger-batches",
+			ia:    importerArgs{fam: "arg-family", sz: 100, workers: 1},
+			csvData: [][]string{
+				{"", "col-1", "col-2"},
+				{"rk-0", "A", ""},
+				{"rk-1", "", "B"},
+				{"rk-2", "", ""},
+				{"rk-3", "C", "D"},
+			},
+			expectedFams: []string{"", "arg-family", "arg-family"},
+			dataStartIdx: 1,
+		},
+		{
+			label: "many-workers",
+			ia:    importerArgs{fam: "arg-family", sz: 1, workers: 20},
+			csvData: [][]string{
+				{"", "col-1", "col-2"},
+				{"rk-0", "A", ""},
+				{"rk-1", "", "B"},
+				{"rk-2", "", ""},
+				{"rk-3", "C", "D"},
+			},
+			expectedFams: []string{"", "arg-family", "arg-family"},
+			dataStartIdx: 1,
+		},
+	}
+
+	for _, tc := range tests {
+		ctx, client := setupEmulator(t, []string{"my-table"}, []string{"my-family", "arg-family"})
+		tbl := client.Open("my-table")
+
+		byteData, err := transformToCsvBuffer(tc.csvData)
+		if err != nil {
+			t.Fatal(err)
+		}
+		reader := csv.NewReader(bytes.NewReader(byteData))
+
+		importCSV(ctx, tbl, reader, tc.ia)
+
+		if err := validateData(ctx, tbl, tc.expectedFams, tc.csvData[tc.dataStartIdx-1], tc.csvData[tc.dataStartIdx:]); err != nil {
+			t.Fatalf("Read back validation error: %s", err)
+		}
+	}
+}
diff --git a/bigtable/cmd/cbt/cbtdoc.go b/bigtable/cmd/cbt/cbtdoc.go
index 4dc0f9b3a99..eb700f4c32d 100644
--- a/bigtable/cmd/cbt/cbtdoc.go
+++ b/bigtable/cmd/cbt/cbtdoc.go
@@ -42,6 +42,7 @@ The commands are:
     deletetable               Delete a table
     doc                       Print godoc-suitable documentation for cbt
     help                      Print help text
+    import                    Batch write many rows based on the input file
     listinstances             List instances in a project
     listclusters              List clusters in an instance
     lookup                    Read from a single row
@@ -274,6 +275,27 @@ Usage:
 
 
 
+
+
+
+Batch write many rows based on the input file
+
+Usage:
+	cbt import <table-id> <input-file> [app-profile=<app-profile-id>] [column-family=<family>] [batch-size=<500>] [workers=<1>]
+	  app-profile=<app-profile-id>          The app profile ID to use for the request
+	  column-family=<family>                The column family label to use
+	  batch-size=<500>                      The max number of rows per batch write request
+	  workers=<1>                           The number of worker threads
+
+	  Import data from a csv file into an existing cbt table that has the required column families.
+	  See for a sample .csv file and formatting.
+	  If no column family row is present, use the column-family flag to specify an existing family.
+
+	  Examples:
+	    cbt import csv-import-table cbt-import-sample.csv
+	    cbt import csv-import-table cbt-import-sample.csv app-profile=batch-write-profile column-family=my-family workers=5
+
+
 List instances in a project
 
 Usage:
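Appendix: a minimal, illustrative sketch (not part of the patch) of the write path the importer drives, one ApplyBulk call per batch. The project, instance, table, and family names are placeholders; the per-row error handling mirrors batchWrite above.

	package main

	import (
		"context"
		"log"

		"cloud.google.com/go/bigtable"
	)

	func main() {
		ctx := context.Background()
		client, err := bigtable.NewClient(ctx, "my-project", "my-instance")
		if err != nil {
			log.Fatalf("NewClient: %v", err)
		}
		defer client.Close()
		tbl := client.Open("csv-import-table")

		// One mutation per row key; the column family must already exist.
		ts := bigtable.Now()
		rowKeys := []string{"rk-0", "rk-1"}
		var muts []*bigtable.Mutation
		for _, v := range []string{"A", "B"} {
			mut := bigtable.NewMutation()
			mut.Set("my-family", "col-1", ts, []byte(v))
			muts = append(muts, mut)
		}

		// ApplyBulk reports per-row failures separately from the overall RPC error.
		errs, err := tbl.ApplyBulk(ctx, rowKeys, muts)
		if err != nil {
			log.Fatalf("ApplyBulk: %v", err)
		}
		for i, e := range errs {
			if e != nil {
				log.Printf("row %s failed: %v", rowKeys[i], e)
			}
		}
	}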