Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support partitioning, clustering, and protection properties for BQ tables #116

Merged
Merged 5 commits on Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
80 changes: 80 additions & 0 deletions datasets/google_trends/_terraform/top_terms_pipeline.tf
Expand Up @@ -21,7 +21,47 @@ resource "google_bigquery_table" "top_terms" {
table_id = "top_terms"

description = "Daily top 25 terms in the United States with score, ranking, time, and designated market area"
time_partitioning {
type = "DAY"

field = "refresh_date"

require_partition_filter = false
}


schema = <<EOF
[
{
"name": "rank",
"type": "INTEGER"
},
{
"name": "refresh_date",
"type": "DATE"
},
{
"name": "dma_name",
"type": "STRING"
},
{
"name": "dma_id",
"type": "INTEGER"
},
{
"name": "term",
"type": "STRING"
},
{
"name": "week",
"type": "DATE"
},
{
"name": "score",
"type": "INTEGER"
}
]
EOF
depends_on = [
google_bigquery_dataset.google_trends
]
Expand All @@ -41,7 +81,47 @@ resource "google_bigquery_table" "top_rising_terms" {
table_id = "top_rising_terms"

description = "Daily top rising terms in the United States with score, ranking, time, and designated market area"
time_partitioning {
type = "DAY"

field = "refresh_date"

require_partition_filter = false
}


schema = <<EOF
[
{
"name": "rank",
"type": "INTEGER"
},
{
"name": "refresh_date",
"type": "DATE"
},
{
"name": "dma_name",
"type": "STRING"
},
{
"name": "dma_id",
"type": "INTEGER"
},
{
"name": "term",
"type": "STRING"
},
{
"name": "week",
"type": "DATE"
},
{
"name": "score",
"type": "INTEGER"
}
]
EOF
depends_on = [
google_bigquery_dataset.google_trends
]
Expand Down
70 changes: 70 additions & 0 deletions datasets/google_trends/top_terms/pipeline.yaml
Expand Up @@ -17,10 +17,80 @@ resources:
  - type: bigquery_table
    # Table name within the google_trends dataset.
    table_id: top_terms
    description: "Daily top 25 terms in the United States with score, ranking, time, and designated market area"
    # Day-granularity time partitioning keyed on the refresh_date DATE
    # column; queries are not required to supply a partition filter.
    time_partitioning:
      type: "DAY"
      field: "refresh_date"
      require_partition_filter: false
    # BigQuery JSON table schema, passed through verbatim. The |- block is
    # a single string literal, so no comments may be placed inside it.
    schema: |-
      [
        {
          "name": "rank",
          "type": "INTEGER"
        },
        {
          "name": "refresh_date",
          "type": "DATE"
        },
        {
          "name": "dma_name",
          "type": "STRING"
        },
        {
          "name": "dma_id",
          "type": "INTEGER"
        },
        {
          "name": "term",
          "type": "STRING"
        },
        {
          "name": "week",
          "type": "DATE"
        },
        {
          "name": "score",
          "type": "INTEGER"
        }
      ]

  - type: bigquery_table
    # Table name within the google_trends dataset.
    table_id: top_rising_terms
    description: "Daily top rising terms in the United States with score, ranking, time, and designated market area"
    # Day-granularity time partitioning keyed on the refresh_date DATE
    # column; queries are not required to supply a partition filter.
    time_partitioning:
      type: "DAY"
      field: "refresh_date"
      require_partition_filter: false
    # BigQuery JSON table schema, passed through verbatim. The |- block is
    # a single string literal, so no comments may be placed inside it.
    schema: |-
      [
        {
          "name": "rank",
          "type": "INTEGER"
        },
        {
          "name": "refresh_date",
          "type": "DATE"
        },
        {
          "name": "dma_name",
          "type": "STRING"
        },
        {
          "name": "dma_id",
          "type": "INTEGER"
        },
        {
          "name": "term",
          "type": "STRING"
        },
        {
          "name": "week",
          "type": "DATE"
        },
        {
          "name": "score",
          "type": "INTEGER"
        }
      ]

dag:
initialize:
Expand Down
22 changes: 22 additions & 0 deletions samples/pipeline.yaml
Expand Up @@ -32,9 +32,31 @@ resources:
# table_id
table_id: PIPELINE_FOLDER_NAME

# Optional Properties:
# Description of the table
description: "This is a table description."

# Time-based partitioning configuration. There is no need for this property
# if you have a relatively small dataset to host on a BigQuery table.
time_partitioning:

# The supported types are DAY, HOUR, MONTH, and YEAR, which will generate one partition per day, hour, month, and year, respectively.
type: "DAY"

      # If set to true, every query over this table must include a partition filter that can be used for partition elimination.
require_partition_filter: false

    # Specifies column names to use for data clustering. Up to four top-level columns are allowed; list them in descending order of priority.
clustering:
- "column_1"
- "column_2"
- "column_3"

# The table cannot be deleted without first disabling this property.
# Unless this field is set to false in Terraform state, a `terraform destroy`
# or `terraform apply` that would delete the table will fail.
deletion_protection: true

dag:
# The DAG acronym stands for directed acyclic graph. This block represents
# your data pipeline along with every property and configuration it needs to
Expand Down
13 changes: 13 additions & 0 deletions templates/terraform/google_bigquery_table.tf.jinja2
Expand Up @@ -23,6 +23,19 @@ resource "google_bigquery_table" "{{ tf_resource_name }}" {
{% if description -%}
description = {{ description|tojson }}
{%- endif %}
{% if time_partitioning -%}
time_partitioning {
{%- for key, val in time_partitioning.items() %}
{{ key }} = {{ val|tojson }}
{% endfor -%}
}
{%- endif %}
{% if clustering -%}
clustering = {{ clustering|tojson }}
{%- endif %}
{# Test for presence, not truthiness: with a bare `if deletion_protection`,
   an explicit `deletion_protection: false` in pipeline.yaml would render
   nothing and leave the provider default (true) in effect, making the
   protection impossible to disable through this template. #}
{% if deletion_protection is defined -%}
deletion_protection = {{ deletion_protection|tojson }}
{%- endif %}
{% if schema -%}
schema = <<EOF
{{ schema }}
Expand Down
20 changes: 16 additions & 4 deletions tests/scripts/test_generate_terraform.py
Expand Up @@ -542,7 +542,7 @@ def test_dataset_tf_has_no_bq_dataset_description_when_unspecified(
assert not re.search(r"description\s+\=", result.group(1))


def test_pipeline_tf_contains_bq_table_description_when_specified(
def test_pipeline_tf_contains_optional_properties_when_specified(
dataset_path,
pipeline_path,
project_id,
Expand Down Expand Up @@ -571,10 +571,13 @@ def test_pipeline_tf_contains_bq_table_description_when_specified(
)
assert bq_table
assert bq_table["description"]
assert bq_table["time_partitioning"]
assert bq_table["clustering"]
assert bq_table["deletion_protection"]

# Match the "google_bigquery_table" properties, i.e. any lines between the
# curly braces, in the *_pipeline.tf file
regexp = r"\"google_bigquery_table\" \"" + bq_table["table_id"] + r"\" \{(.*?)\}"
regexp = r"\"google_bigquery_table\" \"" + bq_table["table_id"] + r"\" \{(.*?)^\}"
bq_table_tf_string = re.compile(regexp, flags=re.MULTILINE | re.DOTALL)

for path_prefix in (
Expand All @@ -587,9 +590,12 @@ def test_pipeline_tf_contains_bq_table_description_when_specified(

assert re.search(r"table_id\s+\=", result.group(1))
assert re.search(r"description\s+\=", result.group(1))
assert re.search(r"time_partitioning\s+\{", result.group(1))
assert re.search(r"clustering\s+\=", result.group(1))
assert re.search(r"deletion_protection\s+\=", result.group(1))


def test_pipeline_tf_has_no_bq_table_description_when_unspecified(
def test_pipeline_tf_has_no_optional_properties_when_unspecified(
dataset_path,
pipeline_path,
project_id,
Expand All @@ -608,6 +614,9 @@ def test_pipeline_tf_has_no_bq_table_description_when_unspecified(
(r for r in config["resources"] if r["type"] == "bigquery_table"), None
)
del bq_table["description"]
del bq_table["time_partitioning"]
del bq_table["clustering"]
del bq_table["deletion_protection"]
with open(pipeline_path / "pipeline.yaml", "w") as file:
yaml.dump(config, file)

Expand All @@ -624,7 +633,7 @@ def test_pipeline_tf_has_no_bq_table_description_when_unspecified(

# Match the "google_bigquery_table" properties, i.e. any lines between the
# curly braces, in the *_pipeline.tf file
regexp = r"\"google_bigquery_table\" \"" + bq_table["table_id"] + r"\" \{(.*?)\}"
regexp = r"\"google_bigquery_table\" \"" + bq_table["table_id"] + r"\" \{(.*?)^\}"
bq_table_tf_string = re.compile(regexp, flags=re.MULTILINE | re.DOTALL)

for path_prefix in (
Expand All @@ -637,6 +646,9 @@ def test_pipeline_tf_has_no_bq_table_description_when_unspecified(

assert re.search(r"table_id\s+\=", result.group(1))
assert not re.search(r"description\s+\=", result.group(1))
assert not re.search(r"time_partitioning\s+\{", result.group(1))
assert not re.search(r"clustering\s+\=", result.group(1))
assert not re.search(r"deletion_protection\s+\=", result.group(1))


def test_bq_table_can_have_a_description_with_newlines_and_quotes(
Expand Down