Add an ArrowAdapter #744

Open · danielballan opened this issue May 13, 2024 · 0 comments
We have various Adapters for tabular data formats. The existing Adapters for Parquet-formatted and CSV-formatted data have different advantages. Parquet is a binary format with a rich data type system and built-in compression. CSV is a human-readable format that supports appending rows (which Parquet does not).

We propose to add support for Arrow-formatted data (sometimes also called "Feather"), which, like CSV, is appendable and, like Parquet, is binary and has a rich data type system. Like the CSV Adapter, this should implement write_partition and append_partition with the same signatures:

tiled/tiled/adapters/csv.py, lines 153 to 185 at commit 307524d:
    def append_partition(
        self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame], partition: int
    ) -> None:
        """
        Parameters
        ----------
        data :
        partition :
        Returns
        -------
        """
        uri = self._partition_paths[partition]
        data.to_csv(uri, index=False, mode="a", header=False)

    def write_partition(
        self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame], partition: int
    ) -> None:
        """
        Parameters
        ----------
        data :
        partition :
        Returns
        -------
        """
        uri = self._partition_paths[partition]
        data.to_csv(uri, index=False)
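
For concreteness, here is a rough sketch of what the corresponding ArrowAdapter methods could look like. This is an assumption-laden illustration, not the proposed implementation: the class name, constructor, and _partition_paths attribute are modeled on the CSV Adapter above, and the naive rewrite-on-append would be replaced by the record-batch approach discussed at the end of this issue.

from typing import Union

import dask.dataframe
import pandas
import pyarrow


class ArrowAdapter:
    """Hypothetical sketch -- only the partition-writing methods are shown."""

    def __init__(self, partition_paths):
        # One Arrow IPC ("Feather V2") file per partition, mirroring the CSV Adapter.
        self._partition_paths = partition_paths

    def write_partition(
        self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame], partition: int
    ) -> None:
        uri = self._partition_paths[partition]
        if isinstance(data, dask.dataframe.DataFrame):
            data = data.compute()
        table = pyarrow.Table.from_pandas(data)
        # Write (or overwrite) the whole partition as an Arrow IPC file.
        with pyarrow.ipc.new_file(uri, table.schema) as writer:
            writer.write_table(table)

    def append_partition(
        self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame], partition: int
    ) -> None:
        uri = self._partition_paths[partition]
        if isinstance(data, dask.dataframe.DataFrame):
            data = data.compute()
        new_table = pyarrow.Table.from_pandas(data)
        # The IPC file format cannot be appended to in place: its footer is
        # written when the writer closes. This naive version reads the existing
        # rows and rewrites the file; a real implementation would instead write
        # record batches incrementally (see the end of this issue).
        with pyarrow.OSFile(uri, "rb") as source:
            existing = pyarrow.ipc.open_file(source).read_all()
        combined = pyarrow.concat_tables([existing, new_table])
        with pyarrow.ipc.new_file(uri, combined.schema) as writer:
            writer.write_table(combined)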

We have functions in the codebase for reading and writing Arrow-formatted data because the server and the client transmit data as Arrow by default.

c['some_table'].read()  # data is sent as Arrow

These are the functions:

@serialization_registry.register(StructureFamily.table, APACHE_ARROW_FILE_MIME_TYPE)
def serialize_arrow(df, metadata, preserve_index=True):
    import pyarrow

    table = pyarrow.Table.from_pandas(df, preserve_index=preserve_index)
    sink = pyarrow.BufferOutputStream()
    with pyarrow.ipc.new_file(sink, table.schema) as writer:
        writer.write_table(table)
    return memoryview(sink.getvalue())


@deserialization_registry.register(StructureFamily.table, APACHE_ARROW_FILE_MIME_TYPE)
def deserialize_arrow(buffer):
    import pyarrow

    return pyarrow.ipc.open_file(buffer).read_pandas()
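
For reference, a round trip through these two functions behaves like this (an illustrative snippet, not code from the repository; it assumes serialize_arrow and deserialize_arrow are in scope):

import pandas

df = pandas.DataFrame({"x": [1, 2, 3], "y": ["a", "b", "c"]})
buffer = serialize_arrow(df, metadata={})   # Arrow IPC file bytes
roundtripped = deserialize_arrow(buffer)    # back to a pandas.DataFrame
assert roundtripped.equals(df)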

What these functions do not handle is partitioning (a.k.a. chunking) of rows. To implement read_partition, write_partition, and append_partition, we'll need to dig deeper into the Arrow IPC (inter-process communication) API to read and write "record batches" against an existing table, rather than read or write an entire table in a single call.

https://arrow.apache.org/docs/python/ipc.html
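
To sketch the idea (an illustration with made-up data, not code from tiled): each partition could be written as one or more record batches inside an IPC file and read back individually by index, which is roughly what read_partition needs.

import pandas
import pyarrow

# Two record batches, standing in for two partitions' worth of rows.
batch0 = pyarrow.RecordBatch.from_pandas(
    pandas.DataFrame({"x": [1, 2, 3]}), preserve_index=False
)
batch1 = pyarrow.RecordBatch.from_pandas(
    pandas.DataFrame({"x": [4, 5, 6]}), preserve_index=False
)

# Write the batches one at a time instead of as a single table.
sink = pyarrow.BufferOutputStream()
with pyarrow.ipc.new_file(sink, batch0.schema) as writer:
    writer.write_batch(batch0)
    writer.write_batch(batch1)

# Read back an individual batch by index.
reader = pyarrow.ipc.open_file(sink.getvalue())
print(reader.num_record_batches)        # 2
print(reader.get_batch(1).to_pandas())  # only the second batch

One wrinkle is that the IPC file format writes its footer only when the writer is closed, so appending to a file that has already been finalized means either keeping a writer open per partition or rewriting the file; the IPC stream format is another option to investigate, at the cost of random access by batch index.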
