
speed to write files to s3 storage #39

Open · tinaok opened this issue Oct 13, 2022 · 37 comments

@tinaok (Collaborator) commented Oct 13, 2022

https://github.com/pangeo-data/pangeo-eosc/blob/main/notebooks/SwiftXarrayTest.ipynb
@sebastian-luna-valero @guillaumeeb
I've tried to write the Zarr file following the example and I confirm it is very slow.

Is there any way to make it faster?

@sebastian-luna-valero (Collaborator)

I can check with CESNET once @guillaumeeb confirms the issue.

@guillaumeeb (Member) commented Oct 13, 2022

@tinaok could you point to the code cell that you feel is slow? I have to admit I didn't look into performance in detail, but it seemed reasonable to me when I ran those tests.

If I remember correctly, this was between 0.5s and 2s per chunk, depending on chunk size.

@tinaok (Collaborator, Author) commented Oct 13, 2022

@guillaumeeb If you have a better example than that, could you please add it at https://github.com/pangeo-data/clivar-2022/blob/main/tutorial/examples/notebooks/ and open a pull request?

@guillaumeeb (Member) commented Oct 13, 2022

So, I've looked at the notebook and tried to reproduce it. This is something I experienced during my own tests: when writing a Zarr file to S3 on CESNET, there is an incompressible delay of about 30 s before the chunk writes actually start.

Your example shows that it is not related to Dask, so it must be on the Zarr or Xarray side. I'm not sure what happens between issuing the Python command and the moment the task graph starts executing in Dask. Maybe it's the time it takes to create the metadata? But that seems really high. You can verify this by using a LocalCluster and watching the task stream.

Maybe it comes from CESNET, I don't know. Did you try the same code using another Cloud infrastructure?

What I can say is that once the chunk writes have started, the bandwidth seems reasonable, so the bandwidth and latency of CESNET storage look fine when writing the actual chunks of data.

We can try simple writes from the command line or with s3fs to see whether it comes from CESNET, from something internal to Xarray or Zarr, or maybe from a combination of both...
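
For instance, a minimal raw-write benchmark with s3fs, independent of Zarr and Xarray, could look like this (the credentials and bucket path are placeholders, not the exact setup used here):

import time
import s3fs

fs = s3fs.S3FileSystem(
    key="<access_key>",      # placeholder credentials
    secret="<secret_key>",
    client_kwargs={"endpoint_url": "https://object-store.cloud.muni.cz"},
)

payload = b"x" * (4 * 1024 * 1024)  # 4 MiB dummy object, roughly one chunk
start = time.perf_counter()
fs.pipe("tmp/guillaumeeb/benchmark-object", payload)  # a single PUT
print(f"single 4 MiB PUT: {time.perf_counter() - start:.2f} s")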

@guillaumeeb (Member)

Hm, trying

ds.to_zarr("/tmp/myfile", mode='w', consolidated=True)

takes less than a second, so there is definitely something going on with Zarr and the object store.
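
For reference, a sketch of the equivalent write to the object store through s3fs.S3Map, roughly as in the notebook linked above (credentials and bucket path are placeholders):

import s3fs

fs = s3fs.S3FileSystem(
    key="<access_key>", secret="<secret_key>",
    client_kwargs={"endpoint_url": "https://object-store.cloud.muni.cz"},
)
store = s3fs.S3Map(root="tmp/guillaumeeb/myfile", s3=fs, check=False)
%time ds.to_zarr(store, mode='w', consolidated=True)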

@guillaumeeb (Member)

I've also tried to sync the dataset from local storage to the CESNET object store using the AWS S3 CLI:

 time aws s3 --endpoint https://object-store.cloud.muni.cz --profile pangeoswift sync /tmp/myfile s3://tmp/guillaumeeb/
upload: ../../tmp/myfile/.zattrs to s3://tmp/guillaumeeb/.zattrs   
upload: ../../tmp/myfile/lon/.zarray to s3://tmp/guillaumeeb/lon/.zarray
upload: ../../tmp/myfile/Tair/.zattrs to s3://tmp/guillaumeeb/Tair/.zattrs
upload: ../../tmp/myfile/Tair/.zarray to s3://tmp/guillaumeeb/Tair/.zarray
upload: ../../tmp/myfile/lat/0 to s3://tmp/guillaumeeb/lat/0       
upload: ../../tmp/myfile/lat/.zarray to s3://tmp/guillaumeeb/lat/.zarray
upload: ../../tmp/myfile/.zgroup to s3://tmp/guillaumeeb/.zgroup  
upload: ../../tmp/myfile/.zmetadata to s3://tmp/guillaumeeb/.zmetadata
upload: ../../tmp/myfile/lat/.zattrs to s3://tmp/guillaumeeb/lat/.zattrs
upload: ../../tmp/myfile/Tair/0.0.0 to s3://tmp/guillaumeeb/Tair/0.0.0
upload: ../../tmp/myfile/lon/0 to s3://tmp/guillaumeeb/lon/0  
upload: ../../tmp/myfile/time/0 to s3://tmp/guillaumeeb/time/0
upload: ../../tmp/myfile/time/.zattrs to s3://tmp/guillaumeeb/time/.zattrs
upload: ../../tmp/myfile/time/.zarray to s3://tmp/guillaumeeb/time/.zarray
upload: ../../tmp/myfile/lon/.zattrs to s3://tmp/guillaumeeb/lon/.zattrs

real    0m1.989s
user    0m0.733s
sys     0m0.184s

Less than 2 seconds, so the problem does not come from CESNET per se. We're in the last scenario: a combination of Zarr/Xarray behaviour and the object store. We should try the notebook on another system with S3 storage if possible, or else ask for help on the Pangeo community side.

@tinaok (Collaborator, Author) commented Oct 13, 2022

Thank you very much @guillaumeeb!! I would like to start letting students use the object storage to write their Zarr files.
Until we have a definitive solution, I may warn students to write small files in NetCDF format and to use Zarr only for big temporary files.

Do you have a way to prevent one student from deleting another student's data?

@tinaok (Collaborator, Author) commented Oct 13, 2022

Did you try the same code using another Cloud infrastructure?

I have access to https://us-central1-b.gcp.pangeo.io/ but I do not have access to its object storage, so I cannot benchmark there. Maybe @rabernat can help us?

@rabernat (Member) commented Oct 13, 2022

Everyone on that hub has access to the scratch bucket, where you can write as much data as you want.

I tried this example on the main cluster in the following way

import xarray as xr
import os
ds = xr.tutorial.open_dataset("air_temperature.nc").load()
SCRATCH_BUCKET = os.environ['SCRATCH_BUCKET'] 
%time ds.to_zarr(f'{SCRATCH_BUCKET}/air_temperature.zarr')

It took 5s. Definitely not very fast! But not 30s either. This is using gcsfs.

Just a thought... rather than using an s3fs.S3Map, try using a zarr.storage.FSStore. That's the approach recommended in the Zarr tutorial: https://github.com/zarr-developers/tutorials/blob/main/zarr_cloud_native_geospatial_2022.ipynb

There are some optimizations available in FSStore that are not there in S3Map, and this could be making a difference.

See zarr-developers/zarr-python#911 for some relevant discussion.

@tinaok (Collaborator, Author) commented Oct 13, 2022

Thank you @rabernat, but I do not yet understand what I should change in this notebook to use zarr.storage.FSStore...

Also, we have one more question about how the US Pangeo deployment handles this: #17 (comment)

@rabernat (Member) commented Oct 13, 2022

Thank you @rabernat, but I do not yet understand what I should change in this notebook to use zarr.storage.FSStore...

You want to construct your store as follows

uri = f"s3://{s3_prefix}/{zarr_file_name}"
store = zarr.storage.FSStore(uri, **storage_kwargs)

where storage_kwargs has all of the customization options. (See https://github.com/zarr-developers/tutorials/blob/main/zarr_cloud_native_geospatial_2022.ipynb)
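
Then the write itself is unchanged; as a sketch, assuming the dataset ds from the notebook:

ds.to_zarr(store, mode='w', consolidated=True)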

Also, we have one more question about how the US Pangeo deployment handles this: #17 (comment)

2i2c-org/infrastructure#1322

@rabernat (Member)

To profile this, here is what I am doing. From my notebook, I run

%prun -D zarr.prof ds.to_zarr(f'{SCRATCH_BUCKET}/air_temperature.zarr')

Then I download the zarr.prof file and visualize it locally with python -m snakeviz zarr.prof. This produces a view like this:
(screenshot: snakeviz view of zarr.prof)

zarr.prof.gz (gzipped because github doesn't allow just .prof files.)

What I have learned from this is that an awful lot of time is being spent on calling __contains__ on the store. In other words, Zarr is checking, maybe too much, what data are already stored. If the list operation on your object store is slow, this will take even more time.

I would be very interested to see the same profiling for your object store. Feel free to run the same command and post your results here.

This seems like a good target for optimization within Zarr. I'm pinging @martindurant, an expert on all of these things, to see if he can spot any low-hanging fruit.

@guillaumeeb (Member)

I've tried switching from S3Map to FSStore with no luck; the time taken is about the same, between 40 and 45 s!

So I profiled the call, and here is what I get:
(screenshot: snakeviz view of the profile)

The profiling file:
zarr.prof.gz

I'm not sure how to analyze this; it seems that every step takes longer. I guess this is probably due to higher latency on some of the calls?

@tinaok (Collaborator, Author) commented Oct 13, 2022

@rabernat @guillaumeeb Thank you.

I'm still stuck on saving a Zarr file through zarr.storage.FSStore.

I tried

import zarr
access_key = !aws configure get aws_access_key_id
access_key = access_key[0]
secret_key = !aws configure get aws_secret_access_key
secret_key = secret_key[0]
your_name='tinaok'
s3_prefix =  "tmp/"+your_name
zarr_file_name= "newzarr"
uri = f"s3://{s3_prefix}/{zarr_file_name}"
storage_kwargs={
         "key":access_key,
         "secret":secret_key}
store = zarr.storage.FSStore(uri, **storage_kwargs)

@guillaumeeb How did you specify 'endpoint_url': 'https://object-store.cloud.muni.cz' ?

@guillaumeeb (Member)

I used:

uri = f"s3://{s3_prefix}/zarr-fsstore"
client_kwargs={'endpoint_url': 'https://object-store.cloud.muni.cz'}
store = zarr.storage.FSStore(uri, client_kwargs=client_kwargs, key=access_key, secret=secret_key)

@rabernat (Member)

Here's a useful breakdown of your profile.

A huge amount of time is being spent on __contains__ calls. I am quite certain we can optimize this. Next step is to start looking at the zarr source code more carefully.

ncalls tottime percall cumtime percall filename:lineno(function)
1 7.953e-05 7.953e-05 46.81 46.81 ~:0()
1 4.942e-05 4.942e-05 46.81 46.81 api.py:1511(to_zarr)
1 2.793e-05 2.793e-05 46.81 46.81 :1()
1 1.597e-05 1.597e-05 46.81 46.81 dataset.py:1935(to_zarr)
105 0.002647 2.521e-05 46.71 0.4448 asyn.py:60(sync)
105 0.001294 1.232e-05 46.71 0.4449 asyn.py:108(wrapper)
105 0.001647 1.569e-05 46.69 0.4446 threading.py:280(wait)
106 0.001035 9.767e-06 46.69 0.4404 threading.py:563(wait)
421 46.68 0.1109 46.68 0.1109 ~:0(<method 'acquire' of '_thread.lock' objects>)
1 8.417e-05 8.417e-05 43.37 43.37 zarr.py:502(store)
1 8.858e-06 8.858e-06 43.37 43.37 api.py:1235(dump_to_store)
1 0.000419 0.000419 31.93 31.93 zarr.py:580(set_variables)
33 0.000313 9.483e-06 24.07 0.7295 storage.py:1411(__contains__)
33 0.0002367 7.173e-06 24.07 0.7294 mapping.py:171(__contains__)
4 0.0001056 2.64e-05 16.42 4.104 creation.py:19(create)
4 7.26e-05 1.815e-05 16.42 4.104 hierarchy.py:1034(_create_nosync)
4 6.31e-05 1.578e-05 16.42 4.104 hierarchy.py:786(_write_op)
4 3.062e-05 7.655e-06 16.42 4.104 hierarchy.py:1029(create)
4 7.684e-05 1.921e-05 14.97 3.743 storage.py:283(init_array)
19 0.0001746 9.191e-06 12.4 0.6524 storage.py:101(contains_array)
8 0.0001491 1.864e-05 11.71 1.463 hierarchy.py:367(__contains__)
18 0.0002274 1.263e-05 11.68 0.6487 storage.py:109(contains_group)
28 0.0003081 1.1e-05 10.26 0.3663 storage.py:1366(__getitem__)
28 0.0002857 1.02e-05 10.26 0.3663 mapping.py:133(__getitem__)
4 0.0002816 7.039e-05 9.202 2.3 storage.py:427(_init_array_metadata)
11 0.0003803 3.457e-05 8.249 0.7499 storage.py:1379(__setitem__)
4 7.747e-05 1.937e-05 6.561 1.64 common.py:149(add)
4 0.0002106 5.264e-05 6.56 1.64 core.py:1750(_set_selection)
4 4.959e-05 1.24e-05 6.56 1.64 core.py:1260(__setitem__)

@martindurant commented Oct 13, 2022

I think the problem is that a directory listing is cached by the s3fs instance and makes further lookups much faster. But exists() (used by __contains__), in the absence of a cache, will not call list but instead call head on the single item. This is faster for one item, but slower in the long run for many items. Also, while the _exists() method is async, so many calls could in principle run concurrently, there is no simple API for checking the existence of many files in bulk, so Zarr does these checks serially. Zarr isn't using the listdir method of the store either; I'm not sure when that would get called.
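
To make that cost difference concrete, here is a rough sketch contrasting per-key head requests with a single recursive listing (the endpoint, credentials and paths are placeholders):

import time
import s3fs

fs = s3fs.S3FileSystem(
    key="<access_key>", secret="<secret_key>",
    client_kwargs={"endpoint_url": "https://object-store.cloud.muni.cz"},
)
keys = [f"tmp/guillaumeeb/myfile/{k}"
        for k in (".zgroup", "lat/.zarray", "lon/.zarray", "time/.zarray", "Tair/.zarray")]

t0 = time.perf_counter()
for k in keys:
    fs.exists(k)                                    # one HEAD request per key, issued serially
print(f"serial exists(): {time.perf_counter() - t0:.2f} s")

t0 = time.perf_counter()
listing = set(fs.find("tmp/guillaumeeb/myfile"))    # one recursive LIST, fills the dircache
print([k in listing for k in keys])
print(f"single find():   {time.perf_counter() - t0:.2f} s")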

A workaround would be to call .find() on the filesystem object before processing, which populates the directory cache all the way down, in a single call.

store.map.fs.find(uri);

@guillaumeeb (Member)

A workaround would be to call .find() on the filesystem object before processing, which populates the directory cache all the way down, in a single call.

Do you mean before calling ds.to_zarr?

The thing here is that the store is cleared, either manually before writing the Zarr dataset, or by Zarr itself I guess?

Anyway, clearing the store manually doesn't give a speedup. And on a cleared store, store.map.fs.find(uri) just returns an empty list.

Sorry @martindurant, I'm not sure if I've correctly understood your suggestion.

In any case, as we are starting from an empty bucket/URI, Zarr should not have to call __contains__ so many times, right?

@martindurant

the store is cleared

That's a good point! In this case, zarr should know that there are no keys available at all.

Actually, by writing to the directory, you invalidate the cache anyway, so my suggestion would only be true for reading, not writing.

I wonder whether s3fs should attempt to update the relevant part of the directory cache when completing a write, rather than invalidating it. I have checked, and AWS doesn't quite give you the same information on finalizing a file as when querying it afterwards.

@tinaok (Collaborator, Author) commented Oct 13, 2022

@rabernat (Member)

No thanks! I am not going to log in to your cluster. 🙃 But I am happy to interact here on GitHub around optimizing Zarr and fsspec to improve your use case.

@tinaok (Collaborator, Author) commented Oct 13, 2022

Thank you @rabernat !

tinaok added a commit to pangeo-data/clivar-2022 that referenced this issue Oct 14, 2022
@tinaok (Collaborator, Author) commented Oct 14, 2022

Hi @martindurant and @guillaumeeb

I tried FSStore with find (51 s) and FSStore without find (51.3 s), compared to using s3fs (52.4 s).
So I could not observe the speedup. Maybe I am doing something wrong?
Thank you

@guillaumeeb (Member)

@tinaok this is also what I observed above: no speedup with these changes. That's why we profiled the calls, to try to understand why it takes so much time on this object store.

@martindurant

I think the question is: when Zarr wants to write a chunk, why does it first need to check whether that key already exists? We mean to overwrite it anyway, so it shouldn't need to know. If we were instead updating part of a chunk, we would first need to read the values from the part of the chunk that is not being changed - but that's not the case here.

@rabernat (Member)

when zarr wants to write a chunk, why does it first need to check if that key already exists?

Now we are asking the right questions. 👌

@tinaok & @guillaumeeb - this is how Pangeo has made progress over the past years: carefully identifying performance bottlenecks and working systematically to fix them upstream in the open source libraries. If you have the time and interest, I really encourage you to dig into the zarr source code and see if you can optimize this a bit. It's very satisfying! On behalf of the zarr developers, I can say we will gladly welcome a PR on this topic.

I also think there is something weird going on with your object store. Those list operations are very slow compared to GCS or AWS S3. So perhaps there is a chance to optimize on that side as well.

If we can optimize both, I believe we can achieve a 10 - 100x speedup of this workflow.

@tinaok (Collaborator, Author) commented Oct 14, 2022

Thank you @rabernat @martindurant

I also think there is something weird going on with your object store. Those list operations are going very slow compared to GCS or AWS S3. So perhaps there is a chance to optimize on that side as well.

CESNET is based on OpenStack; is there perhaps another, more native protocol I should use to make it faster? (Or do we see the slowdown due to that in the trace file?)

@rabernat (Member) commented Oct 15, 2022

I went down a bit of a rabbit hole with this and learned some interesting things.

To understand what was happening better, I created a metaclass that would help me log everything the FSStore object was doing:

import logging
import sys
import os
from functools import wraps, partial
from types import FunctionType, MethodType, MethodWrapperType
from time import perf_counter

from zarr.storage import FSStore

def format_args(args):
    if not isinstance(args, tuple):
        args = (args,)
    return ', '.join((
            repr(a)[:32]
            if not isinstance(a, bytes)
            else f'{len(a)} BYTES'
            for a in args
        ))

def wrapper(method):
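    # Wrap `method` so that each call is timed and logged (name, truncated
    # arguments, and output) through the owning instance's logger.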
    @wraps(method)
    def _impl(self, *method_args, **method_kwargs):
        tic = perf_counter()
        method_output = method(self, *method_args, **method_kwargs)
        toc = perf_counter()
        elapsed = 1000*(toc - tic)
        arg_reprs = format_args(method_args)
        output_reprs = (' -> ' + format_args(method_output)) if method_output else ''
        self.logger.info(f"{method.__name__}({arg_reprs}){output_reprs} - {elapsed:2.5f} ms") # ({method_args}, {method_kwargs})")
        return method_output
    return _impl

            
class LoggingMetaclass(type):
        
    def __new__(cls, clsname, bases, attrs, methods_to_log=()):
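        # Create the class as usual, then replace every method named in
        # `methods_to_log` with its logging wrapper and attach a class-level logger.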
        
        instance = super().__new__(cls, clsname, bases, attrs)
        #instance = type(clsname, bases, attrs)
        
        methods_to_overwrite = (
            method_name
            for method_name in dir(instance)
            if callable(getattr(instance, method_name, None))
            and method_name in methods_to_log
        )
   
        for method_name in methods_to_overwrite:
            setattr(instance, method_name, wrapper(getattr(instance, method_name)))
  
        logging.basicConfig(level=logging.INFO, stream=sys.stdout)
        instance.logger = logging.getLogger(clsname)
        instance.logger.info("initialized")

        return instance

class Foo:
    
    def __init__(self, a):
        self.a = a

    def get_a(self):
        return self.a
    
    def __getitem__(self, key):
        if key=="a":
            return self.a
        else:
            raise KeyError
    
class LoggingFoo(Foo, metaclass=LoggingMetaclass, methods_to_log=['get_a', '__getitem__']):
    pass

lf = LoggingFoo(a=1)
lf.get_a()
lf['a']

store_methods = [
    'getitems', '__getitem__', 'setitems', '__setitem__',
    'delitems', '__delitem__', '__contains__', 'keys'
]


class FinalMeta(type(FSStore), LoggingMetaclass):
    pass

class LoggedFSStore(FSStore, metaclass=FinalMeta, methods_to_log=store_methods):
    pass

With this I could watch the action as follows

import xarray as xr
import gcsfs

fs = gcsfs.GCSFileSystem()
SCRATCH_BUCKET = os.environ['SCRATCH_BUCKET'] 
print(SCRATCH_BUCKET)
try:
    # make sure scratch dir is empty
    fs.rm(SCRATCH_BUCKET, recursive=True)
except FileNotFoundError:
    pass

ds = xr.tutorial.open_dataset("air_temperature.nc").load()
encoding = {var: {"chunks": ds[var].shape} for var in ds.variables}
store = LoggedFSStore(f'{SCRATCH_BUCKET}/air_temperature.zarr')
%time ds.to_zarr(store, encoding=encoding, mode='w')

which gave me the following logs

INFO:LoggedFSStore:__setitem__('.zgroup', 24 BYTES) - 139.48745 ms
INFO:LoggedFSStore:__contains__('.zarray') - 89.15675 ms
INFO:LoggedFSStore:__getitem__('.zgroup') -> 24 BYTES - 35.91319 ms
INFO:LoggedFSStore:__contains__('lat/.zarray') - 83.44236 ms
INFO:LoggedFSStore:__contains__('lat/.zgroup') - 70.80098 ms
INFO:LoggedFSStore:__contains__('air/.zarray') - 75.87749 ms
INFO:LoggedFSStore:__contains__('air/.zgroup') - 79.30506 ms
INFO:LoggedFSStore:__contains__('lon/.zarray') - 62.76967 ms
INFO:LoggedFSStore:__contains__('lon/.zgroup') - 78.97612 ms
INFO:LoggedFSStore:__contains__('time/.zarray') - 80.28468 ms
INFO:LoggedFSStore:__contains__('time/.zgroup') - 72.80981 ms
INFO:LoggedFSStore:__setitem__('.zattrs', 307 BYTES) - 145.47094 ms
INFO:LoggedFSStore:__contains__('lat/.zarray') - 65.16041 ms
INFO:LoggedFSStore:__contains__('lat/.zgroup') - 80.85878 ms
INFO:LoggedFSStore:__contains__('.zarray') - 76.13892 ms
INFO:LoggedFSStore:__contains__('.zgroup') -> True - 41.55132 ms
INFO:LoggedFSStore:__contains__('lat/.zarray') - 75.99438 ms
INFO:LoggedFSStore:__contains__('lat/.zgroup') - 81.07246 ms
INFO:LoggedFSStore:__setitem__('lat/.zarray', 315 BYTES) - 133.20920 ms
INFO:LoggedFSStore:__getitem__('lat/.zarray') -> 315 BYTES - 35.53439 ms
INFO:LoggedFSStore:__setitem__('lat/.zattrs', 159 BYTES) - 159.61459 ms
INFO:LoggedFSStore:setitems({'lat/0': b'\x02\x013\x04d\x00\x) - 62.34879 ms
INFO:LoggedFSStore:__contains__('lon/.zarray') - 75.67225 ms
INFO:LoggedFSStore:__contains__('lon/.zgroup') - 71.23305 ms
INFO:LoggedFSStore:__contains__('.zarray') - 66.78281 ms
INFO:LoggedFSStore:__contains__('.zgroup') -> True - 41.40893 ms
INFO:LoggedFSStore:__contains__('lon/.zarray') - 89.63352 ms
INFO:LoggedFSStore:__contains__('lon/.zgroup') - 74.64754 ms
INFO:LoggedFSStore:__setitem__('lon/.zarray', 315 BYTES) - 144.20582 ms
INFO:LoggedFSStore:__getitem__('lon/.zarray') -> 315 BYTES - 42.66366 ms
INFO:LoggedFSStore:__setitem__('lon/.zattrs', 160 BYTES) - 128.60093 ms
INFO:LoggedFSStore:setitems({'lon/0': b'\x02\x011\x04\xd4\x0) - 71.23684 ms
INFO:LoggedFSStore:__contains__('time/.zarray') - 73.72732 ms
INFO:LoggedFSStore:__contains__('time/.zgroup') - 62.58414 ms
INFO:LoggedFSStore:__contains__('.zarray') - 74.29651 ms
INFO:LoggedFSStore:__contains__('.zgroup') -> True - 52.46065 ms
INFO:LoggedFSStore:__contains__('time/.zarray') - 78.48200 ms
INFO:LoggedFSStore:__contains__('time/.zgroup') - 72.51067 ms
INFO:LoggedFSStore:__setitem__('time/.zarray', 318 BYTES) - 159.85706 ms
INFO:LoggedFSStore:__getitem__('time/.zarray') -> 318 BYTES - 36.60147 ms
INFO:LoggedFSStore:__setitem__('time/.zattrs', 192 BYTES) - 137.85338 ms
INFO:LoggedFSStore:setitems({'time/0': b'\x02\x01!\x08@[\x00) - 69.46322 ms
INFO:LoggedFSStore:__contains__('air/.zarray') - 73.52592 ms
INFO:LoggedFSStore:__contains__('air/.zgroup') - 65.83150 ms
INFO:LoggedFSStore:__contains__('.zarray') - 81.25717 ms
INFO:LoggedFSStore:__contains__('.zgroup') -> True - 46.77492 ms
INFO:LoggedFSStore:__contains__('air/.zarray') - 57.73701 ms
INFO:LoggedFSStore:__contains__('air/.zgroup') - 71.78812 ms
INFO:LoggedFSStore:__setitem__('air/.zarray', 367 BYTES) - 130.99378 ms
INFO:LoggedFSStore:__getitem__('air/.zarray') -> 367 BYTES - 39.19929 ms
INFO:LoggedFSStore:__setitem__('air/.zattrs', 471 BYTES) - 134.16379 ms
INFO:LoggedFSStore:setitems({'air/0.0.0': b'\x02\x01!\x04 %\) - 230.19235 ms
INFO:LoggedFSStore:keys() -> <generator object FSMap.__iter__ - 26.51500 ms
INFO:LoggedFSStore:__getitem__('.zattrs') -> 307 BYTES - 42.53620 ms
INFO:LoggedFSStore:__getitem__('.zgroup') -> 24 BYTES - 29.00718 ms
INFO:LoggedFSStore:__getitem__('air/.zarray') -> 367 BYTES - 28.26738 ms
INFO:LoggedFSStore:__getitem__('air/.zattrs') -> 471 BYTES - 34.54897 ms
INFO:LoggedFSStore:__getitem__('lat/.zarray') -> 315 BYTES - 29.59451 ms
INFO:LoggedFSStore:__getitem__('lat/.zattrs') -> 159 BYTES - 36.64703 ms
INFO:LoggedFSStore:__getitem__('lon/.zarray') -> 315 BYTES - 23.31331 ms
INFO:LoggedFSStore:__getitem__('lon/.zattrs') -> 160 BYTES - 36.63186 ms
INFO:LoggedFSStore:__getitem__('time/.zarray') -> 318 BYTES - 27.16683 ms
INFO:LoggedFSStore:__getitem__('time/.zattrs') -> 192 BYTES - 33.03136 ms
INFO:LoggedFSStore:__setitem__('.zmetadata', 3989 BYTES) - 84.72898 ms
INFO:LoggedFSStore:__getitem__('.zmetadata') -> 3989 BYTES - 48.72868 ms
CPU times: user 571 ms, sys: 85.3 ms, total: 656 ms
Wall time: 5.31 s

It is RIDICULOUS how much unnecessary listing of the same objects over and over is being done.

I am confident that we can pretty easily get a 10x improvement here by refactoring Zarr.

@rabernat (Member) commented Oct 15, 2022

Out of curiosity, I tried a very naive implementation of storing the data that bypasses most of xarray. This lets us see whether xarray itself is responsible for these inefficient calls:

import zarr
from xarray.backends.zarr import encode_zarr_attr_value

def naive_store(ds, store):
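    # Drive zarr directly: create the group, then one array per variable,
    # copy the values and attributes, and finally consolidate the metadata.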
    group = zarr.open_group(store, mode="w")
    group.attrs.update(ds.attrs)
    for v in ds.variables:
        a = group.create(v, shape=ds[v].shape, chunks=ds[v].shape, dtype=ds[v].dtype)
        a[:] = ds[v].values
        attrs = {k: encode_zarr_attr_value(val) for k, val in ds[v].attrs.items()}
        attrs['_ARRAY_DIMENSIONS'] = ds[v].dims
        a.attrs.update(attrs)
    zarr.consolidate_metadata(store)
        
%time naive_store(ds, LoggedFSStore(f'{SCRATCH_BUCKET}/air_temperature_naive.zarr'))
INFO:LoggedFSStore:__setitem__('.zgroup', 24 BYTES) - 119.35081 ms
INFO:LoggedFSStore:__contains__('.zarray') - 63.41545 ms
INFO:LoggedFSStore:__getitem__('.zgroup') -> 24 BYTES - 33.21743 ms
INFO:LoggedFSStore:__setitem__('.zattrs', 307 BYTES) - 134.55527 ms
INFO:LoggedFSStore:__contains__('.zarray') - 77.27599 ms
INFO:LoggedFSStore:__contains__('.zgroup') -> True - 51.41485 ms
INFO:LoggedFSStore:__contains__('lat/.zarray') - 67.90970 ms
INFO:LoggedFSStore:__contains__('lat/.zgroup') - 106.38684 ms
INFO:LoggedFSStore:__setitem__('lat/.zarray', 313 BYTES) - 128.03115 ms
INFO:LoggedFSStore:__getitem__('lat/.zarray') -> 313 BYTES - 36.21120 ms
INFO:LoggedFSStore:setitems({'lat/0': b'\x02\x013\x04d\x00\x) - 63.10295 ms
INFO:LoggedFSStore:__setitem__('lat/.zattrs', 159 BYTES) - 124.15345 ms
INFO:LoggedFSStore:__contains__('.zarray') - 132.32426 ms
INFO:LoggedFSStore:__contains__('.zgroup') -> True - 52.42676 ms
INFO:LoggedFSStore:__contains__('air/.zarray') - 69.04383 ms
INFO:LoggedFSStore:__contains__('air/.zgroup') - 72.82501 ms
INFO:LoggedFSStore:__setitem__('air/.zarray', 365 BYTES) - 123.15104 ms
INFO:LoggedFSStore:__getitem__('air/.zarray') -> 365 BYTES - 34.05804 ms
INFO:LoggedFSStore:setitems({'air/0.0.0': b'\x02\x01!\x04 %\) - 182.70398 ms
INFO:LoggedFSStore:__setitem__('air/.zattrs', 471 BYTES) - 117.01889 ms
INFO:LoggedFSStore:__contains__('.zarray') - 69.37202 ms
INFO:LoggedFSStore:__contains__('.zgroup') -> True - 50.62258 ms
INFO:LoggedFSStore:__contains__('lon/.zarray') - 74.53009 ms
INFO:LoggedFSStore:__contains__('lon/.zgroup') - 73.14050 ms
INFO:LoggedFSStore:__setitem__('lon/.zarray', 313 BYTES) - 134.76317 ms
INFO:LoggedFSStore:__getitem__('lon/.zarray') -> 313 BYTES - 33.03248 ms
INFO:LoggedFSStore:setitems({'lon/0': b'\x02\x011\x04\xd4\x0) - 63.11621 ms
INFO:LoggedFSStore:__setitem__('lon/.zattrs', 160 BYTES) - 146.96551 ms
INFO:LoggedFSStore:__contains__('.zarray') - 82.45841 ms
INFO:LoggedFSStore:__contains__('.zgroup') -> True - 43.36221 ms
INFO:LoggedFSStore:__contains__('time/.zarray') - 65.99253 ms
INFO:LoggedFSStore:__contains__('time/.zgroup') - 64.22730 ms
INFO:LoggedFSStore:__setitem__('time/.zarray', 319 BYTES) - 143.17184 ms
INFO:LoggedFSStore:__getitem__('time/.zarray') -> 319 BYTES - 53.40948 ms
INFO:LoggedFSStore:setitems({'time/0': b'\x02\x01!\x08@[\x00) - 66.31368 ms
INFO:LoggedFSStore:__setitem__('time/.zattrs', 105 BYTES) - 131.50225 ms
INFO:LoggedFSStore:keys() -> <generator object FSMap.__iter__ - 22.87500 ms
INFO:LoggedFSStore:__getitem__('.zattrs') -> 307 BYTES - 36.86145 ms
INFO:LoggedFSStore:__getitem__('.zgroup') -> 24 BYTES - 31.11855 ms
INFO:LoggedFSStore:__getitem__('air/.zarray') -> 365 BYTES - 16.61750 ms
INFO:LoggedFSStore:__getitem__('air/.zattrs') -> 471 BYTES - 36.56843 ms
INFO:LoggedFSStore:__getitem__('lat/.zarray') -> 313 BYTES - 26.13736 ms
INFO:LoggedFSStore:__getitem__('lat/.zattrs') -> 159 BYTES - 40.68098 ms
INFO:LoggedFSStore:__getitem__('lon/.zarray') -> 313 BYTES - 46.28976 ms
INFO:LoggedFSStore:__getitem__('lon/.zattrs') -> 160 BYTES - 34.83477 ms
INFO:LoggedFSStore:__getitem__('time/.zarray') -> 319 BYTES - 30.99338 ms
INFO:LoggedFSStore:__getitem__('time/.zattrs') -> 105 BYTES - 45.75546 ms
INFO:LoggedFSStore:__setitem__('.zmetadata', 3881 BYTES) - 59.14524 ms
INFO:LoggedFSStore:__getitem__('.zmetadata') -> 3881 BYTES - 43.15027 ms
CPU times: user 483 ms, sys: 52.5 ms, total: 536 ms
Wall time: 4.86 s

This shows that Xarray is adding a few calls of its own, but mostly the problem exists at the Zarr level.

@rabernat (Member) commented Oct 15, 2022

What does this all mean? It boils down to this:

CPU times: user 571 ms, sys: 85.3 ms, total: 656 ms
Wall time: 5.31 s

The difference between the 656 ms of CPU time and the 5.31 s of wall time is time spent blocking on I/O. If we put all of these sequential I/O calls into an async event loop, I'm pretty sure we could eliminate most of that difference.

There is also absolutely no reason we need to be checking for .zarray and .zgroup over and over again...in series! 🤦 Nor do we need to be getting the data we have just set.

We should try turning on the Zarr V3 spec and see if it behaves better.

@guillaumeeb (Member)

@tinaok & @guillaumeeb - this is how Pangeo has made progress over the past years: carefully identifying performance bottlenecks and working systematically to fix them upstream in the open source libraries. If you have the time and interest, I really encourage you to dig into the zarr source code and see if you can optimize this a bit.

Thanks so much @rabernat for all that.

Just wanted to say that I hope to work on this soon; however, I won't be available next week.

@martindurant

Something like the consolidated store knows about the difference between metadata keys and data chunks, and can cache the small pieces of JSON data. Separately, the remote store (s3fs) could also cache small writes, or all writes with an LRU policy. I note that .zmetadata is being accessed, but as a separate step (as recommended). Maybe the consolidated store already does the right thing?

Zarr itself has no caching of state at all, so every time xarray asks for a new array or for any write, it needs to check what is in the output location. I wonder whether this is any different in write-clobber mode as opposed to write-append.

@martindurant commented Oct 17, 2022

I am thinking something like

--- a/zarr/storage.py
+++ b/zarr/storage.py
@@ -2858,7 +2858,11 @@ class ConsolidatedMetadataStore(Store):
         self.store = Store._ensure_store(store)

         # retrieve consolidated metadata
-        meta = json_loads(self.store[metadata_key])
+        try:
+            meta = json_loads(self.store[metadata_key])
+        except KeyError:
+            # new store for writing
+            meta = {'zarr_consolidated_format': 1}

         # check format of consolidated metadata
         consolidated_format = meta.get('zarr_consolidated_format', None)
@@ -2885,7 +2889,12 @@ class ConsolidatedMetadataStore(Store):
         raise ReadOnlyError()

     def __setitem__(self, key, value):
-        raise ReadOnlyError()
+        self.meta_store[key] = value
+
+    def commit(self):
+        out = {".zmetadata": self.meta_store}
+        out.update(self.meta_store)
+        del out["zarr_consolidated_format"]
+        self.store.setitems(out)

so that instead of calling consolidate_metadata, you use this store instead (for metadata only).

@rabernat (Member)

That's one option. However, I see it as a bit of a workaround for the fundamental problem that the store has to be created in this very sequential, procedural way.

What if Zarr had a fast path to create an entire hierarchy at once, i.e.

zarr.create_hierarchy(big_dict, store=store, mode="w")

and all the data were set using a single call to store.setitems(...)?

I feel like that would be a more elegant solution that would not require consolidated metadata. After all, the `aws s3` CLI test shows that it is possible to get all of this data into S3 (without consolidated metadata) in under 2 s. Why shouldn't Zarr be able to do the same?
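
In the meantime, something close to that batched upload can be approximated with what already exists, by staging the whole store in an in-memory mapping and pushing it with a single FSStore.setitems() call. A rough sketch, assuming the dataset fits in memory and reusing uri, client_kwargs and the credentials from the earlier comments:

import zarr

memory_store = {}                        # a plain dict acts as an in-memory Zarr store
ds.to_zarr(memory_store, mode='w', consolidated=True)   # fast: no network round-trips

remote = zarr.storage.FSStore(uri, client_kwargs=client_kwargs,
                              key=access_key, secret=secret_key)
remote.setitems(memory_store)            # one concurrent batch of PUTs via fsspec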

@martindurant

Certainly. I don't have an immediate evaluation as to the merits of adding top level functions to zarr versus making more versatile storage layers. They end up doing the same thing here. What does your big_dict contain?

For now, setitems only makes a difference (compared to serial writes) on FSStore.

this very sequential, procedural way.

Some of this was also discussed in the async-zarr idea. That was about data access, possibly across multiple arrays (which zarr also can't normally do).

Note that fsspec instances also allow for "transactions" to defer finalising upload batches and make them semi-atomic. That wouldn't help with speed, but it would help with consistency, which any lazy/deferred collection of things to write would need to worry about.

@martindurant

(ah, and my diff doesn't actually require consolidated metadata, I am merely hijacking the class, because it was already doing some of this work; any dict could be used as the metadata store for uploading in a batch)
