Unzipping and filtering OpenURLWithFSSpec output before OpenWithXarray #672

Open

Riccardo7-DS opened this issue Jan 19, 2024 · 2 comments

Riccardo7-DS commented Jan 19, 2024

I am trying to retrieve data for the MSG SEVIRI radiometer from the Landsaf portal:
https://datalsasaf.lsasvcs.ipma.pt/PRODUCTS/EPS/ENDVI10/ENVI/
However, the data is zipped and each archive contains multiple files of different formats. Using an approach similar to #659, I am trying to open the unzipped and filtered file with the rasterio engine (via xarray_open_kwargs) of the OpenWithXarray class.

The error I currently get is TypeError: cannot unpack non-iterable NoneType object at line 98 of ".../pangeo_forge_recipes/transforms.py", because the OpenWithXarray class fails to open the output of the custom UnzipFilter class I have created. Any hints on how to solve this? I am still a beginner with pangeo_forge_recipes.

from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.transforms import PrepareZarrTarget, OpenURLWithFSSpec, OpenWithXarray, StoreToZarr

from pangeo_forge_recipes.transforms import _add_keys, MapWithConcurrencyLimit
from pangeo_forge_recipes.openers import OpenFileType
import pangeo_forge_recipes

import apache_beam as beam
import zipfile
import os
import pandas as pd
import fsspec
from typing import Union, Optional, Tuple
import aiohttp
import io
from dataclasses import dataclass, field

@dataclass
class UnzipFilter(beam.PTransform):
    
    num: Optional[int] = 1
    file_format: Optional[str] = None
    file_name: Optional[str] = None
    file_substring: Optional[str] = None

    def expand(self, pcoll):
        refs = pcoll  | "Unzip and filter" >> beam.Map(
            _unzip_and_filter,
            num=self.num,
            file_format=self.file_format,
            file_name=self.file_name,
            file_substring=self.file_substring,
        )
        return refs

def _unzip_and_filter(
        response: Tuple[pangeo_forge_recipes.types.Index, OpenFileType],
        num: int = 1,
        file_format: Union[None, str] = None,
        file_name: Union[None, str] = None,
        file_substring: Union[None, str] = None):
    
    import io
    with response[1] as f:
        zip_contents = f.read()

    # Step 2: Create a BytesIO object to treat the contents as an in-memory file
    zip_buffer = io.BytesIO(zip_contents)

    # Step 3: Use zipfile to extract the files from the in-memory buffer
    with zipfile.ZipFile(zip_buffer, 'r') as zip_ref:
        # Filter files based on the specified pattern
        if file_name is not None:
            zip_file_list = [file for file in zip_ref.namelist() 
                             if file_name == file]
        elif file_substring is not None:
            zip_file_list = [file for file in zip_ref.namelist() 
                             if file_substring in file]
        elif file_format is not None:
            zip_file_list = [file for file in zip_ref.namelist() 
                             if file.endswith(file_format)]

        if num == 1:
            zip_ref.read(zip_file_list[0]) 
        else:
            raise NotImplementedError

def make_url(time):
    url = URL_FORMAT.format(time=time, day=time.day, 
                             month=time.month, 
                             year=time.year)
    return url

start_date = '2008-01-01'
end_date = '2008-04-21'

# Create a date range for the 1st, 11th, and 21st of each month
date_range = pd.date_range(start=start_date, end=end_date, freq='10D')
# Filter the dates to include only the 1st, 11th, and 21st
dates = date_range[date_range.day.isin([1, 11, 21])]

# "config" holds credentials and the output path (loaded elsewhere, not shown)
username = config["LANDSAF"]["user"]
password = config["LANDSAF"]["password"]

path = config["DEFAULT"]["output"]
target_store = "output_file.zarr"

URL_FORMAT = (
    "https://datalsasaf.lsasvcs.ipma.pt/PRODUCTS/EPS/ENDVI10/ENVI/{year:4d}/{month:02d}/{day:02d}/METOP_AVHRR_{time:%Y%m%d}_S10_AFR_V200.zip"
)
time_concat_dim = ConcatDim("time", dates, nitems_per_file=1)
pattern = FilePattern(make_url, time_concat_dim)
url = make_url(dates[0])

open_kwarg = {"auth": aiohttp.BasicAuth(username, password), "num": 10}


recipe = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec(open_kwargs=open_kwarg)
    | UnzipFilter(file_format=".tif")
    | OpenWithXarray(xarray_open_kwargs={'engine': 'rasterio'})
    | PrepareZarrTarget(os.path.join(path, target_store))
    | StoreToZarr(
        target_root=path,
        store_name=target_store,
        combine_dims=pattern.combine_dim_keys,
    )
)

from apache_beam.pipeline import PipelineOptions
with beam.Pipeline() as p:
    p | recipe
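
For reference, a minimal sketch of the element shape the downstream transforms presumably expect: judging from the unpacking error (and from the (Index, open file) pairs that OpenURLWithFSSpec emits), the Map needs to return a keyed pair rather than None. The helper name below is hypothetical and this is only an assumption, not a confirmed fix:

import io
import zipfile

def _unzip_and_filter_keyed(element, file_format=".tif"):
    # assumed element shape: (index, open_file), as emitted by OpenURLWithFSSpec
    index, open_file = element
    with open_file as f:
        zip_buffer = io.BytesIO(f.read())
    with zipfile.ZipFile(zip_buffer, "r") as zip_ref:
        matches = [name for name in zip_ref.namelist() if name.endswith(file_format)]
        data = zip_ref.read(matches[0])
    # return the (index, file-like) pair instead of dropping it, so the next
    # transform has something to unpack
    return index, io.BytesIO(data)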
thodson-usgs (Contributor) commented:

It's a bit hacky, but have a look at my workaround: https://github.com/hytest-feedstocks/ssebop-feedstock

Note the extra zip "sugar" in the URL pattern.
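
For illustration, the "zip sugar" presumably refers to fsspec URL chaining, which would let OpenURLWithFSSpec read a member directly out of the remote archive, so no custom unzip transform is needed. A minimal sketch under that assumption (the inner member name is hypothetical and depends on the actual archive contents):

def make_url(time):
    outer = URL_FORMAT.format(time=time, day=time.day, month=time.month, year=time.year)
    # "zip://<member>::<outer-url>" asks fsspec to open <member> inside the
    # remote zip via protocol chaining; replace the member name with the real one
    return f"zip://METOP_AVHRR_{time:%Y%m%d}_S10_AFR_V200.tif::{outer}"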

Riccardo7-DS (Author) commented:

Thanks @thodson-usgs, it's a nice workaround.
