diff --git a/datacube/drivers/postgis/_api.py b/datacube/drivers/postgis/_api.py index 6fa806f9d..eef55e774 100644 --- a/datacube/drivers/postgis/_api.py +++ b/datacube/drivers/postgis/_api.py @@ -25,6 +25,7 @@ from sqlalchemy.dialects.postgresql import INTERVAL from sqlalchemy.exc import IntegrityError from sqlalchemy.engine import Row +from deprecat import deprecat from typing import Iterable, Sequence, Optional, Set, Any from typing import cast as type_cast @@ -35,6 +36,7 @@ from datacube.utils.uris import split_uri from datacube.index.abstract import DSID from datacube.model.lineage import LineageRelation, LineageDirection +from datacube.migration import ODC2DeprecationWarning from . import _core from ._fields import parse_fields, Expression, PgField, PgExpression, DateRangeDocField # noqa: F401 from ._fields import NativeField, DateDocField, SimpleDocField, UnindexableValue @@ -464,15 +466,15 @@ def get_datasets_for_location(self, uri, mode=None): ) ).fetchall() - def all_dataset_ids(self, archived: bool): + def all_dataset_ids(self, archived: bool | None = False): query = select(Dataset.id) if archived: query = query.where( - Dataset.archived != None + Dataset.archived.is_not(None) ) - else: + elif archived is not None: query = query.where( - Dataset.archived == None + Dataset.archived.is_(None) ) return self._connection.execute(query).fetchall() @@ -746,6 +748,10 @@ def search_unique_datasets_query(expressions, select_fields, limit, archived: bo # TODO raise NotImplementedError() + @deprecat( + reason="This method is unnecessary as multiple locations have been deprecated. Use search_datasets instead.", + version='1.9.0', + category=ODC2DeprecationWarning) def search_unique_datasets(self, expressions, select_fields=None, limit=None, archived: bool | None = False): """ Processes a search query without duplicating datasets. @@ -1106,6 +1112,13 @@ def update_product(self, return prod_id + def delete_product(self, name): + res = self._connection.execute( + delete(Product).returning(Product.id).where(Product.name == name) + ) + + return res.first()[0] + def insert_metadata_type(self, name, definition): res = self._connection.execute( insert(MetadataType).values( diff --git a/datacube/drivers/postgres/_api.py b/datacube/drivers/postgres/_api.py index 186526431..16c65acd1 100644 --- a/datacube/drivers/postgres/_api.py +++ b/datacube/drivers/postgres/_api.py @@ -25,11 +25,13 @@ from sqlalchemy.dialects.postgresql import JSONB, insert, UUID from sqlalchemy.exc import IntegrityError from sqlalchemy.engine import Row +from deprecat import deprecat from datacube.index.exceptions import MissingRecordError from datacube.index.fields import Field, Expression, OrExpression from datacube.model import Range from datacube.utils.uris import split_uri +from datacube.migration import ODC2DeprecationWarning from . import _core from . 
import _dynamic as dynamic from ._fields import parse_fields, PgField, PgExpression, DateRangeDocField # noqa: F401 @@ -108,9 +110,9 @@ def get_native_fields() -> dict[str, NativeField]: 'Product name', PRODUCT.c.name ), - 'dataset_type_id': NativeField( - 'dataset_type_id', - 'ID of a dataset type', + 'product_id': NativeField( + 'product_id', + 'ID of a product', DATASET.c.dataset_type_ref ), 'metadata_type': NativeField( @@ -344,7 +346,7 @@ def get_datasets_for_location(self, uri, mode=None): ) ).fetchall() - def all_dataset_ids(self, archived: bool): + def all_dataset_ids(self, archived: bool | None = False): query = select( DATASET.c.id # type: ignore[arg-type] ).select_from( @@ -352,11 +354,11 @@ def all_dataset_ids(self, archived: bool): ) if archived: query = query.where( - DATASET.c.archived != None + DATASET.c.archived.is_not(None) ) - else: + elif archived is not None: query = query.where( - DATASET.c.archived == None + DATASET.c.archived.is_(None) ) return self._connection.execute(query).fetchall() @@ -476,7 +478,7 @@ def get_dataset_sources(self, dataset_id): return self._connection.execute(query).fetchall() - def search_datasets_by_metadata(self, metadata, archived: bool | None): + def search_datasets_by_metadata(self, metadata, archived: bool | None = False): """ Find any datasets that have the given metadata. @@ -792,6 +794,10 @@ def search_unique_datasets_query(expressions, select_fields, limit, archived: bo ) ) + @deprecat( + reason="This method is unnecessary as multiple locations have been deprecated. Use search_datasets instead.", + version='1.9.0', + category=ODC2DeprecationWarning) def search_unique_datasets(self, expressions, select_fields=None, limit=None, archived: bool | None = False): """ Processes a search query without duplicating datasets. @@ -1066,6 +1072,18 @@ def update_product(self, rebuild_view=True) return type_id + def delete_product(self, name, mt_id, mt_name, mt_def): + res = self._connection.execute( + PRODUCT.delete().returning(PRODUCT.c.id).where( + PRODUCT.c.name == name + ) + ) + + # Update metadata type fields to remove deleted product fields + self._setup_metadata_type_fields(mt_id, mt_name, mt_def, rebuild_indexes=True, rebuild_views=True) + + return res.first()[0] + def insert_metadata_type(self, name, definition, concurrently=False): res = self._connection.execute( METADATA_TYPE.insert().values( @@ -1075,9 +1093,8 @@ def insert_metadata_type(self, name, definition, concurrently=False): ) type_id = res.inserted_primary_key[0] - search_fields = get_dataset_fields(definition) self._setup_metadata_type_fields( - type_id, name, search_fields, concurrently=concurrently + type_id, name, definition, concurrently=concurrently ) def update_metadata_type(self, name, definition, concurrently=False): @@ -1091,9 +1108,8 @@ def update_metadata_type(self, name, definition, concurrently=False): ) type_id = res.first()[0] - search_fields = get_dataset_fields(definition) self._setup_metadata_type_fields( - type_id, name, search_fields, + type_id, name, definition, concurrently=concurrently, rebuild_views=True, ) @@ -1103,24 +1119,21 @@ def update_metadata_type(self, name, definition, concurrently=False): def check_dynamic_fields(self, concurrently=False, rebuild_views=False, rebuild_indexes=False): _LOG.info('Checking dynamic views/indexes. 
(rebuild views=%s, indexes=%s)', rebuild_views, rebuild_indexes) - search_fields = {} - for metadata_type in self.get_all_metadata_types(): - fields = get_dataset_fields(metadata_type.definition) - search_fields[metadata_type.id] = fields self._setup_metadata_type_fields( metadata_type.id, metadata_type.name, - fields, + metadata_type.definition, rebuild_indexes=rebuild_indexes, rebuild_views=rebuild_views, concurrently=concurrently, ) - def _setup_metadata_type_fields(self, id_, name, fields, + def _setup_metadata_type_fields(self, id_, name, definition, rebuild_indexes=False, rebuild_views=False, concurrently=True): # Metadata fields are no longer used (all queries are per-dataset-type): exclude all. # This will have the effect of removing any old indexes that still exist. + fields = get_dataset_fields(definition) exclude_fields = tuple(fields) dataset_filter = and_(DATASET.c.archived == None, DATASET.c.metadata_type_ref == id_) diff --git a/datacube/index/abstract.py b/datacube/index/abstract.py index 74afaf11e..8ceb109df 100644 --- a/datacube/index/abstract.py +++ b/datacube/index/abstract.py @@ -587,7 +587,7 @@ def can_update(self, @abstractmethod def update(self, - metadata_type: Product, + product: Product, allow_unsafe_updates: bool = False, allow_table_lock: bool = False ) -> Product: @@ -597,7 +597,7 @@ def update(self, (An unsafe change is anything that may potentially make the product incompatible with existing datasets of that type) - :param metadata_type: Product model with unpersisted updates + :param product: Product model with unpersisted updates :param allow_unsafe_updates: Allow unsafe changes. Use with caution. :param allow_table_lock: Allow an exclusive lock to be taken on the table while creating the indexes. @@ -641,6 +641,14 @@ def add_document(self, definition: JsonDict) -> Product: type_ = self.from_doc(definition) return self.add(type_) + @abstractmethod + def delete(self, product: Product): + """ + Delete the specified product. + + :param product: Product to be deleted + """ + def get(self, id_: int) -> Product | None: """ Fetch product by id. 
diff --git a/datacube/index/postgis/_datasets.py b/datacube/index/postgis/_datasets.py index d220da2eb..3301cfa55 100755 --- a/datacube/index/postgis/_datasets.py +++ b/datacube/index/postgis/_datasets.py @@ -414,7 +414,7 @@ def purge(self, ids: Iterable[DSID]): for id_ in ids: transaction.delete_dataset(id_) - def get_all_dataset_ids(self, archived: bool): + def get_all_dataset_ids(self, archived: bool | None = False): """ Get list of all dataset IDs based only on archived status @@ -922,7 +922,7 @@ class DatasetLight(result_type): # type: ignore __slots__ = () with self._db_connection() as connection: - results = connection.search_unique_datasets( + results = connection.search_datasets( query_exprs, select_fields=select_fields, limit=limit, diff --git a/datacube/index/postgis/_products.py b/datacube/index/postgis/_products.py index 7c33d7021..5aff42b6b 100644 --- a/datacube/index/postgis/_products.py +++ b/datacube/index/postgis/_products.py @@ -234,6 +234,20 @@ def update_document(self, definition, allow_unsafe_updates=False, allow_table_lo allow_table_lock=allow_table_lock, ) + def delete(self, product: Product): + """ + Delete a Product, as well as all related datasets + + :param product: the Product to delete + """ + # First find and delete all related datasets + product_datasets = self._index.datasets.search_returning(('id',), archived=None, product=product.name) + self._index.datasets.purge([ds.id for ds in product_datasets]) + + # Now we can safely delete the Product + with self._db_connection() as conn: + conn.delete_product(product.name) + # This is memoized in the constructor # pylint: disable=method-hidden def get_unsafe(self, id_): # type: ignore @@ -313,8 +327,8 @@ def search_by_metadata(self, metadata): :rtype: list[Product] """ with self._db_connection() as connection: - for dataset in self._make_many(connection.search_products_by_metadata(metadata)): - yield dataset + for product in self._make_many(connection.search_products_by_metadata(metadata)): + yield product def get_all(self) -> Iterable[Product]: """ diff --git a/datacube/index/postgres/_datasets.py b/datacube/index/postgres/_datasets.py index 43b36aba0..a82e09fb3 100755 --- a/datacube/index/postgres/_datasets.py +++ b/datacube/index/postgres/_datasets.py @@ -385,7 +385,7 @@ def purge(self, ids: Iterable[DSID]): for id_ in ids: transaction.delete_dataset(id_) - def get_all_dataset_ids(self, archived: bool): + def get_all_dataset_ids(self, archived: bool | None = False): """ Get list of all dataset IDs based only on archived status @@ -748,7 +748,7 @@ def _get_dataset_types(self, q): def _get_product_queries(self, query): for product, q in self.products.search_robust(**query): - q['dataset_type_id'] = product.id + q['product_id'] = product.id yield q, product # pylint: disable=too-many-locals diff --git a/datacube/index/postgres/_products.py b/datacube/index/postgres/_products.py index 833e29106..b2966df5c 100644 --- a/datacube/index/postgres/_products.py +++ b/datacube/index/postgres/_products.py @@ -187,7 +187,8 @@ def update(self, product: Product, allow_unsafe_updates=False, allow_table_lock= # name, field.sql_expression, new_field.sql_expression # ) # ) - metadata_type = cast(MetadataType, self._index.metadata_types.get_by_name(product.metadata_type.name)) + # metadata_type = cast(MetadataType, self._index.metadata_types.get_by_name(product.metadata_type.name)) + metadata_type = product.metadata_type # Given we cannot change metadata type because of the check above, and this is an # update method, the metadata
type is guaranteed to already exist. with self._db_connection(transaction=allow_table_lock) as conn: @@ -225,6 +226,27 @@ def update_document(self, definition, allow_unsafe_updates=False, allow_table_lo allow_table_lock=allow_table_lock, ) + def delete(self, product: Product): + """ + Delete a Product, as well as all related datasets + + :param product: the Product to delete + """ + # First find and delete all related datasets + product_datasets = self._index.datasets.search_returning(('id',), archived=None, product=product.name) + self._index.datasets.purge([ds.id for ds in product_datasets]) + + # Now we can safely delete the Product + # Pass along metadata type information as well to update indexes/views + with self._db_connection(transaction=True) as conn: + mt = product.metadata_type + conn.delete_product( + name=product.name, + mt_id=mt.id, + mt_name=mt.name, + mt_def=mt.definition + ) + # This is memoized in the constructor # pylint: disable=method-hidden def get_unsafe(self, id_): # type: ignore @@ -301,8 +323,8 @@ def search_by_metadata(self, metadata): :rtype: list[Product] """ with self._db_connection() as connection: - for dataset in self._make_many(connection.search_products_by_metadata(metadata)): - yield dataset + for product in self._make_many(connection.search_products_by_metadata(metadata)): + yield product def get_all(self) -> Iterable[Product]: """ diff --git a/datacube/scripts/dataset.py b/datacube/scripts/dataset.py index fe4d206bf..355acb898 100644 --- a/datacube/scripts/dataset.py +++ b/datacube/scripts/dataset.py @@ -610,20 +610,20 @@ def purge_cmd(index: Index, dry_run: bool, all_ds: bool, ids: List[str]): sys.exit(1) if all_ds: - datasets_for_archive = {dsid: True for dsid in index.datasets.get_all_dataset_ids(archived=True)} + datasets_for_purge = {dsid: True for dsid in index.datasets.get_all_dataset_ids(archived=True)} else: - datasets_for_archive = {UUID(dataset_id): exists - for dataset_id, exists in zip(ids, index.datasets.bulk_has(ids))} + datasets_for_purge = {UUID(dataset_id): exists + for dataset_id, exists in zip(ids, index.datasets.bulk_has(ids))} # Check for non-existent datasets - if False in datasets_for_archive.values(): - for dataset_id, exists in datasets_for_archive.items(): + if False in datasets_for_purge.values(): + for dataset_id, exists in datasets_for_purge.items(): if not exists: click.echo(f'No dataset found with id: {dataset_id}') sys.exit(-1) # Check for unarchived datasets - datasets = index.datasets.bulk_get(datasets_for_archive.keys()) + datasets = index.datasets.bulk_get(datasets_for_purge.keys()) unarchived_datasets = False for d in datasets: if not d.is_archived: @@ -632,15 +632,15 @@ def purge_cmd(index: Index, dry_run: bool, all_ds: bool, ids: List[str]): if unarchived_datasets: sys.exit(-1) - for dataset in datasets_for_archive.keys(): + for dataset in datasets_for_purge.keys(): click.echo(f'Purging dataset: {dataset}') if not dry_run: # Perform purge - index.datasets.purge(datasets_for_archive.keys()) - click.echo(f'{len(datasets_for_archive)} datasets purged') + index.datasets.purge(datasets_for_purge.keys()) + click.echo(f'{len(datasets_for_purge)} datasets purged') else: - click.echo(f'{len(datasets_for_archive)} datasets not purged (dry run)') + click.echo(f'{len(datasets_for_purge)} datasets not purged (dry run)') click.echo('Completed dataset purge.') diff --git a/datacube/scripts/product.py b/datacube/scripts/product.py index ea8c68232..27d58905b 100644 --- a/datacube/scripts/product.py +++
b/datacube/scripts/product.py @@ -131,6 +131,53 @@ def update_products(index: Index, allow_unsafe: bool, allow_exclusive_lock: bool sys.exit(failures) +@product_cli.command('delete', help="Delete products and all associated datasets") +@click.option( + '--force', is_flag=True, default=False, + help="Allow a product with active datasets to be deleted (default: false)" +) +@click.option('--dry-run', '-d', is_flag=True, default=False, + help='Check if everything is ok') +@click.argument('product_names', type=str, nargs=-1) +@ui.pass_index() +def delete_products(index: Index, force: bool, dry_run: bool, product_names: List): + """ + Delete products. + + An error will be thrown if the product has active datasets, unless the force option is provided. + """ + if not product_names: + print_help_msg(delete_products) + sys.exit(1) + + try: + products = [index.products.get_by_name_unsafe(name) for name in product_names] + except KeyError as e: + click.echo(str(e)) + sys.exit(1) + + # Check if any of the products have active datasets + active_product = False + for name in product_names: + active_ds = list(index.datasets.search_returning(('id',), archived=False, product=name)) + if len(active_ds): + click.echo(f"Product {name} has active datasets: {' '.join([str(ds.id) for ds in active_ds])}") + active_product = True + + if active_product: + if not force: + click.echo("Cannot delete products with active datasets. Use the --force option to delete anyway.") + sys.exit(1) + click.confirm("Warning: you will be deleting active datasets. Proceed?", abort=True) + if not dry_run: + for product in products: + index.products.delete(product) + else: + click.echo(f"{len(products)} products not deleted (dry run)") + + click.echo('Completed product deletion.') + + def _write_csv(products): product_dicts = [prod.to_dict() for prod in products] writer = csv.DictWriter(sys.stdout, ['id', 'name', 'description', diff --git a/integration_tests/index/test_config_docs.py b/integration_tests/index/test_config_docs.py index 6cfd1a828..dcbb758fb 100644 --- a/integration_tests/index/test_config_docs.py +++ b/integration_tests/index/test_config_docs.py @@ -10,12 +10,13 @@ import pytest import yaml from sqlalchemy import text +from unittest import mock from datacube.drivers.postgres._fields import NumericRangeDocField as PgrNumericRangeDocField, PgField as PgrPgField from datacube.drivers.postgis._fields import NumericRangeDocField as PgsNumericRangeDocField, PgField as PgsPgField from datacube.index import Index from datacube.index.abstract import default_metadata_type_docs -from datacube.model import MetadataType, DatasetType +from datacube.model import MetadataType, Product from datacube.model import Range, Not, Dataset from datacube.utils import changes from datacube.utils.documents import documents_equal @@ -260,9 +261,9 @@ def test_update_dataset(index, ls5_telem_doc, example_ls5_nbar_metadata_doc): @pytest.mark.parametrize('datacube_env_name', ('datacube', )) -def test_update_dataset_type(index, ls5_telem_type, ls5_telem_doc, ga_metadata_type_doc): +def test_update_product_type(index, ls5_telem_type, ls5_telem_doc, ga_metadata_type_doc): """ - :type ls5_telem_type: datacube.model.DatasetType + :type ls5_telem_type: datacube.model.Product :type index: datacube.index.Index """ assert index.products.get_by_name(ls5_telem_type.name) is not None @@ -314,7 +315,7 @@ def test_update_dataset_type(index, ls5_telem_type, ls5_telem_doc, ga_metadata_t def test_product_update_cli(index: Index, clirunner, - ls8_eo3_product: DatasetType, 
+ ls8_eo3_product: Product, + extended_eo3_product_doc: dict, extended_eo3_metadata_type: MetadataType, tmpdir) -> None: @@ -390,6 +391,58 @@ def get_current(index, product_doc): assert documents_equal(fresh, modified_doc) + +def test_product_delete_cli(index: Index, + clirunner, + ls8_eo3_product: Product, + ls8_eo3_dataset, ls8_eo3_dataset2, ls8_eo3_dataset3, ls8_eo3_dataset4, + extended_eo3_metadata_type: MetadataType) -> None: + from pathlib import Path + TESTDIR = Path(__file__).parent.parent / "data" / "eo3" + # product with some archived and some active datasets + clirunner(['dataset', 'archive', 'c21648b1-a6fa-4de0-9dc3-9c445d8b295a', '4a30d008-4e82-4d67-99af-28bc1629f766']) + runner = clirunner(['product', 'delete', 'ga_ls8c_ard_3'], verbose_flag=False, expect_success=False) + assert "Product ga_ls8c_ard_3 has active datasets" in runner.output + assert "c21648b1-a6fa-4de0-9dc3-9c445d8b295a" not in runner.output + assert "1154087c-211c-4834-a1f8-b4b59101b644" in runner.output + assert "Cannot delete products with active datasets" in runner.output + assert runner.exit_code == 1 + + # active datasets, force without confirmation + runner = clirunner(['product', 'delete', 'ga_ls8c_ard_3', '--force'], verbose_flag=False, expect_success=False) + assert "Product ga_ls8c_ard_3 has active datasets" in runner.output + assert "Proceed?" in runner.output + assert runner.exit_code == 1 + + # adding back product should cause error since it still exists + add = clirunner(['product', 'add', str(TESTDIR / "ard_ls8.odc-product.yaml")]) + assert "is already in the database" in add.output + + # archive the product's active datasets; delete should not require force option + clirunner(['dataset', 'archive', '1154087c-211c-4834-a1f8-b4b59101b644', '0ee5fe0a-6acd-4583-8554-36ad963bf40b']) + runner = clirunner(['product', 'delete', 'ga_ls8c_ard_3', '--dry-run'], verbose_flag=False) + assert "Completed product deletion" in runner.output + assert runner.exit_code == 0 + + # ensure deletion involving active datasets works with confirmation + clirunner(['dataset', 'restore', '1154087c-211c-4834-a1f8-b4b59101b644', '0ee5fe0a-6acd-4583-8554-36ad963bf40b']) + with mock.patch('click.confirm', return_value=True): + runner = clirunner(['product', 'delete', 'ga_ls8c_ard_3', '--force'], verbose_flag=False) + assert "Product ga_ls8c_ard_3 has active datasets" in runner.output + assert "Completed product deletion" in runner.output + assert runner.exit_code == 0 + + runner = clirunner(['dataset', 'archive', '1154087c-211c-4834-a1f8-b4b59101b644'], + verbose_flag=False, expect_success=False) + assert "No dataset found with id" in runner.output + + index.products.get_by_name_unsafe.cache_clear() + assert index.products.get_by_name("ga_ls8c_ard_3") is None + + # should be able to add product back now + add = clirunner(['product', 'add', str(TESTDIR / "ard_ls8.odc-product.yaml")]) + assert "is already in the database" not in add.output + + def _to_yaml(ls5_telem_doc): # Need to explicitly allow unicode in Py2 return yaml.safe_dump(ls5_telem_doc, allow_unicode=True) diff --git a/integration_tests/test_cli_output.py b/integration_tests/test_cli_output.py index 4a8a4b395..e5af248c6 100644 --- a/integration_tests/test_cli_output.py +++ b/integration_tests/test_cli_output.py @@ -31,6 +31,14 @@ def test_cli_product_subcommand(index_empty, clirunner, dataset_add_configs): assert "All files are empty, exit" in runner.output assert runner.exit_code == 1 + runner = clirunner(['product', 'delete'], verbose_flag=False, expect_success=False)
+ assert "Usage: [OPTIONS] [PRODUCT_NAMES]" in runner.output + assert "Delete products" in runner.output + + runner = clirunner(['product', 'delete', 'ga_ls8c_ard_3'], verbose_flag=False, expect_success=False) + assert '"ga_ls8c_ard_3" is not a valid Product name' in runner.output + assert runner.exit_code == 1 + def test_cli_metadata_subcommand(index_empty, clirunner, dataset_add_configs): runner = clirunner(['metadata', 'update'], verbose_flag=False, expect_success=False) @@ -116,6 +124,7 @@ def test_cli_dataset_subcommand(index, clirunner, assert runner.exit_code == 1 runner = clirunner(['dataset', 'archive', "--all"], verbose_flag=False) + assert "Archiving dataset:" in runner.output assert "Completed dataset archival." in runner.output assert "Usage: [OPTIONS] [IDS]" not in runner.output assert "Archive datasets" not in runner.output