JsonStreamWriter fails to append rows with updated schemas right after updating the table #1465

Closed
bphenriques opened this issue Jan 6, 2022 · 14 comments

@bphenriques
Contributor

bphenriques commented Jan 6, 2022

Hello,

My team and I are exploring the BigQuery Storage API to stream data into BigQuery (BUFFERED mode). The problem arises with schema updates: we can successfully update the BigQuery table's schema whenever necessary, however appending rows with the new schema using JsonStreamWriter fails because the writer's view of the table schema is outdated, leading to `JSONObject has fields unknown to BigQuery: root.col2.`.

I am aware that #1447 adds support for updated table schemas, however it relies on the client making further append calls until the underlying schema is refreshed (judging by the test).

For reference, I copied your integration tests to our project (Scala) to ease discussion:

Works with `BUFFERED` mode
test("Simulate JsonStreamWriter with schema migrations using the BQ API directly") {
  val client = BigQueryWriteClient.create()
  val bigquery = BigQueryOptions.newBuilder
    .setProjectId("some-project")
    .build
    .getService
  val DATASET = "it-test"
  val tableName =
    "SchemaUpdateTestTable" + UUID.randomUUID().toString.replace("-", "").substring(0, 5)
  val tableId = TableId.of(DATASET, tableName)
  val col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build
  val originalSchema = Schema.of(col1)
  val tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build
  bigquery.create(tableInfo)
  val parent = TableName.of(ServiceOptions.getDefaultProjectId, DATASET, tableName)
  val writeStream = client.createWriteStream(
    CreateWriteStreamRequest.newBuilder
      .setParent(parent.toString)
      .setWriteStream(WriteStream.newBuilder.setType(WriteStream.Type.BUFFERED).build)
      .build
  )
  try {
    val jsonStreamWriter =
      JsonStreamWriter.newBuilder(writeStream.getName, writeStream.getTableSchema).build
    try { // write the 1st row
      var currentOffset = 0L
      val foo = new JSONObject()
      foo.put("col1", "aaa")
      val jsonArr = new JSONArray()
      jsonArr.put(foo)
      val response = jsonStreamWriter.append(jsonArr, currentOffset)
      currentOffset += jsonArr.length() - 1

      assertEquals(0L, response.get.getAppendResult.getOffset.getValue)

      assertEquals(
        0L,
        client
          .flushRows(
            FlushRowsRequest.newBuilder
              .setWriteStream(writeStream.getName)
              .setOffset(Int64Value.of(currentOffset))
              .build()
          )
          .getOffset,
      )

      // update schema with a new column
      val col2 = Field.newBuilder("col2", StandardSQLTypeName.STRING).build
      val updatedSchema = Schema.of(ImmutableList.of(col1, col2))
      val updatedTableInfo =
        TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build
      val updatedTable = bigquery.update(updatedTableInfo)
      assertEquals(
        updatedSchema,
        updatedTable.getDefinition.asInstanceOf[TableDefinition].getSchema,
      )
      // continue writing rows until backend acknowledges schema update
      val foo2 = new JSONObject()
      foo2.put("col1", "bbb")
      val jsonArr2 = new JSONArray()
      jsonArr2.put(foo2)
      var next = 0
      breakable {
        for (i <- 1 until 100) {
          val response2 = jsonStreamWriter.append(jsonArr2, i)
          currentOffset += jsonArr2.length()
          assertEquals(i.toLong, response2.get.getAppendResult.getOffset.getValue)
          if (response2.get.hasUpdatedSchema) {
            next = i
            break
          } else Thread.sleep(1000)
        }
      }
      assertEquals(
        currentOffset,
        client
          .flushRows(
            FlushRowsRequest.newBuilder
              .setWriteStream(writeStream.getName)
              .setOffset(Int64Value.of(currentOffset))
              .build()
          )
          .getOffset,
      )

      // write rows with updated schema.
      val updatedFoo = new JSONObject()
      updatedFoo.put("col1", "ccc")
      updatedFoo.put("col2", "ddd")
      val updatedJsonArr = new JSONArray()
      updatedJsonArr.put(updatedFoo)
      for (i <- 0 until 10) {
        currentOffset += updatedJsonArr.length()
        val response3 = jsonStreamWriter.append(updatedJsonArr, currentOffset)
        assertEquals(currentOffset, response3.get.getAppendResult.getOffset.getValue)
      }

      assertEquals(
        currentOffset,
        client
          .flushRows(
            FlushRowsRequest.newBuilder
              .setWriteStream(writeStream.getName)
              .setOffset(Int64Value.of(currentOffset))
              .build()
          )
          .getOffset,
      )

      // verify table data correctness
      val rowsIter = bigquery.listTableData(tableId).getValues.iterator
      // 1 row of aaa
      assertEquals("aaa", rowsIter.next.get(0).getStringValue)
      // a few rows of bbb
      for (j <- 1 to next) {
        assertEquals("bbb", rowsIter.next.get(0).getStringValue)
      }
      // 10 rows of ccc, ddd
      for (j <- next + 1 until next + 1 + 10) {
        val temp = rowsIter.next
        assertEquals("ccc", temp.get(0).getStringValue)
        assertEquals("ddd", temp.get(1).getStringValue)
      }
      assertFalse(rowsIter.hasNext)
    } finally if (jsonStreamWriter != null) jsonStreamWriter.close()
  }
}
Does not work when appending rows with the new schema right after updating the table
test("Appending rows with new schema right after updating table") {
  val client = BigQueryWriteClient.create()
  val bigquery = BigQueryOptions.newBuilder
    .setProjectId("some-project")
    .build
    .getService
  val DATASET = "it-test"
  val tableName =
    "SchemaUpdateTestTable" + UUID.randomUUID().toString.replace("-", "").substring(0, 5)
  val tableId = TableId.of(DATASET, tableName)
  val col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build
  val originalSchema = Schema.of(col1)
  val tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build
  bigquery.create(tableInfo)
  val parent = TableName.of(ServiceOptions.getDefaultProjectId, DATASET, tableName)
  val writeStream = client.createWriteStream(
    CreateWriteStreamRequest.newBuilder
      .setParent(parent.toString)
      .setWriteStream(WriteStream.newBuilder.setType(WriteStream.Type.BUFFERED).build)
      .build
  )
  try {
    val jsonStreamWriter =
      JsonStreamWriter.newBuilder(writeStream.getName, writeStream.getTableSchema).build
    try { // write the 1st row
      var currentOffset = 0L
      val foo = new JSONObject()
      foo.put("col1", "aaa")
      val jsonArr = new JSONArray()
      jsonArr.put(foo)
      val response = jsonStreamWriter.append(jsonArr, currentOffset)
      currentOffset += jsonArr.length() - 1

      assertEquals(0L, response.get.getAppendResult.getOffset.getValue)

      assertEquals(
        0L,
        client
          .flushRows(
            FlushRowsRequest.newBuilder
              .setWriteStream(writeStream.getName)
              .setOffset(Int64Value.of(currentOffset))
              .build()
          )
          .getOffset,
      )

      // update schema with a new column
      val col2 = Field.newBuilder("col2", StandardSQLTypeName.STRING).build
      val updatedSchema = Schema.of(ImmutableList.of(col1, col2))
      val updatedTableInfo =
        TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build
      val updatedTable = bigquery.update(updatedTableInfo)
      assertEquals(
        updatedSchema,
        updatedTable.getDefinition.asInstanceOf[TableDefinition].getSchema,
      )

      // write rows with updated schema.
      val updatedFoo = new JSONObject()
      updatedFoo.put("col1", "ccc")
      updatedFoo.put("col2", "ddd")
      val updatedJsonArr = new JSONArray()
      updatedJsonArr.put(updatedFoo)
      for (i <- 0 until 10) {
        currentOffset += updatedJsonArr.length()
        val response3 = jsonStreamWriter.append(updatedJsonArr, currentOffset) // Crashes here.
        assertEquals(currentOffset, response3.get.getAppendResult.getOffset.getValue)
      }

      assertEquals(
        currentOffset,
        client
          .flushRows(
            FlushRowsRequest.newBuilder
              .setWriteStream(writeStream.getName)
              .setOffset(Int64Value.of(currentOffset))
              .build()
          )
          .getOffset,
      )

      // verify table data correctness
      val rowsIter = bigquery.listTableData(tableId).getValues.iterator
      // 1 row of aaa
      assertEquals("aaa", rowsIter.next.get(0).getStringValue)
      // 10 rows of ccc, ddd
      for (_ <- 0 until 10) {
        val temp = rowsIter.next
        assertEquals("ccc", temp.get(0).getStringValue)
        assertEquals("ddd", temp.get(1).getStringValue)
      }
      assertFalse(rowsIter.hasNext)
    } finally if (jsonStreamWriter != null) jsonStreamWriter.close()
  }
}

Error:

java.lang.IllegalArgumentException: JSONObject has fields unknown to BigQuery: root.col2.

Describe the solution you'd like
Two ideas follow, though you are better positioned to make the design decisions.

  1. Make it transparent to the caller: e.g., JsonStreamWriter recovers from this error by fetching the latest TableDefinition. That API call can be costly depending on how often schema updates occur (in our case, rarely), so it could be a configurable option. This works for our use case.
  2. Delegate the responsibility to the caller: e.g., JsonStreamWriter could support changing the underlying schema via something like JsonStreamWriter::setTableDefinition, doing something similar to this (a rough sketch follows below). Edit: this introduces mutability, so it is not the best solution.
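
A purely hypothetical sketch of idea 2 from the caller's side; neither setTableDefinition nor any equivalent setter exists in the library today:

// Hypothetical API (idea 2): push the updated schema into the existing writer
// after updating the table, instead of recreating the writer.
val updatedTable = bigquery.update(updatedTableInfo)
jsonStreamWriter.setTableDefinition( // hypothetical setter, not part of the current API
  updatedTable.getDefinition.asInstanceOf[TableDefinition]
)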

Describe alternatives you've considered
Recreating the WriteStream and then the JsonStreamWriter, but that is disruptive given our streaming approach.

Environment details

OS type and version: macOS (Apple M1)
Java version: Corretto 11.0.12
bigquerystorage version(s): 2.8.0
bigquery: 2.5.1

Steps to reproduce

See second code sample provided. Happy to help!

Code example

See the second code sample provided. It is essentially the first (BUFFERED mode) sample with the `// continue writing rows until backend acknowledges schema update` loop removed.

Stack trace

java.lang.IllegalArgumentException: JSONObject has fields unknown to BigQuery: root.col2.
@stephaniewang526
Contributor

Hi Bruno, thank you for your feedback.

We are aware of this limitation (the delayed response of JsonStreamWriter in learning when the schema is updated, due to the time taken by metadata updates). We added a new feature, ignoreUnknownField, which can mitigate the situation during error handling but does not solve the problem completely. Would it help with your use case?

The best way to solve this is for BigQueryStorage Write API to support JSON natively. This is something being discussed internally (cc @yirutang @yayi-google).

Meanwhile, we don't expect schema updates to happen very frequently (as you've acknowledged). Please let us know if ignoreUnknownField helps.
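
For illustration, a minimal sketch of enabling this, assuming the JsonStreamWriter builder exposes the flag as setIgnoreUnknownFields:

// Sketch only: assumes the builder method is named setIgnoreUnknownFields,
// matching the ignoreUnknownField feature mentioned above.
val writer = JsonStreamWriter
  .newBuilder(writeStream.getName, writeStream.getTableSchema)
  .setIgnoreUnknownFields(true) // silently drop JSON fields unknown to the current table schema
  .build()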

Thanks!

@bphenriques
Contributor Author

bphenriques commented Jan 10, 2022

Hello!

We are aware of ignoreUnknownField, but that approach leads to incomplete rows (it ignores columns absent from the underlying schema), which is not acceptable for our use case.

Thank you

@bphenriques
Contributor Author

bphenriques commented Jan 10, 2022

Update: we opted for a workaround: re-instantiate both the WriteStream and the JsonStreamWriter after updating the table's schema. It is not ideal, but it is not as disruptive as I initially thought, and it is functionally correct.

Hopefully this workaround suffices for the foreseeable future. In any case, I will leave the ticket open as it is still valid.
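
For reference, a rough sketch of the workaround; the calls mirror the tests above:

// Workaround sketch: after updating the table schema, recreate both the
// WriteStream and the JsonStreamWriter so they pick up the new schema.
jsonStreamWriter.close()
val newStream = client.createWriteStream(
  CreateWriteStreamRequest.newBuilder
    .setParent(parent.toString)
    .setWriteStream(WriteStream.newBuilder.setType(WriteStream.Type.BUFFERED).build)
    .build
)
val newWriter =
  JsonStreamWriter.newBuilder(newStream.getName, newStream.getTableSchema).build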

@stephaniewang526
Contributor

How is that different from what has been implemented in #1447?

@bphenriques
Contributor Author

The WriteStream's automatic schema updates depend on further calls to append, while the workaround does not.

In detail, the WriteStream only updates the schema if the response carries hasUpdatedSchema (source). This means that, to trigger an update, we need to keep calling the append API until the backend acknowledges it (as also stated here), which is not feasible because we do not have rows to insert that follow the old schema (we are building a data pipeline where the schema may change at any time and we cannot afford incomplete rows).

Given this scenario, the only way to have an up-to-date WriteStream + JsonStreamWriter without calling append is to re-instantiate them, as described in my workaround.
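
Condensed from the first test above, this is the only path by which the writer learns about the new schema:

// The writer only refreshes its internal schema once an append response reports
// hasUpdatedSchema, so old-schema rows must keep being appended until then.
val response = jsonStreamWriter.append(jsonArr2, currentOffset) // rows still use the old schema
if (response.get.hasUpdatedSchema) {
  // only from this point on are rows that include col2 accepted
}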

@yirutang
Contributor

If you know explicitly when your schema is updated and when you need the new data in, you could just create a new JsonWriter instead of reusing the existing JsonWriter.

@yirutang
Contributor

Also, why do you want to use BUFFERED mode to write your data?

@bphenriques
Contributor Author

Thank you both for providing feedback.

If you know explicitly when your schema is updated and when you need the new data in, you could just create a new JsonWriter instead of reusing the existing JsonWriter.

Oh, I see that we can use JsonStreamWriter.newBuilder("streamName", TableSchema.newBuilder()...). Is it possible to convert a com.google.cloud.bigquery.Schema into a com.google.cloud.bigquery.storage.v1.TableSchema? We have access to the former, as it is required to update the table via the update-table API. They seem to represent the same thing but belong to different Google BigQuery libraries.

Also why do you want to use BUFFERED mode to write your data?

It seemed more suitable for our use case: ingesting data from our pipelines into BigQuery (a streaming approach). PENDING mode probably suffices, but we haven't had the opportunity to explore that option.

@yirutang
Contributor

yirutang commented Jan 10, 2022

There is something in the samples that does the conversion.
https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java

Yeah, in most cases pending mode is good enough for batching behavior, and committed is good for streaming behavior.
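
A quick sketch of the conversion, assuming the BqToBqStorageSchemaConverter class from the linked sample has been copied into the project:

// Convert the com.google.cloud.bigquery.Schema used for the table update into the
// storage.v1.TableSchema that JsonStreamWriter expects.
val storageSchema = BqToBqStorageSchemaConverter.convertTableSchema(updatedSchema)
val writer = JsonStreamWriter.newBuilder(writeStream.getName, storageSchema).build()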

@bphenriques
Contributor Author

There is something in the samples that does the conversion. https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java

Yeah, in most cases pending mode is good enough for batching behavior, and committed is good for streaming behavior.

Thank you, that really helped! That utility class seems useful enough to be exposed to users (rather than living only in a sample).

For reference, I will leave here the updated integration test that verifies that your solution works.

Manually updating the JsonStreamWriter Schema
test("Schema migrations using JsonStreamWriter fails if appending rows with new schema") {
  val client = BigQueryWriteClient.create()
  val bigquery = BigQueryOptions.newBuilder
    .setProjectId("project")
    .build
    .getService
  val DATASET = "dataset"
  val tableName =
    "SchemaUpdateTestTable" + UUID.randomUUID().toString.replace("-", "").substring(0, 5)
  val tableId = TableId.of(DATASET, tableName)
  val col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build
  val originalSchema = Schema.of(col1)
  val tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build
  bigquery.create(tableInfo)
  val parent = TableName.of(ServiceOptions.getDefaultProjectId, DATASET, tableName)
  val writeStream = client.createWriteStream(
    CreateWriteStreamRequest.newBuilder
      .setParent(parent.toString)
      .setWriteStream(WriteStream.newBuilder.setType(WriteStream.Type.BUFFERED).build)
      .build
  )
  try {
    val jsonStreamWriter =
      JsonStreamWriter.newBuilder(writeStream.getName, writeStream.getTableSchema).build
    try { // write the 1st row
      var currentOffset = 0L
      val foo = new JSONObject()
      foo.put("col1", "aaa")
      val jsonArr = new JSONArray()
      jsonArr.put(foo)
      val response = jsonStreamWriter.append(jsonArr, currentOffset)
      currentOffset += jsonArr.length() - 1

      assertEquals(0L, response.get.getAppendResult.getOffset.getValue)

      assertEquals(
        0L,
        client
          .flushRows(
            FlushRowsRequest.newBuilder
              .setWriteStream(writeStream.getName)
              .setOffset(Int64Value.of(currentOffset))
              .build()
          )
          .getOffset,
      )

      // update schema with a new column
      val col2 = Field.newBuilder("col2", StandardSQLTypeName.STRING).build
      val updatedSchema = Schema.of(ImmutableList.of(col1, col2))
      val updatedTableInfo =
        TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build
      val updatedTable = bigquery.update(updatedTableInfo)
      assertEquals(
        updatedSchema,
        updatedTable.getDefinition.asInstanceOf[TableDefinition].getSchema,
      )

      // NEW: Manually update the underlying schema to the new one.
      Thread.sleep(30000) // wait until BQ's backend is ready to accept requests with the new schema; 30 s is likely too long, but it works
      val updatedJsonWriter = JsonStreamWriter
        .newBuilder(writeStream.getName, BqToBqStorageSchemaConverter.convertTableSchema(updatedSchema))
        .build()

      // write rows with updated schema.
      val updatedFoo = new JSONObject()
      updatedFoo.put("col1", "ccc")
      updatedFoo.put("col2", "ddd")
      val updatedJsonArr = new JSONArray()
      updatedJsonArr.put(updatedFoo)
      for (i <- 0 until 10) {
        currentOffset += updatedJsonArr.length()
        val response3 = updatedJsonWriter.append(updatedJsonArr, currentOffset)
        assertEquals(currentOffset, response3.get.getAppendResult.getOffset.getValue)
      }

      assertEquals(
        currentOffset,
        client
          .flushRows(
            FlushRowsRequest.newBuilder
              .setWriteStream(writeStream.getName)
              .setOffset(Int64Value.of(currentOffset))
              .build()
          )
          .getOffset,
      )

      // verify table data correctness
      val rowsIter = bigquery.listTableData(tableId).getValues.iterator
      // 1 row of aaa
      assertEquals("aaa", rowsIter.next.get(0).getStringValue)
      // 10 rows of ccc, ddd
      for (_ <- 0 until 10) {
        val temp = rowsIter.next
        assertEquals("ccc", temp.get(0).getStringValue)
        assertEquals("ddd", temp.get(1).getStringValue)
      }
      assertFalse(rowsIter.hasNext)
    } finally if (jsonStreamWriter != null) jsonStreamWriter.close()
  }
}

// Add the content of https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java

@yirutang
Contributor

Yeah, it was in the client lib, but bigquerystorage cannot reference bigquery, so the function cannot be included in the client lib...

@stephaniewang526
Contributor

Here is a bit more context on that decision. As things evolve, we can explore ways to improve the client if you have other suggestions.

@bphenriques
Contributor Author

bphenriques commented Jan 11, 2022

Yeah, it was in the client lib, but the bigquery storage cannot reference bigquery, thus the function cannot be included in the client lib...

I see. Have you considered a separate dependency that just holds the "model" (table definition, field definition, etc.)? That way, both the BigQuery API and the BigQuery Storage API would use the same types to represent BigQuery entities.

As things evolve, we can explore ways to improve the client if you have other suggestions.

Thank you both for your help. The workaround still requires closing the previous JsonStreamWriter and re-creating it. Ideally, the client would handle this transparently (the first proposal), with the JsonStreamWriter attempting to recover from schema mismatches.

In any case, we are not functionally blocked. The ticket itself still makes sense, but I'll leave it to you to decide how to triage it. Happy to help if needed.

@stephaniewang526
Contributor

Thanks, closing for now.
