feat: add Arrow compression options (only LZ4 for now) #166

Merged: 1 commit, Mar 30, 2021
2 changes: 2 additions & 0 deletions google/cloud/bigquery_storage/__init__.py
@@ -20,6 +20,7 @@
from google.cloud.bigquery_storage_v1 import __version__
from google.cloud.bigquery_storage_v1.types.arrow import ArrowRecordBatch
from google.cloud.bigquery_storage_v1.types.arrow import ArrowSchema
from google.cloud.bigquery_storage_v1.types.arrow import ArrowSerializationOptions
from google.cloud.bigquery_storage_v1.types.avro import AvroRows
from google.cloud.bigquery_storage_v1.types.avro import AvroSchema
from google.cloud.bigquery_storage_v1.types.storage import CreateReadSessionRequest
@@ -38,6 +39,7 @@
"types",
"ArrowRecordBatch",
"ArrowSchema",
"ArrowSerializationOptions",
"AvroRows",
"AvroSchema",
"BigQueryReadClient",
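
The new message type is re-exported from the top-level package, so it can be imported next to the existing Arrow types. A minimal sketch of the new import (the enum member name matches the proto further down in this diff):

    from google.cloud.bigquery_storage import ArrowSerializationOptions

    # LZ4 Frame is the only codec besides "unspecified" introduced in this change.
    lz4 = ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
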
19 changes: 17 additions & 2 deletions google/cloud/bigquery_storage_v1/proto/arrow.proto
@@ -1,4 +1,4 @@
// Copyright 2019 Google LLC.
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -11,7 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

syntax = "proto3";

@@ -43,3 +42,19 @@ message ArrowRecordBatch {
// The count of rows in `serialized_record_batch`.
int64 row_count = 2;
}

// Contains options specific to Arrow Serialization.
message ArrowSerializationOptions {
// Compression codecs supported by Arrow.
enum CompressionCodec {
// If unspecified, no compression will be used.
COMPRESSION_UNSPECIFIED = 0;

// LZ4 Frame (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md)
LZ4_FRAME = 1;
}

// The compression codec to use for Arrow buffers in serialized record
// batches.
CompressionCodec buffer_compression = 2;
}
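
A small sketch of how these options look from the generated Python types added later in this diff; leaving buffer_compression unset keeps the default COMPRESSION_UNSPECIFIED, i.e. uncompressed Arrow buffers:

    from google.cloud.bigquery_storage_v1 import types

    # Default: no compression of the serialized Arrow buffers.
    default_options = types.ArrowSerializationOptions()

    # Opt in to LZ4 Frame compression of the Arrow record batch buffers.
    lz4_options = types.ArrowSerializationOptions(
        buffer_compression=types.ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
    )
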
3 changes: 1 addition & 2 deletions google/cloud/bigquery_storage_v1/proto/avro.proto
@@ -1,4 +1,4 @@
// Copyright 2019 Google LLC.
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -11,7 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

syntax = "proto3";

22 changes: 18 additions & 4 deletions google/cloud/bigquery_storage_v1/proto/storage.proto
@@ -1,4 +1,4 @@
// Copyright 2019 Google LLC.
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -11,7 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

syntax = "proto3";

@@ -70,7 +69,8 @@ service BigQueryRead {
post: "/v1/{read_session.table=projects/*/datasets/*/tables/*}"
body: "*"
};
option (google.api.method_signature) = "parent,read_session,max_stream_count";
option (google.api.method_signature) =
"parent,read_session,max_stream_count";
}

// Reads rows from the stream in the format prescribed by the ReadSession.
@@ -99,7 +99,8 @@ service BigQueryRead {
// original, primary, and residual, that original[0-j] = primary[0-j] and
// original[j-n] = residual[0-m] once the streams have been read to
// completion.
rpc SplitReadStream(SplitReadStreamRequest) returns (SplitReadStreamResponse) {
rpc SplitReadStream(SplitReadStreamRequest)
returns (SplitReadStreamResponse) {
option (google.api.http) = {
get: "/v1/{name=projects/*/locations/*/sessions/*/streams/*}"
};
@@ -201,6 +202,19 @@ message ReadRowsResponse {
// Throttling state. If unset, the latest response still describes
// the current throttling status.
ThrottleState throttle_state = 5;

// The schema for the read. If read_options.selected_fields is set, the
// schema may be different from the table schema as it will only contain
// the selected fields. This schema is equivalent to the one returned by
// CreateSession. This field is only populated in the first ReadRowsResponse
// RPC.
oneof schema {
// Output only. Avro schema.
AvroSchema avro_schema = 7 [(google.api.field_behavior) = OUTPUT_ONLY];

// Output only. Arrow schema.
ArrowSchema arrow_schema = 8 [(google.api.field_behavior) = OUTPUT_ONLY];
}
}

// Request message for `SplitReadStream`.
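
Since avro_schema and arrow_schema sit in a oneof and are only populated in the first ReadRowsResponse, a caller can sniff the session's format from that first message. A hedged sketch (the client setup and stream name are placeholders, not part of this change):

    from google.cloud import bigquery_storage

    client = bigquery_storage.BigQueryReadClient()
    stream_name = "projects/.../locations/.../sessions/.../streams/..."  # placeholder

    responses = iter(client.read_rows(stream_name))
    first = next(responses)
    if first.arrow_schema.serialized_schema:
        print("Arrow session; schema is", len(first.arrow_schema.serialized_schema), "bytes")
    elif first.avro_schema.schema:
        print("Avro session; schema JSON:", first.avro_schema.schema)
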
28 changes: 18 additions & 10 deletions google/cloud/bigquery_storage_v1/proto/stream.proto
@@ -1,4 +1,4 @@
// Copyright 2019 Google LLC.
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -11,7 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

syntax = "proto3";

@@ -72,17 +71,27 @@ message ReadSession {
// "nullable_field is not NULL"
// "st_equals(geo_field, st_geofromtext("POINT(2, 2)"))"
// "numeric_field BETWEEN 1.0 AND 5.0"
//
// Restricted to a maximum length of 1 MB.
string row_restriction = 2;

// Optional. Options specific to the Apache Arrow output format.
oneof output_format_serialization_options {
ArrowSerializationOptions arrow_serialization_options = 3
[(google.api.field_behavior) = OPTIONAL];
}
}

// Output only. Unique identifier for the session, in the form
// `projects/{project_id}/locations/{location}/sessions/{session_id}`.
string name = 1 [(google.api.field_behavior) = OUTPUT_ONLY];

// Output only. Time at which the session becomes invalid. After this time, subsequent
// requests to read this Session will return errors. The expire_time is
// automatically assigned and currently cannot be specified or updated.
google.protobuf.Timestamp expire_time = 2 [(google.api.field_behavior) = OUTPUT_ONLY];
// Output only. Time at which the session becomes invalid. After this time,
// subsequent requests to read this Session will return errors. The
// expire_time is automatically assigned and currently cannot be specified or
// updated.
google.protobuf.Timestamp expire_time = 2
[(google.api.field_behavior) = OUTPUT_ONLY];

// Immutable. Data format of the output data.
DataFormat data_format = 3 [(google.api.field_behavior) = IMMUTABLE];
@@ -102,12 +111,11 @@ message ReadSession {
// `projects/{project_id}/datasets/{dataset_id}/tables/{table_id}`
string table = 6 [
(google.api.field_behavior) = IMMUTABLE,
(google.api.resource_reference) = {
type: "bigquery.googleapis.com/Table"
}
(google.api.resource_reference) = { type: "bigquery.googleapis.com/Table" }
];

// Optional. Any modifiers which are applied when reading from the specified table.
// Optional. Any modifiers which are applied when reading from the specified
// table.
TableModifiers table_modifiers = 7 [(google.api.field_behavior) = OPTIONAL];

// Optional. Read options for this session (e.g. column selection, filters).
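
Putting the pieces together, a minimal end-to-end sketch of requesting an LZ4-compressed Arrow session (project, dataset, and table names are placeholders; this illustrates the intended surface rather than code from this PR):

    from google.cloud import bigquery_storage
    from google.cloud.bigquery_storage import types

    client = bigquery_storage.BigQueryReadClient()

    requested_session = types.ReadSession(
        table="projects/PROJECT/datasets/DATASET/tables/TABLE",  # placeholder
        data_format=types.DataFormat.ARROW,
        read_options=types.ReadSession.TableReadOptions(
            # New in this change: ask the server to LZ4-Frame-compress Arrow buffers.
            arrow_serialization_options=types.ArrowSerializationOptions(
                buffer_compression=types.ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
            ),
        ),
    )

    session = client.create_read_session(
        parent="projects/PROJECT",  # placeholder billing project
        read_session=requested_session,
        max_stream_count=1,
    )
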
2 changes: 2 additions & 0 deletions google/cloud/bigquery_storage_v1/types/__init__.py
@@ -18,6 +18,7 @@
from .arrow import (
ArrowRecordBatch,
ArrowSchema,
ArrowSerializationOptions,
)
from .avro import (
AvroRows,
@@ -42,6 +43,7 @@
__all__ = (
"ArrowRecordBatch",
"ArrowSchema",
"ArrowSerializationOptions",
"AvroRows",
"AvroSchema",
"CreateReadSessionRequest",
19 changes: 18 additions & 1 deletion google/cloud/bigquery_storage_v1/types/arrow.py
@@ -20,7 +20,7 @@

__protobuf__ = proto.module(
package="google.cloud.bigquery.storage.v1",
manifest={"ArrowSchema", "ArrowRecordBatch",},
manifest={"ArrowSchema", "ArrowRecordBatch", "ArrowSerializationOptions",},
)


@@ -55,4 +55,21 @@ class ArrowRecordBatch(proto.Message):
row_count = proto.Field(proto.INT64, number=2)


class ArrowSerializationOptions(proto.Message):
r"""Contains options specific to Arrow Serialization.

Attributes:
buffer_compression (google.cloud.bigquery_storage_v1.types.ArrowSerializationOptions.CompressionCodec):
The compression codec to use for Arrow
buffers in serialized record batches.
"""

class CompressionCodec(proto.Enum):
r"""Compression codec's supported by Arrow."""
COMPRESSION_UNSPECIFIED = 0
LZ4_FRAME = 1

buffer_compression = proto.Field(proto.ENUM, number=2, enum=CompressionCodec,)


__all__ = tuple(sorted(__protobuf__.manifest))
12 changes: 12 additions & 0 deletions google/cloud/bigquery_storage_v1/types/storage.py
@@ -154,6 +154,10 @@ class ReadRowsResponse(proto.Message):
Throttling state. If unset, the latest
response still describes the current throttling
status.
avro_schema (google.cloud.bigquery_storage_v1.types.AvroSchema):
Output only. Avro schema.
arrow_schema (google.cloud.bigquery_storage_v1.types.ArrowSchema):
Output only. Arrow schema.
"""

avro_rows = proto.Field(
@@ -170,6 +174,14 @@

throttle_state = proto.Field(proto.MESSAGE, number=5, message="ThrottleState",)

avro_schema = proto.Field(
proto.MESSAGE, number=7, oneof="schema", message=avro.AvroSchema,
)

arrow_schema = proto.Field(
proto.MESSAGE, number=8, oneof="schema", message=arrow.ArrowSchema,
)


class SplitReadStreamRequest(proto.Message):
r"""Request message for ``SplitReadStream``.
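
For completeness, one way the schema plus the (possibly LZ4-compressed) record batches could be assembled on the client with pyarrow; this is an illustrative sketch, not part of the PR, and assumes a pyarrow build with LZ4 support so compressed IPC buffers are decompressed transparently:

    import pyarrow as pa

    def responses_to_table(responses):
        """Build a pyarrow.Table from ReadRowsResponse messages of an Arrow session."""
        schema = None
        batches = []
        for response in responses:
            if schema is None:
                # The serialized Arrow schema is only populated in the first response.
                schema = pa.ipc.read_schema(
                    pa.py_buffer(response.arrow_schema.serialized_schema)
                )
            batches.append(
                pa.ipc.read_record_batch(
                    pa.py_buffer(response.arrow_record_batch.serialized_record_batch),
                    schema,
                )
            )
        return pa.Table.from_batches(batches, schema=schema)
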
13 changes: 12 additions & 1 deletion google/cloud/bigquery_storage_v1/types/stream.py
@@ -104,13 +104,24 @@ class TableReadOptions(proto.Message):
Examples: "int_field > 5" "date_field = CAST('2014-9-27' as
DATE)" "nullable_field is not NULL" "st_equals(geo_field,
st_geofromtext("POINT(2, 2)"))" "numeric_field BETWEEN 1.0
AND 5.0".
AND 5.0"

Restricted to a maximum length of 1 MB.
arrow_serialization_options (google.cloud.bigquery_storage_v1.types.ArrowSerializationOptions):
    Optional. Options specific to the Apache Arrow output format.
"""

selected_fields = proto.RepeatedField(proto.STRING, number=1)

row_restriction = proto.Field(proto.STRING, number=2)

arrow_serialization_options = proto.Field(
proto.MESSAGE,
number=3,
oneof="output_format_serialization_options",
message=arrow.ArrowSerializationOptions,
)

name = proto.Field(proto.STRING, number=1)

expire_time = proto.Field(proto.MESSAGE, number=2, message=timestamp.Timestamp,)
6 changes: 3 additions & 3 deletions synth.metadata
@@ -4,15 +4,15 @@
"git": {
"name": ".",
"remote": "https://github.com/googleapis/python-bigquery-storage.git",
"sha": "2d981460e62960f273ceb2537a70aeda8cbde5ad"
"sha": "8c1071337ea6c76a61f601b46254e6f07d8f5add"
}
},
{
"git": {
"name": "googleapis",
"remote": "https://github.com/googleapis/googleapis.git",
"sha": "149a3a84c29c9b8189576c7442ccb6dcf6a8f95b",
"internalRef": "364411656"
"sha": "c539b9b08b3366ee00c0ec1950f4df711552a269",
"internalRef": "365759522"
}
},
{