diff --git a/google/cloud/spanner_v1/_helpers.py b/google/cloud/spanner_v1/_helpers.py index 79a387eac6..0f56431cb3 100644 --- a/google/cloud/spanner_v1/_helpers.py +++ b/google/cloud/spanner_v1/_helpers.py @@ -161,41 +161,6 @@ def _make_list_value_pbs(values): # pylint: disable=too-many-branches -def _parse_value(value, field_type): - if value is None: - return None - if field_type.code == TypeCode.STRING: - result = value - elif field_type.code == TypeCode.BYTES: - result = value.encode("utf8") - elif field_type.code == TypeCode.BOOL: - result = value - elif field_type.code == TypeCode.INT64: - result = int(value) - elif field_type.code == TypeCode.FLOAT64: - if isinstance(value, str): - result = float(value) - else: - result = value - elif field_type.code == TypeCode.DATE: - result = _date_from_iso8601_date(value) - elif field_type.code == TypeCode.TIMESTAMP: - DatetimeWithNanoseconds = datetime_helpers.DatetimeWithNanoseconds - result = DatetimeWithNanoseconds.from_rfc3339(value) - elif field_type.code == TypeCode.ARRAY: - result = [_parse_value(item, field_type.array_element_type) for item in value] - elif field_type.code == TypeCode.STRUCT: - result = [ - _parse_value(item, field_type.struct_type.fields[i].type_) - for (i, item) in enumerate(value) - ] - elif field_type.code == TypeCode.NUMERIC: - result = decimal.Decimal(value) - else: - raise ValueError("Unknown type: %s" % (field_type,)) - return result - - def _parse_value_pb(value_pb, field_type): """Convert a Value protobuf to cell data. 
@@ -209,17 +174,41 @@ def _parse_value_pb(value_pb, field_type): :returns: value extracted from value_pb :raises ValueError: if unknown type is passed """ + type_code = field_type.code if value_pb.HasField("null_value"): return None - if value_pb.HasField("string_value"): - return _parse_value(value_pb.string_value, field_type) - if value_pb.HasField("bool_value"): - return _parse_value(value_pb.bool_value, field_type) - if value_pb.HasField("number_value"): - return _parse_value(value_pb.number_value, field_type) - if value_pb.HasField("list_value"): - return _parse_value(value_pb.list_value, field_type) - raise ValueError("No value set in Value: %s" % (value_pb,)) + if type_code == TypeCode.STRING: + return value_pb.string_value + elif type_code == TypeCode.BYTES: + return value_pb.string_value.encode("utf8") + elif type_code == TypeCode.BOOL: + return value_pb.bool_value + elif type_code == TypeCode.INT64: + return int(value_pb.string_value) + elif type_code == TypeCode.FLOAT64: + if value_pb.HasField("string_value"): + return float(value_pb.string_value) + else: + return value_pb.number_value + elif type_code == TypeCode.DATE: + return _date_from_iso8601_date(value_pb.string_value) + elif type_code == TypeCode.TIMESTAMP: + DatetimeWithNanoseconds = datetime_helpers.DatetimeWithNanoseconds + return DatetimeWithNanoseconds.from_rfc3339(value_pb.string_value) + elif type_code == TypeCode.ARRAY: + return [ + _parse_value_pb(item_pb, field_type.array_element_type) + for item_pb in value_pb.list_value.values + ] + elif type_code == TypeCode.STRUCT: + return [ + _parse_value_pb(item_pb, field_type.struct_type.fields[i].type_) + for (i, item_pb) in enumerate(value_pb.list_value.values) + ] + elif type_code == TypeCode.NUMERIC: + return decimal.Decimal(value_pb.string_value) + else: + raise ValueError("Unknown type: %s" % (field_type,)) # pylint: enable=too-many-branches diff --git a/google/cloud/spanner_v1/streamed.py b/google/cloud/spanner_v1/streamed.py index 
a8b15a8f2b..ec4cb97b9d 100644 --- a/google/cloud/spanner_v1/streamed.py +++ b/google/cloud/spanner_v1/streamed.py @@ -14,12 +14,15 @@ """Wrapper for streaming results.""" +from google.protobuf.struct_pb2 import ListValue +from google.protobuf.struct_pb2 import Value from google.cloud import exceptions +from google.cloud.spanner_v1 import PartialResultSet from google.cloud.spanner_v1 import TypeCode import six # pylint: disable=ungrouped-imports -from google.cloud.spanner_v1._helpers import _parse_value +from google.cloud.spanner_v1._helpers import _parse_value_pb # pylint: enable=ungrouped-imports @@ -88,7 +91,7 @@ def _merge_chunk(self, value): field = self.fields[current_column] merged = _merge_by_type(self._pending_chunk, value, field.type_) self._pending_chunk = None - return _parse_value(merged, field.type_) + return merged def _merge_values(self, values): """Merge values into rows. @@ -96,14 +99,16 @@ def _merge_values(self, values): :type values: list of :class:`~google.protobuf.struct_pb2.Value` :param values: non-chunked values from partial result set. """ - width = len(self.fields) + field_types = [field.type_ for field in self.fields] + width = len(field_types) + index = len(self._current_row) for value in values: - index = len(self._current_row) - field = self.fields[index] - self._current_row.append(_parse_value(value, field.type_)) - if len(self._current_row) == width: + self._current_row.append(_parse_value_pb(value, field_types[index])) + index += 1 + if index == width: self._rows.append(self._current_row) self._current_row = [] + index = 0 def _consume_next(self): """Consume the next partial result set from the stream. 
@@ -111,6 +117,7 @@ def _consume_next(self): Parse the result set into new/existing rows in :attr:`_rows` """ response = six.next(self._response_iterator) + response_pb = PartialResultSet.pb(response) if self._metadata is None: # first response metadata = self._metadata = response.metadata @@ -119,29 +126,27 @@ def _consume_next(self): if source is not None and source._transaction_id is None: source._transaction_id = metadata.transaction.id - if "stats" in response: # last response + if response_pb.HasField("stats"): # last response self._stats = response.stats - values = list(response.values) + values = list(response_pb.values) if self._pending_chunk is not None: values[0] = self._merge_chunk(values[0]) - if response.chunked_value: + if response_pb.chunked_value: self._pending_chunk = values.pop() self._merge_values(values) def __iter__(self): - iter_rows, self._rows[:] = self._rows[:], () while True: - if not iter_rows: - try: - self._consume_next() - except StopIteration: - return - iter_rows, self._rows[:] = self._rows[:], () + iter_rows, self._rows[:] = self._rows[:], () while iter_rows: yield iter_rows.pop(0) + try: + self._consume_next() + except StopIteration: + return def one(self): """Return exactly one result, or raise an exception. 
@@ -213,9 +218,15 @@ def _unmergeable(lhs, rhs, type_): def _merge_float64(lhs, rhs, type_): # pylint: disable=unused-argument """Helper for '_merge_by_type'.""" - if type(lhs) == str: - return float(lhs + rhs) - array_continuation = type(lhs) == float and type(rhs) == str and rhs == "" + lhs_kind = lhs.WhichOneof("kind") + if lhs_kind == "string_value": + return Value(string_value=lhs.string_value + rhs.string_value) + rhs_kind = rhs.WhichOneof("kind") + array_continuation = ( + lhs_kind == "number_value" + and rhs_kind == "string_value" + and rhs.string_value == "" + ) if array_continuation: return lhs raise Unmergeable(lhs, rhs, type_) @@ -223,7 +234,7 @@ def _merge_float64(lhs, rhs, type_): # pylint: disable=unused-argument def _merge_string(lhs, rhs, type_): # pylint: disable=unused-argument """Helper for '_merge_by_type'.""" - return str(lhs) + str(rhs) + return Value(string_value=lhs.string_value + rhs.string_value) _UNMERGEABLE_TYPES = (TypeCode.BOOL,) @@ -234,17 +245,17 @@ def _merge_array(lhs, rhs, type_): element_type = type_.array_element_type if element_type.code in _UNMERGEABLE_TYPES: # Individual values cannot be merged, just concatenate - lhs.extend(rhs) + lhs.list_value.values.extend(rhs.list_value.values) return lhs + lhs, rhs = list(lhs.list_value.values), list(rhs.list_value.values) # Sanity check: If either list is empty, short-circuit. # This is effectively a no-op. 
if not len(lhs) or not len(rhs): - lhs.extend(rhs) - return lhs + return Value(list_value=ListValue(values=(lhs + rhs))) first = rhs.pop(0) - if first is None: # can't merge + if first.HasField("null_value"): # can't merge lhs.append(first) else: last = lhs.pop() @@ -255,23 +266,22 @@ def _merge_array(lhs, rhs, type_): lhs.append(first) else: lhs.append(merged) - lhs.extend(rhs) - return lhs + return Value(list_value=ListValue(values=(lhs + rhs))) def _merge_struct(lhs, rhs, type_): """Helper for '_merge_by_type'.""" fields = type_.struct_type.fields + lhs, rhs = list(lhs.list_value.values), list(rhs.list_value.values) # Sanity check: If either list is empty, short-circuit. # This is effectively a no-op. if not len(lhs) or not len(rhs): - lhs.extend(rhs) - return lhs + return Value(list_value=ListValue(values=(lhs + rhs))) candidate_type = fields[len(lhs) - 1].type_ first = rhs.pop(0) - if first is None or candidate_type.code in _UNMERGEABLE_TYPES: + if first.HasField("null_value") or candidate_type.code in _UNMERGEABLE_TYPES: lhs.append(first) else: last = lhs.pop() @@ -282,8 +292,7 @@ def _merge_struct(lhs, rhs, type_): lhs.append(first) else: lhs.append(merged) - lhs.extend(rhs) - return lhs + return Value(list_value=ListValue(values=lhs + rhs)) _MERGE_BY_TYPE = { diff --git a/tests/unit/test__helpers.py b/tests/unit/test__helpers.py index d554f3f717..fecf2581de 100644 --- a/tests/unit/test__helpers.py +++ b/tests/unit/test__helpers.py @@ -146,13 +146,6 @@ def test_w_float(self): self.assertIsInstance(value_pb, Value) self.assertEqual(value_pb.number_value, 3.14159) - def test_w_float_str(self): - from google.protobuf.struct_pb2 import Value - - value_pb = self._callFUT(3.14159) - self.assertIsInstance(value_pb, Value) - self.assertEqual(value_pb.number_value, 3.14159) - def test_w_float_nan(self): from google.protobuf.struct_pb2 import Value @@ -309,174 +302,6 @@ def test_w_multiple_values(self): self.assertEqual(found.values[1].string_value, expected[1]) 
-class Test_parse_value(unittest.TestCase): - def _callFUT(self, *args, **kw): - from google.cloud.spanner_v1._helpers import _parse_value - - return _parse_value(*args, **kw) - - def test_w_null(self): - from google.cloud.spanner_v1 import Type - from google.cloud.spanner_v1 import TypeCode - - field_type = Type(code=TypeCode.STRING) - value = expected_value = None - - self.assertEqual(self._callFUT(value, field_type), expected_value) - - def test_w_string(self): - from google.cloud.spanner_v1 import Type - from google.cloud.spanner_v1 import TypeCode - - field_type = Type(code=TypeCode.STRING) - value = expected_value = u"Value" - - self.assertEqual(self._callFUT(value, field_type), expected_value) - - def test_w_bytes(self): - from google.cloud.spanner_v1 import Type - from google.cloud.spanner_v1 import TypeCode - - field_type = Type(code=TypeCode.BYTES) - value = "Value" - expected_value = b"Value" - - self.assertEqual(self._callFUT(value, field_type), expected_value) - - def test_w_bool(self): - from google.cloud.spanner_v1 import Type - from google.cloud.spanner_v1 import TypeCode - - field_type = Type(code=TypeCode.BOOL) - value = expected_value = True - - self.assertEqual(self._callFUT(value, field_type), expected_value) - - def test_w_int(self): - from google.cloud.spanner_v1 import Type - from google.cloud.spanner_v1 import TypeCode - - field_type = Type(code=TypeCode.INT64) - value = "12345" - expected_value = 12345 - - self.assertEqual(self._callFUT(value, field_type), expected_value) - - def test_w_float(self): - from google.cloud.spanner_v1 import Type - from google.cloud.spanner_v1 import TypeCode - - field_type = Type(code=TypeCode.FLOAT64) - value = "3.14159" - expected_value = 3.14159 - - self.assertEqual(self._callFUT(value, field_type), expected_value) - - def test_w_date(self): - import datetime - from google.cloud.spanner_v1 import Type - from google.cloud.spanner_v1 import TypeCode - - value = "2020-09-22" - expected_value = 
datetime.date(2020, 9, 22) - field_type = Type(code=TypeCode.DATE) - - self.assertEqual(self._callFUT(value, field_type), expected_value) - - def test_w_timestamp_wo_nanos(self): - import pytz - from google.api_core import datetime_helpers - from google.cloud.spanner_v1 import Type - from google.cloud.spanner_v1 import TypeCode - - field_type = Type(code=TypeCode.TIMESTAMP) - value = "2016-12-20T21:13:47.123456Z" - expected_value = datetime_helpers.DatetimeWithNanoseconds( - 2016, 12, 20, 21, 13, 47, microsecond=123456, tzinfo=pytz.UTC - ) - - parsed = self._callFUT(value, field_type) - self.assertIsInstance(parsed, datetime_helpers.DatetimeWithNanoseconds) - self.assertEqual(parsed, expected_value) - - def test_w_timestamp_w_nanos(self): - import pytz - from google.api_core import datetime_helpers - from google.cloud.spanner_v1 import Type - from google.cloud.spanner_v1 import TypeCode - - field_type = Type(code=TypeCode.TIMESTAMP) - value = "2016-12-20T21:13:47.123456789Z" - expected_value = datetime_helpers.DatetimeWithNanoseconds( - 2016, 12, 20, 21, 13, 47, nanosecond=123456789, tzinfo=pytz.UTC - ) - - parsed = self._callFUT(value, field_type) - self.assertIsInstance(parsed, datetime_helpers.DatetimeWithNanoseconds) - self.assertEqual(parsed, expected_value) - - def test_w_array_empty(self): - from google.cloud.spanner_v1 import Type - from google.cloud.spanner_v1 import TypeCode - - field_type = Type( - code=TypeCode.ARRAY, array_element_type=Type(code=TypeCode.INT64) - ) - value = [] - - self.assertEqual(self._callFUT(value, field_type), []) - - def test_w_array_non_empty(self): - from google.cloud.spanner_v1 import Type - from google.cloud.spanner_v1 import TypeCode - - field_type = Type( - code=TypeCode.ARRAY, array_element_type=Type(code=TypeCode.INT64) - ) - values = ["32", "19", "5"] - expected_values = [32, 19, 5] - - self.assertEqual(self._callFUT(values, field_type), expected_values) - - def test_w_struct(self): - from google.cloud.spanner_v1 import 
Type - from google.cloud.spanner_v1 import StructType - from google.cloud.spanner_v1 import TypeCode - - struct_type_pb = StructType( - fields=[ - StructType.Field(name="name", type_=Type(code=TypeCode.STRING)), - StructType.Field(name="age", type_=Type(code=TypeCode.INT64)), - ] - ) - field_type = Type(code=TypeCode.STRUCT, struct_type=struct_type_pb) - values = [u"phred", "32"] - expected_values = [u"phred", 32] - - self.assertEqual(self._callFUT(values, field_type), expected_values) - - def test_w_numeric(self): - import decimal - from google.cloud.spanner_v1 import Type - from google.cloud.spanner_v1 import TypeCode - - field_type = Type(code=TypeCode.NUMERIC) - expected_value = decimal.Decimal("99999999999999999999999999999.999999999") - value = "99999999999999999999999999999.999999999" - - self.assertEqual(self._callFUT(value, field_type), expected_value) - - def test_w_unknown_type(self): - from google.cloud.spanner_v1 import Type - from google.cloud.spanner_v1 import TypeCode - - field_type = Type(code=TypeCode.TYPE_CODE_UNSPECIFIED) - value_pb = object() - - with self.assertRaises(ValueError): - self._callFUT(value_pb, field_type) - - class Test_parse_value_pb(unittest.TestCase): def _callFUT(self, *args, **kw): from google.cloud.spanner_v1._helpers import _parse_value_pb @@ -676,17 +501,6 @@ def test_w_unknown_type(self): with self.assertRaises(ValueError): self._callFUT(value_pb, field_type) - def test_w_empty_value(self): - from google.protobuf.struct_pb2 import Value - from google.cloud.spanner_v1 import Type - from google.cloud.spanner_v1 import TypeCode - - field_type = Type(code=TypeCode.STRING) - value_pb = Value() - - with self.assertRaises(ValueError): - self._callFUT(value_pb, field_type) - class Test_parse_list_value_pbs(unittest.TestCase): def _callFUT(self, *args, **kw): diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index 5250e41c95..2305937204 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py 
@@ -393,6 +393,7 @@ def _read_helper(self, multi_use, first=True, count=0, partition=None): from google.cloud.spanner_v1._helpers import _make_value_pb VALUES = [[u"bharney", 31], [u"phred", 32]] + VALUE_PBS = [[_make_value_pb(item) for item in row] for row in VALUES] struct_type_pb = StructType( fields=[ StructType.Field(name="name", type_=Type(code=TypeCode.STRING)), @@ -408,7 +409,7 @@ def _read_helper(self, multi_use, first=True, count=0, partition=None): PartialResultSet(stats=stats_pb), ] for i in range(len(result_sets)): - result_sets[i].values.extend(VALUES[i]) + result_sets[i].values.extend(VALUE_PBS[i]) KEYS = [["bharney@example.com"], ["phred@example.com"]] keyset = KeySet(keys=KEYS) INDEX = "email-address-index" @@ -561,6 +562,7 @@ def _execute_sql_helper( ) VALUES = [[u"bharney", u"rhubbyl", 31], [u"phred", u"phlyntstone", 32]] + VALUE_PBS = [[_make_value_pb(item) for item in row] for row in VALUES] MODE = 2 # PROFILE struct_type_pb = StructType( fields=[ @@ -578,7 +580,7 @@ def _execute_sql_helper( PartialResultSet(stats=stats_pb), ] for i in range(len(result_sets)): - result_sets[i].values.extend(VALUES[i]) + result_sets[i].values.extend(VALUE_PBS[i]) iterator = _MockIterator(*result_sets) database = _Database() api = database.spanner_api = self._make_spanner_api() diff --git a/tests/unit/test_streamed.py b/tests/unit/test_streamed.py index 4a31c5d179..63f3bf81fe 100644 --- a/tests/unit/test_streamed.py +++ b/tests/unit/test_streamed.py @@ -89,6 +89,16 @@ def _make_value(value): return _make_value_pb(value) + @staticmethod + def _make_list_value(values=(), value_pbs=None): + from google.protobuf.struct_pb2 import ListValue + from google.protobuf.struct_pb2 import Value + from google.cloud.spanner_v1._helpers import _make_list_value_pb + + if value_pbs is not None: + return Value(list_value=ListValue(values=value_pbs)) + return Value(list_value=_make_list_value_pb(values)) + @staticmethod def _make_result_set_metadata(fields=(), transaction_id=None): 
from google.cloud.spanner_v1 import ResultSetMetadata @@ -161,26 +171,25 @@ def test__merge_chunk_int64(self): streamed = self._make_one(iterator) FIELDS = [self._make_scalar_field("age", TypeCode.INT64)] streamed._metadata = self._make_result_set_metadata(FIELDS) - streamed._pending_chunk = 42 - chunk = 13 + streamed._pending_chunk = self._make_value(42) + chunk = self._make_value(13) merged = streamed._merge_chunk(chunk) - self.assertEqual(merged, 4213) + self.assertEqual(merged.string_value, "4213") self.assertIsNone(streamed._pending_chunk) def test__merge_chunk_float64_nan_string(self): from google.cloud.spanner_v1 import TypeCode - from math import isnan iterator = _MockCancellableIterator() streamed = self._make_one(iterator) FIELDS = [self._make_scalar_field("weight", TypeCode.FLOAT64)] streamed._metadata = self._make_result_set_metadata(FIELDS) - streamed._pending_chunk = u"Na" - chunk = u"N" + streamed._pending_chunk = self._make_value(u"Na") + chunk = self._make_value(u"N") merged = streamed._merge_chunk(chunk) - self.assertTrue(isnan(merged)) + self.assertEqual(merged.string_value, u"NaN") def test__merge_chunk_float64_w_empty(self): from google.cloud.spanner_v1 import TypeCode @@ -189,11 +198,11 @@ def test__merge_chunk_float64_w_empty(self): streamed = self._make_one(iterator) FIELDS = [self._make_scalar_field("weight", TypeCode.FLOAT64)] streamed._metadata = self._make_result_set_metadata(FIELDS) - streamed._pending_chunk = 3.14159 - chunk = "" + streamed._pending_chunk = self._make_value(3.14159) + chunk = self._make_value("") merged = streamed._merge_chunk(chunk) - self.assertEqual(merged, 3.14159) + self.assertEqual(merged.number_value, 3.14159) def test__merge_chunk_float64_w_float64(self): from google.cloud.spanner_v1.streamed import Unmergeable @@ -203,8 +212,8 @@ def test__merge_chunk_float64_w_float64(self): streamed = self._make_one(iterator) FIELDS = [self._make_scalar_field("weight", TypeCode.FLOAT64)] streamed._metadata = 
self._make_result_set_metadata(FIELDS) - streamed._pending_chunk = 3.14159 - chunk = 2.71828 + streamed._pending_chunk = self._make_value(3.14159) + chunk = self._make_value(2.71828) with self.assertRaises(Unmergeable): streamed._merge_chunk(chunk) @@ -216,12 +225,12 @@ def test__merge_chunk_string(self): streamed = self._make_one(iterator) FIELDS = [self._make_scalar_field("name", TypeCode.STRING)] streamed._metadata = self._make_result_set_metadata(FIELDS) - streamed._pending_chunk = u"phred" - chunk = u"wylma" + streamed._pending_chunk = self._make_value(u"phred") + chunk = self._make_value(u"wylma") merged = streamed._merge_chunk(chunk) - self.assertEqual(merged, u"phredwylma") + self.assertEqual(merged.string_value, u"phredwylma") self.assertIsNone(streamed._pending_chunk) def test__merge_chunk_string_w_bytes(self): @@ -231,11 +240,11 @@ def test__merge_chunk_string_w_bytes(self): streamed = self._make_one(iterator) FIELDS = [self._make_scalar_field("image", TypeCode.BYTES)] streamed._metadata = self._make_result_set_metadata(FIELDS) - streamed._pending_chunk = ( + streamed._pending_chunk = self._make_value( u"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAAAAAA" u"6fptVAAAACXBIWXMAAAsTAAALEwEAmpwYAAAA\n" ) - chunk = ( + chunk = self._make_value( u"B3RJTUUH4QQGFwsBTL3HMwAAABJpVFh0Q29tbWVudAAAAAAAU0FNUExF" u"MG3E+AAAAApJREFUCNdj\nYAAAAAIAAeIhvDMAAAAASUVORK5CYII=\n" ) @@ -243,10 +252,10 @@ def test__merge_chunk_string_w_bytes(self): merged = streamed._merge_chunk(chunk) self.assertEqual( - merged, - b"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAAAAAA6fptVAAAACXBIWXMAAAsTAAAL" - b"EwEAmpwYAAAA\nB3RJTUUH4QQGFwsBTL3HMwAAABJpVFh0Q29tbWVudAAAAAAAU0" - b"FNUExFMG3E+AAAAApJREFUCNdj\nYAAAAAIAAeIhvDMAAAAASUVORK5CYII=\n", + merged.string_value, + u"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAAAAAA6fptVAAAACXBIWXMAAAsTAAAL" + u"EwEAmpwYAAAA\nB3RJTUUH4QQGFwsBTL3HMwAAABJpVFh0Q29tbWVudAAAAAAAU0" + u"FNUExFMG3E+AAAAApJREFUCNdj\nYAAAAAIAAeIhvDMAAAAASUVORK5CYII=\n", ) self.assertIsNone(streamed._pending_chunk) 
@@ -257,12 +266,12 @@ def test__merge_chunk_array_of_bool(self): streamed = self._make_one(iterator) FIELDS = [self._make_array_field("name", element_type_code=TypeCode.BOOL)] streamed._metadata = self._make_result_set_metadata(FIELDS) - streamed._pending_chunk = [True, True] - chunk = [False, False, False] + streamed._pending_chunk = self._make_list_value([True, True]) + chunk = self._make_list_value([False, False, False]) merged = streamed._merge_chunk(chunk) - expected = [True, True, False, False, False] + expected = self._make_list_value([True, True, False, False, False]) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -273,12 +282,12 @@ def test__merge_chunk_array_of_int(self): streamed = self._make_one(iterator) FIELDS = [self._make_array_field("name", element_type_code=TypeCode.INT64)] streamed._metadata = self._make_result_set_metadata(FIELDS) - streamed._pending_chunk = [0, 1, 2] - chunk = [3, 4, 5] + streamed._pending_chunk = self._make_list_value([0, 1, 2]) + chunk = self._make_list_value([3, 4, 5]) merged = streamed._merge_chunk(chunk) - expected = [0, 1, 23, 4, 5] + expected = self._make_list_value([0, 1, 23, 4, 5]) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -294,12 +303,12 @@ def test__merge_chunk_array_of_float(self): streamed = self._make_one(iterator) FIELDS = [self._make_array_field("name", element_type_code=TypeCode.FLOAT64)] streamed._metadata = self._make_result_set_metadata(FIELDS) - streamed._pending_chunk = [PI, SQRT_2] - chunk = ["", EULER, LOG_10] + streamed._pending_chunk = self._make_list_value([PI, SQRT_2]) + chunk = self._make_list_value(["", EULER, LOG_10]) merged = streamed._merge_chunk(chunk) - expected = [PI, SQRT_2, EULER, LOG_10] + expected = self._make_list_value([PI, SQRT_2, EULER, LOG_10]) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -310,12 +319,12 @@ def test__merge_chunk_array_of_string_with_empty(self): streamed = 
self._make_one(iterator) FIELDS = [self._make_array_field("name", element_type_code=TypeCode.STRING)] streamed._metadata = self._make_result_set_metadata(FIELDS) - streamed._pending_chunk = [u"A", u"B", u"C"] - chunk = [] + streamed._pending_chunk = self._make_list_value([u"A", u"B", u"C"]) + chunk = self._make_list_value([]) merged = streamed._merge_chunk(chunk) - expected = [u"A", u"B", u"C"] + expected = self._make_list_value([u"A", u"B", u"C"]) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -326,12 +335,12 @@ def test__merge_chunk_array_of_string(self): streamed = self._make_one(iterator) FIELDS = [self._make_array_field("name", element_type_code=TypeCode.STRING)] streamed._metadata = self._make_result_set_metadata(FIELDS) - streamed._pending_chunk = [u"A", u"B", u"C"] - chunk = [None, u"D", u"E"] + streamed._pending_chunk = self._make_list_value([u"A", u"B", u"C"]) + chunk = self._make_list_value([None, u"D", u"E"]) merged = streamed._merge_chunk(chunk) - expected = [u"A", u"B", u"C", None, u"D", u"E"] + expected = self._make_list_value([u"A", u"B", u"C", None, u"D", u"E"]) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -342,12 +351,12 @@ def test__merge_chunk_array_of_string_with_null(self): streamed = self._make_one(iterator) FIELDS = [self._make_array_field("name", element_type_code=TypeCode.STRING)] streamed._metadata = self._make_result_set_metadata(FIELDS) - streamed._pending_chunk = [u"A", u"B", u"C"] - chunk = [u"D", u"E"] + streamed._pending_chunk = self._make_list_value([u"A", u"B", u"C"]) + chunk = self._make_list_value([u"D", u"E"]) merged = streamed._merge_chunk(chunk) - expected = [u"A", u"B", u"CD", u"E"] + expected = self._make_list_value([u"A", u"B", u"CD", u"E"]) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -364,17 +373,22 @@ def test__merge_chunk_array_of_array_of_int(self): streamed = self._make_one(iterator) FIELDS = 
[StructType.Field(name="loloi", type_=array_type)] streamed._metadata = self._make_result_set_metadata(FIELDS) - streamed._pending_chunk = [[0, 1], [2]] - chunk = [[3], [4, 5]] + streamed._pending_chunk = self._make_list_value( + value_pbs=[self._make_list_value([0, 1]), self._make_list_value([2])] + ) + chunk = self._make_list_value( + value_pbs=[self._make_list_value([3]), self._make_list_value([4, 5])] + ) merged = streamed._merge_chunk(chunk) - expected = [ - [0, 1], - [23], - [4, 5], - ] - + expected = self._make_list_value( + value_pbs=[ + self._make_list_value([0, 1]), + self._make_list_value([23]), + self._make_list_value([4, 5]), + ] + ) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -391,23 +405,28 @@ def test__merge_chunk_array_of_array_of_string(self): streamed = self._make_one(iterator) FIELDS = [StructType.Field(name="lolos", type_=array_type)] streamed._metadata = self._make_result_set_metadata(FIELDS) - streamed._pending_chunk = [ - [u"A", u"B"], - [u"C"], - ] - chunk = [ - [u"D"], - [u"E", u"F"], - ] + streamed._pending_chunk = self._make_list_value( + value_pbs=[ + self._make_list_value([u"A", u"B"]), + self._make_list_value([u"C"]), + ] + ) + chunk = self._make_list_value( + value_pbs=[ + self._make_list_value([u"D"]), + self._make_list_value([u"E", u"F"]), + ] + ) merged = streamed._merge_chunk(chunk) - expected = [ - [u"A", u"B"], - [u"CD"], - [u"E", u"F"], - ] - + expected = self._make_list_value( + value_pbs=[ + self._make_list_value([u"A", u"B"]), + self._make_list_value([u"CD"]), + self._make_list_value([u"E", u"F"]), + ] + ) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -421,15 +440,15 @@ def test__merge_chunk_array_of_struct(self): ) FIELDS = [self._make_array_field("test", element_type=struct_type)] streamed._metadata = self._make_result_set_metadata(FIELDS) - partial = [u"Phred "] - streamed._pending_chunk = [partial] - rest = [u"Phlyntstone", 31] - chunk = [rest] + 
partial = self._make_list_value([u"Phred "]) + streamed._pending_chunk = self._make_list_value(value_pbs=[partial]) + rest = self._make_list_value([u"Phlyntstone", 31]) + chunk = self._make_list_value(value_pbs=[rest]) merged = streamed._merge_chunk(chunk) - struct = [u"Phred Phlyntstone", 31] - expected = [struct] + struct = self._make_list_value([u"Phred Phlyntstone", 31]) + expected = self._make_list_value(value_pbs=[struct]) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -443,14 +462,14 @@ def test__merge_chunk_array_of_struct_with_empty(self): ) FIELDS = [self._make_array_field("test", element_type=struct_type)] streamed._metadata = self._make_result_set_metadata(FIELDS) - partial = [u"Phred "] - streamed._pending_chunk = [partial] - rest = [] - chunk = [rest] + partial = self._make_list_value([u"Phred "]) + streamed._pending_chunk = self._make_list_value(value_pbs=[partial]) + rest = self._make_list_value([]) + chunk = self._make_list_value(value_pbs=[rest]) merged = streamed._merge_chunk(chunk) - expected = [partial] + expected = self._make_list_value(value_pbs=[partial]) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -468,15 +487,15 @@ def test__merge_chunk_array_of_struct_unmergeable(self): ) FIELDS = [self._make_array_field("test", element_type=struct_type)] streamed._metadata = self._make_result_set_metadata(FIELDS) - partial = [u"Phred Phlyntstone", True] - streamed._pending_chunk = [partial] - rest = [True] - chunk = [rest] + partial = self._make_list_value([u"Phred Phlyntstone", True]) + streamed._pending_chunk = self._make_list_value(value_pbs=[partial]) + rest = self._make_list_value([True]) + chunk = self._make_list_value(value_pbs=[rest]) merged = streamed._merge_chunk(chunk) - struct = [u"Phred Phlyntstone", True, True] - expected = [struct] + struct = self._make_list_value([u"Phred Phlyntstone", True, True]) + expected = self._make_list_value(value_pbs=[struct]) 
self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -488,15 +507,15 @@ def test__merge_chunk_array_of_struct_unmergeable_split(self): ) FIELDS = [self._make_array_field("test", element_type=struct_type)] streamed._metadata = self._make_result_set_metadata(FIELDS) - partial = [u"Phred Phlyntstone", 1.65] - streamed._pending_chunk = [partial] - rest = ["brown"] - chunk = [rest] + partial = self._make_list_value([u"Phred Phlyntstone", 1.65]) + streamed._pending_chunk = self._make_list_value(value_pbs=[partial]) + rest = self._make_list_value(["brown"]) + chunk = self._make_list_value(value_pbs=[rest]) merged = streamed._merge_chunk(chunk) - struct = [u"Phred Phlyntstone", 1.65, "brown"] - expected = [struct] + struct = self._make_list_value([u"Phred Phlyntstone", 1.65, "brown"]) + expected = self._make_list_value(value_pbs=[struct]) self.assertEqual(merged, expected) self.assertIsNone(streamed._pending_chunk) @@ -527,8 +546,8 @@ def test_merge_values_empty_and_partial(self): self._make_scalar_field("married", TypeCode.BOOL), ] streamed._metadata = self._make_result_set_metadata(FIELDS) - VALUES = [u"Phred Phlyntstone", "42"] BARE = [u"Phred Phlyntstone", 42] + VALUES = [self._make_value(bare) for bare in BARE] streamed._current_row = [] streamed._merge_values(VALUES) self.assertEqual(list(streamed), []) @@ -545,8 +564,8 @@ def test_merge_values_empty_and_filled(self): self._make_scalar_field("married", TypeCode.BOOL), ] streamed._metadata = self._make_result_set_metadata(FIELDS) - VALUES = [u"Phred Phlyntstone", "42", True] BARE = [u"Phred Phlyntstone", 42, True] + VALUES = [self._make_value(bare) for bare in BARE] streamed._current_row = [] streamed._merge_values(VALUES) self.assertEqual(list(streamed), [BARE]) @@ -563,15 +582,6 @@ def test_merge_values_empty_and_filled_plus(self): self._make_scalar_field("married", TypeCode.BOOL), ] streamed._metadata = self._make_result_set_metadata(FIELDS) - VALUES = [ - u"Phred Phlyntstone", - "42", - 
True, - u"Bharney Rhubble", - "39", - True, - u"Wylma Phlyntstone", - ] BARE = [ u"Phred Phlyntstone", 42, @@ -581,6 +591,7 @@ def test_merge_values_empty_and_filled_plus(self): True, u"Wylma Phlyntstone", ] + VALUES = [self._make_value(bare) for bare in BARE] streamed._current_row = [] streamed._merge_values(VALUES) self.assertEqual(list(streamed), [BARE[0:3], BARE[3:6]]) @@ -616,8 +627,8 @@ def test_merge_values_partial_and_partial(self): streamed._metadata = self._make_result_set_metadata(FIELDS) BEFORE = [u"Phred Phlyntstone"] streamed._current_row[:] = BEFORE - TO_MERGE = ["42"] MERGED = [42] + TO_MERGE = [self._make_value(item) for item in MERGED] streamed._merge_values(TO_MERGE) self.assertEqual(list(streamed), []) self.assertEqual(streamed._current_row, BEFORE + MERGED) @@ -635,8 +646,8 @@ def test_merge_values_partial_and_filled(self): streamed._metadata = self._make_result_set_metadata(FIELDS) BEFORE = [u"Phred Phlyntstone"] streamed._current_row[:] = BEFORE - TO_MERGE = ["42", True] MERGED = [42, True] + TO_MERGE = [self._make_value(item) for item in MERGED] streamed._merge_values(TO_MERGE) self.assertEqual(list(streamed), [BEFORE + MERGED]) self.assertEqual(streamed._current_row, []) @@ -654,8 +665,8 @@ def test_merge_values_partial_and_filled_plus(self): streamed._metadata = self._make_result_set_metadata(FIELDS) BEFORE = [self._make_value(u"Phred Phlyntstone")] streamed._current_row[:] = BEFORE - TO_MERGE = ["42", True, u"Bharney Rhubble", "39", True, u"Wylma Phlyntstone"] MERGED = [42, True, u"Bharney Rhubble", 39, True, u"Wylma Phlyntstone"] + TO_MERGE = [self._make_value(item) for item in MERGED] VALUES = BEFORE + MERGED streamed._merge_values(TO_MERGE) self.assertEqual(list(streamed), [VALUES[0:3], VALUES[3:6]]) @@ -720,7 +731,8 @@ def test_consume_next_first_set_partial(self): ] metadata = self._make_result_set_metadata(FIELDS, transaction_id=TXN_ID) BARE = [u"Phred Phlyntstone", 42] - result_set = self._make_partial_result_set(BARE, 
metadata=metadata) + VALUES = [self._make_value(bare) for bare in BARE] + result_set = self._make_partial_result_set(VALUES, metadata=metadata) iterator = _MockCancellableIterator(result_set) source = mock.Mock(_transaction_id=None, spec=["_transaction_id"]) streamed = self._make_one(iterator, source=source) @@ -768,7 +780,7 @@ def test_consume_next_w_partial_result(self): streamed._consume_next() self.assertEqual(list(streamed), []) self.assertEqual(streamed._current_row, []) - self.assertEqual(streamed._pending_chunk, VALUES[0].string_value) + self.assertEqual(streamed._pending_chunk, VALUES[0]) def test_consume_next_w_pending_chunk(self): from google.cloud.spanner_v1 import TypeCode @@ -792,7 +804,7 @@ def test_consume_next_w_pending_chunk(self): iterator = _MockCancellableIterator(result_set) streamed = self._make_one(iterator) streamed._metadata = self._make_result_set_metadata(FIELDS) - streamed._pending_chunk = u"Phred " + streamed._pending_chunk = self._make_value(u"Phred ") streamed._consume_next() self.assertEqual( list(streamed),