feat(bigquery): expose ParquetOptions for loads and external tables (#4016)

Adapts ParquetOptions to behave similarly to the various other
format options like CSV/BigTable/Sheets.

Also refactors the FileConfig tests into table-driven style to make it
easier to test multiple configurations.
shollyman committed May 3, 2021
1 parent a825ef4 commit f9c4ccb
Showing 4 changed files with 173 additions and 44 deletions.
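
Taken together, the diff below threads *ParquetOptions through both paths named in the commit title: ExternalDataConfig.Options for external tables and FileConfig.ParquetOptions for load jobs. As a minimal sketch of the external-table path (assuming a standard cloud.google.com/go/bigquery client; the dataset, table, and gs:// URI below are placeholders, not part of this commit):

package snippets

import (
	"context"

	"cloud.google.com/go/bigquery"
)

// createParquetExternalTable defines a table backed by Parquet files in GCS,
// attaching the two options exposed by this commit.
func createParquetExternalTable(ctx context.Context, client *bigquery.Client) error {
	edc := &bigquery.ExternalDataConfig{
		SourceFormat: bigquery.Parquet,
		SourceURIs:   []string{"gs://example-bucket/data/*.parquet"}, // placeholder URI
		Options: &bigquery.ParquetOptions{
			EnumAsString:        true, // surface Parquet ENUM logical types as STRING rather than BYTES
			EnableListInference: true, // apply schema inference to Parquet LIST logical types
		},
	}
	// Placeholder dataset and table IDs.
	table := client.Dataset("example_dataset").Table("parquet_external")
	return table.Create(ctx, &bigquery.TableMetadata{ExternalDataConfig: edc})
}
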
34 changes: 33 additions & 1 deletion bigquery/external.go
@@ -90,7 +90,7 @@ type ExternalDataConfig struct {
// when reading data.
MaxBadRecords int64

// Additional options for CSV, GoogleSheets and Bigtable formats.
// Additional options for CSV, GoogleSheets, Bigtable, and Parquet formats.
Options ExternalDataConfigOptions

// HivePartitioningOptions allows use of Hive partitioning based on the
@@ -139,6 +139,8 @@ func bqToExternalDataConfig(q *bq.ExternalDataConfiguration) (*ExternalDataConfi
if err != nil {
return nil, err
}
case q.ParquetOptions != nil:
e.Options = bqToParquetOptions(q.ParquetOptions)
}
return e, nil
}
@@ -416,6 +418,36 @@ func bqToBigtableColumn(q *bq.BigtableColumn) (*BigtableColumn, error) {
return b, nil
}

// ParquetOptions are additional options for Parquet external data sources.
type ParquetOptions struct {
// EnumAsString indicates whether to infer Parquet ENUM logical type as
// STRING instead of BYTES by default.
EnumAsString bool

// EnableListInference indicates whether to use schema inference
// specifically for Parquet LIST logical type.
EnableListInference bool
}

func (o *ParquetOptions) populateExternalDataConfig(c *bq.ExternalDataConfiguration) {
if o != nil {
c.ParquetOptions = &bq.ParquetOptions{
EnumAsString: o.EnumAsString,
EnableListInference: o.EnableListInference,
}
}
}

func bqToParquetOptions(q *bq.ParquetOptions) *ParquetOptions {
if q == nil {
return nil
}
return &ParquetOptions{
EnumAsString: q.EnumAsString,
EnableListInference: q.EnableListInference,
}
}

// HivePartitioningMode is used in conjunction with HivePartitioningOptions.
type HivePartitioningMode string

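Because *ParquetOptions now participates in the same ExternalDataConfigOptions dispatch as the CSV, Sheets, and Bigtable options, the same configuration can also back an ad-hoc federated query rather than a permanent external table. A sketch under the assumption that the Parquet source is registered through QueryConfig.TableDefinitions, as with the other formats; the alias, bucket, and query text are illustrative only:

package snippets

import (
	"context"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
)

// queryParquetInPlace queries Parquet files in GCS without creating a table,
// exposing them to the query under the temporary name "events".
func queryParquetInPlace(ctx context.Context, client *bigquery.Client) error {
	edc := &bigquery.ExternalDataConfig{
		SourceFormat: bigquery.Parquet,
		SourceURIs:   []string{"gs://example-bucket/events/*.parquet"}, // placeholder URI
		Options: &bigquery.ParquetOptions{
			EnumAsString:        true,
			EnableListInference: true,
		},
	}
	q := client.Query("SELECT COUNT(*) AS n FROM events")
	q.TableDefinitions = map[string]bigquery.ExternalData{"events": edc}

	it, err := q.Read(ctx)
	if err != nil {
		return err
	}
	for {
		var row []bigquery.Value
		err := it.Next(&row)
		if err == iterator.Done {
			break
		}
		if err != nil {
			return err
		}
		_ = row // a single row containing the count
	}
	return nil
}
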
7 changes: 7 additions & 0 deletions bigquery/external_test.go
@@ -80,6 +80,13 @@ func TestExternalDataConfig(t *testing.T) {
},
},
},
{
SourceFormat: Parquet,
Options: &ParquetOptions{
EnumAsString: true,
EnableListInference: true,
},
},
} {
q := want.toBQ()
got, err := bqToExternalDataConfig(&q)
15 changes: 15 additions & 0 deletions bigquery/file.go
@@ -74,6 +74,9 @@ type FileConfig struct {

// Additional options for CSV files.
CSVOptions

// Additional options for Parquet files.
ParquetOptions *ParquetOptions
}

func (fc *FileConfig) populateLoadConfig(conf *bq.JobConfigurationLoad) {
@@ -89,6 +92,12 @@ func (fc *FileConfig) populateLoadConfig(conf *bq.JobConfigurationLoad) {
if fc.Schema != nil {
conf.Schema = fc.Schema.toBQ()
}
if fc.ParquetOptions != nil {
conf.ParquetOptions = &bq.ParquetOptions{
EnumAsString: fc.ParquetOptions.EnumAsString,
EnableListInference: fc.ParquetOptions.EnableListInference,
}
}
conf.Quote = fc.quote()
}

@@ -122,6 +131,12 @@ func (fc *FileConfig) populateExternalDataConfig(conf *bq.ExternalDataConfigurat
if format == CSV {
fc.CSVOptions.populateExternalDataConfig(conf)
}
if fc.ParquetOptions != nil {
conf.ParquetOptions = &bq.ParquetOptions{
EnumAsString: fc.ParquetOptions.EnumAsString,
EnableListInference: fc.ParquetOptions.EnableListInference,
}
}
}

// Encoding specifies the character encoding of data to be loaded into BigQuery.
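On the load side, the new FileConfig.ParquetOptions field is what GCSReference (which embeds FileConfig) carries into a load job. A minimal sketch, assuming the usual Loader flow; bucket, dataset, and table names are placeholders:

package snippets

import (
	"context"

	"cloud.google.com/go/bigquery"
)

// loadParquet loads Parquet files from GCS into a BigQuery table,
// with the Parquet-specific options set on the source reference.
func loadParquet(ctx context.Context, client *bigquery.Client) error {
	gcsRef := bigquery.NewGCSReference("gs://example-bucket/data/*.parquet") // placeholder URI
	gcsRef.SourceFormat = bigquery.Parquet
	gcsRef.ParquetOptions = &bigquery.ParquetOptions{
		EnumAsString:        true,
		EnableListInference: true,
	}

	loader := client.Dataset("example_dataset").Table("parquet_load").LoaderFrom(gcsRef)
	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}
	return status.Err() // non-nil if the load job completed with errors
}
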
161 changes: 118 additions & 43 deletions bigquery/file_test.go
@@ -17,7 +17,6 @@ package bigquery
import (
"testing"

"cloud.google.com/go/internal/pretty"
"cloud.google.com/go/internal/testutil"
bq "google.golang.org/api/bigquery/v2"
)
@@ -45,54 +44,130 @@
)

func TestFileConfigPopulateLoadConfig(t *testing.T) {
want := &bq.JobConfigurationLoad{
SourceFormat: "CSV",
FieldDelimiter: "\t",
SkipLeadingRows: 8,
AllowJaggedRows: true,
AllowQuotedNewlines: true,
Autodetect: true,
Encoding: "UTF-8",
MaxBadRecords: 7,
IgnoreUnknownValues: true,
Schema: &bq.TableSchema{
Fields: []*bq.TableFieldSchema{
bqStringFieldSchema(),
bqNestedFieldSchema(),
}},
Quote: &hyphen,
testcases := []struct {
description string
fileConfig *FileConfig
want *bq.JobConfigurationLoad
}{
{
description: "default json",
fileConfig: &FileConfig{
SourceFormat: JSON,
},
want: &bq.JobConfigurationLoad{
SourceFormat: "NEWLINE_DELIMITED_JSON",
},
},
{
description: "csv",
fileConfig: &fc,
want: &bq.JobConfigurationLoad{
SourceFormat: "CSV",
FieldDelimiter: "\t",
SkipLeadingRows: 8,
AllowJaggedRows: true,
AllowQuotedNewlines: true,
Autodetect: true,
Encoding: "UTF-8",
MaxBadRecords: 7,
IgnoreUnknownValues: true,
Schema: &bq.TableSchema{
Fields: []*bq.TableFieldSchema{
bqStringFieldSchema(),
bqNestedFieldSchema(),
}},
Quote: &hyphen,
},
},
{
description: "parquet",
fileConfig: &FileConfig{
SourceFormat: Parquet,
ParquetOptions: &ParquetOptions{
EnumAsString: true,
EnableListInference: true,
},
},
want: &bq.JobConfigurationLoad{
SourceFormat: "PARQUET",
ParquetOptions: &bq.ParquetOptions{
EnumAsString: true,
EnableListInference: true,
},
},
},
}
got := &bq.JobConfigurationLoad{}
fc.populateLoadConfig(got)
if !testutil.Equal(got, want) {
t.Errorf("got:\n%v\nwant:\n%v", pretty.Value(got), pretty.Value(want))
for _, tc := range testcases {
got := &bq.JobConfigurationLoad{}
tc.fileConfig.populateLoadConfig(got)
if diff := testutil.Diff(got, tc.want); diff != "" {
t.Errorf("case %s, got=-, want=+:\n%s", tc.description, diff)
}
}
}

func TestFileConfigPopulateExternalDataConfig(t *testing.T) {
got := &bq.ExternalDataConfiguration{}
fc.populateExternalDataConfig(got)

want := &bq.ExternalDataConfiguration{
SourceFormat: "CSV",
Autodetect: true,
MaxBadRecords: 7,
IgnoreUnknownValues: true,
Schema: &bq.TableSchema{
Fields: []*bq.TableFieldSchema{
bqStringFieldSchema(),
bqNestedFieldSchema(),
}},
CsvOptions: &bq.CsvOptions{
AllowJaggedRows: true,
AllowQuotedNewlines: true,
Encoding: "UTF-8",
FieldDelimiter: "\t",
Quote: &hyphen,
SkipLeadingRows: 8,
testcases := []struct {
description string
fileConfig *FileConfig
want *bq.ExternalDataConfiguration
}{
{
description: "json defaults",
fileConfig: &FileConfig{
SourceFormat: JSON,
},
want: &bq.ExternalDataConfiguration{
SourceFormat: "NEWLINE_DELIMITED_JSON",
},
},
{
description: "csv fileconfig",
fileConfig: &fc,
want: &bq.ExternalDataConfiguration{
SourceFormat: "CSV",
Autodetect: true,
MaxBadRecords: 7,
IgnoreUnknownValues: true,
Schema: &bq.TableSchema{
Fields: []*bq.TableFieldSchema{
bqStringFieldSchema(),
bqNestedFieldSchema(),
}},
CsvOptions: &bq.CsvOptions{
AllowJaggedRows: true,
AllowQuotedNewlines: true,
Encoding: "UTF-8",
FieldDelimiter: "\t",
Quote: &hyphen,
SkipLeadingRows: 8,
},
},
},
{
description: "parquet",
fileConfig: &FileConfig{
SourceFormat: Parquet,
ParquetOptions: &ParquetOptions{
EnumAsString: true,
EnableListInference: true,
},
},
want: &bq.ExternalDataConfiguration{
SourceFormat: "PARQUET",
ParquetOptions: &bq.ParquetOptions{
EnumAsString: true,
EnableListInference: true,
},
},
},
}
if diff := testutil.Diff(got, want); diff != "" {
t.Errorf("got=-, want=+:\n%s", diff)
for _, tc := range testcases {
got := &bq.ExternalDataConfiguration{}
tc.fileConfig.populateExternalDataConfig(got)
if diff := testutil.Diff(got, tc.want); diff != "" {
t.Errorf("case %s, got=-, want=+:\n%s", tc.description, diff)
}
}

}
