From fa77efa1a1880ff89307d54cc7e9e8c09430e4e2 Mon Sep 17 00:00:00 2001
From: shollyman
Date: Thu, 19 Nov 2020 16:20:55 -0800
Subject: [PATCH] feat(bigquery): expose hive partitioning options (#3240)

* feat(bigquery): add HivePartitioningOptions

This resuscitates https://code-review.googlesource.com/c/gocloud/+/43270 and
updates it now that the service properly advertises fields such as
RequirePartitionFilter.
---
 bigquery/external.go         | 72 ++++++++++++++++++++++++++++++++++++
 bigquery/external_test.go    |  8 ++++
 bigquery/integration_test.go | 53 ++++++++++++++++++++++++++
 3 files changed, 133 insertions(+)

diff --git a/bigquery/external.go b/bigquery/external.go
index e8d1fdee550..3f8647e568a 100644
--- a/bigquery/external.go
+++ b/bigquery/external.go
@@ -92,6 +92,10 @@ type ExternalDataConfig struct {
 
 	// Additional options for CSV, GoogleSheets and Bigtable formats.
 	Options ExternalDataConfigOptions
+
+	// HivePartitioningOptions allows use of Hive partitioning based on the
+	// layout of objects in Google Cloud Storage.
+	HivePartitioningOptions *HivePartitioningOptions
 }
 
 func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
@@ -106,6 +110,9 @@ func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
 	if e.Schema != nil {
 		q.Schema = e.Schema.toBQ()
 	}
+	if e.HivePartitioningOptions != nil {
+		q.HivePartitioningOptions = e.HivePartitioningOptions.toBQ()
+	}
 	if e.Options != nil {
 		e.Options.populateExternalDataConfig(&q)
 	}
@@ -134,6 +141,9 @@ func bqToExternalDataConfig(q *bq.ExternalDataConfiguration) (*ExternalDataConfi
 			return nil, err
 		}
 	}
+	if q.HivePartitioningOptions != nil {
+		e.HivePartitioningOptions = bqToHivePartitioningOptions(q.HivePartitioningOptions)
+	}
 	return e, nil
 }
 
@@ -409,3 +419,65 @@ func bqToBigtableColumn(q *bq.BigtableColumn) (*BigtableColumn, error) {
 	}
 	return b, nil
 }
+
+// HivePartitioningMode is used in conjunction with HivePartitioningOptions.
+type HivePartitioningMode string
+
+const (
+	// AutoHivePartitioningMode automatically infers partitioning keys and types.
+	AutoHivePartitioningMode HivePartitioningMode = "AUTO"
+	// StringHivePartitioningMode automatically infers partitioning keys and treats values as strings.
+	StringHivePartitioningMode HivePartitioningMode = "STRINGS"
+	// CustomHivePartitioningMode allows a custom definition of the external partitioning.
+	CustomHivePartitioningMode HivePartitioningMode = "CUSTOM"
+)
+
+// HivePartitioningOptions defines the behavior of Hive partitioning
+// when working with external data.
+type HivePartitioningOptions struct {
+
+	// Mode defines which hive partitioning mode to use when reading data.
+	Mode HivePartitioningMode
+
+	// When hive partition detection is requested, a common prefix for
+	// all source URIs should be supplied. The prefix must end immediately
+	// before the partition key encoding begins.
+	//
+	// For example, consider files following this data layout.
+	// gs://bucket/path_to_table/dt=2019-01-01/country=BR/id=7/file.avro
+	// gs://bucket/path_to_table/dt=2018-12-31/country=CA/id=3/file.avro
+	//
+	// When hive partitioning is requested with either AUTO or STRINGS
+	// detection, the common prefix can be either of
+	// gs://bucket/path_to_table or gs://bucket/path_to_table/ (trailing
+	// slash does not matter).
+	SourceURIPrefix string
+
+	// If set to true, queries against this external table require
+	// a partition filter to be present that can perform partition
+	// elimination. Hive-partitioned load jobs with this field
+	// set to true will fail.
+	RequirePartitionFilter bool
+}
+
+func (o *HivePartitioningOptions) toBQ() *bq.HivePartitioningOptions {
+	if o == nil {
+		return nil
+	}
+	return &bq.HivePartitioningOptions{
+		Mode:                   string(o.Mode),
+		SourceUriPrefix:        o.SourceURIPrefix,
+		RequirePartitionFilter: o.RequirePartitionFilter,
+	}
+}
+
+func bqToHivePartitioningOptions(q *bq.HivePartitioningOptions) *HivePartitioningOptions {
+	if q == nil {
+		return nil
+	}
+	return &HivePartitioningOptions{
+		Mode:                   HivePartitioningMode(q.Mode),
+		SourceURIPrefix:        q.SourceUriPrefix,
+		RequirePartitionFilter: q.RequirePartitionFilter,
+	}
+}
diff --git a/bigquery/external_test.go b/bigquery/external_test.go
index f64297fd1a0..222cf274999 100644
--- a/bigquery/external_test.go
+++ b/bigquery/external_test.go
@@ -48,6 +48,14 @@ func TestExternalDataConfig(t *testing.T) {
 				Range: "sheet1!A1:Z10",
 			},
 		},
+		{
+			SourceFormat: Avro,
+			HivePartitioningOptions: &HivePartitioningOptions{
+				Mode:                   AutoHivePartitioningMode,
+				SourceURIPrefix:        "gs://somebucket/a/b/c",
+				RequirePartitionFilter: true,
+			},
+		},
 		{
 			SourceFormat: Bigtable,
 			Options: &BigtableOptions{
diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go
index 60959d0a0ca..e450e207ec5 100644
--- a/bigquery/integration_test.go
+++ b/bigquery/integration_test.go
@@ -1859,6 +1859,59 @@ func TestIntegration_LegacyQuery(t *testing.T) {
 	}
 }
 
+func TestIntegration_QueryExternalHivePartitioning(t *testing.T) {
+	if client == nil {
+		t.Skip("Integration tests skipped")
+	}
+	ctx := context.Background()
+
+	autoTable := dataset.Table(tableIDs.New())
+	customTable := dataset.Table(tableIDs.New())
+
+	err := autoTable.Create(ctx, &TableMetadata{
+		ExternalDataConfig: &ExternalDataConfig{
+			SourceFormat: Parquet,
+			SourceURIs:   []string{"gs://cloud-samples-data/bigquery/hive-partitioning-samples/autolayout/*"},
+			AutoDetect:   true,
+			HivePartitioningOptions: &HivePartitioningOptions{
+				Mode:                   AutoHivePartitioningMode,
+				SourceURIPrefix:        "gs://cloud-samples-data/bigquery/hive-partitioning-samples/autolayout/",
+				RequirePartitionFilter: true,
+			},
+		},
+	})
+	if err != nil {
+		t.Fatalf("table.Create(auto): %v", err)
+	}
+	defer autoTable.Delete(ctx)
+
+	err = customTable.Create(ctx, &TableMetadata{
+		ExternalDataConfig: &ExternalDataConfig{
+			SourceFormat: Parquet,
+			SourceURIs:   []string{"gs://cloud-samples-data/bigquery/hive-partitioning-samples/customlayout/*"},
+			AutoDetect:   true,
+			HivePartitioningOptions: &HivePartitioningOptions{
+				Mode:                   CustomHivePartitioningMode,
+				SourceURIPrefix:        "gs://cloud-samples-data/bigquery/hive-partitioning-samples/customlayout/{pkey:STRING}/",
+				RequirePartitionFilter: true,
+			},
+		},
+	})
+	if err != nil {
+		t.Fatalf("table.Create(custom): %v", err)
+	}
+	defer customTable.Delete(ctx)
+
+	// Issue a test query that prunes based on the custom hive partitioning key, and verify the result is as expected.
+	sql := fmt.Sprintf("SELECT COUNT(*) as ct FROM `%s`.%s.%s WHERE pkey=\"foo\"", customTable.ProjectID, customTable.DatasetID, customTable.TableID)
+	q := client.Query(sql)
+	it, err := q.Read(ctx)
+	if err != nil {
+		t.Fatalf("Error querying: %v", err)
+	}
+	checkReadAndTotalRows(t, "HiveQuery", it, [][]Value{{int64(50)}})
+}
+
 func TestIntegration_QueryParameters(t *testing.T) {
 	if client == nil {
 		t.Skip("Integration tests skipped")
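
Usage sketch (not taken from this change): the snippet below shows how the new
ExternalDataConfig.HivePartitioningOptions field might be wired up to define an
external table over hive-partitioned Parquet files in Cloud Storage, mirroring
the integration test above. The project ID ("my-project"), dataset and table
names ("mydataset", "hive_external"), and the bucket prefix
(gs://my-bucket/path_to_table/) are hypothetical placeholders.

	package main

	import (
		"context"
		"log"

		"cloud.google.com/go/bigquery"
	)

	func main() {
		ctx := context.Background()

		// Placeholder project ID; substitute a real project.
		client, err := bigquery.NewClient(ctx, "my-project")
		if err != nil {
			log.Fatal(err)
		}
		defer client.Close()

		// Hypothetical dataset and table names; the table is backed by
		// Parquet files laid out as .../key=value/ directories in GCS.
		table := client.Dataset("mydataset").Table("hive_external")
		err = table.Create(ctx, &bigquery.TableMetadata{
			ExternalDataConfig: &bigquery.ExternalDataConfig{
				SourceFormat: bigquery.Parquet,
				SourceURIs:   []string{"gs://my-bucket/path_to_table/*"}, // placeholder bucket
				AutoDetect:   true,
				HivePartitioningOptions: &bigquery.HivePartitioningOptions{
					// Infer partition keys and their types from the object layout.
					Mode:            bigquery.AutoHivePartitioningMode,
					SourceURIPrefix: "gs://my-bucket/path_to_table/",
					// Require queries to filter on a partition key so that
					// partition elimination can occur.
					RequirePartitionFilter: true,
				},
			},
		})
		if err != nil {
			log.Fatal(err)
		}
	}

Because RequirePartitionFilter is set, queries against such a table must include
a predicate on a partition key, as the integration test does with
WHERE pkey="foo". With AUTO mode the keys and types are inferred from the object
layout; CUSTOM mode instead takes an explicit {key:TYPE} encoding in
SourceURIPrefix, as in the customlayout test case.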