
New Column addition in Spark Bigquery Connector 2.12_0.29.0 vs 2.12_0.20.0 #945

Closed
bbarodia opened this issue Apr 10, 2023 · 23 comments · Fixed by #958
@bbarodia

Hi,

We wanted to upgrade to 0.29.0 from 0.20.0 but noticed the following behavior.

Setup to write to BigQuery:

final_df.write.format("bigquery") \
            .option("temporaryGcsBucket", c.TEMPORARY_GCS_BUCKET) \
            .option("table", self.gbq_project + ":" + final_dataset_name + "." + output_short_table_name) \
            .option("createDisposition", "CREATE_IF_NEEDED") \
            .option("schemaUpdateOptions", "ALLOW_FIELD_ADDITION" if use_gbq_overwrite_mode else '') \
            .option("decimal", "NUMERIC") \
            .option("partitionField", partition_column) \
            .option("partitionType", "DAY") \
            .mode('overwrite' if use_gbq_overwrite_mode else 'append') \
            .save()

Test case:

  1. Write dataframe to table
  2. Add a column to the dataframe and write to the table again

Expected behaviour: the new column is added to the table.
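
For concreteness, a minimal sketch of this two-step test (illustrative only: the bucket and table names are placeholders, and the write mirrors the setup above with field addition enabled):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    def write_to_bq(df):
        # Same write setup as above: overwrite mode with field addition enabled.
        df.write.format("bigquery") \
            .option("temporaryGcsBucket", "my-temp-bucket") \
            .option("table", "my-project:my_dataset.my_table") \
            .option("createDisposition", "CREATE_IF_NEEDED") \
            .option("schemaUpdateOptions", "ALLOW_FIELD_ADDITION") \
            .mode("overwrite") \
            .save()

    # 1. Write the initial dataframe.
    write_to_bq(spark.createDataFrame([("a",)], ["field_1"]))

    # 2. Add a column and write again; the new column is expected to be
    #    added to the BigQuery table schema.
    write_to_bq(spark.createDataFrame([("a", "b")], ["field_1", "test_string_field"]))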

With 0.29.0:

We do not see the new column in the table.

In the logs we see autodetect=null and

    schema=Schema{fields=[Field{name=field_1, type=STRING, mode=NULLABLE, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}, Field{name=test_string_field, type=STRING, mode=REQUIRED, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null},

With 0.20.0:
We DO see the new column in the table.

In the logs we see autodetect=true and schema=null.

@bbarodia changed the title from "Spark Bigquery Connector 2.12_0.29.0 vs 2.12_0.20.0" to "New Column addition in Spark Bigquery Connector 2.12_0.29.0 vs 2.12_0.20.0" on Apr 10, 2023
@pricemg

pricemg commented Apr 11, 2023

I think I'm seeing this exact issue right now actually.

Having the following options set in my spark session config

        # Allow new fields to be added to struct (RECORD in BQ) columns that
        # differ from what is already present in the BQ table being written to.
        spark.conf.set("allowFieldAddition", "true")
        # Any fields that differ between the data being appended and the schema
        # already in the BQ table are filled with Null.
        spark.conf.set("allowFieldRelaxation", "true")

With dataproc-serverless v1.0 (where we were using 2.12-0.23.2 of the connector) we could append a dataframe to a table when there was not a perfect overlap of columns, and the new columns would be added (with Null set in the rows where the new columns weren't present previously). In dataproc-serverless v2.0 (where we're now using the 2.13-0.28.0 connector) I am still seeing the rows being appended, but the additional columns are not present.

@suryasoma self-assigned this Apr 14, 2023
@pricemg

pricemg commented Apr 16, 2023

I can put together an MWE if it's helpful, but I think @bbarodia has probably explained the premise well.

@bbarodia
Author

Hi Folks, we are observing similar behaviour with 0.30 too. Please let us know if this is expected or a bug.

Also, the BigQuery console can handle adding a column to the schema without deleting previous partitions. Currently the only way to write a dataframe with a changed schema to BigQuery is to use mode: overwrite, which deletes the old partitions. Can this be handled with mode: append, so that we do not need to delete the old partitions?

@suryasoma
Contributor

Yes, this is a bug; we are investigating what changed to cause it and will update the ticket soon.
Thanks for pointing out the issue.

@bbarodia
Author

Hi @davidrabinowitz:

When adding or deleting a field in an existing table for a particular partition day, using mode: overwrite for BigQuery deletes the older partitions, and only the partition that we wrote remains.

Is that the expected behavior from BigQuery? Can we keep old data in our partitions when changing schemas? Are there any flags that we can use for BigQuery?

We have tried using the allowFieldAddition flag during the write, but that does not work.

@suryasoma
Contributor

Hey, with overwrite the data itself is overwritten, so if you want to keep the older data as well, mode: append is what you should use.
With append, to update the schema you also need to set the allowFieldAddition option.
The connector is passing the option to the BigQuery load job, but it looks like there is an issue in the load job. We have raised a bug on this and are tracking it.
We will update the issue as soon as the bug is resolved.

Also found a similar issue googleapis/python-bigquery#1095
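
For reference, a rough sketch of the usage described here, i.e. mode append plus the allowFieldAddition option (a dataframe df is assumed, and the bucket and table names are placeholders):

    # Append new rows and ask the BigQuery load job to add any columns that are
    # present in the dataframe but missing from the table.
    df.write.format("bigquery") \
        .option("temporaryGcsBucket", "my-temp-bucket") \
        .option("table", "my-project:my_dataset.my_table") \
        .option("allowFieldAddition", "true") \
        .mode("append") \
        .save()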

@bbarodia
Author

Hi @suryasoma,

Thank you for the clarification. I was expecting the same behavior as well. If possible, can you comment on what the timeline will look like? Will there be a new connector version released?

Thanks

@chalmerlowe

I am the primary caretaker of the Python BigQuery library and have been looking at googleapis/python-bigquery#1095, but I do not have a solution yet.

I am going to try and adjust my schedule for next week to see what I can do to direct some attention to the issue in the Python BQ library.

@bbarodia
Author

bbarodia commented May 3, 2023

Hi @suryasoma / @chalmerlowe:
Any updates on the timeline for these changes?

@suryasoma linked a pull request on May 3, 2023 that will close this issue
@suryasoma
Contributor

The fix for this is merged, @bbarodia. It will be available in the next release.

@pricemg

pricemg commented May 4, 2023

Amazing, thank you @suryasoma. Excuse my ignorance, but how often do releases typically happen?

@bbarodia
Author

@suryasoma: could you please comment on when this will be released?

@suryasoma
Contributor

We plan to have a release soon, in the upcoming weeks. I will update the ticket once the release is done.
Thanks :)

@suryasoma
Contributor

@bbarodia, please find the fix in the latest release, 0.31.0.

@bbarodia
Author

bbarodia commented Jun 2, 2023

Thanks, will check it out

@pricemg

pricemg commented Jun 7, 2023

@suryasoma I've just tried this and found it is still not behaving exactly as it did before.

Running the below with Dataproc batches=2.0 and --jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.31.0.jar

    from pyspark.sql import (
        SparkSession,
        types as T,
    )

    spark = SparkSession.builder.getOrCreate()
    spark.conf.set("viewsEnabled", "true")
    spark.conf.set("materializationDataset", 'test_space')
    spark.conf.set('temporaryGcsBucket', staging_bucket)
    spark.conf.set("intermediateFormat", "orc")
    # Allow new fields to be added to struct(record in BQ) column that
    # differ from what is already present in the BQ table being written to.
    spark.conf.set("allowFieldAddition", "true")
    # Any fields different between current data to be appended and already
    # in BQ table are filled with Null.
    spark.conf.set("allowFieldRelaxation", "true")

    output_table_path = 'test_space.column_check'

    print('Case 1')
    df = spark.createDataFrame(
        data=[
            (1, 'a'),
            (2, 'b'),
        ],
        schema=T.StructType([
            T.StructField('number', T.IntegerType(), True),
            T.StructField('letter', T.StringType(), True),
        ])
    )
    df.show()
    df.printSchema()
    df.write.save(output_table_path, format="bigquery", mode='append')

    print('Case 2')
    df = spark.createDataFrame(
        data=[
            (3, 'c', True),
            (4, 'd', False),
        ],
        schema=T.StructType([
            T.StructField('number', T.IntegerType(), True),
            T.StructField('letter', T.StringType(), True),
            T.StructField('bool', T.BooleanType(), True),
        ])
    )
    df.show()
    df.printSchema()
    df.write.save(output_table_path, format="bigquery", mode='append')

    print('Case 3')
    df = spark.createDataFrame(
        data=[
            (5, 'e', (55, 'ee')),
            (6, 'f', (66, 'ff')),
        ],
        schema=T.StructType([
            T.StructField('number', T.IntegerType(), True),
            T.StructField('letter', T.StringType(), True),
            T.StructField(
                'struct_column',
                T.StructType([
                    T.StructField('more_numbers', T.IntegerType(), True),
                    T.StructField('more_letters', T.StringType(), True),
                ]),
                True
            ),
        ])
    )
    df.show()
    df.printSchema()
    df.write.save(output_table_path, format="bigquery", mode='append')

    print('Case 4')
    df = spark.createDataFrame(
        data=[
            (7, 'g', (77, 'gg', True)),
            (8, 'h', (88, 'hh', False)),
        ],
        schema=T.StructType([
            T.StructField('number', T.IntegerType(), True),
            T.StructField('letter', T.StringType(), True),
            T.StructField(
                'struct_column',
                T.StructType([
                    T.StructField('more_numbers', T.IntegerType(), True),
                    T.StructField('more_letters', T.StringType(), True),
                    T.StructField('more_bools', T.BooleanType(), True),
                ]),
                True
            ),
        ])
    )
    df.show()
    df.printSchema()
    df.write.save(output_table_path, format="bigquery", mode='append')

This works for writing out case 1, and then case 2, which adds a new column; however, it then crashes when attempting to write case 3.

@bbarodia
Author

Found an issue when adding a new numeric field : #997

@bkinzle

bkinzle commented Sep 17, 2023

FYI, this is still not working for me (new fields are not being created in the BigQuery table I'm loading into) with the latest version of the connector:
spark-bigquery-with-dependencies_2.13-0.32.2.jar

I'm not sure if it should matter, but in addition to schemaUpdateOptions=[ALLOW_FIELD_ADDITION], my load job is also using the following (a rough sketch follows this list):

  • writeDisposition=WRITE_TRUNCATE
  • The target table is partitioned by DAY and a specific datePartition is being passed in.
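
Roughly, this is what that setup looks like through the connector (a sketch only: names and the partition date are placeholders, WRITE_TRUNCATE is what mode overwrite maps to, and allowFieldAddition is the option that sets schemaUpdateOptions=[ALLOW_FIELD_ADDITION]):

    # Overwrite (WRITE_TRUNCATE) a specific day partition of a DAY-partitioned
    # table while allowing new fields to be added; df is an existing dataframe.
    df.write.format("bigquery") \
        .option("temporaryGcsBucket", "my-temp-bucket") \
        .option("table", "my-project:my_dataset.my_table") \
        .option("partitionField", "event_date") \
        .option("partitionType", "DAY") \
        .option("datePartition", "20230917") \
        .option("allowFieldAddition", "true") \
        .mode("overwrite") \
        .save()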

@jherrmannNetfonds

jherrmannNetfonds commented Oct 18, 2023

I have the same problem with spark-bigquery-with-dependencies_2.12-0.32.2.jar: a table partitioned by date, intermediateFormat set to avro, useAvroLogicalTypes set to true, allowFieldAddition set to true, and using overwrite mode. I am using PySpark.
Getting:
com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Provided Schema does not match Table XXXX. Cannot add fields (field: xxxxxxx)

The LoadJobConfiguration in the logs shows schemaUpdateOptions=null; I would expect ALLOW_FIELD_ADDITION.

@jherrmannNetfonds

I solved my issue when using PySpark.
I was using:
df.write.format("bigquery").option("table", "table_name").option("allowFieldAddition ", True)
This does not work. If I instead add allowFieldAddition to the Spark config via spark.conf.set("allowFieldAddition", "true"), it works.
The strange thing is, if I add allowFieldRelaxation as an option to the writer, schemaUpdateOptions correctly contains ALLOW_FIELD_RELAXATION, but it does not contain ALLOW_FIELD_ADDITION when that is added as an option to the writer.
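
A minimal sketch of that workaround (spark, df, the table name, and the bucket are placeholders; the overwrite mode follows the earlier comment):

    # Workaround: set allowFieldAddition on the Spark session config instead of
    # as a writer option; with this, schemaUpdateOptions contained ALLOW_FIELD_ADDITION.
    spark.conf.set("allowFieldAddition", "true")
    df.write.format("bigquery") \
        .option("table", "table_name") \
        .option("temporaryGcsBucket", "my-temp-bucket") \
        .mode("overwrite") \
        .save()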

@rui-castro-ebury

I have a similar case to @pricemg:
Adding new fields works if those new fields are at the dataframe root level!
If I have a struct field and I change it by adding a new field, it fails with:
Provided Schema does not match Table XXXX. Cannot add required fields to an existing schema. (field: struct.struct_c)

Any field addition to a "sub-level" fails.

@vishalkarve15
Contributor

@rui-castro-ebury can you please create a new issue? You can add a reference to this issue if needed.

@erajabi

erajabi commented Apr 25, 2024

The following worked for me with the BigQuery PySpark connector:
.option("allowFieldAddition ", True)
