fix: preserve timestamp microsecond precision with rows from REST API #402

Merged (7 commits) on Dec 4, 2020
14 changes: 12 additions & 2 deletions google/cloud/bigquery/_helpers.py
@@ -81,8 +81,18 @@ def _bytes_from_json(value, field):
def _timestamp_from_json(value, field):
"""Coerce 'value' to a datetime, if set or not nullable."""
if _not_null(value, field):
# value will be a float in seconds, to microsecond precision, in UTC.
return _datetime_from_microseconds(1e6 * float(value))
Contributor:

This will need to be modified to parse from an RFC timestamp string.

Contributor (author):

Sorry, I didn't get the point. Before passing params["formatOptions.useInt64Timestamp"] = True in client.list_rows, the _timestamp_from_json method received a string value with a decimal point; after passing it, the method receives a string value without a decimal point and calls _datetime_from_microseconds(int(value)), which returns a datetime object.
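To make that concrete, the two response shapes being described look roughly like this (cell values are illustrative, not from the test fixtures):

# Without the option, a TIMESTAMP cell arrives as float seconds, with a decimal:
#     {"f": [{"v": "1.433836800123456E9"}]}
# With formatOptions.useInt64Timestamp=true, it arrives as integer microseconds:
#     {"f": [{"v": "1433836800123456"}]}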

Contributor:

Oh I see. Yes, that's expected.

Contributor:

In this case, we should update the comment.

Also, are there cases where floating point values are still passed in? If not, we should clean this up now. If so, let's add a TODO to identify those cases and file an issue to clean them up.
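If float inputs do still turn up somewhere, a TODO-guarded fallback might look like the sketch below (hypothetical, not what this PR merges; isdigit() would also need more care for signed values):

def _timestamp_from_json(value, field):
    """Coerce 'value' to a datetime, if set or not nullable."""
    if _not_null(value, field):
        # TODO: identify any remaining float-seconds callers, then drop this branch.
        if isinstance(value, str) and not value.isdigit():
            # Legacy wire format: float seconds, e.g. "1.4338368E9".
            return _datetime_from_microseconds(1e6 * float(value))
        # formatOptions.useInt64Timestamp wire format: integer microseconds.
        return _datetime_from_microseconds(int(value))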

# value will be an integer in microseconds, in UTC.

# if value is not None:
Contributor:

Please remove commented out code.

# if type(value) == int:
# value = value
# elif type(value) == float:
# value = 1e6 * float(value)
# elif value.isdigit():
# value = int(value)
# else:
# value = 1e6 * float(value)
return _datetime_from_microseconds(int(value))


def _timestamp_query_param_from_json(value, field):
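Side by side, the before/after parsing paths of _timestamp_from_json (a minimal sketch; _datetime_from_microseconds is imported from google.cloud._helpers, and the sample values are made up):

from google.cloud._helpers import _datetime_from_microseconds

# Before: float seconds since the epoch, e.g. "1.4338368E9".
dt_old = _datetime_from_microseconds(1e6 * float("1.433836800123456E9"))

# After (formatOptions.useInt64Timestamp): integer microseconds.
dt_new = _datetime_from_microseconds(int("1433836800123456"))

assert dt_new.microsecond == 123456  # the integer path is exact by construction

Both return timezone-aware UTC datetimes; only the integer path avoids float64 rounding entirely at 16-digit microsecond magnitudes.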
2 changes: 2 additions & 0 deletions google/cloud/bigquery/client.py
@@ -3157,6 +3157,7 @@ def list_rows(
if start_index is not None:
params["startIndex"] = start_index

params["formatOptions.useInt64Timestamp"] = True
row_iterator = RowIterator(
client=self,
api_request=functools.partial(self._call_api, retry, timeout=timeout),
@@ -3237,6 +3238,7 @@ def _list_rows_from_query_results(
if start_index is not None:
params["startIndex"] = start_index

params["formatOptions.useInt64Timestamp"] = True
row_iterator = RowIterator(
client=self,
api_request=functools.partial(self._call_api, retry, timeout=timeout),
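On the wire, both row-listing paths now pin the format option. Roughly, for tabledata.list (resource names are placeholders):

GET /bigquery/v2/projects/my-proj/datasets/my_ds/tables/my_tbl/data?formatOptions.useInt64Timestamp=true

so every TIMESTAMP cell in the response body is an integer microsecond count, which _timestamp_from_json parses losslessly.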
4 changes: 4 additions & 0 deletions tests/unit/job/test_query.py
@@ -839,6 +839,7 @@ def test_result(self):
query_params={
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
"location": "EU",
"formatOptions.useInt64Timestamp": True,
},
timeout=None,
)
@@ -887,6 +888,7 @@ def test_result_with_done_job_calls_get_query_results(self):
query_params={
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
"location": "EU",
"formatOptions.useInt64Timestamp": True,
},
timeout=None,
)
@@ -1118,6 +1120,7 @@ def test_result_w_page_size(self):
"maxResults": 3,
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
"location": "US",
"formatOptions.useInt64Timestamp": True,
},
timeout=None,
)
@@ -1129,6 +1132,7 @@ def test_result_w_page_size(self):
"maxResults": 3,
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
"location": "US",
"formatOptions.useInt64Timestamp": True,
},
timeout=None,
)
6 changes: 3 additions & 3 deletions tests/unit/job/test_query_pandas.py
@@ -501,16 +501,16 @@ def test_to_dataframe_column_dtypes():
}
row_data = [
[
"1.4338368E9",
"1433836800000000",
"420",
"1.1",
"1.77",
"Cto_dataframeash",
"true",
"1999-12-01",
],
["1.3878117E9", "2580", "17.7", "28.5", "Cash", "false", "1953-06-14"],
["1.3855653E9", "2280", "4.4", "7.1", "Credit", "true", "1981-11-04"],
["1387811700000000", "2580", "17.7", "28.5", "Cash", "false", "1953-06-14"],
["1385565300000000", "2280", "4.4", "7.1", "Credit", "true", "1981-11-04"],
]
rows = [{"f": [{"v": field} for field in row]} for row in row_data]
query_resource["rows"] = rows
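The fixture rewrite above re-encodes the same instants rather than changing them; a quick check (illustrative):

assert int(float("1.4338368E9") * 1e6) == int("1433836800000000")
assert int(float("1.3878117E9") * 1e6) == int("1387811700000000")
assert int(float("1.3855653E9") * 1e6) == int("1385565300000000")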
8 changes: 4 additions & 4 deletions tests/unit/test__helpers.py
@@ -190,18 +190,18 @@ def test_w_none_required(self):
with self.assertRaises(TypeError):
self._call_fut(None, _Field("REQUIRED"))

def test_w_string_value(self):
def test_w_string_int_value(self):
from google.cloud._helpers import _EPOCH

coerced = self._call_fut("1.234567", object())
coerced = self._call_fut("1234567", object())
self.assertEqual(
coerced, _EPOCH + datetime.timedelta(seconds=1, microseconds=234567)
)

def test_w_float_value(self):
def test_w_int_value(self):
from google.cloud._helpers import _EPOCH

coerced = self._call_fut(1.234567, object())
coerced = self._call_fut(1234567, object())
self.assertEqual(
coerced, _EPOCH + datetime.timedelta(seconds=1, microseconds=234567)
)
74 changes: 38 additions & 36 deletions tests/unit/test_client.py
@@ -6739,42 +6739,21 @@ def test_list_rows(self):
self.DS_ID,
self.TABLE_ID,
)
WHEN_TS = 1437767599.006
WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace(tzinfo=UTC)
WHEN_1 = WHEN + datetime.timedelta(seconds=1)
WHEN_2 = WHEN + datetime.timedelta(seconds=2)
WHEN_TS = 1437767599006000

WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS / 1e6).replace(tzinfo=UTC)
WHEN_1 = WHEN + datetime.timedelta(microseconds=1)
WHEN_2 = WHEN + datetime.timedelta(microseconds=2)
ROWS = 1234
TOKEN = "TOKEN"

def _bigquery_timestamp_float_repr(ts_float):
# Preserve microsecond precision for E+09 timestamps
return "%0.15E" % (ts_float,)

DATA = {
"totalRows": str(ROWS),
"pageToken": TOKEN,
"rows": [
{
"f": [
{"v": "Phred Phlyntstone"},
{"v": "32"},
{"v": _bigquery_timestamp_float_repr(WHEN_TS)},
]
},
{
"f": [
{"v": "Bharney Rhubble"},
{"v": "33"},
{"v": _bigquery_timestamp_float_repr(WHEN_TS + 1)},
]
},
{
"f": [
{"v": "Wylma Phlyntstone"},
{"v": "29"},
{"v": _bigquery_timestamp_float_repr(WHEN_TS + 2)},
]
},
{"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}, {"v": WHEN_TS}]},
{"f": [{"v": "Bharney Rhubble"}, {"v": "33"}, {"v": WHEN_TS + 1}]},
{"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}, {"v": WHEN_TS + 2}]},
{"f": [{"v": "Bhettye Rhubble"}, {"v": None}, {"v": None}]},
],
}
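As a sanity check on the new fixture (illustrative):

import datetime

WHEN_TS = 1437767599006000  # microseconds since the epoch
print(datetime.datetime.utcfromtimestamp(WHEN_TS / 1e6))
# 2015-07-24 19:53:19.006000

and WHEN_1/WHEN_2 now step by single microseconds, which is exactly the precision this change is meant to preserve.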
@@ -6807,7 +6786,10 @@ def _bigquery_timestamp_float_repr(ts_float):
self.assertEqual(iterator.next_page_token, TOKEN)

conn.api_request.assert_called_once_with(
method="GET", path="/%s" % PATH, query_params={}, timeout=7.5
method="GET",
path="/%s" % PATH,
query_params={"formatOptions.useInt64Timestamp": True},
timeout=7.5,
)

def test_list_rows_w_start_index_w_page_size(self):
@@ -6856,20 +6838,30 @@ def test_list_rows_w_start_index_w_page_size(self):
self.assertEqual(len(rows), 2)
self.assertEqual(rows[0], Row(("Wylma Phlyntstone",), f2i))
self.assertEqual(rows[1], Row(("Bhettye Rhubble",), f2i))
self.assertEqual(extra_params, {"startIndex": 1})
self.assertEqual(
extra_params, {"startIndex": 1, "formatOptions.useInt64Timestamp": True}
)

conn.api_request.assert_has_calls(
[
mock.call(
method="GET",
path="/%s" % PATH,
query_params={"startIndex": 1, "maxResults": 2},
query_params={
"startIndex": 1,
"maxResults": 2,
"formatOptions.useInt64Timestamp": True,
},
timeout=None,
),
mock.call(
method="GET",
path="/%s" % PATH,
query_params={"pageToken": "some-page-token", "maxResults": 2},
query_params={
"pageToken": "some-page-token",
"maxResults": 2,
"formatOptions.useInt64Timestamp": True,
},
timeout=None,
),
]
@@ -6920,6 +6912,7 @@ def test_list_rows_query_params(self):
iterator = client.list_rows(table, **test[0])
six.next(iterator.pages)
req = conn.api_request.call_args_list[i]
test[1]["formatOptions.useInt64Timestamp"] = True
self.assertEqual(req[1]["query_params"], test[1], "for kwargs %s" % test[0])

def test_list_rows_repeated_fields(self):
@@ -6979,7 +6972,10 @@ def test_list_rows_repeated_fields(self):
conn.api_request.assert_called_once_with(
method="GET",
path="/%s" % PATH,
query_params={"selectedFields": "color,struct"},
query_params={
"selectedFields": "color,struct",
"formatOptions.useInt64Timestamp": True,
},
timeout=None,
)

@@ -7047,7 +7043,10 @@ def test_list_rows_w_record_schema(self):
self.assertEqual(page_token, TOKEN)

conn.api_request.assert_called_once_with(
method="GET", path="/%s" % PATH, query_params={}, timeout=None
method="GET",
path="/%s" % PATH,
query_params={"formatOptions.useInt64Timestamp": True},
timeout=None,
)

def test_list_rows_with_missing_schema(self):
@@ -7109,7 +7108,10 @@ def test_list_rows_with_missing_schema(self):

rows = list(row_iter)
conn.api_request.assert_called_once_with(
method="GET", path=tabledata_path, query_params={}, timeout=None
method="GET",
path=tabledata_path,
query_params={"formatOptions.useInt64Timestamp": True},
timeout=None,
)
self.assertEqual(row_iter.total_rows, 3, msg=repr(table))
self.assertEqual(rows[0].name, "Phred Phlyntstone", msg=repr(table))
24 changes: 16 additions & 8 deletions tests/unit/test_table.py
@@ -2402,8 +2402,8 @@ def test_to_dataframe_timestamp_out_of_pyarrow_bounds(self):

schema = [SchemaField("some_timestamp", "TIMESTAMP")]
rows = [
{"f": [{"v": "81953424000.0"}]}, # 4567-01-01 00:00:00 UTC
{"f": [{"v": "253402214400.0"}]}, # 9999-12-31 00:00:00 UTC
{"f": [{"v": "81953424000000000"}]}, # 4567-01-01 00:00:00 UTC
{"f": [{"v": "253402214400000000"}]}, # 9999-12-31 00:00:00 UTC
]
path = "/foo"
api_request = mock.Mock(return_value={"rows": rows})
@@ -2626,9 +2626,9 @@ def test_to_dataframe_w_various_types_nullable(self):
]
row_data = [
[None, None, None, None, None, None],
["1.4338368E9", "420", "1.1", u"Cash", "true", "1999-12-01"],
["1.3878117E9", "2580", "17.7", u"Cash", "false", "1953-06-14"],
["1.3855653E9", "2280", "4.4", u"Credit", "true", "1981-11-04"],
["1433836800000000", "420", "1.1", u"Cash", "true", "1999-12-01"],
["1387811700000000", "2580", "17.7", u"Cash", "false", "1953-06-14"],
["1385565300000000", "2280", "4.4", u"Credit", "true", "1981-11-04"],
]
rows = [{"f": [{"v": field} for field in row]} for row in row_data]
path = "/foo"
@@ -2666,9 +2666,9 @@ def test_to_dataframe_column_dtypes(self):
SchemaField("date", "DATE"),
]
row_data = [
["1.4338368E9", "420", "1.1", "1.77", u"Cash", "true", "1999-12-01"],
["1.3878117E9", "2580", "17.7", "28.5", u"Cash", "false", "1953-06-14"],
["1.3855653E9", "2280", "4.4", "7.1", u"Credit", "true", "1981-11-04"],
["1433836800000000", "420", "1.1", "1.77", u"Cash", "true", "1999-12-01"],
[
"1387811700000000",
"2580",
"17.7",
"28.5",
u"Cash",
"false",
"1953-06-14",
],
["1385565300000000", "2280", "4.4", "7.1", u"Credit", "true", "1981-11-04"],
]
rows = [{"f": [{"v": field} for field in row]} for row in row_data]
path = "/foo"