Skip to content

Commit

Permalink
feat: support conversion from pyarrow RecordBatch to pandas DataFrame (
Browse files Browse the repository at this point in the history
…#39)

* feat: support conversion from pyarrow RecordBatch to pandas DataFrame

* hack together working implementation

TODO: add tests for constructing pandas Series with pyarrow scalars

* fix unit test coverage, optimize arrow to numpy conversion

* apply same optimizations to to_arrow conversion

* remove redundant to_numpy now that to_arrow doesn't use it

* be explicit about chunked array vs array

* add docstrings to arrow conversion functions

* add test case for round-trip to/from pyarrow nanosecond-precision time scalars

* add time32("ms") test case without nulls for completeness
  • Loading branch information
tswast committed Nov 8, 2021
1 parent a31d55d commit facc7b0
Show file tree
Hide file tree
Showing 3 changed files with 375 additions and 146 deletions.
109 changes: 93 additions & 16 deletions db_dtypes/__init__.py
Expand Up @@ -17,6 +17,7 @@

import datetime
import re
from typing import Union

import numpy
import packaging.version
Expand All @@ -29,13 +30,16 @@
import pandas.core.dtypes.generic
import pandas.core.nanops
import pyarrow
import pyarrow.compute

from db_dtypes.version import __version__
from db_dtypes import core


date_dtype_name = "dbdate"
time_dtype_name = "dbtime"
_EPOCH = datetime.datetime(1970, 1, 1)
_NPEPOCH = numpy.datetime64(_EPOCH)

pandas_release = packaging.version.parse(pandas.__version__).release

Expand All @@ -52,6 +56,33 @@ class TimeDtype(core.BaseDatetimeDtype):
def construct_array_type(self):
return TimeArray

@staticmethod
def __from_arrow__(
array: Union[pyarrow.Array, pyarrow.ChunkedArray]
) -> "TimeArray":
"""Convert to dbtime data from an Arrow array.
See:
https://pandas.pydata.org/pandas-docs/stable/development/extending.html#compatibility-with-apache-arrow
"""
# We can't call combine_chunks on an empty array, so short-circuit the
# rest of the function logic for this special case.
if len(array) == 0:
return TimeArray(numpy.array([], dtype="datetime64[ns]"))

# We can't cast to timestamp("ns"), but time64("ns") has the same
# memory layout: 64-bit integers representing the number of nanoseconds
# since the datetime epoch (midnight 1970-01-01).
array = pyarrow.compute.cast(array, pyarrow.time64("ns"))

# ChunkedArray has no "view" method, so combine into an Array.
if isinstance(array, pyarrow.ChunkedArray):
array = array.combine_chunks()

array = array.view(pyarrow.timestamp("ns"))
np_array = array.to_numpy(zero_copy_only=False)
return TimeArray(np_array)


class TimeArray(core.BaseDatetimeArray):
"""
Expand All @@ -61,8 +92,6 @@ class TimeArray(core.BaseDatetimeArray):
# Data are stored as datetime64 values with a date of Jan 1, 1970

dtype = TimeDtype()
_epoch = datetime.datetime(1970, 1, 1)
_npepoch = numpy.datetime64(_epoch)

@classmethod
def _datetime(
Expand All @@ -75,8 +104,21 @@ def _datetime(
r"(?:\.(?P<fraction>\d*))?)?)?\s*$"
).match,
):
if isinstance(scalar, datetime.time):
return datetime.datetime.combine(cls._epoch, scalar)
# Convert pyarrow values to datetime.time.
if isinstance(scalar, (pyarrow.Time32Scalar, pyarrow.Time64Scalar)):
scalar = (
scalar.cast(pyarrow.time64("ns"))
.cast(pyarrow.int64())
.cast(pyarrow.timestamp("ns"))
.as_py()
)

if scalar is None:
return None
elif isinstance(scalar, datetime.time):
return datetime.datetime.combine(_EPOCH, scalar)
elif isinstance(scalar, pandas.Timestamp):
return scalar.to_datetime64()
elif isinstance(scalar, str):
# iso string
parsed = match_fn(scalar)
Expand Down Expand Up @@ -113,7 +155,7 @@ def _box_func(self, x):
__return_deltas = {"timedelta", "timedelta64", "timedelta64[ns]", "<m8", "<m8[ns]"}

def astype(self, dtype, copy=True):
deltas = self._ndarray - self._npepoch
deltas = self._ndarray - _NPEPOCH
stype = str(dtype)
if stype in self.__return_deltas:
return deltas
Expand All @@ -122,15 +164,25 @@ def astype(self, dtype, copy=True):
else:
return super().astype(dtype, copy=copy)

if pandas_release < (1,):
def __arrow_array__(self, type=None):
"""Convert to an Arrow array from dbtime data.
def to_numpy(self, dtype="object"):
return self.astype(dtype)
See:
https://pandas.pydata.org/pandas-docs/stable/development/extending.html#compatibility-with-apache-arrow
"""
array = pyarrow.array(self._ndarray, type=pyarrow.timestamp("ns"))

def __arrow_array__(self, type=None):
return pyarrow.array(
self.to_numpy(dtype="object"),
type=type if type is not None else pyarrow.time64("ns"),
# ChunkedArray has no "view" method, so combine into an Array.
array = (
array.combine_chunks() if isinstance(array, pyarrow.ChunkedArray) else array
)

# We can't cast to time64("ns"), but timestamp("ns") has the same
# memory layout: 64-bit integers representing the number of nanoseconds
# since the datetime epoch (midnight 1970-01-01).
array = array.view(pyarrow.time64("ns"))
return pyarrow.compute.cast(
array, type if type is not None else pyarrow.time64("ns"),
)


Expand All @@ -146,6 +198,19 @@ class DateDtype(core.BaseDatetimeDtype):
def construct_array_type(self):
return DateArray

@staticmethod
def __from_arrow__(
array: Union[pyarrow.Array, pyarrow.ChunkedArray]
) -> "DateArray":
"""Convert to dbdate data from an Arrow array.
See:
https://pandas.pydata.org/pandas-docs/stable/development/extending.html#compatibility-with-apache-arrow
"""
array = pyarrow.compute.cast(array, pyarrow.timestamp("ns"))
np_array = array.to_numpy()
return DateArray(np_array)


class DateArray(core.BaseDatetimeArray):
"""
Expand All @@ -161,7 +226,13 @@ def _datetime(
scalar,
match_fn=re.compile(r"\s*(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)\s*$").match,
):
if isinstance(scalar, datetime.date):
# Convert pyarrow values to datetime.date.
if isinstance(scalar, (pyarrow.Date32Scalar, pyarrow.Date64Scalar)):
scalar = scalar.as_py()

if scalar is None:
return None
elif isinstance(scalar, datetime.date):
return datetime.datetime(scalar.year, scalar.month, scalar.day)
elif isinstance(scalar, str):
match = match_fn(scalar)
Expand Down Expand Up @@ -197,16 +268,22 @@ def astype(self, dtype, copy=True):
return super().astype(dtype, copy=copy)

def __arrow_array__(self, type=None):
return pyarrow.array(
self._ndarray, type=type if type is not None else pyarrow.date32(),
"""Convert to an Arrow array from dbdate data.
See:
https://pandas.pydata.org/pandas-docs/stable/development/extending.html#compatibility-with-apache-arrow
"""
array = pyarrow.array(self._ndarray, type=pyarrow.timestamp("ns"))
return pyarrow.compute.cast(
array, type if type is not None else pyarrow.date32(),
)

def __add__(self, other):
if isinstance(other, pandas.DateOffset):
return self.astype("object") + other

if isinstance(other, TimeArray):
return (other._ndarray - other._npepoch) + self._ndarray
return (other._ndarray - _NPEPOCH) + self._ndarray

return super().__add__(other)

Expand Down
8 changes: 3 additions & 5 deletions db_dtypes/core.py
Expand Up @@ -17,6 +17,7 @@
import numpy
import pandas
from pandas._libs import NaT
import pandas.api.extensions
import pandas.compat.numpy.function
import pandas.core.algorithms
import pandas.core.arrays
Expand All @@ -32,7 +33,7 @@
pandas_release = pandas_backports.pandas_release


class BaseDatetimeDtype(pandas.core.dtypes.base.ExtensionDtype):
class BaseDatetimeDtype(pandas.api.extensions.ExtensionDtype):
na_value = NaT
kind = "o"
names = None
Expand Down Expand Up @@ -60,10 +61,7 @@ def __init__(self, values, dtype=None, copy: bool = False):

@classmethod
def __ndarray(cls, scalars):
return numpy.array(
[None if scalar is None else cls._datetime(scalar) for scalar in scalars],
"M8[ns]",
)
return numpy.array([cls._datetime(scalar) for scalar in scalars], "M8[ns]",)

@classmethod
def _from_sequence(cls, scalars, *, dtype=None, copy=False):
Expand Down

0 comments on commit facc7b0

Please sign in to comment.