/
sensors.py
595 lines (515 loc) · 22.6 KB
/
sensors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
from datetime import datetime, timedelta
import json
from typing import List, Optional
from flask import current_app
from flask_classful import FlaskView, route
from flask_json import as_json
from flask_security import auth_required
import isodate
from marshmallow import validate, fields, Schema
from marshmallow.validate import OneOf
import numpy as np
import pandas as pd
from rq.job import Job, NoSuchJobError
from timely_beliefs import BeliefsDataFrame
from webargs.flaskparser import use_args, use_kwargs
from flexmeasures.api.common.responses import (
invalid_datetime,
invalid_timezone,
request_processed,
unrecognized_event,
unknown_schedule,
ptus_incomplete,
)
from flexmeasures.api.common.utils.validators import (
optional_duration_accepted,
optional_prior_accepted,
)
from flexmeasures.api.common.schemas.sensor_data import (
GetSensorDataSchema,
PostSensorDataSchema,
)
from flexmeasures.api.common.schemas.users import AccountIdField
from flexmeasures.api.common.utils.api_utils import save_and_enqueue
from flexmeasures.auth.decorators import permission_required_for_context
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.user import Account
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.queries.utils import simplify_index
from flexmeasures.data.schemas.sensors import SensorSchema, SensorIdField
from flexmeasures.data.schemas.units import QuantityField
from flexmeasures.data.schemas import AwareDateTimeField
from flexmeasures.data.services.sensors import get_sensors
from flexmeasures.data.services.scheduling import create_scheduling_job
from flexmeasures.utils.time_utils import duration_isoformat
from flexmeasures.utils.unit_utils import ur
class TargetSchema(Schema):
    """Schema for a single state-of-charge target, as nested under "soc-targets".

    Each target pairs a numeric value with the moment at which it is due.
    """

    # Target state of charge; trigger_schedule interprets it in the request's "soc-unit"
    value = fields.Float()
    # Due date of the target
    # NOTE(review): AwareDateTimeField presumably rejects naive datetimes — confirm in flexmeasures.data.schemas
    datetime = AwareDateTimeField()
# Instantiate schemas once at import time (outside of endpoint logic) to minimize response time
get_sensor_schema = GetSensorDataSchema()  # parses the query of GET /sensors/data
post_sensor_schema = PostSensorDataSchema()  # parses the JSON body of POST /sensors/data
sensors_schema = SensorSchema(many=True)  # serializes the list returned by GET /sensors
def _soc_in_mwh(value: Optional[float], unit: str) -> Optional[float]:
    """Scale a state-of-charge value expressed in `unit` to MWh; None passes through untouched."""
    if value is not None and unit == "kWh":
        return value / 1000.0
    return value


def _right_closed_date_range(start: datetime, end: datetime, freq: timedelta):
    """Build a DatetimeIndex over (start, end], whichever pandas version is installed.

    pandas 1.4 introduced the `inclusive` keyword and pandas 2.0 removed `closed`,
    so we try the new keyword first and fall back to the old one.
    """
    try:
        return pd.date_range(start, end, freq=freq, inclusive="right")
    except TypeError:
        # Older pandas does not know `inclusive`
        return pd.date_range(start, end, freq=freq, closed="right")


def _explain_unfinished_job(job: Job) -> str:
    """Describe, for the API user, why an unfinished scheduling job has no schedule (yet)."""
    if job.is_failed:  # Try to inform the user on why the job failed
        e = job.meta.get(
            "exception",
            Exception(
                "The job does not state why it failed. "
                "The worker may be missing an exception handler, "
                "or its exception handler is not storing the exception as job meta data."
            ),
        )
        return f"Scheduling job failed with {type(e).__name__}: {e}"
    if job.is_started:
        return "Scheduling job in progress."
    if job.is_queued:
        return "Scheduling job waiting to be processed."
    if job.is_deferred:
        try:
            preferred_job = job.dependency
        except NoSuchJobError:
            return "Scheduling job waiting for unknown job to be processed."
        return f'Scheduling job waiting for {preferred_job.status} job "{preferred_job.id}" to be processed.'
    return "Scheduling job has an unknown status."


class SensorAPI(FlaskView):
    """API endpoints under /sensors: listing sensors, exchanging sensor data and scheduling.

    Every endpoint requires authentication (see `decorators`).
    """

    route_base = "/sensors"
    trailing_slash = False
    decorators = [auth_required()]

    @route("", methods=["GET"])
    @use_kwargs(
        {
            "account": AccountIdField(
                data_key="account_id", load_default=AccountIdField.load_current
            ),
        },
        location="query",
    )
    @permission_required_for_context("read", arg_name="account")
    @as_json
    def index(self, account: Account):
        """API endpoint to list all sensors of an account.

        .. :quickref: Sensor; Download sensor list

        This endpoint returns all accessible sensors.
        Accessible sensors are sensors in the same account as the current user.
        Only admins can use this endpoint to fetch sensors from a different account (by using the `account_id` query parameter).

        **Example response**

        An example of one sensor being returned:

        .. sourcecode:: json

            [
                {
                    "entity_address": "ea1.2021-01.io.flexmeasures.company:fm1.42",
                    "event_resolution": 15,
                    "generic_asset_id": 1,
                    "name": "Gas demand",
                    "timezone": "Europe/Amsterdam",
                    "unit": "m\u00b3/h"
                }
            ]

        :reqheader Authorization: The authentication token
        :reqheader Content-Type: application/json
        :resheader Content-Type: application/json
        :status 200: PROCESSED
        :status 400: INVALID_REQUEST
        :status 401: UNAUTHORIZED
        :status 403: INVALID_SENDER
        :status 422: UNPROCESSABLE_ENTITY
        """
        sensors = get_sensors(account_name=account.name)
        return sensors_schema.dump(sensors), 200

    @route("/data", methods=["POST"])
    @use_args(
        post_sensor_schema,
        location="json",
    )
    def post_data(self, bdf: BeliefsDataFrame):
        """
        Post sensor data to FlexMeasures.

        .. :quickref: Data; Upload sensor data

        **Example request**

        .. code-block:: json

            {
                "sensor": "ea1.2021-01.io.flexmeasures:fm1.1",
                "values": [-11.28, -11.28, -11.28, -11.28],
                "start": "2021-06-07T00:00:00+02:00",
                "duration": "PT1H",
                "unit": "m³/h"
            }

        The above request posts four values for a duration of one hour, where the first
        event start is at the given start time, and subsequent values start in 15 minute intervals throughout the one hour duration.

        The sensor is the one with ID=1.
        The unit has to be convertible to the sensor's unit.
        The resolution of the data has to match the sensor's required resolution, but
        FlexMeasures will attempt to upsample lower resolutions.

        :reqheader Authorization: The authentication token
        :reqheader Content-Type: application/json
        :resheader Content-Type: application/json
        :status 200: PROCESSED
        :status 400: INVALID_REQUEST
        :status 401: UNAUTHORIZED
        :status 403: INVALID_SENDER
        :status 422: UNPROCESSABLE_ENTITY
        """
        # The schema has already turned the request body into a BeliefsDataFrame
        response, code = save_and_enqueue(bdf)
        return response, code

    @route("/data", methods=["GET"])
    @use_args(
        get_sensor_schema,
        location="query",
    )
    def get_data(self, response: dict):
        """Get sensor data from FlexMeasures.

        .. :quickref: Data; Download sensor data

        **Example request**

        .. code-block:: json

            {
                "sensor": "ea1.2021-01.io.flexmeasures:fm1.1",
                "start": "2021-06-07T00:00:00+02:00",
                "duration": "PT1H",
                "resolution": "PT15M",
                "unit": "m³/h"
            }

        The unit has to be convertible from the sensor's unit.

        :reqheader Authorization: The authentication token
        :reqheader Content-Type: application/json
        :resheader Content-Type: application/json
        :status 200: PROCESSED
        :status 400: INVALID_REQUEST
        :status 401: UNAUTHORIZED
        :status 403: INVALID_SENDER
        :status 422: UNPROCESSABLE_ENTITY
        """
        # The schema hands us the ready-made response dict; we only serialize it.
        # NOTE(review): returning a plain string presumably makes Flask send its default
        # (non-JSON) Content-Type, unlike what is documented above — confirm before changing.
        return json.dumps(response)

    @route("/<id>/schedules/trigger", methods=["POST"])
    @use_kwargs(
        {"sensor": SensorIdField(data_key="id")},
        location="path",
    )
    @use_kwargs(
        {
            "soc_sensor_id": fields.Str(data_key="soc-sensor", required=False),
            "roundtrip_efficiency": QuantityField(
                "%",
                validate=validate.Range(min=0, max=1),
                data_key="roundtrip-efficiency",
            ),
            "start_value": fields.Float(data_key="soc-at-start"),
            "soc_min": fields.Float(data_key="soc-min"),
            "soc_max": fields.Float(data_key="soc-max"),
            "start_of_schedule": AwareDateTimeField(
                data_key="start", format="iso", required=False
            ),
            "unit": fields.Str(
                data_key="soc-unit",
                validate=OneOf(
                    [
                        "kWh",
                        "MWh",
                    ]
                ),
            ),  # todo: allow unit to be set per field, using QuantityField("%", validate=validate.Range(min=0, max=1))
            "targets": fields.List(fields.Nested(TargetSchema), data_key="soc-targets"),
            "prefer_charging_sooner": fields.Bool(
                data_key="prefer-charging-sooner", required=False
            ),
            # todo: add a duration parameter, instead of falling back to FLEXMEASURES_PLANNING_HORIZON
            "consumption_price_sensor": SensorIdField(
                data_key="consumption-price-sensor", required=False
            ),
            "production_price_sensor": SensorIdField(
                data_key="production-price-sensor", required=False
            ),
            "inflexible_device_sensors": fields.List(
                SensorIdField, data_key="inflexible-device-sensors", required=False
            ),
        },
        location="json",
    )
    @optional_prior_accepted()
    def trigger_schedule(  # noqa: C901
        self,
        sensor: Sensor,
        start_of_schedule: datetime,
        unit: str,
        prior: datetime,
        roundtrip_efficiency: Optional[ur.Quantity] = None,
        prefer_charging_sooner: Optional[bool] = True,
        consumption_price_sensor: Optional[Sensor] = None,
        production_price_sensor: Optional[Sensor] = None,
        inflexible_device_sensors: Optional[List[Sensor]] = None,
        **kwargs,
    ):
        """
        Trigger FlexMeasures to create a schedule.

        .. :quickref: Schedule; Trigger scheduling job

        Trigger FlexMeasures to create a schedule for this sensor.
        The assumption is that this sensor is the power sensor on a flexible asset.

        In this request, you can describe:

        - the schedule (start, unit, prior)
        - the flexibility model for the sensor (see below, only storage models are supported at the moment)
        - the EMS the sensor operates in (inflexible device sensors, and sensors that put a price on consumption and/or production)

        Note: This endpoint does not support scheduling an EMS with multiple flexible sensors at once. This will happen in another endpoint.
              See https://github.com/FlexMeasures/flexmeasures/issues/485. Until then, it is possible to call this endpoint for one flexible endpoint at a time
              (considering already scheduled sensors as inflexible).

        Flexibility models apply to the sensor's asset type:

        1) For storage sensors (e.g. battery, charge points), the schedule deals with the state of charge (SOC).
           The possible flexibility parameters are:

            - soc-at-start (defaults to 0)
            - soc-unit (kWh or MWh)
            - soc-min (defaults to 0)
            - soc-max (defaults to max soc target)
            - soc-targets (defaults to NaN values)
            - roundtrip-efficiency (defaults to 100%)
            - prefer-charging-sooner (defaults to True)

        2) Heat pump sensors are work in progress.

        **Example request A**

        This message triggers a schedule for a storage asset, starting at 10.00am, at which the state of charge (soc) is 12.1 kWh.

        .. code-block:: json

            {
                "start": "2015-06-02T10:00:00+00:00",
                "soc-at-start": 12.1,
                "soc-unit": "kWh"
            }

        **Example request B**

        This message triggers a schedule starting at 10.00am, at which the state of charge (soc) is 12.1 kWh,
        with a target state of charge of 25 kWh at 4.00pm.
        The minimum and maximum soc are set to 10 and 25 kWh, respectively.
        Roundtrip efficiency for use in scheduling is set to 98%.
        Aggregate consumption (of all devices within this EMS) should be priced by sensor 9,
        and aggregate production should be priced by sensor 10,
        where the aggregate power flow in the EMS is described by the sum over sensors 13, 14 and 15
        (plus the flexible sensor being optimized, of course).
        Note that, if forecasts for sensors 13, 14 and 15 are not available, a schedule cannot be computed.

        .. code-block:: json

            {
                "start": "2015-06-02T10:00:00+00:00",
                "soc-at-start": 12.1,
                "soc-unit": "kWh",
                "soc-targets": [
                    {
                        "value": 25,
                        "datetime": "2015-06-02T16:00:00+00:00"
                    }
                ],
                "soc-min": 10,
                "soc-max": 25,
                "roundtrip-efficiency": 0.98,
                "consumption-price-sensor": 9,
                "production-price-sensor": 10,
                "inflexible-device-sensors": [13, 14, 15]
            }

        **Example response**

        This message indicates that the scheduling request has been processed without any error.
        A scheduling job has been created with some Universally Unique Identifier (UUID),
        which will be picked up by a worker.
        The given UUID may be used to obtain the resulting schedule: see /sensors/<id>/schedules/<uuid>.

        .. sourcecode:: json

            {
                "status": "PROCESSED",
                "schedule": "364bfd06-c1fa-430b-8d25-8f5a547651fb",
                "message": "Request has been processed."
            }

        :reqheader Authorization: The authentication token
        :reqheader Content-Type: application/json
        :resheader Content-Type: application/json
        :status 200: PROCESSED
        :status 400: INVALID_TIMEZONE, INVALID_DATETIME, INVALID_DOMAIN, INVALID_UNIT, PTUS_INCOMPLETE
        :status 401: UNAUTHORIZED
        :status 403: INVALID_SENDER
        :status 405: INVALID_METHOD
        """
        # todo: if a soc-sensor entity address is passed, persist those values to the corresponding sensor
        #       (also update the note in posting_data.rst about flexibility states not being persisted).

        # get starting value (internally, SOC is handled in MWh)
        if "start_value" not in kwargs:
            return ptus_incomplete()
        try:
            start_value = float(kwargs["start_value"])
        except (TypeError, ValueError):  # TypeError covers non-numeric types such as None
            extra_info = "Request includes empty or ill-formatted value(s)."
            current_app.logger.warning(extra_info)
            return ptus_incomplete(extra_info)
        start_value = _soc_in_mwh(start_value, unit)

        # Convert round-trip efficiency to dimensionless (to the (0,1] range)
        if roundtrip_efficiency is not None:
            roundtrip_efficiency = roundtrip_efficiency.to(
                ur.Quantity("dimensionless")
            ).magnitude

        # get optional min and max SOC
        # TODO: review when we moved away from capacity having to be described in MWh
        soc_min = _soc_in_mwh(kwargs.get("soc_min"), unit)
        soc_max = _soc_in_mwh(kwargs.get("soc_max"), unit)

        # set soc targets
        planning_horizon = current_app.config.get("FLEXMEASURES_PLANNING_HORIZON")
        end_of_schedule = start_of_schedule + planning_horizon  # type: ignore
        resolution = sensor.event_resolution
        soc_targets = pd.Series(
            np.nan,
            # note that target values are indexed by their due date (i.e. the range is closed on the right)
            index=_right_closed_date_range(
                start_of_schedule, end_of_schedule, resolution
            ),
        )
        # todo: move this deserialization of targets into newly-created ScheduleTargetSchema
        for target in kwargs.get("targets", []):

            # get target value
            if "value" not in target:
                return ptus_incomplete("Target missing 'value' parameter.")
            try:
                target_value = float(target["value"])
            except (TypeError, ValueError):
                extra_info = "Request includes empty or ill-formatted soc target(s)."
                current_app.logger.warning(extra_info)
                return ptus_incomplete(extra_info)
            target_value = _soc_in_mwh(target_value, unit)

            # get target datetime
            if "datetime" not in target:
                return invalid_datetime("Target missing datetime parameter.")
            target_datetime = target["datetime"]
            if target_datetime is None:
                return invalid_datetime(
                    f"Cannot parse target datetime string {target_datetime} as iso date"
                )
            if target_datetime.tzinfo is None:
                current_app.logger.warning(
                    "Cannot parse timezone of target 'datetime' value %s",
                    target_datetime,
                )
                return invalid_timezone(
                    "Target datetime should explicitly state a timezone."
                )
            if target_datetime > end_of_schedule:
                return invalid_datetime(
                    f"Target datetime exceeds {end_of_schedule}. Maximum scheduling horizon is {planning_horizon}."
                )
            target_datetime = target_datetime.astimezone(
                soc_targets.index.tzinfo
            )  # otherwise DST would be problematic

            # set target
            soc_targets.loc[target_datetime] = target_value

        job = create_scheduling_job(
            sensor.id,
            start_of_schedule,
            end_of_schedule,
            resolution=resolution,
            belief_time=prior,  # server time if no prior time was sent
            storage_specs=dict(
                soc_at_start=start_value,
                soc_targets=soc_targets,
                soc_min=soc_min,
                soc_max=soc_max,
                roundtrip_efficiency=roundtrip_efficiency,
                prefer_charging_sooner=prefer_charging_sooner,
            ),
            consumption_price_sensor=consumption_price_sensor,
            production_price_sensor=production_price_sensor,
            inflexible_device_sensors=inflexible_device_sensors,
            enqueue=True,
        )

        # The job ID doubles as the schedule's UUID for later retrieval
        response = dict(schedule=job.id)
        d, s = request_processed()
        return dict(**response, **d), s

    @route("/<id>/schedules/<uuid>", methods=["GET"])
    @use_kwargs(
        {
            "sensor": SensorIdField(data_key="id"),
            "job_id": fields.Str(data_key="uuid"),
        },
        location="path",
    )
    @optional_duration_accepted(
        timedelta(hours=6)
    )  # todo: make this a Marshmallow field
    def get_schedule(self, sensor: Sensor, job_id: str, duration: timedelta, **kwargs):
        """Get a schedule from FlexMeasures.

        .. :quickref: Schedule; Download schedule from the platform

        **Optional fields**

        - "duration" (6 hours by default; can be increased to plan further into the future)

        **Example response**

        This message contains a schedule indicating to consume at various power
        rates from 10am UTC onwards for a duration of 45 minutes.

        .. sourcecode:: json

            {
                "values": [
                    2.15,
                    3,
                    2
                ],
                "start": "2015-06-02T10:00:00+00:00",
                "duration": "PT45M",
                "unit": "MW"
            }

        :reqheader Authorization: The authentication token
        :reqheader Content-Type: application/json
        :resheader Content-Type: application/json
        :status 200: PROCESSED
        :status 400: INVALID_TIMEZONE, INVALID_DOMAIN, INVALID_UNIT, UNKNOWN_SCHEDULE, UNRECOGNIZED_CONNECTION_GROUP
        :status 401: UNAUTHORIZED
        :status 403: INVALID_SENDER
        :status 405: INVALID_METHOD
        :status 422: UNPROCESSABLE_ENTITY
        """
        # Never look further ahead than the configured planning horizon
        planning_horizon = min(  # type: ignore
            duration, current_app.config.get("FLEXMEASURES_PLANNING_HORIZON")
        )

        # Look up the scheduling job
        connection = current_app.queues["scheduling"].connection
        try:
            job = Job.fetch(job_id, connection=connection)
        except NoSuchJobError:
            return unrecognized_event(job_id, "job")

        # Only a finished job can have produced a schedule
        if not job.is_finished:
            return unknown_schedule(_explain_unfinished_job(job))
        error_message = "A scheduling job has been processed with your job ID, but "

        # Find out which data source the schedule was saved under
        schedule_start = job.kwargs["start"]
        schedule_data_source_name = job.meta.get("data_source_name", "Seita")
        scheduler_source = DataSource.query.filter_by(
            name=schedule_data_source_name, type="scheduling script"
        ).one_or_none()
        if scheduler_source is None:
            return unknown_schedule(
                error_message + f'no data is known from "{schedule_data_source_name}".'
            )

        power_values = sensor.search_beliefs(
            event_starts_after=schedule_start,
            event_ends_before=schedule_start + planning_horizon,
            source=scheduler_source,
            most_recent_beliefs_only=True,
            one_deterministic_belief_per_event=True,
        )
        # For consumption schedules, positive values denote consumption. For the db, consumption is negative
        consumption_schedule = -simplify_index(power_values)["event_value"]
        if consumption_schedule.empty:
            return unknown_schedule(
                error_message + "the schedule was not found in the database."
            )

        # Update the planning window to what was actually found
        resolution = sensor.event_resolution
        start = consumption_schedule.index[0]
        duration = min(duration, consumption_schedule.index[-1] + resolution - start)
        consumption_schedule = consumption_schedule[
            start : start + duration - resolution
        ]

        response = dict(
            values=consumption_schedule.tolist(),
            start=isodate.datetime_isoformat(start),
            duration=duration_isoformat(duration),
            unit=sensor.unit,
        )
        d, s = request_processed()
        return dict(**response, **d), s