feat(bigtable/cbt): cbt 'import' cmd to parse a .csv file and write to CBT #5072

Merged · 16 commits · Nov 8, 2021
Changes from 11 commits
199 changes: 199 additions & 0 deletions bigtable/cmd/cbt/cbt.go 100644 → 100755
@@ -32,6 +32,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"text/tabwriter"
"text/template"
"time"
@@ -409,6 +410,23 @@ var commands = []struct {
" Example: cbt help createtable",
Required: cbtconfig.NoneRequired,
},
{
Name: "import",
Desc: "Batch write many rows based on the input file",
do: doImport,
Usage: "cbt import <table-id> <input-file> [app-profile=<app-profile-id>] [column-family=<family-name>] [batch-size=<500>] [workers=<1>]\n" +
" app-profile=<app-profile-id> The app profile ID to use for the request\n" +
" column-family=<family-name> The column family label to use\n" +
" batch-size=<500> The max number of rows per batch write request\n" +
" workers=<1> The number of worker threads\n\n" +
" Import data from a csv file into an existing cbt table that has the required column families.\n" +
" See <example.csv.github.com/cbt-import-sample.csv> for a sample .csv file and formatting.\n" +
" If no column family row is present, use the column-family flag to specify an existing family.\n\n" +
" Examples:\n" +
" cbt import csv-import-table cbt-import-sample.csv\n" +
" cbt import csv-import-table cbt-import-sample.csv app-profile=batch-write-profile column-family=my-family workers=5\n",
Required: cbtconfig.ProjectAndInstanceRequired,
},
{
Name: "listinstances",
Desc: "List instances in a project",
@@ -1534,6 +1552,187 @@ func doDeleteAppProfile(ctx context.Context, args ...string) {
}
}

type importerArgs struct {
appProfile string // app profile ID to use for requests
fam string // column family applied to all columns; "" means read it from the file
sz int // max rows per batch write request
workers int // number of concurrent worker goroutines
}

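// safeReader guards a shared csv.Reader with a mutex so that multiple import
// workers can safely pull batches of lines from the same input file; t
// accumulates the total row count across workers.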
type safeReader struct {
mu sync.Mutex
r *csv.Reader
t int // total rows
}

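// doImport implements the 'cbt import' command: it parses the optional
// flags, opens the .csv input file and the target table, and delegates the
// actual work to importCSV.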
func doImport(ctx context.Context, args ...string) {
ia, err := parseImporterArgs(ctx, args)
if err != nil {
log.Fatalf("error parsing importer args: %s", err)
}
f, err := os.Open(args[1])
if err != nil {
log.Fatalf("couldn't open the csv file: %s", err)
}

tbl := getClient(bigtable.ClientConfig{AppProfile: ia.appProfile}).Open(args[0])
r := csv.NewReader(f)
importCSV(ctx, tbl, r, ia)
}

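// parseImporterArgs applies any key=value flags from the command line on top
// of the defaults (no app profile, no explicit column family, batch-size=500,
// workers=1).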
func parseImporterArgs(ctx context.Context, args []string) (importerArgs, error) {
var err error
ia := importerArgs{
fam: "",
sz: 500,
workers: 1,
}
if len(args) < 2 {
return ia, fmt.Errorf("usage: cbt import <table-id> <input-file> [app-profile=<app-profile-id>] [column-family=<family-name>] [batch-size=<500>] [workers=<1>]")
}
for _, arg := range args[2:] {
switch {
case strings.HasPrefix(arg, "app-profile="):
ia.appProfile = strings.Split(arg, "=")[1]
case strings.HasPrefix(arg, "column-family="):
ia.fam = strings.Split(arg, "=")[1]
if ia.fam == "" {
return ia, fmt.Errorf("column-family cannot be ''")
}
case strings.HasPrefix(arg, "batch-size="):
ia.sz, err = strconv.Atoi(strings.Split(arg, "=")[1])
if err != nil || ia.sz <= 0 || ia.sz > 100000 {
return ia, fmt.Errorf("batch-size must be > 0 and <= 100000")
}
case strings.HasPrefix(arg, "workers="):
ia.workers, err = strconv.Atoi(strings.Split(arg, "=")[1])
if err != nil || ia.workers <= 0 {
return ia, fmt.Errorf("workers must be > 0, err:%s", err)
}
}
}
return ia, nil
}

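// importCSV reads the header rows, then fans out ia.workers goroutines that
// share the reader through a safeReader and bulk-write batches until EOF.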
func importCSV(ctx context.Context, tbl *bigtable.Table, r *csv.Reader, ia importerArgs) {
fams, cols, err := parseCsvHeaders(r, ia.fam)
if err != nil {
log.Fatalf("error parsing headers: %s", err)
}
sr := safeReader{r: r}
ts := bigtable.Now()

var wg sync.WaitGroup
wg.Add(ia.workers)
for i := 0; i < ia.workers; i++ {
go func(w int) {
defer wg.Done()
if e := sr.parseAndWrite(ctx, tbl, fams, cols, ts, ia.sz, w); e != nil {
log.Fatalf("error: %s", e)
}
}(i)
}
wg.Wait()
log.Printf("Done importing %d rows.\n", sr.t)
}

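// parseCsvHeaders returns the column-family and column-name header rows for
// the import. When a family is supplied via the column-family flag, the
// family row is expected to be absent from the file. The first cell of each
// header row sits above the rowkey column and must be empty, and a blank
// family cell inherits the family to its left. A hypothetical input such as:
//
//	,family1,,family2
//	,col1,col2,col3
//	rk1,a,b,c
//
// yields fams ["", "family1", "family1", "family2"] and
// cols ["", "col1", "col2", "col3"].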
func parseCsvHeaders(r *csv.Reader, family string) ([]string, []string, error) {
var err error
var fams, cols []string
if family == "" { // no column-family from flag, using first row
fams, err = r.Read()
if err != nil {
return nil, nil, fmt.Errorf("family header reader error:%s", err)
}
}
cols, err = r.Read() // column names are next row
if err != nil {
return nil, nil, fmt.Errorf("columns header reader error:%s", err)
}
if family != "" {
fams = make([]string, len(cols))
fams[1] = family
}
if len(fams) < 2 || len(cols) < 2 {
return fams, cols, fmt.Errorf("at least 2 columns are required (rowkey + data)")
}
if fams[0] != "" || cols[0] != "" {
return fams, cols, fmt.Errorf("the first column must be empty for column-family and column name rows")
}
if fams[1] == "" || cols[1] == "" {
return fams, cols, fmt.Errorf("the second column (first data column) must have values for column family and column name rows if present")
}
for i := range cols { // fill any blank column families with previous
if i > 0 && fams[i] == "" {
fams[i] = fams[i-1]
}
}
return fams, cols, nil
}

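// batchWrite issues a single ApplyBulk request and returns the number of
// rows written. ApplyBulk surfaces failures two ways: a request-level error,
// and a per-mutation error slice that is non-nil when individual rows failed.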
func batchWrite(ctx context.Context, tbl *bigtable.Table, rk []string, muts []*bigtable.Mutation, worker int) (int, error) {
log.Printf("[%d] Writing batch:: size: %d, firstRowKey: %s, lastRowKey: %s\n", worker, len(rk), rk[0], rk[len(rk)-1])
errors, err := tbl.ApplyBulk(ctx, rk, muts)
if err != nil {
return 0, fmt.Errorf("applying bulk mutations: %v", err)
}
if errors != nil {
return 0, fmt.Errorf("applying bulk mutations had %d errors, first: %v", len(errors), errors[0])
}
return len(rk), nil
}

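// parseAndWrite is the worker loop: it holds the reader lock only while
// parsing up to max lines into mutations, releases it to issue the batch
// write so other workers can read in parallel, and on EOF folds its running
// count into sr.t under the lock before returning.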
func (sr *safeReader) parseAndWrite(ctx context.Context, tbl *bigtable.Table, fams, cols []string, ts bigtable.Timestamp, max, worker int) error {
var rowKey []string
var muts []*bigtable.Mutation
var c int // rows successfully written by this worker
for {
sr.mu.Lock()
for len(rowKey) < max {
line, err := sr.r.Read()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
mut := bigtable.NewMutation()
empty := true
for i, val := range line {
if i > 0 && val != "" {
mut.Set(fams[i], cols[i], ts, []byte(val))
empty = false
}
}
if empty {
log.Printf("[%d] RowKey '%s' has no mutations, skipping", worker, line[0])
continue
}
if line[0] == "" {
log.Printf("[%d] RowKey not present, skipping line", worker)
continue
}
rowKey = append(rowKey, line[0])
muts = append(muts, mut)
}
if len(rowKey) > 0 {
sr.mu.Unlock()
n, err := batchWrite(ctx, tbl, rowKey, muts, worker)
if err != nil {
return err
}
c += n
rowKey = rowKey[:0]
muts = muts[:0]
continue
}
sr.t += c
sr.mu.Unlock()
return nil
}
}

// parseDuration parses a duration string.
// It is similar to Go's time.ParseDuration, except with a different set of supported units,
// and only simple formats supported.