Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: omit NaN values when uploading from insert_rows_from_dataframe #170

Merged

Conversation

tswast
Copy link
Contributor

@tswast tswast commented Jul 15, 2020

NaN values are most often used to indicate a NULL value in pandas. Also,
even when a column is a floating point column, the BigQuery streaming
API JSON parser doesn't seem to be able to handle NaN literals.

  • Make sure to open an issue as a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
  • Ensure the tests and linter pass
  • Code coverage does not decrease (if any source code was changed)
  • Appropriate docs were updated (if necessary)

Fixes #169 馃

NaN values are most often used to indicate a NULL value in pandas. Also,
even when a column is a floating point column, the BigQuery streaming
API JSON parser doesn't seem to be able to handle NaN literals.
@tswast
Copy link
Contributor Author

tswast commented Jul 15, 2020

Tests pass locally except for two failures

nox > Ran multiple sessions:
nox > * unit-2.7: success
nox > * unit-3.5: success
nox > * unit-3.6: success
nox > * unit-3.7: success
nox > * unit-3.8: success
nox > * system-2.7: failed
nox > * system-3.8: failed
nox > * snippets-2.7: success
nox > * snippets-3.8: success
nox > * cover: success
nox > * lint: success
nox > * lint_setup_py: success
nox > * blacken: success
nox > * docs: success

The two test failures seem to be for permissions issues (which is strange because I think my service account is EDITOR on the project)

_________________________ TestBigQuery.test_load_table_from_file_w_explicit_location __________________________

self = <tests.system.TestBigQuery testMethod=test_load_table_from_file_w_explicit_location>

    def test_load_table_from_file_w_explicit_location(self):
        # Create a temporary bucket for extract files.
        bucket_name = "bq_load_table_eu_extract_test" + unique_resource_id()
        self._create_bucket(bucket_name, location="eu")
    
        # Create a temporary dataset & table in the EU.
        table_bytes = six.BytesIO(b"a,3\nb,2\nc,1\n")
        client = Config.CLIENT
        dataset = self.temp_dataset(_make_dataset_id("eu_load_file"), location="EU")
        table_ref = dataset.table("letters")
        job_config = bigquery.LoadJobConfig()
        job_config.skip_leading_rows = 0
        job_config.schema = [
            bigquery.SchemaField("letter", "STRING"),
            bigquery.SchemaField("value", "INTEGER"),
        ]
    
        # Load the file to an EU dataset with an EU load job.
        load_job = client.load_table_from_file(
            table_bytes, table_ref, location="EU", job_config=job_config
        )
        load_job.result()
        job_id = load_job.job_id
    
        # Can get the job from the EU.
        load_job = client.get_job(job_id, location="EU")
        self.assertEqual(job_id, load_job.job_id)
        self.assertEqual("EU", load_job.location)
        self.assertTrue(load_job.exists())
    
        # Cannot get the job from the US.
        with self.assertRaises(NotFound):
            client.get_job(job_id, location="US")
    
        load_job_us = client.get_job(job_id)
        load_job_us._properties["jobReference"]["location"] = "US"
        self.assertFalse(load_job_us.exists())
        with self.assertRaises(NotFound):
            load_job_us.reload()
    
        # Can cancel the job from the EU.
        self.assertTrue(load_job.cancel())
        load_job = client.cancel_job(job_id, location="EU")
        self.assertEqual(job_id, load_job.job_id)
        self.assertEqual("EU", load_job.location)
    
        # Cannot cancel the job from the US.
        with self.assertRaises(NotFound):
>           client.cancel_job(job_id, location="US")

tests/system.py:1271: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
google/cloud/bigquery/client.py:1483: in cancel_job
    resource = self._call_api(
google/cloud/bigquery/client.py:562: in _call_api
    return call()
.nox/system-3-8/lib/python3.8/site-packages/google/api_core/retry.py:281: in retry_wrapped_func
    return retry_target(
.nox/system-3-8/lib/python3.8/site-packages/google/api_core/retry.py:184: in retry_target
    return target()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

    def api_request(
        self,
        method,
        path,
        query_params=None,
        data=None,
        content_type=None,
        headers=None,
        api_base_url=None,
        api_version=None,
        expect_json=True,
        _target_object=None,
        timeout=_DEFAULT_TIMEOUT,
    ):
        """Make a request over the HTTP transport to the API.
    
        You shouldn't need to use this method, but if you plan to
        interact with the API using these primitives, this is the
        correct one to use.
    
        :type method: str
        :param method: The HTTP method name (ie, ``GET``, ``POST``, etc).
                       Required.
    
        :type path: str
        :param path: The path to the resource (ie, ``'/b/bucket-name'``).
                     Required.
    
        :type query_params: dict or list
        :param query_params: A dictionary of keys and values (or list of
                             key-value pairs) to insert into the query
                             string of the URL.
    
        :type data: str
        :param data: The data to send as the body of the request. Default is
                     the empty string.
    
        :type content_type: str
        :param content_type: The proper MIME type of the data provided. Default
                             is None.
    
        :type headers: dict
        :param headers: extra HTTP headers to be sent with the request.
    
        :type api_base_url: str
        :param api_base_url: The base URL for the API endpoint.
                             Typically you won't have to provide this.
                             Default is the standard API base URL.
    
        :type api_version: str
        :param api_version: The version of the API to call.  Typically
                            you shouldn't provide this and instead use
                            the default for the library.  Default is the
                            latest API version supported by
                            google-cloud-python.
    
        :type expect_json: bool
        :param expect_json: If True, this method will try to parse the
                            response as JSON and raise an exception if
                            that cannot be done.  Default is True.
    
        :type _target_object: :class:`object`
        :param _target_object:
            (Optional) Protected argument to be used by library callers. This
            can allow custom behavior, for example, to defer an HTTP request
            and complete initialization of the object at a later time.
    
        :type timeout: float or tuple
        :param timeout: (optional) The amount of time, in seconds, to wait
            for the server response.
    
            Can also be passed as a tuple (connect_timeout, read_timeout).
            See :meth:`requests.Session.request` documentation for details.
    
        :raises ~google.cloud.exceptions.GoogleCloudError: if the response code
            is not 200 OK.
        :raises ValueError: if the response content type is not JSON.
        :rtype: dict or str
        :returns: The API response payload, either as a raw string or
                  a dictionary if the response is valid JSON.
        """
        url = self.build_api_url(
            path=path,
            query_params=query_params,
            api_base_url=api_base_url,
            api_version=api_version,
        )
    
        # Making the executive decision that any dictionary
        # data will be sent properly as JSON.
        if data and isinstance(data, dict):
            data = json.dumps(data)
            content_type = "application/json"
    
        response = self._make_request(
            method=method,
            url=url,
            data=data,
            content_type=content_type,
            headers=headers,
            target_object=_target_object,
            timeout=timeout,
        )
    
        if not 200 <= response.status_code < 300:
>           raise exceptions.from_http_response(response)
E           google.api_core.exceptions.Forbidden: 403 POST https://bigquery.googleapis.com/bigquery/v2/projects/swast-scratch/jobs/e8f027b1-4299-46d7-93f6-d74554afe2a9/cancel?projection=full&location=US: Access Denied: Job swast-scratch:US.e8f027b1-4299-46d7-93f6-d74554afe2a9: User does not have bigquery.jobs.update permission for job swast-scratch:US.e8f027b1-4299-46d7-93f6-d74554afe2a9.

.nox/system-3-8/lib/python3.8/site-packages/google/cloud/_http.py:423: Forbidden

Python 2.7:

_________________________ TestBigQuery.test_load_table_from_file_w_explicit_location __________________________

self = <tests.system.TestBigQuery testMethod=test_load_table_from_file_w_explicit_location>

    def test_load_table_from_file_w_explicit_location(self):
        # Create a temporary bucket for extract files.
        bucket_name = "bq_load_table_eu_extract_test" + unique_resource_id()
        self._create_bucket(bucket_name, location="eu")
    
        # Create a temporary dataset & table in the EU.
        table_bytes = six.BytesIO(b"a,3\nb,2\nc,1\n")
        client = Config.CLIENT
        dataset = self.temp_dataset(_make_dataset_id("eu_load_file"), location="EU")
        table_ref = dataset.table("letters")
        job_config = bigquery.LoadJobConfig()
        job_config.skip_leading_rows = 0
        job_config.schema = [
            bigquery.SchemaField("letter", "STRING"),
            bigquery.SchemaField("value", "INTEGER"),
        ]
    
        # Load the file to an EU dataset with an EU load job.
        load_job = client.load_table_from_file(
            table_bytes, table_ref, location="EU", job_config=job_config
        )
        load_job.result()
        job_id = load_job.job_id
    
        # Can get the job from the EU.
        load_job = client.get_job(job_id, location="EU")
        self.assertEqual(job_id, load_job.job_id)
        self.assertEqual("EU", load_job.location)
        self.assertTrue(load_job.exists())
    
        # Cannot get the job from the US.
        with self.assertRaises(NotFound):
            client.get_job(job_id, location="US")
    
        load_job_us = client.get_job(job_id)
        load_job_us._properties["jobReference"]["location"] = "US"
        self.assertFalse(load_job_us.exists())
        with self.assertRaises(NotFound):
            load_job_us.reload()
    
        # Can cancel the job from the EU.
        self.assertTrue(load_job.cancel())
        load_job = client.cancel_job(job_id, location="EU")
        self.assertEqual(job_id, load_job.job_id)
        self.assertEqual("EU", load_job.location)
    
        # Cannot cancel the job from the US.
        with self.assertRaises(NotFound):
>           client.cancel_job(job_id, location="US")

tests/system.py:1271: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
google/cloud/bigquery/client.py:1484: in cancel_job
    retry, method="POST", path=path, query_params=extra_params, timeout=timeout
google/cloud/bigquery/client.py:562: in _call_api
    return call()
.nox/system-2-7/lib/python2.7/site-packages/google/api_core/retry.py:286: in retry_wrapped_func
    on_error=on_error,
.nox/system-2-7/lib/python2.7/site-packages/google/api_core/retry.py:184: in retry_target
    return target()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <google.cloud.bigquery._http.Connection object at 0x7fb808c20490>, method = 'POST'
path = '/projects/swast-scratch/jobs/1ccf70ad-144b-4d17-9dcd-0ed3c1e3f430/cancel'
query_params = {'location': 'US', 'projection': 'full'}, data = None, content_type = None, headers = None
api_base_url = None, api_version = None, expect_json = True, _target_object = None, timeout = None

    def api_request(
        self,
        method,
        path,
        query_params=None,
        data=None,
        content_type=None,
        headers=None,
        api_base_url=None,
        api_version=None,
        expect_json=True,
        _target_object=None,
        timeout=_DEFAULT_TIMEOUT,
    ):
        """Make a request over the HTTP transport to the API.
    
        You shouldn't need to use this method, but if you plan to
        interact with the API using these primitives, this is the
        correct one to use.
    
        :type method: str
        :param method: The HTTP method name (ie, ``GET``, ``POST``, etc).
                       Required.
    
        :type path: str
        :param path: The path to the resource (ie, ``'/b/bucket-name'``).
                     Required.
    
        :type query_params: dict or list
        :param query_params: A dictionary of keys and values (or list of
                             key-value pairs) to insert into the query
                             string of the URL.
    
        :type data: str
        :param data: The data to send as the body of the request. Default is
                     the empty string.
    
        :type content_type: str
        :param content_type: The proper MIME type of the data provided. Default
                             is None.
    
        :type headers: dict
        :param headers: extra HTTP headers to be sent with the request.
    
        :type api_base_url: str
        :param api_base_url: The base URL for the API endpoint.
                             Typically you won't have to provide this.
                             Default is the standard API base URL.
    
        :type api_version: str
        :param api_version: The version of the API to call.  Typically
                            you shouldn't provide this and instead use
                            the default for the library.  Default is the
                            latest API version supported by
                            google-cloud-python.
    
        :type expect_json: bool
        :param expect_json: If True, this method will try to parse the
                            response as JSON and raise an exception if
                            that cannot be done.  Default is True.
    
        :type _target_object: :class:`object`
        :param _target_object:
            (Optional) Protected argument to be used by library callers. This
            can allow custom behavior, for example, to defer an HTTP request
            and complete initialization of the object at a later time.
    
        :type timeout: float or tuple
        :param timeout: (optional) The amount of time, in seconds, to wait
            for the server response.
    
            Can also be passed as a tuple (connect_timeout, read_timeout).
            See :meth:`requests.Session.request` documentation for details.
    
        :raises ~google.cloud.exceptions.GoogleCloudError: if the response code
            is not 200 OK.
        :raises ValueError: if the response content type is not JSON.
        :rtype: dict or str
        :returns: The API response payload, either as a raw string or
                  a dictionary if the response is valid JSON.
        """
        url = self.build_api_url(
            path=path,
            query_params=query_params,
            api_base_url=api_base_url,
            api_version=api_version,
        )
    
        # Making the executive decision that any dictionary
        # data will be sent properly as JSON.
        if data and isinstance(data, dict):
            data = json.dumps(data)
            content_type = "application/json"
    
        response = self._make_request(
            method=method,
            url=url,
            data=data,
            content_type=content_type,
            headers=headers,
            target_object=_target_object,
            timeout=timeout,
        )
    
        if not 200 <= response.status_code < 300:
>           raise exceptions.from_http_response(response)
E           Forbidden: 403 POST https://bigquery.googleapis.com/bigquery/v2/projects/swast-scratch/jobs/1ccf70ad-144b-4d17-9dcd-0ed3c1e3f430/cancel?projection=full&location=US: Access Denied: Job swast-scratch:US.1ccf70ad-144b-4d17-9dcd-0ed3c1e3f430: User does not have bigquery.jobs.update permission for job swast-scratch:US.1ccf70ad-144b-4d17-9dcd-0ed3c1e3f430.

.nox/system-2-7/lib/python2.7/site-packages/google/cloud/_http.py:423: Forbidden

@tswast tswast requested a review from plamut July 15, 2020 22:09
Copy link
Contributor

@plamut plamut left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fix works and the changes look good.

Comment on lines +5645 to +5647
for call, expected_data in six.moves.zip_longest(
actual_calls, EXPECTED_SENT_DATA
):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice pattern!

@plamut plamut merged commit f9f2f45 into googleapis:master Jul 17, 2020
@tswast tswast deleted the issue169-insert_rows_from_dataframe-NaN branch July 17, 2020 14:06
@tswast tswast mentioned this pull request Jul 17, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cla: yes This human has signed the Contributor License Agreement.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

insert_rows_from_dataframe fails when NaN values are present in DataFrame
3 participants