Skip to content

Commit

Permalink
fix: preserve timestamp microsecond precision with rows from REST API (
Browse files Browse the repository at this point in the history
…#402)

* feat: add formatOption default true for table list and query result

* feat: remove floating-point serialization

* fix: lint

* feat: remove comments
  • Loading branch information
HemangChothani committed Dec 4, 2020
1 parent f421058 commit 04510a7
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 53 deletions.
4 changes: 2 additions & 2 deletions google/cloud/bigquery/_helpers.py
Expand Up @@ -81,8 +81,8 @@ def _bytes_from_json(value, field):
def _timestamp_from_json(value, field):
"""Coerce 'value' to a datetime, if set or not nullable."""
if _not_null(value, field):
# value will be a float in seconds, to microsecond precision, in UTC.
return _datetime_from_microseconds(1e6 * float(value))
# value will be an integer count of microseconds since the epoch, in UTC.
return _datetime_from_microseconds(int(value))


def _timestamp_query_param_from_json(value, field):
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/bigquery/client.py
Expand Up @@ -3157,6 +3157,7 @@ def list_rows(
if start_index is not None:
params["startIndex"] = start_index

params["formatOptions.useInt64Timestamp"] = True
row_iterator = RowIterator(
client=self,
api_request=functools.partial(self._call_api, retry, timeout=timeout),
Expand Down Expand Up @@ -3237,6 +3238,7 @@ def _list_rows_from_query_results(
if start_index is not None:
params["startIndex"] = start_index

params["formatOptions.useInt64Timestamp"] = True
row_iterator = RowIterator(
client=self,
api_request=functools.partial(self._call_api, retry, timeout=timeout),
Expand Down
4 changes: 4 additions & 0 deletions tests/unit/job/test_query.py
Expand Up @@ -839,6 +839,7 @@ def test_result(self):
query_params={
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
"location": "EU",
"formatOptions.useInt64Timestamp": True,
},
timeout=None,
)
Expand Down Expand Up @@ -887,6 +888,7 @@ def test_result_with_done_job_calls_get_query_results(self):
query_params={
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
"location": "EU",
"formatOptions.useInt64Timestamp": True,
},
timeout=None,
)
Expand Down Expand Up @@ -1118,6 +1120,7 @@ def test_result_w_page_size(self):
"maxResults": 3,
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
"location": "US",
"formatOptions.useInt64Timestamp": True,
},
timeout=None,
)
Expand All @@ -1129,6 +1132,7 @@ def test_result_w_page_size(self):
"maxResults": 3,
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
"location": "US",
"formatOptions.useInt64Timestamp": True,
},
timeout=None,
)
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/job/test_query_pandas.py
Expand Up @@ -501,16 +501,16 @@ def test_to_dataframe_column_dtypes():
}
row_data = [
[
"1.4338368E9",
"1433836800000000",
"420",
"1.1",
"1.77",
"Cto_dataframeash",
"true",
"1999-12-01",
],
["1.3878117E9", "2580", "17.7", "28.5", "Cash", "false", "1953-06-14"],
["1.3855653E9", "2280", "4.4", "7.1", "Credit", "true", "1981-11-04"],
["1387811700000000", "2580", "17.7", "28.5", "Cash", "false", "1953-06-14"],
["1385565300000000", "2280", "4.4", "7.1", "Credit", "true", "1981-11-04"],
]
rows = [{"f": [{"v": field} for field in row]} for row in row_data]
query_resource["rows"] = rows
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/test__helpers.py
Expand Up @@ -190,18 +190,18 @@ def test_w_none_required(self):
with self.assertRaises(TypeError):
self._call_fut(None, _Field("REQUIRED"))

def test_w_string_value(self):
def test_w_string_int_value(self):
from google.cloud._helpers import _EPOCH

coerced = self._call_fut("1.234567", object())
coerced = self._call_fut("1234567", object())
self.assertEqual(
coerced, _EPOCH + datetime.timedelta(seconds=1, microseconds=234567)
)

def test_w_float_value(self):
def test_w_int_value(self):
from google.cloud._helpers import _EPOCH

coerced = self._call_fut(1.234567, object())
coerced = self._call_fut(1234567, object())
self.assertEqual(
coerced, _EPOCH + datetime.timedelta(seconds=1, microseconds=234567)
)
Expand Down
74 changes: 38 additions & 36 deletions tests/unit/test_client.py
Expand Up @@ -6739,42 +6739,21 @@ def test_list_rows(self):
self.DS_ID,
self.TABLE_ID,
)
WHEN_TS = 1437767599.006
WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace(tzinfo=UTC)
WHEN_1 = WHEN + datetime.timedelta(seconds=1)
WHEN_2 = WHEN + datetime.timedelta(seconds=2)
WHEN_TS = 1437767599006000

WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS / 1e6).replace(tzinfo=UTC)
WHEN_1 = WHEN + datetime.timedelta(microseconds=1)
WHEN_2 = WHEN + datetime.timedelta(microseconds=2)
ROWS = 1234
TOKEN = "TOKEN"

def _bigquery_timestamp_float_repr(ts_float):
# Preserve microsecond precision for E+09 timestamps
return "%0.15E" % (ts_float,)

DATA = {
"totalRows": str(ROWS),
"pageToken": TOKEN,
"rows": [
{
"f": [
{"v": "Phred Phlyntstone"},
{"v": "32"},
{"v": _bigquery_timestamp_float_repr(WHEN_TS)},
]
},
{
"f": [
{"v": "Bharney Rhubble"},
{"v": "33"},
{"v": _bigquery_timestamp_float_repr(WHEN_TS + 1)},
]
},
{
"f": [
{"v": "Wylma Phlyntstone"},
{"v": "29"},
{"v": _bigquery_timestamp_float_repr(WHEN_TS + 2)},
]
},
{"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}, {"v": WHEN_TS}]},
{"f": [{"v": "Bharney Rhubble"}, {"v": "33"}, {"v": WHEN_TS + 1}]},
{"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}, {"v": WHEN_TS + 2}]},
{"f": [{"v": "Bhettye Rhubble"}, {"v": None}, {"v": None}]},
],
}
Expand Down Expand Up @@ -6807,7 +6786,10 @@ def _bigquery_timestamp_float_repr(ts_float):
self.assertEqual(iterator.next_page_token, TOKEN)

conn.api_request.assert_called_once_with(
method="GET", path="/%s" % PATH, query_params={}, timeout=7.5
method="GET",
path="/%s" % PATH,
query_params={"formatOptions.useInt64Timestamp": True},
timeout=7.5,
)

def test_list_rows_w_start_index_w_page_size(self):
Expand Down Expand Up @@ -6856,20 +6838,30 @@ def test_list_rows_w_start_index_w_page_size(self):
self.assertEqual(len(rows), 2)
self.assertEqual(rows[0], Row(("Wylma Phlyntstone",), f2i))
self.assertEqual(rows[1], Row(("Bhettye Rhubble",), f2i))
self.assertEqual(extra_params, {"startIndex": 1})
self.assertEqual(
extra_params, {"startIndex": 1, "formatOptions.useInt64Timestamp": True}
)

conn.api_request.assert_has_calls(
[
mock.call(
method="GET",
path="/%s" % PATH,
query_params={"startIndex": 1, "maxResults": 2},
query_params={
"startIndex": 1,
"maxResults": 2,
"formatOptions.useInt64Timestamp": True,
},
timeout=None,
),
mock.call(
method="GET",
path="/%s" % PATH,
query_params={"pageToken": "some-page-token", "maxResults": 2},
query_params={
"pageToken": "some-page-token",
"maxResults": 2,
"formatOptions.useInt64Timestamp": True,
},
timeout=None,
),
]
Expand Down Expand Up @@ -6920,6 +6912,7 @@ def test_list_rows_query_params(self):
iterator = client.list_rows(table, **test[0])
six.next(iterator.pages)
req = conn.api_request.call_args_list[i]
test[1]["formatOptions.useInt64Timestamp"] = True
self.assertEqual(req[1]["query_params"], test[1], "for kwargs %s" % test[0])

def test_list_rows_repeated_fields(self):
Expand Down Expand Up @@ -6979,7 +6972,10 @@ def test_list_rows_repeated_fields(self):
conn.api_request.assert_called_once_with(
method="GET",
path="/%s" % PATH,
query_params={"selectedFields": "color,struct"},
query_params={
"selectedFields": "color,struct",
"formatOptions.useInt64Timestamp": True,
},
timeout=None,
)

Expand Down Expand Up @@ -7047,7 +7043,10 @@ def test_list_rows_w_record_schema(self):
self.assertEqual(page_token, TOKEN)

conn.api_request.assert_called_once_with(
method="GET", path="/%s" % PATH, query_params={}, timeout=None
method="GET",
path="/%s" % PATH,
query_params={"formatOptions.useInt64Timestamp": True},
timeout=None,
)

def test_list_rows_with_missing_schema(self):
Expand Down Expand Up @@ -7109,7 +7108,10 @@ def test_list_rows_with_missing_schema(self):

rows = list(row_iter)
conn.api_request.assert_called_once_with(
method="GET", path=tabledata_path, query_params={}, timeout=None
method="GET",
path=tabledata_path,
query_params={"formatOptions.useInt64Timestamp": True},
timeout=None,
)
self.assertEqual(row_iter.total_rows, 3, msg=repr(table))
self.assertEqual(rows[0].name, "Phred Phlyntstone", msg=repr(table))
Expand Down
24 changes: 16 additions & 8 deletions tests/unit/test_table.py
Expand Up @@ -2451,8 +2451,8 @@ def test_to_dataframe_timestamp_out_of_pyarrow_bounds(self):

schema = [SchemaField("some_timestamp", "TIMESTAMP")]
rows = [
{"f": [{"v": "81953424000.0"}]}, # 4567-01-01 00:00:00 UTC
{"f": [{"v": "253402214400.0"}]}, # 9999-12-31 00:00:00 UTC
{"f": [{"v": "81953424000000000"}]}, # 4567-01-01 00:00:00 UTC
{"f": [{"v": "253402214400000000"}]}, # 9999-12-31 00:00:00 UTC
]
path = "/foo"
api_request = mock.Mock(return_value={"rows": rows})
Expand Down Expand Up @@ -2675,9 +2675,9 @@ def test_to_dataframe_w_various_types_nullable(self):
]
row_data = [
[None, None, None, None, None, None],
["1.4338368E9", "420", "1.1", u"Cash", "true", "1999-12-01"],
["1.3878117E9", "2580", "17.7", u"Cash", "false", "1953-06-14"],
["1.3855653E9", "2280", "4.4", u"Credit", "true", "1981-11-04"],
["1433836800000000", "420", "1.1", u"Cash", "true", "1999-12-01"],
["1387811700000000", "2580", "17.7", u"Cash", "false", "1953-06-14"],
["1385565300000000", "2280", "4.4", u"Credit", "true", "1981-11-04"],
]
rows = [{"f": [{"v": field} for field in row]} for row in row_data]
path = "/foo"
Expand Down Expand Up @@ -2715,9 +2715,17 @@ def test_to_dataframe_column_dtypes(self):
SchemaField("date", "DATE"),
]
row_data = [
["1.4338368E9", "420", "1.1", "1.77", u"Cash", "true", "1999-12-01"],
["1.3878117E9", "2580", "17.7", "28.5", u"Cash", "false", "1953-06-14"],
["1.3855653E9", "2280", "4.4", "7.1", u"Credit", "true", "1981-11-04"],
["1433836800000000", "420", "1.1", "1.77", u"Cash", "true", "1999-12-01"],
[
"1387811700000000",
"2580",
"17.7",
"28.5",
u"Cash",
"false",
"1953-06-14",
],
["1385565300000000", "2280", "4.4", "7.1", u"Credit", "true", "1981-11-04"],
]
rows = [{"f": [{"v": field} for field in row]} for row in row_data]
path = "/foo"
Expand Down

0 comments on commit 04510a7

Please sign in to comment.