
feat(bigquery/storage/managedwriter): support variadic appends #5102

Merged
merged 10 commits into googleapis:main on Dec 1, 2021

Conversation

@shollyman (Contributor) commented Nov 9, 2021

BREAKING CHANGE: adds a variadic option to the AppendRows() method, removes offset argument

This updates the call signature to allow variadic appends, and introduces two new AppendOptions: WithOffset(), which sets the offset optionally, and UpdateSchemaDescriptor(), which updates the schema.

Due to current API limitations, this means we need to close and reconnect the open connection when UpdateSchemaDescriptor() is passed. This should eventually be resolved in the backend; internal issue 205756033 tracks it.

In practice, this means the following changes need attention from consumers of this library (a short migration sketch follows the examples):

For the "don't set an offset" behavior:

  • Old: mystream.AppendRows(ctx, data, managedwriter.NoStreamOffset)
  • New: mystream.AppendRows(ctx, data)

For the "set an an offset" behavior:

  • Old: mystream.AppendRows(ctx, data, offset)
  • New: mystream.AppendRows(ctx, data, managedwriter.WithOffset(offset))
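
Putting the two together, here is a minimal migration sketch. It assumes rowData ([][]byte), offset (int64), and mystream (a *managedwriter.ManagedStream) already exist and that the code runs inside a function returning error; GetResult's signature is taken from this PR's diff.

// Old: the offset was a required positional argument, with a sentinel to skip it:
//   result, err := mystream.AppendRows(ctx, rowData, managedwriter.NoStreamOffset)

// New: omit the offset entirely...
result, err := mystream.AppendRows(ctx, rowData)
if err != nil {
    return err
}
// ...or set it explicitly via the variadic option.
result, err = mystream.AppendRows(ctx, rowData, managedwriter.WithOffset(offset))
if err != nil {
    return err
}
// GetResult blocks until the append is acknowledged and reports the offset.
recvOffset, err := result.GetResult(ctx)
if err != nil {
    return err
}
log.Printf("append confirmed at offset %d", recvOffset)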

@product-auto-label bot added the api: bigquery label Nov 9, 2021
@google-cla bot added the cla: yes label Nov 9, 2021
@shollyman shollyman marked this pull request as ready for review November 9, 2021 22:08
@shollyman shollyman requested a review from a team November 9, 2021 22:08
@shollyman shollyman requested a review from a team as a code owner November 9, 2021 22:08
@shollyman shollyman requested review from tswast and codyoss and removed request for loferris November 9, 2021 23:08
@codyoss (Member) left a comment


LGTM from a Go perspective. I will let Tim sign off from the BQ perspective.

@tswast (Contributor) left a comment

A few questions.

@@ -66,7 +67,9 @@ func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) {
// append request.
type pendingWrite struct {
request *storagepb.AppendRowsRequest
result *AppendResult
// for schema evolution cases, accept a new schema
newSchema *descriptorpb.DescriptorProto

Contributor

Will this property need to be removed once 205756033 is resolved? It might be redundant with AppendRowsRequest.proto_rows.writer_schema.proto_descriptor.

Or am I understanding correctly from that email thread that we won't be exposing the underlying AppendRowsRequest to users?

Contributor Author

In this veneer I'm wrapping the append request. My expectation is we'll end up with per-stream caches of schema and per-stream append queues for the multiplexing case. Currently we just retain a single schema and append queue (though in Go it's a channel rather than a queue).

Reviewer

Is this also the reason why you can optionally have a new schema returned as part of an append's result? I was puzzled how one can automatically handle schema updates at runtime, though. Is there any realistic example of this?

Contributor Author

Yeah, the returned schema is the notification of schema extension. It's a case where you do something like an ALTER TABLE ADD COLUMN or extend schema via tables.update while streaming data, and the change gets acknowledged by the streaming backend by setting the new schema. My expectation is we'll add a callback registration for this as well, but not in this change.
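
To make the flow concrete, here is a hedged sketch of appending after such a schema extension. mypb.RowV2 is a hypothetical regenerated message type that includes the new column; UpdateSchemaDescriptor() is the option introduced in this PR (assumed here to take the new *descriptorpb.DescriptorProto), and proto/protodesc come from google.golang.org/protobuf.

// Regenerate (or dynamically build) a message type with the new column,
// then derive the updated descriptor from it.
m2 := &mypb.RowV2{Other: proto.String("hello evolution")} // hypothetical type
descV2 := protodesc.ToDescriptorProto(m2.ProtoReflect().Descriptor())

b, err := proto.Marshal(m2)
if err != nil {
    return err
}
// The updated descriptor rides along with the next append.
result, err := ms.AppendRows(ctx, [][]byte{b}, managedwriter.UpdateSchemaDescriptor(descV2))
if err != nil {
    return err
}
_, err = result.GetResult(ctx) // block until the append is acknowledged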


// setup a new stream.
ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),

Contributor

Interesting. So the users don't see the stream ID, either? I guess that gives us enough flexibility for the current schema evolution workaround.

Contributor Author

There are effectively two paths to getting a managed stream: let NewManagedStream() handle stream construction by specifying table/type/etc., or construct the stream yourself and pass it in via the WithStreamName() option.

Contributor Author

I expect users who explicitly do stream construction to be more likely to be doing dynamic proto schema stuff, as one of the things you get back from stream creation is table schema.
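
Roughly, the two paths look like this. tableName is an assumed preformatted projects/.../datasets/.../tables/... string; the raw-client calls (rawClient.CreateWriteStream and the storagepb types) belong to the underlying BigQuery Storage write API rather than to this diff, and the WithType/PendingStream names are assumptions about this package.

// Path 1: let NewManagedStream construct the stream from destination options.
ms, err := mwClient.NewManagedStream(ctx,
    managedwriter.WithDestinationTable(tableName),
    managedwriter.WithType(managedwriter.PendingStream), // assumed option/constant names
)
if err != nil {
    return err
}

// Path 2: create the write stream yourself, then hand it in by name.
ws, err := rawClient.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
    Parent:      tableName,
    WriteStream: &storagepb.WriteStream{Type: storagepb.WriteStream_PENDING},
})
if err != nil {
    return err
}
// Stream creation returns the table schema (ws.GetTableSchema()), which is
// what makes this path useful for dynamic proto schema work.
ms, err = mwClient.NewManagedStream(ctx, managedwriter.WithStreamName(ws.GetName()))
_ = ms // a real caller would pick one path, not both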

Reviewer

The only thing I was wondering is why, in a typed language, this API takes the table as a preformatted string. Wouldn't it make more sense to request three separate parameters, or some kind of struct if parts of it should be optional? Having to format it ourselves feels a bit weird. It's possible, and I've done so, but it feels odd.

Contributor Author

Two reasons: this is a bit of a standard in the APIs (see aip.dev for more context), and we want to avoid some circular dependencies by having the managedwriter depend on cloud.google.com/go/bigquery directly.

#5017 will make it easier to generate the string for the table resources if you end up using this option from a bigquery resource.

return ms.arc, ms.pending, nil
}
if arc != ms.arc && forceReconnect && ms.arc != nil {
// TODO: is closing send sufficient?

Contributor

Was this TODO verified by an integration test?

Contributor

Or do we need to create a new stream if it's not the default stream?

// The format of the row data is binary serialized protocol buffer bytes. The message must be compatible
// with the schema currently set for the stream.
//
// Use the sentinel value NoStreamOffset to omit sending of the offset value.

Contributor

From my testing, they must send an offset if using PENDING mode. And they can't if using default stream, right? I didn't test with BUFFERED, so maybe it's optional there? Maybe there's a better way to communicate when NoStreamOffset should be set?

Contributor Author

Interestingly enough, this gets to the other breaking change I've been considering here: remove offset as a static argument from the AppendRows() function and add a WithOffset() AppendOption.

The origin of NoStreamOffset was to simplify the AppendResult, as Go's lack of a null vs. default value distinction makes an optional offset more complex to deal with. I didn't want to use *int64, but it's an option to consider as well.
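
For illustration, the functional-option shape that replaces the sentinel might look like the following. This is a sketch of the pattern, not the package's actual internals: the pendingWrite field layout is assumed from the diff hunk above, and wrapperspb is google.golang.org/protobuf/types/known/wrapperspb.

// AppendOption mutates a pending write before it is sent.
type AppendOption func(*pendingWrite)

// WithOffset sets an explicit write offset. Callers who don't care simply
// pass no option, so neither a NoStreamOffset sentinel nor a *int64 is needed.
func WithOffset(offset int64) AppendOption {
    return func(pw *pendingWrite) {
        pw.request.Offset = &wrapperspb.Int64Value{Value: offset}
    }
}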

Reviewer

Given this is already a kind of managed writer, I wonder if this couldn't be abstracted away completely? But I guess it needs to be exposed so that users aren't forced into one way of handling errors in a retryable manner. Either way, I don't find the NoStreamOffset sentinel value a big deal; it works fine.

Contributor Author

Went ahead and made the offset part of the variadic options.

This changes the signatures for appending to:

  • No offset set: <ManagedStream>.AppendRows(ctx, data)
  • Offset set: <ManagedStream>.AppendRows(ctx, data, WithOffset(offset))



Value: proto.Int64(180),
Other: proto.String("hello evolution"),
}
descriptorProto = protodesc.ToDescriptorProto(m2.ProtoReflect().Descriptor())

Reviewer

This I find one of the harder parts for a beginner starting to use proto models for streaming into the Storage API. It's quite a chain of calls that you would never figure out just by looking at the API; the only way I learned how to do this was by checking these examples. I wonder if there isn't an easier way to just pass in the generated proto type somehow. I haven't found one myself for my bqwriter wrapper, otherwise I would have already done it.
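
For anyone hitting the same wall, the full chain is roughly the following. mypb.Row stands in for any protoc-generated type; proto and protodesc come from google.golang.org/protobuf, and WithSchemaDescriptor is assumed to be this package's option for supplying the descriptor at stream setup.

m := &mypb.Row{ // any protoc-generated message
    Value: proto.Int64(180),
    Other: proto.String("hello evolution"),
}
// Derive the descriptor the backend needs from the generated type.
dp := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
ms, err := mwClient.NewManagedStream(ctx,
    managedwriter.WithDestinationTable(tableName),
    managedwriter.WithSchemaDescriptor(dp),
)
if err != nil {
    return err
}
// Rows cross the wire as serialized proto bytes.
b, err := proto.Marshal(m)
if err != nil {
    return err
}
_, err = ms.AppendRows(ctx, [][]byte{b})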

return ms.arc, ms.pending, nil
}
if arc != ms.arc && forceReconnect && ms.arc != nil {
// TODO: is closing send sufficient?
(*ms.arc).CloseSend()

Reviewer

This smells fishy, or is it just me?

Contributor Author

It's a temporary issue until the backend allows schema change on an already open stream connection; I'm not enamored of it but this will get cleaned up.


@tswast (Contributor) left a comment

AppendOption design LGTM, but one worry:

By hiding the fact that we're recreating the stream, does it make the use of offset harder to understand / make it a breaking change when the API no longer requires a stream reset? Though, perhaps there's some signal we send at the end of the stream so that folks don't need to know specifically that a schema change could cause that?

@shollyman (Contributor Author) replied

Management of the network stream connection is abstracted from the user. This PR uses a CloseSend() on the network stream to cleanly signal a new connection: existing appends in flight will still process on the recv side, and the next append (either new or due to a retry) will pick up a new connection. Schema change notification from the backend isn't currently in this veneer (but will come in a future PR).

If you change the schema in a compatible way, retrying an old proto message against a new schema that has additional fields/tags shouldn't be an issue; that's the power of proto extension in a nutshell. If the table is changed to an incompatible schema, the stream itself is invalid (for explicitly created streams). Default streams are a special case here, but essentially subject to the same kind of metadata inconsistency as the existing tabledata.insertall path when schema changes arrive.
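
A sketch of the reconnect pattern described above; ms.arc and CloseSend() follow the diff hunk earlier in this thread, while the surrounding control flow is illustrative.

if ms.arc != nil && forceReconnect {
    // CloseSend only half-closes the stream: appends already in flight
    // still drain on the recv side.
    (*ms.arc).CloseSend()
    // Dropping the cached connection makes the next append (new or
    // retried) dial a fresh one, picking up the schema change.
    ms.arc = nil
}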

@tswast (Contributor) left a comment

LGTM after clarifying offline that it's reconnecting but not creating a new (backend) stream. Hooray for ambiguous streams 🙄

@shollyman shollyman merged commit 014b314 into googleapis:main Dec 1, 2021
Labels
api: bigquery (Issues related to the BigQuery API)
cla: yes (This human has signed the Contributor License Agreement)

4 participants