chore: Allow multiple system test runs (#176)
jimfulton committed May 18, 2021
1 parent 463eb12 commit 9dd3cf4
Showing 2 changed files with 94 additions and 137 deletions.
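The gist of the change: the session fixtures previously created datasets with fixed names (test_pybigquery, test_pybigquery_dml, test_pybigquery_alt, test_pybigquery_location) and reused them across runs, so concurrent or repeated system-test runs could collide on shared state. The diff below gives each run its own dataset name via a temp_suffix() helper, switches the dataset fixtures to a create/yield/delete lifecycle, and drops the create-if-missing guards (get_table plus NotFound) that existed only to make reuse safe. temp_suffix() itself is defined elsewhere in conftest.py and does not appear in this diff; judging from the datetime and random imports, a plausible sketch (an assumption, not the committed code) is:

import datetime
import random


def temp_suffix() -> str:
    # Assumed implementation: a timestamp plus random hex keeps dataset
    # names unique across repeated and concurrent test runs.
    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    return f"{timestamp}_{random.randrange(16 ** 6):x}"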
tests/system/conftest.py: 107 changes (30 additions, 77 deletions)
@@ -7,12 +7,11 @@
 import datetime
 import pathlib
 import random
-from typing import List
 
 import pytest
 import google.api_core.exceptions
-from google.cloud import bigquery
-
+from typing import List
+from google.cloud import bigquery
 
 
 DATA_DIR = pathlib.Path(__file__).parent / "data"
@@ -61,102 +60,56 @@ def bigquery_dataset(
     bigquery_client: bigquery.Client, bigquery_schema: List[bigquery.SchemaField]
 ):
     project_id = bigquery_client.project
-    dataset_id = "test_pybigquery"
+    dataset_id = f"test_pybigquery_{temp_suffix()}"
     dataset = bigquery.Dataset(f"{project_id}.{dataset_id}")
-    dataset = bigquery_client.create_dataset(dataset, exists_ok=True)
+    dataset = bigquery_client.create_dataset(dataset)
     sample_table_id = f"{project_id}.{dataset_id}.sample"
-    try:
-        # Since the data changes rarely and the tests are mostly read-only,
-        # only create the tables if they don't already exist.
-        # TODO: Create shared sample data tables in bigquery-public-data that
-        #       include test values for all data types.
-        bigquery_client.get_table(sample_table_id)
-    except google.api_core.exceptions.NotFound:
-        job1 = load_sample_data(sample_table_id, bigquery_client, bigquery_schema)
-        job1.result()
+    job1 = load_sample_data(sample_table_id, bigquery_client, bigquery_schema)
+    job1.result()
     one_row_table_id = f"{project_id}.{dataset_id}.sample_one_row"
-    try:
-        bigquery_client.get_table(one_row_table_id)
-    except google.api_core.exceptions.NotFound:
-        job2 = load_sample_data(
-            one_row_table_id,
-            bigquery_client,
-            bigquery_schema,
-            filename="sample_one_row.json",
-        )
-        job2.result()
+    job2 = load_sample_data(
+        one_row_table_id,
+        bigquery_client,
+        bigquery_schema,
+        filename="sample_one_row.json",
+    )
+    job2.result()
     view = bigquery.Table(f"{project_id}.{dataset_id}.sample_view",)
     view.view_query = f"SELECT string FROM `{dataset_id}.sample`"
-    bigquery_client.create_table(view, exists_ok=True)
-    return dataset_id
-
-
-@pytest.fixture(scope="session", autouse=True)
-def bigquery_dml_dataset(bigquery_client: bigquery.Client):
-    project_id = bigquery_client.project
-    dataset_id = "test_pybigquery_dml"
-    dataset = bigquery.Dataset(f"{project_id}.{dataset_id}")
-    # Add default table expiration in case cleanup fails.
-    dataset.default_table_expiration_ms = 1000 * int(
-        datetime.timedelta(days=1).total_seconds()
-    )
-    dataset = bigquery_client.create_dataset(dataset, exists_ok=True)
-    return dataset_id
+    bigquery_client.create_table(view)
+    yield dataset_id
+    bigquery_client.delete_dataset(dataset_id, delete_contents=True)
 
 
 @pytest.fixture(scope="session", autouse=True)
 def bigquery_empty_table(
     bigquery_dataset: str,
-    bigquery_dml_dataset: str,
     bigquery_client: bigquery.Client,
     bigquery_schema: List[bigquery.SchemaField],
 ):
     project_id = bigquery_client.project
-    # Cleanup the sample_dml table, if it exists.
-    old_table_id = f"{project_id}.{bigquery_dataset}.sample_dml"
-    bigquery_client.delete_table(old_table_id, not_found_ok=True)
-    # Create new table in its own dataset.
-    dataset_id = bigquery_dml_dataset
-    table_id = f"{project_id}.{dataset_id}.sample_dml_{temp_suffix()}"
+    dataset_id = bigquery_dataset
+    table_id = f"{project_id}.{dataset_id}.sample_dml_empty"
     empty_table = bigquery.Table(table_id, schema=bigquery_schema)
     bigquery_client.create_table(empty_table)
-    yield table_id
-    bigquery_client.delete_table(empty_table)
-
-
-@pytest.fixture(scope="session", autouse=True)
-def bigquery_alt_dataset(
-    bigquery_client: bigquery.Client, bigquery_schema: List[bigquery.SchemaField]
-):
-    project_id = bigquery_client.project
-    dataset_id = "test_pybigquery_alt"
-    dataset = bigquery.Dataset(f"{project_id}.{dataset_id}")
-    dataset = bigquery_client.create_dataset(dataset, exists_ok=True)
-    sample_table_id = f"{project_id}.{dataset_id}.sample_alt"
-    try:
-        bigquery_client.get_table(sample_table_id)
-    except google.api_core.exceptions.NotFound:
-        job = load_sample_data(sample_table_id, bigquery_client, bigquery_schema)
-        job.result()
-    return dataset_id
+    return table_id
 
 
 @pytest.fixture(scope="session", autouse=True)
 def bigquery_regional_dataset(bigquery_client, bigquery_schema):
     project_id = bigquery_client.project
-    dataset_id = "test_pybigquery_location"
+    dataset_id = f"test_pybigquery_location_{temp_suffix()}"
     dataset = bigquery.Dataset(f"{project_id}.{dataset_id}")
     dataset.location = "asia-northeast1"
-    dataset = bigquery_client.create_dataset(dataset, exists_ok=True)
+    dataset = bigquery_client.create_dataset(dataset)
     sample_table_id = f"{project_id}.{dataset_id}.sample_one_row"
-    try:
-        bigquery_client.get_table(sample_table_id)
-    except google.api_core.exceptions.NotFound:
-        job = load_sample_data(
-            sample_table_id,
-            bigquery_client,
-            bigquery_schema,
-            filename="sample_one_row.json",
-        )
-        job.result()
-    return dataset_id
+    job = load_sample_data(
+        sample_table_id,
+        bigquery_client,
+        bigquery_schema,
+        filename="sample_one_row.json",
+    )
+    job.result()
+    yield dataset_id
+    bigquery_client.delete_dataset(dataset_id, delete_contents=True)
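
Every converted fixture now follows the same setup/yield/teardown shape. A minimal standalone sketch of that pattern (the fixture name, dataset prefix, and uuid-based suffix are illustrative, not from the commit):

import uuid

import pytest
from google.cloud import bigquery


@pytest.fixture(scope="session")
def scratch_dataset():
    # Setup: create a uniquely named dataset so repeated or parallel
    # runs never contend for the same resource.
    client = bigquery.Client()
    dataset_id = f"{client.project}.scratch_{uuid.uuid4().hex[:8]}"
    dataset = client.create_dataset(bigquery.Dataset(dataset_id))
    yield dataset
    # Teardown runs once, after the last test of the session;
    # delete_contents=True also drops any tables created inside.
    client.delete_dataset(dataset_id, delete_contents=True)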

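A test then just takes the fixture as an argument; cleanup is the fixture's responsibility, which is why the diff above can drop per-table cleanup such as the old sample_dml handling (test name is hypothetical):

def test_dataset_is_usable(scratch_dataset):
    # The session fixture has already created the dataset; tests only
    # reference it and never clean it up themselves.
    assert scratch_dataset.dataset_id.startswith("scratch_")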