It's not clear whether this is truly a "bug", but it seems highly undesirable, and I can't work out how to avoid it.
Describe the issue:
We are trying to save large data arrays to file and, at the same time, check the data for any occurrences of a given value,
all while, hopefully, loading and processing each data chunk only once.
For each array, we use a dask.delayed function which takes as arguments both the (delayed) da.store operation and the collision-check computation on the same data.
But this seems to cause Dask to load the entire dataset into memory,
which, for big enough data, simply crashes.
A bit more detail
When saving data to netcdf, we create a file and stream data from Dask arrays into the file variables with 'dask.array.store'.
(Typically, our source data is also derived from netcdf files -- but this is not required to provoke the problem.)
At the same time, we need to perform a check on the data, which determines whether there are any data points which are masked, or any unmasked points matching a proposed "fill value".
Our existing code combines a delayed 'store' operation with computing the check function.
Since one is a "store" and one a "compute", they are combined by creating a delayed function which takes both as arguments.
The aim of this is that the data should only be fetched once, and streamed to the file one chunk at a time, so that we can handle files larger than memory.
What we have found is that, in some cases, this operation uses memory equivalent to the size of the entire data variable, rather than a small number of its chunks.
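For context, a minimal sketch of the kind of check we mean (the name `check_fill_collisions` and the use of `dask.array.ma` here are illustrative, not our actual code):

```python
import dask.array as da

def check_fill_collisions(data, fill_value):
    # Lazily True if any point is masked, or any unmasked point
    # already equals the proposed fill value.
    mask = da.ma.getmaskarray(data)
    collisions = ~mask & (da.ma.getdata(data) == fill_value)
    return da.any(mask | collisions)
```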
Minimal Complete Verifiable Example
```python
import tracemalloc

import dask
import dask.array as da
import numpy as np

# construct test data as a stack of random arrays
nt, nd = 50, 1000000
lazydata_all = da.stack([
    da.random.uniform(
        0, 1,
        size=nd
    )
    for _ in range(nt)
])

# existing "target" array which we will store the result into
store_target = np.zeros((nt, nd), dtype=np.float64)

def array_size_mb(arr: np.ndarray):
    return arr.nbytes * 1.0e-6

print(f"\nData full size {str(lazydata_all.shape).rjust(16)} = {array_size_mb(lazydata_all):8.1f} Mb")
print(f" .. chunk size {str(lazydata_all[0].shape).rjust(16)} = {array_size_mb(lazydata_all[0]):8.1f} Mb")

# Construct the combined store-and-calculate operation, as a delayed
@dask.delayed
def store_data_and_compute(store_operation, calculation):
    return calculation

delayed_save = da.store(lazydata_all, store_target, compute=False, lock=False)
lazy_calculation = lazydata_all[0, 0]
delayed_result = store_data_and_compute(delayed_save, lazy_calculation)

# Measure peak additional memory claimed by operations
def compute_memory_mb(delayed_calc):
    tracemalloc.start()
    delayed_calc.compute()
    _, peak_mem_bytes = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return peak_mem_bytes * 1.0e-6

print("\nCombined calculation:")
combined_operation_mb = compute_memory_mb(delayed_result)
chunk_mb = array_size_mb(lazydata_all[0])
print(f"Consumed memory ~ {combined_operation_mb:6.2f} Mb.")
print(f" --> {combined_operation_mb / chunk_mb:.1f} * chunksize")
```
(1) The above makes no use of netcdf files: it just stores to an in-memory array. But the effect is just the same.
(2) The da.stack usage (or at least, some combination of separate arrays) is essential to this problem. If, in the above, you replace the source data with ... (see the sketch after these notes for the kind of substitution meant).
(3) The sample result uses a threaded scheduler with 4 workers (CPUs) available.
We have confirmed that the same memory cost occurs with a distributed scheduler, but tracemalloc cannot measure this, as the memory is spread across workers in separate processes. With large enough data, the effect can be seen on the system monitor.
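As a sketch of the single-array substitution alluded to in note (2) (this exact replacement is our assumption; the original sentence is truncated above), one could build the source as a plain chunked array rather than a stack:

```python
import dask.array as da

nt, nd = 50, 1000000
# A plain chunked random array with the same shape and chunking as the stack.
# Per note (2), some stacked/combined construction is needed to reproduce the
# problem, so we would not expect this variant to show it.
lazydata_all = da.random.uniform(0, 1, size=(nt, nd), chunks=(1, nd))
```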
Sample output:
NOTE: the individual operations (store or calculate) do not consume large amounts of memory.
Resulting:
NOTE: this is on a machine with 4 CPUs, with 4 dask workers.
Hence 32 ~ 4 * 8, which seems to make sense.
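For reference, a sketch of how the individual measurements can be taken, reusing the `compute_memory_mb` helper and variables from the MCVE above:

```python
# Measure each operation on its own with the helper from the MCVE.
store_only_mb = compute_memory_mb(delayed_save)     # the store, without the check
calc_only_mb = compute_memory_mb(lazy_calculation)  # the check, without the store
print(f"store alone ~ {store_only_mb:6.2f} Mb, calc alone ~ {calc_only_mb:6.2f} Mb")
```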
Anything else we need to know?
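One alternative we have considered (a sketch only; we have not confirmed that it avoids the memory blow-up) is to evaluate both results in a single dask.compute call, so that the store and the check share one graph rather than being wrapped together in a dask.delayed function:

```python
import dask

# Evaluate the (compute=False) store and the check together,
# in a single traversal of the shared graph.
_, check_result = dask.compute(delayed_save, lazy_calculation)
```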
Environment