add product delete method & cli command, plus misc surrounding cleanup
Ariana Barzinpour committed Apr 24, 2024
1 parent 93e1d01 commit e53839b
Showing 11 changed files with 226 additions and 47 deletions.
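For context before the per-file diffs: this commit adds a delete() method to the product resources and wires it through both index drivers. A minimal usage sketch, assuming a configured datacube index; the app name and product name below are illustrative placeholders, not part of this commit:

    # Hypothetical usage of the product delete method added in this commit.
    # Only index.products.get_by_name() and index.products.delete() are real index APIs;
    # the app name and product name are placeholders.
    from datacube import Datacube

    dc = Datacube(app="product-delete-example")
    product = dc.index.products.get_by_name("example_product")
    if product is not None:
        # delete() first purges every dataset belonging to the product,
        # then removes the product record itself (see the implementations below).
        dc.index.products.delete(product)
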
21 changes: 17 additions & 4 deletions datacube/drivers/postgis/_api.py
@@ -25,6 +25,7 @@
from sqlalchemy.dialects.postgresql import INTERVAL
from sqlalchemy.exc import IntegrityError
from sqlalchemy.engine import Row
from deprecat import deprecat

from typing import Iterable, Sequence, Optional, Set, Any
from typing import cast as type_cast
@@ -35,6 +36,7 @@
from datacube.utils.uris import split_uri
from datacube.index.abstract import DSID
from datacube.model.lineage import LineageRelation, LineageDirection
from datacube.migration import ODC2DeprecationWarning
from . import _core
from ._fields import parse_fields, Expression, PgField, PgExpression, DateRangeDocField # noqa: F401
from ._fields import NativeField, DateDocField, SimpleDocField, UnindexableValue
@@ -464,15 +466,15 @@ def get_datasets_for_location(self, uri, mode=None):
)
).fetchall()

def all_dataset_ids(self, archived: bool):
def all_dataset_ids(self, archived: bool | None = False):
query = select(Dataset.id)
if archived:
query = query.where(
Dataset.archived != None
Dataset.archived.is_not(None)
)
else:
elif archived is not None:
query = query.where(
Dataset.archived == None
Dataset.archived.is_(None)
)
return self._connection.execute(query).fetchall()

@@ -746,6 +748,10 @@ def search_unique_datasets_query(expressions, select_fields, limit, archived: bo
# TODO
raise NotImplementedError()

@deprecat(
reason="This method is unnecessary as multiple locations have been deprecated. Use search_datasets instead.",
version='1.9.0',
category=ODC2DeprecationWarning)
def search_unique_datasets(self, expressions, select_fields=None, limit=None, archived: bool | None = False):
"""
Processes a search query without duplicating datasets.
@@ -1106,6 +1112,13 @@ def update_product(self,

return prod_id

def delete_product(self, name):
res = self._connection.execute(
delete(Product).returning(Product.id).where(Product.name == name)
)

return res.first()[0]

def insert_metadata_type(self, name, definition):
res = self._connection.execute(
insert(MetadataType).values(
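Between this driver and the postgres driver below, all_dataset_ids gains a tri-state archived filter. A small sketch of the semantics as read from the if/elif branches in the hunk above; the index handle is assumed to be an open datacube index:

    # Tri-state archived filter, inferred from the branch logic in all_dataset_ids() above.
    archived_ids = index.datasets.get_all_dataset_ids(archived=True)   # archived datasets only
    active_ids = index.datasets.get_all_dataset_ids(archived=False)    # active datasets only (default)
    every_id = index.datasets.get_all_dataset_ids(archived=None)       # no filter: archived and active
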
49 changes: 31 additions & 18 deletions datacube/drivers/postgres/_api.py
@@ -25,11 +25,13 @@
from sqlalchemy.dialects.postgresql import JSONB, insert, UUID
from sqlalchemy.exc import IntegrityError
from sqlalchemy.engine import Row
from deprecat import deprecat

from datacube.index.exceptions import MissingRecordError
from datacube.index.fields import Field, Expression, OrExpression
from datacube.model import Range
from datacube.utils.uris import split_uri
from datacube.migration import ODC2DeprecationWarning
from . import _core
from . import _dynamic as dynamic
from ._fields import parse_fields, PgField, PgExpression, DateRangeDocField # noqa: F401
@@ -108,9 +110,9 @@ def get_native_fields() -> dict[str, NativeField]:
'Product name',
PRODUCT.c.name
),
'dataset_type_id': NativeField(
'dataset_type_id',
'ID of a dataset type',
'product_id': NativeField(
'product_id',
'ID of a product',
DATASET.c.dataset_type_ref
),
'metadata_type': NativeField(
@@ -344,19 +346,19 @@ def get_datasets_for_location(self, uri, mode=None):
)
).fetchall()

def all_dataset_ids(self, archived: bool):
def all_dataset_ids(self, archived: bool | None = False):
query = select(
DATASET.c.id # type: ignore[arg-type]
).select_from(
DATASET
)
if archived:
query = query.where(
DATASET.c.archived != None
DATASET.c.archived.is_not(None)
)
else:
elif archived is not None:
query = query.where(
DATASET.c.archived == None
DATASET.c.archived.is_(None)
)
return self._connection.execute(query).fetchall()

@@ -476,7 +478,7 @@ def get_dataset_sources(self, dataset_id):

return self._connection.execute(query).fetchall()

def search_datasets_by_metadata(self, metadata, archived: bool | None):
def search_datasets_by_metadata(self, metadata, archived: bool | None = False):
"""
Find any datasets that have the given metadata.
@@ -792,6 +794,10 @@ def search_unique_datasets_query(expressions, select_fields, limit, archived: bo
)
)

@deprecat(
reason="This method is unnecessary as multiple locations have been deprecated. Use search_datasets instead.",
version='1.9.0',
category=ODC2DeprecationWarning)
def search_unique_datasets(self, expressions, select_fields=None, limit=None, archived: bool | None = False):
"""
Processes a search query without duplicating datasets.
@@ -1066,6 +1072,18 @@ def update_product(self,
rebuild_view=True)
return type_id

def delete_product(self, name, mt_id, mt_name, mt_def):
res = self._connection.execute(
PRODUCT.delete().returning(PRODUCT.c.id).where(
PRODUCT.c.name == name
)
)

# Update metadata type fields to remove deleted product fields
self._setup_metadata_type_fields(mt_id, mt_name, mt_def, rebuild_indexes=True, rebuild_views=True)

return res.first()[0]

def insert_metadata_type(self, name, definition, concurrently=False):
res = self._connection.execute(
METADATA_TYPE.insert().values(
@@ -1075,9 +1093,8 @@ def insert_metadata_type(self, name, definition, concurrently=False):
)
type_id = res.inserted_primary_key[0]

search_fields = get_dataset_fields(definition)
self._setup_metadata_type_fields(
type_id, name, search_fields, concurrently=concurrently
type_id, name, definition, concurrently=concurrently
)

def update_metadata_type(self, name, definition, concurrently=False):
@@ -1091,9 +1108,8 @@ def update_metadata_type(self, name, definition, concurrently=False):
)
type_id = res.first()[0]

search_fields = get_dataset_fields(definition)
self._setup_metadata_type_fields(
type_id, name, search_fields,
type_id, name, definition,
concurrently=concurrently,
rebuild_views=True,
)
@@ -1103,24 +1119,21 @@ def check_dynamic_fields(self, concurrently=False, rebuild_views=False, rebuild_indexes=False):
def check_dynamic_fields(self, concurrently=False, rebuild_views=False, rebuild_indexes=False):
_LOG.info('Checking dynamic views/indexes. (rebuild views=%s, indexes=%s)', rebuild_views, rebuild_indexes)

search_fields = {}

for metadata_type in self.get_all_metadata_types():
fields = get_dataset_fields(metadata_type.definition)
search_fields[metadata_type.id] = fields
self._setup_metadata_type_fields(
metadata_type.id,
metadata_type.name,
fields,
metadata_type.definition,
rebuild_indexes=rebuild_indexes,
rebuild_views=rebuild_views,
concurrently=concurrently,
)

def _setup_metadata_type_fields(self, id_, name, fields,
def _setup_metadata_type_fields(self, id_, name, definition,
rebuild_indexes=False, rebuild_views=False, concurrently=True):
# Metadata fields are no longer used (all queries are per-dataset-type): exclude all.
# This will have the effect of removing any old indexes that still exist.
fields = get_dataset_fields(definition)
exclude_fields = tuple(fields)

dataset_filter = and_(DATASET.c.archived == None, DATASET.c.metadata_type_ref == id_)
12 changes: 10 additions & 2 deletions datacube/index/abstract.py
@@ -587,7 +587,7 @@ def can_update(self,

@abstractmethod
def update(self,
metadata_type: Product,
product: Product,
allow_unsafe_updates: bool = False,
allow_table_lock: bool = False
) -> Product:
@@ -597,7 +597,7 @@ def update(self,
(An unsafe change is anything that may potentially make the product
incompatible with existing datasets of that type)
:param metadata_type: Product model with unpersisted updates
:param product: Product model with unpersisted updates
:param allow_unsafe_updates: Allow unsafe changes. Use with caution.
:param allow_table_lock:
Allow an exclusive lock to be taken on the table while creating the indexes.
@@ -641,6 +641,14 @@ def add_document(self, definition: JsonDict) -> Product:
type_ = self.from_doc(definition)
return self.add(type_)

@abstractmethod
def delete(self, product: Product):
"""
Delete the specified product.
:param product: Product to be deleted
"""

def get(self, id_: int) -> Product | None:
"""
Fetch product by id.
4 changes: 2 additions & 2 deletions datacube/index/postgis/_datasets.py
@@ -414,7 +414,7 @@ def purge(self, ids: Iterable[DSID]):
for id_ in ids:
transaction.delete_dataset(id_)

def get_all_dataset_ids(self, archived: bool):
def get_all_dataset_ids(self, archived: bool | None = False):
"""
Get list of all dataset IDs based only on archived status
@@ -922,7 +922,7 @@ class DatasetLight(result_type): # type: ignore
__slots__ = ()

with self._db_connection() as connection:
results = connection.search_unique_datasets(
results = connection.search_datasets(
query_exprs,
select_fields=select_fields,
limit=limit,
18 changes: 16 additions & 2 deletions datacube/index/postgis/_products.py
@@ -234,6 +234,20 @@ def update_document(self, definition, allow_unsafe_updates=False, allow_table_lo
allow_table_lock=allow_table_lock,
)

def delete(self, product: Product):
"""
Delete a Product, as well as all related datasets
:param product: the Product to delete
"""
# First find and delete all related datasets
product_datasets = self._index.datasets.search_returning(('id',), archived=None, product=product.name)
self._index.datasets.purge([ds.id for ds in product_datasets])

# Now we can safely delete the Product
with self._db_connection() as conn:
conn.delete_product(product.name)

# This is memoized in the constructor
# pylint: disable=method-hidden
def get_unsafe(self, id_): # type: ignore
Expand Down Expand Up @@ -313,8 +327,8 @@ def search_by_metadata(self, metadata):
:rtype: list[Product]
"""
with self._db_connection() as connection:
for dataset in self._make_many(connection.search_products_by_metadata(metadata)):
yield dataset
for product in self._make_many(connection.search_products_by_metadata(metadata)):
yield product

def get_all(self) -> Iterable[Product]:
"""
4 changes: 2 additions & 2 deletions datacube/index/postgres/_datasets.py
@@ -385,7 +385,7 @@ def purge(self, ids: Iterable[DSID]):
for id_ in ids:
transaction.delete_dataset(id_)

def get_all_dataset_ids(self, archived: bool):
def get_all_dataset_ids(self, archived: bool | None = False):
"""
Get list of all dataset IDs based only on archived status
@@ -748,7 +748,7 @@ def _get_dataset_types(self, q):

def _get_product_queries(self, query):
for product, q in self.products.search_robust(**query):
q['dataset_type_id'] = product.id
q['product_id'] = product.id
yield q, product

# pylint: disable=too-many-locals
28 changes: 25 additions & 3 deletions datacube/index/postgres/_products.py
@@ -187,7 +187,8 @@ def update(self, product: Product, allow_unsafe_updates=False, allow_table_lock=
# name, field.sql_expression, new_field.sql_expression
# )
# )
metadata_type = cast(MetadataType, self._index.metadata_types.get_by_name(product.metadata_type.name))
# metadata_type = cast(MetadataType, self._index.metadata_types.get_by_name(product.metadata_type.name))
metadata_type = product.metadata_type
# Given we cannot change metadata type because of the check above, and this is an
# update method, the metadata type is guaranteed to already exist.
with self._db_connection(transaction=allow_table_lock) as conn:
@@ -225,6 +226,27 @@ def update_document(self, definition, allow_unsafe_updates=False, allow_table_lo
allow_table_lock=allow_table_lock,
)

def delete(self, product: Product):
"""
Delete a Product, as well as all related datasets
:param product: the Product to delete
"""
# First find and delete all related datasets
product_datasets = self._index.datasets.search_returning(('id',), archived=None, product=product.name)
self._index.datasets.purge([ds.id for ds in product_datasets])

# Now we can safely delete the Product
# Pass along metadata type information as well to update indexes/views
with self._db_connection(transaction=True) as conn:
mt = product.metadata_type
conn.delete_product(
name=product.name,
mt_id=mt.id,
mt_name=mt.name,
mt_def=mt.definition
)

# This is memoized in the constructor
# pylint: disable=method-hidden
def get_unsafe(self, id_): # type: ignore
@@ -301,8 +323,8 @@ def search_by_metadata(self, metadata):
:rtype: list[Product]
"""
with self._db_connection() as connection:
for dataset in self._make_many(connection.search_products_by_metadata(metadata)):
yield dataset
for product in self._make_many(connection.search_products_by_metadata(metadata)):
yield product

def get_all(self) -> Iterable[Product]:
"""
20 changes: 10 additions & 10 deletions datacube/scripts/dataset.py
@@ -610,20 +610,20 @@ def purge_cmd(index: Index, dry_run: bool, all_ds: bool, ids: List[str]):
sys.exit(1)

if all_ds:
datasets_for_archive = {dsid: True for dsid in index.datasets.get_all_dataset_ids(archived=True)}
datasets_for_purge = {dsid: True for dsid in index.datasets.get_all_dataset_ids(archived=True)}
else:
datasets_for_archive = {UUID(dataset_id): exists
for dataset_id, exists in zip(ids, index.datasets.bulk_has(ids))}
datasets_for_purge = {UUID(dataset_id): exists
for dataset_id, exists in zip(ids, index.datasets.bulk_has(ids))}

# Check for non-existent datasets
if False in datasets_for_archive.values():
for dataset_id, exists in datasets_for_archive.items():
if False in datasets_for_purge.values():
for dataset_id, exists in datasets_for_purge.items():
if not exists:
click.echo(f'No dataset found with id: {dataset_id}')
sys.exit(-1)

# Check for unarchived datasets
datasets = index.datasets.bulk_get(datasets_for_archive.keys())
datasets = index.datasets.bulk_get(datasets_for_purge.keys())
unarchived_datasets = False
for d in datasets:
if not d.is_archived:
@@ -632,15 +632,15 @@ def purge_cmd(index: Index, dry_run: bool, all_ds: bool, ids: List[str]):
if unarchived_datasets:
sys.exit(-1)

for dataset in datasets_for_archive.keys():
for dataset in datasets_for_purge.keys():
click.echo(f'Purging dataset: {dataset}')

if not dry_run:
# Perform purge
index.datasets.purge(datasets_for_archive.keys())
click.echo(f'{len(datasets_for_archive)} datasets purged')
index.datasets.purge(datasets_for_purge.keys())
click.echo(f'{len(datasets_for_purge)} datasets purged')
else:
click.echo(f'{len(datasets_for_archive)} datasets not purged (dry run)')
click.echo(f'{len(datasets_for_purge)} datasets not purged (dry run)')

click.echo('Completed dataset purge.')

