/
_datasets.py
executable file
·128 lines (91 loc) · 3.44 KB
/
_datasets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2024 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
import datetime
from datacube.index.abstract import AbstractDatasetResource, DSID
from datacube.model import Dataset, Product
from typing import Iterable, Optional
class DatasetResource(AbstractDatasetResource):
    """A null (empty) dataset resource.

    Implements the :class:`AbstractDatasetResource` API for an index that
    contains no datasets:

    * read/search operations return empty results (or ``False``/``0``),
      or raise ``KeyError`` where the API contract requires a hit;
    * all write/mutation operations raise ``NotImplementedError``.
    """

    def __init__(self, index):
        super().__init__(index)

    def get_unsafe(self, id_: DSID, include_sources: bool = False, include_deriveds: bool = False, max_depth: int = 0):
        # No datasets exist, so any lookup by id is a miss.
        raise KeyError(id_)

    def bulk_get(self, ids):
        return []

    def get_derived(self, id_):
        return []

    def has(self, id_):
        return False

    def bulk_has(self, ids_):
        # One False per requested id, preserving order.
        return [False for id_ in ids_]

    def add(self, dataset: Dataset,
            with_lineage: bool = True,
            archive_less_mature: Optional[int] = None) -> Dataset:
        raise NotImplementedError()

    def search_product_duplicates(self, product: Product, *args):
        return []

    def can_update(self, dataset, updates_allowed=None):
        raise NotImplementedError()

    def update(self, dataset: Dataset, updates_allowed=None, archive_less_mature=None):
        raise NotImplementedError()

    def archive(self, ids):
        raise NotImplementedError()

    def restore(self, ids):
        raise NotImplementedError()

    def purge(self, ids: Iterable[DSID]):
        raise NotImplementedError()

    def get_all_dataset_ids(self, archived: bool):
        return []

    def get_locations(self, id_):
        return []

    def get_archived_locations(self, id_):
        return []

    def get_archived_location_times(self, id_):
        return []

    def add_location(self, id_, uri):
        raise NotImplementedError()

    def get_datasets_for_location(self, uri, mode=None):
        return []

    def remove_location(self, id_, uri):
        raise NotImplementedError()

    def archive_location(self, id_, uri):
        raise NotImplementedError()

    def restore_location(self, id_, uri):
        raise NotImplementedError()

    def search_by_metadata(self, metadata):
        return []

    def search(self, limit=None, **query):
        return []

    def search_by_product(self, **query):
        return []

    def search_returning(self, field_names, limit=None, **query):
        return []

    def count(self, **query):
        return 0

    def count_by_product(self, **query):
        return []

    def count_by_product_through_time(self, period, **query):
        return []

    def count_product_through_time(self, period, **query):
        return []

    def search_summaries(self, **query):
        return []

    def temporal_extent(
            self,
            # Fixed implicit-Optional annotation: the default is None, so the
            # declared type must include None (PEP 484).
            product: str | Product | None = None,
            ids: Iterable[DSID] | None = None
    ) -> tuple[datetime.datetime, datetime.datetime]:
        """Validate the product/ids arguments, then fail: an empty index has
        no temporal extent for any product or dataset ids.

        :raises ValueError: if neither or both of ``product`` and ``ids`` are given.
        :raises KeyError: otherwise — nothing matches in an empty index.
        """
        if product is None and ids is None:
            raise ValueError("Must specify product or ids")
        elif ids is not None and product is not None:
            raise ValueError("Cannot specify both product and ids")
        elif ids is not None:
            raise KeyError(str(ids))
        else:
            raise KeyError(str(product))

    # pylint: disable=redefined-outer-name
    def search_returning_datasets_light(self, field_names: tuple, custom_offsets=None, limit=None, **query):
        return []

    def spatial_extent(self, ids, crs=None):
        return None