Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: add sample for writing data with Beam #80

Merged
merged 9 commits into from Aug 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
64 changes: 64 additions & 0 deletions samples/beam/hello_world_write.py
@@ -0,0 +1,64 @@
# Copyright 2020 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime

import apache_beam as beam
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud.bigtable import row


class BigtableOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--bigtable-project',
help='The Bigtable project ID, this can be different than your '
'Dataflow project',
default='bigtable-project')
parser.add_argument(
'--bigtable-instance',
help='The Bigtable instance ID',
default='bigtable-instance')
parser.add_argument(
'--bigtable-table',
help='The Bigtable table ID in the instance.',
default='bigtable-table')


class CreateRowFn(beam.DoFn):
def process(self, key):
direct_row = row.DirectRow(row_key=key)
direct_row.set_cell(
"stats_summary",
b"os_build",
b"android",
datetime.datetime.now())
return [direct_row]


def run(argv=None):
"""Build and run the pipeline."""
options = BigtableOptions(argv)
with beam.Pipeline(options=options) as p:
p | beam.Create(["phone#4c410523#20190501",
"phone#4c410523#20190502"]) | beam.ParDo(
CreateRowFn()) | WriteToBigTable(
project_id=options.bigtable_project,
instance_id=options.bigtable_instance,
table_id=options.bigtable_table)


if __name__ == '__main__':
run()
57 changes: 57 additions & 0 deletions samples/beam/hello_world_write_test.py
@@ -0,0 +1,57 @@
# Copyright 2020 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import uuid

from google.cloud import bigtable
import pytest

import hello_world_write

PROJECT = os.environ['GOOGLE_CLOUD_PROJECT']
BIGTABLE_INSTANCE = os.environ['BIGTABLE_INSTANCE']
busunkim96 marked this conversation as resolved.
Show resolved Hide resolved
TABLE_ID_PREFIX = 'mobile-time-series-{}'


@pytest.fixture(scope="module", autouse=True)
def table_id():
client = bigtable.Client(project=PROJECT, admin=True)
instance = client.instance(BIGTABLE_INSTANCE)

table_id = TABLE_ID_PREFIX.format(str(uuid.uuid4())[:16])
table = instance.table(table_id)
if table.exists():
table.delete()

table.create(column_families={'stats_summary': None})
yield table_id

table.delete()


def test_hello_world_write(table_id):
hello_world_write.run([
'--bigtable-project=%s' % PROJECT,
'--bigtable-instance=%s' % BIGTABLE_INSTANCE,
'--bigtable-table=%s' % table_id])
billyjacobson marked this conversation as resolved.
Show resolved Hide resolved

client = bigtable.Client(project=PROJECT, admin=True)
instance = client.instance(BIGTABLE_INSTANCE)
table = instance.table(table_id)

rows = table.read_rows()
count = 0
for _ in rows:
count += 1
assert count == 2
224 changes: 224 additions & 0 deletions samples/beam/noxfile.py
@@ -0,0 +1,224 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import os
from pathlib import Path
import sys

import nox


# WARNING - WARNING - WARNING - WARNING - WARNING
# WARNING - WARNING - WARNING - WARNING - WARNING
# DO NOT EDIT THIS FILE EVER!
# WARNING - WARNING - WARNING - WARNING - WARNING
# WARNING - WARNING - WARNING - WARNING - WARNING

# Copy `noxfile_config.py` to your directory and modify it instead.


# `TEST_CONFIG` dict is a configuration hook that allows users to
# modify the test configurations. The values here should be in sync
# with `noxfile_config.py`. Users will copy `noxfile_config.py` into
# their directory and modify it.

TEST_CONFIG = {
# You can opt out from the test for specific Python versions.
'ignored_versions': ["2.7"],

# An envvar key for determining the project id to use. Change it
# to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a
# build specific Cloud project. You can also use your own string
# to use your own Cloud project.
'gcloud_project_env': 'GOOGLE_CLOUD_PROJECT',
# 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT',

# A dictionary you want to inject into your test. Don't put any
# secrets here. These values will override predefined values.
'envs': {},
}


try:
# Ensure we can import noxfile_config in the project's directory.
sys.path.append('.')
from noxfile_config import TEST_CONFIG_OVERRIDE
except ImportError as e:
print("No user noxfile_config found: detail: {}".format(e))
TEST_CONFIG_OVERRIDE = {}

# Update the TEST_CONFIG with the user supplied values.
TEST_CONFIG.update(TEST_CONFIG_OVERRIDE)


def get_pytest_env_vars():
"""Returns a dict for pytest invocation."""
ret = {}

# Override the GCLOUD_PROJECT and the alias.
env_key = TEST_CONFIG['gcloud_project_env']
# This should error out if not set.
ret['GOOGLE_CLOUD_PROJECT'] = os.environ[env_key]

# Apply user supplied envs.
ret.update(TEST_CONFIG['envs'])
return ret


# DO NOT EDIT - automatically generated.
# All versions used to tested samples.
ALL_VERSIONS = ["2.7", "3.6", "3.7", "3.8"]

# Any default versions that should be ignored.
IGNORED_VERSIONS = TEST_CONFIG['ignored_versions']

TESTED_VERSIONS = sorted([v for v in ALL_VERSIONS if v not in IGNORED_VERSIONS])

INSTALL_LIBRARY_FROM_SOURCE = bool(os.environ.get("INSTALL_LIBRARY_FROM_SOURCE", False))
#
# Style Checks
#


def _determine_local_import_names(start_dir):
"""Determines all import names that should be considered "local".

This is used when running the linter to insure that import order is
properly checked.
"""
file_ext_pairs = [os.path.splitext(path) for path in os.listdir(start_dir)]
return [
basename
for basename, extension in file_ext_pairs
if extension == ".py"
or os.path.isdir(os.path.join(start_dir, basename))
and basename not in ("__pycache__")
]


# Linting with flake8.
#
# We ignore the following rules:
# E203: whitespace before ‘:’
# E266: too many leading ‘#’ for block comment
# E501: line too long
# I202: Additional newline in a section of imports
#
# We also need to specify the rules which are ignored by default:
# ['E226', 'W504', 'E126', 'E123', 'W503', 'E24', 'E704', 'E121']
FLAKE8_COMMON_ARGS = [
"--show-source",
"--builtin=gettext",
"--max-complexity=20",
"--import-order-style=google",
"--exclude=.nox,.cache,env,lib,generated_pb2,*_pb2.py,*_pb2_grpc.py",
"--ignore=E121,E123,E126,E203,E226,E24,E266,E501,E704,W503,W504,I202",
"--max-line-length=88",
]


@nox.session
def lint(session):
session.install("flake8", "flake8-import-order")

local_names = _determine_local_import_names(".")
args = FLAKE8_COMMON_ARGS + [
"--application-import-names",
",".join(local_names),
"."
]
session.run("flake8", *args)


#
# Sample Tests
#


PYTEST_COMMON_ARGS = ["--junitxml=sponge_log.xml"]


def _session_tests(session, post_install=None):
"""Runs py.test for a particular project."""
if os.path.exists("requirements.txt"):
session.install("-r", "requirements.txt")

if os.path.exists("requirements-test.txt"):
session.install("-r", "requirements-test.txt")

if INSTALL_LIBRARY_FROM_SOURCE:
session.install("-e", _get_repo_root())

if post_install:
post_install(session)

session.run(
"pytest",
*(PYTEST_COMMON_ARGS + session.posargs),
# Pytest will return 5 when no tests are collected. This can happen
# on travis where slow and flaky tests are excluded.
# See http://doc.pytest.org/en/latest/_modules/_pytest/main.html
success_codes=[0, 5],
env=get_pytest_env_vars()
)


@nox.session(python=ALL_VERSIONS)
def py(session):
"""Runs py.test for a sample using the specified version of Python."""
if session.python in TESTED_VERSIONS:
_session_tests(session)
else:
session.skip("SKIPPED: {} tests are disabled for this sample.".format(
session.python
))


#
# Readmegen
#


def _get_repo_root():
""" Returns the root folder of the project. """
# Get root of this repository. Assume we don't have directories nested deeper than 10 items.
p = Path(os.getcwd())
for i in range(10):
if p is None:
break
if Path(p / ".git").exists():
return str(p)
p = p.parent
raise Exception("Unable to detect repository root.")


GENERATED_READMES = sorted([x for x in Path(".").rglob("*.rst.in")])


@nox.session
@nox.parametrize("path", GENERATED_READMES)
def readmegen(session, path):
"""(Re-)generates the readme for a sample."""
session.install("jinja2", "pyyaml")
dir_ = os.path.dirname(path)

if os.path.exists(os.path.join(dir_, "requirements.txt")):
session.install("-r", os.path.join(dir_, "requirements.txt"))

in_file = os.path.join(dir_, "README.rst.in")
session.run(
"python", _get_repo_root() + "/scripts/readme-gen/readme_gen.py", in_file
)
1 change: 1 addition & 0 deletions samples/beam/requirements-test.txt
@@ -0,0 +1 @@
pytest==6.0.1
3 changes: 3 additions & 0 deletions samples/beam/requirements.txt
@@ -0,0 +1,3 @@
apache-beam==2.23.0
google-cloud-bigtable==1.4.0
google-cloud-core==1.3.0
billyjacobson marked this conversation as resolved.
Show resolved Hide resolved