-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[dagster-dbt] Add general attach_metadata method to chain async metadata fetches to dbt execution #21947
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. Join @benpankow and the rest of your teammates on Graphite |
4fd65d3
to
ab4cd89
Compare
274a65e
to
f42ccaa
Compare
ab4cd89
to
02be352
Compare
f42ccaa
to
216a3d2
Compare
02be352
to
abb8b7e
Compare
216a3d2
to
aa8a372
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me know if it's ready for review -- It's still in draft, so I'm not sure if you were planning on more changes.
abb8b7e
to
b35eeb9
Compare
daf0867
to
7af52c3
Compare
Marked as ready-for-review, changed method to be non-public for now since I am not confident in what signature we'll want to expose and want to unblock stacking on it (it's useful as an internal abstraction at least) |
965d062
to
5285767
Compare
6d1efcc
to
5c33b4d
Compare
5285767
to
d7fe9db
Compare
9adf0cb
to
fcfc015
Compare
0a33daf
to
a74481a
Compare
fcfc015
to
e70d560
Compare
d3d3066
to
6609790
Compare
@@ -985,6 +987,32 @@ def fetch_row_counts( | |||
A set of corresponding Dagster events for dbt models, with row counts attached, | |||
yielded in the order they are emitted by dbt. | |||
""" | |||
return self._attach_metadata(_fetch_row_count_metadata) | |||
|
|||
def _attach_metadata( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're planning on making this a public method in the future, right? We should think of that name here. We don't have to add the @public
decorator though. I think map
could be a compelling name here. We'd need to make the fn type simpler: Callable[DbtDagsterEventType, DbtDagsterEventType]
.
I can imagine this method to operate on modifying the general event stream, rather than just attaching metadata. This could be a pattern that exists outside of dagster-dbt
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now the function is is useful as an internal simplification/util. If we want to expose it publicly, landing on the right name/signature is more tricky and warrants a more deliberate discussion. There are trade-offs either way - if we want to make it easy for users to easily attach metadata, we can more easily cache results or fold map calls together vs giving them power to map over all events. Either way I think it's beyond the scope of this PR and I don't feel that it's blocking what's here - we're not committing to anything name or signature wise.
"""Runs a threaded task to attach metadata to each event in the iterator. | ||
|
||
Args: | ||
fn (Callable[[DbtCliInvocation, DbtDagsterEventType], Optional[Dict[str, Any]]]): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think including DbtCliInvocation
is unnecessary.
- We can access it from
self
, if we make_dbt_cli_invocation
public. - From the user's perspective,
DbtCliInvocation
will be accessible from the scope outside of the closure, so it's repetitive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I mentioned in the PR description, I think the signature here will change pretty meaningfully if we expose it publicly - probably some sort of context class rather than the loose arguments that it takes now. I'd prefer to leave as-is for now and re-examine it down the line if we want to expose this method publicly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, I missed that. Makes sense.
Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]] | ||
): | ||
with ThreadPoolExecutor( | ||
max_workers=self._dbt_cli_invocation.postprocessing_threadpool_num_threads, | ||
thread_name_prefix="fetch_row_counts", | ||
thread_name_prefix=fn.__name__, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm we would have to check this: I'm wondering what would happen if the user passes in a lambda, rather than a declared function. We might have to manage the name on their behalf or something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a prefix to the fn name which at least points to this being the mapping fn, but not sure what else we can do here. If a user ends up defining many custom lambda postprocessing steps and has to figure out which isn't working and can't figure it out from stacktrace/other context they can always give them names.
if result is None: | ||
return event | ||
|
||
return event.with_metadata({**event.metadata, **result}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing here: if we simplify the fn
type, we'd expect users to call .with_metadata
.
I think a better name for .with_metadata
is .replace_metadata
.
And for most users, they would call something like .add_metadata
, which would call .replace_metadata
, but tack on the **event.metadata
on the user's behalf.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed - we tend to use the with_
pattern in many places for replacement behavior (e.g. AssetsDefinition
) but the behavior is definitely not clear from name alone (vs a clear distinction if we used with_added_
and replace_
instead)
6609790
to
2dad48e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approving since this is basically just an internal refactor
Summary
Continues the restructuring of chaining async dbt work from #12943 to specify a
attach_metadata
method which can be used to specify a function which will run in a thread after each dbt event completes and will attach any additional user-specified metadata to that event before it is emitted.Right now, it's classified as private so we can adjust the signature if need be before making it public - I'd like to unblock future work stacking on this shared impl, but I think we'll want to pass a single, proper context class rather than the loose
invocation
,event
params.Used internally for the
fetch_row_count
implementation.Test Plan
Example unit test with
SUMMARIZE
SQL metadata.