Delete Product CLI Command #1583

Merged
merged 10 commits on May 14, 2024
42 changes: 11 additions & 31 deletions datacube/drivers/postgis/_api.py
@@ -464,15 +464,15 @@ def get_datasets_for_location(self, uri, mode=None):
)
).fetchall()

def all_dataset_ids(self, archived: bool):
def all_dataset_ids(self, archived: bool | None = False):
query = select(Dataset.id)
if archived:
query = query.where(
Dataset.archived != None
Dataset.archived.is_not(None)
)
else:
elif archived is not None:
query = query.where(
Dataset.archived == None
Dataset.archived.is_(None)
)
return self._connection.execute(query).fetchall()

@@ -732,33 +732,6 @@ def insert_lineage_bulk(self, values):
)
return res.rowcount, requested - res.rowcount

@staticmethod
def search_unique_datasets_query(expressions, select_fields, limit, archived: bool | None = False):
"""
'unique' here refer to that the query results do not contain datasets
having the same 'id' more than once.

We are not dealing with dataset_source table here and we are not joining
dataset table with dataset_location table. We are aggregating stuff
in dataset_location per dataset basis if required. It returns the constructed
query.
"""
# TODO
raise NotImplementedError()

def search_unique_datasets(self, expressions, select_fields=None, limit=None, archived: bool | None = False):
"""
Processes a search query without duplicating datasets.

'unique' here refer to that the results do not contain datasets having the same 'id'
more than once. we achieve this by not allowing dataset table to join with
dataset_location or dataset_source tables. Joining with other tables would not
result in multiple records per dataset due to the direction of cardinality.
"""
select_query = self.search_unique_datasets_query(expressions, select_fields, limit, archived=archived)

return self._connection.execute(select_query)

def get_duplicates(self, match_fields: Sequence[PgField], expressions: Sequence[PgExpression]) -> Iterable[Row]:
# TODO
if "time" in [f.name for f in match_fields]:
@@ -1106,6 +1079,13 @@ def update_product(self,

return prod_id

def delete_product(self, name):
res = self._connection.execute(
delete(Product).returning(Product.id).where(Product.name == name)
)

return res.first()[0]

def insert_metadata_type(self, name, definition):
res = self._connection.execute(
insert(MetadataType).values(
140 changes: 34 additions & 106 deletions datacube/drivers/postgres/_api.py
@@ -18,7 +18,7 @@
import uuid # noqa: F401
from typing import Iterable, Any
from typing import cast as type_cast
from sqlalchemy import cast, String, Label, Table, FromClause
from sqlalchemy import cast, String, Label, FromClause
from sqlalchemy import delete, column, values
from sqlalchemy import select, text, bindparam, and_, or_, func, literal, distinct
from sqlalchemy.dialects.postgresql import INTERVAL
@@ -108,9 +108,9 @@ def get_native_fields() -> dict[str, NativeField]:
'Product name',
PRODUCT.c.name
),
'dataset_type_id': NativeField(
'dataset_type_id',
'ID of a dataset type',
'product_id': NativeField(
'product_id',
'ID of a product',
DATASET.c.dataset_type_ref
),
'metadata_type': NativeField(
@@ -344,19 +344,19 @@ def get_datasets_for_location(self, uri, mode=None):
)
).fetchall()

def all_dataset_ids(self, archived: bool):
def all_dataset_ids(self, archived: bool | None = False):
query = select(
DATASET.c.id # type: ignore[arg-type]
).select_from(
DATASET
)
if archived:
query = query.where(
DATASET.c.archived != None
DATASET.c.archived.is_not(None)
)
else:
elif archived is not None:
query = query.where(
DATASET.c.archived == None
DATASET.c.archived.is_(None)
)
return self._connection.execute(query).fetchall()

@@ -476,7 +476,7 @@ def get_dataset_sources(self, dataset_id):

return self._connection.execute(query).fetchall()

def search_datasets_by_metadata(self, metadata, archived: bool | None):
def search_datasets_by_metadata(self, metadata, archived: bool | None = False):
"""
Find any datasets that have the given metadata.

@@ -722,90 +722,6 @@ def insert_lineage_bulk(self, vals):
res = self._connection.execute(query)
return res.rowcount, requested - res.rowcount

@staticmethod
def search_unique_datasets_query(expressions, select_fields, limit, archived: bool | None = False):
"""
'unique' here refer to that the query results do not contain datasets
having the same 'id' more than once.

We are not dealing with dataset_source table here and we are not joining
dataset table with dataset_location table. We are aggregating stuff
in dataset_location per dataset basis if required. It returns the construted
query.
"""

# expressions involving DATASET_SOURCE cannot not done for now
for expression in expressions:
assert expression.field.required_alchemy_table != DATASET_SOURCE, \
'Joins with dataset_source cannot be done for this query'

# expressions involving 'uri' and 'uris' will be handled different
expressions = [expression for expression in expressions
if expression.field.required_alchemy_table != DATASET_LOCATION]

if select_fields:
select_columns: list[Label[Any] | Table] = []
for field in select_fields:
if field.name in {'uri', 'uris'}:
# All active URIs, from newest to oldest
uris_field = func.array(
select(
_dataset_uri_field(SELECTED_DATASET_LOCATION)
).where(
and_(
SELECTED_DATASET_LOCATION.c.dataset_ref == DATASET.c.id,
SELECTED_DATASET_LOCATION.c.archived == None
)
).order_by(
SELECTED_DATASET_LOCATION.c.added.desc(),
SELECTED_DATASET_LOCATION.c.id.desc()
).label('uris')
).label('uris')
select_columns.append(uris_field)
else:
select_columns.append(field.alchemy_expression.label(field.name))
else:
select_columns = list(_DATASET_SELECT_FIELDS)

raw_expressions = PostgresDbAPI._alchemify_expressions(expressions)

# We don't need 'DATASET_LOCATION table in the from expression
select_fields_ = [field for field in select_fields if field.name not in {'uri', 'uris'}]

from_expression = PostgresDbAPI._from_expression(DATASET, expressions, select_fields_)
if archived:
where_expr = and_(DATASET.c.archived.is_not(None), *raw_expressions)
elif archived is not None:
where_expr = and_(DATASET.c.archived.is_(None), *raw_expressions)
if archived:
where_expr = and_(*raw_expressions)

return (
select(
*select_columns
).select_from(
from_expression
).where(
where_expr
).limit(
limit
)
)

def search_unique_datasets(self, expressions, select_fields=None, limit=None, archived: bool | None = False):
"""
Processes a search query without duplicating datasets.

'unique' here refer to that the results do not contain datasets having the same 'id'
more than once. we achieve this by not allowing dataset table to join with
dataset_location or dataset_source tables. Joining with other tables would not
result in multiple records per dataset due to the direction of cardinality.
"""

select_query = self.search_unique_datasets_query(expressions, select_fields, limit, archived=archived)

return self._connection.execute(select_query)

def get_duplicates(self, match_fields: Iterable[Field], expressions: Iterable[Expression]) -> Iterable[Row]:
if "time" in [f.name for f in match_fields]:
return self.get_duplicates_with_time(match_fields, expressions)
@@ -1066,6 +982,19 @@ def update_product(self,
rebuild_view=True)
return type_id

def delete_product(self, name, fields, definition):
res = self._connection.execute(
PRODUCT.delete().returning(PRODUCT.c.id).where(
PRODUCT.c.name == name
)
)
type_id = res.first()[0]

# Update dynamic fields to remove deleted product fields
self._setup_product_fields(type_id, name, fields, definition['metadata'], concurrently=False, delete=True)

return type_id

def insert_metadata_type(self, name, definition, concurrently=False):
res = self._connection.execute(
METADATA_TYPE.insert().values(
@@ -1075,9 +1004,8 @@ def insert_metadata_type(self, name, definition, concurrently=False):
)
type_id = res.inserted_primary_key[0]

search_fields = get_dataset_fields(definition)
self._setup_metadata_type_fields(
type_id, name, search_fields, concurrently=concurrently
type_id, name, definition, concurrently=concurrently
)

def update_metadata_type(self, name, definition, concurrently=False):
@@ -1091,9 +1019,8 @@ def update_metadata_type(self, name, definition, concurrently=False):
)
type_id = res.first()[0]

search_fields = get_dataset_fields(definition)
self._setup_metadata_type_fields(
type_id, name, search_fields,
type_id, name, definition,
concurrently=concurrently,
rebuild_views=True,
)
@@ -1103,24 +1030,21 @@ def check_dynamic_fields(self, concurrently=False, rebuild_views=False, rebuild_indexes=False):
def check_dynamic_fields(self, concurrently=False, rebuild_views=False, rebuild_indexes=False):
_LOG.info('Checking dynamic views/indexes. (rebuild views=%s, indexes=%s)', rebuild_views, rebuild_indexes)

search_fields = {}

for metadata_type in self.get_all_metadata_types():
fields = get_dataset_fields(metadata_type.definition)
search_fields[metadata_type.id] = fields
self._setup_metadata_type_fields(
metadata_type.id,
metadata_type.name,
fields,
metadata_type.definition,
rebuild_indexes=rebuild_indexes,
rebuild_views=rebuild_views,
concurrently=concurrently,
)

def _setup_metadata_type_fields(self, id_, name, fields,
def _setup_metadata_type_fields(self, id_, name, definition,
rebuild_indexes=False, rebuild_views=False, concurrently=True):
# Metadata fields are no longer used (all queries are per-dataset-type): exclude all.
# This will have the effect of removing any old indexes that still exist.
fields = get_dataset_fields(definition)
exclude_fields = tuple(fields)

dataset_filter = and_(DATASET.c.archived == None, DATASET.c.metadata_type_ref == id_)
@@ -1140,13 +1064,17 @@ def _setup_metadata_type_fields(self, id_, name, fields,
)

def _setup_product_fields(self, id_, name, fields, metadata_doc,
rebuild_indexes=False, rebuild_view=False, concurrently=True):
rebuild_indexes=False, rebuild_view=False, concurrently=True, delete=False):
dataset_filter = and_(DATASET.c.archived == None, DATASET.c.dataset_type_ref == id_)
excluded_field_names = tuple(self._get_active_field_names(fields, metadata_doc))
if delete:
excluded_field_names = [field.name for field in fields.values()]
else:
excluded_field_names = tuple(self._get_active_field_names(fields, metadata_doc))

dynamic.check_dynamic_fields(self._connection, concurrently, dataset_filter,
excluded_field_names, fields, name,
rebuild_indexes=rebuild_indexes, rebuild_view=rebuild_view)
rebuild_indexes=rebuild_indexes, rebuild_view=rebuild_view,
delete_view=delete)

@staticmethod
def _get_active_field_names(fields, metadata_doc):
9 changes: 6 additions & 3 deletions datacube/drivers/postgres/_dynamic.py
@@ -32,7 +32,7 @@ def contains_all(d_, *keys):
return all([d_.get(key) for key in keys])


def _ensure_view(conn, fields, name, replace_existing, where_expression):
def _ensure_view(conn, fields, name, replace_existing, where_expression, delete=False):
"""
Ensure a view exists for the given fields
"""
@@ -62,6 +62,9 @@ def _ensure_view(conn, fields, name, replace_existing, where_expression):
).where(where_expression)
)
)
elif exists and delete:
_LOG.debug(f"Dropping view: {view_name}")
conn.execute(text(f'drop view {view_name}'))
else:
_LOG.debug('View exists: %s (replace=%r)', view_name, replace_existing)
legacy_name = schema_qualified('{}_dataset'.format(name))
@@ -71,7 +74,7 @@


def check_dynamic_fields(conn, concurrently, dataset_filter, excluded_field_names, fields, name,
rebuild_indexes=False, rebuild_view=False):
rebuild_indexes=False, rebuild_view=False, delete_view=False):
"""
Check that we have expected indexes and views for the given fields
"""
Expand Down Expand Up @@ -114,7 +117,7 @@ def check_dynamic_fields(conn, concurrently, dataset_filter, excluded_field_name
replace_existing=rebuild_indexes,
)
# A view of all fields
_ensure_view(conn, fields, name, rebuild_view, dataset_filter)
_ensure_view(conn, fields, name, rebuild_view, dataset_filter, delete_view)


def _check_field_index(conn, fields, name_prefix, filter_expression,
24 changes: 20 additions & 4 deletions datacube/index/abstract.py
@@ -587,7 +587,7 @@ def can_update(self,

@abstractmethod
def update(self,
metadata_type: Product,
product: Product,
allow_unsafe_updates: bool = False,
allow_table_lock: bool = False
) -> Product:
Expand All @@ -597,7 +597,7 @@ def update(self,
(An unsafe change is anything that may potentially make the product
incompatible with existing datasets of that type)

:param metadata_type: Product model with unpersisted updates
:param product: Product model with unpersisted updates
:param allow_unsafe_updates: Allow unsafe changes. Use with caution.
:param allow_table_lock:
Allow an exclusive lock to be taken on the table while creating the indexes.
Expand Down Expand Up @@ -641,6 +641,20 @@ def add_document(self, definition: JsonDict) -> Product:
type_ = self.from_doc(definition)
return self.add(type_)

@abstractmethod
def delete(self, products: Iterable[Product], allow_delete_active: bool = False) -> Sequence[Product]:
"""
Delete the specified products.

:param products: Products to be deleted
:param bool allow_delete_active:
Whether to allow the deletion of a Product with active datasets
(and thereby said active datasets). Use with caution.

If false (default), will error if a Product has active datasets.
:return: list of deleted Products
"""

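For orientation, a minimal usage sketch of the delete() API declared above, assuming a configured datacube environment; the product name is illustrative and not part of this PR:

# Usage sketch (not part of this diff): delete a product that has no active datasets.
from datacube import Datacube

dc = Datacube()                                              # assumes a configured index
product = dc.index.products.get_by_name("example_product")   # illustrative product name
if product is not None:
    deleted = dc.index.products.delete([product], allow_delete_active=False)
    print([p.name for p in deleted])
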
Contributor review comment:
I think the signature should be:

delete(self, product: Product, delete_archived_only: bool = True)

Raise an exception if archived datasets exist and delete_archived_only is true.

(Edit: i.e. move this logic up from the CLI)

Add an exception to datacube.index.exceptions - and impose an IndexException superclass while you're there.

Also catch the low-level exception and wrap it in an IndexException if deleting the product (or dataset) throws a database error, rather than just letting the low-level exception bubble up as dataset.purge currently does.

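Below is a rough sketch of the shape this comment proposes; the IndexException import and the helper calls are assumptions (the superclass does not exist yet), not code from this PR:

# Hypothetical sketch of the reviewer's proposal, not code from this PR.
from datacube.index.exceptions import IndexException  # proposed superclass, to be added

def delete(self, product: Product, delete_archived_only: bool = True) -> None:
    # Refuse to delete while datasets that may not be removed still reference the product.
    if delete_archived_only and self._has_remaining_datasets(product):  # hypothetical check, per the condition above
        raise IndexException(f"Cannot delete {product.name}: datasets still reference it")
    try:
        self._db_delete_product(product.name)  # hypothetical low-level call
    except Exception as err:
        # Wrap driver errors in an IndexException instead of letting them bubble up.
        raise IndexException(f"Failed to delete product {product.name}") from err
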
def get(self, id_: int) -> Product | None:
"""
Fetch product by id.
@@ -1398,11 +1412,13 @@ def restore(self, ids: Iterable[DSID]) -> None:
"""

@abstractmethod
def purge(self, ids: Iterable[DSID]) -> None:
def purge(self, ids: Iterable[DSID], allow_delete_active: bool = False) -> Sequence[DSID]:
"""
Delete archived datasets
Delete datasets

:param ids: iterable of dataset ids to purge
:param allow_delete_active: if false, only archived datasets can be deleted
:return: list of purged dataset ids
"""

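A brief usage sketch of the updated purge() contract; dc is an opened Datacube handle and the product name is illustrative:

# Usage sketch (not part of this diff): purge every dataset of one product.
ids = [ds.id for ds in dc.index.datasets.search(product="example_product")]
dc.index.datasets.archive(ids)          # archive first, since allow_delete_active defaults to False
purged = dc.index.datasets.purge(ids)   # returns the ids that were actually deleted
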
@abstractmethod