Skip to content

Commit

Permalink
tests: refactor samples fixtures (#423)
Browse files Browse the repository at this point in the history
* tests: refactor quickstart sample to use shared fixtures

* tests: refactor autocommit sample to use shared fixtures

* tests: refactor backup sample to use shared fixtures

* tests: refactor snippets to use shared fixtures

* tests: add 'pytest-dependency' plugin

Closes #390.
Closes #418.
  • Loading branch information
tseaver committed Jul 20, 2021
1 parent ee31e8a commit fc1bc56
Show file tree
Hide file tree
Showing 8 changed files with 462 additions and 383 deletions.
59 changes: 8 additions & 51 deletions samples/samples/autocommit_test.py
Expand Up @@ -4,72 +4,29 @@
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd

import time
import uuid

from google.api_core.exceptions import Aborted
from google.cloud import spanner
import pytest
from test_utils.retry import RetryErrors

import autocommit
from snippets_test import cleanup_old_instances


def unique_instance_id():
"""Creates a unique id for the database."""
return f"test-instance-{uuid.uuid4().hex[:10]}"


def unique_database_id():
"""Creates a unique id for the database."""
return f"test-db-{uuid.uuid4().hex[:10]}"


INSTANCE_ID = unique_instance_id()
DATABASE_ID = unique_database_id()


@pytest.fixture(scope="module")
def spanner_instance():
spanner_client = spanner.Client()
cleanup_old_instances(spanner_client)
instance_config = "{}/instanceConfigs/{}".format(
spanner_client.project_name, "regional-us-central1"
)
instance = spanner_client.instance(
INSTANCE_ID,
instance_config,
labels={
"cloud_spanner_samples": "true",
"sample_name": "autocommit",
"created": str(int(time.time()))
}
)
op = instance.create()
op.result(120) # block until completion
yield instance
instance.delete()


@pytest.fixture(scope="module")
def database(spanner_instance):
"""Creates a temporary database that is removed after testing."""
db = spanner_instance.database(DATABASE_ID)
db.create()
yield db
db.drop()
def sample_name():
return "autocommit"


@RetryErrors(exception=Aborted, max_tries=2)
def test_enable_autocommit_mode(capsys, database):
def test_enable_autocommit_mode(capsys, instance_id, sample_database):
# Delete table if it exists for retry attempts.
table = database.table('Singers')
table = sample_database.table('Singers')
if table.exists():
op = database.update_ddl(["DROP TABLE Singers"])
op = sample_database.update_ddl(["DROP TABLE Singers"])
op.result()

autocommit.enable_autocommit_mode(INSTANCE_ID, DATABASE_ID)
autocommit.enable_autocommit_mode(
instance_id, sample_database.database_id,
)
out, _ = capsys.readouterr()
assert "Autocommit mode is enabled." in out
assert "SingerId: 13, AlbumId: Russell, AlbumTitle: Morales" in out
140 changes: 58 additions & 82 deletions samples/samples/backup_sample_test.py
Expand Up @@ -11,21 +11,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import uuid

from google.api_core.exceptions import DeadlineExceeded
from google.cloud import spanner
import pytest
from test_utils.retry import RetryErrors

import backup_sample
from snippets_test import cleanup_old_instances


def unique_instance_id():
""" Creates a unique id for the database. """
return f"test-instance-{uuid.uuid4().hex[:10]}"
@pytest.fixture(scope="module")
def sample_name():
return "backup"


def unique_database_id():
Expand All @@ -38,8 +35,6 @@ def unique_backup_id():
return f"test-backup-{uuid.uuid4().hex[:10]}"


INSTANCE_ID = unique_instance_id()
DATABASE_ID = unique_database_id()
RESTORE_DB_ID = unique_database_id()
BACKUP_ID = unique_backup_id()
CMEK_RESTORE_DB_ID = unique_database_id()
Expand All @@ -48,121 +43,100 @@ def unique_backup_id():
RETENTION_PERIOD = "7d"


@pytest.fixture(scope="module")
def spanner_instance():
spanner_client = spanner.Client()
cleanup_old_instances(spanner_client)
instance_config = "{}/instanceConfigs/{}".format(
spanner_client.project_name, "regional-us-central1"
)
instance = spanner_client.instance(
INSTANCE_ID,
instance_config,
labels={
"cloud_spanner_samples": "true",
"sample_name": "backup",
"created": str(int(time.time()))
}
)
op = instance.create()
op.result(120) # block until completion
yield instance
for database_pb in instance.list_databases():
database = instance.database(database_pb.name.split("/")[-1])
database.drop()
for backup_pb in instance.list_backups():
backup = instance.backup(backup_pb.name.split("/")[-1])
backup.delete()
instance.delete()


@pytest.fixture(scope="module")
def database(spanner_instance):
""" Creates a temporary database that is removed after testing. """
db = spanner_instance.database(DATABASE_ID)
db.create()
yield db
db.drop()


def test_create_backup(capsys, database):
@pytest.mark.dependency(name="create_backup")
def test_create_backup(capsys, instance_id, sample_database):
version_time = None
with database.snapshot() as snapshot:
with sample_database.snapshot() as snapshot:
results = snapshot.execute_sql("SELECT CURRENT_TIMESTAMP()")
version_time = list(results)[0][0]

backup_sample.create_backup(INSTANCE_ID, DATABASE_ID, BACKUP_ID, version_time)
backup_sample.create_backup(
instance_id,
sample_database.database_id,
BACKUP_ID,
version_time,
)
out, _ = capsys.readouterr()
assert BACKUP_ID in out


def test_create_backup_with_encryption_key(capsys, spanner_instance, database):
kms_key_name = "projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}".format(
spanner_instance._client.project, "us-central1", "spanner-test-keyring", "spanner-test-cmek"
@pytest.mark.dependency(name="create_backup_with_encryption_key")
def test_create_backup_with_encryption_key(
capsys, instance_id, sample_database, kms_key_name,
):
backup_sample.create_backup_with_encryption_key(
instance_id,
sample_database.database_id,
CMEK_BACKUP_ID,
kms_key_name,
)
backup_sample.create_backup_with_encryption_key(INSTANCE_ID, DATABASE_ID, CMEK_BACKUP_ID, kms_key_name)
out, _ = capsys.readouterr()
assert CMEK_BACKUP_ID in out
assert kms_key_name in out


# Depends on test_create_backup having run first
@pytest.mark.dependency(depends=["create_backup"])
@RetryErrors(exception=DeadlineExceeded, max_tries=2)
def test_restore_database(capsys):
backup_sample.restore_database(INSTANCE_ID, RESTORE_DB_ID, BACKUP_ID)
def test_restore_database(capsys, instance_id, sample_database):
backup_sample.restore_database(instance_id, RESTORE_DB_ID, BACKUP_ID)
out, _ = capsys.readouterr()
assert (DATABASE_ID + " restored to ") in out
assert (sample_database.database_id + " restored to ") in out
assert (RESTORE_DB_ID + " from backup ") in out
assert BACKUP_ID in out


# Depends on test_create_backup having run first
@pytest.mark.dependency(depends=["create_backup_with_encryption_key"])
@RetryErrors(exception=DeadlineExceeded, max_tries=2)
def test_restore_database_with_encryption_key(capsys, spanner_instance):
kms_key_name = "projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}".format(
spanner_instance._client.project, "us-central1", "spanner-test-keyring", "spanner-test-cmek"
)
backup_sample.restore_database_with_encryption_key(INSTANCE_ID, CMEK_RESTORE_DB_ID, CMEK_BACKUP_ID, kms_key_name)
def test_restore_database_with_encryption_key(
capsys, instance_id, sample_database, kms_key_name,
):
backup_sample.restore_database_with_encryption_key(
instance_id, CMEK_RESTORE_DB_ID, CMEK_BACKUP_ID, kms_key_name)
out, _ = capsys.readouterr()
assert (DATABASE_ID + " restored to ") in out
assert (sample_database.database_id + " restored to ") in out
assert (CMEK_RESTORE_DB_ID + " from backup ") in out
assert CMEK_BACKUP_ID in out
assert kms_key_name in out


# Depends on test_create_backup having run first
def test_list_backup_operations(capsys, spanner_instance):
backup_sample.list_backup_operations(INSTANCE_ID, DATABASE_ID)
@pytest.mark.dependency(depends=["create_backup"])
def test_list_backup_operations(capsys, instance_id, sample_database):
backup_sample.list_backup_operations(
instance_id, sample_database.database_id)
out, _ = capsys.readouterr()
assert BACKUP_ID in out
assert DATABASE_ID in out
assert sample_database.database_id in out


# Depends on test_create_backup having run first
def test_list_backups(capsys, spanner_instance):
backup_sample.list_backups(INSTANCE_ID, DATABASE_ID, BACKUP_ID)
@pytest.mark.dependency(depends=["create_backup"])
def test_list_backups(capsys, instance_id, sample_database):
backup_sample.list_backups(
instance_id, sample_database.database_id, BACKUP_ID,
)
out, _ = capsys.readouterr()
id_count = out.count(BACKUP_ID)
assert id_count == 7


# Depends on test_create_backup having run first
def test_update_backup(capsys):
backup_sample.update_backup(INSTANCE_ID, BACKUP_ID)
@pytest.mark.dependency(depends=["create_backup"])
def test_update_backup(capsys, instance_id):
backup_sample.update_backup(instance_id, BACKUP_ID)
out, _ = capsys.readouterr()
assert BACKUP_ID in out


# Depends on test_create_backup having run first
def test_delete_backup(capsys, spanner_instance):
backup_sample.delete_backup(INSTANCE_ID, BACKUP_ID)
@pytest.mark.dependency(depends=["create_backup"])
def test_delete_backup(capsys, instance_id):
backup_sample.delete_backup(instance_id, BACKUP_ID)
out, _ = capsys.readouterr()
assert BACKUP_ID in out


# Depends on test_create_backup having run first
def test_cancel_backup(capsys):
backup_sample.cancel_backup(INSTANCE_ID, DATABASE_ID, BACKUP_ID)
@pytest.mark.dependency(depends=["create_backup"])
def test_cancel_backup(capsys, instance_id, sample_database):
backup_sample.cancel_backup(
instance_id, sample_database.database_id, BACKUP_ID,
)
out, _ = capsys.readouterr()
cancel_success = "Backup creation was successfully cancelled." in out
cancel_failure = ("Backup was created before the cancel completed." in out) and (
Expand All @@ -172,10 +146,12 @@ def test_cancel_backup(capsys):


@RetryErrors(exception=DeadlineExceeded, max_tries=2)
def test_create_database_with_retention_period(capsys, spanner_instance):
backup_sample.create_database_with_version_retention_period(INSTANCE_ID, RETENTION_DATABASE_ID, RETENTION_PERIOD)
def test_create_database_with_retention_period(capsys, sample_instance):
backup_sample.create_database_with_version_retention_period(
sample_instance.instance_id, RETENTION_DATABASE_ID, RETENTION_PERIOD,
)
out, _ = capsys.readouterr()
assert (RETENTION_DATABASE_ID + " created with ") in out
assert ("retention period " + RETENTION_PERIOD) in out
database = spanner_instance.database(RETENTION_DATABASE_ID)
database = sample_instance.database(RETENTION_DATABASE_ID)
database.drop()

0 comments on commit fc1bc56

Please sign in to comment.