[native] Add support for bucketed (but not partitioned) tables #22737
Open
aditi-pandit wants to merge 1 commit into master from bucketed_tables
+39 −8
Conversation
aditi-pandit force-pushed the bucketed_tables branch from afd5eaf to 532b21d on May 14, 2024
aditi-pandit changed the title from "[native] Add support for bucketed (but not partitioned) tables" to "[Do not review][native] Add support for bucketed (but not partitioned) tables" on May 14, 2024
This was referenced May 14, 2024
aditi-pandit force-pushed the bucketed_tables branch from 532b21d to b35819b on May 17, 2024
facebook-github-bot pushed a commit to facebookincubator/velox that referenced this pull request on May 26, 2024:
Summary: The Velox HiveConnector supports writing bucketed files only when they are partitioned as well. This is a feature gap with respect to Presto. Presto's behavior for bucketed (but not partitioned) tables:

- Supports CTAS into bucketed (but not partitioned) tables.
- Cannot append/overwrite to existing bucketed tables (though it can append to TEMPORARY ones).

CTAS into bucketed tables has become important because such tables are used for CTEs (WITH clause).

Note: This PR only handles CTAS situations. There will be a separate PR for TEMPORARY tables.

prestodb/presto#19744
prestodb/presto#22630

### Background

#### TableWriter and TableFinish

Presto uses TableWriter PlanNodes to do the writing operations. The TableWriter nodes run on the workers. These nodes write the input rows into data files (in a staging directory, before moving them to a target directory). The TableWriter node works in conjunction with a TableCommit node on the coordinator. The TableCommit node (TableFinishOperator) does the final renaming of the target directory and the commit to the metastore.

It is important to note that plans with bucketed tables involve a LocalExchange that brings all the data to a single driver for the TableWriter so that it can bucket and write the data appropriately.

```
EXPLAIN CREATE TABLE lineitem_bucketed2(orderkey, partkey, suppkey, linenumber, quantity, ds)
WITH (bucket_count = 10, bucketed_by = ARRAY['orderkey'], sorted_by = ARRAY['orderkey'])
AS SELECT orderkey, partkey, suppkey, linenumber, quantity, '2021-12-20' FROM tpch.tiny.lineitem;
```

Plan with TableWriter and TableCommit nodes. Note the LocalExchange moving all data to a single driver.

```
- Output[PlanNodeId 7]
  - TableCommit[PlanNodeId 5][Optional[hive.tpch_bucketed.lineitem_bucketed2]] => [rows_23:bigint]
    - RemoteStreamingExchange[PlanNodeId 299][GATHER] => [rows:bigint, fragments:varbinary, commitcontext:varbinary]
      - TableWriter[PlanNodeId 6] => [rows:bigint, fragments:varbinary, commitcontext:varbinary]
          orderkey := orderkey (1:194)
          partkey := partkey (1:204)
          suppkey := suppkey (1:213)
          linenumber := linenumber (1:222)
          quantity := quantity (1:234)
          ds := expr (1:244)
        - LocalExchange[PlanNodeId 330][SINGLE] () => [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, expr:varchar(10)]
          - RemoteStreamingExchange[PlanNodeId 298][REPARTITION] => [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, expr:varcha>
            - ScanProject[PlanNodeId 0,187][table = TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, project>
                expr := VARCHAR'2021-12-20'
                suppkey := tpch:suppkey (1:262)
                partkey := tpch:partkey (1:262)
                linenumber := tpch:linenumber (1:262)
                orderkey := tpch:orderkey (1:262)
                quantity := tpch:quantity (1:262)
```

The above command creates 10 files, as follows. 10 is the bucket count.

```
Aditis-MacBook-Pro:lineitem_bucketed aditipandit$ pwd
${DATA_DIR}/hive_data/tpch/lineitem_bucketed
Aditis-MacBook-Pro:lineitem_bucketed2 aditipandit$ ls
000000_0_20240507_221727_00018_73r2r  000003_0_20240507_221727_00018_73r2r  000006_0_20240507_221727_00018_73r2r  000009_0_20240507_221727_00018_73r2r
000001_0_20240507_221727_00018_73r2r  000004_0_20240507_221727_00018_73r2r  000007_0_20240507_221727_00018_73r2r
000002_0_20240507_221727_00018_73r2r  000005_0_20240507_221727_00018_73r2r  000008_0_20240507_221727_00018_73r2r
```

#### TableWriter output

The TableWriter output contains three columns per fragment (one for each individual target file). This format is presented for completeness. **There are no special changes for bucketed tables here. The only important difference is that the writePath/targetPath would not contain the partition directory.**

| TableWriter output row |
|--------|
| ROW<rows:BIGINT,fragments:VARBINARY,commitcontext:VARBINARY> |

| Rows | Fragments | CommitContext |
|--------|--------|--------|
| N (numPartitionUpdates) | NULL | TaskCommitContext |
| NULL | PartitionUpdate0 | |
| NULL | PartitionUpdate1 | |
| NULL | ... | |
| NULL | PartitionUpdateN | |

The fragments column contains JSON strings of PartitionUpdate in the following format:

```
{
  "Name": "ds=2022-08-06/partition=events_pcp_product_finder_product_similartiy__groupby__999999998000212604",
  "updateMode": "NEW",
  "writePath": "",
  "targetPath": "",
  "fileWriteInfos": [
    { "writeFileName": "", "targetFileName": "", "fileSize": 3517346970 },
    { "writeFileName": "", "targetFileName": "", "fileSize": 4314798687 }
  ],
  "rowCount": 3950431150,
  "inMemoryDataSizeInBytes": 4992001194927,
  "onDiskDataSizeInBytes": 1374893372141,
  "containsNumberedFileNames": false
}
```

The commitcontext column is a constant vector of TaskCommitContext as a JSON string:

```
{
  "lifespan": "TaskWide",
  "taskId": "20220822_190126_00000_78c2f.1.0.0",
  "pageSinkCommitStrategy": "TASK_COMMIT",
  "lastPage": false
}
```

#### Empty buckets

The TableWriter generates PartitionUpdate messages only for the files it has written, so an empty bucket has no PartitionUpdate message. For buckets without any PartitionUpdate output message, the TableFinish operator fixes up the HiveMetaStore with empty files for each such bucket.
https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java#L1794

### Design

As outlined above, all table writing happens in the TableWriter operator. The TableWriter forwards the write to the HiveDataSink, which is registered by the HiveConnector for it. The HiveDataSink already supported bucketed (and partitioned) tables, so all the logic for wiring bucket metadata and bucket computation already existed. The only missing piece was to handle file names for bucketed (but not partitioned) files in the writerIds, and to map the proper writerId to input rows when appending to the HiveDataSink. This PR fixes that.

********************************************

Note: The Prestissimo changes are in prestodb/presto#22737

Pull Request resolved: #9740

Reviewed By: kewang1024

Differential Revision: D57748876

Pulled By: xiaoxmeng

fbshipit-source-id: 33bb77c6fce4d2519f3214e2fb93891f1f910716
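The bucket assignment, file naming, and empty-bucket backfill described in the commit message above can be sketched roughly in Python. This is a hypothetical illustration, not Velox's or Presto's actual code: the hash function is a stand-in for the connector's real bucket hash, and only the `000000_0`-style prefix mirrors the file listing shown above.

```python
def bucket_for_row(bucket_key: int, bucket_count: int) -> int:
    """Map a row's bucket-column value to a bucket id.

    Hive-style: non-negative hash modulo the bucket count. Python's
    hash() is a stand-in for the connector's actual hash function.
    """
    return (hash(bucket_key) & 0x7FFFFFFF) % bucket_count

def bucket_file_prefix(bucket_id: int) -> str:
    """Bucket files are numbered 000000_0, 000001_0, ... per bucket."""
    return f"{bucket_id:06d}_0"

def empty_buckets(written_buckets: set, bucket_count: int) -> list:
    """Buckets with no PartitionUpdate; TableFinish backfills these
    with empty files in the metastore."""
    return sorted(set(range(bucket_count)) - written_buckets)
```

For example, with `bucket_count = 10`, `bucket_file_prefix(3)` yields `000003_0`, matching the naming in the directory listing above.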
aditi-pandit force-pushed the bucketed_tables branch 4 times, most recently from b72c257 to 908a0ab on May 28, 2024
aditi-pandit changed the title from "[Do not review][native] Add support for bucketed (but not partitioned) tables" to "[native] Add support for bucketed (but not partitioned) tables" on May 28, 2024
aditi-pandit force-pushed the bucketed_tables branch from 908a0ab to 3d092f2 on May 30, 2024
Description
Presto supports CTAS into bucketed (but not partitioned) tables. The user cannot append/overwrite to existing bucketed tables (though they can append to TEMPORARY ones).
Prestissimo didn't support these DML operations.
Motivation and Context
CTAS into bucketed tables has become important because such tables are used for CTEs (WITH clause).
#22630
The main fixes are on the Velox side (facebookincubator/velox#9740). The Prestissimo impact is mainly removing a validation during conversion of the table handles for bucketed (but not partitioned) tables.
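For illustration, the fragments column described in the commit message above carries PartitionUpdate JSON strings; a consumer could aggregate them as sketched below. This is a hypothetical sketch with made-up values, not Presto's actual TableFinish code; field names follow the JSON example shown in the commit message.

```python
import json

def summarize_fragment(fragment_json: str):
    """Return (rowCount, total fileSize across fileWriteInfos)."""
    update = json.loads(fragment_json)
    total_bytes = sum(f["fileSize"] for f in update["fileWriteInfos"])
    return update["rowCount"], total_bytes

# Example fragment with made-up file sizes and row count.
example = json.dumps({
    "updateMode": "NEW",
    "fileWriteInfos": [
        {"writeFileName": "000000_0", "targetFileName": "000000_0", "fileSize": 100},
        {"writeFileName": "000001_0", "targetFileName": "000001_0", "fileSize": 250},
    ],
    "rowCount": 42,
})
```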
Test Plan
e2e tests
Release Notes
Please follow release notes guidelines and fill in the release notes below.