diff --git a/samples/beam/hello_world_write.py b/samples/beam/hello_world_write.py new file mode 100644 index 000000000..894edc46f --- /dev/null +++ b/samples/beam/hello_world_write.py @@ -0,0 +1,64 @@ +# Copyright 2020 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import datetime + +import apache_beam as beam +from apache_beam.io.gcp.bigtableio import WriteToBigTable +from apache_beam.options.pipeline_options import PipelineOptions +from google.cloud.bigtable import row + + +class BigtableOptions(PipelineOptions): + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument( + '--bigtable-project', + help='The Bigtable project ID, this can be different than your ' + 'Dataflow project', + default='bigtable-project') + parser.add_argument( + '--bigtable-instance', + help='The Bigtable instance ID', + default='bigtable-instance') + parser.add_argument( + '--bigtable-table', + help='The Bigtable table ID in the instance.', + default='bigtable-table') + + +class CreateRowFn(beam.DoFn): + def process(self, key): + direct_row = row.DirectRow(row_key=key) + direct_row.set_cell( + "stats_summary", + b"os_build", + b"android", + datetime.datetime.now()) + return [direct_row] + + +def run(argv=None): + """Build and run the pipeline.""" + options = BigtableOptions(argv) + with beam.Pipeline(options=options) as p: + p | beam.Create(["phone#4c410523#20190501", + "phone#4c410523#20190502"]) | beam.ParDo( + CreateRowFn()) | WriteToBigTable( + project_id=options.bigtable_project, + instance_id=options.bigtable_instance, + table_id=options.bigtable_table) + + +if __name__ == '__main__': + run() diff --git a/samples/beam/hello_world_write_test.py b/samples/beam/hello_world_write_test.py new file mode 100644 index 000000000..cdbecc661 --- /dev/null +++ b/samples/beam/hello_world_write_test.py @@ -0,0 +1,57 @@ +# Copyright 2020 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import uuid + +from google.cloud import bigtable +import pytest + +import hello_world_write + +PROJECT = os.environ['GOOGLE_CLOUD_PROJECT'] +BIGTABLE_INSTANCE = os.environ['BIGTABLE_INSTANCE'] +TABLE_ID_PREFIX = 'mobile-time-series-{}' + + +@pytest.fixture(scope="module", autouse=True) +def table_id(): + client = bigtable.Client(project=PROJECT, admin=True) + instance = client.instance(BIGTABLE_INSTANCE) + + table_id = TABLE_ID_PREFIX.format(str(uuid.uuid4())[:16]) + table = instance.table(table_id) + if table.exists(): + table.delete() + + table.create(column_families={'stats_summary': None}) + yield table_id + + table.delete() + + +def test_hello_world_write(table_id): + hello_world_write.run([ + '--bigtable-project=%s' % PROJECT, + '--bigtable-instance=%s' % BIGTABLE_INSTANCE, + '--bigtable-table=%s' % table_id]) + + client = bigtable.Client(project=PROJECT, admin=True) + instance = client.instance(BIGTABLE_INSTANCE) + table = instance.table(table_id) + + rows = table.read_rows() + count = 0 + for _ in rows: + count += 1 + assert count == 2 diff --git a/samples/beam/noxfile.py b/samples/beam/noxfile.py new file mode 100644 index 000000000..ba55d7ce5 --- /dev/null +++ b/samples/beam/noxfile.py @@ -0,0 +1,224 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import os +from pathlib import Path +import sys + +import nox + + +# WARNING - WARNING - WARNING - WARNING - WARNING +# WARNING - WARNING - WARNING - WARNING - WARNING +# DO NOT EDIT THIS FILE EVER! +# WARNING - WARNING - WARNING - WARNING - WARNING +# WARNING - WARNING - WARNING - WARNING - WARNING + +# Copy `noxfile_config.py` to your directory and modify it instead. + + +# `TEST_CONFIG` dict is a configuration hook that allows users to +# modify the test configurations. The values here should be in sync +# with `noxfile_config.py`. Users will copy `noxfile_config.py` into +# their directory and modify it. + +TEST_CONFIG = { + # You can opt out from the test for specific Python versions. + 'ignored_versions': ["2.7"], + + # An envvar key for determining the project id to use. Change it + # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a + # build specific Cloud project. You can also use your own string + # to use your own Cloud project. + 'gcloud_project_env': 'GOOGLE_CLOUD_PROJECT', + # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT', + + # A dictionary you want to inject into your test. Don't put any + # secrets here. These values will override predefined values. + 'envs': {}, +} + + +try: + # Ensure we can import noxfile_config in the project's directory. + sys.path.append('.') + from noxfile_config import TEST_CONFIG_OVERRIDE +except ImportError as e: + print("No user noxfile_config found: detail: {}".format(e)) + TEST_CONFIG_OVERRIDE = {} + +# Update the TEST_CONFIG with the user supplied values. +TEST_CONFIG.update(TEST_CONFIG_OVERRIDE) + + +def get_pytest_env_vars(): + """Returns a dict for pytest invocation.""" + ret = {} + + # Override the GCLOUD_PROJECT and the alias. + env_key = TEST_CONFIG['gcloud_project_env'] + # This should error out if not set. + ret['GOOGLE_CLOUD_PROJECT'] = os.environ[env_key] + + # Apply user supplied envs. + ret.update(TEST_CONFIG['envs']) + return ret + + +# DO NOT EDIT - automatically generated. +# All versions used to tested samples. +ALL_VERSIONS = ["2.7", "3.6", "3.7", "3.8"] + +# Any default versions that should be ignored. +IGNORED_VERSIONS = TEST_CONFIG['ignored_versions'] + +TESTED_VERSIONS = sorted([v for v in ALL_VERSIONS if v not in IGNORED_VERSIONS]) + +INSTALL_LIBRARY_FROM_SOURCE = bool(os.environ.get("INSTALL_LIBRARY_FROM_SOURCE", False)) +# +# Style Checks +# + + +def _determine_local_import_names(start_dir): + """Determines all import names that should be considered "local". + + This is used when running the linter to insure that import order is + properly checked. + """ + file_ext_pairs = [os.path.splitext(path) for path in os.listdir(start_dir)] + return [ + basename + for basename, extension in file_ext_pairs + if extension == ".py" + or os.path.isdir(os.path.join(start_dir, basename)) + and basename not in ("__pycache__") + ] + + +# Linting with flake8. +# +# We ignore the following rules: +# E203: whitespace before ‘:’ +# E266: too many leading ‘#’ for block comment +# E501: line too long +# I202: Additional newline in a section of imports +# +# We also need to specify the rules which are ignored by default: +# ['E226', 'W504', 'E126', 'E123', 'W503', 'E24', 'E704', 'E121'] +FLAKE8_COMMON_ARGS = [ + "--show-source", + "--builtin=gettext", + "--max-complexity=20", + "--import-order-style=google", + "--exclude=.nox,.cache,env,lib,generated_pb2,*_pb2.py,*_pb2_grpc.py", + "--ignore=E121,E123,E126,E203,E226,E24,E266,E501,E704,W503,W504,I202", + "--max-line-length=88", +] + + +@nox.session +def lint(session): + session.install("flake8", "flake8-import-order") + + local_names = _determine_local_import_names(".") + args = FLAKE8_COMMON_ARGS + [ + "--application-import-names", + ",".join(local_names), + "." + ] + session.run("flake8", *args) + + +# +# Sample Tests +# + + +PYTEST_COMMON_ARGS = ["--junitxml=sponge_log.xml"] + + +def _session_tests(session, post_install=None): + """Runs py.test for a particular project.""" + if os.path.exists("requirements.txt"): + session.install("-r", "requirements.txt") + + if os.path.exists("requirements-test.txt"): + session.install("-r", "requirements-test.txt") + + if INSTALL_LIBRARY_FROM_SOURCE: + session.install("-e", _get_repo_root()) + + if post_install: + post_install(session) + + session.run( + "pytest", + *(PYTEST_COMMON_ARGS + session.posargs), + # Pytest will return 5 when no tests are collected. This can happen + # on travis where slow and flaky tests are excluded. + # See http://doc.pytest.org/en/latest/_modules/_pytest/main.html + success_codes=[0, 5], + env=get_pytest_env_vars() + ) + + +@nox.session(python=ALL_VERSIONS) +def py(session): + """Runs py.test for a sample using the specified version of Python.""" + if session.python in TESTED_VERSIONS: + _session_tests(session) + else: + session.skip("SKIPPED: {} tests are disabled for this sample.".format( + session.python + )) + + +# +# Readmegen +# + + +def _get_repo_root(): + """ Returns the root folder of the project. """ + # Get root of this repository. Assume we don't have directories nested deeper than 10 items. + p = Path(os.getcwd()) + for i in range(10): + if p is None: + break + if Path(p / ".git").exists(): + return str(p) + p = p.parent + raise Exception("Unable to detect repository root.") + + +GENERATED_READMES = sorted([x for x in Path(".").rglob("*.rst.in")]) + + +@nox.session +@nox.parametrize("path", GENERATED_READMES) +def readmegen(session, path): + """(Re-)generates the readme for a sample.""" + session.install("jinja2", "pyyaml") + dir_ = os.path.dirname(path) + + if os.path.exists(os.path.join(dir_, "requirements.txt")): + session.install("-r", os.path.join(dir_, "requirements.txt")) + + in_file = os.path.join(dir_, "README.rst.in") + session.run( + "python", _get_repo_root() + "/scripts/readme-gen/readme_gen.py", in_file + ) diff --git a/samples/beam/requirements-test.txt b/samples/beam/requirements-test.txt new file mode 100644 index 000000000..7e460c8c8 --- /dev/null +++ b/samples/beam/requirements-test.txt @@ -0,0 +1 @@ +pytest==6.0.1 diff --git a/samples/beam/requirements.txt b/samples/beam/requirements.txt new file mode 100644 index 000000000..2d7898e42 --- /dev/null +++ b/samples/beam/requirements.txt @@ -0,0 +1,3 @@ +apache-beam==2.23.0 +google-cloud-bigtable==1.4.0 +google-cloud-core==1.3.0 \ No newline at end of file