Skip to content

Commit

Permalink
fix: Convert PBs in system test cleanup (#199)
Browse files Browse the repository at this point in the history
Fixes #198, includes #201.

Fix a bug in test_system.py around backup instance proto conversion.
  • Loading branch information
c24t committed Dec 22, 2020
1 parent 2b74f9c commit ede4343
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 80 deletions.
159 changes: 82 additions & 77 deletions tests/system/test_system.py
Expand Up @@ -35,11 +35,13 @@
from google.cloud.spanner_v1 import Type

from google.cloud._helpers import UTC
from google.cloud.spanner_v1 import BurstyPool
from google.cloud.spanner_v1 import COMMIT_TIMESTAMP
from google.cloud.spanner_v1 import Client
from google.cloud.spanner_v1 import KeyRange
from google.cloud.spanner_v1 import KeySet
from google.cloud.spanner_v1 import BurstyPool
from google.cloud.spanner_v1 import COMMIT_TIMESTAMP
from google.cloud.spanner_v1.instance import Backup
from google.cloud.spanner_v1.instance import Instance

from test_utils.retry import RetryErrors
from test_utils.retry import RetryInstanceState
Expand Down Expand Up @@ -115,14 +117,17 @@ def setUpModule():

# Delete test instances that are older than an hour.
cutoff = int(time.time()) - 1 * 60 * 60
for instance in Config.CLIENT.list_instances("labels.python-spanner-systests:true"):
instance_pbs = Config.CLIENT.list_instances("labels.python-spanner-systests:true")
for instance_pb in instance_pbs:
instance = Instance.from_pb(instance_pb, Config.CLIENT)
if "created" not in instance.labels:
continue
create_time = int(instance.labels["created"])
if create_time > cutoff:
continue
# Instance cannot be deleted while backups exist.
for backup in instance.list_backups():
for backup_pb in instance.list_backups():
backup = Backup.from_pb(backup_pb, instance)
backup.delete()
instance.delete()

Expand Down Expand Up @@ -939,9 +944,9 @@ def test_batch_insert_then_read(self):
)

def test_batch_insert_then_read_string_array_of_string(self):
TABLE = "string_plus_array_of_string"
COLUMNS = ["id", "name", "tags"]
ROWDATA = [
table = "string_plus_array_of_string"
columns = ["id", "name", "tags"]
rowdata = [
(0, None, None),
(1, "phred", ["yabba", "dabba", "do"]),
(2, "bharney", []),
Expand All @@ -951,12 +956,12 @@ def test_batch_insert_then_read_string_array_of_string(self):
retry(self._db.reload)()

with self._db.batch() as batch:
batch.delete(TABLE, self.ALL)
batch.insert(TABLE, COLUMNS, ROWDATA)
batch.delete(table, self.ALL)
batch.insert(table, columns, rowdata)

with self._db.snapshot(read_timestamp=batch.committed) as snapshot:
rows = list(snapshot.read(TABLE, COLUMNS, self.ALL))
self._check_rows_data(rows, expected=ROWDATA)
rows = list(snapshot.read(table, columns, self.ALL))
self._check_rows_data(rows, expected=rowdata)

def test_batch_insert_then_read_all_datatypes(self):
retry = RetryInstanceState(_has_all_ddl)
Expand Down Expand Up @@ -1570,23 +1575,23 @@ def _read_w_concurrent_update(self, transaction, pkey):
transaction.update(COUNTERS_TABLE, COUNTERS_COLUMNS, [[pkey, value + 1]])

def test_transaction_read_w_concurrent_updates(self):
PKEY = "read_w_concurrent_updates"
self._transaction_concurrency_helper(self._read_w_concurrent_update, PKEY)
pkey = "read_w_concurrent_updates"
self._transaction_concurrency_helper(self._read_w_concurrent_update, pkey)

def _query_w_concurrent_update(self, transaction, pkey):
SQL = "SELECT * FROM counters WHERE name = @name"
sql = "SELECT * FROM counters WHERE name = @name"
rows = list(
transaction.execute_sql(
SQL, params={"name": pkey}, param_types={"name": param_types.STRING}
sql, params={"name": pkey}, param_types={"name": param_types.STRING}
)
)
self.assertEqual(len(rows), 1)
pkey, value = rows[0]
transaction.update(COUNTERS_TABLE, COUNTERS_COLUMNS, [[pkey, value + 1]])

def test_transaction_query_w_concurrent_updates(self):
PKEY = "query_w_concurrent_updates"
self._transaction_concurrency_helper(self._query_w_concurrent_update, PKEY)
pkey = "query_w_concurrent_updates"
self._transaction_concurrency_helper(self._query_w_concurrent_update, pkey)

@unittest.skipIf(USE_EMULATOR, "Skipping concurrent transactions")
def test_transaction_read_w_abort(self):
Expand Down Expand Up @@ -1684,9 +1689,9 @@ def test_snapshot_read_w_various_staleness(self):
from datetime import datetime
from google.cloud._helpers import UTC

ROW_COUNT = 400
committed = self._set_up_table(ROW_COUNT)
all_data_rows = list(self._row_data(ROW_COUNT))
row_count = 400
committed = self._set_up_table(row_count)
all_data_rows = list(self._row_data(row_count))

before_reads = datetime.utcnow().replace(tzinfo=UTC)

Expand Down Expand Up @@ -1718,9 +1723,9 @@ def test_snapshot_read_w_various_staleness(self):
self._check_row_data(rows, all_data_rows)

def test_multiuse_snapshot_read_isolation_strong(self):
ROW_COUNT = 40
self._set_up_table(ROW_COUNT)
all_data_rows = list(self._row_data(ROW_COUNT))
row_count = 40
self._set_up_table(row_count)
all_data_rows = list(self._row_data(row_count))
with self._db.snapshot(multi_use=True) as strong:
before = list(strong.read(self.TABLE, self.COLUMNS, self.ALL))
self._check_row_data(before, all_data_rows)
Expand All @@ -1732,9 +1737,9 @@ def test_multiuse_snapshot_read_isolation_strong(self):
self._check_row_data(after, all_data_rows)

def test_multiuse_snapshot_read_isolation_read_timestamp(self):
ROW_COUNT = 40
committed = self._set_up_table(ROW_COUNT)
all_data_rows = list(self._row_data(ROW_COUNT))
row_count = 40
committed = self._set_up_table(row_count)
all_data_rows = list(self._row_data(row_count))

with self._db.snapshot(read_timestamp=committed, multi_use=True) as read_ts:

Expand All @@ -1748,10 +1753,10 @@ def test_multiuse_snapshot_read_isolation_read_timestamp(self):
self._check_row_data(after, all_data_rows)

def test_multiuse_snapshot_read_isolation_exact_staleness(self):
ROW_COUNT = 40
row_count = 40

self._set_up_table(ROW_COUNT)
all_data_rows = list(self._row_data(ROW_COUNT))
self._set_up_table(row_count)
all_data_rows = list(self._row_data(row_count))

time.sleep(1)
delta = datetime.timedelta(microseconds=1000)
Expand All @@ -1768,7 +1773,7 @@ def test_multiuse_snapshot_read_isolation_exact_staleness(self):
self._check_row_data(after, all_data_rows)

def test_read_w_index(self):
ROW_COUNT = 2000
row_count = 2000
# Indexed reads cannot return non-indexed columns
MY_COLUMNS = self.COLUMNS[0], self.COLUMNS[2]
EXTRA_DDL = ["CREATE INDEX contacts_by_last_name ON contacts(last_name)"]
Expand All @@ -1784,7 +1789,7 @@ def test_read_w_index(self):

# We want to make sure the operation completes.
operation.result(30) # raises on failure / timeout.
committed = self._set_up_table(ROW_COUNT, database=temp_db)
committed = self._set_up_table(row_count, database=temp_db)

with temp_db.snapshot(read_timestamp=committed) as snapshot:
rows = list(
Expand All @@ -1794,36 +1799,36 @@ def test_read_w_index(self):
)

expected = list(
reversed([(row[0], row[2]) for row in self._row_data(ROW_COUNT)])
reversed([(row[0], row[2]) for row in self._row_data(row_count)])
)
self._check_rows_data(rows, expected)

def test_read_w_single_key(self):
# [START spanner_test_single_key_read]
ROW_COUNT = 40
committed = self._set_up_table(ROW_COUNT)
row_count = 40
committed = self._set_up_table(row_count)

with self._db.snapshot(read_timestamp=committed) as snapshot:
rows = list(snapshot.read(self.TABLE, self.COLUMNS, KeySet(keys=[(0,)])))

all_data_rows = list(self._row_data(ROW_COUNT))
all_data_rows = list(self._row_data(row_count))
expected = [all_data_rows[0]]
self._check_row_data(rows, expected)
# [END spanner_test_single_key_read]

def test_empty_read(self):
# [START spanner_test_empty_read]
ROW_COUNT = 40
self._set_up_table(ROW_COUNT)
row_count = 40
self._set_up_table(row_count)
with self._db.snapshot() as snapshot:
rows = list(snapshot.read(self.TABLE, self.COLUMNS, KeySet(keys=[(40,)])))
self._check_row_data(rows, [])
# [END spanner_test_empty_read]

def test_read_w_multiple_keys(self):
ROW_COUNT = 40
row_count = 40
indices = [0, 5, 17]
committed = self._set_up_table(ROW_COUNT)
committed = self._set_up_table(row_count)

with self._db.snapshot(read_timestamp=committed) as snapshot:
rows = list(
Expand All @@ -1834,58 +1839,58 @@ def test_read_w_multiple_keys(self):
)
)

all_data_rows = list(self._row_data(ROW_COUNT))
all_data_rows = list(self._row_data(row_count))
expected = [row for row in all_data_rows if row[0] in indices]
self._check_row_data(rows, expected)

def test_read_w_limit(self):
ROW_COUNT = 3000
LIMIT = 100
committed = self._set_up_table(ROW_COUNT)
row_count = 3000
limit = 100
committed = self._set_up_table(row_count)

with self._db.snapshot(read_timestamp=committed) as snapshot:
rows = list(snapshot.read(self.TABLE, self.COLUMNS, self.ALL, limit=LIMIT))
rows = list(snapshot.read(self.TABLE, self.COLUMNS, self.ALL, limit=limit))

all_data_rows = list(self._row_data(ROW_COUNT))
expected = all_data_rows[:LIMIT]
all_data_rows = list(self._row_data(row_count))
expected = all_data_rows[:limit]
self._check_row_data(rows, expected)

def test_read_w_ranges(self):
ROW_COUNT = 3000
START = 1000
END = 2000
committed = self._set_up_table(ROW_COUNT)
row_count = 3000
start = 1000
end = 2000
committed = self._set_up_table(row_count)
with self._db.snapshot(read_timestamp=committed, multi_use=True) as snapshot:
all_data_rows = list(self._row_data(ROW_COUNT))
all_data_rows = list(self._row_data(row_count))

single_key = KeyRange(start_closed=[START], end_open=[START + 1])
single_key = KeyRange(start_closed=[start], end_open=[start + 1])
keyset = KeySet(ranges=(single_key,))
rows = list(snapshot.read(self.TABLE, self.COLUMNS, keyset))
expected = all_data_rows[START : START + 1]
expected = all_data_rows[start : start + 1]
self._check_rows_data(rows, expected)

closed_closed = KeyRange(start_closed=[START], end_closed=[END])
closed_closed = KeyRange(start_closed=[start], end_closed=[end])
keyset = KeySet(ranges=(closed_closed,))
rows = list(snapshot.read(self.TABLE, self.COLUMNS, keyset))
expected = all_data_rows[START : END + 1]
expected = all_data_rows[start : end + 1]
self._check_row_data(rows, expected)

closed_open = KeyRange(start_closed=[START], end_open=[END])
closed_open = KeyRange(start_closed=[start], end_open=[end])
keyset = KeySet(ranges=(closed_open,))
rows = list(snapshot.read(self.TABLE, self.COLUMNS, keyset))
expected = all_data_rows[START:END]
expected = all_data_rows[start:end]
self._check_row_data(rows, expected)

open_open = KeyRange(start_open=[START], end_open=[END])
open_open = KeyRange(start_open=[start], end_open=[end])
keyset = KeySet(ranges=(open_open,))
rows = list(snapshot.read(self.TABLE, self.COLUMNS, keyset))
expected = all_data_rows[START + 1 : END]
expected = all_data_rows[start + 1 : end]
self._check_row_data(rows, expected)

open_closed = KeyRange(start_open=[START], end_closed=[END])
open_closed = KeyRange(start_open=[start], end_closed=[end])
keyset = KeySet(ranges=(open_closed,))
rows = list(snapshot.read(self.TABLE, self.COLUMNS, keyset))
expected = all_data_rows[START + 1 : END + 1]
expected = all_data_rows[start + 1 : end + 1]
self._check_row_data(rows, expected)

def test_read_partial_range_until_end(self):
Expand Down Expand Up @@ -2129,8 +2134,8 @@ def test_partition_read_w_index(self):
batch_txn.close()

def test_execute_sql_w_manual_consume(self):
ROW_COUNT = 3000
committed = self._set_up_table(ROW_COUNT)
row_count = 3000
committed = self._set_up_table(row_count)

with self._db.snapshot(read_timestamp=committed) as snapshot:
streamed = snapshot.execute_sql(self.SQL)
Expand All @@ -2154,9 +2159,9 @@ def _check_sql_results(
self._check_rows_data(rows, expected=expected)

def test_multiuse_snapshot_execute_sql_isolation_strong(self):
ROW_COUNT = 40
self._set_up_table(ROW_COUNT)
all_data_rows = list(self._row_data(ROW_COUNT))
row_count = 40
self._set_up_table(row_count)
all_data_rows = list(self._row_data(row_count))
with self._db.snapshot(multi_use=True) as strong:

before = list(strong.execute_sql(self.SQL))
Expand All @@ -2169,22 +2174,22 @@ def test_multiuse_snapshot_execute_sql_isolation_strong(self):
self._check_row_data(after, all_data_rows)

def test_execute_sql_returning_array_of_struct(self):
SQL = (
sql = (
"SELECT ARRAY(SELECT AS STRUCT C1, C2 "
"FROM (SELECT 'a' AS C1, 1 AS C2 "
"UNION ALL SELECT 'b' AS C1, 2 AS C2) "
"ORDER BY C1 ASC)"
)
self._check_sql_results(
self._db,
sql=SQL,
sql=sql,
params=None,
param_types=None,
expected=[[[["a", 1], ["b", 2]]]],
)

def test_execute_sql_returning_empty_array_of_struct(self):
SQL = (
sql = (
"SELECT ARRAY(SELECT AS STRUCT C1, C2 "
"FROM (SELECT 2 AS C1) X "
"JOIN (SELECT 1 AS C2) Y "
Expand All @@ -2194,7 +2199,7 @@ def test_execute_sql_returning_empty_array_of_struct(self):
self._db.snapshot(multi_use=True)

self._check_sql_results(
self._db, sql=SQL, params=None, param_types=None, expected=[[[]]]
self._db, sql=sql, params=None, param_types=None, expected=[[[]]]
)

def test_invalid_type(self):
Expand Down Expand Up @@ -2359,11 +2364,11 @@ def test_execute_sql_w_numeric_bindings(self):
self._bind_test_helper(TypeCode.NUMERIC, NUMERIC_1, [NUMERIC_1, NUMERIC_2])

def test_execute_sql_w_query_param_struct(self):
NAME = "Phred"
COUNT = 123
SIZE = 23.456
HEIGHT = 188.0
WEIGHT = 97.6
name = "Phred"
count = 123
size = 23.456
height = 188.0
weight = 97.6

record_type = param_types.Struct(
[
Expand Down Expand Up @@ -2416,9 +2421,9 @@ def test_execute_sql_w_query_param_struct(self):
self._check_sql_results(
self._db,
sql="SELECT @r.name, @r.count, @r.size, @r.nested.weight",
params={"r": (NAME, COUNT, SIZE, (HEIGHT, WEIGHT))},
params={"r": (name, count, size, (height, weight))},
param_types={"r": record_type},
expected=[(NAME, COUNT, SIZE, WEIGHT)],
expected=[(name, count, size, weight)],
order=False,
)

Expand Down

0 comments on commit ede4343

Please sign in to comment.