Geospatial ingestion - WIP #507
Conversation
Thanks @sgillies, I will make these changes and run.

Items

Can I get a review of the new code before I add these items?
Looks proper in general, just some minor comments to be addressed.
Force-pushed from f8eba0e to cae0932
Ingestion of geometries requires an update to use a tiledb VSI opener (a parameter).
This change is looking pretty good so far. While I have quite a few comments here, overall it's well-written and fairly easy to follow, and I think these suggestions can make it even more so.
BATCH_SIZE = 10


@dataclass
Prefer `@attrs.define` in place of `@dataclass`, since it gives us some nice features…
👍
if mins is None:
    mins = hdr.mins
else:
    mins = [min(e) for e in zip(mins, hdr.mins)]
This seems like it might be worth pulling into a function:

    def _fold_in(oper, existing, new):
        if existing is None:
            return new
        return [oper(e) for e in zip(existing, new)]
👍
minx: float
miny: float
maxx: float
maxy: float
minz: Optional[float] = None
maxz: Optional[float] = None
Would it make sense to make these `min` and `max: Point` members (or maybe `lo`/`hi` to avoid colliding with builtins?), with a `Point` like:

    @attrs.define
    class Point:
        x: float
        y: float
        z: Optional[float] = None

        @classmethod
        def at(cls, xyz: Iterable[float]) -> Self:
            """Takes in a two- or three-element iterable of (x, y[, z]) coordinates."""
            itr = iter(xyz)
            return cls(next(itr), next(itr), next(itr, None))

?
The abstraction is the `BoundingBox` class; adding a `Point` class makes the code more complicated and doesn't reflect that these are bounds.
# common properties
extents: BoundingBox
crs: str = None
paths: Optional[Union[List[os.PathLike], os.PathLike]] = None
I think it would make more sense to have this be a one-element list (or tuple) in the case of there being one entry, rather than an `os.PathLike` object itself. Using `attrs`, this can be accomplished with a converter:

    def _wrap_paths(
        paths: Optional[Union[Sequence[os.PathLike], os.PathLike]],
    ) -> Optional[Tuple[os.PathLike, ...]]:
        if paths is None:
            # Would it make sense to return () here, so it's always a Tuple[PathLike, ...]?
            return None
        if isinstance(paths, (str, bytes)) or not isinstance(paths, Sequence):
            # The isinstance(..., Sequence) check has to go after the str-or-bytes check,
            # because str and bytes are both Sequences.
            # (Worse yet, str is a Sequence[str].)
            return (paths,)
        return tuple(paths)

    # ...

    @attrs.define
    class GeoMetadata:
        # ...
        paths = attrs.field(converter=_wrap_paths, default=None)

This gives an interface where you can pass either a single path or a collection of paths, but keeps the stored type consistent.
👍
def get_pointcloud_metadata(
    sources: Iterable[os.PathLike],
    *,
    config: Mapping[str, Any] = None,
Since this is set to `None`, it should be Optional (and, as a small nit, when possible use `object` instead of `Any` for "unspecified type"), so this can be `config: Optional[Mapping[str, object]] = None`.
👍
    raise ValueError("Require at least one point cloud file to have been found")
elif dataset_type == DatasetType.RASTER:
    kwargs.update(
        {
Since the keys to this `.update` are all literals, this can be written as:

    kwargs.update(
        crs=meta.crs,
        ...
    )
👍
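For illustration, the two forms are equivalent for literal string keys (the key names here are made up):

```python
kwargs = {"threads": 4}

# Keyword form, equivalent to kwargs.update({"crs": ..., "resolution": ...}):
kwargs.update(crs="EPSG:4326", resolution=10.0)

print(kwargs)  # {'threads': 4, 'crs': 'EPSG:4326', 'resolution': 10.0}
```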
"""

# Validate user input
if bool(search_uri) & bool(dataset_list_uri):
Should this be `if bool(search_uri) != bool(dataset_list_uri)`? Otherwise, this will work fine if neither is provided.
This is checking that not both `search_uri` and `dataset_list_uri` are provided. I will simplify this.
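A minimal sketch of the simplified exactly-one check; the helper name and signature are invented for illustration:

```python
def validate_inputs(search_uri=None, dataset_list_uri=None):
    """Require exactly one of search_uri / dataset_list_uri (sketch)."""
    if bool(search_uri) == bool(dataset_list_uri):
        # Rejects both the neither-provided and the both-provided cases.
        raise ValueError(
            "Exactly one of search_uri or dataset_list_uri must be provided"
        )


validate_inputs(search_uri="s3://bucket/lidar/")  # OK

try:
    validate_inputs()  # neither provided: now rejected too
except ValueError as err:
    print(err)
```

The `!=` (or `==` with a raise, as here) form covers the neither-provided case that the original `&` check let through.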
T = TypeVar("T")


def batch(items: Sequence[T], chunk_size: int) -> Iterator[Sequence[T]]:
To avoid the overloaded term `batch`, maybe `chunked`? Alternately, we could directly use `more_itertools.chunked`.
👍
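A stdlib-only sketch of `chunked`, behaving like `more_itertools.chunked` for this use case:

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], chunk_size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most chunk_size items."""
    it = iter(items)
    while True:
        chunk = list(islice(it, chunk_size))
        if not chunk:
            return
        yield chunk


print(list(chunked(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

Unlike a `Sequence`-slicing implementation, this also works on one-shot iterables such as generators.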
"""
if isinstance(filter, tiledb.Filter):
    filter_dict = filter._attrs_()
    filter_dict["_name"] = type(filter).__name__
This is the same format we use in TileDB SOMA, except that we use the key `_type` rather than `_name`. Maybe useful to keep that consistent?
👍
Since these tests can't run with the basic TileDB install, it might be better to put them in a `geospatial` directory for easy exclusion, and so that it's obvious that the datafiles belong with the geospatial tests.
👍
Added dynamic node expansion from the result of a previous UDF when ingesting the data.
Force-pushed from 4cc0134 to 9c0c7ac
:param dataset_list_uri: URI with a list of dataset URIs, defaults to None
:param max_files: maximum number of URIs to read/find,
    defaults to None (no limit)
:param max_samples: maximum number of samples to ingest, defaults to None (no limit)
What's the difference between `max_files` and `max_samples`?
Removed; this is leftover from copying the VCF template. Thanks for spotting.
    defaults to None
:param config: config dictionary, defaults to None
:param namespace: TileDB-Cloud namespace, defaults to None
:param register_name: name to register the dataset with on TileDB Cloud,
What is the default if none are passed?
The destination array is not registered; I have updated the docstring.
).depends_on(process_node)

# Register the dataset on TileDB Cloud
if register_name:
Is there a reason we are not writing directly to a `tiledb://` URI? There shouldn't need to be a register step, because we should create it through TileDB Cloud directly.
The `dataset_uri` can be a `tiledb://` URI and work. This way we can write directly to object storage if needed. This is following VCF, but happy to restrict this to just `tiledb://` URIs.
register_name=register_name,
config=config,
verbose=verbose,
trace=trace,
This function doesn't take the `trace` parameter.
👍
config=config,
verbose=verbose,
trace=trace,
log_uri=log_uri,
This function doesn't take the `log_uri` parameter.
👍
verbose=verbose,
trace=trace,
log_uri=log_uri,
access_credentials_name=acn,
This function doesn't take `access_credentials_name`; it likely should be added to support setting `credentials_name` if this function remains.
👍
# Register the dataset on TileDB Cloud
if register_name:
    register_dataset_udf(
If we want to keep this function, it should be done as part of the task graph. It is currently called locally in the caller's environment (and fails because there is no context set up), and it is called before the task graph runs.
Yes, this should be part of the graph, I am fixing all of the comments above related to this function.
"""Groups input URIs into batches.

:param dataset_uri: dataset URI
:param dataset_type: dataset type, one of pointcloud, raster or geometry
:param acn: Access Credentials Name (ACN) registered in TileDB Cloud (ARN type),
This can be removed, as it's not used in the function itself.
👍
input_list_node = graph.submit(
    build_inputs_udf,
    dataset_type=dataset_type,
    acn=acn,
This should be `access_credentials_name=acn` so the batch task graph is set up correctly.
👍
sources = find(
    search_uri,
    config=config,
    excludes=ignore,
Typo: should be `exclude`?
👍
search_uri,
config=config,
excludes=ignore,
includes=pattern if pattern else fns[dataset_type]["pattern_fn"],
Typo: should be `include`?
👍
config=config,
excludes=ignore,
includes=pattern if pattern else fns[dataset_type]["pattern_fn"],
max_files=max_files,
I believe the parameter to `find` is `max_count`?
Yes, it is; I missed this change when writing the `find` function.
    continue

if vfs.is_dir(f):
    yield list_files(
This should be `yield from`, and I believe we want to call `find`, not `list_files`, to recurse into the sub-folders. This means we also need to pass all the parameters: `config=config, max_count=max_count`.
I will add a test case. Regarding `find` versus `list_files`: we retain the current count as a check in the outer function. With `list_files`, I have removed the extraneous arguments.
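A toy illustration of the `yield from` recursion being discussed. The `DictVFS` stand-in and the signatures are invented for the sketch; the real code uses `tiledb.VFS` and forwards `config`/`max_count`:

```python
class DictVFS:
    """Toy stand-in for tiledb.VFS: a dict mapping directory -> entries."""

    def __init__(self, tree):
        self.tree = tree

    def ls(self, uri):
        return self.tree.get(uri, [])

    def is_dir(self, uri):
        return uri in self.tree


def find(vfs, uri):
    """Recursively yield files under uri; `yield from` flattens sub-folders."""
    for f in vfs.ls(uri):
        if vfs.is_dir(f):
            yield from find(vfs, f)  # a bare `yield` here would emit the generator itself
        else:
            yield f


vfs = DictVFS({
    "/data": ["/data/a.laz", "/data/sub"],
    "/data/sub": ["/data/sub/b.laz"],
})
print(list(find(vfs, "/data")))  # ['/data/a.laz', '/data/sub/b.laz']
```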
offsets = _fold_in(min, offsets, hdr.offsets)

return GeoMetadata(
    paths=sources,
This needs to be list(sources)
Actually, we should convert `sources` to a list before we consume the generator. Something like:

    paths = list(sources)
    for f in paths:
        ...

    return GeoMetadata(
        paths=paths,
        ...
    )
👍
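The bug behind this suggestion: a generator is exhausted by the header-reading loop, so storing it afterwards stores nothing. A self-contained demonstration (the `.las` filenames are illustrative):

```python
sources = (f for f in ["a.las", "b.las"])
for f in sources:
    pass  # e.g. read each file's header
print(list(sources))  # [] -- the generator is already spent

# Fix: materialize first, then iterate and store that same list.
sources = (f for f in ["a.las", "b.las"])
paths = list(sources)
for f in paths:
    pass  # headers read from the list; paths is still intact afterwards
print(paths)  # ['a.las', 'b.las']
```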
raise ValueError(f"No {dataset_type.name} datasets found")

meta_kwargs = fns[dataset_type]["kwargs"]
meta = fns[dataset_type]["meta_fn"](sources, config=config, **meta_kwargs)
In testing this on some larger-scale use cases, getting the metadata in-line is a problem. I believe we should batch the output into jobs of 10 or 100 files and grab metadata in parallel in another stage of the task graph. I've seen it take over 30 minutes when there are 6000 files.
I think we should change this after this PR has been merged. There are a couple of scalability improvements we can make.
Also just noting that geospatial datasets tend to be fewer, larger files rather than thousands of small ones. We should anticipate the problem, but I would like to make this change as an incremental PR.
Sure thing, thanks!
# schema creation node, returns a sequence of work items
ingest_node = graph.submit(
    fn,
    **input_list_node,
`input_list_node` is a `Node` type; you can't deconstruct the dictionary returned from it like this. The dictionary needs to be passed intact and handled inside the functions.
👍
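A toy model of why `**input_list_node` can't work: the node's dict doesn't exist until the graph runs, so the node itself must be passed and unpacked inside the downstream UDF. (`Node` here is a minimal stand-in, not the tiledb.cloud class, and the UDF names and keys are invented.)

```python
class Node:
    """Minimal stand-in for a task-graph node: its result exists only at run time."""

    def __init__(self, fn, *args):
        self.fn, self.args = fn, args

    def result(self):
        # Resolve upstream nodes lazily, then run this node's function.
        args = [a.result() if isinstance(a, Node) else a for a in self.args]
        return self.fn(*args)


def build_inputs_udf():
    return {"crs": "EPSG:4326", "batch_size": 10}


def ingest_udf(inputs):
    # The dict is only available here, inside the downstream UDF.
    return f"ingesting with crs={inputs['crs']}"


input_list_node = Node(build_inputs_udf)
ingest_node = Node(ingest_udf, input_list_node)  # pass the node intact, not **node
print(ingest_node.result())  # ingesting with crs=EPSG:4326
```

At graph-build time `input_list_node` is just a `Node` object, so `**input_list_node` would fail; resolving it inside `result()` mirrors how the real graph hands resolved values to dependent UDFs.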
The latest code has been tested with rasters and point clouds on TileDB Cloud.
To set your tests up so that they only run when geospatial tests are called for:

Part 1 is already complete: you can use `@pytest.mark.geospatial` on your test cases that use geospatial code. This ensures that they don't run when not called for. However, they are still collected as part of the test discovery process, which means the test Python files themselves should not need things installed.

I will add notes on the applicable files below, but don't have time to do a full read-through of the codebase.
tests/common/test_utils.py (outdated)
test_1 = [
    self.fs.create_file("/var/data.dat"),
    self.fs.create_file("/var/data/xx1.txt"),
    self.fs.create_file("/var/data/xx2.txt"),
]

with mock.patch.object(VFS, "ls", return_value=test_1):
    with mock.patch.object(VFS, "is_dir", return_value=True) as mock_is_dir:
        mock_is_dir.side_effect = lambda f: self.fs.isdir(f.name)
Though I did notice this while scrolling through: instead of setting up a bunch of mocks and using a fake filesystem, it seems like it would be simpler to create a temporary directory:

    with tempfile.TemporaryDirectory() as tmp_name:
        tmp = pathlib.Path(tmp_name)
        (tmp / "data.dat").write_text("test")
        data_dir = tmp / "data"
        data_dir.mkdir()
        (data_dir / "xx1.txt").write_text("test")
        ...

The temporary directory will be cleaned up at the end of the with-block, and there is no need to muck with TileDB internals.
I agree; for `find` this is easier than mocking the file system.
tests/test_geospatial.py (outdated)
import affine
import fiona
import numpy as np
import rasterio
import shapely
You'll need to move the imports of these into only the methods where they are directly used, to avoid having them at the module level.
tests/test_geospatial.py (outdated)
from tiledb.cloud.geospatial import BoundingBox
from tiledb.cloud.geospatial import DatasetType
from tiledb.cloud.geospatial import build_inputs_udf
from tiledb.cloud.geospatial import ingest_geometry_udf
from tiledb.cloud.geospatial import ingest_point_cloud_udf
from tiledb.cloud.geospatial import ingest_raster_udf
from tiledb.cloud.geospatial import load_geometry_metadata
from tiledb.cloud.geospatial import load_pointcloud_metadata
from tiledb.cloud.geospatial import load_raster_metadata
from tiledb.cloud.geospatial import read_uris
Likewise with this: importing `tiledb.cloud.geospatial` will end up importing those libraries by proxy, so within the function itself, import the items. To avoid all the repetition, I recommend importing the module rather than anything inside it, so…
tests/test_geospatial.py (outdated)
# Ignore warnings
warnings.simplefilter("ignore")
# Create a temporary directory
self.test_dir = Path(tempfile.mkdtemp())
Another sidenote: something like

    self.tempdir_obj = tempfile.TemporaryDirectory()
    self.test_dir = pathlib.Path(self.tempdir_obj.name)

can be used here, then…
tests/test_geospatial.py (outdated)
def tearDown(self):
    # Remove the directory after the test
    shutil.rmtree(self.test_dir)
…`self.tempdir_obj.cleanup()` will do all the cleanup work for you.
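Putting the two suggestions together (the class and attribute names are assumed; a plain object is used so the sketch runs outside a unittest runner):

```python
import pathlib
import tempfile


class TempDirCase:
    """Sketch of the suggested setUp/tearDown pair."""

    def setUp(self):
        self.tempdir_obj = tempfile.TemporaryDirectory()
        self.test_dir = pathlib.Path(self.tempdir_obj.name)

    def tearDown(self):
        # Removes the directory and everything inside it.
        self.tempdir_obj.cleanup()


case = TempDirCase()
case.setUp()
assert case.test_dir.is_dir()
(case.test_dir / "data.dat").write_text("test")
case.tearDown()
assert not case.test_dir.exists()
```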
tests/test_geospatial.py (outdated)
self.test_dir = Path(tempfile.mkdtemp())


def create_test_geometries(tmp_path: os.PathLike):
    radius = 1.0
Add `import shapely` to the top of this function rather than having that import at the module level.

Additionally, since this doesn't appear to depend upon anything in its closure (i.e., the only thing it uses is the `tmp_path` passed in as a parameter), I recommend moving it to the module level so that the `setUp` function isn't so long. Likewise with similar functions.
tests/test_geospatial.py (outdated)
json.dump(shapely.geometry.mapping(g), dst)


def create_test_rasters(tmp_path: os.PathLike):
    kwargs = {
Likewise, at the top of this function, do `import affine`, `import rasterio`, etc. This means that when this file is imported by pytest, it won't immediately import any of the libraries, and by default all these tests will be skipped.
)


class GeospatialTest(unittest.TestCase):
You can mark this test with `@pytest.mark.geospatial`, then…
For sc-38757
@sgillies @ktsitsi

The `get_metadata` function groups the source files by destination raster block, which is scalable for task graphs. For point clouds and geometries it will return the filtered input files. I checked the test results against expected outputs with GDAL using QGIS.

@sgillies can we review the use of `nodata` in the raster ingest from rasterio? I am not able to override `nodata` for the destination raster, and this may need a change in the GDAL driver.

Geometries and point clouds are simpler, and I will add these as a separate commit.