Implement BigQuery Table Schema Update Operator (#15367)
Co-authored-by: Jens Larsson <jens.larsson@c02cv73mml85.lan>
thejens and Jens Larsson committed May 4, 2021
1 parent 59be278 commit cf6324e
Showing 6 changed files with 441 additions and 0 deletions.
airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
@@ -36,6 +36,7 @@
    BigQueryPatchDatasetOperator,
    BigQueryUpdateDatasetOperator,
    BigQueryUpdateTableOperator,
    BigQueryUpdateTableSchemaOperator,
    BigQueryUpsertTableOperator,
)
from airflow.utils.dates import days_ago
@@ -73,6 +74,18 @@
    )
    # [END howto_operator_bigquery_create_table]

    # [START howto_operator_bigquery_update_table_schema]
    update_table_schema = BigQueryUpdateTableSchemaOperator(
        task_id="update_table_schema",
        dataset_id=DATASET_NAME,
        table_id="test_table",
        schema_fields_updates=[
            {"name": "emp_name", "description": "Name of employee"},
            {"name": "salary", "description": "Monthly salary in USD"},
        ],
    )
    # [END howto_operator_bigquery_update_table_schema]

    # [START howto_operator_bigquery_delete_table]
    delete_table = BigQueryDeleteTableOperator(
        task_id="delete_table",
@@ -216,6 +229,7 @@
        delete_view,
    ]
    >> upsert_table
    >> update_table_schema
    >> delete_materialized_view
    >> delete_table
    >> delete_dataset
95 changes: 95 additions & 0 deletions airflow/providers/google/cloud/hooks/bigquery.py
@@ -1373,6 +1373,101 @@ def get_schema(self, dataset_id: str, table_id: str, project_id: Optional[str] =
        table = self.get_client(project_id=project_id).get_table(table_ref)
        return {"fields": [s.to_api_repr() for s in table.schema]}

    @GoogleBaseHook.fallback_to_default_project_id
    def update_table_schema(
        self,
        schema_fields_updates: List[Dict[str, Any]],
        include_policy_tags: bool,
        dataset_id: str,
        table_id: str,
        project_id: Optional[str] = None,
    ) -> None:
"""
Update fields within a schema for a given dataset and table. Note that
some fields in schemas are immutable and trying to change them will cause
an exception.
If a new field is included it will be inserted which requires all required fields to be set.
See https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
:param include_policy_tags: If set to True policy tags will be included in
the update request which requires special permissions even if unchanged
see https://cloud.google.com/bigquery/docs/column-level-security#roles
:type include_policy_tags: bool
:param dataset_id: the dataset ID of the requested table to be updated
:type dataset_id: str
:param table_id: the table ID of the table to be updated
:type table_id: str
:param schema_fields_updates: a partial schema resource. see
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
**Example**: ::
schema_fields_updates=[
{"name": "emp_name", "description": "Some New Description"},
{"name": "salary", "description": "Some New Description"},
{"name": "departments", "fields": [
{"name": "name", "description": "Some New Description"},
{"name": "type", "description": "Some New Description"}
]},
]
:type schema_fields_updates: List[dict]
:param project_id: The name of the project where we want to update the table.
:type project_id: str
"""

        def _build_new_schema(
            current_schema: List[Dict[str, Any]], schema_fields_updates: List[Dict[str, Any]]
        ) -> List[Dict[str, Any]]:

            # Turn schema_fields_updates into a dict keyed on field names
            schema_fields_updates = {field["name"]: field for field in deepcopy(schema_fields_updates)}

            # Create a new dict for storing the new schema, initiated based on the
            # current_schema (as of Python 3.6, dicts retain insertion order).
            new_schema = {field["name"]: field for field in deepcopy(current_schema)}

            # Each item in schema_fields_updates contains a potential patch
            # to a schema field; iterate over them.
            for field_name, patched_value in schema_fields_updates.items():
                # If this field already exists, update it
                if field_name in new_schema:
                    # If this field is of type RECORD and has a fields key, patch it recursively
                    if "fields" in patched_value:
                        patched_value["fields"] = _build_new_schema(
                            new_schema[field_name]["fields"], patched_value["fields"]
                        )
                    # Update the new_schema with the patched value
                    new_schema[field_name].update(patched_value)
                # This is a new field; include the whole configuration for it
                else:
                    new_schema[field_name] = patched_value

            return list(new_schema.values())

        def _remove_policy_tags(schema: List[Dict[str, Any]]):
            for field in schema:
                if "policyTags" in field:
                    del field["policyTags"]
                if "fields" in field:
                    _remove_policy_tags(field["fields"])

        current_table_schema = self.get_schema(
            dataset_id=dataset_id, table_id=table_id, project_id=project_id
        )["fields"]
        new_schema = _build_new_schema(current_table_schema, schema_fields_updates)

        if not include_policy_tags:
            _remove_policy_tags(new_schema)

        self.update_table(
            table_resource={"schema": {"fields": new_schema}},
            fields=["schema"],
            project_id=project_id,
            dataset_id=dataset_id,
            table_id=table_id,
        )

    @GoogleBaseHook.fallback_to_default_project_id
    def poll_job_complete(
        self,
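A minimal, self-contained sketch of the merge performed by update_table_schema above. The schema, field names, and the standalone helper are illustrative assumptions; the helper simply mirrors the nested _build_new_schema for demonstration outside the hook.

from copy import deepcopy
from typing import Any, Dict, List

def build_new_schema(
    current_schema: List[Dict[str, Any]], updates: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    # Key both lists on field name; dicts preserve insertion order.
    updates = {f["name"]: f for f in deepcopy(updates)}
    new_schema = {f["name"]: f for f in deepcopy(current_schema)}
    for name, patch in updates.items():
        if name in new_schema:
            # Nested RECORD fields are merged recursively.
            if "fields" in patch:
                patch["fields"] = build_new_schema(new_schema[name]["fields"], patch["fields"])
            new_schema[name].update(patch)
        else:
            # Brand-new field: the full configuration is taken as-is.
            new_schema[name] = patch
    return list(new_schema.values())

current = [
    {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
    {"name": "departments", "type": "RECORD", "fields": [{"name": "name", "type": "STRING"}]},
]
updates = [
    {"name": "emp_name", "description": "Name of employee"},
    {"name": "departments", "fields": [{"name": "name", "description": "Department name"}]},
    {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
]

# emp_name keeps its type/mode and gains a description, departments.name is
# patched in place, and salary is appended as a new field.
print(build_new_schema(current, updates))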
112 changes: 112 additions & 0 deletions airflow/providers/google/cloud/operators/bigquery.py
@@ -2039,6 +2039,118 @@ def execute(self, context) -> None:
        )


class BigQueryUpdateTableSchemaOperator(BaseOperator):
    """
    Update BigQuery Table Schema.

    Updates fields on a table schema based on the contents of the supplied
    schema_fields_updates parameter. The supplied schema does not need to be
    complete: if a field already exists in the schema, you only need to supply
    keys and values for the items you want to patch; just ensure the "name" key
    is set.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:BigQueryUpdateTableSchemaOperator`

    :param schema_fields_updates: a partial schema resource. See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema

        **Example**: ::

            schema_fields_updates=[
                {"name": "emp_name", "description": "Some New Description"},
                {"name": "salary", "policyTags": {'names': ['some_new_policy_tag']},},
                {"name": "departments", "fields": [
                    {"name": "name", "description": "Some New Description"},
                    {"name": "type", "description": "Some New Description"}
                ]},
            ]
    :type schema_fields_updates: List[dict]
    :param include_policy_tags: (Optional) If set to True, policy tags will be included in
        the update request, which requires special permissions even if unchanged (default False).
        See https://cloud.google.com/bigquery/docs/column-level-security#roles
    :type include_policy_tags: bool
    :param dataset_id: A dotted
        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
        will be updated. (templated)
    :type dataset_id: str
    :param table_id: The table ID of the requested table. (templated)
    :type table_id: str
    :param project_id: The name of the project where we want to update the table.
        This does not need to be provided if projectId is set in the dataset reference.
    :type project_id: str
    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
    :type gcp_conn_id: str
    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
    :type bigquery_conn_id: str
    :param delegate_to: The account to impersonate, if any.
        For this to work, the service account making the request must have domain-wide
        delegation enabled.
    :type delegate_to: str
    :param location: The location used for the operation.
    :type location: str
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with the first
        account from the list granting this role to the originating account (templated).
    :type impersonation_chain: Union[str, Sequence[str]]
    """

    template_fields = (
        'schema_fields_updates',
        'dataset_id',
        'table_id',
        'project_id',
        'impersonation_chain',
    )
    template_fields_renderers = {"schema_fields_updates": "json"}
    ui_color = BigQueryUIColors.TABLE.value

    @apply_defaults
    def __init__(
        self,
        *,
        schema_fields_updates: List[Dict[str, Any]],
        include_policy_tags: Optional[bool] = False,
        dataset_id: Optional[str] = None,
        table_id: Optional[str] = None,
        project_id: Optional[str] = None,
        gcp_conn_id: str = 'google_cloud_default',
        delegate_to: Optional[str] = None,
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
        **kwargs,
    ) -> None:
        self.schema_fields_updates = schema_fields_updates
        self.include_policy_tags = include_policy_tags
        self.table_id = table_id
        self.dataset_id = dataset_id
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.impersonation_chain = impersonation_chain
        super().__init__(**kwargs)

    def execute(self, context):
        bq_hook = BigQueryHook(
            gcp_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to,
            impersonation_chain=self.impersonation_chain,
        )

        return bq_hook.update_table_schema(
            schema_fields_updates=self.schema_fields_updates,
            include_policy_tags=self.include_policy_tags,
            dataset_id=self.dataset_id,
            table_id=self.table_id,
            project_id=self.project_id,
        )
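A hedged usage sketch of the new operator, assumed to sit inside a DAG definition. The task ID, dataset, table, and policy-tag name are hypothetical. It patches a nested RECORD field and, because include_policy_tags=True, also sends the (otherwise stripped) policy tags with the update:

from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryUpdateTableSchemaOperator,
)

update_nested_schema = BigQueryUpdateTableSchemaOperator(
    task_id="update_nested_schema",
    dataset_id="my_dataset",  # hypothetical dataset
    table_id="employees",  # hypothetical table
    include_policy_tags=True,  # requires column-level-security permissions
    schema_fields_updates=[
        {"name": "salary", "policyTags": {"names": ["example_policy_tag"]}},
        {"name": "departments", "fields": [
            {"name": "name", "description": "Department name"},
        ]},
    ],
)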


# pylint: disable=too-many-arguments
class BigQueryInsertJobOperator(BaseOperator):
"""
17 changes: 17 additions & 0 deletions docs/apache-airflow-providers-google/operators/cloud/bigquery.rst
@@ -245,6 +245,23 @@ in the given dataset.
    :start-after: [START howto_operator_bigquery_upsert_table]
    :end-before: [END howto_operator_bigquery_upsert_table]

.. _howto/operator:BigQueryUpdateTableSchemaOperator:

Update table schema
"""""""""""""""""""

To update the schema of a table, you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateTableSchemaOperator`.

This operator updates only the schema fields supplied, leaving the rest of the
schema unchanged. This is useful, for instance, for setting new field
descriptions on an existing table.

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_bigquery_update_table_schema]
    :end-before: [END howto_operator_bigquery_update_table_schema]

.. _howto/operator:BigQueryDeleteTableOperator:

Delete table
