
Geospatial ingestion - WIP #507

Merged: 28 commits merged into main from nb/geospatial-ingestion on Mar 19, 2024
Conversation

@normanb (Contributor) commented Jan 30, 2024

For sc-38757

@sgillies @ktsitsi

The get_metadata function groups the source files by destination raster block, which scales well for task graphs. For point clouds and geometries it returns the filtered input files. I checked the test results against expected outputs generated with GDAL, using QGIS.

@sgillies can we review the use of nodata in the raster ingest from rasterio? I am not able to override nodata for the destination raster, and this may need a change in the GDAL driver.

Geometries and point clouds are simpler and I will add these as a separate commit.

@sgillies (Contributor) left a comment

@normanb @ktsitsi I've got some comments, questions, and suggestions.

(Resolved review threads on src/tiledb/cloud/geospatial/helpers.py and src/tiledb/cloud/geospatial/ingestion.py)
@normanb (Contributor, Author) commented Jan 30, 2024

Thanks @sgillies, I will make these changes and run black before adding more support for point clouds and geometries.

@normanb (Contributor, Author) commented Feb 12, 2024

Items to do:

  • Add more tests
  • Use rasterio.merge exclusively rather than a WarpedVRT
  • Check that nodata is propagated for rasters
  • Geometries (need pyopener or similar)

Can I get a review of the new code before I add these items?

@ktsitsi (Contributor) left a comment

Looks good in general; just some minor comments to address.

(Resolved review threads on src/tiledb/cloud/geospatial/ingestion.py and src/tiledb/cloud/utilities/_common.py)
@normanb (Contributor, Author) commented Feb 16, 2024

Ingestion of geometries requires an update to use a TileDB VSI opener (a parameter).

@thetorpedodog (Collaborator) left a comment

This change is looking pretty good so far. While I have quite a few comments here, overall it's well-written and fairly easy to follow, and I think these suggestions can make it even more so.

BATCH_SIZE = 10


@dataclass
Collaborator:

Prefer @attrs.define in place of @dataclass, since it gives us some nice features…

Contributor Author:

👍
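
A minimal sketch of the suggested swap (hedged: the class name and field are illustrative, not from the PR):

import attrs

@attrs.define
class IngestConfig:
    # attrs.define generates __init__/__repr__/__eq__ like @dataclass,
    # plus extras such as validators, converters, and slotted classes by default
    batch_size: int = 10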

Comment on lines 120 to 123
if mins is None:
    mins = hdr.mins
else:
    mins = [min(e) for e in zip(mins, hdr.mins)]
Collaborator:

This seems like it might be worth pulling into a function:

def _fold_in(oper, existing, new):
  if existing is None:
    return new
  return [oper(e) for e in zip(existing, new)]

Contributor Author:

👍

Comment on lines +46 to +51
minx: float
miny: float
maxx: float
maxy: float
minz: Optional[float] = None
maxz: Optional[float] = None
Collaborator:

Would it make sense to make these min and max: Point members (or maybe lo/hi to avoid colliding with builtins?), with a Point like:

@attrs.define
class Point:
  x: float
  y: float
  z: Optional[float] = None

  @classmethod
  def at(cls, xyz: Iterable[float]) -> Self:
    """Takes in a two- or three-element iterable of (x, y[, z]) coordinates."""
    itr = iter(xyz)
    return Point(next(itr), next(itr), next(itr, None))

?

Contributor Author:

The abstraction is the BoundingBox class; adding a Point class makes the code more complicated and doesn't reflect that these are bounds.

# common properties
extents: BoundingBox
crs: str = None
paths: Optional[Union[List[os.PathLike], os.PathLike]] = None
Collaborator:

I think it would make more sense to have this be a one-element list (or tuple) in the case of there being one entry, rather than an os.PathLike object itself.

Using attrs, this can be accomplished with a converter:

def _wrap_paths(paths: Optional[Union[Sequence[os.PathLike], os.PathLike]]) -> Optional[Tuple[os.PathLike, ...]]:
  if paths is None:
    return None
    # Would it make sense to return () here, so it's always a Tuple[PathLike, ...]?
  if isinstance(paths, (str, bytes)) or not isinstance(paths, Sequence):
    # Editor's note: the isinstance has to go after the str-or-bytes check
    # because str and bytes are both Sequences.
    # (Worse yet, str is a Sequence[str].)
    return (paths,)
  return tuple(paths)

# ...

@attrs.define
class GeoMetadata:
  # ...
  paths: Optional[Tuple[os.PathLike, ...]] = attrs.field(converter=_wrap_paths, default=None)

This gives an interface where you can pass either a single path or a collection of paths, but keeps the type that is stored consistent.

Contributor Author:

👍

def get_pointcloud_metadata(
    sources: Iterable[os.PathLike],
    *,
    config: Mapping[str, Any] = None,
Collaborator:

Since this is set to None, it should be optional (and as a small nit, when possible, use object instead of Any for “unspecified type”) so this can be config: Optional[Mapping[str, object]].

Contributor Author:

👍

    raise ValueError("Require at least one point cloud file to have been found")
elif dataset_type == DatasetType.RASTER:
    kwargs.update(
        {
Collaborator:

since the keys to this .update are all literals, this can be written as

kwargs.update(
    crs=meta.crs,
    ...
)

Contributor Author:

👍

"""

# Validate user input
if bool(search_uri) & bool(dataset_list_uri):
Collaborator:

Should this be if bool(search_uri) != bool(dataset_list_uri)? Otherwise, this will work fine if neither is provided.

Contributor Author:

This checks that search_uri and dataset_list_uri are not both provided. I will simplify this.
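
A minimal sketch of the simplified check (hedged: the error message is illustrative):

# reject the ambiguous case where both inputs are given
if search_uri and dataset_list_uri:
    raise ValueError("Provide either search_uri or dataset_list_uri, not both")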

T = TypeVar("T")


def batch(items: Sequence[T], chunk_size: int) -> Iterator[Sequence[T]]:
Collaborator:

To avoid the overloaded term batch, maybe chunked? Alternatively, we could directly use more_itertools.chunked.

Contributor Author:

👍
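
A minimal sketch of the rename (hedged: same behavior as the existing batch, assuming items supports len() and slicing; more_itertools.chunked would accept any iterable instead):

from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")

def chunked(items: Sequence[T], chunk_size: int) -> Iterator[Sequence[T]]:
    """Yield successive chunk_size-sized slices of items."""
    for i in range(0, len(items), chunk_size):
        yield items[i : i + chunk_size]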

"""
if isinstance(filter, tiledb.Filter):
filter_dict = filter._attrs_()
filter_dict["_name"] = type(filter).__name__
Collaborator:

This is the same format we use in TileDB SOMA, except that we use the key _type rather than _name. Maybe useful to keep that consistent?

Contributor Author:

👍
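
A minimal sketch of the agreed key change (hedged: _attrs_ is the private tiledb.Filter method already used in the snippet above):

if isinstance(filter, tiledb.Filter):
    filter_dict = filter._attrs_()
    # "_type" matches the serialization convention used in TileDB SOMA
    filter_dict["_type"] = type(filter).__name__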

Collaborator:

Since these tests can’t run with the basic TileDB install, it might be better to put them in a geospatial directory for easy exclusion, and so that it’s obvious that the datafiles belong with the geospatial test.

Contributor Author:

👍

@normanb (Contributor, Author) commented Feb 20, 2024

Added dynamic node expansion from the result of a previous UDF when ingesting the data.

:param dataset_list_uri: URI with a list of dataset URIs, defaults to None
:param max_files: maximum number of URIs to read/find,
    defaults to None (no limit)
:param max_samples: maximum number of samples to ingest, defaults to None (no limit)
Member:

What's the difference between max_files and max_samples?

Contributor Author:

Removed; this was left over from copying the VCF template. Thanks for spotting.

defaults to None
:param config: config dictionary, defaults to None
:param namespace: TileDB-Cloud namespace, defaults to None
:param register_name: name to register the dataset with on TileDB Cloud,
Member:

What is the default if none are passed?

Contributor Author:

The destination array is not registered; I have updated the docstring.

).depends_on(process_node)

# Register the dataset on TileDB Cloud
if register_name:
Member:

Is there a reason we are not writing directly to a tiledb:// URI? There shouldn't need to be a register step, because we should create it through TileDB Cloud directly.

Contributor Author:

The dataset_uri can be a tiledb:// URI and it works. This way we can also write directly to object storage if needed. This follows the VCF ingestion, but I'm happy to restrict this to just tiledb:// URIs.

register_name=register_name,
config=config,
verbose=verbose,
trace=trace,
Member:

This function doesn't take the trace parameter

Contributor Author:

👍

config=config,
verbose=verbose,
trace=trace,
log_uri=log_uri,
Member:

This function doesn't take the log_uri parameter

Contributor Author:

👍

verbose=verbose,
trace=trace,
log_uri=log_uri,
access_credentials_name=acn,
Member:

This function doesn't take access_credentials_name; it likely should be added to support setting credentials_name if this function remains.

Contributor Author:

👍


# Register the dataset on TileDB Cloud
if register_name:
    register_dataset_udf(
Member:

If we want to keep this function, it should be done as part of the task graph. It is currently called locally in the caller's environment (and fails because there is no context set up), and it runs before the task graph does.

Contributor Author:

Yes, this should be part of the graph; I am fixing all of the comments above related to this function.
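
A minimal sketch of registration as a graph node (hedged: node and parameter names are illustrative, based on the surrounding snippets):

# submit registration as its own node so it runs on TileDB Cloud
# after ingestion, rather than locally before the graph starts
register_node = graph.submit(
    register_dataset_udf,
    dataset_uri,
    namespace=namespace,
    register_name=register_name,
    config=config,
)
register_node.depends_on(ingest_node)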

"""Groups input URIs into batches.
:param dataset_uri: dataset URI
:param dataset_type: dataset type, one of pointcloud, raster or geometry
:param acn: Access Credentials Name (ACN) registered in TileDB Cloud (ARN type),
Member:

This can be removed as it's not used in the function itself.

Contributor Author:

👍

input_list_node = graph.submit(
    build_inputs_udf,
    dataset_type=dataset_type,
    acn=acn,
Member:

This should be access_credentials_name=acn so the batch task graph is set up correctly.

Contributor Author:

👍

sources = find(
    search_uri,
    config=config,
    excludes=ignore,
Member:

Typo: should be exclude?

Contributor Author:

👍

    search_uri,
    config=config,
    excludes=ignore,
    includes=pattern if pattern else fns[dataset_type]["pattern_fn"],
Member:

Typo: should be include?

Contributor Author:

👍

    config=config,
    excludes=ignore,
    includes=pattern if pattern else fns[dataset_type]["pattern_fn"],
    max_files=max_files,
Member:

I believe the parameter to find is max_count?

Contributor Author:

Yes, it is; I missed this change when writing the find function.

    continue

if vfs.is_dir(f):
    yield list_files(
Member:

This should be yield from and I believe we want to call find not list_files to recurse into the sub-folders. This means we also need to pass all the parameters, config=config, max_count=max_count.

Contributor Author:

I will add a test case. I kept list_files rather than find so that the current count check stays in the outer function, and I removed the extraneous arguments from the list_files call.
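
For reference, a minimal sketch of the yield from variant the reviewer suggested (hedged: parameter names follow the comment above):

if vfs.is_dir(f):
    # recurse into sub-folders, forwarding config and the count limit
    yield from find(f, config=config, max_count=max_count)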

offsets = _fold_in(min, offsets, hdr.offsets)

return GeoMetadata(
    paths=sources,
Member:

This needs to be list(sources)

@Shelnutt2 (Member) commented Mar 4, 2024:

Actually, we should convert sources to a list before we consume the generator. Something like:

paths = list(sources)
for f in paths:
    ...
return GeoMetadata(
    paths=paths,
    ...
)

Contributor Author:

👍

    raise ValueError(f"No {dataset_type.name} datasets found")

meta_kwargs = fns[dataset_type]["kwargs"]
meta = fns[dataset_type]["meta_fn"](sources, config=config, **meta_kwargs)
Member:

In testing this on some larger-scale use cases, getting the metadata inline is a problem. I believe we should batch the output into jobs of 10 or 100 files and grab the metadata in parallel in another stage of the task graph. I've seen it take over 30 minutes when there are 6000 files.

Contributor Author:

I think we should change this after this PR has been merged. There are a couple of scalability improvements we can make.

@normanb (Contributor, Author) commented Mar 5, 2024:

Also just noting that geospatial datasets tend to be larger files rather than thousands of small files. We should anticipate the problem, but I would like to make this change as an incremental PR.

Member:

Sure thing, thanks!
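
A rough sketch of the suggested follow-up (hedged: assumes the chunked helper discussed above; a downstream node would merge the partial GeoMetadata results):

# fetch metadata in parallel graph stages instead of inline
meta_nodes = [
    graph.submit(fns[dataset_type]["meta_fn"], chunk, config=config, **meta_kwargs)
    for chunk in chunked(sources, BATCH_SIZE)
]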

# schema creation node, returns a sequence of work items
ingest_node = graph.submit(
    fn,
    **input_list_node,
Member:

input_list_node is a Node type; you can't destructure the dictionary returned from input_list_node like this. The dictionary needs to be passed in intact and handled inside the function.

Contributor Author:

👍
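
A minimal sketch of the fix (hedged: the UDF signature and dictionary key are illustrative):

# inside the UDF, accept the resolved dictionary as a single argument
def fn(inputs: dict, **kwargs):
    dataset_uri = inputs["dataset_uri"]  # hypothetical key, for illustration
    ...

# pass the Node itself; the task graph resolves it to the dict at run time
ingest_node = graph.submit(fn, input_list_node)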

@normanb (Contributor, Author) commented Mar 8, 2024

The latest code has been tested with rasters and point clouds on TileDB Cloud.
pyopener is required for geometries in object storage and will be added in a separate PR, but the code for geometry ingestion is stubbed out.
This PR inlines the new utility functions; after it is merged, another PR will remove them and use the updated tiledb-cloud-py.

@thetorpedodog (Collaborator) left a comment

To set your tests up so that they only run when geospatial tests are called for:

Part 1 is already complete: you can use @pytest.mark.geospatial on your test cases that use geospatial code. This ensures that they don't run when not called for. However, they are still collected as part of the test discovery process, which means the test Python files themselves must be importable without the geospatial libraries installed.

I will add notes on the applicable files below, but don’t have time to do a full read-through of the codebase.

Comment on lines 54 to 62
test_1 = [
    self.fs.create_file("/var/data.dat"),
    self.fs.create_file("/var/data/xx1.txt"),
    self.fs.create_file("/var/data/xx2.txt"),
]

with mock.patch.object(VFS, "ls", return_value=test_1):
    with mock.patch.object(VFS, "is_dir", return_value=True) as mock_is_dir:
        mock_is_dir.side_effect = lambda f: self.fs.isdir(f.name)
Collaborator:

Though I did notice this while scrolling through: Instead of setting up a bunch of mocks and using a fake filesystem, it seems like it would be simpler to create a temporary directory:

with tempfile.TemporaryDirectory() as tmp_name:
    tmp = pathlib.Path(tmp_name)
    (tmp / "data.dat").write_text("test")
    data_dir = tmp / "data"
    data_dir.mkdir()
    (data_dir / "xx1.txt").write_text("test")
    ...

The temporary directory will be cleaned up at the end of the with-block, and there is no need to muck with TileDB internals.

Contributor Author:

I agree; for find this is easier than mocking the file system.

Comment on lines 11 to 15
import affine
import fiona
import numpy as np
import rasterio
import shapely
Collaborator:

You'll need to move the imports of these into only the methods where they are directly used, to avoid having them at the module level.

Comment on lines 18 to 27
from tiledb.cloud.geospatial import BoundingBox
from tiledb.cloud.geospatial import DatasetType
from tiledb.cloud.geospatial import build_inputs_udf
from tiledb.cloud.geospatial import ingest_geometry_udf
from tiledb.cloud.geospatial import ingest_point_cloud_udf
from tiledb.cloud.geospatial import ingest_raster_udf
from tiledb.cloud.geospatial import load_geometry_metadata
from tiledb.cloud.geospatial import load_pointcloud_metadata
from tiledb.cloud.geospatial import load_raster_metadata
from tiledb.cloud.geospatial import read_uris
Collaborator:

Likewise with this—importing tiledb.cloud.geospatial will end up importing those libraries by proxy, so within the function itself, import the items. To avoid all the repetition, I recommend importing the module rather than anything inside it, so…
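
A minimal sketch of the deferred module import (hedged: the test body is illustrative; BoundingBox fields follow the snippet earlier in this review):

def test_bounding_box(self):
    # import inside the test so collection doesn't require the geospatial extras
    from tiledb.cloud import geospatial

    bbox = geospatial.BoundingBox(minx=0.0, miny=0.0, maxx=1.0, maxy=1.0)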

# Ignore warnings
warnings.simplefilter("ignore")
# Create a temporary directory
self.test_dir = Path(tempfile.mkdtemp())
Collaborator:

Another sidenote:

Something like

self.tempdir_obj = tempfile.TemporaryDirectory()
self.test_dir = pathlib.Path(self.tempdir_obj.name)

can be used here, then…


def tearDown(self):
    # Remove the directory after the test
    shutil.rmtree(self.test_dir)
Collaborator:

self.tempdir_obj.cleanup() will do all the cleanup work for you.

self.test_dir = Path(tempfile.mkdtemp())

def create_test_geometries(tmp_path: os.PathLike):
    radius = 1.0
Collaborator:

add import shapely to the top of this function rather than having that import at the module level.

Additionally, since this doesn’t appear to depend upon anything in its closure (i.e., the only thing it uses is the tmp_path passed in as a parameter), I recommend moving it to the module level so that the setUp function isn’t so long. Likewise with similar functions.

json.dump(shapely.geometry.mapping(g), dst)

def create_test_rasters(tmp_path: os.PathLike):
    kwargs = {
Collaborator:

Likewise, at the top of this function, do import affine; import rasterio, etc.

This means that when this file is imported by pytest, it won’t immediately import any of the libraries, and by default all these tests will be skipped.

)


class GeospatialTest(unittest.TestCase):
Collaborator:

You can mark this test with @pytest.mark.geospatial, then
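
A minimal sketch of the marking (hedged: assumes the geospatial marker is registered in the project's pytest configuration, as described above):

import unittest

import pytest

@pytest.mark.geospatial
class GeospatialTest(unittest.TestCase):
    ...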

@ihnorton requested a review from Shelnutt2 on March 11, 2024
@ktsitsi self-requested a review on March 11, 2024
@ktsitsi dismissed their stale review on March 11, 2024 (comments were resolved)

@normanb merged commit a76221e into main on Mar 19, 2024
17 checks passed
@normanb deleted the nb/geospatial-ingestion branch on March 19, 2024