Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protobuf to Arrow converter #18449

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from

Conversation

jcipar
Copy link
Contributor

@jcipar jcipar commented May 13, 2024

Splitting up #18313 into multiple PRs. This is a self-contained PR for the Protobuf to Arrow conversion code.

I have also made a slight refactoring: there was some duplicate code between the protobuf_to_arrow_converter and proto_to_arrow_struct classes. I eliminated this by making the converter contain an instance of protobut_to_arrow_struct

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v24.1.x
  • v23.3.x
  • v23.2.x

Release Notes

@jcipar jcipar marked this pull request as ready for review May 13, 2024 20:09
private:
template<typename T>
void
do_add(const google::protobuf::Message* /* msg */, int /* field_idx */) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and throughout- do you want const google::protobuf::Message* const? Is there a reason to prefer pointers to references?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Protobuf API takes returns pointers for objects and takes pointers as arguments. I try to avoid raw pointers as much as possible, but I get this impression this is the Protobuf way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Google generally uses raw pointers over l-value references in function parameters. This is because it forces the caller to annotate parameters as taking a pointer (via &).

Absl will have many of the same patterns (especially in the string methods like StripPrefix and SimpleAtoi)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👋

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👋 don't mind me driving by!

class proto_to_arrow_struct : public proto_to_arrow_interface {
public:
explicit proto_to_arrow_struct(
const google::protobuf::Descriptor* message_descriptor) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question re: const google::protobuf::Descriptor* const

for (int field_idx = 0; field_idx < message_descriptor->field_count();
field_idx++) {
auto desc = message_descriptor->field(field_idx);
if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_INT32) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would prefer a switch here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would prefer a switch here.
+1, but nit

_arrow_data_type = arrow::struct_(_fields);

// Make builder
std::vector<std::shared_ptr<arrow::ArrayBuilder>> child_builders;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add child_builders.reserve(_child_arrays.size()) to avoid resizing (can also apply this to push_back() loops above). We may also want to use fragmented_vector instead of vector to avoid oversized seastar allocs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add child_builders.reserve(_child_arrays.size()) to avoid resizing (can also apply this to push_back() loops above). We may also want to use fragmented_vector instead of vector to avoid oversized seastar allocs.

+1

@@ -1,5 +1,6 @@
#pragma once

#include "datalake/errors.h"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this commit won't build because this header file isn't introduced until the last commit

Copy link
Member

@dotnwat dotnwat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its not a big deal at this stage, but looks like test coverage could hit a few more spots too

image

@@ -128,6 +128,7 @@ add_subdirectory(compat)
add_subdirectory(rp_util)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the first commit message:

protobuf_to_arrow_converter represents all messages as a structure

This eliminates duplicated code between protobuf_to_arrow_converter
and proto_to_arrow_struct. Each converter contains a single instance
of proto_to_arrow_struct that represents the structure of the top-level
message.

please follow the general format of commits in the tree. roughly:

subsystem: subject

description of what is being added, fixed, etc...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bump

@@ -0,0 +1,39 @@
#pragma once
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

files are missing copyright headers.


/// Return an Arrow field descriptor for this Array. Used for building
/// A schema.
virtual std::shared_ptr<arrow::Field> field(const std::string& name) = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't use std::shared_ptr in redpanda. i realize that it's impossible to use the high-level arrow/parquet api without it, but it needs to be called out, addressing the short term and long term implication, why we can't work around it now, and what the plan for addressing it is later (either leaving it as-is with justification or fixing/removing/working around it).

Comment on lines +64 to +70
template<>
void do_add<arrow::Int32Type>(
const google::protobuf::Message* msg, int field_idx) {
auto desc = msg->GetDescriptor()->field(field_idx);
_arrow_status = _builder->Append(
msg->GetReflection()->GetInt32(*msg, desc));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about we move all of these specializations out of the header and into a compilation unit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about we move all of these specializations out of the header and into a compilation unit.

I guess we can then forward declare the google::protobuf::Message and not include the header.

private:
template<typename T>
void
do_add(const google::protobuf::Message* /* msg */, int /* field_idx */) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this? seems like it could be removed if it's unused and we are building specializations (below) for each of the supported types.

return batch;
}

model::record make_record(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are existing make_record helpers. do they need to be generalized?

return ret;
}

model::record_batch make_protobuf_batch(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is a protobuf batch?

converter.add_message(generate_nested_message("III", 3));
converter.add_message(generate_nested_message("IV", 4));
converter.add_message(generate_nested_message("V", 5));
EXPECT_EQ(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace generic exceptions with error codes or datalake::initializati…

it looks like this commit can be squashed into the previous commit

@@ -242,9 +242,6 @@ inline datalake::schema_info get_test_schema() {
struct protobuf_random_batches_generator {
ss::circular_buffer<model::record_batch>
operator()(std::optional<model::timestamp> base_ts = std::nullopt) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clean up some comments

squash this commit into the previous commit

@@ -0,0 +1,22 @@
#pragma once
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to add errors.h

fold into previous commit to fix compilation

@jcipar jcipar requested review from BenPope and a team as code owners May 16, 2024 15:12
@jcipar jcipar requested review from ivotron and removed request for a team May 16, 2024 15:12
@ivotron ivotron removed their request for review May 16, 2024 16:46
This eliminates duplicated code between protobuf_to_arrow_converter
and proto_to_arrow_struct. Each converter contains a single instance
of proto_to_arrow_struct that represents the structure of the top-level
message.

FetchContent_Declare(
arrow
GIT_REPOSITORY https://github.com/apache/arrow
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add an entry to licenses/third_party.md

Comment on lines +64 to +70
template<>
void do_add<arrow::Int32Type>(
const google::protobuf::Message* msg, int field_idx) {
auto desc = msg->GetDescriptor()->field(field_idx);
_arrow_status = _builder->Append(
msg->GetReflection()->GetInt32(*msg, desc));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about we move all of these specializations out of the header and into a compilation unit.

I guess we can then forward declare the google::protobuf::Message and not include the header.

field_message_descriptor));
} else {
throw std::runtime_error(
std::string("Unknown type: ") + desc->cpp_type_name());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use ss::string where we have the choice (nearly everywhere)

In this case the exception accepts a std::string. Might be nice to use fmt::format("Unknown type: {}", desc->cpp_type_name())

Comment on lines 23 to 83
// Set up child arrays
for (int field_idx = 0; field_idx < message_descriptor->field_count();
field_idx++) {
auto desc = message_descriptor->field(field_idx);
if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_INT32) {
_child_arrays.push_back(
std::make_unique<proto_to_arrow_scalar<arrow::Int32Type>>());
} else if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_INT64) {
_child_arrays.push_back(
std::make_unique<proto_to_arrow_scalar<arrow::Int64Type>>());
} else if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_BOOL) {
_child_arrays.push_back(
std::make_unique<
proto_to_arrow_scalar<arrow::BooleanType>>());
} else if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_FLOAT) {
_child_arrays.push_back(
std::make_unique<proto_to_arrow_scalar<arrow::FloatType>>());
} else if (
desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_DOUBLE) {
_child_arrays.push_back(
std::make_unique<proto_to_arrow_scalar<arrow::DoubleType>>());
} else if (
desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_STRING) {
_child_arrays.push_back(
std::make_unique<proto_to_arrow_scalar<arrow::StringType>>());
} else if (
desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_MESSAGE) {
auto field_message_descriptor = desc->message_type();
if (field_message_descriptor == nullptr) {
throw std::runtime_error(
std::string("Can't find schema for nested type : ")
+ desc->cpp_type_name());
}
_child_arrays.push_back(std::make_unique<proto_to_arrow_struct>(
field_message_descriptor));
} else {
throw std::runtime_error(
std::string("Unknown type: ") + desc->cpp_type_name());
}
}
// Make Arrow data types

// This could be combined into a single loop with the one above, but
// this seems more readable to me
for (int field_idx = 0; field_idx < message_descriptor->field_count();
field_idx++) {
auto field_desc = message_descriptor->field(field_idx);
auto field_name = field_desc->name();
_fields.push_back(_child_arrays[field_idx]->field(field_name));
}
_arrow_data_type = arrow::struct_(_fields);

// Make builder
std::vector<std::shared_ptr<arrow::ArrayBuilder>> child_builders;
// This could also be collapsed into the above loop
for (auto& child : _child_arrays) {
child_builders.push_back(child->builder());
}
_builder = std::make_shared<arrow::StructBuilder>(
_arrow_data_type, arrow::default_memory_pool(), child_builders);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I prefer the single loop and a covering switch:

Suggested change
// Set up child arrays
for (int field_idx = 0; field_idx < message_descriptor->field_count();
field_idx++) {
auto desc = message_descriptor->field(field_idx);
if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_INT32) {
_child_arrays.push_back(
std::make_unique<proto_to_arrow_scalar<arrow::Int32Type>>());
} else if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_INT64) {
_child_arrays.push_back(
std::make_unique<proto_to_arrow_scalar<arrow::Int64Type>>());
} else if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_BOOL) {
_child_arrays.push_back(
std::make_unique<
proto_to_arrow_scalar<arrow::BooleanType>>());
} else if (desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_FLOAT) {
_child_arrays.push_back(
std::make_unique<proto_to_arrow_scalar<arrow::FloatType>>());
} else if (
desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_DOUBLE) {
_child_arrays.push_back(
std::make_unique<proto_to_arrow_scalar<arrow::DoubleType>>());
} else if (
desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_STRING) {
_child_arrays.push_back(
std::make_unique<proto_to_arrow_scalar<arrow::StringType>>());
} else if (
desc->cpp_type() == pb::FieldDescriptor::CPPTYPE_MESSAGE) {
auto field_message_descriptor = desc->message_type();
if (field_message_descriptor == nullptr) {
throw std::runtime_error(
std::string("Can't find schema for nested type : ")
+ desc->cpp_type_name());
}
_child_arrays.push_back(std::make_unique<proto_to_arrow_struct>(
field_message_descriptor));
} else {
throw std::runtime_error(
std::string("Unknown type: ") + desc->cpp_type_name());
}
}
// Make Arrow data types
// This could be combined into a single loop with the one above, but
// this seems more readable to me
for (int field_idx = 0; field_idx < message_descriptor->field_count();
field_idx++) {
auto field_desc = message_descriptor->field(field_idx);
auto field_name = field_desc->name();
_fields.push_back(_child_arrays[field_idx]->field(field_name));
}
_arrow_data_type = arrow::struct_(_fields);
// Make builder
std::vector<std::shared_ptr<arrow::ArrayBuilder>> child_builders;
// This could also be collapsed into the above loop
for (auto& child : _child_arrays) {
child_builders.push_back(child->builder());
}
_builder = std::make_shared<arrow::StructBuilder>(
_arrow_data_type, arrow::default_memory_pool(), child_builders);
constexpr auto make_field = [](const pb::FieldDescriptor* const desc)
-> std::unique_ptr<proto_to_arrow_interface> {
switch (desc->cpp_type()) {
case pb::FieldDescriptor::CPPTYPE_INT32: {
return std::make_unique<
proto_to_arrow_scalar<arrow::Int32Type>>();
case pb::FieldDescriptor::CPPTYPE_INT64:
return std::make_unique<
proto_to_arrow_scalar<arrow::Int64Type>>();
case pb::FieldDescriptor::CPPTYPE_BOOL:
return std::make_unique<
proto_to_arrow_scalar<arrow::BooleanType>>();
case pb::FieldDescriptor::CPPTYPE_FLOAT:
return std::make_unique<
proto_to_arrow_scalar<arrow::FloatType>>();
case pb::FieldDescriptor::CPPTYPE_DOUBLE:
return std::make_unique<
proto_to_arrow_scalar<arrow::DoubleType>>();
case pb::FieldDescriptor::CPPTYPE_STRING:
return std::make_unique<
proto_to_arrow_scalar<arrow::StringType>>();
case pb::FieldDescriptor::CPPTYPE_MESSAGE: {
if (auto msg_type = desc->message_type(); msg_type == nullptr) {
throw datalake::initialization_error(fmt::format(
"Can't find schema for nested type : {}",
desc->cpp_type_name()));
} else {
return std::make_unique<proto_to_arrow_struct>(msg_type);
}
}
case pb::FieldDescriptor::CPPTYPE_UINT32:
case pb::FieldDescriptor::CPPTYPE_UINT64:
case pb::FieldDescriptor::CPPTYPE_ENUM:
throw datalake::initialization_error(
fmt::format("Unknown type: ", desc->cpp_type_name()));
}
}
};
std::vector<std::shared_ptr<arrow::ArrayBuilder>> child_builders;
for (int field_idx = 0; field_idx < message_descriptor->field_count();
++field_idx) {
auto desc = message_descriptor->field(field_idx);
_child_arrays.push_back(make_field(desc));
auto& child = _child_arrays.back();
_fields.push_back(child->field(desc->name()));
child_builders.push_back(child->builder());
}
_builder = std::make_shared<arrow::StructBuilder>(
_arrow_data_type,
arrow::default_memory_pool(),
std::move(child_builders));

NOTE: I didn't fixup the use of std::shared_ptr

Comment on lines +126 to +128
array = std::move(builder_result).ValueUnsafe();
_values.push_back(array);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the copy?

Suggested change
array = std::move(builder_result).ValueUnsafe();
_values.push_back(array);
_values.push_back(std::move(builder_result).ValueUnsafe());

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants