-
Notifications
You must be signed in to change notification settings - Fork 549
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Protobuf to Arrow converter #18449
base: dev
Are you sure you want to change the base?
Protobuf to Arrow converter #18449
Conversation
private: | ||
template<typename T> | ||
void | ||
do_add(const google::protobuf::Message* /* msg */, int /* field_idx */) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here and throughout- do you want const google::protobuf::Message* const
? Is there a reason to prefer pointers to references?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Protobuf API takes returns pointers for objects and takes pointers as arguments. I try to avoid raw pointers as much as possible, but I get this impression this is the Protobuf way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Google generally uses raw pointers over l-value references in function parameters. This is because it forces the caller to annotate parameters as taking a pointer (via &).
Absl will have many of the same patterns (especially in the string methods like StripPrefix and SimpleAtoi)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also: #18245 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👋
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👋 don't mind me driving by!
class proto_to_arrow_struct : public proto_to_arrow_interface { | ||
public: | ||
explicit proto_to_arrow_struct( | ||
const google::protobuf::Descriptor* message_descriptor) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question re: const google::protobuf::Descriptor* const
for (int field_idx = 0; field_idx < message_descriptor->field_count(); | ||
field_idx++) { | ||
auto desc = message_descriptor->field(field_idx); | ||
if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_INT32) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would prefer a switch
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would prefer a switch here.
+1, but nit
_arrow_data_type = arrow::struct_(_fields); | ||
|
||
// Make builder | ||
std::vector<std::shared_ptr<arrow::ArrayBuilder>> child_builders; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add child_builders.reserve(_child_arrays.size())
to avoid resizing (can also apply this to push_back()
loops above). We may also want to use fragmented_vector
instead of vector
to avoid oversized seastar allocs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add child_builders.reserve(_child_arrays.size()) to avoid resizing (can also apply this to push_back() loops above). We may also want to use fragmented_vector instead of vector to avoid oversized seastar allocs.
+1
@@ -1,5 +1,6 @@ | |||
#pragma once | |||
|
|||
#include "datalake/errors.h" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this commit won't build because this header file isn't introduced until the last commit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -128,6 +128,7 @@ add_subdirectory(compat) | |||
add_subdirectory(rp_util) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the first commit message:
protobuf_to_arrow_converter represents all messages as a structure
This eliminates duplicated code between protobuf_to_arrow_converter
and proto_to_arrow_struct. Each converter contains a single instance
of proto_to_arrow_struct that represents the structure of the top-level
message.
please follow the general format of commits in the tree. roughly:
subsystem: subject
description of what is being added, fixed, etc...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bump
@@ -0,0 +1,39 @@ | |||
#pragma once |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
files are missing copyright headers.
|
||
/// Return an Arrow field descriptor for this Array. Used for building | ||
/// A schema. | ||
virtual std::shared_ptr<arrow::Field> field(const std::string& name) = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't use std::shared_ptr in redpanda. i realize that it's impossible to use the high-level arrow/parquet api without it, but it needs to be called out, addressing the short term and long term implication, why we can't work around it now, and what the plan for addressing it is later (either leaving it as-is with justification or fixing/removing/working around it).
template<> | ||
void do_add<arrow::Int32Type>( | ||
const google::protobuf::Message* msg, int field_idx) { | ||
auto desc = msg->GetDescriptor()->field(field_idx); | ||
_arrow_status = _builder->Append( | ||
msg->GetReflection()->GetInt32(*msg, desc)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about we move all of these specializations out of the header and into a compilation unit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about we move all of these specializations out of the header and into a compilation unit.
I guess we can then forward declare the google::protobuf::Message
and not include the header.
private: | ||
template<typename T> | ||
void | ||
do_add(const google::protobuf::Message* /* msg */, int /* field_idx */) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is this? seems like it could be removed if it's unused and we are building specializations (below) for each of the supported types.
return batch; | ||
} | ||
|
||
model::record make_record( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there are existing make_record helpers. do they need to be generalized?
return ret; | ||
} | ||
|
||
model::record_batch make_protobuf_batch( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is a protobuf batch?
converter.add_message(generate_nested_message("III", 3)); | ||
converter.add_message(generate_nested_message("IV", 4)); | ||
converter.add_message(generate_nested_message("V", 5)); | ||
EXPECT_EQ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replace generic exceptions with error codes or datalake::initializati…
it looks like this commit can be squashed into the previous commit
@@ -242,9 +242,6 @@ inline datalake::schema_info get_test_schema() { | |||
struct protobuf_random_batches_generator { | |||
ss::circular_buffer<model::record_batch> | |||
operator()(std::optional<model::timestamp> base_ts = std::nullopt) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clean up some comments
squash this commit into the previous commit
@@ -0,0 +1,22 @@ | |||
#pragma once |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Forgot to add errors.h
fold into previous commit to fix compilation
This eliminates duplicated code between protobuf_to_arrow_converter and proto_to_arrow_struct. Each converter contains a single instance of proto_to_arrow_struct that represents the structure of the top-level message.
f0b9baa
to
c732be6
Compare
|
||
FetchContent_Declare( | ||
arrow | ||
GIT_REPOSITORY https://github.com/apache/arrow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add an entry to licenses/third_party.md
template<> | ||
void do_add<arrow::Int32Type>( | ||
const google::protobuf::Message* msg, int field_idx) { | ||
auto desc = msg->GetDescriptor()->field(field_idx); | ||
_arrow_status = _builder->Append( | ||
msg->GetReflection()->GetInt32(*msg, desc)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about we move all of these specializations out of the header and into a compilation unit.
I guess we can then forward declare the google::protobuf::Message
and not include the header.
field_message_descriptor)); | ||
} else { | ||
throw std::runtime_error( | ||
std::string("Unknown type: ") + desc->cpp_type_name()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use ss::string where we have the choice (nearly everywhere)
In this case the exception accepts a std::string
. Might be nice to use fmt::format("Unknown type: {}", desc->cpp_type_name())
// Set up child arrays | ||
for (int field_idx = 0; field_idx < message_descriptor->field_count(); | ||
field_idx++) { | ||
auto desc = message_descriptor->field(field_idx); | ||
if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_INT32) { | ||
_child_arrays.push_back( | ||
std::make_unique<proto_to_arrow_scalar<arrow::Int32Type>>()); | ||
} else if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_INT64) { | ||
_child_arrays.push_back( | ||
std::make_unique<proto_to_arrow_scalar<arrow::Int64Type>>()); | ||
} else if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_BOOL) { | ||
_child_arrays.push_back( | ||
std::make_unique< | ||
proto_to_arrow_scalar<arrow::BooleanType>>()); | ||
} else if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_FLOAT) { | ||
_child_arrays.push_back( | ||
std::make_unique<proto_to_arrow_scalar<arrow::FloatType>>()); | ||
} else if ( | ||
desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_DOUBLE) { | ||
_child_arrays.push_back( | ||
std::make_unique<proto_to_arrow_scalar<arrow::DoubleType>>()); | ||
} else if ( | ||
desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_STRING) { | ||
_child_arrays.push_back( | ||
std::make_unique<proto_to_arrow_scalar<arrow::StringType>>()); | ||
} else if ( | ||
desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_MESSAGE) { | ||
auto field_message_descriptor = desc->message_type(); | ||
if (field_message_descriptor == nullptr) { | ||
throw std::runtime_error( | ||
std::string("Can't find schema for nested type : ") | ||
+ desc->cpp_type_name()); | ||
} | ||
_child_arrays.push_back(std::make_unique<proto_to_arrow_struct>( | ||
field_message_descriptor)); | ||
} else { | ||
throw std::runtime_error( | ||
std::string("Unknown type: ") + desc->cpp_type_name()); | ||
} | ||
} | ||
// Make Arrow data types | ||
|
||
// This could be combined into a single loop with the one above, but | ||
// this seems more readable to me | ||
for (int field_idx = 0; field_idx < message_descriptor->field_count(); | ||
field_idx++) { | ||
auto field_desc = message_descriptor->field(field_idx); | ||
auto field_name = field_desc->name(); | ||
_fields.push_back(_child_arrays[field_idx]->field(field_name)); | ||
} | ||
_arrow_data_type = arrow::struct_(_fields); | ||
|
||
// Make builder | ||
std::vector<std::shared_ptr<arrow::ArrayBuilder>> child_builders; | ||
// This could also be collapsed into the above loop | ||
for (auto& child : _child_arrays) { | ||
child_builders.push_back(child->builder()); | ||
} | ||
_builder = std::make_shared<arrow::StructBuilder>( | ||
_arrow_data_type, arrow::default_memory_pool(), child_builders); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I prefer the single loop and a covering switch:
// Set up child arrays | |
for (int field_idx = 0; field_idx < message_descriptor->field_count(); | |
field_idx++) { | |
auto desc = message_descriptor->field(field_idx); | |
if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_INT32) { | |
_child_arrays.push_back( | |
std::make_unique<proto_to_arrow_scalar<arrow::Int32Type>>()); | |
} else if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_INT64) { | |
_child_arrays.push_back( | |
std::make_unique<proto_to_arrow_scalar<arrow::Int64Type>>()); | |
} else if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_BOOL) { | |
_child_arrays.push_back( | |
std::make_unique< | |
proto_to_arrow_scalar<arrow::BooleanType>>()); | |
} else if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_FLOAT) { | |
_child_arrays.push_back( | |
std::make_unique<proto_to_arrow_scalar<arrow::FloatType>>()); | |
} else if ( | |
desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_DOUBLE) { | |
_child_arrays.push_back( | |
std::make_unique<proto_to_arrow_scalar<arrow::DoubleType>>()); | |
} else if ( | |
desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_STRING) { | |
_child_arrays.push_back( | |
std::make_unique<proto_to_arrow_scalar<arrow::StringType>>()); | |
} else if ( | |
desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_MESSAGE) { | |
auto field_message_descriptor = desc->message_type(); | |
if (field_message_descriptor == nullptr) { | |
throw std::runtime_error( | |
std::string("Can't find schema for nested type : ") | |
+ desc->cpp_type_name()); | |
} | |
_child_arrays.push_back(std::make_unique<proto_to_arrow_struct>( | |
field_message_descriptor)); | |
} else { | |
throw std::runtime_error( | |
std::string("Unknown type: ") + desc->cpp_type_name()); | |
} | |
} | |
// Make Arrow data types | |
// This could be combined into a single loop with the one above, but | |
// this seems more readable to me | |
for (int field_idx = 0; field_idx < message_descriptor->field_count(); | |
field_idx++) { | |
auto field_desc = message_descriptor->field(field_idx); | |
auto field_name = field_desc->name(); | |
_fields.push_back(_child_arrays[field_idx]->field(field_name)); | |
} | |
_arrow_data_type = arrow::struct_(_fields); | |
// Make builder | |
std::vector<std::shared_ptr<arrow::ArrayBuilder>> child_builders; | |
// This could also be collapsed into the above loop | |
for (auto& child : _child_arrays) { | |
child_builders.push_back(child->builder()); | |
} | |
_builder = std::make_shared<arrow::StructBuilder>( | |
_arrow_data_type, arrow::default_memory_pool(), child_builders); | |
constexpr auto make_field = [](const pb::FieldDescriptor* const desc) | |
-> std::unique_ptr<proto_to_arrow_interface> { | |
switch (desc->cpp_type()) { | |
case pb::FieldDescriptor::CPPTYPE_INT32: { | |
return std::make_unique< | |
proto_to_arrow_scalar<arrow::Int32Type>>(); | |
case pb::FieldDescriptor::CPPTYPE_INT64: | |
return std::make_unique< | |
proto_to_arrow_scalar<arrow::Int64Type>>(); | |
case pb::FieldDescriptor::CPPTYPE_BOOL: | |
return std::make_unique< | |
proto_to_arrow_scalar<arrow::BooleanType>>(); | |
case pb::FieldDescriptor::CPPTYPE_FLOAT: | |
return std::make_unique< | |
proto_to_arrow_scalar<arrow::FloatType>>(); | |
case pb::FieldDescriptor::CPPTYPE_DOUBLE: | |
return std::make_unique< | |
proto_to_arrow_scalar<arrow::DoubleType>>(); | |
case pb::FieldDescriptor::CPPTYPE_STRING: | |
return std::make_unique< | |
proto_to_arrow_scalar<arrow::StringType>>(); | |
case pb::FieldDescriptor::CPPTYPE_MESSAGE: { | |
if (auto msg_type = desc->message_type(); msg_type == nullptr) { | |
throw datalake::initialization_error(fmt::format( | |
"Can't find schema for nested type : {}", | |
desc->cpp_type_name())); | |
} else { | |
return std::make_unique<proto_to_arrow_struct>(msg_type); | |
} | |
} | |
case pb::FieldDescriptor::CPPTYPE_UINT32: | |
case pb::FieldDescriptor::CPPTYPE_UINT64: | |
case pb::FieldDescriptor::CPPTYPE_ENUM: | |
throw datalake::initialization_error( | |
fmt::format("Unknown type: ", desc->cpp_type_name())); | |
} | |
} | |
}; | |
std::vector<std::shared_ptr<arrow::ArrayBuilder>> child_builders; | |
for (int field_idx = 0; field_idx < message_descriptor->field_count(); | |
++field_idx) { | |
auto desc = message_descriptor->field(field_idx); | |
_child_arrays.push_back(make_field(desc)); | |
auto& child = _child_arrays.back(); | |
_fields.push_back(child->field(desc->name())); | |
child_builders.push_back(child->builder()); | |
} | |
_builder = std::make_shared<arrow::StructBuilder>( | |
_arrow_data_type, | |
arrow::default_memory_pool(), | |
std::move(child_builders)); |
NOTE: I didn't fixup the use of std::shared_ptr
array = std::move(builder_result).ValueUnsafe(); | ||
_values.push_back(array); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the copy?
array = std::move(builder_result).ValueUnsafe(); | |
_values.push_back(array); | |
_values.push_back(std::move(builder_result).ValueUnsafe()); |
Splitting up #18313 into multiple PRs. This is a self-contained PR for the Protobuf to Arrow conversion code.
I have also made a slight refactoring: there was some duplicate code between the protobuf_to_arrow_converter and proto_to_arrow_struct classes. I eliminated this by making the converter contain an instance of protobut_to_arrow_struct
Backports Required
Release Notes