
Organize, document, and simplify transforms.py #726

Open · wants to merge 2 commits into base: main
Conversation

@moradology (Contributor) commented Apr 4, 2024

What I've attempted to do here:

  1. Unified a type union. The runtime bug behind it is actually what got me started on this PR. The fix turns compile-time (AST-parse-time) values into beam.pvalues, which can then be used as side inputs as though they were themselves computed by the cluster. Where sensible, I've attempted to logically separate the provisioning of compile-time and runtime arguments: side inputs (runtime) are pushed into the expand method, while classes are constructed with compile-time values (see the sketch after this list).
  2. Broke out some transforms that were little more than beam.Map calls into their own functions, so we can simply call beam.Map on them in the higher-level transform. This should make reuse more flexible and testing easier (not that I think we need more tests, honestly).
  3. Added documentation where I thought it would be helpful and renamed functions to better clarify their intent.
  4. Reduced the function surface area in StoreToZarr and documented how a user can write equivalent dynamic chunking functions.
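To illustrate the first point, here is a minimal, hypothetical sketch of the pattern (the transform and function names are made up, not the actual transforms.py code): compile-time values go to the constructor, while runtime values arrive at expand time as side inputs via beam.pvalue.AsSingleton.

```python
import apache_beam as beam


def write_element(element, store, attrs):
    # hypothetical write step; a real implementation would persist `element` to `store`
    return store


class WriteWithRuntimeTarget(beam.PTransform):
    """Hypothetical transform: compile-time settings in __init__, runtime side inputs in expand."""

    def __init__(self, attrs: dict):
        # compile-time (pipeline-construction-time) value, known when the recipe is defined
        self.attrs = attrs

    def expand(self, pcoll, target_store):
        # `target_store` is expected to be a beam.pvalue (e.g. AsSingleton), so it is
        # resolved at runtime as though it had been computed by the cluster
        return pcoll | "write" >> beam.Map(write_element, store=target_store, attrs=self.attrs)


# usage sketch, mirroring how the PR passes side inputs at expand time:
# stored = WriteWithRuntimeTarget(attrs={"title": "demo"}).expand(
#     fragments, beam.pvalue.AsSingleton(target_store_pcoll)
# )
```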

@cisaacstern (Member) left a comment


@moradology very awesome insight re: the type union, and elegant fixes! I'll freely confess some of the fixes fly above my current level of understanding of Beam composition, but they generally feel quite elegant, and if the tests pass, they pass!

Made a few comments. One is just a gut check on function naming (the bytes thing), and the only other of real importance, I think, is the question about blocking behavior in StoreToZarr.

Really inspiring work! I always learn something from reading your code! Thanks so much.

@@ -63,12 +63,13 @@ class MergeDim(CombineDim):
operation: ClassVar[CombineOp] = CombineOp.MERGE


def augment_index_with_start_stop(
def augment_index_with_byte_range(
cisaacstern (Member):


I believe this is indexing logical dimensions in the dataset, not bytes.

moradology (Contributor, Author):


It looks to me as though this function uses the logical position of the provided index to calculate start/stop in terms of bytes, no? The sum of the item lengths prior to it is determined by logical position, but the resulting integer is supposed to be the byte-range start, as I read it.
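Roughly, as a sketch of that reading (illustrative names only, not the actual implementation, and whether the lengths count bytes or logical elements is exactly the question here):

```python
from typing import List, Tuple


def start_stop_for_position(position: int, item_lens: List[int], append_offset: int = 0) -> Tuple[int, int]:
    # start is the sum of the lengths of everything before this position,
    # plus any offset for appending to an existing store
    start = sum(item_lens[:position]) + append_offset
    stop = start + item_lens[position]
    return start, stop


# e.g. item_lens = [10, 10, 5]  ->  position 2 maps to (20, 25)
```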

position: Position,
item_lens: List[int],
append_offset: int = 0,
) -> IndexedPosition:
"""Take an index _without_ start / stop and add them based on the lens defined in sequence_lens.
"""Take an index _without_ start / stop (byte range) and add them based on the lens defined in
cisaacstern (Member):


Same comment as above re: bytes.

)
| beam.GroupByKey() # this has major performance implication
| beam.MapTuple(combine_fragments)
| "group by write chunk key" >> beam.GroupByKey() # group by key ensures locality
cisaacstern (Member):


Nice! This is very helpful developer commentary 😃

Comment on lines -656 to -657
# TODO: make it so we don't have to explicitly specify combine_dims
# Could be inferred from the pattern instead
cisaacstern (Member):


I guess we're abandoning this idea? I have had my doubts that it's possible, since it would be a meta-operation over the pipeline. But I've been wrong about what's possible in Beam before.

moradology (Contributor, Author):


The way this reads to me right now, we'd need to pass the same argument to a couple of different transforms (which isn't a bad pattern, afaict). To reuse such a value, we'd need to thread the input through as some kind of computed result, and I doubt that juice is worth the squeeze.

I'm not opposed to keeping the TODO open, if that's desirable. I just couldn't quite imagine how this could be facilitated at the level of recipes.

Comment on lines +690 to +691
| "make template dataset" >> beam.Map(schema_to_template_ds)
| "generate chunks dynamically" >> beam.Map(self.dynamic_chunking_fn)
cisaacstern (Member):


These labels really help readability! tysm!

Comment on lines 711 to 717
n_target_stores = rechunked_datasets | StoreDatasetFragments(target_store=target_store)
singleton_target_store = (
n_target_stores
| beam.combiners.Sample.FixedSizeGlobally(1)
| beam.FlatMap(lambda x: x) # https://stackoverflow.com/a/47146582
).expand(schema, beam.pvalue.AsSingleton(target_chunks_pcoll))

# Actually attempt to write datasets to their target bytes/files
rechunked_datasets | "write chunks" >> beam.Map(
store_dataset_fragment, target_store=beam.pvalue.AsSingleton(target_store)
)

return singleton_target_store
# return the target store pcollection (a singleton of the fsspec target)
return target_store
cisaacstern (Member):


The reason we did this n_target_stores -> FixedSizeGlobally(1) nonsense before (and sorry for not documenting this in comments or otherwise!) ... was that without doing this, Beam will not wait on all chunks to be finished writing before yielding back the target store... and in this case, I'm pretty sure we do want to enforce blocking behavior because if we don't, then if we try to do:

... | StoreToZarr() | ConsolidateMetadata()
... | StoreToZarr() | ValidateStoredZarr()

or what-have-you, then the transform that is chained onto StoreToZarr will receive its input before chunks have finished writing, which will make it impossible to consolidate or validate, etc.

So basically, n_target_stores -> FixedSizeGlobally(1) ensures we block, and don't let the output of StoreToZarr be emitted downstream until all chunks are written.

Does that make sense?
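For reference, a minimal sketch of that blocking pattern (the transform and write function here are hypothetical; the real StoreToZarr wiring is more involved):

```python
import apache_beam as beam


def write_fragment(fragment, target_store):
    # hypothetical per-chunk write; returns the store so something flows downstream
    return target_store


class StoreFragmentsThenEmitTarget(beam.PTransform):
    """Emit the target store only once every fragment write has completed."""

    def __init__(self, target_store):
        self.target_store = target_store

    def expand(self, fragments):
        written = fragments | "write chunks" >> beam.Map(write_fragment, target_store=self.target_store)
        return (
            written
            # collapsing all write results into one global sample forces Beam to wait
            # for every write before anything is emitted downstream
            | beam.combiners.Sample.FixedSizeGlobally(1)
            | beam.FlatMap(lambda x: x)  # unwrap the one-element list into a singleton element
        )


# downstream transforms then only see the store after all chunks are written, e.g.:
# ... | StoreFragmentsThenEmitTarget(target_store) | ConsolidateMetadata()
```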

moradology (Contributor, Author):


Great catch! Makes total sense.

Comment on lines 644 to -665
dynamic_chunking_fn: Optional[Callable[[xr.Dataset], dict]] = None
dynamic_chunking_fn_kwargs: Optional[dict] = field(default_factory=dict)
cisaacstern (Member):


Is there an advantage to lower arity here aside from an aesthetically tidier signature?

moradology (Contributor, Author):


The tidy signature is much of the benefit I see, but it also passes responsibility downstream to pipeline writers/users in a way that can't hide any magic we might be applying. On this conception of things, the user has one thing to worry about: a function that produces the chunking they desire given a template dataset.

Comment on lines 617 to 635

Example of using a wrapper function to reduce the arity of a more complex dynamic_chunking_fn:

Suppose there's a function `calculate_dynamic_chunks` that requires extra parameters: an
`xarray.Dataset`, a `target_chunk_size` in bytes, and a `dim_name` along which to chunk.
To fit the expected signature for `dynamic_chunking_fn`, we can define a wrapper function
that presets `target_chunk_size` and `dim_name`:

```python
def calculate_dynamic_chunks(ds, target_chunk_size, dim_name) -> Dict[str, int]:
...

def dynamic_chunking_wrapper(ds: xarray.Dataset) -> Dict[str, int]:
target_chunk_size = 1024 * 1024 * 10
dim_name = 'time'
return calculate_dynamic_chunks(ds, target_chunk_size, dim_name)

StoreToZarr(..., dynamic_chunking_fn=dynamic_chunking_wrapper, ...)
```
cisaacstern (Member):


Looks like this is not rendering correctly. Not sure what your intended design is here, but maybe the paragraph starting with Suppose... should be dedented, and the ```python fence is not necessary if you're indented?

See https://pangeo-forge--726.org.readthedocs.build/en/726/api_reference.html#ptransforms
[Screenshot (2024-04-04): rendered API reference showing the mis-rendered example]

@moradology (Contributor, Author) commented

I should have called this out in the PR description, but I somehow failed to include it. The one piece I'm not yet sure about is this small change to tests: the argument works as expected, but it isn't obvious to me why it's now required. Before a +1, I'll need to double-check the expected behavior:

https://github.com/pangeo-forge/pangeo-forge-recipes/pull/726/files#diff-a4a614c9bc88758962ff69a4056481f9f9ae8930d92e4ce1ad8daab54a452b30R277
