Allow intra-file source parallelism #380

Open
davidselassie opened this issue Jan 22, 2024 · 1 comment
Labels
needs triage New issue, needs triage type:feature New feature request

Comments

@davidselassie
Contributor

Inspired by #379

{CSV,Dir,File}Source all currently treat each entire file as the unit of parallelism: list_parts emits a single partition per file name. (There is some trickery surrounding get_fs_id so that files with the same name can be marked as unique if they're on different mounts.)

It would be nice to add a feature that allows stateful, parallel reading of a single file that is accessible from multiple workers. The core idea is to define a partition not as a whole file but as a specific chunk of lines in a file, and to allow multiple workers to declare that they can read that chunk.

Implementation

We could add a part_line_size argument to these sources. list_parts would look at the files that worker has available and their sizes, and declare a partition for each chunk of up to part_line_size lines. (Or use byte offsets instead of lines. There are some tricky implementation details here in finding consistent line boundaries across chunks, so that no line is accidentally read twice.) Then, in build_part, parse the partition to see which offsets in the file you should read.
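As a rough illustration of the byte-offset variant, here is a minimal sketch in plain Python. It is not the Bytewax implementation: the helper names (list_chunk_parts, parse_part, chunk_bounds, read_chunk), the part_byte_size parameter, and the "path::offset" partition key format are all hypothetical. The key point is that every chunk boundary goes through the same alignment rule, so the end of one chunk is always exactly the start of the next and no line is read twice or skipped.

```python
from pathlib import Path
from typing import BinaryIO, Iterator, List, Tuple


def list_chunk_parts(path: Path, part_byte_size: int) -> List[str]:
    """Declare one partition key per nominal chunk (hypothetical key format)."""
    file_size = path.stat().st_size
    return [f"{path}::{start}" for start in range(0, file_size, part_byte_size)]


def parse_part(key: str) -> Tuple[Path, int]:
    """Recover the file path and nominal start offset from a partition key."""
    path_str, start_str = key.rsplit("::", 1)
    return Path(path_str), int(start_str)


def _align(f: BinaryIO, offset: int, file_size: int) -> int:
    """Move a nominal offset to the end of the line that contains it."""
    if offset <= 0:
        return 0
    if offset >= file_size:
        return file_size
    f.seek(offset)
    f.readline()  # Consume the rest of the line containing `offset`.
    return f.tell()


def chunk_bounds(path: Path, nominal_start: int, part_byte_size: int) -> Tuple[int, int]:
    """Line-aligned [start, end) byte range for one chunk."""
    file_size = path.stat().st_size
    with open(path, "rb") as f:
        start = _align(f, nominal_start, file_size)
        end = _align(f, nominal_start + part_byte_size, file_size)
    return start, end


def read_chunk(path: Path, start: int, end: int) -> Iterator[bytes]:
    """Yield every line that begins inside the aligned byte range."""
    with open(path, "rb") as f:
        f.seek(start)
        while f.tell() < end:
            yield f.readline()
```

In this sketch a chunk can turn out to be empty when a single line is longer than part_byte_size; that is harmless, because the neighbouring chunk picks up the whole line. A list_parts implementation could return the keys from list_chunk_parts for each visible file, and build_part could call parse_part and chunk_bounds before reading.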

Caveats

As with basically all performance-based features, turning this knob up does not always result in more performance. It is best used when worker_count >> file_count and every worker can read all files; in that case, chunking gives you more parallelism.

@davidselassie davidselassie added the type:feature New feature request label Jan 22, 2024
@github-actions github-actions bot added the needs triage New issue, needs triage label Jan 22, 2024
@davidselassie davidselassie removed the needs triage New issue, needs triage label Jan 22, 2024
@davidselassie davidselassie added the needs triage New issue, needs triage label Jan 22, 2024

davidselassie commented Jan 22, 2024

There is an example implementation of chunk-finding logic that could be adapted in

bytewax/examples/1brc.py

Lines 43 to 58 in 05a4f37

    def build(
        self, now: datetime, worker_index: int, worker_count: int
    ) -> FilePartition:
        file_size = self._path.stat().st_size
        chunk_size = file_size // worker_count
        start_offset = worker_index * chunk_size
        end_offset = (worker_index + 1) * chunk_size
        with open(self._path, "rb") as f:
            if start_offset > 0:
                f.seek(start_offset)
                f.readline()
                start_offset = f.tell()
            if end_offset < file_size:
                f.seek(end_offset)
                f.readline()
                end_offset = f.tell()
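Once line-aligned offsets like the ones computed above are known, the "stateful" part of the proposal could be covered by tracking a byte offset while reading. The following is only a sketch under that assumption: read_chunk_resumable and its resume_offset parameter are hypothetical names, not part of the example file or of Bytewax. Each yielded line is paired with the offset of the next unread byte, which is the value a partition could snapshot and later pass back in to resume.

```python
from pathlib import Path
from typing import Iterator, Optional, Tuple


def read_chunk_resumable(
    path: Path,
    start_offset: int,
    end_offset: int,
    resume_offset: Optional[int] = None,
) -> Iterator[Tuple[bytes, int]]:
    """Yield (line, next_offset) pairs for the lines in [start_offset, end_offset).

    `next_offset` is the byte position just after the yielded line. Passing a
    previously saved value back in as `resume_offset` continues from there.
    """
    pos = start_offset if resume_offset is None else resume_offset
    with open(path, "rb") as f:
        f.seek(pos)
        while pos < end_offset:
            line = f.readline()
            if not line:
                # Unexpected end of file, e.g. the file was truncated.
                break
            pos = f.tell()
            yield line, pos
```

The offsets passed in are assumed to already sit on line boundaries; this function does no alignment of its own.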
