Data.equals: add unit test & migrate to Dask #254

Merged

57 commits
6eafaf5
Remove redundant tests on log messages; now test_Data_equals passes
sadielbartholomew Nov 5, 2021
2f16574
Flesh out docstring for new internal utility method _da_ma_allclose()
sadielbartholomew Nov 8, 2021
22a0fee
Document undoc'd kwargs for Data.equals() method
sadielbartholomew Nov 8, 2021
309e91d
Add note in Data.equals() providing & regarding old log message
sadielbartholomew Nov 8, 2021
419e6de
Set all daskified decorator log levels to default, to adapt as req.
sadielbartholomew Aug 27, 2021
9735498
Add basic (excluding parameters) unit test for Data.equals
sadielbartholomew Aug 27, 2021
caad9cb
Migrate Data.equals from LAMA to Dask
sadielbartholomew Aug 27, 2021
90af93b
Address several DeprecationWarning warnings raised by pytest
sadielbartholomew Aug 31, 2021
6c1fff5
Improve test by adding assertions on raised log messages
sadielbartholomew Aug 31, 2021
0e916c0
Fix equality test w/ numpy inf & NaN by comparing to array of same dtype
sadielbartholomew Aug 31, 2021
ea54b2c
Complete Data.equals unit test by covering all valid parameters
sadielbartholomew Aug 31, 2021
bf3d58a
Add missing closing parenthesis in log message for differing Data units
sadielbartholomew Aug 31, 2021
d85b5cf
Conversions from str.format() to f-strings
sadielbartholomew Sep 2, 2021
ee6ed32
New approach to equals() migration: define own da.ma.allclose
sadielbartholomew Nov 5, 2021
8a4005b
Potential alt. approach: pre-compute result inside Data.equals()
sadielbartholomew Nov 9, 2021
7a01cbe
Add developer notes on questions RE laziness of execution
sadielbartholomew Nov 9, 2021
73ebee7
Update developer notes with further points from discussion
sadielbartholomew Nov 9, 2021
db3b871
Update _da_ma_allclose to use blockwise() in line with DH suggestion
sadielbartholomew Nov 10, 2021
cbc67a5
Add checking on scalar inputs to test_Data_equals
sadielbartholomew Nov 10, 2021
053c467
Add test module for the Data utility functions, test_Data_utils
sadielbartholomew Nov 10, 2021
845255b
Some fixes to logic in _da_ma_allclose
sadielbartholomew Nov 11, 2021
2b73fad
Move _da_ma_allclose from data.utils to data.dask_utils
sadielbartholomew Nov 12, 2021
1cf0312
Improve handling of scalars and 0-d arrays in _da_ma_allclose
sadielbartholomew Nov 12, 2021
97478b2
Update _da_ma_allclose test to highlight difference w/ Data.equals
sadielbartholomew Nov 15, 2021
8db307b
Update masked array coverage in test_Data_equals
sadielbartholomew Nov 15, 2021
63161e6
Add self-equality & masked string array checks to test_Data_equals
sadielbartholomew Nov 16, 2021
b78551b
_da_ma_allclose: set kwargs in inner function w/ note explaining why
sadielbartholomew Nov 17, 2021
153dee0
Update developer notes with agreed approach to cf.Data method laziness
sadielbartholomew Nov 17, 2021
71f1110
Re-organise developer notes to tidy
sadielbartholomew Nov 17, 2021
69a8d33
Add utility function to check if an array is of numeric dtype
sadielbartholomew Nov 18, 2021
d98feca
Data.equals: use appropriate data comparison for non-numeric dtypes
sadielbartholomew Nov 18, 2021
e14f246
Debugging of data comparison: add print statements to illustrate
sadielbartholomew Nov 19, 2021
3053ccb
CI: add test_Data_utils to Dask testing workflow
sadielbartholomew Nov 24, 2021
c227e49
Address DH feedback by using cf defaults for rtol & atol values
sadielbartholomew Jan 10, 2022
f1094d3
Update self-equality tests in test_Data_equals to use copies
sadielbartholomew Jan 10, 2022
1d2a674
Address DH feedback: update expected test result for special case
sadielbartholomew Jan 11, 2022
66e7a05
Update development notes for cf.Data RE arrays that may be masked
sadielbartholomew Nov 24, 2021
3e1aa67
Remove maximum version spec. for cfdm for ease of development
sadielbartholomew Nov 24, 2021
b09a57a
Indicate tests failing due to inconsistent cfdm Data.equals
sadielbartholomew Jan 12, 2022
f71353f
Set masked_equal=True inside Data.equals to fix self-equality tests
sadielbartholomew Jan 12, 2022
b16f610
Merge branch 'lama-to-dask' into lama-to-dask-6-new
sadielbartholomew Jan 12, 2022
d3d5198
Tidy PR 254: remove all dev. aid print statements
sadielbartholomew Jan 12, 2022
2f14b3e
Update Data.equals to check for strict datatype equality
sadielbartholomew Jan 13, 2022
e296061
Comments in preparation for eventual equal_nan kwarg to Data.equals
sadielbartholomew Jan 17, 2022
5d6838a
Update cf/data/utils.py: more accurate docstring
sadielbartholomew Jan 18, 2022
201a91f
Update cf/data/dask_utils.py: improve docstring formatting
sadielbartholomew Jan 18, 2022
981ad20
Update cf/test/test_Data.py: test equality to self (not a copy)
sadielbartholomew Jan 18, 2022
bc9a434
Update cf/data/dask_utils.py: document kwarg type
sadielbartholomew Jan 18, 2022
4c86612
Merge branch 'lama-to-dask' into lama-to-dask-6-new
sadielbartholomew Jan 18, 2022
42e6263
Address DH feedback by removing loop over pairs in test_Data_equals
sadielbartholomew Jan 18, 2022
3846d07
Address DH feedback: do not check numpy arrays in test_Data_utils
sadielbartholomew Jan 18, 2022
35b5adb
Address DH feedback: adjust test_Data_equals RE string dtypes
sadielbartholomew Jan 18, 2022
7cd2e89
Address DH feedback: use multiple dask chunks in test_Data_equals
sadielbartholomew Jan 19, 2022
aedec2f
Update test_Data verbosity management and testing
sadielbartholomew Jan 20, 2022
8467f6e
Address isort & black linting failures from master merge & PR
sadielbartholomew Jan 20, 2022
f272842
Address all flake linting failures except is*_small
sadielbartholomew Jan 20, 2022
07d937d
Address remaining flake linting failures: is*_small
sadielbartholomew Jan 21, 2022
9 changes: 8 additions & 1 deletion .github/workflows/dask-migration-testing.yml
@@ -100,13 +100,20 @@ jobs:
- name: Notify about starting testing
run: echo Setup complete. Now starting to run the cf-python test suite...

# Finally run test_Data.py!
# Finally run the relevant tests: firstly test_Data.py...
- name: Run the test_Data test module
shell: bash -l {0}
run: |
cd ${{ github.workspace }}/main/cf/test
python test_Data.py

# ... and finally test_Data_utils.py.
- name: Run the test_Data_utils test module
shell: bash -l {0}
run: |
cd ${{ github.workspace }}/main/cf/test
python test_Data_utils.py

# End with a message indicating the suite has completed its run
- name: Notify about a completed run
run: |
6 changes: 3 additions & 3 deletions cf/cellmethod.py
@@ -104,8 +104,8 @@ def create(cls, cell_methods_string=None):
#
# ['lat:', 'mean', '(', 'interval:', '1', 'hour', ')']
# ------------------------------------------------------------
cell_methods = re.sub("\((?=[^\s])", "( ", cell_methods_string)
cell_methods = re.sub("(?<=[^\s])\)", " )", cell_methods).split()
cell_methods = re.sub(r"\((?=[^\s])", "( ", cell_methods_string)
cell_methods = re.sub(r"(?<=[^\s])\)", " )", cell_methods).split()

while cell_methods:
cm = cls()
@@ -156,7 +156,7 @@ def create(cls, cell_methods_string=None):
if not (re.search("^(interval|comment):$", cell_methods[0])):
cell_methods.insert(0, "comment:")

while not re.search("^\)$", cell_methods[0]):
while not re.search(r"^\)$", cell_methods[0]):
term = cell_methods.pop(0)[:-1]

if term == "interval":
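
For context on the regex changes above: in a non-raw Python string literal,
`\(` and `\s` are invalid escape sequences, which is what produced the
DeprecationWarnings that pytest reported; a raw string expresses the
identical regex without the warning. For example:

pattern = "\((?=[^\s])"   # DeprecationWarning: invalid escape sequence '\('
pattern = r"\((?=[^\s])"  # raw string: identical regex, no warning
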
174 changes: 173 additions & 1 deletion cf/data/README.rst
@@ -1,8 +1,78 @@
`cf.Data` developer notes
=========================

Masked arrays
-------------

Whether there is a mask or not
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For methods such as `equals`, we need to consider whether an array is
a masked one, and if so, we need to consider the *masks* (e.g. whether they
are equal), as well as the *data* (equality or otherwise).

The difficulty is that some level of inspection, i.e. computation, is
required to know whether the object in question is masked or not (this is
due, fundamentally, to the underlying netCDF or PP representation), and
we want to avoid early computation because it is inefficient.

Consider, for example, a set of computations in which an array may or
may not acquire a mask: until `compute` is run, we don't know whether
there is a mask at the end. Note the distinction here between a standard
array and a masked array, which may have a trivial (say, all-`False`) or
non-trivial mask, as in these Dask array cases (similarly for `np.ma`
etc.):

**Masked array with a non-trivial mask:**

.. code-block:: python

>>> dx = da.from_array(np.ma.array([1, 2, 3], mask=[1, 0, 0]))
>>> dx
dask.array<array, shape=(3,), dtype=int64, chunksize=(3,), chunktype=numpy.MaskedArray>

**Masked array with a trivial, i.e. all-False, mask:**

.. code-block:: python

>>> dy = da.from_array(np.ma.array([1, 2, 3], mask=[0, 0, 0]))
>>> dy
dask.array<array, shape=(3,), dtype=int64, chunksize=(3,), chunktype=numpy.MaskedArray>

**Standard array i.e. no mask:**

.. code-block:: python

>>> dz = da.from_array(np.array([1, 2, 3]))
>>> dz
dask.array<array, shape=(3,), dtype=int64, chunksize=(3,), chunktype=numpy.ndarray>


Solution
########

To work around not being able to know, without computation, whether an
array is masked in cases where a mask may be added, we will in all such
cases use the fact that standard arrays (i.e. example 3 above) can also
be queried with `da.ma.getmaskarray`, which returns an all-False mask,
just as it would for a masked array with an all-False mask (i.e. example
2 above):

.. code-block:: python

>>> dz = da.from_array(np.array([1, 2, 3])) # i.e. example 3 above
>>> mz = da.ma.getmaskarray(dz)
>>> mz.compute()
array([False, False, False])

>>> dy = da.from_array(np.ma.array([1, 2, 3], mask=[0, 0, 0])) # i.e. example 2
>>> my = da.ma.getmaskarray(dy)
>>> my.compute()
array([False, False, False])
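
Building on this, a lazy data-and-mask comparison can be composed entirely
from operations that are safe on both masked and unmasked inputs. The
following is a minimal sketch only: the PR's actual helper is
`_da_ma_allclose` in `cf/data/dask_utils.py`, implemented with
`da.blockwise`, and the function name and structure here are illustrative.

.. code-block:: python

    import dask.array as da

    def ma_allclose_sketch(x, y, rtol=1e-05, atol=1e-08):
        """Lazily compare the data and masks of two arrays (sketch)."""
        x = da.asanyarray(x)
        y = da.asanyarray(y)
        # Safe on unmasked inputs too (returns an all-False mask), so no
        # early compute is needed to find out whether either is masked
        masks_equal = (da.ma.getmaskarray(x) == da.ma.getmaskarray(y)).all()
        # Once the masks are known equal, filling both arrays with the
        # same value makes masked positions compare equal, so only the
        # unmasked data can differ
        data_close = da.isclose(
            da.ma.filled(x, 1), da.ma.filled(y, 1), rtol=rtol, atol=atol
        ).all()
        return masks_equal & data_close  # lazy 0-d Boolean dask array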


Hardness of the mask
--------------------
^^^^^^^^^^^^^^^^^^^^

Any `cf.Data` method that changes the dask array should consider
whether or not the mask hardness needs resetting before
@@ -22,3 +92,105 @@ The mask hardness is most easily reset with the

`cf.Data.__setitem__` and `cf.Data.where` are examples of methods that
need to reset the mask in this manner.
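
For background, this toy `numpy.ma` example (illustrative only, not cf
code) shows why assignment behaviour depends on the mask hardness:

.. code-block:: python

    >>> import numpy as np
    >>> a = np.ma.array([1, 2, 3], mask=[True, False, False], hard_mask=True)
    >>> a[0] = 99  # ignored: a hard mask protects masked elements
    >>> print(a)
    [-- 2 3]
    >>> _ = a.soften_mask()  # returns self; discard to keep output quiet
    >>> a[0] = 99  # a soft mask lets assignment unmask the element
    >>> print(a)
    [99 2 3]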


Laziness
--------

To *be* lazy, or *not to be* lazy (in `cf.Data` itself)?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Central to Dask is lazy execution, i.e. delayed computation: Dask
operations essentially construct a graph of calculations or
transformations that are ready to run later, and only get evaluated
together when requested with a `<dask object>.compute` call.
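
For illustration, a minimal example of this deferral (the shapes and
operations are arbitrary):

.. code-block:: python

    >>> import dask.array as da
    >>> dx = da.ones((4, 4), chunks=2)
    >>> result = (dx + 1).sum()  # builds a graph; nothing is evaluated yet
    >>> result
    dask.array<sum-aggregate, shape=(), dtype=float64, chunksize=(), chunktype=numpy.ndarray>
    >>> result.compute()  # evaluation happens only now
    32.0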

We want to utilise this laziness because it is central to the
efficiency gained from using Dask, but to what extent do we want
to incorporate laziness into `cf.Data`? Namely, for
an arbitrary `cf.Data` method that previously returned some result
(say, a Boolean or an array), which of these should we return:

1. The **pre-computed result**, i.e. the outcome from running
`compute` on the result graph constructed in the method
(e.g. the same Boolean or an array, etc., as before); or
2. The **uncomputed result**, i.e. a Dask object which only
evaluates to the result in (1) when either the user or
the code under-the-hood, later, runs a `compute`?

Arguments for choice (1) [advantages of (1) and disadvantages of (2)]:

* The simpler choice:

  * the output is the same as before, so documentation is easier and
    there is less change relative to previous versions;
  * logging and error handling can remain simple and as-is, whereas with
    choice (2) we don't know whether a given log or error message,
    dependent on the outcome, is applicable, so we can't emit it
    immediately (perhaps at all?) and might have to defer to standard
    Dask messages, which would reduce specificity and clarity;
  * testing will be simpler, as with (2) we would have to add `compute`
    calls at appropriate points before running test assertions, etc.;
  * inspection methods can return as they do now, whereas with choice (2)
    we would have to work out what to show when certain aspects aren't
    yet computed.

Arguments for choice (2):

* The technically more complicated but, overall, more efficient choice:

  * this choice is more efficient when we build up chains of operations,
    because it avoids intermediate computation, meaning parallelisation
    can be optimised more comprehensively by Dask.

Beyond choosing (1) or (2) outright, there are further options allowing
a mixture, or a flexible choice, of return object:

3. Make use of a common keyword argument such as `precompute` on
   methods, so that both users and the code under-the-hood can dictate
   whether to return the pre-computed or the uncomputed result. That
   would give extra flexibility, but mean more boilerplate code (which
   can be consolidated somewhat, but at best will require some extra
   lines per method).

   If this option is chosen, what would the best default be, `True`
   or `False`?

4. (DH's suggestion) Methods that return new cf.Data objects (such as
   `transpose`) should be lazy, and other methods (e.g. `__repr__` and
   `equals`) should not be.

**We have agreed that (4) is the most sensible approach to take,
therefore the working plan is** that (see the sketch after this list):

* **any method (previously) returning a cf.Data object will,
  post-daskification, be lazy and return the uncomputed result**, i.e. a
  Dask object that, when computed, will evaluate to the final cf.Data
  object (e.g. if computed immediately after the method runs, the result
  would be the same cf.Data object as that previously returned); but
* **any method returning another object, such as a Boolean or a string
  representation of the object, will not be lazy and will return the
  pre-computed object as before**.
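
A toy sketch of this split (the class and names are illustrative only,
not the actual cf.Data implementation):

.. code-block:: python

    import dask.array as da

    class ToyData:
        """Illustrative stand-in for cf.Data, not the real class."""

        def __init__(self, dx):
            self._dx = da.asanyarray(dx)

        def transpose(self):
            # Lazy: returns a new object wrapping an uncomputed graph
            return ToyData(self._dx.T)

        def equals(self, other, rtol=1e-05, atol=1e-08):
            # Not lazy: returns a plain Boolean, computed here and now
            return bool(
                da.allclose(self._dx, other._dx, rtol=rtol, atol=atol).compute()
            )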


Logging and error handling
^^^^^^^^^^^^^^^^^^^^^^^^^^

When Dask operations are uncomputed, we don't know whether certain logging
and error messages are applicable or not.

Can we raise these in a delayed way when we don't want to compute early,
for instance in the middle of under-the-hood operations, and perhaps also
if we choose case (2) from the above points on the extent of laziness?
How could it be done? Possible ideas include:

* using a `try`/`except` block whenever a custom error message is
  required, catching the corresponding Dask errors and raising our own
  messages (a sketch follows).
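
A minimal sketch of that idea; the wrapped operation and the cf-specific
message are hypothetical:

.. code-block:: python

    import dask.array as da

    dx = da.ones((3,))
    dy = da.ones((4,))
    try:
        equal = da.allclose(dx, dy).compute()
    except ValueError as error:
        # Re-raise the underlying broadcasting error with our own message
        raise ValueError("Data arrays have incompatible shapes") from error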


Inheritance from `cfdm`
-----------------------

Generally, how do we deal with optimisation for objects and logic inherited
from `cfdm`, since the current plan is not to Daskify `cfdm.Data`?
2 changes: 2 additions & 0 deletions cf/data/abstract/compressedsubarray.py
@@ -2,6 +2,8 @@
from functools import reduce
from operator import mul

from ...functions import inspect as cf_inspect


class CompressedSubarray(abc.ABC):
"""Abstract base class for a compressed sub-array container."""
44 changes: 22 additions & 22 deletions cf/data/creation.py
@@ -2,26 +2,21 @@
from functools import lru_cache
from uuid import uuid4

import numpy as np

import dask.array as da
import numpy as np
from dask.array.core import getter, normalize_chunks, slices_from_chunks
from dask.utils import SerializableLock
from dask.base import tokenize
from dask.config import config

from ..units import Units

from .utils import chunk_shapes, chunk_positions
from dask.utils import SerializableLock

from . import (
FilledArray,
GatheredSubarray,
RaggedContiguousSubarray,
RaggedIndexedSubarray,
RaggedIndexedContiguousSubarray,
RaggedIndexedSubarray,
)

from .utils import chunk_positions, chunk_shapes

# Cache of axis identities
_cached_axes = {}
@@ -139,8 +134,9 @@ def compressed_to_dask(array):

count = array.get_count().dask_array(copy=False)

if is_small(count):
count = count.compute()
# TODODASK: remove with #297 merge
# if is_small(count):
# count = count.compute()

# Find the chunk sizes and positions of the uncompressed
# array. Each chunk will contain the data for one instance,
@@ -198,8 +194,9 @@ def compressed_to_dask(array):

_, inverse = da.unique(index, return_inverse=True)

if is_very_small(index):
inverse = inverse.compute()
# TODODASK: remove with #297 merge
# if is_very_small(index):
# inverse = inverse.compute()

chunks = normalize_chunks(
(1,) + (-1,) * (uncompressed_ndim - 1),
@@ -236,14 +233,16 @@ def compressed_to_dask(array):
index = array.get_index().dask_array(copy=False)
count = array.get_count().dask_array(copy=False)

if is_small(index):
index = index.compute()
index_is_dask = False
else:
index_is_dask = True
# TODODASK: remove with #297 merge
# if is_small(index):
# index = index.compute()
# index_is_dask = False
# else:
index_is_dask = True

if is_small(count):
count = count.compute()
# TODODASK: remove with #297 merge
# if is_small(count):
# count = count.compute()

cumlative_count = count.cumsum(axis=0)

@@ -268,8 +267,9 @@ def compressed_to_dask(array):
xprofile_indices = np.where(index == i)[0]
if index_is_dask:
xprofile_indices.compute_chunk_sizes()
if is_small(xprofile_indices):
xprofile_indices = xprofile_indices.compute()
# TODODASK: remove with #297 merge
# if is_small(xprofile_indices):
# xprofile_indices = xprofile_indices.compute()
# --- End: if

# Find the number of actual profiles in this instance