Skip to content

Commit

Permalink
Merge branch 'refs/heads/master' into single_publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed Nov 24, 2020
2 parents 40aca4e + 200e261 commit 2dc00b4
Show file tree
Hide file tree
Showing 18 changed files with 839 additions and 171 deletions.
72 changes: 72 additions & 0 deletions bigquery/external.go
Expand Up @@ -92,6 +92,10 @@ type ExternalDataConfig struct {

// Additional options for CSV, GoogleSheets and Bigtable formats.
Options ExternalDataConfigOptions

// HivePartitioningOptions allows use of Hive partitioning based on the
// layout of objects in Google Cloud Storage.
HivePartitioningOptions *HivePartitioningOptions
}

func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
Expand All @@ -106,6 +110,9 @@ func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
if e.Schema != nil {
q.Schema = e.Schema.toBQ()
}
if e.HivePartitioningOptions != nil {
q.HivePartitioningOptions = e.HivePartitioningOptions.toBQ()
}
if e.Options != nil {
e.Options.populateExternalDataConfig(&q)
}
Expand Down Expand Up @@ -134,6 +141,9 @@ func bqToExternalDataConfig(q *bq.ExternalDataConfiguration) (*ExternalDataConfi
return nil, err
}
}
if q.HivePartitioningOptions != nil {
e.HivePartitioningOptions = bqToHivePartitioningOptions(q.HivePartitioningOptions)
}
return e, nil
}

Expand Down Expand Up @@ -409,3 +419,65 @@ func bqToBigtableColumn(q *bq.BigtableColumn) (*BigtableColumn, error) {
}
return b, nil
}

// HivePartitioningMode is used in conjunction with HivePartitioningOptions
// to select how partition keys in externally-stored data are detected and
// typed. The values map directly to the BigQuery API's mode strings.
type HivePartitioningMode string

const (
	// AutoHivePartitioningMode automatically infers partitioning key and types.
	AutoHivePartitioningMode HivePartitioningMode = "AUTO"
	// StringHivePartitioningMode automatically infers partitioning keys and treats values as string.
	StringHivePartitioningMode HivePartitioningMode = "STRINGS"
	// CustomHivePartitioningMode allows custom definition of the external partitioning.
	// The key schema is encoded in the source URI prefix (e.g. "{pkey:STRING}").
	CustomHivePartitioningMode HivePartitioningMode = "CUSTOM"
)

// HivePartitioningOptions defines the behavior of Hive partitioning
// when working with external data.
type HivePartitioningOptions struct {

	// Mode defines which hive partitioning mode to use when reading data.
	// See the HivePartitioningMode constants for the accepted values.
	Mode HivePartitioningMode

	// SourceURIPrefix is a common prefix for all source uris, and should be
	// supplied whenever hive partition detection is requested. The prefix must
	// end immediately before the partition key encoding begins.
	//
	// For example, consider files following this data layout.
	// gs://bucket/path_to_table/dt=2019-01-01/country=BR/id=7/file.avro
	// gs://bucket/path_to_table/dt=2018-12-31/country=CA/id=3/file.avro
	//
	// When hive partitioning is requested with either AUTO or STRINGS
	// detection, the common prefix can be either of
	// gs://bucket/path_to_table or gs://bucket/path_to_table/ (trailing
	// slash does not matter).
	SourceURIPrefix string

	// RequirePartitionFilter, if set to true, makes queries against this
	// external table require a partition filter to be present that can perform
	// partition elimination. Hive-partitioned load jobs with this field
	// set to true will fail.
	RequirePartitionFilter bool
}

// toBQ converts the options into their API representation.
// A nil receiver yields a nil API message, which the service treats as
// "no hive partitioning configured".
func (o *HivePartitioningOptions) toBQ() *bq.HivePartitioningOptions {
	if o == nil {
		return nil
	}
	opts := &bq.HivePartitioningOptions{
		Mode:                   string(o.Mode),
		SourceUriPrefix:        o.SourceURIPrefix,
		RequirePartitionFilter: o.RequirePartitionFilter,
	}
	return opts
}

// bqToHivePartitioningOptions converts an API hive partitioning message back
// into the client-side representation. A nil input maps to a nil result.
func bqToHivePartitioningOptions(q *bq.HivePartitioningOptions) *HivePartitioningOptions {
	if q == nil {
		return nil
	}
	opts := &HivePartitioningOptions{
		Mode:                   HivePartitioningMode(q.Mode),
		SourceURIPrefix:        q.SourceUriPrefix,
		RequirePartitionFilter: q.RequirePartitionFilter,
	}
	return opts
}
8 changes: 8 additions & 0 deletions bigquery/external_test.go
Expand Up @@ -48,6 +48,14 @@ func TestExternalDataConfig(t *testing.T) {
Range: "sheet1!A1:Z10",
},
},
{
SourceFormat: Avro,
HivePartitioningOptions: &HivePartitioningOptions{
Mode: AutoHivePartitioningMode,
SourceURIPrefix: "gs://somebucket/a/b/c",
RequirePartitionFilter: true,
},
},
{
SourceFormat: Bigtable,
Options: &BigtableOptions{
Expand Down
81 changes: 68 additions & 13 deletions bigquery/integration_test.go
Expand Up @@ -1434,6 +1434,7 @@ func TestIntegration_InsertAndReadNullable(t *testing.T) {
ctm := civil.Time{Hour: 15, Minute: 4, Second: 5, Nanosecond: 6000}
cdt := civil.DateTime{Date: testDate, Time: ctm}
rat := big.NewRat(33, 100)
rat2 := big.NewRat(66, 100)
geo := "POINT(-122.198939 47.669865)"

// Nil fields in the struct.
Expand All @@ -1455,20 +1456,21 @@ func TestIntegration_InsertAndReadNullable(t *testing.T) {

// Populate the struct with values.
testInsertAndReadNullable(t, testStructNullable{
String: NullString{"x", true},
Bytes: []byte{1, 2, 3},
Integer: NullInt64{1, true},
Float: NullFloat64{2.3, true},
Boolean: NullBool{true, true},
Timestamp: NullTimestamp{testTimestamp, true},
Date: NullDate{testDate, true},
Time: NullTime{ctm, true},
DateTime: NullDateTime{cdt, true},
Numeric: rat,
Geography: NullGeography{geo, true},
Record: &subNullable{X: NullInt64{4, true}},
String: NullString{"x", true},
Bytes: []byte{1, 2, 3},
Integer: NullInt64{1, true},
Float: NullFloat64{2.3, true},
Boolean: NullBool{true, true},
Timestamp: NullTimestamp{testTimestamp, true},
Date: NullDate{testDate, true},
Time: NullTime{ctm, true},
DateTime: NullDateTime{cdt, true},
Numeric: rat,
BigNumeric: rat2,
Geography: NullGeography{geo, true},
Record: &subNullable{X: NullInt64{4, true}},
},
[]Value{"x", []byte{1, 2, 3}, int64(1), 2.3, true, testTimestamp, testDate, ctm, cdt, rat, geo, []Value{int64(4)}})
[]Value{"x", []byte{1, 2, 3}, int64(1), 2.3, true, testTimestamp, testDate, ctm, cdt, rat, rat2, geo, []Value{int64(4)}})
}

func testInsertAndReadNullable(t *testing.T, ts testStructNullable, wantRow []Value) {
Expand Down Expand Up @@ -1857,6 +1859,59 @@ func TestIntegration_LegacyQuery(t *testing.T) {
}
}

// TestIntegration_QueryExternalHivePartitioning creates two external tables
// backed by hive-partitioned data in GCS — one using AUTO key detection and
// one using a CUSTOM key schema — then verifies that a query which filters on
// the custom partition key returns the expected row count.
func TestIntegration_QueryExternalHivePartitioning(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()

	autoTbl := dataset.Table(tableIDs.New())
	customTbl := dataset.Table(tableIDs.New())

	// External table with automatically inferred partition keys.
	autoMeta := &TableMetadata{
		ExternalDataConfig: &ExternalDataConfig{
			SourceFormat: Parquet,
			SourceURIs:   []string{"gs://cloud-samples-data/bigquery/hive-partitioning-samples/autolayout/*"},
			AutoDetect:   true,
			HivePartitioningOptions: &HivePartitioningOptions{
				Mode:                   AutoHivePartitioningMode,
				SourceURIPrefix:        "gs://cloud-samples-data/bigquery/hive-partitioning-samples/autolayout/",
				RequirePartitionFilter: true,
			},
		},
	}
	if err := autoTbl.Create(ctx, autoMeta); err != nil {
		t.Fatalf("table.Create(auto): %v", err)
	}
	defer autoTbl.Delete(ctx)

	// External table with an explicitly declared partition key ("pkey")
	// encoded in the source URI prefix.
	customMeta := &TableMetadata{
		ExternalDataConfig: &ExternalDataConfig{
			SourceFormat: Parquet,
			SourceURIs:   []string{"gs://cloud-samples-data/bigquery/hive-partitioning-samples/customlayout/*"},
			AutoDetect:   true,
			HivePartitioningOptions: &HivePartitioningOptions{
				Mode:                   CustomHivePartitioningMode,
				SourceURIPrefix:        "gs://cloud-samples-data/bigquery/hive-partitioning-samples/customlayout/{pkey:STRING}/",
				RequirePartitionFilter: true,
			},
		},
	}
	if err := customTbl.Create(ctx, customMeta); err != nil {
		t.Fatalf("table.Create(custom): %v", err)
	}
	defer customTbl.Delete(ctx)

	// Issue a test query that prunes based on the custom hive partitioning key, and verify the result is as expected.
	sql := fmt.Sprintf("SELECT COUNT(*) as ct FROM `%s`.%s.%s WHERE pkey=\"foo\"", customTbl.ProjectID, customTbl.DatasetID, customTbl.TableID)
	it, err := client.Query(sql).Read(ctx)
	if err != nil {
		t.Fatalf("Error querying: %v", err)
	}
	checkReadAndTotalRows(t, "HiveQuery", it, [][]Value{{int64(50)}})
}

func TestIntegration_QueryParameters(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
Expand Down
41 changes: 23 additions & 18 deletions bigquery/params.go
Expand Up @@ -65,16 +65,17 @@ func (e invalidFieldNameError) Error() string {
var fieldCache = fields.NewCache(bqTagParser, nil, nil)

var (
int64ParamType = &bq.QueryParameterType{Type: "INT64"}
float64ParamType = &bq.QueryParameterType{Type: "FLOAT64"}
boolParamType = &bq.QueryParameterType{Type: "BOOL"}
stringParamType = &bq.QueryParameterType{Type: "STRING"}
bytesParamType = &bq.QueryParameterType{Type: "BYTES"}
dateParamType = &bq.QueryParameterType{Type: "DATE"}
timeParamType = &bq.QueryParameterType{Type: "TIME"}
dateTimeParamType = &bq.QueryParameterType{Type: "DATETIME"}
timestampParamType = &bq.QueryParameterType{Type: "TIMESTAMP"}
numericParamType = &bq.QueryParameterType{Type: "NUMERIC"}
int64ParamType = &bq.QueryParameterType{Type: "INT64"}
float64ParamType = &bq.QueryParameterType{Type: "FLOAT64"}
boolParamType = &bq.QueryParameterType{Type: "BOOL"}
stringParamType = &bq.QueryParameterType{Type: "STRING"}
bytesParamType = &bq.QueryParameterType{Type: "BYTES"}
dateParamType = &bq.QueryParameterType{Type: "DATE"}
timeParamType = &bq.QueryParameterType{Type: "TIME"}
dateTimeParamType = &bq.QueryParameterType{Type: "DATETIME"}
timestampParamType = &bq.QueryParameterType{Type: "TIMESTAMP"}
numericParamType = &bq.QueryParameterType{Type: "NUMERIC"}
bigNumericParamType = &bq.QueryParameterType{Type: "BIGNUMERIC"}
)

var (
Expand Down Expand Up @@ -233,6 +234,9 @@ func paramValue(v reflect.Value) (bq.QueryParameterValue, error) {
return res, nil

case typeOfRat:
// big.Rat types don't communicate scale or precision, so we cannot
// disambiguate between NUMERIC and BIGNUMERIC. For now, we'll continue
// to honor previous behavior and send as Numeric type.
res.Value = NumericString(v.Interface().(*big.Rat))
return res, nil
}
Expand Down Expand Up @@ -304,14 +308,15 @@ func bqToQueryParameter(q *bq.QueryParameter) (QueryParameter, error) {
}

var paramTypeToFieldType = map[string]FieldType{
int64ParamType.Type: IntegerFieldType,
float64ParamType.Type: FloatFieldType,
boolParamType.Type: BooleanFieldType,
stringParamType.Type: StringFieldType,
bytesParamType.Type: BytesFieldType,
dateParamType.Type: DateFieldType,
timeParamType.Type: TimeFieldType,
numericParamType.Type: NumericFieldType,
int64ParamType.Type: IntegerFieldType,
float64ParamType.Type: FloatFieldType,
boolParamType.Type: BooleanFieldType,
stringParamType.Type: StringFieldType,
bytesParamType.Type: BytesFieldType,
dateParamType.Type: DateFieldType,
timeParamType.Type: TimeFieldType,
numericParamType.Type: NumericFieldType,
bigNumericParamType.Type: BigNumericFieldType,
}

// Convert a parameter value from the service to a Go value. This is similar to, but
Expand Down
32 changes: 20 additions & 12 deletions bigquery/schema.go
Expand Up @@ -182,23 +182,27 @@ const (
// GeographyFieldType is a string field type. Geography types represent a set of points
// on the Earth's surface, represented in Well Known Text (WKT) format.
GeographyFieldType FieldType = "GEOGRAPHY"
// BigNumericFieldType is a numeric field type that supports values of larger precision
// and scale than the NumericFieldType.
BigNumericFieldType FieldType = "BIGNUMERIC"
)

var (
errEmptyJSONSchema = errors.New("bigquery: empty JSON schema")
fieldTypes = map[FieldType]bool{
StringFieldType: true,
BytesFieldType: true,
IntegerFieldType: true,
FloatFieldType: true,
BooleanFieldType: true,
TimestampFieldType: true,
RecordFieldType: true,
DateFieldType: true,
TimeFieldType: true,
DateTimeFieldType: true,
NumericFieldType: true,
GeographyFieldType: true,
StringFieldType: true,
BytesFieldType: true,
IntegerFieldType: true,
FloatFieldType: true,
BooleanFieldType: true,
TimestampFieldType: true,
RecordFieldType: true,
DateFieldType: true,
TimeFieldType: true,
DateTimeFieldType: true,
NumericFieldType: true,
GeographyFieldType: true,
BigNumericFieldType: true,
}
// The API will accept alias names for the types based on the Standard SQL type names.
fieldAliases = map[FieldType]FieldType{
Expand Down Expand Up @@ -346,6 +350,10 @@ func inferFieldSchema(fieldName string, rt reflect.Type, nullable bool) (*FieldS
case typeOfDateTime:
return &FieldSchema{Required: true, Type: DateTimeFieldType}, nil
case typeOfRat:
// We automatically infer big.Rat values as NUMERIC as we cannot
// determine precision/scale from the type. Users who want the
// larger precision of BIGNUMERIC need to manipulate the inferred
// schema.
return &FieldSchema{Required: !nullable, Type: NumericFieldType}, nil
}
if ft := nullableFieldType(rt); ft != "" {
Expand Down
6 changes: 4 additions & 2 deletions bigquery/schema_test.go
Expand Up @@ -1041,7 +1041,8 @@ func TestSchemaFromJSON(t *testing.T) {
{"name":"flat_date","type":"DATE","mode":"NULLABLE","description":"Flat required DATE"},
{"name":"flat_time","type":"TIME","mode":"REQUIRED","description":"Flat nullable TIME"},
{"name":"flat_datetime","type":"DATETIME","mode":"NULLABLE","description":"Flat required DATETIME"},
{"name":"flat_numeric","type":"NUMERIC","mode":"REQUIRED","description":"Flat nullable NUMERIC"},
{"name":"flat_numeric","type":"NUMERIC","mode":"REQUIRED","description":"Flat required NUMERIC"},
{"name":"flat_bignumeric","type":"BIGNUMERIC","mode":"NULLABLE","description":"Flat nullable BIGNUMERIC"},
{"name":"flat_geography","type":"GEOGRAPHY","mode":"REQUIRED","description":"Flat required GEOGRAPHY"},
{"name":"aliased_integer","type":"INT64","mode":"REQUIRED","description":"Aliased required integer"},
{"name":"aliased_boolean","type":"BOOL","mode":"NULLABLE","description":"Aliased nullable boolean"},
Expand All @@ -1058,7 +1059,8 @@ func TestSchemaFromJSON(t *testing.T) {
fieldSchema("Flat required DATE", "flat_date", "DATE", false, false, nil),
fieldSchema("Flat nullable TIME", "flat_time", "TIME", false, true, nil),
fieldSchema("Flat required DATETIME", "flat_datetime", "DATETIME", false, false, nil),
fieldSchema("Flat nullable NUMERIC", "flat_numeric", "NUMERIC", false, true, nil),
fieldSchema("Flat required NUMERIC", "flat_numeric", "NUMERIC", false, true, nil),
fieldSchema("Flat nullable BIGNUMERIC", "flat_bignumeric", "BIGNUMERIC", false, false, nil),
fieldSchema("Flat required GEOGRAPHY", "flat_geography", "GEOGRAPHY", false, true, nil),
fieldSchema("Aliased required integer", "aliased_integer", "INTEGER", false, true, nil),
fieldSchema("Aliased nullable boolean", "aliased_boolean", "BOOLEAN", false, false, nil),
Expand Down

0 comments on commit 2dc00b4

Please sign in to comment.