Delete Product CLI Command #1583

Merged
merged 10 commits on May 14, 2024
Changes from 5 commits
21 changes: 17 additions & 4 deletions datacube/drivers/postgis/_api.py
@@ -25,6 +25,7 @@
from sqlalchemy.dialects.postgresql import INTERVAL
from sqlalchemy.exc import IntegrityError
from sqlalchemy.engine import Row
from deprecat import deprecat

from typing import Iterable, Sequence, Optional, Set, Any
from typing import cast as type_cast
@@ -35,6 +36,7 @@
from datacube.utils.uris import split_uri
from datacube.index.abstract import DSID
from datacube.model.lineage import LineageRelation, LineageDirection
from datacube.migration import ODC2DeprecationWarning
from . import _core
from ._fields import parse_fields, Expression, PgField, PgExpression, DateRangeDocField # noqa: F401
from ._fields import NativeField, DateDocField, SimpleDocField, UnindexableValue
@@ -464,15 +466,15 @@ def get_datasets_for_location(self, uri, mode=None):
)
).fetchall()

def all_dataset_ids(self, archived: bool):
def all_dataset_ids(self, archived: bool | None = False):
query = select(Dataset.id)
if archived:
query = query.where(
Dataset.archived != None
Dataset.archived.is_not(None)
)
else:
elif archived is not None:
query = query.where(
Dataset.archived == None
Dataset.archived.is_(None)
)
return self._connection.execute(query).fetchall()

@@ -746,6 +748,10 @@ def search_unique_datasets_query(expressions, select_fields, limit, archived: bo
# TODO
raise NotImplementedError()

@deprecat(
reason="This method is unnecessary as multiple locations have been deprecated. Use search_datasets instead.",
version='1.9.0',
category=ODC2DeprecationWarning)
def search_unique_datasets(self, expressions, select_fields=None, limit=None, archived: bool | None = False):
"""
Processes a search query without duplicating datasets.
@@ -1106,6 +1112,13 @@ def update_product(self,

return prod_id

def delete_product(self, name):
res = self._connection.execute(
delete(Product).returning(Product.id).where(Product.name == name)
)

return res.first()[0]

def insert_metadata_type(self, name, definition):
res = self._connection.execute(
insert(MetadataType).values(
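For context on the decorator introduced above: calling a method wrapped with @deprecat emits the given warning category. A minimal, self-contained illustration of that mechanism (ODC2DeprecationWarning is stubbed locally here rather than imported from datacube.migration):

import warnings

from deprecat import deprecat


class ODC2DeprecationWarning(DeprecationWarning):
    """Local stand-in for datacube.migration.ODC2DeprecationWarning."""


@deprecat(reason="Use search_datasets instead.", version="1.9.0", category=ODC2DeprecationWarning)
def search_unique_datasets(*args, **kwargs):
    return []


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    search_unique_datasets()

# The deprecated call produced an ODC2DeprecationWarning pointing callers at search_datasets.
assert any(issubclass(w.category, ODC2DeprecationWarning) for w in caught)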
49 changes: 31 additions & 18 deletions datacube/drivers/postgres/_api.py
@@ -25,11 +25,13 @@
from sqlalchemy.dialects.postgresql import JSONB, insert, UUID
from sqlalchemy.exc import IntegrityError
from sqlalchemy.engine import Row
from deprecat import deprecat

from datacube.index.exceptions import MissingRecordError
from datacube.index.fields import Field, Expression, OrExpression
from datacube.model import Range
from datacube.utils.uris import split_uri
from datacube.migration import ODC2DeprecationWarning
from . import _core
from . import _dynamic as dynamic
from ._fields import parse_fields, PgField, PgExpression, DateRangeDocField # noqa: F401
@@ -108,9 +110,9 @@ def get_native_fields() -> dict[str, NativeField]:
'Product name',
PRODUCT.c.name
),
'dataset_type_id': NativeField(
'dataset_type_id',
'ID of a dataset type',
'product_id': NativeField(
'product_id',
'ID of a product',
DATASET.c.dataset_type_ref
),
'metadata_type': NativeField(
@@ -344,19 +346,19 @@ def get_datasets_for_location(self, uri, mode=None):
)
).fetchall()

def all_dataset_ids(self, archived: bool):
def all_dataset_ids(self, archived: bool | None = False):
query = select(
DATASET.c.id # type: ignore[arg-type]
).select_from(
DATASET
)
if archived:
query = query.where(
DATASET.c.archived != None
DATASET.c.archived.is_not(None)
)
else:
elif archived is not None:
query = query.where(
DATASET.c.archived == None
DATASET.c.archived.is_(None)
)
return self._connection.execute(query).fetchall()

@@ -476,7 +478,7 @@ def get_dataset_sources(self, dataset_id):

return self._connection.execute(query).fetchall()

def search_datasets_by_metadata(self, metadata, archived: bool | None):
def search_datasets_by_metadata(self, metadata, archived: bool | None = False):
"""
Find any datasets that have the given metadata.

@@ -792,6 +794,10 @@ def search_unique_datasets_query(expressions, select_fields, limit, archived: bo
)
)

@deprecat(
reason="This method is unnecessary as multiple locations have been deprecated. Use search_datasets instead.",
version='1.9.0',
category=ODC2DeprecationWarning)
def search_unique_datasets(self, expressions, select_fields=None, limit=None, archived: bool | None = False):
"""
Processes a search query without duplicating datasets.
@@ -1066,6 +1072,18 @@ def update_product(self,
rebuild_view=True)
return type_id

def delete_product(self, name, mt_id, mt_name, mt_def):
res = self._connection.execute(
PRODUCT.delete().returning(PRODUCT.c.id).where(
PRODUCT.c.name == name
)
)

# Update metadata type fields to remove deleted product fields
self._setup_metadata_type_fields(mt_id, mt_name, mt_def, rebuild_indexes=True, rebuild_views=True)

return res.first()[0]

def insert_metadata_type(self, name, definition, concurrently=False):
res = self._connection.execute(
METADATA_TYPE.insert().values(
@@ -1075,9 +1093,8 @@ def insert_metadata_type(self, name, definition, concurrently=False):
)
type_id = res.inserted_primary_key[0]

search_fields = get_dataset_fields(definition)
self._setup_metadata_type_fields(
type_id, name, search_fields, concurrently=concurrently
type_id, name, definition, concurrently=concurrently
)

def update_metadata_type(self, name, definition, concurrently=False):
@@ -1091,9 +1108,8 @@ def update_metadata_type(self, name, definition, concurrently=False):
)
type_id = res.first()[0]

search_fields = get_dataset_fields(definition)
self._setup_metadata_type_fields(
type_id, name, search_fields,
type_id, name, definition,
concurrently=concurrently,
rebuild_views=True,
)
@@ -1103,24 +1119,21 @@
def check_dynamic_fields(self, concurrently=False, rebuild_views=False, rebuild_indexes=False):
_LOG.info('Checking dynamic views/indexes. (rebuild views=%s, indexes=%s)', rebuild_views, rebuild_indexes)

search_fields = {}

for metadata_type in self.get_all_metadata_types():
fields = get_dataset_fields(metadata_type.definition)
search_fields[metadata_type.id] = fields
self._setup_metadata_type_fields(
metadata_type.id,
metadata_type.name,
fields,
metadata_type.definition,
rebuild_indexes=rebuild_indexes,
rebuild_views=rebuild_views,
concurrently=concurrently,
)

def _setup_metadata_type_fields(self, id_, name, fields,
def _setup_metadata_type_fields(self, id_, name, definition,
rebuild_indexes=False, rebuild_views=False, concurrently=True):
# Metadata fields are no longer used (all queries are per-dataset-type): exclude all.
# This will have the effect of removing any old indexes that still exist.
fields = get_dataset_fields(definition)
exclude_fields = tuple(fields)

dataset_filter = and_(DATASET.c.archived == None, DATASET.c.metadata_type_ref == id_)
12 changes: 10 additions & 2 deletions datacube/index/abstract.py
@@ -587,7 +587,7 @@ def can_update(self,

@abstractmethod
def update(self,
metadata_type: Product,
product: Product,
allow_unsafe_updates: bool = False,
allow_table_lock: bool = False
) -> Product:
@@ -597,7 +597,7 @@ def update(self,
(An unsafe change is anything that may potentially make the product
incompatible with existing datasets of that type)

:param metadata_type: Product model with unpersisted updates
:param product: Product model with unpersisted updates
:param allow_unsafe_updates: Allow unsafe changes. Use with caution.
:param allow_table_lock:
Allow an exclusive lock to be taken on the table while creating the indexes.
@@ -641,6 +641,14 @@ def add_document(self, definition: JsonDict) -> Product:
type_ = self.from_doc(definition)
return self.add(type_)

@abstractmethod
def delete(self, product: Product):

For an ODC beginner, it's hard to figure out whether delete is guaranteed to succeed. Is there a reason return types are frequently omitted?

Contributor Author
The omission of a return type was my mistake; I've updated it to be None, in line with the dataset purge method. Beyond that, is there a more specific return type you would suggest to make it more beginner-friendly?

I've seen some surprising types in my drive-bys of the pull requests in these repositories, but my main issue is missing type annotations. As a rule of thumb, input types help me understand what a function is working with or on, and return types generally help me understand whether a function is expected to sometimes fail (None/bool) or to produce/assemble some kind of data (string/dict/dataclass/etc.).

I don't know if it makes sense for opendatacube, but the Ruff guy (Charlie Marsh) wrote a blog post about the Mypy settings he used when he was working at Spring: https://notes.crmarsh.com/using-mypy-in-production-at-spring

Even my own code, which is significantly smaller than opendatacube, doesn't pass with what he was running in large-scale production, but if you want the recommended pyproject.toml excerpt without reading his post, here's what I believe the settings are:

[tool.mypy]
namespace_packages = true
disallow_untyped_defs = true
disallow_any_unimported = true
no_implicit_optional = true
check_untyped_defs = true
warn_return_any = true
show_error_codes = true
warn_unused_ignores = true

Contributor Author
We do run MyPy on this code, and it did pass (hmm). Thanks for the blog link, I'll play around with the settings and see what makes the most sense in a future PR.

Contributor
We're in the process of systematically typehinting a large complex codebase that has historically had major modules that were either not typehinted at all (or worse, were typehinted incorrectly).

We've come a long way in a relatively short period of time, but there's still a lot of work to be done in this space.

"""
Delete the specified product.

:param product: Product to be deleted
"""

Contributor
I think the signature should be:

delete(self, product: Product, delete_archived_only: bool = True)

raise an exception if there exist archived datasets and delete_archived_only is true.

(Edit: i.e. move this logic up from the CLI)

Add an exception to datacube.index.exceptions - and impose an IndexException superclass while you're there.

Also catch the low-level exception and wrap it in an IndexException if deleting the product (or dataset) throws a database error, rather than just letting it bubble up as dataset.purge does currently.
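For illustration, a rough sketch of the shape being proposed above, written as a free function against the public index API. IndexException and ProductDeletionError do not exist in the codebase at this point, and the precondition shown (refuse while unarchived datasets remain) is one reading of the comment, not the merged behaviour:

from datacube import Datacube
from datacube.model import Product


class IndexException(Exception):
    """Proposed superclass for index-level errors (not yet defined in datacube.index.exceptions)."""


class ProductDeletionError(IndexException):
    """Hypothetical error raised when a product cannot be deleted under the current flags."""


def delete_product(dc: Datacube, product: Product, delete_archived_only: bool = True) -> None:
    # One reading of the precondition: refuse to cascade while unarchived datasets remain.
    active = list(dc.index.datasets.search_returning(('id',), archived=False, product=product.name))
    if active and delete_archived_only:
        raise ProductDeletionError(f"{product.name} still has {len(active)} unarchived dataset(s)")
    try:
        everything = list(dc.index.datasets.search_returning(('id',), archived=None, product=product.name))
        dc.index.datasets.purge([ds.id for ds in everything])
        dc.index.products.delete(product)
    except IndexException:
        raise
    except Exception as exc:
        # Wrap low-level driver errors in an index-level exception instead of letting them bubble up.
        raise ProductDeletionError(f"Failed to delete product {product.name}") from exc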

def get(self, id_: int) -> Product | None:
"""
Fetch product by id.
18 changes: 12 additions & 6 deletions datacube/index/memory/_datasets.py
@@ -281,9 +281,8 @@ def restore(self, ids: Iterable[DSID]) -> None:
def purge(self, ids: Iterable[DSID]) -> None:
for id_ in ids:
id_ = dsid_to_uuid(id_)
if id_ in self._archived_by_id:
ds = self._archived_by_id.pop(id_)
del self._by_id[id_]
if id_ in self._by_id:
ds = self._by_id.pop(id_)
if id_ in self._derived_from:
for classifier, src_id in self._derived_from[id_].items():
del self._derivations[src_id][classifier]
@@ -292,13 +291,20 @@ def purge(self, ids: Iterable[DSID]) -> None:
for classifier, child_id in self._derivations[id_].items():
del self._derived_from[child_id][classifier]
del self._derivations[id_]
self._archived_by_product[ds.product.name].remove(id_)
if id_ in self._archived_by_id:
del self._archived_by_id[id_]
self._archived_by_product[ds.product.name].remove(id_)
else:
del self._active_by_id[id_]
self._by_product[ds.product.name].remove(id_)

def get_all_dataset_ids(self, archived: bool) -> Iterable[UUID]:
def get_all_dataset_ids(self, archived: bool | None = False) -> Iterable[UUID]:
if archived:
return (id_ for id_ in self._archived_by_id.keys())
else:
elif archived is not None:
return (id_ for id_ in self._active_by_id.keys())
else:
return (id_ for id_ in self._by_id.keys())

@deprecat(
reason="Multiple locations per dataset are now deprecated. Please use the 'get_location' method.",
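The recurring archived parameter in these changes is effectively tri-state: False (the default) selects active datasets only, True selects archived datasets only, and None selects both. A small illustration of those semantics as suggested by the memory implementation above, assuming a configured datacube environment:

from datacube import Datacube

dc = Datacube()  # assumes a configured datacube environment

active_ids = set(dc.index.datasets.get_all_dataset_ids(archived=False))   # active only (default)
archived_ids = set(dc.index.datasets.get_all_dataset_ids(archived=True))  # archived only
all_ids = set(dc.index.datasets.get_all_dataset_ids(archived=None))       # both

# Active and archived ids together make up the full set of dataset ids.
assert all_ids == active_ids | archived_ids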
9 changes: 9 additions & 0 deletions datacube/index/memory/_products.py
@@ -115,6 +115,15 @@ def update(self, product: Product,
self.by_name[persisted.name] = persisted
return cast(Product, self.get_by_name(product.name))

def delete(self, product: Product):
datasets = self._index.datasets.search_returning(('id',), archived=None, product=product.name)
if datasets:
self._index.datasets.purge([ds.id for ds in datasets]) # type: ignore[attr-defined]

if product.id is not None:
del self.by_id[product.id]
del self.by_name[product.name]

def get_unsafe(self, id_: int) -> Product:
return self.clone(self.by_id[id_])

3 changes: 3 additions & 0 deletions datacube/index/null/_products.py
@@ -23,6 +23,9 @@ def can_update(self, product, allow_unsafe_updates=False, allow_table_lock=False
def update(self, product: Product, allow_unsafe_updates=False, allow_table_lock=False):
raise NotImplementedError()

def delete(self, product: Product):
raise NotImplementedError()

def get_unsafe(self, id_):
raise KeyError(id_)

4 changes: 2 additions & 2 deletions datacube/index/postgis/_datasets.py
@@ -414,7 +414,7 @@ def purge(self, ids: Iterable[DSID]):
for id_ in ids:
transaction.delete_dataset(id_)

def get_all_dataset_ids(self, archived: bool):
def get_all_dataset_ids(self, archived: bool | None = False):
"""
Get list of all dataset IDs based only on archived status

@@ -922,7 +922,7 @@ class DatasetLight(result_type): # type: ignore
__slots__ = ()

with self._db_connection() as connection:
results = connection.search_unique_datasets(
results = connection.search_datasets(
query_exprs,
select_fields=select_fields,
limit=limit,
18 changes: 16 additions & 2 deletions datacube/index/postgis/_products.py
@@ -234,6 +234,20 @@ def update_document(self, definition, allow_unsafe_updates=False, allow_table_lo
allow_table_lock=allow_table_lock,
)

def delete(self, product: Product):
"""
Delete a Product, as well as all related datasets

:param product: the Product to delete
"""
# First find and delete all related datasets
product_datasets = self._index.datasets.search_returning(('id',), archived=None, product=product.name)
self._index.datasets.purge([ds.id for ds in product_datasets]) # type: ignore[attr-defined]

# Now we can safely delete the Product
with self._db_connection() as conn:
conn.delete_product(product.name)

# This is memoized in the constructor
# pylint: disable=method-hidden
def get_unsafe(self, id_): # type: ignore
@@ -313,8 +327,8 @@ def search_by_metadata(self, metadata):
:rtype: list[Product]
"""
with self._db_connection() as connection:
for dataset in self._make_many(connection.search_products_by_metadata(metadata)):
yield dataset
for product in self._make_many(connection.search_products_by_metadata(metadata)):
yield product

def get_all(self) -> Iterable[Product]:
"""
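Putting the postgis implementation above together with the abstract delete, removing a product through the Python API would look roughly like this (the product name is a placeholder; delete() purges the product's datasets before dropping the product record):

from datacube import Datacube

dc = Datacube()  # assumes a configured datacube environment

product = dc.index.products.get_by_name("example_product")  # placeholder name
if product is not None:
    # Purges all datasets indexed against the product, then deletes the product itself.
    dc.index.products.delete(product)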
4 changes: 2 additions & 2 deletions datacube/index/postgres/_datasets.py
@@ -385,7 +385,7 @@ def purge(self, ids: Iterable[DSID]):
for id_ in ids:
transaction.delete_dataset(id_)

def get_all_dataset_ids(self, archived: bool):
def get_all_dataset_ids(self, archived: bool | None = False):
"""
Get list of all dataset IDs based only on archived status

@@ -748,7 +748,7 @@ def _get_dataset_types(self, q):

def _get_product_queries(self, query):
for product, q in self.products.search_robust(**query):
q['dataset_type_id'] = product.id
q['product_id'] = product.id
yield q, product

# pylint: disable=too-many-locals