Skip to content

Commit

Permalink
docs: add sample for writing data with Beam (#80)
Browse files Browse the repository at this point in the history
Co-authored-by: Tres Seaver <tseaver@palladion.com>
  • Loading branch information
billyjacobson and tseaver committed Aug 6, 2020
1 parent 2c973e6 commit 6900290
Show file tree
Hide file tree
Showing 5 changed files with 349 additions and 0 deletions.
64 changes: 64 additions & 0 deletions samples/beam/hello_world_write.py
@@ -0,0 +1,64 @@
# Copyright 2020 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime

import apache_beam as beam
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud.bigtable import row


class BigtableOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--bigtable-project',
help='The Bigtable project ID, this can be different than your '
'Dataflow project',
default='bigtable-project')
parser.add_argument(
'--bigtable-instance',
help='The Bigtable instance ID',
default='bigtable-instance')
parser.add_argument(
'--bigtable-table',
help='The Bigtable table ID in the instance.',
default='bigtable-table')


class CreateRowFn(beam.DoFn):
def process(self, key):
direct_row = row.DirectRow(row_key=key)
direct_row.set_cell(
"stats_summary",
b"os_build",
b"android",
datetime.datetime.now())
return [direct_row]


def run(argv=None):
"""Build and run the pipeline."""
options = BigtableOptions(argv)
with beam.Pipeline(options=options) as p:
p | beam.Create(["phone#4c410523#20190501",
"phone#4c410523#20190502"]) | beam.ParDo(
CreateRowFn()) | WriteToBigTable(
project_id=options.bigtable_project,
instance_id=options.bigtable_instance,
table_id=options.bigtable_table)


if __name__ == '__main__':
run()
57 changes: 57 additions & 0 deletions samples/beam/hello_world_write_test.py
@@ -0,0 +1,57 @@
# Copyright 2020 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import uuid

from google.cloud import bigtable
import pytest

import hello_world_write

PROJECT = os.environ['GOOGLE_CLOUD_PROJECT']
BIGTABLE_INSTANCE = os.environ['BIGTABLE_INSTANCE']
TABLE_ID_PREFIX = 'mobile-time-series-{}'


@pytest.fixture(scope="module", autouse=True)
def table_id():
client = bigtable.Client(project=PROJECT, admin=True)
instance = client.instance(BIGTABLE_INSTANCE)

table_id = TABLE_ID_PREFIX.format(str(uuid.uuid4())[:16])
table = instance.table(table_id)
if table.exists():
table.delete()

table.create(column_families={'stats_summary': None})
yield table_id

table.delete()


def test_hello_world_write(table_id):
hello_world_write.run([
'--bigtable-project=%s' % PROJECT,
'--bigtable-instance=%s' % BIGTABLE_INSTANCE,
'--bigtable-table=%s' % table_id])

client = bigtable.Client(project=PROJECT, admin=True)
instance = client.instance(BIGTABLE_INSTANCE)
table = instance.table(table_id)

rows = table.read_rows()
count = 0
for _ in rows:
count += 1
assert count == 2
224 changes: 224 additions & 0 deletions samples/beam/noxfile.py
@@ -0,0 +1,224 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import os
from pathlib import Path
import sys

import nox


# WARNING - WARNING - WARNING - WARNING - WARNING
# WARNING - WARNING - WARNING - WARNING - WARNING
# DO NOT EDIT THIS FILE EVER!
# WARNING - WARNING - WARNING - WARNING - WARNING
# WARNING - WARNING - WARNING - WARNING - WARNING

# Copy `noxfile_config.py` to your directory and modify it instead.


# `TEST_CONFIG` dict is a configuration hook that allows users to
# modify the test configurations. The values here should be in sync
# with `noxfile_config.py`. Users will copy `noxfile_config.py` into
# their directory and modify it.

TEST_CONFIG = {
# You can opt out from the test for specific Python versions.
'ignored_versions': ["2.7"],

# An envvar key for determining the project id to use. Change it
# to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a
# build specific Cloud project. You can also use your own string
# to use your own Cloud project.
'gcloud_project_env': 'GOOGLE_CLOUD_PROJECT',
# 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT',

# A dictionary you want to inject into your test. Don't put any
# secrets here. These values will override predefined values.
'envs': {},
}


try:
# Ensure we can import noxfile_config in the project's directory.
sys.path.append('.')
from noxfile_config import TEST_CONFIG_OVERRIDE
except ImportError as e:
print("No user noxfile_config found: detail: {}".format(e))
TEST_CONFIG_OVERRIDE = {}

# Update the TEST_CONFIG with the user supplied values.
TEST_CONFIG.update(TEST_CONFIG_OVERRIDE)


def get_pytest_env_vars():
"""Returns a dict for pytest invocation."""
ret = {}

# Override the GCLOUD_PROJECT and the alias.
env_key = TEST_CONFIG['gcloud_project_env']
# This should error out if not set.
ret['GOOGLE_CLOUD_PROJECT'] = os.environ[env_key]

# Apply user supplied envs.
ret.update(TEST_CONFIG['envs'])
return ret


# DO NOT EDIT - automatically generated.
# All versions used to tested samples.
ALL_VERSIONS = ["2.7", "3.6", "3.7", "3.8"]

# Any default versions that should be ignored.
IGNORED_VERSIONS = TEST_CONFIG['ignored_versions']

TESTED_VERSIONS = sorted([v for v in ALL_VERSIONS if v not in IGNORED_VERSIONS])

INSTALL_LIBRARY_FROM_SOURCE = bool(os.environ.get("INSTALL_LIBRARY_FROM_SOURCE", False))
#
# Style Checks
#


def _determine_local_import_names(start_dir):
"""Determines all import names that should be considered "local".
This is used when running the linter to insure that import order is
properly checked.
"""
file_ext_pairs = [os.path.splitext(path) for path in os.listdir(start_dir)]
return [
basename
for basename, extension in file_ext_pairs
if extension == ".py"
or os.path.isdir(os.path.join(start_dir, basename))
and basename not in ("__pycache__")
]


# Linting with flake8.
#
# We ignore the following rules:
# E203: whitespace before ‘:’
# E266: too many leading ‘#’ for block comment
# E501: line too long
# I202: Additional newline in a section of imports
#
# We also need to specify the rules which are ignored by default:
# ['E226', 'W504', 'E126', 'E123', 'W503', 'E24', 'E704', 'E121']
FLAKE8_COMMON_ARGS = [
"--show-source",
"--builtin=gettext",
"--max-complexity=20",
"--import-order-style=google",
"--exclude=.nox,.cache,env,lib,generated_pb2,*_pb2.py,*_pb2_grpc.py",
"--ignore=E121,E123,E126,E203,E226,E24,E266,E501,E704,W503,W504,I202",
"--max-line-length=88",
]


@nox.session
def lint(session):
session.install("flake8", "flake8-import-order")

local_names = _determine_local_import_names(".")
args = FLAKE8_COMMON_ARGS + [
"--application-import-names",
",".join(local_names),
"."
]
session.run("flake8", *args)


#
# Sample Tests
#


PYTEST_COMMON_ARGS = ["--junitxml=sponge_log.xml"]


def _session_tests(session, post_install=None):
"""Runs py.test for a particular project."""
if os.path.exists("requirements.txt"):
session.install("-r", "requirements.txt")

if os.path.exists("requirements-test.txt"):
session.install("-r", "requirements-test.txt")

if INSTALL_LIBRARY_FROM_SOURCE:
session.install("-e", _get_repo_root())

if post_install:
post_install(session)

session.run(
"pytest",
*(PYTEST_COMMON_ARGS + session.posargs),
# Pytest will return 5 when no tests are collected. This can happen
# on travis where slow and flaky tests are excluded.
# See http://doc.pytest.org/en/latest/_modules/_pytest/main.html
success_codes=[0, 5],
env=get_pytest_env_vars()
)


@nox.session(python=ALL_VERSIONS)
def py(session):
"""Runs py.test for a sample using the specified version of Python."""
if session.python in TESTED_VERSIONS:
_session_tests(session)
else:
session.skip("SKIPPED: {} tests are disabled for this sample.".format(
session.python
))


#
# Readmegen
#


def _get_repo_root():
""" Returns the root folder of the project. """
# Get root of this repository. Assume we don't have directories nested deeper than 10 items.
p = Path(os.getcwd())
for i in range(10):
if p is None:
break
if Path(p / ".git").exists():
return str(p)
p = p.parent
raise Exception("Unable to detect repository root.")


GENERATED_READMES = sorted([x for x in Path(".").rglob("*.rst.in")])


@nox.session
@nox.parametrize("path", GENERATED_READMES)
def readmegen(session, path):
"""(Re-)generates the readme for a sample."""
session.install("jinja2", "pyyaml")
dir_ = os.path.dirname(path)

if os.path.exists(os.path.join(dir_, "requirements.txt")):
session.install("-r", os.path.join(dir_, "requirements.txt"))

in_file = os.path.join(dir_, "README.rst.in")
session.run(
"python", _get_repo_root() + "/scripts/readme-gen/readme_gen.py", in_file
)
1 change: 1 addition & 0 deletions samples/beam/requirements-test.txt
@@ -0,0 +1 @@
pytest==6.0.1
3 changes: 3 additions & 0 deletions samples/beam/requirements.txt
@@ -0,0 +1,3 @@
apache-beam==2.23.0
google-cloud-bigtable==1.4.0
google-cloud-core==1.3.0

0 comments on commit 6900290

Please sign in to comment.