
Spatial index by product (#1539)
* Implement spatial_index(product) as per EP13 - should have been in last PR.

* Update whats_new.rst

* Trivial text change to retrigger workflows after changing base branch.

* Lintage. sigh.

* Split temporal and spatial extent methods across dataset and product resources.

* Close coverage gaps.

* Base class docstring copypasta.

* Remove separate metadata resource from product resource - we have whole index.

* Fix null driver method signatures.

* More docstring copypasta.

* lintage
SpacemanPaul committed Feb 5, 2024
1 parent ad78e75 commit 9585230
Showing 19 changed files with 220 additions and 195 deletions.
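
In use, this commit moves product-wide extent queries onto the product resource, alongside the EP13 spatial indexes. A minimal usage sketch of the new API surface (hedged: the product name is illustrative, and the CRS import path is an assumption based on the odc-geo package used by the 1.9 series):

from datacube import Datacube
from odc.geo import CRS  # assumed import path for CRS in the 1.9 series

dc = Datacube()

# Product-level spatial extent, backed by the spatial indexes.
# Returns None if there is no spatial index for the CRS, or if the
# index driver does not support the spatial index API.
extent = dc.index.products.spatial_extent("ga_ls8c_ard_3", crs=CRS("EPSG:3577"))

# Product-level temporal extent; raises KeyError if the product
# has no datasets in the index.
t_min, t_max = dc.index.products.temporal_extent("ga_ls8c_ard_3")

# The dataset-level variants now take only an iterable of dataset ids.
ids = [ds.id for ds in dc.index.datasets.search(product="ga_ls8c_ard_3")]
t_min, t_max = dc.index.datasets.temporal_extent(ids)
extent = dc.index.datasets.spatial_extent(ids, crs=CRS("EPSG:4326"))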
15 changes: 7 additions & 8 deletions datacube/drivers/postgis/_api.py
@@ -396,15 +396,14 @@ def spatial_extent(self, ids, crs):
SpatialIndex = self._db.spatial_index(crs) # noqa: N806
if SpatialIndex is None:
return None
result = self._connection.execute(
select(
func.ST_AsGeoJSON(func.ST_Union(SpatialIndex.extent))
).select_from(
SpatialIndex
).where(
SpatialIndex.dataset_ref.in_(ids)
)
query = select(
func.ST_AsGeoJSON(func.ST_Union(SpatialIndex.extent))
).select_from(
SpatialIndex
).where(
SpatialIndex.dataset_ref.in_(ids)
)
result = self._connection.execute(query)
for r in result:
extent_json = r[0]
if extent_json is None:
25 changes: 25 additions & 0 deletions datacube/drivers/postgres/_api.py
@@ -13,6 +13,7 @@
Persistence API implementation for postgres.
"""

import datetime
import logging
import uuid # noqa: F401
from typing import Iterable, Tuple
@@ -1151,6 +1152,30 @@ def get_all_metadata_defs(self):
).fetchall()
]

def temporal_extent_by_product(self, product_id: int,
min_time_offset, max_time_offset) -> tuple[datetime.datetime, datetime.datetime]:
time_min = DateDocField('aquisition_time_min',
'Min of time when dataset was acquired',
DATASET.c.metadata,
False, # is it indexed
offset=min_time_offset,
selection='least')

time_max = DateDocField('aquisition_time_max',
'Max of time when dataset was acquired',
DATASET.c.metadata,
False, # is it indexed
offset=max_time_offset,
selection='greatest')

return self._connection.execute(
select(
func.min(time_min.alchemy_expression), func.max(time_max.alchemy_expression)
).where(
DATASET.c.dataset_type_ref == product_id
)
).first()

def get_locations(self, dataset_id):
return [
record[0]
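Conceptually, temporal_extent_by_product reduces to a least/greatest scan over the acquisition-time offsets in each dataset's metadata document, restricted to one product, with SQL min()/max() aggregating across datasets. A plain-Python sketch of the same logic (an illustration only, not the driver code; it assumes each offset argument is a list of key paths into the metadata document and that time values are ISO-8601 strings):

import datetime
from typing import Any, Iterable, Mapping

def temporal_extent_of_docs(docs: Iterable[Mapping[str, Any]],
                            min_offsets: list[list[str]],
                            max_offsets: list[list[str]]) -> tuple[datetime.datetime, datetime.datetime]:
    def times(doc: Mapping[str, Any], offsets: list[list[str]]) -> list[datetime.datetime]:
        # Walk each offset path into the (JSON-like) metadata document.
        vals = []
        for path in offsets:
            node: Any = doc
            for key in path:
                node = node[key]
            vals.append(datetime.datetime.fromisoformat(node))
        return vals

    # selection='least' / 'greatest' choose across multiple candidate offsets
    # within one document, mirroring the DateDocField arguments above.
    starts = [min(times(doc, min_offsets)) for doc in docs]
    ends = [max(times(doc, max_offsets)) for doc in docs]
    return min(starts), max(ends)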
57 changes: 40 additions & 17 deletions datacube/index/abstract.py
@@ -405,7 +405,8 @@ class and implement all abstract methods.
(If a particular abstract method is not applicable for a particular implementation
raise a NotImplementedError)
"""
metadata_type_resource: AbstractMetadataTypeResource
def __init__(self, index: "AbstractIndex"):
self._index = index

def from_doc(self, definition: Mapping[str, Any],
metadata_type_cache: Optional[MutableMapping[str, MetadataType]] = None) -> Product:
@@ -430,14 +431,14 @@ def from_doc(self, definition: Mapping[str, Any],
if metadata_type_cache is not None and metadata_type in metadata_type_cache:
metadata_type = metadata_type_cache[metadata_type]
else:
metadata_type = self.metadata_type_resource.get_by_name(metadata_type)
metadata_type = self._index.metadata_types.get_by_name(metadata_type)
if (metadata_type is not None
and metadata_type_cache is not None
and metadata_type.name not in metadata_type_cache):
metadata_type_cache[metadata_type.name] = metadata_type
else:
# Otherwise they embedded a document, add it if needed:
metadata_type = self.metadata_type_resource.from_doc(metadata_type)
metadata_type = self._index.metadata_types.from_doc(metadata_type)
definition = dict(definition)
definition['metadata_type'] = metadata_type.name

@@ -689,7 +690,7 @@ def get_with_fields(self, field_names: Iterable[str]) -> Iterable[Product]:
:param field_names: names of fields that returned products must have
:returns: Matching product models
"""
return self.get_with_types(self.metadata_type_resource.get_with_fields(field_names))
return self.get_with_types(self._index.metadata_types.get_with_fields(field_names))

def get_with_types(self, types: Iterable[MetadataType]) -> Iterable[Product]:
"""
@@ -779,6 +780,33 @@ def get_all_docs(self) -> Iterable[Mapping[str, Any]]:
for prod in self.get_all():
yield prod.definition

@abstractmethod
def spatial_extent(self, product: str | Product, crs: CRS = CRS("EPSG:4326")) -> Optional[Geometry]:
"""
Return the combined spatial extent of the nominated product.
Uses the spatial index.
Returns None if there is no index for the CRS, if the product has no datasets in the relevant spatial index,
or if the driver does not support the spatial index API.
The result will not include extents of datasets that cannot be validly projected into the CRS.
:param product: A Product or product name
:param crs: A CRS (defaults to EPSG:4326)
:return: The combined spatial extent of the product, or None
"""

@abstractmethod
def temporal_extent(self, product: str | Product) -> tuple[datetime.datetime, datetime.datetime]:
"""
Returns the minimum and maximum acquisition time of a product.
Raises KeyError if the product has no datasets in the index.
:param product: Product or name of product
:return: minimum and maximum acquisition times
"""


# Non-strict Dataset ID representation
DSID = Union[str, UUID]
@@ -1696,24 +1724,18 @@ def search_eager(self, **query: QueryField) -> List[Dataset]:
return list(self.search(**query)) # type: ignore[arg-type] # mypy isn't being very smart here :(

@abstractmethod
def temporal_extent(self,
product: str | Product | None,
ids: Iterable[DSID] | None
) -> tuple[datetime.datetime, datetime.datetime]:
def temporal_extent(self, ids: Iterable[DSID]) -> tuple[datetime.datetime, datetime.datetime]:
"""
Returns the minimum and maximum acquisition time of a product or an iterable of dataset ids.
Returns the minimum and maximum acquisition time of an iterable of dataset ids.
Only one ids or products can be passed - the other should be None. Raises ValueError if
both or neither of ids and products is passed. Raises KeyError if no datasets in the index
match the input argument.
Raises KeyError if none of the datasets are in the index
:param product: Product or name of product
:param ids: Iterable of dataset ids.
:return: minimum and maximum acquisition times
"""

@deprecat(
reason="This method has been renamed 'temporal_extent'",
reason="This method has been moved to the Product Resource and renamed 'temporal_extent()'",
version="1.9.0",
category=ODC2DeprecationWarning
)
@@ -1726,7 +1748,7 @@ def get_product_time_bounds(self,
:param product: Product or name of product
:return: minimum and maximum acquisition times
"""
return self.temporal_extent(product=product)
return self._index.products.temporal_extent(product=product)

@abstractmethod
def search_returning_datasets_light(self,
@@ -1762,11 +1784,12 @@ def search_returning_datasets_light(self,
"""

@abstractmethod
def spatial_extent(self, ids: Iterable[DSID], crs: CRS = CRS("EPSG:4326")) -> Optional[Geometry]:
def spatial_extent(self, ids: Iterable[DSID], crs: CRS = CRS("EPSG:4326")) -> Geometry | None:
"""
Return the combined spatial extent of the nominated datasets.
Return the combined spatial extent of the nominated datasets
Uses spatial index.
Returns None if no index for the CRS, or if no identified datasets are indexed in the relevant spatial index.
Result will not include extents of datasets that cannot be validly projected into the CRS.
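For callers of the deprecated method, the migration is a one-line change (a sketch; dc is an open Datacube handle and the product name is illustrative):

# Before: deprecated since 1.9.0, emits ODC2DeprecationWarning.
t_min, t_max = dc.index.datasets.get_product_time_bounds("ga_ls8c_ard_3")

# After: the equivalent query now lives on the product resource.
t_min, t_max = dc.index.products.temporal_extent("ga_ls8c_ard_3")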
19 changes: 3 additions & 16 deletions datacube/index/memory/_datasets.py
@@ -642,20 +642,10 @@ def make_summary(ds: Dataset) -> Mapping[str, Any]:
for ds in self.search(**query): # type: ignore[arg-type]
yield make_summary(ds)

def temporal_extent(
self,
product: str | Product | None = None,
ids: Iterable[DSID] | None = None
) -> tuple[datetime.datetime, datetime.datetime]:
if product is None and ids is None:
raise ValueError("Must supply product or ids")
elif product is not None and ids is not None:
raise ValueError("Cannot supply both product and ids")
elif product is not None:
if isinstance(product, str):
product = self._index.products.get_by_name_unsafe(product)
ids = self.by_product.get(product.name, [])
def spatial_extent(self, ids, crs=None):
return None

def temporal_extent(self, ids: Iterable[DSID]) -> tuple[datetime.datetime, datetime.datetime]:
min_time: Optional[datetime.datetime] = None
max_time: Optional[datetime.datetime] = None
for dsid in ids:
@@ -727,9 +717,6 @@ def clone(self, orig: Dataset, for_save=False, lookup_locations=True) -> Dataset
archived_time=None if for_save else orig.archived_time
)

def spatial_extent(self, ids, crs=None):
return None

# Lineage methods need to be implemented on the dataset resource as that is where the relevant indexes
# currently live.
def _get_all_lineage(self) -> Iterable[LineageRelation]:
23 changes: 16 additions & 7 deletions datacube/index/memory/_products.py
@@ -2,13 +2,13 @@
#
# Copyright (c) 2015-2024 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
import datetime
import logging

from typing import Iterable, Iterator, Mapping, Tuple, cast

from datacube.index.fields import as_expression
from datacube.index.abstract import AbstractProductResource, QueryField
from datacube.index.memory._metadata_types import MetadataTypeResource
from datacube.model import Product
from datacube.utils import changes, jsonify_document, _readable_offset
from datacube.utils.changes import AllowPolicy, Change, Offset, check_doc_unchanged, get_doc_changes, classify_changes
@@ -19,8 +19,8 @@


class ProductResource(AbstractProductResource):
def __init__(self, metadata_type_resource):
self.metadata_type_resource: MetadataTypeResource = metadata_type_resource
def __init__(self, index):
self._index = index
self.by_id = {}
self.by_name = {}
self.next_id = 1
@@ -36,11 +36,11 @@ def add(self, product: Product, allow_table_lock: bool = False) -> Product:
f'Metadata Type {product.name}'
)
else:
mdt = self.metadata_type_resource.get_by_name(product.metadata_type.name)
mdt = self._index.metadata_types.get_by_name(product.metadata_type.name)
if mdt is None:
_LOG.warning(f'Adding metadata_type "{product.metadata_type.name}" as it doesn\'t exist')
product.metadata_type = self.metadata_type_resource.add(product.metadata_type,
allow_table_lock=allow_table_lock)
product.metadata_type = self._index.metadata_types.add(product.metadata_type,
allow_table_lock=allow_table_lock)
clone = self.clone(product)
clone.id = self.next_id
self.next_id += 1
@@ -168,7 +168,16 @@ def get_all(self) -> Iterable[Product]:

def clone(self, orig: Product) -> Product:
return Product(
self.metadata_type_resource.clone(orig.metadata_type),
self._index.metadata_types.clone(orig.metadata_type),
jsonify_document(orig.definition),
id_=orig.id
)

def spatial_extent(self, product, crs=None):
return None

def temporal_extent(self, product: str | Product) -> tuple[datetime.datetime, datetime.datetime]:
if isinstance(product, str):
product = self._index.products.get_by_name_unsafe(product)
ids = self._index.datasets.by_product.get(product.name, [])
return self._index.datasets.temporal_extent(ids)
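
Against the in-memory driver this yields the expected fallback behaviour: spatial_extent always returns None (the driver keeps no spatial index), while temporal_extent resolves the product name, gathers its dataset ids from by_product, and delegates to the dataset resource. A hedged usage sketch (it assumes an ODC environment configured with the memory index driver and a product that already has indexed datasets):

from datacube import Datacube

dc = Datacube(env="memory")  # the environment name is an assumption

# No spatial index in the memory driver, so this is always None.
assert dc.index.products.spatial_extent("ga_ls8c_ard_3") is None

# Delegates to datasets.temporal_extent; KeyError if the product is unknown.
t_min, t_max = dc.index.products.temporal_extent("ga_ls8c_ard_3")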
5 changes: 2 additions & 3 deletions datacube/index/memory/index.py
@@ -47,7 +47,7 @@ def __init__(self, env: ODCEnvironment) -> None:
self._env = env
self._users = UserResource()
self._metadata_types = MetadataTypeResource()
self._products = ProductResource(self.metadata_types)
self._products = ProductResource(self)
self._lineage = LineageResource(self)
self._datasets = DatasetResource(self)
global counter
@@ -128,8 +128,7 @@ def metadata_type_from_doc(definition: dict) -> MetadataType:
:param definition:
"""
MetadataType.validate(definition) # type: ignore
return MetadataType(definition,
dataset_search_fields=Index.get_dataset_fields(definition))
return MetadataType(definition, dataset_search_fields=Index.get_dataset_fields(definition))


def index_driver_init():
15 changes: 2 additions & 13 deletions datacube/index/null/_datasets.py
@@ -106,19 +106,8 @@ def count_product_through_time(self, period, **query):
def search_summaries(self, **query):
return []

def temporal_extent(
self,
product: str | Product = None,
ids: Iterable[DSID] | None = None
) -> tuple[datetime.datetime, datetime.datetime]:
if product is None and ids is None:
raise ValueError("Must specify product or ids")
elif ids is not None and product is not None:
raise ValueError("Cannot specify both product and ids")
elif ids is not None:
raise KeyError(str(ids))
else:
raise KeyError(str(product))
def temporal_extent(self, ids: Iterable[DSID]) -> tuple[datetime.datetime, datetime.datetime]:
raise KeyError(str(ids))

# pylint: disable=redefined-outer-name
def search_returning_datasets_light(self, field_names: tuple, custom_offsets=None, limit=None, **query):
10 changes: 7 additions & 3 deletions datacube/index/null/_products.py
@@ -3,6 +3,7 @@
# Copyright (c) 2015-2024 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
import logging
import datetime

from datacube.index.abstract import AbstractProductResource
from datacube.model import Product
@@ -13,9 +14,6 @@


class ProductResource(AbstractProductResource):
def __init__(self, mdtr):
self.metadata_type_resource = mdtr

def add(self, product, allow_table_lock=False):
raise NotImplementedError()

@@ -39,3 +37,9 @@ def search_by_metadata(self, metadata):

def get_all(self) -> Iterable[Product]:
return []

def temporal_extent(self, product: str | Product) -> tuple[datetime.datetime, datetime.datetime]:
raise KeyError(str(product))

def spatial_extent(self, product, crs=None):
raise KeyError(str(product))
2 changes: 1 addition & 1 deletion datacube/index/null/index.py
@@ -36,7 +36,7 @@ def __init__(self, env: ODCEnvironment) -> None:
self._env = env
self._users = UserResource()
self._metadata_types = MetadataTypeResource()
self._products = ProductResource(self._metadata_types)
self._products = ProductResource(self)
self._lineage = NoLineageResource(self)
self._datasets = DatasetResource(self)

