Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add configurable leader placement support #399

Merged
merged 9 commits into from Jul 29, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions google/cloud/spanner_v1/database.py
Expand Up @@ -144,6 +144,7 @@ def __init__(
self._version_retention_period = None
self._earliest_version_time = None
self._encryption_info = None
self._default_leader = None
self.log_commit_stats = False
self._logger = logger
self._encryption_config = encryption_config
Expand Down Expand Up @@ -279,6 +280,15 @@ def encryption_info(self):
"""
return self._encryption_info

@property
def default_leader(self):
"""The read-write region which contains the database's leader replicas.

:rtype: str
:returns: a string representing the read-write region
"""
return self._default_leader

@property
def ddl_statements(self):
"""DDL Statements used to define database schema.
Expand Down Expand Up @@ -414,6 +424,7 @@ def reload(self):
self._earliest_version_time = response.earliest_version_time
self._encryption_config = response.encryption_config
self._encryption_info = response.encryption_info
self._default_leader = response.default_leader

def update_ddl(self, ddl_statements, operation_id=""):
"""Update DDL for this database.
Expand Down
84 changes: 84 additions & 0 deletions tests/system/test_system.py
Expand Up @@ -353,16 +353,34 @@ def setUpClass(cls):
SPANNER_OPERATION_TIMEOUT_IN_SECONDS
) # raises on failure / timeout.

# Create a multi-region instance
multi_region_config = "nam3"
ALT_INSTANCE_ID = "new" + unique_resource_id("-")
zoercai marked this conversation as resolved.
Show resolved Hide resolved
config_name = "{}/instanceConfigs/{}".format(
Config.CLIENT.project_name, multi_region_config
)
create_time = str(int(time.time()))
labels = {"python-spanner-systests": "true", "created": create_time}
cls._instance = Config.CLIENT.instance(
instance_id=ALT_INSTANCE_ID, configuration_name=config_name, labels=labels
)
operation = cls._instance.create()
operation.result(SPANNER_OPERATION_TIMEOUT_IN_SECONDS)

@classmethod
def tearDownClass(cls):
cls._db.drop()
cls._instance.delete()

def setUp(self):
self.instances_to_delete = []
self.to_delete = []

def tearDown(self):
for doomed in self.to_delete:
doomed.drop()
for instance in self.instances_to_delete:
instance.delete()
zoercai marked this conversation as resolved.
Show resolved Hide resolved

def test_list_databases(self):
# Since `Config.INSTANCE` is newly created in `setUpModule`, the
Expand Down Expand Up @@ -443,6 +461,42 @@ def test_create_database_pitr_success(self):
for result in results:
self.assertEqual(result[0], retention_period)

@unittest.skipIf(
USE_EMULATOR, "Default leader setting is not supported by the emulator"
)
def test_create_database_with_default_leader_success(self):
pool = BurstyPool(labels={"testcase": "create_database_default_leader"})

temp_db_id = "temp_db" + unique_resource_id("_")
default_leader = "us-east4"
ddl_statements = [
"ALTER DATABASE {}"
" SET OPTIONS (default_leader = '{}')".format(temp_db_id, default_leader)
]
temp_db = self._instance.database(
temp_db_id, pool=pool, ddl_statements=ddl_statements
)
operation = temp_db.create()
self.to_delete.append(temp_db)

# We want to make sure the operation completes.
operation.result(30) # raises on failure / timeout.

database_ids = [database.name for database in self._instance.list_databases()]
self.assertIn(temp_db.name, database_ids)

temp_db.reload()
self.assertEqual(temp_db.default_leader, default_leader)

with temp_db.snapshot() as snapshot:
results = snapshot.execute_sql(
"SELECT OPTION_VALUE AS default_leader "
"FROM INFORMATION_SCHEMA.DATABASE_OPTIONS "
"WHERE SCHEMA_NAME = '' AND OPTION_NAME = 'default_leader'"
)
for result in results:
self.assertEqual(result[0], default_leader)

def test_table_not_found(self):
temp_db_id = "temp_db" + unique_resource_id("_")

Expand Down Expand Up @@ -551,6 +605,36 @@ def test_update_database_ddl_pitr_success(self):
self.assertEqual(temp_db.version_retention_period, retention_period)
self.assertEqual(len(temp_db.ddl_statements), len(ddl_statements))

@unittest.skipIf(
USE_EMULATOR, "Default leader update is not supported by the emulator"
)
def test_update_database_ddl_default_leader_success(self):
pool = BurstyPool(labels={"testcase": "update_database_ddl_default_leader"})

temp_db_id = "temp_db" + unique_resource_id("_")
default_leader = "us-east4"
temp_db = self._instance.database(temp_db_id, pool=pool)
create_op = temp_db.create()
self.to_delete.append(temp_db)

# We want to make sure the operation completes.
create_op.result(240) # raises on failure / timeout.

self.assertIsNone(temp_db.default_leader)

ddl_statements = DDL_STATEMENTS + [
"ALTER DATABASE {}"
" SET OPTIONS (default_leader = '{}')".format(temp_db_id, default_leader)
]
operation = temp_db.update_ddl(ddl_statements)

# We want to make sure the operation completes.
operation.result(240) # raises on failure / timeout.

temp_db.reload()
self.assertEqual(temp_db.default_leader, default_leader)
self.assertEqual(len(temp_db.ddl_statements), len(ddl_statements))

def test_db_batch_insert_then_db_snapshot_read(self):
retry = RetryInstanceState(_has_all_ddl)
retry(self._db.reload)()
Expand Down
6 changes: 5 additions & 1 deletion tests/unit/test_client.py
Expand Up @@ -40,6 +40,7 @@ class TestClient(unittest.TestCase):
PROCESSING_UNITS = 5000
LABELS = {"test": "true"}
TIMEOUT_SECONDS = 80
LEADER_OPTIONS = ["leader1", "leader2"]

def _get_target_class(self):
from google.cloud import spanner
Expand Down Expand Up @@ -457,7 +458,9 @@ def test_list_instance_configs(self):
instance_config_pbs = ListInstanceConfigsResponse(
instance_configs=[
InstanceConfigPB(
name=self.CONFIGURATION_NAME, display_name=self.DISPLAY_NAME
name=self.CONFIGURATION_NAME,
display_name=self.DISPLAY_NAME,
leader_options=self.LEADER_OPTIONS,
)
]
)
Expand All @@ -473,6 +476,7 @@ def test_list_instance_configs(self):
self.assertIsInstance(instance_config, InstanceConfigPB)
self.assertEqual(instance_config.name, self.CONFIGURATION_NAME)
self.assertEqual(instance_config.display_name, self.DISPLAY_NAME)
self.assertEqual(instance_config.leader_options, self.LEADER_OPTIONS)

expected_metadata = (
("google-cloud-resource-prefix", client.project_name),
Expand Down
10 changes: 10 additions & 0 deletions tests/unit/test_database.py
Expand Up @@ -333,6 +333,13 @@ def test_encryption_info(self):
]
self.assertEqual(database.encryption_info, encryption_info)

def test_default_leader(self):
instance = _Instance(self.INSTANCE_NAME)
pool = _Pool()
database = self._make_one(self.DATABASE_ID, instance, pool=pool)
default_leader = database._default_leader = "us-east4"
self.assertEqual(database.default_leader, default_leader)

def test_spanner_api_property_w_scopeless_creds(self):

client = _Client()
Expand Down Expand Up @@ -715,6 +722,7 @@ def test_reload_success(self):
kms_key_version="kms_key_version",
)
]
default_leader = "us-east4"
api = client.database_admin_api = self._make_database_admin_api()
api.get_database_ddl.return_value = ddl_pb
db_pb = Database(
Expand All @@ -725,6 +733,7 @@ def test_reload_success(self):
earliest_version_time=_datetime_to_pb_timestamp(timestamp),
encryption_config=encryption_config,
encryption_info=encryption_info,
default_leader=default_leader,
)
api.get_database.return_value = db_pb
instance = _Instance(self.INSTANCE_NAME, client=client)
Expand All @@ -740,6 +749,7 @@ def test_reload_success(self):
self.assertEqual(database._ddl_statements, tuple(DDL_STATEMENTS))
self.assertEqual(database._encryption_config, encryption_config)
self.assertEqual(database._encryption_info, encryption_info)
self.assertEqual(database._default_leader, default_leader)

api.get_database_ddl.assert_called_once_with(
database=self.DATABASE_NAME,
Expand Down