/
_products.py
192 lines (160 loc) · 8.21 KB
/
_products.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2024 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
import datetime
import logging
from typing import Iterable, cast
from uuid import UUID
from datacube.index.fields import as_expression
from datacube.index.abstract import AbstractProductResource, QueryField, QueryDict, JsonDict, AbstractIndex
from datacube.model import Product
from datacube.utils import changes, jsonify_document, _readable_offset
from datacube.utils.changes import AllowPolicy, Change, Offset, check_doc_unchanged, get_doc_changes, classify_changes
from datacube.utils.documents import metadata_subset
_LOG = logging.getLogger(__name__)
class ProductResource(AbstractProductResource):
    """In-memory implementation of the product resource.

    Products are held in two plain dicts, keyed by integer id and by name.
    Every read path returns a clone of the stored product so callers can
    never mutate the index's internal state.
    """

    def __init__(self, index: AbstractIndex):
        # Local import to avoid a circular import with the memory index module.
        from datacube.index.memory.index import Index
        self._index: Index = cast(Index, index)
        self.by_id: dict[int, Product] = {}
        self.by_name: dict[str, Product] = {}
        # Next integer id to hand out in add(); ids are never reused.
        self.next_id = 1

    def add(self, product: Product, allow_table_lock: bool = False) -> Product:
        """Add a product, or verify an identically-named existing one is unchanged.

        :param product: Product model to add.
        :param allow_table_lock: Passed through to the metadata-type resource
            (no effect for the in-memory index).
        :return: A clone of the stored product.
        :raises: if an existing product of the same name has a different definition
            (via check_doc_unchanged).
        """
        Product.validate(product.definition)  # type: ignore[attr-defined]
        existing = self.get_by_name(product.name)
        if existing:
            _LOG.warning(f"Product {product.name} is already in the database, checking for differences")
            # Raises if the definitions differ; identical re-adds are a no-op.
            check_doc_unchanged(
                existing.definition,
                jsonify_document(product.definition),
                f'Metadata Type {product.name}'
            )
        else:
            mdt = self._index.metadata_types.get_by_name(product.metadata_type.name)
            if mdt is None:
                # The product's metadata type isn't registered yet - add it first.
                _LOG.warning(f'Adding metadata_type "{product.metadata_type.name}" as it doesn\'t exist')
                product.metadata_type = self._index.metadata_types.add(product.metadata_type,
                                                                       allow_table_lock=allow_table_lock)
            clone = self.clone(product)
            clone.id = self.next_id
            self.next_id += 1
            self.by_id[clone.id] = clone
            self.by_name[clone.name] = clone
        return cast(Product, self.get_by_name(product.name))

    def can_update(self, product: Product,
                   allow_unsafe_updates: bool = False,
                   allow_table_lock: bool = False
                   ) -> tuple[bool, Iterable[Change], Iterable[Change]]:
        """Classify the changes an update() of this product would make.

        :return: (can_update, safe_changes, unsafe_changes). can_update is True
            when there are no unsafe changes, or allow_unsafe_updates is set.
        :raises ValueError: if no product of this name exists yet.
        """
        Product.validate(product.definition)  # type: ignore[attr-defined]
        existing = self.get_by_name(product.name)
        if not existing:
            raise ValueError(f"Unknown product {product.name}, cannot update - add first")
        updates_allowed: dict[Offset, AllowPolicy] = {
            ('description',): changes.allow_any,
            ('license',): changes.allow_any,
            ('metadata_type',): changes.allow_any,
            # You can safely make the match rules looser but not tighter.
            # Tightening them could exclude datasets already matched to the product.
            # (which would make search results wrong)
            ('metadata',): changes.allow_truncation,
            # Some old storage fields should not be in the product definition any more: allow removal.
            ('storage', 'chunking'): changes.allow_removal,
            ('storage', 'driver'): changes.allow_removal,
            ('storage', 'dimension_order'): changes.allow_removal,
        }
        doc_changes = get_doc_changes(existing.definition, jsonify_document(product.definition))
        good_changes, bad_changes = classify_changes(doc_changes, updates_allowed)
        for offset, old_val, new_val in good_changes:
            _LOG.info("Safe change in %s from %r to %r", _readable_offset(offset), old_val, new_val)
        for offset, old_val, new_val in bad_changes:
            _LOG.warning("Unsafe change in %s from %r to %r", _readable_offset(offset), old_val, new_val)
        return (
            (allow_unsafe_updates or not bad_changes),
            good_changes,
            bad_changes,
        )

    def update(self, product: Product,
               allow_unsafe_updates: bool = False,
               allow_table_lock: bool = False) -> Product:
        """Replace the stored definition of an existing product.

        No-op (with a warning) when nothing changed. Refuses unsafe changes
        unless allow_unsafe_updates is set, and always refuses switching
        metadata types.

        :return: A clone of the stored (updated) product.
        :raises ValueError: on refused unsafe changes or a metadata-type switch.
        """
        can_update, safe_changes, unsafe_changes = self.can_update(product, allow_unsafe_updates)
        if not safe_changes and not unsafe_changes:
            _LOG.warning(f"No changes detected for product {product.name}")
            return cast(Product, self.get_by_name(product.name))
        if not can_update:
            errs = ", ".join(_readable_offset(offset) for offset, _, _ in unsafe_changes)
            raise ValueError(f"Unsafe changes in {product.name}: {errs}")
        existing = cast(Product, self.get_by_name(product.name))
        if product.metadata_type.name != existing.metadata_type.name:
            raise ValueError("Unsafe change: cannot (currently) switch metadata types for a product")
        _LOG.info(f"Updating product {product.name}")
        # Keep the original id so dataset references remain valid.
        persisted = self.clone(product)
        persisted.id = cast(int, existing.id)
        self.by_id[persisted.id] = persisted
        self.by_name[persisted.name] = persisted
        return cast(Product, self.get_by_name(product.name))

    def delete(self, product: Product):
        """Delete a product, purging all of its datasets (archived or not) first."""
        ids: Iterable[UUID] = self._index.datasets.search_returning(archived=None, product=product.name)
        self._index.datasets.purge(ids)
        del self.by_id[product.id]
        del self.by_name[product.name]

    def get_unsafe(self, id_: int) -> Product:
        """Return a clone of the product with this id; raises KeyError if absent."""
        return self.clone(self.by_id[id_])

    def get_by_name_unsafe(self, name: str) -> Product:
        """Return a clone of the product with this name; raises KeyError if absent."""
        return self.clone(self.by_name[name])

    def search_robust(self, **query: QueryField) -> Iterable[tuple[Product, QueryDict]]:
        """Yield (product, unmatched-query-fields) for every product the query may match.

        Query fields that this product cannot evaluate are returned in the
        unmatched dict for the caller to apply at the dataset level.
        """
        def listify(v):
            if isinstance(v, tuple):
                # BUGFIX: was `return list()`, which discarded the tuple's
                # contents and made any tuple-valued filter exclude everything.
                return list(v)
            elif isinstance(v, list):
                return v
            else:
                return [v]
        for prod in self.get_all():
            unmatched = query.copy()
            # Skip non-matched if user specified specific products/metadata_types
            if prod.name not in listify(unmatched.pop('product', prod.name)):
                continue
            if prod.metadata_type.name not in listify(unmatched.pop('metadata_type', prod.metadata_type.name)):
                continue
            # Check that all search keys match this product
            for key, value in list(unmatched.items()):
                field = prod.metadata_type.dataset_fields.get(key)
                if not field:
                    # Product doesn't have this field - can't match
                    break
                if not hasattr(field, 'extract'):
                    # non-document/native field (??)
                    continue
                if field.extract(prod.metadata_doc) is None:  # type: ignore[attr-defined]
                    # Product has the field, but not defined in the type doc, so unmatchable
                    continue
                expr = as_expression(field, value)
                if expr.evaluate(prod.metadata_doc):
                    # matches
                    unmatched.pop(key)
                else:
                    # Doesn't match, skip to next product
                    break
            else:
                yield prod, unmatched

    def search_by_metadata(self, metadata: JsonDict) -> Iterable[Product]:
        """Yield products whose metadata document contains this metadata subset."""
        # Search terms are matched under the "properties" section of the doc.
        norm_meta = {"properties": metadata}
        for prod in self.get_all():
            if metadata_subset(norm_meta, prod.metadata_doc):
                yield prod

    def get_all(self) -> Iterable[Product]:
        """Lazily yield clones of all stored products."""
        return (self.clone(prod) for prod in self.by_id.values())

    def clone(self, orig: Product) -> Product:
        """Deep-copy a product (definition via JSON round-trip, fresh metadata type)."""
        return Product(
            self._index.metadata_types.clone(orig.metadata_type),
            jsonify_document(orig.definition),
            id_=orig.id
        )

    def spatial_extent(self, product, crs=None):
        # Spatial extents are not tracked by the in-memory index.
        return None

    def temporal_extent(self, product: str | Product) -> tuple[datetime.datetime, datetime.datetime]:
        """Return the (min, max) dataset time range for a product (by name or model)."""
        if isinstance(product, str):
            product = self._index.products.get_by_name_unsafe(product)
        ids: Iterable[UUID] = self._index.datasets._by_product.get(product.name, [])
        return self._index.datasets.temporal_extent(ids)