/
sensor_data.py
356 lines (305 loc) · 13.2 KB
/
sensor_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
from datetime import timedelta
from typing import List, Union
from flask_login import current_user
from isodate import datetime_isoformat
from marshmallow import fields, post_load, validates_schema, ValidationError
from marshmallow.validate import OneOf
from marshmallow_polyfield import PolyField
from timely_beliefs import BeliefsDataFrame
import pandas as pd
from flexmeasures.data import ma
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.api.common.schemas.sensors import SensorField
from flexmeasures.api.common.utils.api_utils import upsample_values
from flexmeasures.data.models.planning.utils import initialize_index
from flexmeasures.data.schemas.times import AwareDateTimeField, DurationField
from flexmeasures.data.services.time_series import simplify_index
from flexmeasures.utils.time_utils import duration_isoformat, server_now
from flexmeasures.utils.unit_utils import (
convert_units,
units_are_convertible,
is_energy_price_unit,
)
from flexmeasures.auth.policy import check_access
class SingleValueField(fields.Float):
    """Field that (de-)serializes a single float into a list of floats of length 1."""

    def _deserialize(self, value, attr, obj, **kwargs) -> List[float]:
        validated_value = self._validated(value)
        return [validated_value]

    def _serialize(self, value, attr, data, **kwargs) -> List[float]:
        validated_value = self._validated(value)
        return [validated_value]
def select_schema_to_ensure_list_of_floats(
    values: Union[List[float], float], _
) -> Union[fields.List, SingleValueField]:
    """Pick a field that accepts both a single float and a list of floats, always yielding a list of floats.

    Meant to improve user experience by not needing to make a list out of a single item, such that:

        {
            "values": [3.7]
        }

    can be written as:

        {
            "values": 3.7
        }

    Either will be de-serialized to [3.7].

    Note that serialization always results in a list of floats.
    This ensures that we are not requiring the same flexibility from users who are retrieving data.
    """
    if isinstance(values, list):
        # A list was given: use a plain list-of-floats field
        return fields.List(fields.Float)
    # A scalar was given: wrap it into a singleton list
    return SingleValueField()
class SensorDataDescriptionSchema(ma.Schema):
    """
    Schema describing sensor data (specifically, the sensor and the timing of the data).
    """

    sensor = SensorField(required=True, entity_type="sensor", fm_scheme="fm1")
    start = AwareDateTimeField(required=True, format="iso")
    duration = DurationField(required=True)
    horizon = DurationField(required=False)
    prior = AwareDateTimeField(required=False, format="iso")
    unit = fields.Str(required=True)

    @validates_schema
    def check_schema_unit_against_sensor_unit(self, data, **kwargs):
        """Allows units compatible with that of the sensor.

        For example, a sensor with W units allows data to be posted with units:
        - W, kW, MW, etc. (i.e. units with different prefixes)
        - J/s, Nm/s, etc. (i.e. units that can be converted using some multiplier)
        - Wh, kWh, etc. (i.e. units that represent a stock delta, which knowing the duration can be converted to a flow)

        For compatible units, the SensorDataSchema converts values to the sensor's unit.
        """
        posted_unit = data["unit"]
        required_unit = data["sensor"].unit
        if posted_unit == required_unit:
            # Exact match: nothing to check
            return
        if units_are_convertible(posted_unit, required_unit):
            # Convertible units are accepted; conversion happens downstream
            return
        raise ValidationError(
            f"Required unit for this sensor is {data['sensor'].unit}, got incompatible unit: {data['unit']}"
        )
class GetSensorDataSchema(SensorDataDescriptionSchema):
    """Schema for GET requests of sensor data: the data description plus an optional resolution and message type."""

    # Optional resolution for the returned data (passed through to the belief search)
    resolution = DurationField(required=False)

    # Optional field that can be used for extra validation
    type = fields.Str(
        required=False,
        validate=OneOf(
            [
                "GetSensorDataRequest",
                "GetMeterDataRequest",
                "GetPrognosisRequest",
                "GetPriceDataRequest",
            ]
        ),
    )

    @validates_schema
    def check_user_may_read(self, data, **kwargs):
        """Check that the current user has read access to the requested sensor."""
        check_access(data["sensor"], "read")

    @validates_schema
    def check_schema_unit_against_type(self, data, **kwargs):
        """If a message type was given, check that the requested unit fits that type."""
        requested_unit = data["unit"]
        _type = data.get("type", None)
        if _type in (
            "GetMeterDataRequest",
            "GetPrognosisRequest",
        ) and not units_are_convertible(requested_unit, "MW"):
            raise ValidationError(
                f"The unit requested for this message type should be convertible from MW, got incompatible unit: {requested_unit}"
            )
        elif _type == "GetPriceDataRequest" and not is_energy_price_unit(
            requested_unit
        ):
            raise ValidationError(
                f"The unit requested for this message type should be convertible from an energy price unit, got incompatible unit: {requested_unit}"
            )

    @post_load
    def dump_bdf(self, sensor_data_description: dict, **kwargs) -> dict:
        """Turn the de-serialized and validated data description into a response.

        Specifically, this function:
        - queries data according to the given description
        - converts to a single deterministic belief per event
        - ensures the response respects the requested time frame
        - converts values to the requested unit
        - converts values to the requested resolution
        """
        sensor: Sensor = sensor_data_description["sensor"]
        start = sensor_data_description["start"]
        duration = sensor_data_description["duration"]
        end = sensor_data_description["start"] + duration
        unit = sensor_data_description["unit"]
        resolution = sensor_data_description.get("resolution")

        # Post-load configuration of belief timing against message type
        horizons_at_least = sensor_data_description.get("horizon", None)
        horizons_at_most = None
        _type = sensor_data_description.get("type", None)
        if _type == "GetMeterDataRequest":
            # Meter data: restrict to beliefs with a non-positive horizon
            horizons_at_most = timedelta(0)
        elif _type == "GetPrognosisRequest":
            if horizons_at_least is None:
                horizons_at_least = timedelta(0)
            else:
                # If the horizon field is used, ensure we still respect the minimum horizon for prognoses
                horizons_at_least = max(horizons_at_least, timedelta(0))

        # Query beliefs and collapse to one deterministic value per event
        df = simplify_index(
            sensor.search_beliefs(
                event_starts_after=start,
                event_ends_before=end,
                horizons_at_least=horizons_at_least,
                horizons_at_most=horizons_at_most,
                beliefs_before=sensor_data_description.get("prior", None),
                one_deterministic_belief_per_event=True,
                resolution=resolution,
                as_json=False,
            )
        )

        # Convert to desired time range (reindexing fills missing events with NaN)
        index = initialize_index(start=start, end=end, resolution=df.event_resolution)
        df = df.reindex(index)

        # Convert to desired unit
        values: pd.Series = convert_units(  # type: ignore
            df["event_value"],
            from_unit=sensor.unit,
            to_unit=unit,
        )

        # Convert NaN to null (so JSON serialization yields null rather than NaN)
        values = values.where(pd.notnull(values), None)

        # Form the response
        response = dict(
            values=values.tolist(),
            start=datetime_isoformat(start),
            duration=duration_isoformat(duration),
            unit=unit,
        )
        return response
class PostSensorDataSchema(SensorDataDescriptionSchema):
    """
    This schema includes data, so it can be used for POST requests
    or GET responses.

    TODO: For the GET use case, look at api/common/validators.py::get_data_downsampling_allowed
          (sets a resolution parameter which we can pass to the data collection function).
    """

    # Optional field that can be used for extra validation
    type = fields.Str(
        required=False,
        validate=OneOf(
            [
                "PostSensorDataRequest",
                "PostMeterDataRequest",
                "PostPrognosisRequest",
                "PostPriceDataRequest",
                "PostWeatherDataRequest",
            ]
        ),
    )
    # Accepts either a single float or a list of floats; always de-serializes to a list of floats
    values = PolyField(
        deserialization_schema_selector=select_schema_to_ensure_list_of_floats,
        serialization_schema_selector=select_schema_to_ensure_list_of_floats,
        many=False,
    )

    @validates_schema
    def check_user_may_create(self, data, **kwargs):
        """Check that the current user may add data to the given sensor."""
        check_access(data["sensor"], "create-children")

    @validates_schema
    def check_schema_unit_against_type(self, data, **kwargs):
        """If a message type was given, check that the posted unit fits that type."""
        posted_unit = data["unit"]
        _type = data.get("type", None)
        if _type in (
            "PostMeterDataRequest",
            "PostPrognosisRequest",
        ) and not units_are_convertible(posted_unit, "MW"):
            raise ValidationError(
                f"The unit required for this message type should be convertible to MW, got incompatible unit: {posted_unit}"
            )
        elif _type == "PostPriceDataRequest" and not is_energy_price_unit(posted_unit):
            raise ValidationError(
                f"The unit required for this message type should be convertible to an energy price unit, got incompatible unit: {posted_unit}"
            )

    @validates_schema
    def check_resolution_compatibility_of_values(self, data, **kwargs):
        """Check that the event resolution inferred from the data is a whole multiple of the sensor's resolution.

        Also rejects an empty list of values up front: besides being meaningless to post,
        it would otherwise trigger a ZeroDivisionError here and in the post-load steps,
        surfacing as a server error rather than a validation error.
        """
        if not data["values"]:
            raise ValidationError("Cannot post an empty list of values.")
        inferred_resolution = data["duration"] / len(data["values"])
        required_resolution = data["sensor"].event_resolution

        # TODO: we don't yet have a good policy w.r.t. zero-resolution (direct measurement)
        if required_resolution == timedelta(hours=0):
            return

        if inferred_resolution % required_resolution != timedelta(hours=0):
            raise ValidationError(
                f"Resolution of {inferred_resolution} is incompatible with the sensor's required resolution of {required_resolution}."
            )

    @post_load()
    def post_load_sequence(self, data: dict, **kwargs) -> BeliefsDataFrame:
        """If needed, upsample and convert units, then deserialize to a BeliefsDataFrame."""
        data = self.possibly_upsample_values(data)
        data = self.possibly_convert_units(data)
        bdf = self.load_bdf(data)

        # Post-load validation against message type
        _type = data.get("type", None)
        if _type == "PostMeterDataRequest":
            # Meter data must have been recorded at or after each event's end
            if any(h > timedelta(0) for h in bdf.belief_horizons):
                raise ValidationError("Meter data must lie in the past.")
        elif _type == "PostPrognosisRequest":
            # Prognoses must have been recorded at or before each event's end
            if any(h < timedelta(0) for h in bdf.belief_horizons):
                raise ValidationError("Prognoses must lie in the future.")

        return bdf

    @staticmethod
    def possibly_convert_units(data):
        """
        Convert values if needed, to fit the sensor's unit.
        Marshmallow runs this after validation.
        """
        data["values"] = convert_units(
            data["values"],
            from_unit=data["unit"],
            to_unit=data["sensor"].unit,
            event_resolution=data["sensor"].event_resolution,
        )
        return data

    @staticmethod
    def possibly_upsample_values(data):
        """
        Upsample the data if needed, to fit to the sensor's resolution.
        Marshmallow runs this after validation.
        """
        inferred_resolution = data["duration"] / len(data["values"])
        required_resolution = data["sensor"].event_resolution

        # TODO: we don't yet have a good policy w.r.t. zero-resolution (direct measurement)
        if required_resolution == timedelta(hours=0):
            return data

        # we already know resolutions are compatible (see validation)
        if inferred_resolution != required_resolution:
            data["values"] = upsample_values(
                data["values"],
                from_resolution=inferred_resolution,
                to_resolution=required_resolution,
            )
        return data

    @staticmethod
    def load_bdf(sensor_data: dict) -> BeliefsDataFrame:
        """
        Turn the de-serialized and validated data into a BeliefsDataFrame.

        :raises ValidationError: if the current user is not registered as a data source
        """
        source = DataSource.query.filter(
            DataSource.user_id == current_user.id
        ).one_or_none()
        if not source:
            raise ValidationError(
                f"User {current_user.id} is not an accepted data source."
            )

        num_values = len(sensor_data["values"])
        event_resolution = sensor_data["duration"] / num_values
        dt_index = pd.date_range(
            sensor_data["start"],
            periods=num_values,
            freq=event_resolution,
        )
        s = pd.Series(sensor_data["values"], index=dt_index)

        # Work out what the recording time should be
        belief_timing = {}
        if "prior" in sensor_data:
            belief_timing["belief_time"] = sensor_data["prior"]
        elif "horizon" in sensor_data:
            belief_timing["belief_horizon"] = sensor_data["horizon"]
        else:
            # Neither prior nor horizon given: record the data as believed now
            belief_timing["belief_time"] = server_now()
        return BeliefsDataFrame(
            s,
            source=source,
            sensor=sensor_data["sensor"],
            **belief_timing,
        )