Combined save and calculation is using excessive memory #10991

Open
pp-mo opened this issue Mar 11, 2024 · 3 comments

Labels: needs triage (Needs a response from a contributor)

Comments

pp-mo commented Mar 11, 2024

Not clear if this is truly a "bug", but it seems highly undesirable and I can't work out how to avoid it.

Describe the issue:

We are trying to save large data arrays to file and, at the same time, check the data for any occurrences of a given value,
all while, hopefully, loading and processing each data chunk only once.

For each array, we use a dask.delayed function which takes as arguments both the (delayed) da.store operation and the collision-check computation on the same data.
But this seems to cause Dask to load the entire dataset into memory,
which, for big enough data, simply crashes.

A bit more detail:

When saving data to netcdf, we create a file and stream data from Dask arrays into the file variables with 'dask.array.store'.
(Typically, our source data is also derived from netcdf files, but this is not required to provoke the problem.)
We need, at the same time, to perform a check on the data, which determines whether there are any data points which are masked, or any unmasked points matching a proposed "fill value".
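
For illustration, such a check might look roughly like this (a simplified sketch only, not our actual code; the function name is made up):

import dask.array as da

def fill_value_collision_check(lazy_array, fill_value):
    # Lazily build a 0-d boolean result: True if any point is masked,
    # or if any unmasked point equals the proposed fill value.
    data = da.ma.getdata(lazy_array)
    mask = da.ma.getmaskarray(lazy_array)
    return da.any(mask | (data == fill_value))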

Our existing code combines a delayed 'store' operation with the computation of the check function.
Since one is a "store" and one a "compute", they are combined by creating a delayed function which takes both as arguments.

The aim of this is that the data should only be fetched once, and streamed to the file one chunk at a time, so that we can handle files larger than memory.

What we have found is that, in some cases, this operation uses memory equivalent to the size of the entire data variable, rather than a small number of its chunks.

Minimal Complete Verifiable Example

import tracemalloc
import dask
import dask.array as da
import numpy as np

# construct test data as a stack of random arrays
nt, nd = 50, 1000000
lazydata_all = da.stack([
    da.random.uniform(
        0, 1,
        size=nd
    )
    for _ in range(nt)
])
# existing "target" array which we will store the result into
store_target = np.zeros((nt, nd), dtype=np.float64)

def array_size_mb(arr: np.ndarray):
    return arr.nbytes * 1.0e-6

print(f"\nData full size {str(lazydata_all.shape).rjust(16)} = {array_size_mb(lazydata_all):8.1f} Mb")
print(f" .. chunk size {str(lazydata_all[0].shape).rjust(16)} = {array_size_mb(lazydata_all[0]):8.1f} Mb")

# Construct the combined store-and-calculate operation, as a delayed
@dask.delayed
def store_data_and_compute(store_operation, calculation):
    return calculation

delayed_save = da.store(lazydata_all, store_target, compute=False, lock=False)
lazy_calculation = lazydata_all[0, 0]
delayed_result = store_data_and_compute(delayed_save, lazy_calculation)

# Measure peak additional memory claimed by operations
def compute_memory_mb(delayed_calc):
    tracemalloc.start()
    delayed_calc.compute()
    _, peak_mem_bytes = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return peak_mem_bytes * 1.0e-6

print("\nCombined calculation:")
combined_operation_mb = compute_memory_mb(delayed_result)
chunk_mb = array_size_mb(lazydata_all[0])
print(f"Consumed memory ~ {combined_operation_mb:6.2f} Mb.")
print(f"  --> {combined_operation_mb / chunk_mb:.1f} * chunksize")

Sample output:

Data full size    (50, 1000000) =    400.0 Mb
 .. chunk size       (1000000,) =      8.0 Mb

Combined calculation:
Consumed memory ~ 400.25 Mb.
  --> 50.0 * chunksize

NOTE: the individual operations (store or calculate) do not consume large amounts of memory

store_only_mb = compute_memory_mb(delayed_save)
print(f"\nStore alone, takes {store_only_mb} Mb.")
calc_only_mb = compute_memory_mb(lazy_calculation)
print(f"Calculate alone, takes {calc_only_mb} Mb.")

Resulting output:

Store alone, takes 32.230089 Mb.
Calculate alone, takes 8.018998 Mb.

NOTE: this is on a machine with 4 CPUs and 4 dask workers.
Hence 32 ≈ 4 * 8, which seems to make sense.
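
(As a sanity check of that interpretation, the threaded scheduler can be restricted to a single worker, in which case we would expect the store-alone figure to drop to roughly one chunk. A quick sketch, not measured output:)

tracemalloc.start()
delayed_save.compute(scheduler="threads", num_workers=1)
_, peak_mem_bytes = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"Store alone with 1 worker: {peak_mem_bytes * 1.0e-6:.2f} Mb")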

Anything else we need to know?

Environment

  • Dask version: 2023.09.01 and 2024.02.01
  • Python version: 3.11
  • Operating System: linux
  • Machine : 4 CPUs
  • Install method (conda, pip, source): conda

pp-mo commented Mar 11, 2024

Some additional observations...

(1) The above makes no use of netcdf files:
it just stores to an in-memory array.
But the effect is just the same.
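
For reference, the real use case streams into a netcdf variable rather than a numpy array, roughly like this (a sketch only; the file and variable names are made up, and a lock is used because netCDF4 is not thread-safe):

import threading
import netCDF4

ds = netCDF4.Dataset("output.nc", "w")
ds.createDimension("t", nt)
ds.createDimension("x", nd)
nc_var = ds.createVariable("data", "f8", ("t", "x"))
delayed_nc_save = da.store(lazydata_all, nc_var, compute=False, lock=threading.Lock())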


(2) The da.stack usage (or at least, some combination of separate arrays) is essential to this problem.
If, in the above, you replace the source data with ...

lazydata_all = da.random.uniform(
    0, 1,
    size=(nt, nd),
    chunks=(1, nd)
)

... then the problem does not occur.
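
Note that both forms end up with the same chunk structure, so the difference seems to lie in the task graph rather than in the chunking (a quick check):

stacked = da.stack([da.random.uniform(0, 1, size=nd) for _ in range(nt)])
single = da.random.uniform(0, 1, size=(nt, nd), chunks=(1, nd))
print(stacked.chunks == single.chunks)   # True: ((1, 1, ..., 1), (1000000,)) in both cases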


(3) The result sample above uses a threaded scheduler with 4 workers (CPUs) available.
We have confirmed that the same memory cost occurs with a distributed scheduler, but tracemalloc cannot measure this, as the memory is spread across workers in separate processes. With large enough data, the effect can be seen on the system monitor.
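
(For the distributed case we ran roughly the following; the cluster settings shown here are illustrative only:)

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)
delayed_result.compute()   # watch per-worker memory on the dashboard / system monitor
client.close()
cluster.close()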


pp-mo commented Mar 11, 2024

@dcherian Sorry to disturb you personally, but have you maybe experienced a similar problem and can you advise?


pp-mo commented Mar 28, 2024

N.B. As @bouweandela points out, this may well relate to #8380
