Skip to content

Commit

Permalink
feat(bigquery): expose hive partitioning options (#3240)
Browse files Browse the repository at this point in the history
* feat(bigquery): add HivePartitioningOptions

This resuscitates https://code-review.googlesource.com/c/gocloud/+/43270
and updates it now that the service is properly advertising the fields
such as RequirePartitionFilter.
  • Loading branch information
shollyman committed Nov 20, 2020
1 parent ea3cde5 commit fa77efa
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 0 deletions.
72 changes: 72 additions & 0 deletions bigquery/external.go
Expand Up @@ -92,6 +92,10 @@ type ExternalDataConfig struct {

// Additional options for CSV, GoogleSheets and Bigtable formats.
Options ExternalDataConfigOptions

// HivePartitioningOptions allows use of Hive partitioning based on the
// layout of objects in Google Cloud Storage.
HivePartitioningOptions *HivePartitioningOptions
}

func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
Expand All @@ -106,6 +110,9 @@ func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
if e.Schema != nil {
q.Schema = e.Schema.toBQ()
}
if e.HivePartitioningOptions != nil {
q.HivePartitioningOptions = e.HivePartitioningOptions.toBQ()
}
if e.Options != nil {
e.Options.populateExternalDataConfig(&q)
}
Expand Down Expand Up @@ -134,6 +141,9 @@ func bqToExternalDataConfig(q *bq.ExternalDataConfiguration) (*ExternalDataConfi
return nil, err
}
}
if q.HivePartitioningOptions != nil {
e.HivePartitioningOptions = bqToHivePartitioningOptions(q.HivePartitioningOptions)
}
return e, nil
}

Expand Down Expand Up @@ -409,3 +419,65 @@ func bqToBigtableColumn(q *bq.BigtableColumn) (*BigtableColumn, error) {
}
return b, nil
}

// HivePartitioningMode is used in conjunction with HivePartitioningOptions.
type HivePartitioningMode string

const (
// AutoHivePartitioningMode automatically infers partitioning key and types.
AutoHivePartitioningMode HivePartitioningMode = "AUTO"
// StringHivePartitioningMode automatically infers partitioning keys and treats values as string.
StringHivePartitioningMode HivePartitioningMode = "STRINGS"
// CustomHivePartitioningMode allows custom definition of the external partitioning.
CustomHivePartitioningMode HivePartitioningMode = "CUSTOM"
)

// HivePartitioningOptions defines the behavior of Hive partitioning
// when working with external data.
type HivePartitioningOptions struct {

// Mode defines which hive partitioning mode to use when reading data.
Mode HivePartitioningMode

// When hive partition detection is requested, a common prefix for
// all source uris should be supplied. The prefix must end immediately
// before the partition key encoding begins.
//
// For example, consider files following this data layout.
// gs://bucket/path_to_table/dt=2019-01-01/country=BR/id=7/file.avro
// gs://bucket/path_to_table/dt=2018-12-31/country=CA/id=3/file.avro
//
// When hive partitioning is requested with either AUTO or STRINGS
// detection, the common prefix can be either of
// gs://bucket/path_to_table or gs://bucket/path_to_table/ (trailing
// slash does not matter).
SourceURIPrefix string

// If set to true, queries against this external table require
// a partition filter to be present that can perform partition
// elimination. Hive-partitioned load jobs with this field
// set to true will fail.
RequirePartitionFilter bool
}

func (o *HivePartitioningOptions) toBQ() *bq.HivePartitioningOptions {
if o == nil {
return nil
}
return &bq.HivePartitioningOptions{
Mode: string(o.Mode),
SourceUriPrefix: o.SourceURIPrefix,
RequirePartitionFilter: o.RequirePartitionFilter,
}
}

func bqToHivePartitioningOptions(q *bq.HivePartitioningOptions) *HivePartitioningOptions {
if q == nil {
return nil
}
return &HivePartitioningOptions{
Mode: HivePartitioningMode(q.Mode),
SourceURIPrefix: q.SourceUriPrefix,
RequirePartitionFilter: q.RequirePartitionFilter,
}
}
8 changes: 8 additions & 0 deletions bigquery/external_test.go
Expand Up @@ -48,6 +48,14 @@ func TestExternalDataConfig(t *testing.T) {
Range: "sheet1!A1:Z10",
},
},
{
SourceFormat: Avro,
HivePartitioningOptions: &HivePartitioningOptions{
Mode: AutoHivePartitioningMode,
SourceURIPrefix: "gs://somebucket/a/b/c",
RequirePartitionFilter: true,
},
},
{
SourceFormat: Bigtable,
Options: &BigtableOptions{
Expand Down
53 changes: 53 additions & 0 deletions bigquery/integration_test.go
Expand Up @@ -1859,6 +1859,59 @@ func TestIntegration_LegacyQuery(t *testing.T) {
}
}

func TestIntegration_QueryExternalHivePartitioning(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()

autoTable := dataset.Table(tableIDs.New())
customTable := dataset.Table(tableIDs.New())

err := autoTable.Create(ctx, &TableMetadata{
ExternalDataConfig: &ExternalDataConfig{
SourceFormat: Parquet,
SourceURIs: []string{"gs://cloud-samples-data/bigquery/hive-partitioning-samples/autolayout/*"},
AutoDetect: true,
HivePartitioningOptions: &HivePartitioningOptions{
Mode: AutoHivePartitioningMode,
SourceURIPrefix: "gs://cloud-samples-data/bigquery/hive-partitioning-samples/autolayout/",
RequirePartitionFilter: true,
},
},
})
if err != nil {
t.Fatalf("table.Create(auto): %v", err)
}
defer autoTable.Delete(ctx)

err = customTable.Create(ctx, &TableMetadata{
ExternalDataConfig: &ExternalDataConfig{
SourceFormat: Parquet,
SourceURIs: []string{"gs://cloud-samples-data/bigquery/hive-partitioning-samples/customlayout/*"},
AutoDetect: true,
HivePartitioningOptions: &HivePartitioningOptions{
Mode: CustomHivePartitioningMode,
SourceURIPrefix: "gs://cloud-samples-data/bigquery/hive-partitioning-samples/customlayout/{pkey:STRING}/",
RequirePartitionFilter: true,
},
},
})
if err != nil {
t.Fatalf("table.Create(custom): %v", err)
}
defer customTable.Delete(ctx)

// Issue a test query that prunes based on the custom hive partitioning key, and verify the result is as expected.
sql := fmt.Sprintf("SELECT COUNT(*) as ct FROM `%s`.%s.%s WHERE pkey=\"foo\"", customTable.ProjectID, customTable.DatasetID, customTable.TableID)
q := client.Query(sql)
it, err := q.Read(ctx)
if err != nil {
t.Fatalf("Error querying: %v", err)
}
checkReadAndTotalRows(t, "HiveQuery", it, [][]Value{{int64(50)}})
}

func TestIntegration_QueryParameters(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
Expand Down

0 comments on commit fa77efa

Please sign in to comment.