[dagster-dbt] Add general attach_metadata method to chain async metadata fetches to dbt execution #21947

Merged: 6 commits, May 23, 2024

@benpankow benpankow commented May 17, 2024

Summary

Continues the restructuring of chained async dbt work from #12943 by adding an attach_metadata method. It takes a function that runs in a thread after each dbt event completes and attaches any additional user-specified metadata to that event before it is emitted.

Right now it's private, so we can adjust the signature if need be before making it public. I'd like to unblock future work stacking on this shared impl, but I think we'll eventually want to pass a single, proper context class rather than the loose invocation and event params.

Used internally for the fetch_row_counts implementation.
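For readers skimming the thread, here is a minimal sketch of the pattern described in the summary: each event is handed to a thread pool, the user function may return extra metadata, and events are yielded in their original order. This is an illustration only, not the merged implementation. Event, attach_metadata, and num_threads are hypothetical stand-ins, and the real method also passes the DbtCliInvocation to fn.

```python
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass, field, replace
from typing import Any, Callable, Deque, Dict, Iterator, Optional


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for a Dagster event emitted for a dbt node."""

    name: str
    metadata: Dict[str, Any] = field(default_factory=dict)


def attach_metadata(
    events: Iterator[Event],
    fn: Callable[[Event], Optional[Dict[str, Any]]],
    num_threads: int = 4,  # assumed default, chosen for the example
) -> Iterator[Event]:
    """Run fn for each event in a thread pool, preserving event order."""

    def _task(event: Event) -> Event:
        extra = fn(event)
        if extra is None:
            return event  # nothing to attach, pass the event through
        return replace(event, metadata={**event.metadata, **extra})

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        pending: Deque["Future[Event]"] = deque()
        for event in events:
            pending.append(executor.submit(_task, event))
            # Emit finished work from the front only, so ordering is preserved.
            while pending and pending[0].done():
                yield pending.popleft().result()
        # Upstream is exhausted: drain the remaining tasks in order.
        while pending:
            yield pending.popleft().result()
```

Returning None from fn leaves the event untouched, which mirrors the `if result is None: return event` branch in the diff.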

Test Plan

Example unit test with SUMMARIZE SQL metadata.

@rexledesma rexledesma left a comment

Let me know if it's ready for review -- It's still in draft, so I'm not sure if you were planning on more changes.

@benpankow benpankow force-pushed the benpankow/generic-dbt-3 branch 2 times, most recently from daf0867 to 7af52c3 Compare May 21, 2024 18:25
@benpankow

Marked as ready for review. Changed the method to be non-public for now, since I am not confident in what signature we'll want to expose and want to unblock stacking on it (it's useful as an internal abstraction at least).

@benpankow benpankow marked this pull request as ready for review May 21, 2024 22:28
@benpankow benpankow requested a review from rexledesma May 21, 2024 22:28
@benpankow benpankow force-pushed the benpankow/generic-dbt-3 branch 2 times, most recently from 9adf0cb to fcfc015 Compare May 22, 2024 16:05
Base automatically changed from benpankow/generify-dbt-2 to master May 22, 2024 18:18
@benpankow benpankow force-pushed the benpankow/generic-dbt-3 branch 2 times, most recently from d3d3066 to 6609790 Compare May 23, 2024 03:44
@@ -985,6 +987,32 @@ def fetch_row_counts(
A set of corresponding Dagster events for dbt models, with row counts attached,
yielded in the order they are emitted by dbt.
"""
return self._attach_metadata(_fetch_row_count_metadata)

def _attach_metadata(
@rexledesma

We're planning on making this a public method in the future, right? We should think about the name here. We don't have to add the @public decorator though. I think map could be a compelling name here. We'd need to make the fn type simpler: Callable[[DbtDagsterEventType], DbtDagsterEventType].

I can imagine this method operating on the general event stream, rather than just attaching metadata. This could be a pattern that exists outside of dagster-dbt.

@benpankow

Right now the function is useful as an internal simplification/util. If we want to expose it publicly, landing on the right name and signature is trickier and warrants a more deliberate discussion. There are trade-offs either way: if we want to make it easy for users to attach metadata, we can more easily cache results or fold map calls together, vs. giving them the power to map over all events. Either way, I think it's beyond the scope of this PR and I don't feel that it's blocking what's here. We're not committing to anything name- or signature-wise.

"""Runs a threaded task to attach metadata to each event in the iterator.

Args:
fn (Callable[[DbtCliInvocation, DbtDagsterEventType], Optional[Dict[str, Any]]]):
@rexledesma

I think including DbtCliInvocation is unnecessary.

  • We can access it from self, if we make _dbt_cli_invocation public.
  • From the user's perspective, DbtCliInvocation will be accessible from the scope outside of the closure, so it's repetitive.
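A small sketch of the two shapes being compared in the points above: a two-argument function that receives the invocation explicitly, and a one-argument function that captures it from the enclosing scope. Invocation, describe, and make_describe are hypothetical names used only for illustration, and a plain dict stands in for the event type.

```python
from functools import partial
from typing import Any, Callable, Dict, Optional

Event = Dict[str, Any]  # hypothetical stand-in for a Dagster event
Metadata = Optional[Dict[str, Any]]


class Invocation:
    """Hypothetical stand-in for DbtCliInvocation."""

    def __init__(self, target: str) -> None:
        self.target = target


# Shape used in this PR: the invocation is passed to fn explicitly.
def describe(invocation: Invocation, event: Event) -> Metadata:
    return {"target": invocation.target, "node": event["node"]}


# Shape suggested in review: fn takes only the event and closes over
# the invocation, which is already in scope where fn is defined.
def make_describe(invocation: Invocation) -> Callable[[Event], Metadata]:
    def _describe(event: Event) -> Metadata:
        return {"target": invocation.target, "node": event["node"]}

    return _describe


invocation = Invocation(target="dev")
event = {"node": "model.orders"}

via_closure = make_describe(invocation)
# A two-argument fn can always be adapted to the one-argument shape.
via_partial = partial(describe, invocation)
```

Both callables produce the same metadata for the same event, so the choice mostly affects how much the public signature exposes.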

@benpankow benpankow May 23, 2024

As I mentioned in the PR description, I think the signature here will change pretty meaningfully if we expose it publicly, probably to some sort of context class rather than the loose arguments that it takes now. I'd prefer to leave it as-is for now and re-examine it down the line if we want to expose this method publicly.

@rexledesma

Got it, I missed that. Makes sense.

Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]
):
with ThreadPoolExecutor(
max_workers=self._dbt_cli_invocation.postprocessing_threadpool_num_threads,
- thread_name_prefix="fetch_row_counts",
+ thread_name_prefix=fn.__name__,
@rexledesma

Hmm we would have to check this: I'm wondering what would happen if the user passes in a lambda, rather than a declared function. We might have to manage the name on their behalf or something.

@benpankow

Added a prefix to the fn name, which at least points to this being the mapping fn, but I'm not sure what else we can do here. If a user ends up defining many custom lambda postprocessing steps, has to figure out which one isn't working, and can't tell from the stack trace or other context, they can always give them names.
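To illustrate the lambda concern: every lambda reports `<lambda>` as its `__name__`, so thread names derived from it cannot tell two lambdas apart. The sketch below assumes a prefix of the form `attach_metadata_<name>`; the exact string used in the PR is not shown in this thread, and thread_prefix and run_and_report_thread_name are hypothetical helpers.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


def thread_prefix(fn: Callable[..., Any]) -> str:
    # Hypothetical prefix format, assumed for this example.
    return f"attach_metadata_{fn.__name__}"


def run_and_report_thread_name(fn: Callable[..., Any]) -> str:
    """Run a trivial task on a pool named after fn; return the worker's name."""
    with ThreadPoolExecutor(
        max_workers=1, thread_name_prefix=thread_prefix(fn)
    ) as pool:
        return pool.submit(lambda: threading.current_thread().name).result()


def fetch_summary(event):
    return None


# Every lambda carries the same generic __name__.
anonymous = lambda event: None

# A lambda can still be labelled by hand when debugging requires it.
named_lambda = lambda event: None
named_lambda.__name__ = "fetch_summary_lambda"
```

Note that some callables, such as functools.partial objects, have no `__name__` at all, so a public version would likely need a fallback.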

if result is None:
return event

return event.with_metadata({**event.metadata, **result})
@rexledesma

One thing here: if we simplify the fn type, we'd expect users to call .with_metadata.

I think a better name for .with_metadata is .replace_metadata.

And for most users, they would call something like .add_metadata, which would call .replace_metadata, but tack on the **event.metadata on the user's behalf.

@benpankow

Agreed - we tend to use the with_ pattern in many places for replacement behavior (e.g. AssetsDefinition) but the behavior is definitely not clear from name alone (vs a clear distinction if we used with_added_ and replace_ instead)
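The naming distinction discussed here can be sketched as follows. Event is a hypothetical stand-in, and replace_metadata / add_metadata are the names proposed in this thread, not existing Dagster methods.

```python
from dataclasses import dataclass, field, replace
from typing import Any, Dict


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for an event that carries metadata."""

    metadata: Dict[str, Any] = field(default_factory=dict)

    def replace_metadata(self, metadata: Dict[str, Any]) -> "Event":
        # Replacement semantics: existing keys are discarded.
        return replace(self, metadata=dict(metadata))

    def add_metadata(self, metadata: Dict[str, Any]) -> "Event":
        # Additive semantics: existing keys are kept, new keys win on conflict.
        return self.replace_metadata({**self.metadata, **metadata})


original = Event({"row_count": 10})
replaced = original.replace_metadata({"checked": True})
added = original.add_metadata({"checked": True})
```

With these names, the merge in the diff (`{**event.metadata, **result}`) corresponds to add_metadata, so callers would not need to spread the existing metadata themselves.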

@rexledesma rexledesma left a comment

Approving since this is basically just an internal refactor

@benpankow benpankow merged commit 361184c into master May 23, 2024
1 check passed
@benpankow benpankow deleted the benpankow/generic-dbt-3 branch May 23, 2024 16:08
2 participants