This repository has been archived by the owner on Nov 29, 2023. It is now read-only.

docs: add sample to include run notification #173

Merged
merged 3 commits on Jul 1, 2021
6 changes: 6 additions & 0 deletions owlbot.py
@@ -14,12 +14,16 @@

"""This script is used to synthesize generated parts of this library."""

import pathlib

import synthtool as s

from synthtool import gcp
from synthtool.languages import python


REPO_ROOT = pathlib.Path(__file__).parent.absolute()

common = gcp.CommonTemplates()

# ----------------------------------------------------------------------------
@@ -58,3 +62,5 @@
python.py_samples(skip_readmes=True)

s.shell.run(["nox", "-s", "blacken"], hide_output=False)
for noxfile in REPO_ROOT.glob("samples/**/noxfile.py"):
s.shell.run(["nox", "-s", "blacken"], cwd=noxfile.parent, hide_output=False)
55 changes: 51 additions & 4 deletions samples/snippets/conftest.py
@@ -14,16 +14,48 @@

import datetime
import os
import random
import uuid

from google.api_core import client_options
import google.api_core.exceptions
import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_datatransfer
from google.cloud import pubsub_v1
import pytest


RESOURCE_PREFIX = "python_bigquery_datatransfer_samples_snippets"
RESOURCE_DATE_FORMAT = "%Y%m%d%H%M%S"
RESOURCE_DATE_LENGTH = 4 + 2 + 2 + 2 + 2 + 2


def resource_prefix() -> str:
timestamp = datetime.datetime.utcnow().strftime(RESOURCE_DATE_FORMAT)
random_string = hex(random.randrange(1000000))[2:]
return f"{RESOURCE_PREFIX}_{timestamp}_{random_string}"


def resource_name_to_date(resource_name: str):
start_date = len(RESOURCE_PREFIX) + 1
date_string = resource_name[start_date : start_date + RESOURCE_DATE_LENGTH]
parsed_date = datetime.datetime.strptime(date_string, RESOURCE_DATE_FORMAT)
return parsed_date


@pytest.fixture(scope="session", autouse=True)
def cleanup_pubsub_topics(pubsub_client: pubsub_v1.PublisherClient, project_id):
yesterday = datetime.datetime.utcnow() - datetime.timedelta(days=1)
for topic in pubsub_client.list_topics(project=f"projects/{project_id}"):
topic_id = topic.name.split("/")[-1]
if (
topic_id.startswith(RESOURCE_PREFIX)
and resource_name_to_date(topic_id) < yesterday
):
pubsub_client.delete_topic(topic=topic.name)


def temp_suffix():
now = datetime.datetime.now()
return f"{now.strftime('%Y%m%d%H%M%S')}_{uuid.uuid4().hex[:8]}"
@@ -35,6 +67,21 @@ def bigquery_client(default_credentials):
return bigquery.Client(credentials=credentials, project=project_id)


@pytest.fixture(scope="session")
def pubsub_client(default_credentials):
credentials, _ = default_credentials
return pubsub_v1.PublisherClient(credentials=credentials)


@pytest.fixture(scope="session")
def pubsub_topic(pubsub_client: pubsub_v1.PublisherClient, project_id):
topic_id = resource_prefix()
topic_path = pubsub_v1.PublisherClient.topic_path(project_id, topic_id)
pubsub_client.create_topic(name=topic_path)
yield topic_path
pubsub_client.delete_topic(topic=topic_path)


@pytest.fixture(scope="session")
def dataset_id(bigquery_client, project_id):
dataset_id = f"bqdts_{temp_suffix()}"
@@ -56,10 +103,10 @@ def project_id():
@pytest.fixture(scope="session")
def service_account_name(default_credentials):
credentials, _ = default_credentials
# Note: this property is not available when running with user account
# credentials, but only service account credentials are used in our test
# infrastructure.
return credentials.service_account_email
# The service_account_email attribute is not available when running with
# user account credentials, but should be available when running from our
# continuous integration tests.
return getattr(credentials, "service_account_email", None)


@pytest.fixture(scope="session")
4 changes: 1 addition & 3 deletions samples/snippets/manage_transfer_configs.py
@@ -135,9 +135,7 @@ def schedule_backfill(override_values={}):
)

response = transfer_client.schedule_transfer_runs(
parent=transfer_config_name,
start_time=start_time,
end_time=end_time,
parent=transfer_config_name, start_time=start_time, end_time=end_time,
)

print("Started transfer runs:")
4 changes: 1 addition & 3 deletions samples/snippets/manage_transfer_configs_test.py
@@ -52,9 +52,7 @@ def test_update_credentials_with_service_account(

def test_schedule_backfill(capsys, transfer_config_name):
runs = manage_transfer_configs.schedule_backfill(
{
"transfer_config_name": transfer_config_name,
}
{"transfer_config_name": transfer_config_name}
)
out, _ = capsys.readouterr()
assert "Started transfer runs:" in out
39 changes: 21 additions & 18 deletions samples/snippets/noxfile.py
@@ -38,31 +38,29 @@

TEST_CONFIG = {
# You can opt out from the test for specific Python versions.
'ignored_versions': ["2.7"],

"ignored_versions": ["2.7"],
# Old samples are opted out of enforcing Python type hints
# All new samples should feature them
'enforce_type_hints': False,

"enforce_type_hints": False,
# An envvar key for determining the project id to use. Change it
# to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a
# build specific Cloud project. You can also use your own string
# to use your own Cloud project.
'gcloud_project_env': 'GOOGLE_CLOUD_PROJECT',
"gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
# 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT',
# If you need to use a specific version of pip,
# change pip_version_override to the string representation
# of the version number, for example, "20.2.4"
"pip_version_override": None,
# A dictionary you want to inject into your test. Don't put any
# secrets here. These values will override predefined values.
'envs': {},
"envs": {},
}


try:
# Ensure we can import noxfile_config in the project's directory.
sys.path.append('.')
sys.path.append(".")
from noxfile_config import TEST_CONFIG_OVERRIDE
except ImportError as e:
print("No user noxfile_config found: detail: {}".format(e))
@@ -77,12 +75,12 @@ def get_pytest_env_vars() -> Dict[str, str]:
ret = {}

# Override the GCLOUD_PROJECT and the alias.
env_key = TEST_CONFIG['gcloud_project_env']
env_key = TEST_CONFIG["gcloud_project_env"]
# This should error out if not set.
ret['GOOGLE_CLOUD_PROJECT'] = os.environ[env_key]
ret["GOOGLE_CLOUD_PROJECT"] = os.environ[env_key]

# Apply user supplied envs.
ret.update(TEST_CONFIG['envs'])
ret.update(TEST_CONFIG["envs"])
return ret


@@ -91,7 +89,7 @@ def get_pytest_env_vars() -> Dict[str, str]:
ALL_VERSIONS = ["2.7", "3.6", "3.7", "3.8", "3.9"]

# Any default versions that should be ignored.
IGNORED_VERSIONS = TEST_CONFIG['ignored_versions']
IGNORED_VERSIONS = TEST_CONFIG["ignored_versions"]

TESTED_VERSIONS = sorted([v for v in ALL_VERSIONS if v not in IGNORED_VERSIONS])

@@ -140,7 +138,7 @@ def _determine_local_import_names(start_dir: str) -> List[str]:

@nox.session
def lint(session: nox.sessions.Session) -> None:
if not TEST_CONFIG['enforce_type_hints']:
if not TEST_CONFIG["enforce_type_hints"]:
session.install("flake8", "flake8-import-order")
else:
session.install("flake8", "flake8-import-order", "flake8-annotations")
@@ -149,9 +147,9 @@ def lint(session: nox.sessions.Session) -> None:
args = FLAKE8_COMMON_ARGS + [
"--application-import-names",
",".join(local_names),
"."
".",
]
session.run("flake8", *args)


#
# Black
#
@@ -164,6 +164,7 @@ def blacken(session: nox.sessions.Session) -> None:

session.run("black", *python_files)


#
# Sample Tests
#
@@ -172,7 +173,9 @@ def blacken(session: nox.sessions.Session) -> None:
PYTEST_COMMON_ARGS = ["--junitxml=sponge_log.xml"]


def _session_tests(session: nox.sessions.Session, post_install: Callable = None) -> None:
def _session_tests(
session: nox.sessions.Session, post_install: Callable = None
) -> None:
if TEST_CONFIG["pip_version_override"]:
pip_version = TEST_CONFIG["pip_version_override"]
session.install(f"pip=={pip_version}")
@@ -202,7 +205,7 @@ def _session_tests(session: nox.sessions.Session, post_install: Callable = None)
# on travis where slow and flaky tests are excluded.
# See http://doc.pytest.org/en/latest/_modules/_pytest/main.html
success_codes=[0, 5],
env=get_pytest_env_vars()
env=get_pytest_env_vars(),
)


@@ -212,9 +215,9 @@ def py(session: nox.sessions.Session) -> None:
if session.python in TESTED_VERSIONS:
_session_tests(session)
else:
session.skip("SKIPPED: {} tests are disabled for this sample.".format(
session.python
))
session.skip(
"SKIPPED: {} tests are disabled for this sample.".format(session.python)
)


#
1 change: 1 addition & 0 deletions samples/snippets/requirements-test.txt
@@ -1,3 +1,4 @@
google-cloud-bigquery==2.20.0
google-cloud-pubsub==2.6.0
pytest==6.2.4
mock==4.0.3
44 changes: 44 additions & 0 deletions samples/snippets/run_notification.py
@@ -0,0 +1,44 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def run_notification(transfer_config_name, pubsub_topic):
orig_transfer_config_name = transfer_config_name
orig_pubsub_topic = pubsub_topic
# [START bigquerydatatransfer_run_notification]
transfer_config_name = "projects/1234/locations/us/transferConfigs/abcd"
pubsub_topic = "projects/PROJECT-ID/topics/TOPIC-ID"
# [END bigquerydatatransfer_run_notification]
transfer_config_name = orig_transfer_config_name
pubsub_topic = orig_pubsub_topic

# [START bigquerydatatransfer_run_notification]
from google.cloud import bigquery_datatransfer
from google.protobuf import field_mask_pb2

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

transfer_config = bigquery_datatransfer.TransferConfig(name=transfer_config_name)
transfer_config.notification_pubsub_topic = pubsub_topic
update_mask = field_mask_pb2.FieldMask(paths=["notification_pubsub_topic"])

transfer_config = transfer_client.update_transfer_config(
{"transfer_config": transfer_config, "update_mask": update_mask}
)

print(f"Updated config: '{transfer_config.name}'")
print(f"Notification Pub/Sub topic: '{transfer_config.notification_pubsub_topic}'")
# [END bigquerydatatransfer_run_notification]
# Return the config name for testing purposes, so that it can be deleted.
return transfer_config
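For reference, a hypothetical direct call of the new sample (this assumes run_notification.py is importable from the working directory; the config name and topic below are placeholders, not values from this PR):

from run_notification import run_notification

config = run_notification(
    transfer_config_name="projects/1234/locations/us/transferConfigs/abcd",
    pubsub_topic="projects/my-project/topics/my-topic",
)
print(config.notification_pubsub_topic)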
26 changes: 26 additions & 0 deletions samples/snippets/run_notification_test.py
@@ -0,0 +1,26 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from . import run_notification


def test_run_notification(capsys, transfer_config_name, pubsub_topic):
run_notification.run_notification(
transfer_config_name=transfer_config_name, pubsub_topic=pubsub_topic,
)
out, _ = capsys.readouterr()
assert "Updated config:" in out
assert transfer_config_name in out
assert "Notification Pub/Sub topic:" in out
assert pubsub_topic in out