
A native parquet reader for primitive types #60361

Merged — 15 commits merged into ClickHouse:master on May 24, 2024

Conversation

@copperybean (Contributor) commented Feb 23, 2024

Changelog category (leave one):

  • Performance Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

A native Parquet reader that can read Parquet binary data directly into ClickHouse Columns. The feature can be activated by setting input_format_parquet_use_native_reader to true.

Currently, Parquet files are read via the Arrow library: the data is first read into an Arrow table and then copied into ClickHouse Columns. This has several performance shortcomings.

  • First, copying the data from the Arrow table to ClickHouse Columns is a redundant operation.
  • Second, in Arrow's implementation, repetition and definition levels are first read into an int16_t array, and the nullability-related valid_bits_ is then generated from them. However, the null_map and offsets (the latter not covered this time) can be generated directly while reading the definition and repetition levels, as sketched after this list.
  • Third, in some situations Arrow's implementation is inefficient, for example for the DataTypeDecimal32 type and for dictionary-encoded Parquet row groups.
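
For illustration only, a minimal sketch (not the PR's actual code) of the idea in the second point: for a flat Nullable column, the null map can be filled directly while scanning definition levels, without first materializing an Arrow-style valid_bits_ buffer.

```cpp
#include <cstdint>
#include <vector>

/// Minimal sketch, assuming a flat (non-nested) column where a definition level equal to
/// max_def_level means "value present" and anything lower means NULL.
std::vector<uint8_t> buildNullMap(const int16_t * def_levels, size_t num_values, int16_t max_def_level)
{
    std::vector<uint8_t> null_map(num_values);
    for (size_t i = 0; i < num_values; ++i)
        null_map[i] = def_levels[i] < max_def_level ? 1 : 0; /// 1 marks a NULL row, matching ClickHouse null_map semantics
    return null_map;
}
```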

This feature was first implemented in the BMR product of Baidu AI Cloud, where it has been fully tested.

Performance Test

The new implementation shows a clear speedup. To generate the test data, a Parquet file src.parquet of the TPC-DS table store_sales at scale factor 5000 is used; it contains 35,207,247 rows. The test data is then generated with the following query:

select
  ss_item_sk,
  ss_wholesale_cost,
  ss_sales_price * ss_item_sk * ss_customer_sk as ss_sales_price_,
  concat('test string with ', toString(ss_item_sk % 1000)) as dict_str,
  concat('test string with ', toString(ss_item_sk * ss_cdemo_sk * ss_hdemo_sk)) as normal_str
from file('<path>/src.parquet', 'Parquet', 'ss_item_sk Nullable(Int32), ss_customer_sk Nullable(Int32), ss_cdemo_sk Nullable(Int32), ss_hdemo_sk Nullable(Int32), ss_wholesale_cost Nullable(Decimal(7, 2)), ss_sales_price Nullable(Decimal(21,6))')
into outfile '<path>/test.parquet' format Parquet

The performance is then measured with the following command:

clickhouse-local --query "select max(<field-name>) from file('<path>/test.parquet', 'Parquet')" \
  --max_parsing_threads 1 --max_threads 1 --input_format_parquet_use_native_reader <flag>

For each field, two tests are run with different values of the input_format_parquet_use_native_reader setting, using a single thread. The Parquet reading duration is measured as the commit-log duration while reading Parquet. The CPU used for this test is an Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz.

The test results are detailed in the following table:

| Column | Current Duration | Previous Duration | SpeedUp |
|---|---|---|---|
| ss_item_sk Nullable(Int32) | 117ms | 141ms | 1.2 |
| ss_wholesale_cost Nullable(Decimal(7, 2)) | 341ms | 1132ms | 3.3 |
| ss_sales_price_ Nullable(Decimal(21, 6)) | 720ms | 1185ms | 1.6 |
| dict_str Nullable(String) | 259ms | 624ms | 2.4 |
| normal_str Nullable(String) | 717ms | 1154ms | 1.6 |

Documentation entry for user-facing changes

  • Documentation is written (mandatory for new features)

Information about CI checks: https://clickhouse.com/docs/en/development/continuous-integration/

clickhouse-ci (bot) commented Feb 23, 2024

This is an automatic comment. The PR description does not match the template.

Please, edit it accordingly.

The error is: More than one changelog category specified: 'Improvement', 'Performance Improvement'

@nikitamikhaylov nikitamikhaylov added the can be tested Allows running workflows for external contributors label Feb 23, 2024
@al13n321 al13n321 self-assigned this Feb 23, 2024
@robot-clickhouse robot-clickhouse added the pr-performance Pull request with some performance improvements label Feb 23, 2024
@robot-clickhouse (Member) commented Feb 23, 2024

This is an automated comment for commit 3d7befe with a description of existing statuses. It's updated for the latest CI run.

❌ A full report is available on a separate page

| Check name | Description | Status |
|---|---|---|
| A Sync | There's no description for the check yet, please add it to tests/ci/ci_config.py:CHECK_DESCRIPTIONS | ⏳ pending |
| CI running | A meta-check that indicates the running CI. Normally, it's in success or pending state. The failed status indicates some problems with the PR | ⏳ pending |
| Integration tests | The integration tests report. In parenthesis the package type is given, and in square brackets are the optional part/total tests | ❌ failure |
| Mergeable Check | Checks if all other necessary checks are successful | ❌ failure |
| Stateless tests | Runs stateless functional tests for ClickHouse binaries built in various configurations -- release, debug, with sanitizers, etc | ❌ failure |
Successful checks
| Check name | Description | Status |
|---|---|---|
| AST fuzzer | Runs randomly generated queries to catch program errors. The build type is optionally given in parenthesis. If it fails, ask a maintainer for help | ✅ success |
| ClickBench | Runs [ClickBench](https://github.com/ClickHouse/ClickBench/) with instant-attach table | ✅ success |
| ClickHouse build check | Builds ClickHouse in various configurations for use in further steps. You have to fix the builds that fail. Build logs often has enough information to fix the error, but you might have to reproduce the failure locally. The cmake options can be found in the build log, grepping for cmake. Use these options and follow the general build process | ✅ success |
| Compatibility check | Checks that clickhouse binary runs on distributions with old libc versions. If it fails, ask a maintainer for help | ✅ success |
| Docker keeper image | The check to build and optionally push the mentioned image to docker hub | ✅ success |
| Docker server image | The check to build and optionally push the mentioned image to docker hub | ✅ success |
| Docs check | Builds and tests the documentation | ✅ success |
| Fast test | Normally this is the first check that is ran for a PR. It builds ClickHouse and runs most of stateless functional tests, omitting some. If it fails, further checks are not started until it is fixed. Look at the report to see which tests fail, then reproduce the failure locally as described here | ✅ success |
| Flaky tests | Checks if new added or modified tests are flaky by running them repeatedly, in parallel, with more randomization. Functional tests are run 100 times with address sanitizer, and additional randomization of thread scheduling. Integrational tests are run up to 10 times. If at least once a new test has failed, or was too long, this check will be red. We don't allow flaky tests, read the doc | ✅ success |
| Install packages | Checks that the built packages are installable in a clear environment | ✅ success |
| PR Check | There's no description for the check yet, please add it to tests/ci/ci_config.py:CHECK_DESCRIPTIONS | ✅ success |
| Performance Comparison | Measure changes in query performance. The performance test report is described in detail here. In square brackets are the optional part/total tests | ✅ success |
| Stateful tests | Runs stateful functional tests for ClickHouse binaries built in various configurations -- release, debug, with sanitizers, etc | ✅ success |
| Stress test | Runs stateless functional tests concurrently from several clients to detect concurrency-related errors | ✅ success |
| Style check | Runs a set of checks to keep the code style clean. If some of tests failed, see the related log from the report | ✅ success |
| Unit tests | Runs the unit tests for different release types | ✅ success |
| Upgrade check | Runs stress tests on server version from last release and then tries to upgrade it to the version from the PR. It checks if the new server can successfully startup without any errors, crashes or sanitizer asserts | ✅ success |

@copperybean (Contributor, Author) commented Feb 29, 2024

Well, I think these broken tests are not caused by my commit. UnitTestsAsan and ASTFuzzerTestAsan succeeded in the previous run, while the integration test failed because of a logical error.

Hoping for further suggestions.

@alexey-milovidov (Member):

Yes, we need to start the investigation, and then check and fix every failure one by one.

@copperybean (Contributor, Author) commented Mar 13, 2024

> Yes, we need to start the investigation, and then check and fix every failure one by one.

@alexey-milovidov
About the investigation, do you mean investigating this PR as a whole, or just the failed tests?
Also, should I rebase onto master and trigger the checks again?

@alexey-milovidov (Member):

@copperybean, please resolve conflicts.

@al13n321 (Member) left a comment

Cool, thanks for working on this!

Lots of review comments, mostly unimportant. The only blockers are:

  • Seems that TIMESTAMP types will be incorrectly interpreted as seconds, please test.
  • The block_missing_values_ptr thing, please test. If it doesn't work, at least add a TODO so we don't forget.
  • Pass file metadata to ParquetFileReader::Open to avoid re-reading it for each row group (a rough sketch follows this list).
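
For the third blocker, a rough sketch of reusing cached footer metadata, based on the Arrow/Parquet C++ API where ParquetFileReader::Open accepts an optional FileMetaData; the exact wiring in the PR may differ.

```cpp
#include <memory>

#include <arrow/io/interfaces.h>
#include <parquet/file_reader.h>
#include <parquet/metadata.h>

/// Hypothetical helper: open a reader while reusing footer metadata that was already
/// read once, instead of re-reading and re-parsing the footer for every row group.
std::unique_ptr<parquet::ParquetFileReader> openWithCachedMetadata(
    std::shared_ptr<arrow::io::RandomAccessFile> source,
    std::shared_ptr<parquet::FileMetaData> cached_metadata)
{
    parquet::ReaderProperties props;
    /// The third argument tells Open to use the provided metadata instead of reading the footer.
    return parquet::ParquetFileReader::Open(std::move(source), props, std::move(cached_metadata));
}
```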

tests/queries/0_stateless/02998_native_parquet_reader.sh (outdated; resolved)
Comment on lines 644 to 647
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &res.block_missing_values : nullptr;
row_group_batch.arrow_column_to_ch_column->arrowTableToCHChunk(res.chunk, *tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr);
Member:

As this comment says, block_missing_values_ptr is used for applying default expressions to omitted fields (here "omitted" means NULLs, I think).

Native reader doesn't populate block_missing_values_ptr, so it'll probably break default expressions. Please test this:

create table a (x Int64, y String default 'asd') engine Memory
insert into function file('t.parquet') select 1 as x, null::Nullable(String) as y
insert into a select * from file('t.parquet')
select * from a

   ┌─x─┬─y───┐
1. │ 1 │ asd │
   └───┴─────┘

If it doesn't work, either populate block_missing_values_ptr based on null masks or at least add a TODO comment about that.
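
For illustration, a rough sketch of the "populate block_missing_values_ptr based on null masks" option; it assumes BlockMissingValues exposes a per-row setBit(column_idx, row_idx), so the real interface should be checked.

```cpp
#include <Core/Block.h>
#include <Common/PODArray.h>

/// Hypothetical sketch: mark NULL rows of a column as "missing" so that
/// defaults_for_omitted_fields can apply default expressions to them.
void fillMissingValuesFromNullMap(
    DB::BlockMissingValues & missing_values,
    size_t column_idx,
    const DB::PaddedPODArray<UInt8> & null_map)
{
    for (size_t row = 0; row < null_map.size(); ++row)
        if (null_map[row])
            missing_values.setBit(column_idx, row); /// assumed API; verify against BlockMissingValues
}
```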

Contributor Author:

Sorry, I missed this feature. I would like to add a TODO comment, because the block-missing-values logic will be modified again when supporting nested types, so it can be implemented later for all types.

src/Processors/Formats/Impl/Parquet/ParquetDataBuffer.h (outdated; resolved)
Comment on lines +63 to +64
IndividualVisitor && individual_visitor,
RepeatedVisitor && repeated_visitor);
Member:

A simpler way to handle nulls is to read all non-null values into a column, then use IColumn::expand to insert default values according to the null mask. It's a little slower because of the copying.

I don't mind the current approach, just mentioning the alternative in case it turns out better for the general case of arbitrarily nested arrays, nullables, and tuples.
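
A rough sketch of this alternative, not the PR's implementation: it assumes ClickHouse's IColumn::expand(const Filter & mask, bool inverted) inserts default values where the mask is 0 (when not inverted), which is worth double-checking.

```cpp
#include <Columns/IColumn.h>

/// Hypothetical helper: `data_column` holds only the non-null values in order; expand it
/// to null_map.size() rows, inserting default values at the NULL positions.
void expandByNullMap(DB::IColumn & data_column, const DB::PaddedPODArray<UInt8> & null_map)
{
    DB::IColumn::Filter valid_mask(null_map.size());
    for (size_t i = 0; i < null_map.size(); ++i)
        valid_mask[i] = null_map[i] ? 0 : 1; /// 1 where a real value is present
    data_column.expand(valid_mask, /*inverted=*/ false);
}
```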

Contributor Author:

OK, I'd prefer to keep the current implementation for now; maybe I will change the logic as you suggest when supporting nested columns.

Comment on lines 294 to 330
visitColStrIndexType(dictionary->size(), [&]<typename TColVec>(TColVec *)
{
    const TColVec & col_src = *static_cast<const TColVec *>(col_existing.get());
    reserveColumnStrRows(column, reading_rows_num);

    col_dest.getOffsets().resize(col_src.size());
    for (size_t i = 0; i < col_src.size(); i++)
    {
        auto src_idx = col_src.getData()[i];
        if (0 == src_idx)
        {
            null_map->setNull(i);
        }
        auto dict_chars_cursor = col_dict_str.getOffsets()[src_idx - 1];
        auto str_len = col_dict_str.getOffsets()[src_idx] - dict_chars_cursor;
        auto dst_chars_cursor = col_dest.getChars().size();
        col_dest.getChars().resize(dst_chars_cursor + str_len);

        memcpySmallAllowReadWriteOverflow15(
            &col_dest.getChars()[dst_chars_cursor], &col_dict_str.getChars()[dict_chars_cursor], str_len);
        col_dest.getOffsets()[i] = col_dest.getChars().size();
    }
});
Member:

Would it be easier to create a ColumnLowCardinality and call convertToFullColumn() on it?

Contributor Author:

Well, that would be easier, but the current implementation is somewhat more efficient: the new column generated by convertToFullColumn is smaller than reading_rows_num, so it would have to be resized to reading_rows_num again.

By the way, I will implement the logic as you suggest next time. For the current code, I'd prefer not to change it right now; instead, I can add some comments.

@copperybean (Contributor, Author):

> Lots of review comments, mostly unimportant. The only blockers are:

Thanks for the comments, I will fix them this weekend.

Change-Id: I83a8ec8271edefcd96cb5b3bcd12f6b545d9dec0
Change-Id: I57f025b17a04e2c5dded3f18e7f477841287a2c2
Change-Id: I38b8368b022263d9a71cb3f3e9fdad5d6ca26753
Change-Id: If79741b7456667a8dde3e355d9dc684c2dd84f4f
Change-Id: I53ade40ba24a742a21f9e09dbab7fff90b032b4b
Change-Id: I8f7ebd173558b16d94d3161cb0b5300e7e78833d
Change-Id: Ia7dbf1d762f7f054a9aa677caaaff6bfe1a42c38
…18 tidy warnings

Change-Id: I3119c44dc764caed0dc471f52ac5e634c75c7b50
@al13n321 (Member):

:) insert into function file('u2.parquet') select number as a from numbers(100000) settings output_format_parquet_use_custom_encoder=0

INSERT INTO FUNCTION file('u2.parquet')
SETTINGS output_format_parquet_use_custom_encoder = 0
SELECT number AS a
FROM numbers(100000)
SETTINGS output_format_parquet_use_custom_encoder = 0

Query id: 67ad48c3-7952-4bb4-b1e5-9fe2959be6ad

Ok.

0 rows in set. Elapsed: 0.089 sec. Processed 100.00 thousand rows, 800.00 KB (1.12 million rows/s., 8.97 MB/s.)
Peak memory usage: 10.07 KiB.

:) select * from file('u2.parquet') limit 10 settings input_format_parquet_use_native_reader=1

SELECT *
FROM file('u2.parquet')
LIMIT 10
SETTINGS input_format_parquet_use_native_reader = 1

Query id: 4a17db55-e81c-418d-8b06-5c8f19d330dc


Elapsed: 0.020 sec. 

Received exception from server (version 24.5.1):
Code: 722. DB::Exception: Received from localhost:9000. DB::Exception: Unsupported logical type: Int(bitWidth=64, isSigned=false) and physical type: INT64 for field ==a==, bit width: 64: (in file/uri /home/ubuntu/cluster/n1/store/user_files/u2.parquet): While executing ParquetBlockInputFormat: While executing File. ()

@al13n321 (Member):

Confusing error message when trying to read an array:

:) insert into function file('v.parquet') select [number] as a from numbers(100000) settings output_format_parquet_use_custom_encoder=0

INSERT INTO FUNCTION file('v.parquet')
SETTINGS output_format_parquet_use_custom_encoder = 0
SELECT [number] AS a
FROM numbers(100000)
SETTINGS output_format_parquet_use_custom_encoder = 0

Query id: 1016edab-8db2-41b6-b273-eb77ec069125

Ok.

0 rows in set. Elapsed: 0.184 sec. Processed 100.00 thousand rows, 800.00 KB (543.01 thousand rows/s., 4.34 MB/s.)
Peak memory usage: 12.12 KiB.

 :) select * from file('v.parquet') limit 10 settings input_format_parquet_use_native_reader=1

SELECT *
FROM file('v.parquet')
LIMIT 10
SETTINGS input_format_parquet_use_native_reader = 1

Query id: cca172ee-98d6-42e8-ac9a-4bc837e055ed


Elapsed: 0.019 sec. 

Received exception from server (version 24.5.1):
Code: 722. DB::Exception: Received from localhost:9000. DB::Exception: can not find column with name: a: (in file/uri /home/ubuntu/cluster/n1/store/user_files/v.parquet): While executing ParquetBlockInputFormat: While executing File. ()

@al13n321 (Member):

Fix for the error message:

--- a/src/Processors/Formats/Impl/Parquet/ParquetRecordReader.cpp
+++ b/src/Processors/Formats/Impl/Parquet/ParquetRecordReader.cpp
@@ -312,16 +312,28 @@ ParquetRecordReader::ParquetRecordReader(
 {
     log = &Poco::Logger::get("ParquetRecordReader");
 
+    std::unordered_map<String, parquet::schema::NodePtr> parquet_columns;
+    auto root = file_reader->metadata()->schema()->group_node();
+    for (int i = 0; i < root->field_count(); ++i)
+    {
+        auto & node = root->field(i);
+        parquet_columns.emplace(node->name(), node);
+    }
+
     parquet_col_indice.reserve(header.columns());
     column_readers.reserve(header.columns());
     for (const auto & col_with_name : header)
     {
-        auto idx = file_reader->metadata()->schema()->ColumnIndex(col_with_name.name);
-        if (idx < 0)
-        {
-            auto msg = PreformattedMessage::create("can not find column with name: {}", col_with_name.name);
-            throw Exception(std::move(msg), ErrorCodes::PARQUET_EXCEPTION);
-        }
+        auto it = parquet_columns.find(col_with_name.name);
+        if (it == parquet_columns.end())
+            throw Exception(ErrorCodes::PARQUET_EXCEPTION, "no column with '{}' in parquet file", col_with_name.name);
+
+        auto node = it->second;
+        if (!node->is_primitive())
+            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "arrays and maps are not implemented in native parquet reader");
+
+        auto idx = file_reader->metadata()->schema()->ColumnIndex(*node);
+        chassert(idx >= 0);
         parquet_col_indice.push_back(idx);
     }
     if (reader_properties.pre_buffer())

case parquet::Type::BYTE_ARRAY:
return fromByteArray();
case parquet::Type::FIXED_LEN_BYTE_ARRAY:
return fromFLBA();
Contributor:

It appears that the Bool and FLBA (fixed string) types are not currently supported. Are there any plans to add support for these types in the future?

Contributor:

Bool types are distinct due to their 1-bit width, while ClickHouse uses a UInt8 column vector.
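
To illustrate the difference, a minimal sketch (an assumption, not part of this PR) of unpacking Parquet's bit-packed PLAIN-encoded booleans into the byte-per-value layout used by a ClickHouse UInt8 column vector:

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

/// Parquet PLAIN encoding packs BOOLEAN values 8 per byte, least significant bit first;
/// ClickHouse's ColumnVector<UInt8> stores one byte per value.
std::vector<uint8_t> unpackParquetBooleans(const uint8_t * packed, size_t num_values)
{
    std::vector<uint8_t> out(num_values);
    for (size_t i = 0; i < num_values; ++i)
        out[i] = (packed[i / 8] >> (i % 8)) & 1;
    return out;
}
```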

Contributor Author:

With the current framework, supporting these types is not difficult. My plan is to support nested types after this PR, then support all Parquet types and enable the 02735_parquet_encoder test.

Change-Id: Iefec91bca223eedaabe302b7891808c6d94eed9d
Change-Id: Iff9f5f894e815b184ac35f61b4cac87908c612b5
@copperybean (Contributor, Author):

It seems that the ClickHouse special build check failed because of OOM?

@al13n321 (Member):

It's broken in master, waiting for #64204 (and maybe something else, that one only fixes one of the two OOMing builds).

@al13n321 al13n321 added this pull request to the merge queue May 24, 2024
github-merge-queue bot pushed a commit that referenced this pull request May 24, 2024
A native parquet reader for primitive types
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to no response for status checks May 24, 2024
@al13n321 al13n321 added this pull request to the merge queue May 24, 2024
Merged via the queue into ClickHouse:master with commit ee3e7f2 May 24, 2024
237 of 242 checks passed
@robot-clickhouse robot-clickhouse added the pr-synced-to-cloud The PR is synced to the cloud repo label May 24, 2024
@@ -93,6 +93,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"cross_join_min_bytes_to_compress", 0, 1_GiB, "A new setting."},
{"http_max_chunk_size", 0, 0, "Internal limitation"},
{"prefer_external_sort_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Prefer maximum block bytes for external sort, reduce the memory usage during merging."},
{"input_format_parquet_use_native_reader", false, false, "When reading Parquet files, to use native reader instead of arrow reader."},
Member:

This should go to 24.6


@zhanglistar (Contributor) commented Jun 4, 2024

What a great feature! @copperybean, will you support complex types? @al13n321, do you have a plan to support complex types to make the native parquet reader production-ready?

@baibaichen (Contributor):

@copperybean it's a cool feature.

Do you have plans to support the page index? In that case, we would need to skip reading some rows.

Labels: can be tested (Allows running workflows for external contributors), pr-performance (Pull request with some performance improvements), pr-synced-to-cloud (The PR is synced to the cloud repo), submodule changed (At least one submodule changed in this PR)

Projects: None yet

10 participants