refactor(bigquery): update code samples of load table autodetect and truncate (#28)

Co-authored-by: Peter Lamut <plamut@users.noreply.github.com>
HemangChothani and plamut committed May 13, 2020
1 parent da40b62 commit 18eb9e8
Showing 16 changed files with 571 additions and 168 deletions.
168 changes: 0 additions & 168 deletions docs/snippets.py
@@ -25,7 +25,6 @@
import time

import pytest
import six

try:
    import fastparquet
@@ -585,173 +584,6 @@ def test_manage_views(client, to_delete):
    # [END bigquery_grant_view_access]


def test_load_table_from_uri_autodetect(client, to_delete, capsys):
"""Load table from a GCS URI using various formats and auto-detected schema
Each file format has its own tested load from URI sample. Because most of
the code is common for autodetect, append, and truncate, this sample
includes snippets for all supported formats but only calls a single load
job.
This code snippet is made up of shared code, then format-specific code,
followed by more shared code. Note that only the last format in the
format-specific code section will be tested in this test.
"""
dataset_id = "load_table_from_uri_auto_{}".format(_millis())
project = client.project
dataset_ref = bigquery.DatasetReference(project, dataset_id)
dataset = bigquery.Dataset(dataset_ref)
client.create_dataset(dataset)
to_delete.append(dataset)

# Shared code
# [START bigquery_load_table_gcs_csv_autodetect]
# [START bigquery_load_table_gcs_json_autodetect]
# from google.cloud import bigquery
# client = bigquery.Client()
# dataset_id = 'my_dataset'

dataset_ref = bigquery.DatasetReference(project, dataset_id)
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
# [END bigquery_load_table_gcs_csv_autodetect]
# [END bigquery_load_table_gcs_json_autodetect]

# Format-specific code
# [START bigquery_load_table_gcs_csv_autodetect]
job_config.skip_leading_rows = 1
# The source format defaults to CSV, so the line below is optional.
job_config.source_format = bigquery.SourceFormat.CSV
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"
# [END bigquery_load_table_gcs_csv_autodetect]
# unset csv-specific attribute
del job_config._properties["load"]["skipLeadingRows"]

# [START bigquery_load_table_gcs_json_autodetect]
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.json"
# [END bigquery_load_table_gcs_json_autodetect]

# Shared code
# [START bigquery_load_table_gcs_csv_autodetect]
# [START bigquery_load_table_gcs_json_autodetect]
load_job = client.load_table_from_uri(
uri, dataset_ref.table("us_states"), job_config=job_config
) # API request
print("Starting job {}".format(load_job.job_id))

load_job.result() # Waits for table load to complete.
print("Job finished.")

destination_table = client.get_table(dataset_ref.table("us_states"))
print("Loaded {} rows.".format(destination_table.num_rows))
# [END bigquery_load_table_gcs_csv_autodetect]
# [END bigquery_load_table_gcs_json_autodetect]

out, _ = capsys.readouterr()
assert "Loaded 50 rows." in out


def test_load_table_from_uri_truncate(client, to_delete, capsys):
"""Replaces table data with data from a GCS URI using various formats
Each file format has its own tested load from URI sample. Because most of
the code is common for autodetect, append, and truncate, this sample
includes snippets for all supported formats but only calls a single load
job.
This code snippet is made up of shared code, then format-specific code,
followed by more shared code. Note that only the last format in the
format-specific code section will be tested in this test.
"""
dataset_id = "load_table_from_uri_trunc_{}".format(_millis())
project = client.project
dataset_ref = bigquery.DatasetReference(project, dataset_id)
dataset = bigquery.Dataset(dataset_ref)
client.create_dataset(dataset)
to_delete.append(dataset)

job_config = bigquery.LoadJobConfig()
job_config.schema = [
bigquery.SchemaField("name", "STRING"),
bigquery.SchemaField("post_abbr", "STRING"),
]
table_ref = dataset.table("us_states")
body = six.BytesIO(b"Washington,WA")
client.load_table_from_file(body, table_ref, job_config=job_config).result()
previous_rows = client.get_table(table_ref).num_rows
assert previous_rows > 0

# Shared code
# [START bigquery_load_table_gcs_avro_truncate]
# [START bigquery_load_table_gcs_csv_truncate]
# [START bigquery_load_table_gcs_json_truncate]
# [START bigquery_load_table_gcs_parquet_truncate]
# [START bigquery_load_table_gcs_orc_truncate]
# from google.cloud import bigquery
# client = bigquery.Client()
# table_ref = client.dataset('my_dataset').table('existing_table')

job_config = bigquery.LoadJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
# [END bigquery_load_table_gcs_avro_truncate]
# [END bigquery_load_table_gcs_csv_truncate]
# [END bigquery_load_table_gcs_json_truncate]
# [END bigquery_load_table_gcs_parquet_truncate]
# [END bigquery_load_table_gcs_orc_truncate]

# Format-specific code
# [START bigquery_load_table_gcs_avro_truncate]
job_config.source_format = bigquery.SourceFormat.AVRO
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.avro"
# [END bigquery_load_table_gcs_avro_truncate]

# [START bigquery_load_table_gcs_csv_truncate]
job_config.skip_leading_rows = 1
# The source format defaults to CSV, so the line below is optional.
job_config.source_format = bigquery.SourceFormat.CSV
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"
# [END bigquery_load_table_gcs_csv_truncate]
# unset csv-specific attribute
del job_config._properties["load"]["skipLeadingRows"]

# [START bigquery_load_table_gcs_json_truncate]
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.json"
# [END bigquery_load_table_gcs_json_truncate]

# [START bigquery_load_table_gcs_parquet_truncate]
job_config.source_format = bigquery.SourceFormat.PARQUET
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet"
# [END bigquery_load_table_gcs_parquet_truncate]

# [START bigquery_load_table_gcs_orc_truncate]
job_config.source_format = bigquery.SourceFormat.ORC
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.orc"
# [END bigquery_load_table_gcs_orc_truncate]

# Shared code
# [START bigquery_load_table_gcs_avro_truncate]
# [START bigquery_load_table_gcs_csv_truncate]
# [START bigquery_load_table_gcs_json_truncate]
# [START bigquery_load_table_gcs_parquet_truncate]
# [START bigquery_load_table_gcs_orc_truncate]
load_job = client.load_table_from_uri(
uri, table_ref, job_config=job_config
) # API request
print("Starting job {}".format(load_job.job_id))

load_job.result() # Waits for table load to complete.
print("Job finished.")

destination_table = client.get_table(table_ref)
print("Loaded {} rows.".format(destination_table.num_rows))
# [END bigquery_load_table_gcs_avro_truncate]
# [END bigquery_load_table_gcs_csv_truncate]
# [END bigquery_load_table_gcs_json_truncate]
# [END bigquery_load_table_gcs_parquet_truncate]
# [END bigquery_load_table_gcs_orc_truncate]

out, _ = capsys.readouterr()
assert "Loaded 50 rows." in out


def test_load_table_add_column(client, to_delete):
dataset_id = "load_table_add_column_{}".format(_millis())
project = client.project
59 changes: 59 additions & 0 deletions docs/usage/tables.rst
@@ -132,6 +132,22 @@ Load an ORC file from Cloud Storage:
See also: `Loading ORC data from Cloud Storage
<https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-orc>`_.

Load a CSV file from Cloud Storage and auto-detect schema:

.. literalinclude:: ../samples/load_table_uri_autodetect_csv.py
   :language: python
   :dedent: 4
   :start-after: [START bigquery_load_table_gcs_csv_autodetect]
   :end-before: [END bigquery_load_table_gcs_csv_autodetect]

Load a JSON file from Cloud Storage and auto-detect schema:

.. literalinclude:: ../samples/load_table_uri_autodetect_json.py
   :language: python
   :dedent: 4
   :start-after: [START bigquery_load_table_gcs_json_autodetect]
   :end-before: [END bigquery_load_table_gcs_json_autodetect]

Updating a Table
^^^^^^^^^^^^^^^^

@@ -220,3 +236,46 @@ Restore a deleted table from a snapshot by using the
   :dedent: 4
   :start-after: [START bigquery_undelete_table]
   :end-before: [END bigquery_undelete_table]

Overwrite a Table
^^^^^^^^^^^^^^^^^

Replace the table data with an Avro file from Cloud Storage:

.. literalinclude:: ../samples/load_table_uri_truncate_avro.py
   :language: python
   :dedent: 4
   :start-after: [START bigquery_load_table_gcs_avro_truncate]
   :end-before: [END bigquery_load_table_gcs_avro_truncate]

Replace the table data with a CSV file from Cloud Storage:

.. literalinclude:: ../samples/load_table_uri_truncate_csv.py
   :language: python
   :dedent: 4
   :start-after: [START bigquery_load_table_gcs_csv_truncate]
   :end-before: [END bigquery_load_table_gcs_csv_truncate]

Replace the table data with a JSON file from Cloud Storage:

.. literalinclude:: ../samples/load_table_uri_truncate_json.py
   :language: python
   :dedent: 4
   :start-after: [START bigquery_load_table_gcs_json_truncate]
   :end-before: [END bigquery_load_table_gcs_json_truncate]

Replace the table data with an ORC file from Cloud Storage:

.. literalinclude:: ../samples/load_table_uri_truncate_orc.py
   :language: python
   :dedent: 4
   :start-after: [START bigquery_load_table_gcs_orc_truncate]
   :end-before: [END bigquery_load_table_gcs_orc_truncate]

Replace the table data with a Parquet file from Cloud Storage:

.. literalinclude:: ../samples/load_table_uri_truncate_parquet.py
   :language: python
   :dedent: 4
   :start-after: [START bigquery_load_table_gcs_parquet_truncate]
   :end-before: [END bigquery_load_table_gcs_parquet_truncate]
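The truncate samples referenced above are all new files in this commit, but only the Avro one appears later in this excerpt. The others differ only in the source format and URI they pass to the load job. As a rough, non-authoritative sketch that mirrors the structure of the Avro sample shown below (the committed samples/load_table_uri_truncate_csv.py may differ in details), the CSV variant looks roughly like this:

def load_table_uri_truncate_csv(table_id):
    # Sketch only: follows the shape of the Avro truncate sample in this commit
    # rather than reproducing the committed file verbatim.
    from google.cloud import bigquery

    # Construct a BigQuery client object.
    client = bigquery.Client()

    job_config = bigquery.LoadJobConfig(
        # Replace the table's existing rows instead of appending to them.
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        skip_leading_rows=1,
        source_format=bigquery.SourceFormat.CSV,
    )
    uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"

    load_job = client.load_table_from_uri(
        uri, table_id, job_config=job_config
    )  # Make an API request.
    load_job.result()  # Waits for the job to complete.

    destination_table = client.get_table(table_id)
    print("Loaded {} rows.".format(destination_table.num_rows))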
45 changes: 45 additions & 0 deletions samples/load_table_uri_autodetect_csv.py
@@ -0,0 +1,45 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def load_table_uri_autodetect_csv(table_id):

    # [START bigquery_load_table_gcs_csv_autodetect]
    from google.cloud import bigquery

    # Construct a BigQuery client object.
    client = bigquery.Client()

    # TODO(developer): Set table_id to the ID of the table to create.
    # table_id = "your-project.your_dataset.your_table_name"

    # Set the encryption key to use for the destination.
    # TODO: Replace this key with a key you have created in KMS.
    # kms_key_name = "projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}".format(
    #     "cloud-samples-tests", "us", "test", "test"
    # )
    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        skip_leading_rows=1,
        # The source format defaults to CSV, so the line below is optional.
        source_format=bigquery.SourceFormat.CSV,
    )
    uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"
    load_job = client.load_table_from_uri(
        uri, table_id, job_config=job_config
    )  # Make an API request.
    load_job.result()  # Waits for the job to complete.
    destination_table = client.get_table(table_id)
    print("Loaded {} rows.".format(destination_table.num_rows))
    # [END bigquery_load_table_gcs_csv_autodetect]
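With autodetect enabled, BigQuery samples the file to infer column types and, when the first row looks like a header, uses it for column names; for the us-states.csv sample this yields two STRING columns, name and post_abbr. A quick way to confirm what was detected is to print the destination table's schema after the load. The lines below are a hypothetical follow-up that would continue inside the sample function above (reusing its client and table_id); they are not part of the committed file:

    # Hypothetical follow-up: inspect the schema produced by autodetect.
    table = client.get_table(table_id)
    for field in table.schema:
        print("{}: {}".format(field.name, field.field_type))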
42 changes: 42 additions & 0 deletions samples/load_table_uri_autodetect_json.py
@@ -0,0 +1,42 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def load_table_uri_autodetect_json(table_id):

    # [START bigquery_load_table_gcs_json_autodetect]
    from google.cloud import bigquery

    # Construct a BigQuery client object.
    client = bigquery.Client()

    # TODO(developer): Set table_id to the ID of the table to create.
    # table_id = "your-project.your_dataset.your_table_name"

    # Set the encryption key to use for the destination.
    # TODO: Replace this key with a key you have created in KMS.
    # kms_key_name = "projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}".format(
    #     "cloud-samples-tests", "us", "test", "test"
    # )
    job_config = bigquery.LoadJobConfig(
        autodetect=True, source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    )
    uri = "gs://cloud-samples-data/bigquery/us-states/us-states.json"
    load_job = client.load_table_from_uri(
        uri, table_id, job_config=job_config
    )  # Make an API request.
    load_job.result()  # Waits for the job to complete.
    destination_table = client.get_table(table_id)
    print("Loaded {} rows.".format(destination_table.num_rows))
    # [END bigquery_load_table_gcs_json_autodetect]
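SourceFormat.NEWLINE_DELIMITED_JSON expects one JSON object per line (JSON Lines), not a single top-level JSON array. A minimal, hypothetical illustration of the same autodetect configuration applied to in-memory JSON Lines data via load_table_from_file, not part of this commit (table_id is a placeholder, as in the samples), might look like:

import io

from google.cloud import bigquery

client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the destination table.
# table_id = "your-project.your_dataset.your_table_name"

# Two newline-delimited JSON records: one object per line.
data = b'{"name": "Washington", "post_abbr": "WA"}\n{"name": "Oregon", "post_abbr": "OR"}\n'

job_config = bigquery.LoadJobConfig(
    autodetect=True, source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
)
load_job = client.load_table_from_file(
    io.BytesIO(data), table_id, job_config=job_config
)  # Make an API request.
load_job.result()  # Waits for the job to complete.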
55 changes: 55 additions & 0 deletions samples/load_table_uri_truncate_avro.py
@@ -0,0 +1,55 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def load_table_uri_truncate_avro(table_id):

    # [START bigquery_load_table_gcs_avro_truncate]
    import six

    from google.cloud import bigquery

    # Construct a BigQuery client object.
    client = bigquery.Client()

    # TODO(developer): Set table_id to the ID of the table to create.
    # table_id = "your-project.your_dataset.your_table_name"

    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("name", "STRING"),
            bigquery.SchemaField("post_abbr", "STRING"),
        ],
    )

    body = six.BytesIO(b"Washington,WA")
    client.load_table_from_file(body, table_id, job_config=job_config).result()
    previous_rows = client.get_table(table_id).num_rows
    assert previous_rows > 0

    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        source_format=bigquery.SourceFormat.AVRO,
    )

    uri = "gs://cloud-samples-data/bigquery/us-states/us-states.avro"
    load_job = client.load_table_from_uri(
        uri, table_id, job_config=job_config
    )  # Make an API request.

    load_job.result()  # Waits for the job to complete.

    destination_table = client.get_table(table_id)
    print("Loaded {} rows.".format(destination_table.num_rows))
    # [END bigquery_load_table_gcs_avro_truncate]
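The only substantive difference between this sample and an append-style load is the write disposition: WRITE_TRUNCATE tells the load job to replace the table's existing rows, whereas the default for load jobs, WRITE_APPEND, adds the new rows to whatever is already there. A hedged sketch of the append variant, not part of this commit (it would continue inside the sample function above, reusing its client and table_id), would be:

    # Hypothetical append variant, shown only to contrast with WRITE_TRUNCATE.
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        source_format=bigquery.SourceFormat.AVRO,
    )
    load_job = client.load_table_from_uri(
        "gs://cloud-samples-data/bigquery/us-states/us-states.avro",
        table_id,
        job_config=job_config,
    )  # Make an API request.
    load_job.result()  # Rows are added to the existing table rather than replacing it.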
