Skip to content

Commit

Permalink
fix: avoid policy tags 403 error in load_table_from_dataframe (#557)
Browse files Browse the repository at this point in the history
* WIP: fix: don't set policy tags in load job from dataframe

* copy fields parameter for struct support

* update tests to allow missing description property

* fix load from dataframe test on python 3.6

Also, check that sent schema matches DataFrame order, not table order
  • Loading branch information
tswast committed Mar 19, 2021
1 parent 7447f05 commit 84e646e
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 149 deletions.
13 changes: 11 additions & 2 deletions google/cloud/bigquery/client.py
Expand Up @@ -2291,9 +2291,18 @@ def load_table_from_dataframe(
name
for name, _ in _pandas_helpers.list_columns_and_indexes(dataframe)
)
# schema fields not present in the dataframe are not needed
job_config.schema = [
field for field in table.schema if field.name in columns_and_indexes
# Field description and policy tags are not needed to
# serialize a data frame.
SchemaField(
field.name,
field.field_type,
mode=field.mode,
fields=field.fields,
)
# schema fields not present in the dataframe are not needed
for field in table.schema
if field.name in columns_and_indexes
]

job_config.schema = _pandas_helpers.dataframe_to_bq_schema(
Expand Down
43 changes: 21 additions & 22 deletions google/cloud/bigquery/schema.py
Expand Up @@ -19,6 +19,7 @@
from google.cloud.bigquery_v2 import types


_DEFAULT_VALUE = object()
_STRUCT_TYPES = ("RECORD", "STRUCT")

# SQL types reference:
Expand Down Expand Up @@ -73,14 +74,18 @@ def __init__(
name,
field_type,
mode="NULLABLE",
description=None,
description=_DEFAULT_VALUE,
fields=(),
policy_tags=None,
):
self._name = name
self._field_type = field_type
self._mode = mode
self._description = description
self._properties = {
"name": name,
"type": field_type,
}
if mode is not None:
self._properties["mode"] = mode.upper()
if description is not _DEFAULT_VALUE:
self._properties["description"] = description
self._fields = tuple(fields)
self._policy_tags = policy_tags

Expand All @@ -98,7 +103,7 @@ def from_api_repr(cls, api_repr):
"""
# Handle optional properties with default values
mode = api_repr.get("mode", "NULLABLE")
description = api_repr.get("description")
description = api_repr.get("description", _DEFAULT_VALUE)
fields = api_repr.get("fields", ())

return cls(
Expand All @@ -113,7 +118,7 @@ def from_api_repr(cls, api_repr):
@property
def name(self):
"""str: The name of the field."""
return self._name
return self._properties["name"]

@property
def field_type(self):
Expand All @@ -122,7 +127,7 @@ def field_type(self):
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableFieldSchema.FIELDS.type
"""
return self._field_type
return self._properties["type"]

@property
def mode(self):
Expand All @@ -131,17 +136,17 @@ def mode(self):
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableFieldSchema.FIELDS.mode
"""
return self._mode
return self._properties.get("mode")

@property
def is_nullable(self):
"""bool: whether 'mode' is 'nullable'."""
return self._mode == "NULLABLE"
return self.mode == "NULLABLE"

@property
def description(self):
"""Optional[str]: description for the field."""
return self._description
return self._properties.get("description")

@property
def fields(self):
Expand All @@ -164,13 +169,7 @@ def to_api_repr(self):
Returns:
Dict: A dictionary representing the SchemaField in a serialized form.
"""
# Put together the basic representation. See http://bit.ly/2hOAT5u.
answer = {
"mode": self.mode.upper(),
"name": self.name,
"type": self.field_type.upper(),
"description": self.description,
}
answer = self._properties.copy()

# If this is a RECORD type, then sub-fields are also included,
# add this to the serialized representation.
Expand All @@ -193,10 +192,10 @@ def _key(self):
Tuple: The contents of this :class:`~google.cloud.bigquery.schema.SchemaField`.
"""
return (
self._name,
self._field_type.upper(),
self._mode.upper(),
self._description,
self.name,
self.field_type.upper(),
self.mode.upper(),
self.description,
self._fields,
self._policy_tags,
)
Expand Down
12 changes: 2 additions & 10 deletions tests/unit/job/test_load_config.py
Expand Up @@ -434,13 +434,11 @@ def test_schema_setter_fields(self):
"name": "full_name",
"type": "STRING",
"mode": "REQUIRED",
"description": None,
}
age_repr = {
"name": "age",
"type": "INTEGER",
"mode": "REQUIRED",
"description": None,
}
self.assertEqual(
config._properties["load"]["schema"], {"fields": [full_name_repr, age_repr]}
Expand All @@ -449,24 +447,18 @@ def test_schema_setter_fields(self):
def test_schema_setter_valid_mappings_list(self):
config = self._get_target_class()()

schema = [
{"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "age", "type": "INTEGER", "mode": "REQUIRED"},
]
config.schema = schema

full_name_repr = {
"name": "full_name",
"type": "STRING",
"mode": "REQUIRED",
"description": None,
}
age_repr = {
"name": "age",
"type": "INTEGER",
"mode": "REQUIRED",
"description": None,
}
schema = [full_name_repr, age_repr]
config.schema = schema
self.assertEqual(
config._properties["load"]["schema"], {"fields": [full_name_repr, age_repr]}
)
Expand Down
113 changes: 74 additions & 39 deletions tests/unit/test_client.py
Expand Up @@ -1596,18 +1596,8 @@ def test_create_table_w_schema_and_query(self):
{
"schema": {
"fields": [
{
"name": "full_name",
"type": "STRING",
"mode": "REQUIRED",
"description": None,
},
{
"name": "age",
"type": "INTEGER",
"mode": "REQUIRED",
"description": None,
},
{"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "age", "type": "INTEGER", "mode": "REQUIRED"},
]
},
"view": {"query": query},
Expand Down Expand Up @@ -1641,18 +1631,8 @@ def test_create_table_w_schema_and_query(self):
},
"schema": {
"fields": [
{
"name": "full_name",
"type": "STRING",
"mode": "REQUIRED",
"description": None,
},
{
"name": "age",
"type": "INTEGER",
"mode": "REQUIRED",
"description": None,
},
{"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "age", "type": "INTEGER", "mode": "REQUIRED"},
]
},
"view": {"query": query, "useLegacySql": False},
Expand Down Expand Up @@ -2602,7 +2582,7 @@ def test_update_table(self):
"name": "age",
"type": "INTEGER",
"mode": "REQUIRED",
"description": None,
"description": "New field description",
},
]
},
Expand All @@ -2613,8 +2593,10 @@ def test_update_table(self):
}
)
schema = [
SchemaField("full_name", "STRING", mode="REQUIRED"),
SchemaField("age", "INTEGER", mode="REQUIRED"),
SchemaField("full_name", "STRING", mode="REQUIRED", description=None),
SchemaField(
"age", "INTEGER", mode="REQUIRED", description="New field description"
),
]
creds = _make_credentials()
client = self._make_one(project=self.PROJECT, credentials=creds)
Expand Down Expand Up @@ -2647,7 +2629,7 @@ def test_update_table(self):
"name": "age",
"type": "INTEGER",
"mode": "REQUIRED",
"description": None,
"description": "New field description",
},
]
},
Expand Down Expand Up @@ -2773,13 +2755,24 @@ def test_update_table_w_query(self):
"name": "age",
"type": "INTEGER",
"mode": "REQUIRED",
"description": None,
"description": "this is a column",
},
{"name": "country", "type": "STRING", "mode": "NULLABLE"},
]
}
schema = [
SchemaField("full_name", "STRING", mode="REQUIRED"),
SchemaField("age", "INTEGER", mode="REQUIRED"),
SchemaField(
"full_name",
"STRING",
mode="REQUIRED",
# Explicitly unset the description.
description=None,
),
SchemaField(
"age", "INTEGER", mode="REQUIRED", description="this is a column"
),
# Omit the description to not make updates to it.
SchemaField("country", "STRING"),
]
resource = self._make_table_resource()
resource.update(
Expand Down Expand Up @@ -7658,18 +7651,47 @@ def test_load_table_from_file_w_invalid_job_config(self):
def test_load_table_from_dataframe(self):
from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
from google.cloud.bigquery import job
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.schema import PolicyTagList, SchemaField

client = self._make_client()
records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}]
dataframe = pandas.DataFrame(records)
records = [
{"id": 1, "age": 100, "accounts": [2, 3]},
{"id": 2, "age": 60, "accounts": [5]},
{"id": 3, "age": 40, "accounts": []},
]
# Mixup column order so that we can verify sent schema matches the
# serialized order, not the table column order.
column_order = ["age", "accounts", "id"]
dataframe = pandas.DataFrame(records, columns=column_order)
table_fields = {
"id": SchemaField(
"id",
"INTEGER",
mode="REQUIRED",
description="integer column",
policy_tags=PolicyTagList(names=("foo", "bar")),
),
"age": SchemaField(
"age",
"INTEGER",
mode="NULLABLE",
description="age column",
policy_tags=PolicyTagList(names=("baz",)),
),
"accounts": SchemaField(
"accounts", "INTEGER", mode="REPEATED", description="array column",
),
}
get_table_schema = [
table_fields["id"],
table_fields["age"],
table_fields["accounts"],
]

get_table_patch = mock.patch(
"google.cloud.bigquery.client.Client.get_table",
autospec=True,
return_value=mock.Mock(
schema=[SchemaField("id", "INTEGER"), SchemaField("age", "INTEGER")]
),
return_value=mock.Mock(schema=get_table_schema),
)
load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
Expand All @@ -7695,8 +7717,21 @@ def test_load_table_from_dataframe(self):
sent_file = load_table_from_file.mock_calls[0][1][1]
assert sent_file.closed

sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
assert sent_config.source_format == job.SourceFormat.PARQUET
sent_config = load_table_from_file.mock_calls[0][2]["job_config"].to_api_repr()[
"load"
]
assert sent_config["sourceFormat"] == job.SourceFormat.PARQUET
for field_index, field in enumerate(sent_config["schema"]["fields"]):
assert field["name"] == column_order[field_index]
table_field = table_fields[field["name"]]
assert field["name"] == table_field.name
assert field["type"] == table_field.field_type
assert field["mode"] == table_field.mode
assert len(field.get("fields", [])) == len(table_field.fields)
# Omit unnecessary fields when they come from getting the table
# (not passed in via job_config)
assert "description" not in field
assert "policyTags" not in field

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
Expand Down
9 changes: 1 addition & 8 deletions tests/unit/test_external_config.py
Expand Up @@ -77,14 +77,7 @@ def test_to_api_repr_base(self):
ec.schema = [schema.SchemaField("full_name", "STRING", mode="REQUIRED")]

exp_schema = {
"fields": [
{
"name": "full_name",
"type": "STRING",
"mode": "REQUIRED",
"description": None,
}
]
"fields": [{"name": "full_name", "type": "STRING", "mode": "REQUIRED"}]
}
got_resource = ec.to_api_repr()
exp_resource = {
Expand Down

0 comments on commit 84e646e

Please sign in to comment.