[dagster-dbt] Refactor row count collection code #21943
Conversation
```diff
@@ -697,7 +697,9 @@ def stream(
         def my_dbt_assets(context, dbt: DbtCliResource):
             yield from dbt.cli(["run"], context=context).stream()
         """
-        return DbtEventIterator(self._stream_asset_events(), self)
+        return DbtEventIterator(
+            self._stream_asset_events(), self, ThreadPoolExecutor(STREAM_EVENTS_THREADPOOL_SIZE)
```
- We should ensure that the threadpool's context is properly handled.
- `STREAM_EVENTS_THREADPOOL_SIZE` should potentially be modifiable by the user. Perhaps it should be set similar to `termination_timeout_seconds`, so that it can be overridden if needed.
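A common pattern for making such a module-level default user-overridable, analogous to how a parameter like `termination_timeout_seconds` can be passed through at construction time, is a constructor kwarg that falls back to the module constant. This is only a sketch; the class and parameter names here are hypothetical, not dagster-dbt's actual API:

```python
# Hypothetical sketch: a module-level default that a per-resource kwarg can
# override, similar in spirit to how termination_timeout_seconds is handled.
STREAM_EVENTS_THREADPOOL_SIZE = 4

class StreamSettings:
    def __init__(self, threadpool_size=None):
        # Fall back to the module default only when the user passes nothing.
        self.threadpool_size = (
            threadpool_size if threadpool_size is not None
            else STREAM_EVENTS_THREADPOOL_SIZE
        )

print(StreamSettings().threadpool_size)                    # 4
print(StreamSettings(threadpool_size=16).threadpool_size)  # 16
```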
Updated to a `num_threads` param on each call, which gives users the ability to control how much fan-out they want at each step and avoids us having to thread the threadpool through to each iterator, where managing lifecycle can be tricky (since no single iterator "owns" the pool). Since the number of chained calls is small, the overhead of opening a new pool a few more times should be minimal.
```python
# as the one dbt uses, is open.
try:
    from dbt.adapters.duckdb import DuckDBAdapter
with pushd(str(self._dbt_cli_invocation.project_dir)):
```
I'm afraid that this folder management scheme might bite us in the future... In a separate PR, could we consider lumping this `pushd` behavior in explicitly when the `.adapter` property of `DbtCliInvocation` is accessed?
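For context, `pushd` here is a temporary-working-directory helper. A minimal sketch of such a context manager (the actual dagster-dbt helper may differ in details):

```python
import os
from contextlib import contextmanager

@contextmanager
def pushd(path):
    # Hypothetical sketch: chdir into `path` for the duration of the block,
    # always restoring the previous working directory on exit -- even if the
    # body raises.
    previous = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(previous)
```

The `finally` clause is what makes this safe to wrap around adapter access: the caller's working directory is restored no matter how the block exits.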
```diff
 @public
 @experimental
 def fetch_row_counts(
-    self,
+    self, *, num_threads=DEFAULT_EVENT_POSTPROCESSING_THREADPOOL_SIZE
```
This feels off, since it implies that `num_threads` would also be an argument for every other method that emits metadata for the `DbtEventIterator` (e.g. `.fetch_column_schema`). Could we instead just have it as a property on `DbtCliInvocation`?
I was planning to have it as a kwarg param on each of the chained/builder methods, so users could specify a different threadpool size for each. The previous shared-threadpool approach is tricky because of its lifecycle: there's no clear "owner" or `with`-context scope we can use. Could we move the configuration option to `DbtCliInvocation` but still have separate threadpools?
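The lifecycle argument being made here can be illustrated with a sketch (function and field names hypothetical, not the actual dagster-dbt implementation): when each builder method opens its own short-lived pool in a `with` block, ownership and shutdown are unambiguous, because the pool dies with the call that created it.

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_row_counts_for(models, num_threads=4):
    # Hypothetical sketch: this call opens and owns its own short-lived pool,
    # so no iterator has to manage the pool's lifecycle -- the pool is shut
    # down as soon as the `with` block (and hence this call) completes.
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        return list(pool.map(lambda m: {"model": m, "row_count": 0}, models))

print(fetch_row_counts_for(["orders", "customers"]))
```

By contrast, a single pool shared across chained iterators has no natural `with` scope, which is exactly the "no clear owner" problem described above.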
> Could move the configuration option to DbtCliInvocation but still have separate threadpools?

Yeah, I think this is preferable.
I don't think we need to expose `num_threads` -- it should just be a property on `DbtCliInvocation` for now.
```python
):
    with ThreadPoolExecutor(
        max_workers=self._dbt_cli_invocation.postprocessing_threadpool_num_threads,
        thread_name_prefix="fetch_row_counts_",
```
I believe there's already a separator added, so no need to add that manually:

```diff
-    thread_name_prefix="fetch_row_counts_",
+    thread_name_prefix="fetch_row_counts",
```
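The reviewer is right about the separator: CPython's `ThreadPoolExecutor` names its worker threads `{thread_name_prefix}_{index}`, inserting the underscore itself. A quick check using only the standard library:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# ThreadPoolExecutor names workers "{prefix}_{index}", so passing
# "fetch_row_counts" already yields "fetch_row_counts_0", "fetch_row_counts_1", ...
with ThreadPoolExecutor(max_workers=1, thread_name_prefix="fetch_row_counts") as pool:
    worker_name = pool.submit(lambda: threading.current_thread().name).result()

print(worker_name)  # fetch_row_counts_0
```

With the trailing underscore left in, the threads would instead be named `fetch_row_counts__0`, `fetch_row_counts__1`, and so on.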
Summary

In preparation for supporting other post-dbt-materialization logic, this moves the bulk of the "map operation across dbt events" logic into a util. A stacked PR will implement an even more generic "map"-style operation on `DbtEventIterator`.

Test Plan

Existing unit tests.
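The generic "map operation across dbt events" util the summary describes is not shown in this conversation; a minimal sketch of the idea (function name and signature hypothetical) might look like:

```python
from concurrent.futures import ThreadPoolExecutor

def map_events(events, fn, num_threads=4):
    # Hypothetical sketch: fan each event out to a worker thread and yield
    # the post-processed results back in the original event order.
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        yield from pool.map(fn, events)

print(list(map_events([1, 2, 3], lambda e: e * 10)))  # [10, 20, 30]
```

Concrete post-processing steps such as `fetch_row_counts` then become thin wrappers that pass their own `fn` (e.g. a per-model row-count query) to this single util.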