Delete Product CLI Command #1583

Merged
merged 10 commits into from May 14, 2024
Changes from 6 commits
21 changes: 17 additions & 4 deletions datacube/drivers/postgis/_api.py
@@ -25,6 +25,7 @@
from sqlalchemy.dialects.postgresql import INTERVAL
from sqlalchemy.exc import IntegrityError
from sqlalchemy.engine import Row
from deprecat import deprecat

from typing import Iterable, Sequence, Optional, Set, Any
from typing import cast as type_cast
@@ -35,6 +36,7 @@
from datacube.utils.uris import split_uri
from datacube.index.abstract import DSID
from datacube.model.lineage import LineageRelation, LineageDirection
from datacube.migration import ODC2DeprecationWarning
from . import _core
from ._fields import parse_fields, Expression, PgField, PgExpression, DateRangeDocField # noqa: F401
from ._fields import NativeField, DateDocField, SimpleDocField, UnindexableValue
@@ -464,15 +466,15 @@ def get_datasets_for_location(self, uri, mode=None):
)
).fetchall()

def all_dataset_ids(self, archived: bool):
def all_dataset_ids(self, archived: bool | None = False):
query = select(Dataset.id)
if archived:
query = query.where(
Dataset.archived != None
Dataset.archived.is_not(None)
)
else:
elif archived is not None:
query = query.where(
Dataset.archived == None
Dataset.archived.is_(None)
)
return self._connection.execute(query).fetchall()

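For context on this change: the archived parameter is now tri-state. True returns only archived dataset ids, False (the default) returns only active ids, and None returns both. The index layer exposes the same semantics through get_all_dataset_ids later in this diff; a usage sketch, assuming an index handle already connected to an ODC database (the handle itself is not part of this diff):

# Usage sketch only -- `index` is assumed, not part of this diff.
active_ids = list(index.datasets.get_all_dataset_ids(archived=False))   # active datasets (default)
archived_ids = list(index.datasets.get_all_dataset_ids(archived=True))  # archived datasets only
all_ids = list(index.datasets.get_all_dataset_ids(archived=None))       # both active and archived
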
@@ -746,6 +748,10 @@ def search_unique_datasets_query(expressions, select_fields, limit, archived: bo
# TODO
raise NotImplementedError()

@deprecat(
reason="This method is unnecessary as multiple locations have been deprecated. Use search_datasets instead.",
version='1.9.0',
category=ODC2DeprecationWarning)
Ariana-B marked this conversation as resolved.
def search_unique_datasets(self, expressions, select_fields=None, limit=None, archived: bool | None = False):
"""
Processes a search query without duplicating datasets.
@@ -1106,6 +1112,13 @@ def update_product(self,

return prod_id

def delete_product(self, name):
res = self._connection.execute(
delete(Product).returning(Product.id).where(Product.name == name)
)

return res.first()[0]

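The delete_product helper above uses SQLAlchemy's delete(...).returning(...) construct, which issues a single DELETE ... RETURNING id statement so the deleted product's id comes back without a separate SELECT. A standalone sketch of the same pattern; the table definition, connection URL and product name below are illustrative, not taken from the datacube codebase:

from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, delete

metadata = MetaData()
# Illustrative table only -- not the real ODC product table definition.
product = Table(
    "product", metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String, unique=True),
)

engine = create_engine("postgresql+psycopg2://user:pass@localhost/example")  # placeholder URL
with engine.begin() as conn:
    res = conn.execute(
        delete(product).returning(product.c.id).where(product.c.name == "example_product")
    )
    row = res.first()
    deleted_id = row[0] if row is not None else None  # None when no product matched the name
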
def insert_metadata_type(self, name, definition):
res = self._connection.execute(
insert(MetadataType).values(
49 changes: 31 additions & 18 deletions datacube/drivers/postgres/_api.py
@@ -25,11 +25,13 @@
from sqlalchemy.dialects.postgresql import JSONB, insert, UUID
from sqlalchemy.exc import IntegrityError
from sqlalchemy.engine import Row
from deprecat import deprecat

from datacube.index.exceptions import MissingRecordError
from datacube.index.fields import Field, Expression, OrExpression
from datacube.model import Range
from datacube.utils.uris import split_uri
from datacube.migration import ODC2DeprecationWarning
from . import _core
from . import _dynamic as dynamic
from ._fields import parse_fields, PgField, PgExpression, DateRangeDocField # noqa: F401
@@ -108,9 +110,9 @@ def get_native_fields() -> dict[str, NativeField]:
'Product name',
PRODUCT.c.name
),
'dataset_type_id': NativeField(
'dataset_type_id',
'ID of a dataset type',
'product_id': NativeField(
'product_id',
'ID of a product',
DATASET.c.dataset_type_ref
Ariana-B marked this conversation as resolved.
),
'metadata_type': NativeField(
@@ -344,19 +346,19 @@ def get_datasets_for_location(self, uri, mode=None):
)
).fetchall()

def all_dataset_ids(self, archived: bool):
def all_dataset_ids(self, archived: bool | None = False):
query = select(
DATASET.c.id # type: ignore[arg-type]
).select_from(
DATASET
)
if archived:
query = query.where(
DATASET.c.archived != None
DATASET.c.archived.is_not(None)
)
else:
elif archived is not None:
query = query.where(
DATASET.c.archived == None
DATASET.c.archived.is_(None)
)
return self._connection.execute(query).fetchall()

@@ -476,7 +478,7 @@ def get_dataset_sources(self, dataset_id):

return self._connection.execute(query).fetchall()

def search_datasets_by_metadata(self, metadata, archived: bool | None):
def search_datasets_by_metadata(self, metadata, archived: bool | None = False):
"""
Find any datasets that have the given metadata.

@@ -792,6 +794,10 @@ def search_unique_datasets_query(expressions, select_fields, limit, archived: bo
)
)

@deprecat(
reason="This method is unnecessary as multiple locations have been deprecated. Use search_datasets instead.",
version='1.9.0',
category=ODC2DeprecationWarning)
Ariana-B marked this conversation as resolved.
def search_unique_datasets(self, expressions, select_fields=None, limit=None, archived: bool | None = False):
"""
Processes a search query without duplicating datasets.
@@ -1066,6 +1072,18 @@ def update_product(self,
rebuild_view=True)
return type_id

def delete_product(self, name, mt_id, mt_name, mt_def):
res = self._connection.execute(
PRODUCT.delete().returning(PRODUCT.c.id).where(
PRODUCT.c.name == name
)
)

# Update metadata type fields to remove deleted product fields
self._setup_metadata_type_fields(mt_id, mt_name, mt_def, rebuild_indexes=True, rebuild_views=True)

return res.first()[0]

def insert_metadata_type(self, name, definition, concurrently=False):
res = self._connection.execute(
METADATA_TYPE.insert().values(
@@ -1075,9 +1093,8 @@ def insert_metadata_type(self, name, definition, concurrently=False):
)
type_id = res.inserted_primary_key[0]

search_fields = get_dataset_fields(definition)
self._setup_metadata_type_fields(
type_id, name, search_fields, concurrently=concurrently
type_id, name, definition, concurrently=concurrently
)

def update_metadata_type(self, name, definition, concurrently=False):
@@ -1091,9 +1108,8 @@ def update_metadata_type(self, name, definition, concurrently=False):
)
type_id = res.first()[0]

search_fields = get_dataset_fields(definition)
self._setup_metadata_type_fields(
type_id, name, search_fields,
type_id, name, definition,
concurrently=concurrently,
rebuild_views=True,
)
@@ -1103,24 +1119,21 @@ def check_dynamic_fields(self, concurrently=False, rebuild_views=False, rebuild_indexes=False):
def check_dynamic_fields(self, concurrently=False, rebuild_views=False, rebuild_indexes=False):
_LOG.info('Checking dynamic views/indexes. (rebuild views=%s, indexes=%s)', rebuild_views, rebuild_indexes)

search_fields = {}

for metadata_type in self.get_all_metadata_types():
fields = get_dataset_fields(metadata_type.definition)
search_fields[metadata_type.id] = fields
self._setup_metadata_type_fields(
metadata_type.id,
metadata_type.name,
fields,
metadata_type.definition,
rebuild_indexes=rebuild_indexes,
rebuild_views=rebuild_views,
concurrently=concurrently,
)

def _setup_metadata_type_fields(self, id_, name, fields,
def _setup_metadata_type_fields(self, id_, name, definition,
rebuild_indexes=False, rebuild_views=False, concurrently=True):
# Metadata fields are no longer used (all queries are per-dataset-type): exclude all.
# This will have the effect of removing any old indexes that still exist.
fields = get_dataset_fields(definition)
exclude_fields = tuple(fields)

dataset_filter = and_(DATASET.c.archived == None, DATASET.c.metadata_type_ref == id_)
12 changes: 10 additions & 2 deletions datacube/index/abstract.py
@@ -587,7 +587,7 @@ def can_update(self,

@abstractmethod
def update(self,
metadata_type: Product,
product: Product,
allow_unsafe_updates: bool = False,
allow_table_lock: bool = False
) -> Product:
@@ -597,7 +597,7 @@ def update(self,
(An unsafe change is anything that may potentially make the product
incompatible with existing datasets of that type)

:param metadata_type: Product model with unpersisted updates
:param product: Product model with unpersisted updates
:param allow_unsafe_updates: Allow unsafe changes. Use with caution.
:param allow_table_lock:
Allow an exclusive lock to be taken on the table while creating the indexes.
@@ -641,6 +641,14 @@ def add_document(self, definition: JsonDict) -> Product:
type_ = self.from_doc(definition)
return self.add(type_)

@abstractmethod
def delete(self, product: Product) -> None:
"""
Delete the specified product.

:param product: Product to be deleted
"""

Contributor
I think the signature should be:

delete(self, product: Product, delete_archived_only: bool = True)

raise an exception if there exist archived datasets and delete_archived_only is true.

(Edit: i.e. move this logic up from the CLI)

Add an exception to datacube.index.exceptions - and impose an IndexException superclass while you're there.

Also catch the low-level exception and wrap it in an IndexException if deleting the product (or dataset) throws a database error, rather than just letting the low-level exception bubble up as dataset.purge currently does.
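A rough sketch of what the reviewer is proposing; the class and parameter names below follow the comment and are assumptions, not code merged in this PR:

from abc import ABC, abstractmethod

from datacube.model import Product


class IndexException(Exception):  # proposed superclass for datacube.index.exceptions
    """Common base for index-layer errors, wrapping low-level database exceptions."""


class ProductDeletionError(IndexException):
    """Proposed: raised when a product cannot be deleted under the given conditions."""


class AbstractProductResource(ABC):  # mirrors the abstract resource this diff extends
    @abstractmethod
    def delete(self, product: Product, delete_archived_only: bool = True) -> None:
        """
        Delete the specified product (proposed signature).

        :param product: Product to be deleted
        :param delete_archived_only: if True, raise ProductDeletionError instead of
            deleting while datasets that violate the archived-only rule still exist
        """
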

def get(self, id_: int) -> Product | None:
"""
Fetch product by id.
18 changes: 12 additions & 6 deletions datacube/index/memory/_datasets.py
@@ -281,9 +281,8 @@ def restore(self, ids: Iterable[DSID]) -> None:
def purge(self, ids: Iterable[DSID]) -> None:
for id_ in ids:
id_ = dsid_to_uuid(id_)
if id_ in self._archived_by_id:
ds = self._archived_by_id.pop(id_)
del self._by_id[id_]
if id_ in self._by_id:
ds = self._by_id.pop(id_)
if id_ in self._derived_from:
for classifier, src_id in self._derived_from[id_].items():
del self._derivations[src_id][classifier]
@@ -292,13 +291,20 @@ def purge(self, ids: Iterable[DSID]) -> None:
for classifier, child_id in self._derivations[id_].items():
del self._derived_from[child_id][classifier]
del self._derivations[id_]
self._archived_by_product[ds.product.name].remove(id_)
if id_ in self._archived_by_id:
del self._archived_by_id[id_]
self._archived_by_product[ds.product.name].remove(id_)
else:
del self._active_by_id[id_]
self._by_product[ds.product.name].remove(id_)

def get_all_dataset_ids(self, archived: bool) -> Iterable[UUID]:
def get_all_dataset_ids(self, archived: bool | None = False) -> Iterable[UUID]:
if archived:
return (id_ for id_ in self._archived_by_id.keys())
else:
elif archived is not None:
return (id_ for id_ in self._active_by_id.keys())
else:
return (id_ for id_ in self._by_id.keys())

@deprecat(
reason="Multiple locations per dataset are now deprecated. Please use the 'get_location' method.",
9 changes: 9 additions & 0 deletions datacube/index/memory/_products.py
@@ -115,6 +115,15 @@ def update(self, product: Product,
self.by_name[persisted.name] = persisted
return cast(Product, self.get_by_name(product.name))

def delete(self, product: Product):
datasets = self._index.datasets.search_returning(('id',), archived=None, product=product.name)
if datasets:
self._index.datasets.purge([ds.id for ds in datasets]) # type: ignore[attr-defined]

if product.id is not None:
del self.by_id[product.id]
del self.by_name[product.name]

def get_unsafe(self, id_: int) -> Product:
return self.clone(self.by_id[id_])

3 changes: 3 additions & 0 deletions datacube/index/null/_products.py
@@ -23,6 +23,9 @@ def can_update(self, product, allow_unsafe_updates=False, allow_table_lock=False
def update(self, product: Product, allow_unsafe_updates=False, allow_table_lock=False):
raise NotImplementedError()

def delete(self, product: Product):
raise NotImplementedError()

def get_unsafe(self, id_):
raise KeyError(id_)

4 changes: 2 additions & 2 deletions datacube/index/postgis/_datasets.py
@@ -414,7 +414,7 @@ def purge(self, ids: Iterable[DSID]):
for id_ in ids:
transaction.delete_dataset(id_)

def get_all_dataset_ids(self, archived: bool):
def get_all_dataset_ids(self, archived: bool | None = False):
"""
Get list of all dataset IDs based only on archived status

@@ -922,7 +922,7 @@ class DatasetLight(result_type): # type: ignore
__slots__ = ()

with self._db_connection() as connection:
results = connection.search_unique_datasets(
results = connection.search_datasets(
query_exprs,
select_fields=select_fields,
limit=limit,
18 changes: 16 additions & 2 deletions datacube/index/postgis/_products.py
@@ -234,6 +234,20 @@ def update_document(self, definition, allow_unsafe_updates=False, allow_table_lo
allow_table_lock=allow_table_lock,
)

def delete(self, product: Product):
"""
Delete a Product, as well as all related datasets

:param product: the Product to delete
"""
# First find and delete all related datasets
product_datasets = self._index.datasets.search_returning(('id',), archived=None, product=product.name)
self._index.datasets.purge([ds.id for ds in product_datasets]) # type: ignore[attr-defined]

# Now we can safely delete the Product
with self._db_connection() as conn:
conn.delete_product(product.name)

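For reference, a usage sketch of the index-level API this PR adds, as it might be driven from Python rather than the CLI; the app name and product name are placeholders:

# Usage sketch -- assumes a reachable ODC database configured for this environment.
from datacube import Datacube

dc = Datacube(app="delete-product-example")
product = dc.index.products.get_by_name("example_product")
if product is not None:
    # Purges every dataset recorded against the product, then removes the product itself.
    dc.index.products.delete(product)
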
# This is memoized in the constructor
# pylint: disable=method-hidden
def get_unsafe(self, id_): # type: ignore
@@ -313,8 +327,8 @@ def search_by_metadata(self, metadata):
:rtype: list[Product]
"""
with self._db_connection() as connection:
for dataset in self._make_many(connection.search_products_by_metadata(metadata)):
yield dataset
for product in self._make_many(connection.search_products_by_metadata(metadata)):
yield product

def get_all(self) -> Iterable[Product]:
"""
4 changes: 2 additions & 2 deletions datacube/index/postgres/_datasets.py
@@ -385,7 +385,7 @@ def purge(self, ids: Iterable[DSID]):
for id_ in ids:
transaction.delete_dataset(id_)

def get_all_dataset_ids(self, archived: bool):
def get_all_dataset_ids(self, archived: bool | None = False):
"""
Get list of all dataset IDs based only on archived status

@@ -748,7 +748,7 @@ def _get_dataset_types(self, q):

def _get_product_queries(self, query):
for product, q in self.products.search_robust(**query):
q['dataset_type_id'] = product.id
q['product_id'] = product.id
yield q, product

# pylint: disable=too-many-locals