
Clay v1 Pipeline Spec #178

Open
yellowcap opened this issue Mar 12, 2024 · 12 comments

@yellowcap
Member

yellowcap commented Mar 12, 2024

This is a proposal for a streaming approach that relies on static STAC Indexes and dynamic tiling.

tl;dr

  • List imagery "scenes" using STAC items
  • Compute how many chips are in each scene
  • Download batches of "scenes" to a fast temporary storage
  • Do dynamic tiling for feeding to the model

Tracking scenes

The first step is creating a list of scenes from multiple sources, such as MODIS, Landsat, Sentinel-2, NAIP, and drone imagery, handling each source separately, one by one.

  • For each scene, create a static STAC item with asset links to scalable storage (S3), ideally in the same region
  • Each STAC item's metadata should contain the following information
    • Reference date
    • Central wavelength of each band
    • Bandwidth of each band
    • Resolution in meters of each band
    • lat/lon bounding box
    • Data normalization average
    • Data normalization standard deviation

Track chips in each scene

Track how many chips are in each scene

  • For each STAC item, we precompute how many “chips” can be created using simple window reads. That means no reprojection or spatial lookup, simply cutting the pixels into chips of 256x256 pixels (see the sketch below)
  • With this count we can index all the chips and their “parent” STAC items
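
A minimal sketch of that window-based chip counting, assuming rasterio and a 256-pixel chip size; the function name and the yielded window objects are illustrative, not the final indexer:

```python
import rasterio
from rasterio.windows import Window

CHIP_SIZE = 256  # chip edge length in pixels, as proposed above

def index_chips(asset_href):
    """Yield (chip_id, Window) pairs for one scene using plain window reads.

    No reprojection or spatial lookup; the pixel grid is simply cut into
    256x256 blocks, dropping the partial blocks at the right/bottom edges.
    """
    with rasterio.open(asset_href) as src:
        rows = src.height // CHIP_SIZE
        cols = src.width // CHIP_SIZE
        chip_id = 0
        for row in range(rows):
            for col in range(cols):
                yield chip_id, Window(col * CHIP_SIZE, row * CHIP_SIZE, CHIP_SIZE, CHIP_SIZE)
                chip_id += 1
```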

The index table

We would then store all the static STAC items as JSON files in a central location and index all the chips into a single table.

| STAC Item | Chip ID |
| --- | --- |
| sentinel-2-2023-01-23-FQ23.json | 1 |
| sentinel-2-2023-01-23-FQ23.json | 2 |
| sentinel-2-2023-01-23-FQ23.json | 3 |
| naip-ma-2020-m-41071327.json | 1301 |
| naip-ma-2020-m-4107132.json | 1302 |
| naip-ma-2020-m-4107132.json | 1303 |
| openaerialmap-65ec6c0deda0.json | 2521 |
| openaerialmap-65ec6c0deda0.json | 2522 |

The pipeline would then do random sampling into batches in two steps (sketched after this list):

  1. Creating a Mega Batch by random sampling at the STAC item level
    1. This needs to be a big number, so that the random sample at the STAC item level is as representative of the whole globe as possible.
    2. Each STAC item is like a single scene, and we know which “chips” we can make from it from the index
    3. We would expect a few thousand STAC items to be a good number.
  2. Create a Mini Batch to pass to the model using a full random shuffle of all the “chips” in the index from all the sampled STAC items.
    1. The Mini Batch would be sampled from the items in the Mega Batch
    2. Each Mini Batch will be a few hundred chips, depending on the GPU memory we have available
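
A minimal sketch of this two-step sampling, assuming the index is a list of (stac_item, chip_id) pairs; the batch sizes are illustrative, not tuned values:

```python
import random

def sample_mega_batch(index, n_items=2000, seed=None):
    """Step 1: sample STAC items, then keep every chip belonging to the sampled items."""
    rng = random.Random(seed)
    items = sorted({stac_item for stac_item, _ in index})
    sampled = set(rng.sample(items, n_items))
    return [(stac_item, chip_id) for stac_item, chip_id in index if stac_item in sampled]

def mini_batches(mega_batch, batch_size=256, seed=None):
    """Step 2: full random shuffle of all chips in the Mega Batch, yielded as Mini Batches."""
    rng = random.Random(seed)
    chips = list(mega_batch)
    rng.shuffle(chips)
    for start in range(0, len(chips), batch_size):
        yield chips[start:start + batch_size]
```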

Considerations:

  • The Mega Batch needs to be big enough to have a diverse sample because we would run through each chip in the Mega Batch before moving on to the next one
  • The number of STAC Items in each Mega Batch is a function of the size of the Mini Batch.
    • In each Mini Batch we don’t want to repeat the “parent” STAC Item of the chips
    • We also don’t want each Mini Batch within a Mega Batch to be from the very same STAC Items. So the Mega Batch size should be at least 5 to 10 times as many items as the number of chips in the Mini Batch.

Data Flow

  • Our pipeline would then take all the static STAC items, and the index file with all the chips as input.
  • It would then populate a temporary storage with the first Mega Batch. That means downloading all the assets for all the items in the Mega Batch.
  • It would then dynamically create Mini Batches from all the chips in the Mega Batch and feed these to the model from the fast temporary storage
  • Meanwhile, the next Mega Batch would be prepared (see the sketch below).
  • When the first Mega Batch has been passed to the model, the next will be used and the first will be removed from the temporary storage
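
A minimal sketch of that double-buffering flow, reusing the mini_batches helper from the sampling sketch above; download_to_scratch and remove_from_scratch are hypothetical helpers for the fast temporary storage, and the model/training calls are placeholders:

```python
from concurrent.futures import ThreadPoolExecutor

def train_over_mega_batches(mega_batches, model):
    """Feed the current Mega Batch to the model while the next one is downloaded."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        iterator = iter(mega_batches)
        current = next(iterator)
        download_to_scratch(current)  # hypothetical: pull all assets to fast temporary storage
        for upcoming in iterator:
            prefetch = pool.submit(download_to_scratch, upcoming)  # prepare the next Mega Batch
            for batch in mini_batches(current):  # dynamic tiling from fast storage
                model.training_step(batch)
            remove_from_scratch(current)  # hypothetical: free the temporary storage
            prefetch.result()
            current = upcoming
        for batch in mini_batches(current):
            model.training_step(batch)
        remove_from_scratch(current)
```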

Considerations

  • We would expect the temporary storage to be in the range of 1 TB to 3 TB in size. It would get all the assets for all the items in each Mega Batch, a few thousand “scenes”
  • With this approach, we would re-download all the source data in each Epoch.
@brunosan
Member

Love this idea. I believe this also restricts us to using COG data on S3 (for speed and window reads), but that's an OK cost to bear IMO. We should just make sure we can always fall back to static pre-made batches (important to keep that functionality for fine-tuning and for users who want to continue training, potentially offline).

  1. For 1.i I see the need to not bias a batch, but I don't think it's too concerning, especially when we do batch accumulation, right? We can document that to ensure we accumulate batches.
  2. Do we actually need a megabatch? Why not just make weighted random calls for STAC items and, from them, random chips? (We weight randomly by the number of chips in the STAC item, and we keep track of the ones we have used, so as not to repeat them.)
  3. Why not stream the creation of the batches with a queue? If we use several workers on a scalable queue, we can keep pumping batches as they are needed without having to pre-process.
  4. Totally fine to re-download on each epoch, our training data becomes ephemeral. We just need to create logs, maybe even dump the MD5 hashes alongside the STAC source, to ensure complete provenance.

@yellowcap
Member Author

yellowcap commented Mar 13, 2024

Thanks for the feedback @brunosan

The fallback would be to run this pipeline but store all tiles permanently. That should be fine for smaller datasets, but then maybe this mechanism is overkill and the user is better off simply creating chips with their own scripts. In any case, the proposed mechanism could be used to permanently store chips as well.

  1. We will be doing training across instances, so I am not sure how much batch accumulation will solve for bias.
  2. Yes, because of the speed at which we consume the data. The network overhead of chipping directly from S3 is huge (99% of the time), so preloading the scenes into fast storage is probably crucial. That is the motivation for this approach.
  3. Most definitely we will need many workers to create chips. Queuing would happen at the Mega Batch level, and implicitly through the data loader.
  4. Yes, exactly. Chips become ephemeral; they would be defined through the STAC catalogs and the pipeline spec. That makes this very scalable in terms of total data, but has a cost in network and processing time.

@sharkinsspatial

@yellowcap Thanks for doing this write up 🚀 , I think it is a good summary of some of the discussions we had. I'll try to carve out some time this week to add tickets for

  1. Options to support automated transfer of data for a batch to fast, temporary storage.
  2. Testing and benchmarking a tifffile-based Dataloader that might be more efficient in these cases where no spatial reference information or transforms are needed.

A few questions/comments about the approach you've described. On the STAC side:

  1. For the required STAC metadata I'd suggest including the projection extension. Many published items already support this, and the proj:shape field would provide an easy mechanism to calculate chipping without accessing the COG's header directly (see the sketch after these questions).

  2. Generally, most assets/bands in a STAC Item have the same shape, but this is not a requirement. You'll likely want to handle this case in your chip generation.

  3. Would a batch consist of Items only from a single sensor, or a mix of sensors? If the batch is composed of multiple sensors, how will the Dataloader handle mixed asset/band options? Will it need sensor-specific information to determine which assets to combine into a chip?
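
On point 1, a minimal sketch of counting chips from proj:shape with pystac, assuming the item carries the projection extension at the item level (it may also live per asset) and a 256-pixel chip size:

```python
import pystac

CHIP_SIZE = 256

def chip_count_from_proj_shape(item: pystac.Item) -> int:
    """Count window-aligned chips from proj:shape, without opening the COG header."""
    # proj:shape is (rows, cols); it may need to be read per asset instead.
    height, width = item.properties["proj:shape"]
    return (height // CHIP_SIZE) * (width // CHIP_SIZE)
```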

On the index table side, for optimized performance, we may wish to consider in-lining all of the relevant STAC Item information into columns in a Parquet dataset/file (with the assets dictionary stored in a serialized column) to avoid having to load and parse a very large JSON index file. This may only be valuable once our index size grows to > 1M rows.
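
A sketch of what such an inlined index could look like as a PyArrow schema; the column names and types are assumptions, not a decided layout:

```python
import pyarrow as pa

# Hypothetical inlined chip-index schema: STAC fields as plain columns,
# the assets dictionary serialized to a JSON string column.
chip_index_schema = pa.schema(
    [
        ("chip_id", pa.int64()),
        ("stac_item_id", pa.string()),
        ("datetime", pa.timestamp("us", tz="UTC")),
        ("bbox", pa.list_(pa.float64(), 4)),
        ("proj_shape", pa.list_(pa.int32(), 2)),
        ("assets", pa.string()),
    ]
)
```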

On the temporary data loading side, the depth of my PyTorch knowledge is fairly limited, but we will also need to include hooks within the training loop to publish an external notification when a batch is completed, so that our infrastructure can remove the assets associated with that batch from fast temporary storage.

We will likely also need some mechanism to "peek" at the next batch for the Dataloader's iterator so that it can be preloaded to disk. (This would achieve our temporary storage "streaming" goals and allow us to use our fast disk capacity efficiently).
Hopefully once I get a bit more understanding on the PyTorch side I can provide some more concrete ideas on this 😄

@brunosan
Member

In a very timely post, @jhamman et al. at Earthmover seem to have managed to stream Zarr data directly to the GPU. https://earthmover.io/blog/cloud-native-dataloader/

GH code https://github.com/earth-mover/dataloader-demo

@yellowcap
Member Author

yellowcap commented Mar 19, 2024

Here is a proposal for where to get / store the metadata fields that are required by the Clay v1 model in STAC. Everything except the normalization parameters is either native STAC or from the eo and raster extensions.

Each of these values would be required for each band. The assumption is that each band eventually becomes its own "token" for the model. Some things like bbox or date could be stored at the item level instead and broadcast to the bands at chip extraction time (a sketch of reading these fields follows the table).

| Property | STAC Item Reference | Extension |
| --- | --- | --- |
| Bounding box | bbox | - |
| Date | properties:datetime | - |
| Ground Sampling Distance | raster:bands:spatial_resolution | raster |
| Wavelength | eo:bands:center_wavelength | eo |
| Bandwidth | eo:bands:full_width_half_max | eo |
| Normalization Average | properties:normalization_average | custom |
| Normalization Standard Deviation | properties:normalization_standard_deviation | custom |
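
A minimal sketch of collecting these values from a pystac item following the table above; the asset layout and the custom normalization property names are assumptions:

```python
import pystac

def band_metadata(item: pystac.Item, asset_key: str) -> dict:
    """Gather the per-band values listed in the table for one asset of a STAC item."""
    asset = item.assets[asset_key]
    eo_band = asset.extra_fields.get("eo:bands", [{}])[0]
    raster_band = asset.extra_fields.get("raster:bands", [{}])[0]
    return {
        "bbox": item.bbox,          # broadcast from the item level
        "datetime": item.datetime,  # broadcast from the item level
        "gsd": raster_band.get("spatial_resolution"),
        "wavelength": eo_band.get("center_wavelength"),
        "bandwidth": eo_band.get("full_width_half_max"),
        "norm_avg": item.properties.get("normalization_average"),
        "norm_std": item.properties.get("normalization_standard_deviation"),
    }
```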

@yellowcap yellowcap self-assigned this Mar 19, 2024
@sharkinsspatial

@yellowcap I'm writing up a slightly modified version of this approach based on the recent demonstration from the EarthMover team https://github.com/earth-mover/dataloader-demo. I hadn't even considered Dask's in-memory opportunistic caching as a fast intermediate store for pre-fetched chips to maintain high GPU saturation, but this is actually very well aligned with our requirements.

One question, which will drive the partitioning/row-group structure of our chip index Parquet file and how we perform sampling for shuffling, is what level of reproducibility we need for batches.

With this approach we'll use Dask DataFrame's sample. We have some options for ensuring that our batch partitioning and shuffling is fully reproducible, but I'll need to consider this in our pipeline architecture. Do we need fully reproducible batch generation in this case?

@yellowcap
Member Author

Hmm, not 100% sure here. It depends on how we distribute batches across GPU nodes. Maybe we need to reproduce the sample on each node and then select from that, or the sample is created centrally and then pushed to the nodes, in which case reproducibility is less important. But I'm not sure about this, maybe @srmsoumya knows more...

@yellowcap
Member Author

yellowcap commented Mar 22, 2024

There are two more issues that we have to take into account:

  1. The optical spaceborne sensors have clouds ☁️ . When indexing, we should ideally limit cloud cover at the chip level. For v0.1 and v0.2 we are limiting chip-level cloud cover to 30%, and we pre-select mostly cloud-free scenes already.
    This makes the indexing more complicated, as we have to actually inspect the data at indexing time. This does not apply to sources like NAIP or other airborne imagery, but it definitely does for spaceborne optical.
  2. Nodata pixels are not allowed. For scenes that only partially cover the whole x-y matrix (scene boundaries), we have to exclude all tiles that touch the nodata section. Only fully populated tiles should be fed to the model. So again, this requires data inspection at indexing time.

@yellowcap
Member Author

I have been working more on the v1 pipeline, realizing that we will need cloud and nodata tracking at the chip level. I played around with creating classes where the platform-specific cloud and nodata filters can be added through class method overrides.
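
A rough sketch of that override idea (the actual implementation is in the PR linked below; cloud_fraction and has_nodata are hypothetical helpers):

```python
class ChipIndexer:
    """Base indexer: keep every window-aligned chip in the scene."""

    def __init__(self, item):
        self.item = item

    def keep(self, chip) -> bool:
        return True


class Sentinel2Indexer(ChipIndexer):
    """Platform-specific overrides: drop cloudy or partially empty chips."""

    MAX_CLOUD_FRACTION = 0.3

    def keep(self, chip) -> bool:
        return (
            cloud_fraction(chip) <= self.MAX_CLOUD_FRACTION  # hypothetical helper, e.g. from the SCL band
            and not has_nodata(chip)  # hypothetical helper for scene-boundary nodata
        )
```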

The current code is in the PR linked above, but here are the specific pointers:

https://github.com/Clay-foundation/model/blob/clay-v1-pipeline/scripts/pipeline_v1/chip_indexer.py

The script to try them out, with the data attached below, is:

https://github.com/Clay-foundation/model/blob/clay-v1-pipeline/scripts/pipeline_v1/run_chip_indexer.py

test_stac_items.zip

The latest idea is to make copies of the scenes that we want to use in a single bucket that can be hooked directly into FSx. Storing 100 TB permanently would cost about 14k per month, which is still acceptable given the credits we have.

If we put the scene files, the STAC items, and a central index in a bucket, then FSx can sync that stuff and we can expose it to our training nodes.

The data loader could then do dynamic chipping from the scenes based on the index. This should be quite fast.

If we move forward with this approach now, we gain time to do the "megabatch" part where we handle batches of scenes. I feel like if we do dynamic chipping and indexing of the chips, that should integrate well with a more batched streaming approach. What do you think @sharkinsspatial?

@yellowcap
Member Author

Splitting the indexing and chipping functionality into a separate repo, as this can be done quite universally I think.
https://github.com/Clay-foundation/stacchip

@yellowcap
Member Author

yellowcap commented Mar 26, 2024

Got a first full version of the new v1 pipeline for Sentinel-2, see

https://github.com/Clay-foundation/stacchip/blob/main/scripts/sentinel-2-processor.py

This will do the following (roughly sketched after the list):

  1. Download items from the earth-search API (least cloudy per quarter for 3 years)
  2. Copy all assets from a whitelist to our own S3 bucket
  3. Update the STAC item with the new hrefs
  4. Upload the edited STAC item
  5. Create a chip index (using stacchip)
  6. Upload the index to S3
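
A rough sketch of that flow (the real script is linked above); the bucket name, asset whitelist, and the copy/upload/index helpers are assumptions:

```python
import boto3
import pystac_client

BUCKET = "clay-v1-data"  # assumed bucket name
ASSET_WHITELIST = ["blue", "green", "red", "nir"]  # illustrative subset

def process_tile(bbox, datetime_range):
    client = pystac_client.Client.open("https://earth-search.aws.element84.com/v1")
    search = client.search(collections=["sentinel-2-l2a"], bbox=bbox, datetime=datetime_range)
    s3 = boto3.client("s3")
    for item in search.items():  # step 1: items from earth-search
        for key in ASSET_WHITELIST:  # step 2: copy whitelisted assets to our bucket
            asset = item.assets[key]
            target = f"sentinel-2/{item.id}/{key}.tif"
            copy_to_bucket(s3, asset.href, BUCKET, target)  # hypothetical helper
            asset.href = f"s3://{BUCKET}/{target}"  # step 3: point the item at the new hrefs
        upload_json(s3, item.to_dict(), BUCKET, f"sentinel-2/{item.id}/stac_item.json")  # step 4, hypothetical
        index = build_chip_index(item)  # step 5: stacchip would create the chip index here
        upload_index(s3, index, BUCKET, f"index/{item.id}.parquet")  # step 6, hypothetical
```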

If we let this run through all 2500 MGRS tiles we have previously sampled, we should have all the S2 data we want for v1. We can test this at scale this week and then replicate it for other sources!

Results from a single MGRS tile and one date are here

@yellowcap
Member Author

Ok, this is ready for a first serious test drive tomorrow. The index file is written properly to geoparquet, and both the STAC item and the assets are created as desired. I'll push this to Batch tomorrow; if things go smoothly we will produce 2500 MGRS tiles for 3 years with 4 seasons each, i.e. 30,000 Sentinel-2 scenes. That should be a good start!
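
A minimal sketch of loading the geoparquet index and viewing it with lonboard; the file name is an assumption:

```python
import geopandas as gpd
from lonboard import viz

# Read the chip index written by the pipeline (file name assumed for illustration).
index = gpd.read_parquet("sentinel-2-index.parquet")
viz(index)  # quick interactive map of the chip footprints
```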

Screenshot from visualization of the geoparquet index using lonboard

