
[dagster-dbt] Refactor row count collection code #21943

Merged · 7 commits merged into master on May 22, 2024

Conversation

@benpankow (Member) commented May 17, 2024

Summary

In preparation for supporting other post-dbt-materialization logic, this PR moves the bulk of the "map an operation across dbt events" logic into a util. A stacked PR will move on to implementing an even more generic "map"-style operation on DbtEventIterator.

Test Plan

Existing unit tests.

@benpankow changed the title from "refactor dbt mapping logic" to "[dagster-dbt] Refactor row count collection code" May 17, 2024
@benpankow requested review from sryza and rexledesma and removed the request for sryza May 17, 2024 18:52
@benpankow marked this pull request as ready for review May 17, 2024 18:52
@benpankow force-pushed the benpankow/generify-dbt-2 branch 2 times, most recently from 02be352 to abb8b7e, May 20, 2024 23:22
@rexledesma (Member) left a comment

See comments

```diff
@@ -697,7 +697,9 @@ def stream(
         def my_dbt_assets(context, dbt: DbtCliResource):
             yield from dbt.cli(["run"], context=context).stream()
         """
-        return DbtEventIterator(self._stream_asset_events(), self)
+        return DbtEventIterator(
+            self._stream_asset_events(), self, ThreadPoolExecutor(STREAM_EVENTS_THREADPOOL_SIZE)
+        )
```
@rexledesma (Member) commented:

  • We should ensure that the threadpool's context is properly handled.
  • STREAM_EVENTS_THREADPOOL_SIZE should potentially be modifiable by the user. Perhaps it should be set similar to termination_timeout_seconds, so that it can be overridden if needed.

@benpankow (Member, Author) replied May 21, 2024:

Updated to a num_threads param on each call which gives users the ability to control how much fan-out they want at each step & avoids us having to thread through the threadpool to each iterator, where managing lifecycle can be tricky (since no single iterator "owns" the pool). Since the number of chained calls is small, the overhead from opening a new pool a few more times should be minimal.
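The per-call pattern described here can be sketched roughly as follows; `fan_out` and the names below are illustrative stand-ins, not the actual dagster-dbt API. Each chained step opens its own short-lived pool, so no single iterator has to own a shared executor's lifecycle:

```python
from concurrent.futures import ThreadPoolExecutor

DEFAULT_NUM_THREADS = 4  # stand-in for the default discussed above

def fan_out(events, fn, num_threads=DEFAULT_NUM_THREADS):
    # Each call opens (and deterministically closes) its own pool via the
    # with-block, sidestepping the shared-pool ownership problem.
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        return list(pool.map(fn, events))

# Chained calls can each pick their own fan-out.
step1 = fan_out(range(8), lambda e: e * 2, num_threads=2)
step2 = fan_out(step1, lambda e: e + 1, num_threads=4)
```

Since `pool.map` preserves input order, chaining these calls behaves like a plain sequential pipeline, just with bounded parallelism at each step.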

```python
# as the one dbt uses, is open.
try:
    from dbt.adapters.duckdb import DuckDBAdapter

with pushd(str(self._dbt_cli_invocation.project_dir)):
```
@rexledesma (Member) commented:

I'm afraid that this folder management scheme might bite us in the future...

In a separate PR, could we consider moving this pushd behavior so it happens explicitly when the .adapter property of DbtCliInvocation is accessed?
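A rough sketch of that suggestion, using an illustrative stand-in class rather than the real DbtCliInvocation, and a minimal pushd helper in place of the one referenced in the diff:

```python
import os
from contextlib import contextmanager

@contextmanager
def pushd(path):
    # Minimal stand-in for the pushd helper referenced in the diff.
    prev = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(prev)

class FakeCliInvocation:
    # Illustrative stand-in for DbtCliInvocation; not the real class.
    def __init__(self, project_dir):
        self.project_dir = project_dir

    @property
    def adapter(self):
        # The directory change is scoped to adapter construction, so callers
        # never observe a mutated working directory.
        with pushd(str(self.project_dir)):
            return self._build_adapter()

    def _build_adapter(self):
        return os.getcwd()  # placeholder for real adapter construction

invocation = FakeCliInvocation(os.path.sep)
cwd_before = os.getcwd()
adapter_dir = invocation.adapter  # built inside the project dir
assert os.getcwd() == cwd_before  # caller's cwd is untouched
```

Scoping the pushd to the property access keeps the folder juggling in one place instead of spreading it across every call site.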


```diff
 @public
 @experimental
 def fetch_row_counts(
-    self,
+    self, *, num_threads=DEFAULT_EVENT_POSTPROCESSING_THREADPOOL_SIZE
```
@rexledesma (Member) commented:

This feels off, since this implies that num_threads would also be an argument for other methods that emit metadata for the DbtEventIterator (e.g. .fetch_column_schema).

Could we instead just have it as a property on DbtCliInvocation?

@benpankow (Member, Author) replied:

I was planning to have it as a kwarg param on each of the chained/builder methods, so users could specify a different threadpool size for each.

The previous shared threadpool approach is tricky because of its lifecycle - there's no clear "owner" or with-context scope we can use. Could move the configuration option to DbtCliInvocation but still have separate threadpools?

@rexledesma (Member) replied:

> Could move the configuration option to DbtCliInvocation but still have separate threadpools?

Yeah I think this is preferable.

@rexledesma (Member) left a comment

I don't think we need to expose num_threads -- it should just be a property on DbtCliInvocation for now.


```python
):
    with ThreadPoolExecutor(
        max_workers=self._dbt_cli_invocation.postprocessing_threadpool_num_threads,
        thread_name_prefix="fetch_row_counts_",
```
@rexledesma (Member) commented:

I believe there's already a separator added by the executor itself, so there's no need to add one manually.

Suggested change:

```diff
-    thread_name_prefix="fetch_row_counts_",
+    thread_name_prefix="fetch_row_counts",
```
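For what it's worth, CPython's ThreadPoolExecutor does insert the underscore itself when naming worker threads, which is easy to verify:

```python
from concurrent.futures import ThreadPoolExecutor
import threading

# ThreadPoolExecutor names workers "<prefix>_<n>", adding the underscore
# itself, so a trailing "_" in the prefix would yield a doubled separator
# like "fetch_row_counts__0".
with ThreadPoolExecutor(max_workers=1, thread_name_prefix="fetch_row_counts") as pool:
    worker_name = pool.submit(lambda: threading.current_thread().name).result()

print(worker_name)  # e.g. "fetch_row_counts_0"
```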

@benpankow benpankow merged commit b3cdcb2 into master May 22, 2024
1 check passed
@benpankow benpankow deleted the benpankow/generify-dbt-2 branch May 22, 2024 18:18