Skip to content

Commit

Permalink
Merge #103589
Browse files Browse the repository at this point in the history
103589: util/parquet: support tuples r=miretskiy a=jayshrivastava

This change adds support for writing tuples. Implementation details below.

The standard way to write a tuple in parquet is to use a group:
```
message schema {                 -- toplevel schema
   optional group a (LIST) {
       optional T1 element;       -- physical column for the first field
       ...
       optional Tn element;       -- physical column for the nth field
   }
}
```

Because parquet has a very strict format, it does not write such groups
as one column with all the fields adjacent to each other. Instead, it
writes each field in the tuple as its own column. This 1:N mapping
from CRDB datum to physical column in parquet violates the assumption
used in this library that the mapping is 1:1.

This change aims to update the library to break that assumption. Firstly,
there is now a clear distiction between a "datum column" and a "physical
column". Also, the `Writer` is updated to be able to write to multiple
physical columns for a given datum, and the reader is updated
to "squash" physical columns into single tuple datums if needed. Finally,
randomized testing and benchmarking is extended to cover tuples.

Informs: #99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Release note: None

Co-authored-by: Jayant Shrivastava <jayants@cockroachlabs.com>
  • Loading branch information
craig[bot] and jayshrivastava committed May 22, 2023
2 parents 0f2f3bc + 198a5ad commit 7ed8248
Show file tree
Hide file tree
Showing 8 changed files with 662 additions and 224 deletions.
20 changes: 7 additions & 13 deletions pkg/ccl/changefeedccl/testfeed_test.go
Expand Up @@ -1224,29 +1224,23 @@ func extractKeyFromJSONValue(isBare bool, wrapped []byte) (key []byte, value []b
func (c *cloudFeed) appendParquetTestFeedMessages(
path string, topic string, envelopeType changefeedbase.EnvelopeType,
) (err error) {
meta, datums, closeReader, err := parquet.ReadFile(path)
meta, datums, err := parquet.ReadFile(path)
if err != nil {
return err
}
defer func() {
closeErr := closeReader()
if closeErr != nil {
err = errors.CombineErrors(err, closeErr)
}
}()

primaryKeyColumnsString := meta.KeyValueMetadata().FindValue("keyCols")
if primaryKeyColumnsString == nil {
primaryKeyColumnsString, ok := meta.MetaFields["keyCols"]
if !ok {
return errors.Errorf("could not find primary key column names in parquet metadata")
}

columnsNamesString := meta.KeyValueMetadata().FindValue("allCols")
if columnsNamesString == nil {
columnsNamesString, ok := meta.MetaFields["allCols"]
if !ok {
return errors.Errorf("could not find column names in parquet metadata")
}

primaryKeys := strings.Split(*primaryKeyColumnsString, ",")
columns := strings.Split(*columnsNamesString, ",")
primaryKeys := strings.Split(primaryKeyColumnsString, ",")
columns := strings.Split(columnsNamesString, ",")

columnNameSet := make(map[string]struct{})
primaryKeyColumnSet := make(map[string]struct{})
Expand Down
1 change: 1 addition & 0 deletions pkg/util/parquet/BUILD.bazel
Expand Up @@ -53,6 +53,7 @@ go_test(
"//pkg/util/bitarray",
"//pkg/util/duration",
"//pkg/util/ipaddr",
"//pkg/util/json",
"//pkg/util/timeutil",
"//pkg/util/timeutil/pgdate",
"//pkg/util/uuid",
Expand Down

0 comments on commit 7ed8248

Please sign in to comment.