Parallelize reads for timeseries #199

Open · bkachroo opened this issue Feb 28, 2023 · 3 comments

bkachroo commented Feb 28, 2023

Continuing from coiled/feedback#229.

Previous Discussion

When multiple assets are in a single chunk, stackstac reads them serially:

# NOTE: when there are multiple assets, we _could_ parallelize these reads with our own threadpool.
# However, that would probably increase memory usage, since the internal, thread-local GDAL datasets
# would end up copied to even more threads.

Chunks are still processed in parallel, so you'll always have nthreads concurrent reads at once. Generally, when working in any parallel framework, you don't want to layer parallelism: https://stackoverflow.com/a/39425693/17100540. Because these reads are mostly (but not entirely!) network bound, I can see an argument for stackstac parallelizing them internally, but it still would likely increase memory usage. The general best practice is that things running in a Dask task should be single-threaded.

I understand what you're saying about layering parallelism. I thought asyncio was an option, but after reading more, it looks like rasterio/GDAL are incompatible with asyncio, so threads are the only option. However, it still seems like threads within a task would be the best option for a time-series use-case.

The additional memory burden of threads within a task would, in the worst case, be the number of simultaneous reads times the extra data (to be thrown away) per read. Since I'm reading from a different image in each thread, I presume I don't need to copy any GDALDatasets. In my case the total size of each chunk (50 images) is tiny (1 MB), whereas each read task takes about 50 sec. If we assume the extra data per read is on the same order of magnitude as the data I keep, each task would use 50 MB at most.
So threads within the task would get me down to about 1 sec per chunk, at 50 MB per chunk: a 50X speedup without significantly increasing memory in my case.
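
A back-of-the-envelope version of that arithmetic (the numbers are my estimates from above, not measurements):

n_images = 50            # assets in one chunk
serial_read_time_s = 50  # observed time for one read task (~1 s per image)
extra_per_read_mb = 1    # assumed throwaway data per read, same order as the 1 MB chunk

worst_case_memory_mb = n_images * extra_per_read_mb  # 50 MB per in-flight chunk
parallel_time_s = serial_read_time_s / n_images      # ~1 s if all reads overlap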

The other option is to keep each chunk/task single-threaded and put lots of threads on Dask (see the sketch below). This would require a graph with 1 image per chunk, i.e. 50X the graph size, to get the same performance with 50 threads per worker, and the worker still uses 50X the memory. It also makes every operation on my cluster use 50 threads per worker, rather than just this specific one, where I know the execution time is very large relative to the memory usage.
I've experimented with this second option, and it causes memory pressure, since other operations (e.g. zarr reads, rechunking, resampling) are executing on the cluster at the same time. I get out-of-memory errors even with nthreads = 2 * ncores rather than nthreads = ncores. It also makes the task graph very large (>300K tasks).
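
For reference, the second option amounts to something like the following (a sketch; the worker count is illustrative, and threads_per_worker is the standard dask.distributed knob):

from dask.distributed import Client, LocalCluster

# One image per chunk, so all read concurrency comes from Dask itself.
# Note this makes *every* task on these workers 50-way threaded, not just the reads.
cluster = LocalCluster(n_workers=4, threads_per_worker=50)
client = Client(cluster)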

The same logic applies if I want a very large number of disjoint spatial points instead of a time series.
Please let me know if I've got anything wrong here, or if there's a better option.

Attempted Solution

I attempted to multithread the reads in fetch_raster_window:

def fetch_raster_window(

...

    all_empty: bool = True
    entry: ReaderTableEntry
    thread_pool = ThreadPoolExecutor(len(reader_table))
    futures = []
    for index, entry in np.ndenumerate(reader_table):
        if entry:
            reader, asset_window = entry
            # Only read if the window we're fetching actually overlaps with the asset
            if windows.intersect(current_window, asset_window):
                # TODO when the Reader won't be rescaling, support passing `output` to avoid the copy?
                # Bind `index` and `reader` as default arguments so each lambda captures
                # the current loop values; a bare closure would see only the final ones.
                futures.append(
                    thread_pool.submit(
                        lambda index=index, reader=reader: (index, reader.read(current_window))
                    )
                )

    for future in as_completed(futures):
        index, data = future.result()
        if all_empty:
            if (
                np.isnan(data)
                if np.isnan(fill_value)
                else np.equal(data, fill_value)
            ).all():
                # The data we just read is all empty anyway; nothing to write
                continue
            # Turn `output` from a broadcast-trick array into a real array, so it's writeable
            output = np.array(output)
            all_empty = False

        output[index] = data

    thread_pool.shutdown()

Problem

Unfortunately, this produces an error:

...
File "/Users/bharethkachroo/Dev/pharos_api/stackstac/stackstac/to_dask.py", line 186, in <lambda>
  futures.append(thread_pool.submit(lambda: (index, reader.read(current_window))))
File "/Users/bharethkachroo/Dev/pharos_api/stackstac/stackstac/rio_reader.py", line 385, in read
  reader = self.dataset
File "/Users/bharethkachroo/Dev/pharos_api/stackstac/stackstac/rio_reader.py", line 381, in dataset
  self._dataset = self._open()
File "/Users/bharethkachroo/Dev/pharos_api/stackstac/stackstac/rio_reader.py", line 337, in _open
  raise RuntimeError(msg) from e
RuntimeError: Error opening 'https://sentinel-cogs.s3.us-west-2.amazonaws.com/sentinel-s2-l2a-cogs/18/H/WC/2017/10/S2B_18HWC_20171024_0_L2A/B04.tif': RasterioIOError('CURL error: getaddrinfo() thread failed to start')

What I Tried:

  • Increasing max_threads in the thread_pool
  • Ensuring AWS credentials and requester pays were enabled in the gdal_env rasterio.session
  • Using Dask's synchronous scheduler
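
Since the curl message literally says a resolver thread could not be started, one more thing worth checking (a diagnostic idea, not something I've confirmed) is the process's thread/process limits:

import resource
import threading

# RLIMIT_NPROC caps processes/threads per user on most Unix systems, macOS included.
soft, hard = resource.getrlimit(resource.RLIMIT_NPROC)
print(f"RLIMIT_NPROC soft={soft} hard={hard}; active threads={threading.active_count()}")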

Do you have any suggestions for how to deal with this problem? Or is there a better approach for me to achieve parallelism here?

bkachroo commented Feb 28, 2023

I set CPL_CURL_VERBOSE and the logs look like this:

...
* Couldn't find host sentinel-cogs.s3.us-west-2.amazonaws.com in the (nil) file; using defaults
* getaddrinfo() thread failed to start
* Could not resolve host: sentinel-cogs.s3.us-west-2.amazonaws.com
* Closing connection 0
Error opening 'https://sentinel-cogs.s3.us-west-2.amazonaws.com/sentinel-s2-l2a-cogs/43/T/EL/2017/8/S2A_43TEL_20170814_0_L2A/B04.tif': RasterioIOError('CURL error: getaddrinfo() thread failed to start')
...
*   Trying 52.218.251.1:443...
* Connected to sentinel-cogs.s3.us-west-2.amazonaws.com (52.218.251.1) port 443 (#0)
...
* ALPN, offering h2
* ALPN, offering http/1.1
...
* SSL connection using TLSv1.2 / ECDHE-RSA-AES128-GCM-SHA256
* ALPN, server accepted to use http/1.1
* Server certificate:
*  subject: CN=*.s3-us-west-2.amazonaws.com
*  start date: Sep 21 00:00:00 2022 GMT
*  expire date: Aug 24 23:59:59 2023 GMT
*  subjectAltName: host "sentinel-cogs.s3.us-west-2.amazonaws.com" matched cert's "*.s3.us-west-2.amazonaws.com"
*  issuer: C=US; O=Amazon; OU=Server CA 1B; CN=Amazon
*  SSL certificate verify ok.
> HEAD /sentinel-s2-l2a-cogs/43/T/EK/2017/8/S2A_43TEK_20170814_0_L2A/B04.tif HTTP/1.1
Host: sentinel-cogs.s3.us-west-2.amazonaws.com
Accept: */*
...
> HEAD /sentinel-s2-l2a-cogs/43/T/EK/2017/5/S2A_43TEK_20170526_0_L2A/B04.tif HTTP/1.1
Host: sentinel-cogs.s3.us-west-2.amazonaws.com
Accept: */*
...
* Server certificate:
*  subject: CN=*.s3-us-west-2.amazonaws.com
*  start date: Sep 21 00:00:00 2022 GMT
*  expire date: Aug 24 23:59:59 2023 GMT
*  subjectAltName: host "sentinel-cogs.s3.us-west-2.amazonaws.com" matched cert's "*.s3.us-west-2.amazonaws.com"
*  issuer: C=US; O=Amazon; OU=Server CA 1B; CN=Amazon
*  SSL certificate verify ok.
> HEAD /sentinel-s2-l2a-cogs/43/T/EK/2017/6/S2A_43TEK_20170615_0_L2A/B04.tif HTTP/1.1
Host: sentinel-cogs.s3.us-west-2.amazonaws.com
Accept: */*

* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< x-amz-id-2: gQ75z9Q88+GZU00jMDrdk/R1YC7bX0+Cpsmj1nP7FY7af8RwD38GwX3KePG4qR77a6NzcpjEnXI=
< x-amz-request-id: NBB8ZVDWNW3A70JR
< Date: Tue, 28 Feb 2023 22:01:06 GMT
< Last-Modified: Tue, 29 Sep 2020 00:07:12 GMT
< ETag: "2385f248d6f29bf7450b66442c23ea34-26"
< Cache-Control: public, max-age=31536000, immutable
< Accept-Ranges: bytes
< Content-Type: image/tiff; application=geotiff; profile=cloud-optimized
< Server: AmazonS3
< Content-Length: 213606320
< 
* Connection #0 to host sentinel-cogs.s3.us-west-2.amazonaws.com left intact
* Couldn't find host sentinel-cogs.s3.us-west-2.amazonaws.com in the (nil) file; using defaults
* Found bundle for host sentinel-cogs.s3.us-west-2.amazonaws.com: 0x7fbdfb6ad1c0 [serially]
* Can not multiplex, even if we wanted to!
* Re-using existing connection! (#0) with host sentinel-cogs.s3.us-west-2.amazonaws.com
* Connected to sentinel-cogs.s3.us-west-2.amazonaws.com (52.218.251.1) port 443 (#0)
> GET /sentinel-s2-l2a-cogs/43/T/EK/2017/8/S2A_43TEK_20170814_0_L2A/B04.tif HTTP/1.1
Host: sentinel-cogs.s3.us-west-2.amazonaws.com
Accept: */*
Range: bytes=0-32767
...
< HTTP/1.1 206 Partial Content
< x-amz-id-2: aYiWDbOY15Cj/9CNwSoG9mdM7npRwnDf13LgJYmZtkzTkJ+KPRr2Txd9Ha/UePLnNSsOO1gW4j8=
< x-amz-request-id: EYK81C9VC290SNWS
< Date: Tue, 28 Feb 2023 22:01:07 GMT
< Last-Modified: Fri, 18 Sep 2020 11:43:44 GMT
< ETag: "929b09a1ad65ad1c7bc81b3eff02ff0b-28"
< Cache-Control: public, max-age=31536000, immutable
< Accept-Ranges: bytes
< Content-Range: bytes 223723520-224821247/233121230
< Content-Type: image/tiff; application=geotiff; profile=cloud-optimized
< Server: AmazonS3
< Content-Length: 1097728
...
* Couldn't find host sentinel-cogs.s3.us-west-2.amazonaws.com in the (nil) file; using defaults
* Found bundle for host sentinel-cogs.s3.us-west-2.amazonaws.com: 0x7fbdfbfd0c50 [serially]
* Can not multiplex, even if we wanted to!
* Re-using existing connection! (#0) with host sentinel-cogs.s3.us-west-2.amazonaws.com
* Connected to sentinel-cogs.s3.us-west-2.amazonaws.com (52.218.251.1) port 443 (#0)
> GET /sentinel-s2-l2a-cogs/43/T/EL/2017/1/S2A_43TEL_20170119_0_L2A/B04.tif HTTP/1.1
Host: sentinel-cogs.s3.us-west-2.amazonaws.com
Accept: */*
Range: bytes=218972160-220151807
...

Except for the section at the very beginning, getaddrinfo is mentioned nowhere else. The error gets raised in Python at that spot, but the curl logs continue with requests while the Python program does some cleanup and logging before shutting down.
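
For anyone reproducing this: one way to turn on the verbose curl logging (via GDAL's standard config mechanism, passed through rasterio) is:

import rasterio
from rasterio.windows import Window

# CPL_CURL_VERBOSE is a GDAL config option; rasterio.Env forwards it to GDAL.
with rasterio.Env(CPL_CURL_VERBOSE="YES"):
    with rasterio.open(
        "https://sentinel-cogs.s3.us-west-2.amazonaws.com/sentinel-s2-l2a-cogs/18/T/VR/2017/10/S2A_18TVR_20171006_0_L2A/SCL.tif"
    ) as src:
        src.read(1, window=Window(0, 0, 50, 50))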

bkachroo commented Mar 1, 2023

Comparing with the verbose curl output from the main repo (before multithreading): these four lines are present in the multithreaded output (which errors) but not in the serial version. Everything else is the same.

* getaddrinfo() thread failed to start
* Could not resolve host: sentinel-cogs.s3.us-west-2.amazonaws.com
* Closing connection 0
Error opening 'https://sentinel-cogs.s3.us-west-2.amazonaws.com/sentinel-s2-l2a-cogs/43/T/EL/2017/8/S2A_43TEL_20170814_0_L2A/B04.tif': RasterioIOError('CURL error: getaddrinfo() thread failed to start')

Instead, the serial version does:

* Couldn't find host sentinel-cogs.s3.us-west-2.amazonaws.com in the (nil) file; using defaults
*   Trying 52.218.243.121:443...
* Connected to sentinel-cogs.s3.us-west-2.amazonaws.com (52.218.243.121) port 443 (#0)

It simply skips that section.

bkachroo commented Mar 1, 2023

I tried running the threaded reads outside of stackstac, and they work:

import rasterio as rio
from rasterio.windows import Window
from concurrent.futures import ThreadPoolExecutor, as_completed

setlist = ['https://sentinel-cogs.s3.us-west-2.amazonaws.com/sentinel-s2-l2a-cogs/18/T/VR/2017/10/S2A_18TVR_20171006_0_L2A/SCL.tif',
           'https://sentinel-cogs.s3.us-west-2.amazonaws.com/sentinel-s2-l2a-cogs/18/T/VR/2017/10/S2B_18TVR_20171008_0_L2A/SCL.tif',
           'https://sentinel-cogs.s3.us-west-2.amazonaws.com/sentinel-s2-l2a-cogs/18/T/VR/2017/10/S2A_18TVR_20171013_0_L2A/SCL.tif',
           ]

def readdata(url):
    # sharing=False gives each thread its own GDAL dataset handle
    with rio.open(url, sharing=False) as src:
        data = src.read(1, window=Window(0, 0, 50, 50))
    return data

thread_pool = ThreadPoolExecutor(len(setlist))
# Submit the function with its argument rather than a bare lambda, so each
# future reads its own URL instead of whatever `url` happens to point to last.
futures = [thread_pool.submit(readdata, url) for url in setlist]

datas = []
for future in as_completed(futures):
    datas.append(future.result())
print(datas[-1])

I also tried this with 50 images and it works.
