From 60e3ec3eb1244b6fcb0c076739b98bdc3c49a6d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20H=C3=B6ning?= Date: Thu, 29 Dec 2022 11:27:32 +0100 Subject: [PATCH] Refactor scheduler interface - API and inner logic (#537) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `flex_model` & `flex_context` to /schedules/trigger/ API endpoint, and refactor how flex configuration is deserialized (by the Scheduler implementation instead of by the API endpoint). * Add flex_model & flex_context in API endpoint; refactor design for Scheduler implementations (moving some endpoint logic here); Signed-off-by: Nicolas Höning * add new schema modules Signed-off-by: Nicolas Höning * include two other flex context params in solver test Signed-off-by: Nicolas Höning * also support deprecated flex_context parameters, and align spelling of params with underscore in docstring Signed-off-by: Nicolas Höning * correctly handle flex-model validation errors when they come up in the endpoint Signed-off-by: Nicolas Höning * changelog: add deprecation warnings and mentions this PR Signed-off-by: Nicolas Höning * fix internal link Signed-off-by: Nicolas Höning * move flex-model and flex-context docs to notation module; small fixes in dummy custom scheduler Signed-off-by: Nicolas Höning * deprecate soc-sensor-id field, store soc states on the asset attributes in v3.0 as well, using a way that lets all schedulers save (parts of) it if they want. 
Signed-off-by: Nicolas Höning * check (and potentially fill in defaults for) soc_min and soc_max before we apply the schema (which expects non-nan values here) Signed-off-by: Nicolas Höning * make add schedule command work with our refactored scheduling code, small refactoring to save lines Signed-off-by: Nicolas Höning * rename the CLI command as it only represents storage right now (and we might choose that the CLI will be specific to our in-built flex models) Signed-off-by: Nicolas Höning * More thorough checks for passed soc-values in StorageScheduler, leads to small fix in API endpoint and scheduling tests Signed-off-by: Nicolas Höning * doc improvements from review Signed-off-by: Nicolas Höning * Change parameter names for flex model and context which come through the API to use hyphens, which is conventionally preferred. Signed-off-by: Nicolas Höning * Make `flexmeasures add schedule` a subgroup (#557) * Make `flexmeasures add schedule` a subgroup: - invoke a default subcommand - show a deprecation warning Signed-off-by: F.N. Claessen * adapt CLI command name so it's clearer what is being added Signed-off-by: Nicolas Höning Signed-off-by: F.N. 
Claessen Signed-off-by: Nicolas Höning Co-authored-by: Nicolas Höning * add one missing documentation improvement from review Signed-off-by: Nicolas Höning * make sure hyphens are used in flex-model to the outside world (API, CLI) Signed-off-by: Nicolas Höning * smaller review items, mostly documentation Signed-off-by: Nicolas Höning * remove soc checks which added interpretation (should be part of another PR, if at all) Signed-off-by: Nicolas Höning * fixes to notation docs Signed-off-by: Nicolas Höning * make sure scheduling tests work on empty queues, with new fixture Signed-off-by: Nicolas Höning * remove two tests for previously removed util function Signed-off-by: Nicolas Höning * batch of small review comments Signed-off-by: Nicolas Höning * make get_data_source_info a class method of Scheduler Signed-off-by: Nicolas Höning * small simplification of get_data_source_for_job Signed-off-by: Nicolas Höning * specify min/max inclusiveness of roundtrip-efficiency parameter Signed-off-by: Nicolas Höning * create_scheduling_jobs accepts both object and ID Signed-off-by: Nicolas Höning * fix type hinting Signed-off-by: Nicolas Höning * API changelog & flex config introduction Signed-off-by: Nicolas Höning * two missing fixes Signed-off-by: Nicolas Höning * remove line about previously undocumented & now deprecated line Signed-off-by: Nicolas Höning * Deprecation headers for old fields that moved to flex-model and flex-context (#564) * Add deprecation and sunset response headers when deprecated fields are used Signed-off-by: F.N. Claessen * Refactor: duplicate code becomes util function Signed-off-by: F.N. Claessen * Correct deprecation and sunset links Signed-off-by: F.N. Claessen * rename to represent plural-default of param, update link to 3.0.5 API changelog Signed-off-by: Nicolas Höning Signed-off-by: F.N. 
Claessen Signed-off-by: Nicolas Höning Co-authored-by: Nicolas Höning * refactor where the code lives that builds device equality constraints from soc targets Signed-off-by: Nicolas Höning * change a sentence in notation Signed-off-by: Nicolas Höning * Rename inspection to deserialization Signed-off-by: F.N. Claessen * Fix DummyScheduler in documentation Signed-off-by: F.N. Claessen * Simplify imports for plugin developers (also facilitates renaming the planning module without needing plugin developers to upgrade their code) Signed-off-by: F.N. Claessen Signed-off-by: Nicolas Höning Signed-off-by: F.N. Claessen Co-authored-by: Felix Claessen <30658763+Flix6x@users.noreply.github.com> Co-authored-by: F.N. Claessen --- documentation/api/change_log.rst | 16 + documentation/api/notation.rst | 72 ++++ documentation/changelog.rst | 9 +- documentation/cli/change_log.rst | 1 + documentation/cli/commands.rst | 2 +- documentation/configuration.rst | 3 + documentation/dev/docker-compose.rst | 2 +- documentation/index.rst | 2 +- documentation/plugin/customisation.rst | 29 +- documentation/tut/forecasting_scheduling.rst | 26 +- documentation/tut/posting_data.rst | 19 +- .../tut/toy-example-from-scratch.rst | 6 +- flexmeasures/__init__.py | 1 + flexmeasures/api/common/responses.py | 9 + flexmeasures/api/common/utils/api_utils.py | 4 +- .../api/common/utils/deprecation_utils.py | 103 +++++- flexmeasures/api/tests/utils.py | 29 +- flexmeasures/api/v1_2/implementations.py | 20 +- flexmeasures/api/v1_3/implementations.py | 65 ++-- flexmeasures/api/v3_0/sensors.py | 317 +++++++++--------- flexmeasures/api/v3_0/tests/conftest.py | 7 + .../api/v3_0/tests/test_sensor_schedules.py | 80 ++++- flexmeasures/api/v3_0/tests/utils.py | 30 +- flexmeasures/cli/data_add.py | 121 ++++--- flexmeasures/cli/utils.py | 48 +++ flexmeasures/data/models/planning/__init__.py | 137 +++++++- flexmeasures/data/models/planning/storage.py | 263 +++++++++++++-- .../data/models/planning/tests/test_solver.py | 
114 ++++--- flexmeasures/data/models/planning/utils.py | 65 ---- .../data/schemas/scheduling/__init__.py | 15 + .../data/schemas/scheduling/storage.py | 67 ++++ flexmeasures/data/services/scheduling.py | 166 ++++----- flexmeasures/data/tests/dummy_scheduler.py | 25 +- .../data/tests/test_scheduling_jobs.py | 20 +- .../tests/test_scheduling_jobs_fresh_db.py | 15 +- requirements/app.in | 1 + requirements/app.txt | 3 + 37 files changed, 1284 insertions(+), 628 deletions(-) create mode 100644 flexmeasures/cli/utils.py create mode 100644 flexmeasures/data/schemas/scheduling/__init__.py create mode 100644 flexmeasures/data/schemas/scheduling/storage.py diff --git a/documentation/api/change_log.rst b/documentation/api/change_log.rst index faf278f28..0e4830b97 100644 --- a/documentation/api/change_log.rst +++ b/documentation/api/change_log.rst @@ -5,6 +5,22 @@ API change log .. note:: The FlexMeasures API follows its own versioning scheme. This is also reflected in the URL, allowing developers to upgrade at their own pace. +v3.0-5 | 2022-12-30 +""""""""""""""""""" + +- Introduced ``flex-model`` and ``flex-context`` fields to `/sensors//schedules/trigger` (POST). 
They are dictionaries and group pre-existing fields: + + - ``soc-at-start`` -> send in ``flex-model`` instead + - ``soc-min`` -> send in ``flex-model`` instead + - ``soc-max`` -> send in ``flex-model`` instead + - ``soc-unit`` -> send in ``flex-model`` instead + - ``roundtrip-efficiency`` -> send in ``flex-model`` instead + - ``prefer-charging-sooner`` -> send in ``flex-model`` instead + - ``consumption-price-sensor`` -> send in ``flex-context`` instead + - ``production-price-sensor`` -> send in ``flex-context`` instead + - ``inflexible-device-sensors`` -> send in ``flex-context`` instead + + v3.0-4 | 2022-12-08 """"""""""""""""""" diff --git a/documentation/api/notation.rst b/documentation/api/notation.rst index e240a1c54..782c6f3fc 100644 --- a/documentation/api/notation.rst +++ b/documentation/api/notation.rst @@ -155,6 +155,78 @@ For version 1, 2 and 3 of the API, only equidistant timeseries data is expected - "duration" should also be a multiple of the sensor resolution. +.. _describing_flexibility: + +Describing flexibility +^^^^^^^^^^^^^^^^^^^^^^^ + +FlexMeasures computes schedules for energy systems that consist of multiple devices that consume and/or produce electricity. +We model a device as an asset with a power sensor, and compute schedules only for flexible devices, while taking into account inflexible devices. + +To compute a schedule, FlexMeasures first needs to assess the flexibility state of the system. +This is described by the `flex model` (information about the state and possible actions of the flexible device) and the `flex-context` +(information about the system as a whole, in order to assess the value of activating flexibility). + +This information goes beyond the usual time series recorded by an asset's sensors. It's being sent through the API when triggering schedule computation. +Some parts of it can be persisted on the asset & sensor model as attributes (that's design work in progress). 
+ +We distinguish the information with two groups: + +Flex model +"""""""""""" + +The flexibility model describes to the scheduler what the flexible asset's state is, +and what constraints or preferences should be taken into account. +Which type of flexibility model is relevant to a scheduler usually relates to the type of device. + +Usually, not the whole flexibility model is needed. +FlexMeasures can infer missing values in the flex model, and even get them (as default) from the sensor's attributes. +This means that API and CLI users don't have to send the whole flex model every time. + +Here are the three types of flexibility models you can expect to be built-in: + +1) For storage devices (e.g. batteries, charge points, electric vehicle batteries connected to charge points), the schedule deals with the state of charge (SOC). + + The possible flexibility parameters are: + + - ``soc-at-start`` (defaults to 0) + - ``soc-unit`` (kWh or MWh) + - ``soc-min`` (defaults to 0) + - ``soc-max`` (defaults to max soc target) + - ``soc-targets`` (defaults to NaN values) + - ``roundtrip-efficiency`` (defaults to 100%) + - ``prefer-charging-sooner`` (defaults to True, also signals a preference to discharge later) + + For some examples, see the `[POST] /sensors/(id)/schedules/trigger <../api/v3_0.html#post--api-v3_0-sensors-(id)-schedules-trigger>`_ endpoint docs. + +2) Shiftable process + + .. todo:: A simple algorithm exists, needs integration into FlexMeasures and asset type clarified. + +3) Heat pumps + + .. todo:: Also work in progress, needs model for heat loss compensation. + +In addition, folks who write their own custom scheduler (see :ref:`plugin_customization`) might also require their custom flexibility model. +That's no problem, FlexMeasures will let the scheduler decide which flexibility model is relevant and how it should be validated. + +.. note:: We also aim to model situations with more than one flexible asset, with different types of flexibility. 
+ This is ongoing architecture design work, and therefore happens in development settings, until we are happy + with the outcomes. Thoughts welcome :) + + + Flex context + """"""""""""" + + With the flexibility context, we aim to describe the system in which the flexible assets operate: + + - ``inflexible-device-sensors`` ― power sensors that are relevant, but not flexible, such as a sensor recording rooftop solar power connected behind the main meter, whose production falls under the same contract as the flexible device(s) being scheduled + - ``consumption-price-sensor`` ― the sensor which defines costs/revenues of consuming energy + - ``production-price-sensor`` ― the sensor which defines cost/revenues of producing energy + + These should be independent of the asset type and consequently also do not depend on which scheduling algorithm is being used. + + .. _beliefs: Tracking the recording time of beliefs diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 5e868282d..1110582ce 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -40,14 +40,19 @@ Infrastructure / Support * Remove bokeh dependency and obsolete UI views [see `PR #476 `_] * Fix ``flexmeasures db-ops dump`` and ``flexmeasures db-ops restore`` not working in docker containers [see `PR #530 `_] and incorrectly reporting a success when `pg_dump` and `pg_restore` are not installed [see `PR #526 `_] * Plugins can save BeliefsSeries, too, instead of just BeliefsDataFrames [see `PR #523 `_] -* Improve documentation and code w.r.t. 
storage flexibility modelling ― prepare for handling other schedulers & merge battery and car charging schedulers [see `PR #511 `_ and `PR #537 `_] * Revised strategy for removing unchanged beliefs when saving data: retain the oldest measurement (ex-post belief), too [see `PR #518 `_] * Scheduling test for maximizing self-consumption, and improved time series db queries for fixed tariffs (and other long-term constants) [see `PR #532 `_] * Clean up table formatting for ``flexmeasures show`` CLI commands [see `PR #540 `_] * Add ``"Deprecation"`` and ``"Sunset"`` response headers for API users of deprecated API versions, and log warnings for FlexMeasures hosts when users still use them [see `PR #554 `_] * Explain how to avoid potential ``SMTPRecipientsRefused`` errors when using FlexMeasures in combination with a mail server [see `PR #558 `_] -.. warning:: The CLI command ``flexmeasures monitor tasks`` has been renamed to ``flexmeasures monitor last-run``. The old name will stop working in version 0.13. +.. warning:: The API endpoint (`[POST] /sensors/(id)/schedules/trigger `_) to make new schedules will (in v0.13) sunset the storage flexibility parameters (they move to the ``flex-model`` parameter group), as well as the parameters describing other sensors (they move to ``flex-context``). + +.. warning:: The CLI command ``flexmeasures monitor tasks`` has been deprecated (it's being renamed to ``flexmeasures monitor last-run``). The old name will be sunset in version 0.13. + +.. warning:: The CLI command ``flexmeasures add schedule`` has been renamed to ``flexmeasures add schedule for-storage``. The old name will be sunset in version 0.13. 
+ v0.11.3 | November 2, 2022 diff --git a/documentation/cli/change_log.rst b/documentation/cli/change_log.rst index 220f8aafc..8e6fb07fb 100644 --- a/documentation/cli/change_log.rst +++ b/documentation/cli/change_log.rst @@ -12,6 +12,7 @@ since v0.12.0 | November XX, 2022 * Fix ``flexmeasures db-ops dump`` and ``flexmeasures db-ops restore`` incorrectly reporting a success when `pg_dump` and `pg_restore` are not installed. * Add ``flexmeasures monitor last-seen``. * Rename ``flexmeasures monitor tasks`` to ``flexmeasures monitor last-run``. +* Rename ``flexmeasures add schedule`` to ``flexmeasures add schedule for-storage`` (in expectation of more scheduling commands, based on in-built flex models). since v0.11.0 | August 28, 2022 ============================== diff --git a/documentation/cli/commands.rst b/documentation/cli/commands.rst index dfd40a7ea..347e90659 100644 --- a/documentation/cli/commands.rst +++ b/documentation/cli/commands.rst @@ -34,7 +34,7 @@ of which some are referred to in this documentation. ``flexmeasures add sensor`` Add a new sensor. ``flexmeasures add beliefs`` Load beliefs from file. ``flexmeasures add forecasts`` Create forecasts. -``flexmeasures add schedule`` Create a charging schedule. +``flexmeasures add schedule for-storage`` Create a charging schedule for a storage asset. ``flexmeasures add holidays`` Add holiday annotations to accounts and/or assets. ``flexmeasures add annotation`` Add annotation to accounts, assets and/or sensors. ``flexmeasures add toy-account`` Create a toy account, for tutorials and trying things. diff --git a/documentation/configuration.rst b/documentation/configuration.rst index 2725ae511..adb2c8a2c 100644 --- a/documentation/configuration.rst +++ b/documentation/configuration.rst @@ -247,6 +247,9 @@ Time to live for UDI event ids of successful scheduling jobs. Set a negative tim Default: ``timedelta(days=7)`` + +.. 
_planning_horizon_config: + FLEXMEASURES_PLANNING_HORIZON ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/documentation/dev/docker-compose.rst b/documentation/dev/docker-compose.rst index 03278a0be..a1e1a2030 100644 --- a/documentation/dev/docker-compose.rst +++ b/documentation/dev/docker-compose.rst @@ -96,7 +96,7 @@ Next, we put a scheduling job in the worker's queue. This only works because we .. code-block:: console - flexmeasures add schedule --sensor-id 2 --optimization-context-id 3 \ + flexmeasures add schedule for-storage --sensor-id 2 --optimization-context-id 3 \ --start ${TOMORROW}T07:00+01:00 --duration PT12H --soc-at-start 50% \ --roundtrip-efficiency 90% --as-job diff --git a/documentation/index.rst b/documentation/index.rst index 0abdf5031..9e9920e82 100644 --- a/documentation/index.rst +++ b/documentation/index.rst @@ -41,7 +41,7 @@ A tiny, but complete example: Let's install FlexMeasures from scratch. Then, usi $ flexmeasures db upgrade # create tables $ flexmeasures add toy-account --kind battery # setup account & a user, a battery (Id 2) and a market (Id 3) $ flexmeasures add beliefs --sensor-id 3 --source toy-user prices-tomorrow.csv --timezone utc # load prices, also possible per API - $ flexmeasures add schedule --sensor-id 2 --consumption-price-sensor 3 \ + $ flexmeasures add schedule for-storage --sensor-id 2 --consumption-price-sensor 3 \ --start ${TOMORROW}T07:00+01:00 --duration PT12H \ --soc-at-start 50% --roundtrip-efficiency 90% # this is also possible per API $ flexmeasures show beliefs --sensor-id 2 --start ${TOMORROW}T07:00:00+01:00 --duration PT12H # also visible per UI, of course diff --git a/documentation/plugin/customisation.rst b/documentation/plugin/customisation.rst index d13401d6b..a41bc6acc 100644 --- a/documentation/plugin/customisation.rst +++ b/documentation/plugin/customisation.rst @@ -23,8 +23,7 @@ The following minimal example gives you an idea of some meta information you can from datetime import datetime, timedelta 
import pandas as pd from pandas.tseries.frequencies import to_offset - from flexmeasures.data.models.time_series import Sensor - from flexmeasures.data.models.planning import Scheduler + from flexmeasures import Scheduler, Sensor class DummyScheduler(Scheduler): @@ -32,12 +31,8 @@ The following minimal example gives you an idea of some meta information you can __author__ = "My Company" __version__ = "2" - def schedule( + def compute_schedule( self, - sensor: Sensor, - start: datetime, - end: datetime, - resolution: timedelta, *args, **kwargs ): @@ -46,22 +41,26 @@ The following minimal example gives you an idea of some meta information you can (Schedulers return positive values for consumption, and negative values for production) """ return pd.Series( - sensor.get_attribute("capacity_in_mw"), - index=pd.date_range(start, end, freq=resolution, closed="left"), + self.sensor.get_attribute("capacity_in_mw"), + index=pd.date_range(self.start, self.end, freq=self.resolution, closed="left"), ) + + def deserialize_config(self): + """Do not care about any flex config sent in.""" + self.config_deserialized = True -.. note:: It's possible to add arguments that describe the asset flexibility and the EMS context in more detail. For example, - for storage assets we support various state-of-charge parameters. For now, the existing in-built schedulers are the best documentation. - We are working on documenting this better, so the learning curve becomes easier. - +.. note:: It's possible to add arguments that describe the asset flexibility model and the flexibility (EMS) context in more detail. + For example, for storage assets we support various state-of-charge parameters. For details on flexibility model and context, + see :ref:`describing_flexibility` and the `[POST] /sensors/(id)/schedules/trigger <../api/v3_0.html#post--api-v3_0-sensors-(id)-schedules-trigger>`_ endpoint. + Finally, make your scheduler be the one that FlexMeasures will use for certain sensors: .. 
code-block:: python - from flexmeasures.data.models.time_series import Sensor + from flexmeasures import Sensor scheduler_specs = { "module": "flexmeasures.data.tests.dummy_scheduler", # or a file path, see note below @@ -206,7 +205,7 @@ We demonstrate this here, and also show how you can add your own custom field sc from typing import Optional import click - from flexmeasures.data.schemas.times import AwareDateTimeField + from flexmeasures.data.schemas import AwareDateTimeField from flexmeasures.data.schemas.utils import MarshmallowClickMixin from marshmallow import fields diff --git a/documentation/tut/forecasting_scheduling.rst b/documentation/tut/forecasting_scheduling.rst index 69bc3fc2b..9c0453ced 100644 --- a/documentation/tut/forecasting_scheduling.rst +++ b/documentation/tut/forecasting_scheduling.rst @@ -101,22 +101,24 @@ There are two ways to queue a scheduling job: First, we can add a scheduling job to the queue via the API. We already learned about the `[POST] /schedules/trigger <../api/v3_0.html#post--api-v3_0-sensors-(id)-schedules-trigger>`_ endpoint in :ref:`posting_flex_states`, where we saw how to post a flexibility state (in this case, the state of charge of a battery at a certain point in time). -Here, we extend that example with an additional target value, representing a desired future state of charge. +Here, we extend that (storage) example with an additional target value, representing a desired future state of charge. .. code-block:: json { - "value": 12.1, - "datetime": "2015-06-02T10:00:00+00:00", - "unit": "kWh", - "targets": [ - { - "value": 25, - "datetime": "2015-06-02T16:00:00+00:00" - } - ] + { + "start": "2015-06-02T10:00:00+00:00", + "flex-model": { + "soc-at-start": 12.1, + "soc-unit": "kWh", + "soc-targets": [ + { + "value": 25, + "datetime": "2015-06-02T16:00:00+00:00" + } + ] + } } + We now have described the state of charge at 10am to be ``12.1``. In addition, we requested that it should be ``25`` at 4pm. 
For instance, this could mean that a car should be charged at 90% at that time. @@ -129,13 +131,13 @@ A second way to add scheduling jobs is via the CLI, so this is available for peo .. code-block:: console - flexmeasures add schedule --sensor-id 2 --optimization-context-id 3 \ + flexmeasures add schedule for-storage --sensor-id 2 --optimization-context-id 3 \ --start 2022-07-05T07:00+01:00 --duration PT12H \ --soc-at-start 50% --roundtrip-efficiency 90% --as-job Here, the ``--as-job`` parameter makes the difference for queueing ― without it, the schedule is computed right away. -Run ``flexmeasures add schedule --help`` for more information. +Run ``flexmeasures add schedule for-storage --help`` for more information. .. _getting_prognoses: diff --git a/documentation/tut/posting_data.rst b/documentation/tut/posting_data.rst index 36dbedf77..51d623dd5 100644 --- a/documentation/tut/posting_data.rst +++ b/documentation/tut/posting_data.rst @@ -269,29 +269,32 @@ Posting flexibility states There is one more crucial kind of data that FlexMeasures needs to know about: What are the current states of flexible devices? For example, a battery has a certain state of charge, which is relevant to describe the flexibility that the battery currently has. +In our terminology, this is called the "flex model" and you can read more at :ref:`describing_flexibility`. -Owners of such devices can post these states along with triggering the creation of a new schedule, to `[POST] /schedules/trigger <../api/v3_0.html#post--api-v3_0-sensors-(id)-schedules-trigger>`_. +Owners of such devices can post the flex model along with triggering the creation of a new schedule, to `[POST] /schedules/trigger <../api/v3_0.html#post--api-v3_0-sensors-(id)-schedules-trigger>`_. The URL might look like this: .. 
code-block:: html https://company.flexmeasures.io/api//sensors/10/schedules/trigger -This example triggers a schedule for a power sensor (with ID 10) of a battery asset, asking to take into account the battery's current state of charge. +The following example triggers a schedule for a power sensor (with ID 10) of a battery asset, asking to take into account the battery's current state of charge. From this, FlexMeasures derives the energy flexibility this battery has in the next 48 hours and computes an optimal charging schedule. -The endpoint allows to limit the flexibility range and also to set target values. +The endpoint also allows to limit the flexibility range and also to set target values. .. code-block:: json { - "value": 12.1, - "datetime": "2015-06-02T10:00:00+00:00", - "unit": "kWh" + "start": "2015-06-02T10:00:00+00:00", + "flex-model": { + "soc-at-start": 12.1, + "soc-unit": "kWh" + } } .. note:: At the moment, FlexMeasures only supports flexibility models suitable for batteries and car chargers here (asset types "battery", "one-way_evse" or "two-way_evse"). This will be expanded to other flexible assets as needed. -.. note:: Flexibility states are not persisted. To record a history of the state of charge, set up a separate sensor and post data to it using `[POST] /sensors/data <../api/v3_0.html#post--api-v3_0-sensors-data>`_ (see :ref:`posting_sensor_data`). +.. note:: Flexibility states are persisted on sensor attributes. To record a more complete history of the state of charge, set up a separate sensor and post data to it using `[POST] /sensors/data <../api/v3_0.html#post--api-v3_0-sensors-data>`_ (see :ref:`posting_sensor_data`). -In :ref:`how_queue_scheduling`, we'll cover what happens when FlexMeasurers is triggered to create a new schedule, and how those schedules can be retrieved via the API, so they can be used to steer assets. 
\ No newline at end of file +In :ref:`how_queue_scheduling`, we'll cover what happens when FlexMeasures is triggered to create a new schedule, and how those schedules can be retrieved via the API, so they can be used to steer assets. \ No newline at end of file diff --git a/documentation/tut/toy-example-from-scratch.rst b/documentation/tut/toy-example-from-scratch.rst index b9996241f..c1cdc09e1 100644 --- a/documentation/tut/toy-example-from-scratch.rst +++ b/documentation/tut/toy-example-from-scratch.rst @@ -21,7 +21,7 @@ Below are the ``flexmeasures`` CLI commands we'll run, and which we'll explain s # load prices to optimise the schedule against $ flexmeasures add beliefs --sensor-id 3 --source toy-user prices-tomorrow.csv --timezone utc # make the schedule - $ flexmeasures add schedule --sensor-id 2 --consumption-price-sensor 3 \ + $ flexmeasures add schedule for-storage --sensor-id 2 --consumption-price-sensor 3 \ --start ${TOMORROW}T07:00+01:00 --duration PT12H \ --soc-at-start 50% --roundtrip-efficiency 90% @@ -268,7 +268,7 @@ To keep it short, we'll only ask for a 12-hour window starting at 7am. Finally, .. code-block:: console - $ flexmeasures add schedule --sensor-id 2 --consumption-price-sensor 3 \ + $ flexmeasures add schedule for-storage --sensor-id 2 --consumption-price-sensor 3 \ --start ${TOMORROW}T07:00+01:00 --duration PT12H \ --soc-at-start 50% --roundtrip-efficiency 90% New schedule is stored. 
@@ -314,4 +314,4 @@ We can also look at the charging schedule in the `FlexMeasures UI ResponseTuple: return dict(result="Rejected", status="UNKNOWN_SCHEDULE", message=message), 400 +def invalid_flex_config(message: str) -> ResponseTuple: + return ( + dict( + result="Rejected", status="UNPROCESSABLE_ENTITY", message=dict(json=message) + ), + 422, + ) + + @BaseMessage("The requested backup is not known.") def unrecognized_backup(message: str) -> ResponseTuple: return dict(result="Rejected", status="UNRECOGNIZED_BACKUP", message=message), 400 diff --git a/flexmeasures/api/common/utils/api_utils.py b/flexmeasures/api/common/utils/api_utils.py index 4319c5a03..b7ba5bbc3 100644 --- a/flexmeasures/api/common/utils/api_utils.py +++ b/flexmeasures/api/common/utils/api_utils.py @@ -81,7 +81,7 @@ def parse_as_list( return connections -# TODO: we should be using webargs to get data from a request, it's more descriptive and has error handling +# TODO: deprecate ― we should be using webargs to get data from a request, it's more descriptive and has error handling def get_form_from_request(_request) -> Union[dict, None]: if _request.method == "GET": d = _request.args.to_dict( @@ -247,7 +247,7 @@ def unique_ever_seen(iterable: Sequence, selector: Sequence): def message_replace_name_with_ea(message_with_connections_as_asset_names: dict) -> dict: """ For each connection in the message specified by a name, replace that name with the correct entity address. - TODO: This function is now only used in tests and should go (also asset_replace_name_with_id) + TODO: Deprecated. 
This function is now only used in tests of deprecated API versions and should go (also asset_replace_name_with_id) """ message_with_connections_as_eas = copy.deepcopy( message_with_connections_as_asset_names diff --git a/flexmeasures/api/common/utils/deprecation_utils.py b/flexmeasures/api/common/utils/deprecation_utils.py index b9916a312..ec84966b0 100644 --- a/flexmeasures/api/common/utils/deprecation_utils.py +++ b/flexmeasures/api/common/utils/deprecation_utils.py @@ -1,12 +1,78 @@ from __future__ import annotations -from flask import current_app, request, Blueprint, Response +from flask import current_app, request, Blueprint, Response, after_this_request from flask_security.core import current_user import pandas as pd from flexmeasures.utils.time_utils import to_http_time +def deprecate_fields( + fields: str | list[str], + deprecation_date: pd.Timestamp | str | None = None, + deprecation_link: str | None = None, + sunset_date: pd.Timestamp | str | None = None, + sunset_link: str | None = None, +): + """Deprecates a field (or fields) on a route by adding the "Deprecation" header with a deprecation date. + + Also logs a warning when a deprecated field is used. 
+ + >>> from flask_classful import route + >>> @route("/item/", methods=["POST"]) + @use_kwargs( + { + "color": ColorField, + "length": LengthField, + } + ) + def post_item(color, length): + deprecate_fields( + "color", + deprecation_date="2022-12-14", + deprecation_link="https://flexmeasures.readthedocs.io/some-deprecation-notice", + sunset_date="2023-02-01", + sunset_link="https://flexmeasures.readthedocs.io/some-sunset-notice", + ) + + :param fields: The fields (as a list of strings) to be deprecated + :param deprecation_date: date indicating when the field was deprecated, used for the "Deprecation" header + if no date is given, defaults to "true" + see https://datatracker.ietf.org/doc/html/draft-ietf-httpapi-deprecation-header#section-2-1 + :param deprecation_link: url providing more information about the deprecation + :param sunset_date: date indicating when the field is likely to become unresponsive + :param sunset_link: url providing more information about the sunset + + References + ---------- + - Deprecation header: https://datatracker.ietf.org/doc/html/draft-ietf-httpapi-deprecation-header + - Sunset header: https://www.rfc-editor.org/rfc/rfc8594 + """ + if not isinstance(fields, list): + fields = [fields] + deprecation, sunset = _format_deprecation_and_sunset(deprecation_date, sunset_date) + + @after_this_request + def _after_request_handler(response: Response) -> Response: + deprecated_fields_used = set(fields) & set( + request.json.keys() + ) # sets intersect + + # If any deprecated field is used, log a warning and add deprecation and sunset headers + if deprecated_fields_used: + current_app.logger.warning( + f"Endpoint {request.endpoint} called by {current_user} with deprecated fields: {deprecated_fields_used}" + ) + return _add_headers( + response, + deprecation, + deprecation_link, + sunset, + sunset_link, + ) + return response + + def deprecate_blueprint( blueprint: Blueprint, deprecation_date: pd.Timestamp | str | None = None, @@ -16,6 +82,8 @@ def 
deprecate_blueprint( ): """Deprecates every route on a blueprint by adding the "Deprecation" header with a deprecation date. + Also logs a warning when a deprecated endpoint is called. + >>> from flask import Flask, Blueprint >>> app = Flask('some_app') >>> deprecated_bp = Blueprint('API version 1', 'v1_bp') @@ -23,9 +91,9 @@ def deprecate_blueprint( >>> deprecate_blueprint( deprecated_bp, deprecation_date="2022-12-14", - deprecation_link="https://flexmeasures.readthedocs.org/some-deprecation-notice", + deprecation_link="https://flexmeasures.readthedocs.io/some-deprecation-notice", sunset_date="2023-02-01", - sunset_link="https://flexmeasures.readthedocs.org/some-sunset-notice", + sunset_link="https://flexmeasures.readthedocs.io/some-sunset-notice", ) :param blueprint: The blueprint to be deprecated @@ -38,17 +106,15 @@ def deprecate_blueprint( References ---------- - - Deprecation field: https://datatracker.ietf.org/doc/html/draft-ietf-httpapi-deprecation-header - - Sunset field: https://www.rfc-editor.org/rfc/rfc8594 + - Deprecation header: https://datatracker.ietf.org/doc/html/draft-ietf-httpapi-deprecation-header + - Sunset header: https://www.rfc-editor.org/rfc/rfc8594 """ - if deprecation_date: - deprecation = to_http_time(pd.Timestamp(deprecation_date) - pd.Timedelta("1s")) - else: - deprecation = "true" - if sunset_date: - sunset = to_http_time(pd.Timestamp(sunset_date) - pd.Timedelta("1s")) + deprecation, sunset = _format_deprecation_and_sunset(deprecation_date, sunset_date) def _after_request_handler(response: Response) -> Response: + current_app.logger.warning( + f"Deprecated endpoint {request.endpoint} called by {current_user}" + ) return _add_headers( response, deprecation, @@ -74,9 +140,6 @@ def _add_headers( response = _add_link(response, deprecation_link, "deprecation") if sunset_link: response = _add_link(response, sunset_link, "sunset") - current_app.logger.warning( - f"Deprecated endpoint {request.endpoint} called by {current_user}" - ) return 
response @@ -87,3 +150,15 @@ def _add_link(response: Response, link: str, rel: str) -> Response: else: response.headers["Link"] = link_text return response + + +def _format_deprecation_and_sunset(deprecation_date, sunset_date): + if deprecation_date: + deprecation = to_http_time(pd.Timestamp(deprecation_date) - pd.Timedelta("1s")) + else: + deprecation = "true" + if sunset_date: + sunset = to_http_time(pd.Timestamp(sunset_date) - pd.Timedelta("1s")) + else: + sunset = None + return deprecation, sunset diff --git a/flexmeasures/api/tests/utils.py b/flexmeasures/api/tests/utils.py index 12e960b9b..e31f5bfa7 100644 --- a/flexmeasures/api/tests/utils.py +++ b/flexmeasures/api/tests/utils.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import json from flask import url_for, current_app, Response @@ -104,10 +106,25 @@ def post_task_run(client, task_name: str, status: bool = True): ) -def check_deprecation(response: Response): +def check_deprecation( + response: Response, + deprecation: str | None = "Tue, 13 Dec 2022 23:59:59 GMT", + sunset: str | None = "Tue, 31 Jan 2023 23:59:59 GMT", +): + """Check deprecation and sunset headers. + + Also make sure we link to some url for further info. 
+ """ print(response.headers) - assert "Tue, 13 Dec 2022 23:59:59 GMT" in response.headers["Deprecation"] - assert "Tue, 31 Jan 2023 23:59:59 GMT" in response.headers["Sunset"] - # Make sure we link to some url for both deprecation and sunset - assert 'rel="deprecation"' in response.headers["Link"] - assert 'rel="sunset"' in response.headers["Link"] + if deprecation: + assert deprecation in response.headers.get("Deprecation", []) + assert 'rel="deprecation"' in response.headers.get("Link", []) + else: + assert deprecation not in response.headers.get("Deprecation", []) + assert 'rel="deprecation"' not in response.headers.get("Link", []) + if sunset: + assert sunset in response.headers.get("Sunset", []) + assert 'rel="sunset"' in response.headers.get("Link", []) + else: + assert sunset not in response.headers.get("Sunset", []) + assert 'rel="sunset"' not in response.headers.get("Link", []) diff --git a/flexmeasures/api/v1_2/implementations.py b/flexmeasures/api/v1_2/implementations.py index 1ae9f964e..c18ffd48f 100644 --- a/flexmeasures/api/v1_2/implementations.py +++ b/flexmeasures/api/v1_2/implementations.py @@ -39,7 +39,6 @@ ) from flexmeasures.data.models.time_series import Sensor from flexmeasures.data.services.resources import has_assets, can_access_asset -from flexmeasures.data.models.planning.utils import ensure_storage_specs from flexmeasures.utils.time_utils import duration_isoformat @@ -98,17 +97,18 @@ def get_device_message_response(generic_asset_name_groups, duration): resolution = sensor.event_resolution # Schedule the asset - storage_specs = dict( - soc_at_start=sensor.generic_asset.get_attribute("soc_in_mwh"), - prefer_charging_sooner=False, - ) - storage_specs = ensure_storage_specs( - storage_specs, sensor, start, end, resolution + scheduler = StorageScheduler( + sensor=sensor, + start=start, + end=end, + resolution=resolution, + flex_model={ + "soc-at-start": sensor.generic_asset.get_attribute("soc_in_mwh"), + "prefer-charging-sooner": False, + }, 
) try: - schedule = StorageScheduler().schedule( - sensor, start, end, resolution, storage_specs=storage_specs - ) + schedule = scheduler.compute_schedule() except UnknownPricesException: return unknown_prices() except UnknownMarketException: diff --git a/flexmeasures/api/v1_3/implementations.py b/flexmeasures/api/v1_3/implementations.py index 88996e16c..d843df9a0 100644 --- a/flexmeasures/api/v1_3/implementations.py +++ b/flexmeasures/api/v1_3/implementations.py @@ -148,7 +148,7 @@ def get_device_message_response(generic_asset_name_groups, duration): return unknown_schedule("Scheduling job has an unknown status.") schedule_start = job.kwargs["start"] - data_source = get_data_source_for_job(job, sensor=sensor) + data_source = get_data_source_for_job(job) if data_source is None: return unknown_schedule( message + f"no data source could be found for job {job}." @@ -274,41 +274,39 @@ def post_udi_event_response(unit: str, prior: datetime): event_id, sensor.generic_asset.get_attribute("soc_udi_event_id") ) + flex_model = {} + # get value if "value" not in form: return ptus_incomplete() try: - value = float(form.get("value")) + flex_model["soc-at-start"] = float(form.get("value")) except ValueError: extra_info = "Request includes empty or ill-formatted value(s)." 
current_app.logger.warning(extra_info) return ptus_incomplete(extra_info) - if unit == "kWh": - value = value / 1000.0 + flex_model["soc-unit"] = unit # get optional efficiency roundtrip_efficiency = form.get("roundtrip_efficiency", None) + if roundtrip_efficiency: + flex_model["roundtrip-efficiency"] = roundtrip_efficiency # get optional min and max SOC soc_min = form.get("soc_min", None) soc_max = form.get("soc_max", None) - if soc_min is not None and unit == "kWh": - soc_min = soc_min / 1000.0 - if soc_max is not None and unit == "kWh": - soc_max = soc_max / 1000.0 + if soc_min: + flex_model["soc-min"] = soc_min + if soc_max: + flex_model["soc-max"] = soc_max # set soc targets start_of_schedule = datetime end_of_schedule = datetime + current_app.config.get("FLEXMEASURES_PLANNING_HORIZON") - resolution = sensor.event_resolution - soc_targets = initialize_series( - np.nan, - start=start_of_schedule, - end=end_of_schedule, - resolution=resolution, - inclusive="right", # note that target values are indexed by their due date (i.e. inclusive="right") - ) + # SOC targets + targets = form.get("targets", []) + # parse eventual values and generate responses if event_type == "soc-with-targets": if "targets" not in form: return incomplete_event( @@ -316,19 +314,17 @@ def post_udi_event_response(unit: str, prior: datetime): event_type, "Cannot process event %s with missing targets." % form.get("event"), ) - for target in form.get("targets"): + for target in targets: # get target value if "value" not in target: return ptus_incomplete("Target missing value parameter.") try: - target_value = float(target["value"]) + float(target["value"]) except ValueError: extra_info = "Request includes empty or ill-formatted target value(s)." 
current_app.logger.warning(extra_info) return ptus_incomplete(extra_info) - if unit == "kWh": - target_value = target_value / 1000.0 # get target datetime if "datetime" not in target: @@ -352,26 +348,16 @@ def post_udi_event_response(unit: str, prior: datetime): return invalid_datetime( f'Target datetime exceeds {end_of_schedule}. Maximum scheduling horizon is {current_app.config.get("FLEXMEASURES_PLANNING_HORIZON")}.' ) - target_datetime = target_datetime.astimezone( - soc_targets.index.tzinfo - ) # otherwise DST would be problematic - # set target - soc_targets.loc[target_datetime] = target_value + flex_model["soc-targets"] = targets create_scheduling_job( - sensor, - start_of_schedule, - end_of_schedule, - resolution=resolution, + sensor=sensor, + start=start_of_schedule, + end=end_of_schedule, + resolution=sensor.event_resolution, belief_time=prior, # server time if no prior time was sent - storage_specs=dict( - soc_at_start=value, - soc_targets=soc_targets, - soc_min=soc_min, - soc_max=soc_max, - roundtrip_efficiency=roundtrip_efficiency, - ), + flex_model=flex_model, job_id=form.get("event"), enqueue=True, ) @@ -379,7 +365,12 @@ def post_udi_event_response(unit: str, prior: datetime): # Store new soc info as GenericAsset attributes sensor.generic_asset.set_attribute("soc_datetime", datetime.isoformat()) sensor.generic_asset.set_attribute("soc_udi_event_id", event_id) - sensor.generic_asset.set_attribute("soc_in_mwh", value) + if unit == "kWh": + sensor.generic_asset.set_attribute( + "soc_in_mwh", flex_model["soc-at-start"] / 1000 + ) + else: + sensor.generic_asset.set_attribute("soc_in_mwh", flex_model["soc-at-start"]) db.session.commit() return request_processed() diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index 9a6191c3b..dc69f4e2b 100644 --- a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -1,29 +1,26 @@ from datetime import datetime, timedelta -from typing import List, Optional +from typing 
import List, Dict, Optional from flask import current_app from flask_classful import FlaskView, route from flask_json import as_json from flask_security import auth_required import isodate -from marshmallow import validate, fields, Schema +from marshmallow import fields, validate, ValidationError from marshmallow.validate import OneOf -import numpy as np from rq.job import Job, NoSuchJobError from timely_beliefs import BeliefsDataFrame from webargs.flaskparser import use_args, use_kwargs from flexmeasures.api.common.responses import ( - invalid_datetime, - invalid_timezone, request_processed, unrecognized_event, unknown_schedule, - ptus_incomplete, + invalid_flex_config, ) +from flexmeasures.api.common.utils.deprecation_utils import deprecate_fields from flexmeasures.api.common.utils.validators import ( optional_duration_accepted, - optional_prior_accepted, ) from flexmeasures.api.common.schemas.sensor_data import ( GetSensorDataSchema, @@ -32,15 +29,18 @@ from flexmeasures.api.common.schemas.users import AccountIdField from flexmeasures.api.common.utils.api_utils import save_and_enqueue from flexmeasures.auth.decorators import permission_required_for_context -from flexmeasures.data.models.planning.utils import initialize_series +from flexmeasures.data import db from flexmeasures.data.models.user import Account from flexmeasures.data.models.time_series import Sensor from flexmeasures.data.queries.utils import simplify_index from flexmeasures.data.schemas.sensors import SensorSchema, SensorIdField +from flexmeasures.data.schemas.times import AwareDateTimeField from flexmeasures.data.schemas.units import QuantityField -from flexmeasures.data.schemas import AwareDateTimeField +from flexmeasures.data.schemas.scheduling.storage import SOCTargetSchema +from flexmeasures.data.schemas.scheduling import FlexContextSchema from flexmeasures.data.services.sensors import get_sensors from flexmeasures.data.services.scheduling import ( + find_scheduler_class, 
create_scheduling_job, get_data_source_for_job, ) @@ -48,16 +48,24 @@ from flexmeasures.utils.unit_utils import ur -class TargetSchema(Schema): - value = fields.Float() - datetime = AwareDateTimeField() - - # Instantiate schemas outside of endpoint logic to minimize response time get_sensor_schema = GetSensorDataSchema() post_sensor_schema = PostSensorDataSchema() sensors_schema = SensorSchema(many=True) +DEPRECATED_FLEX_CONFIGURATION_FIELDS = [ + "soc-at-start", + "soc-min", + "soc-max", + "soc-unit", + "roundtrip-efficiency", + "prefer-charging-sooner", + "soc-targets", + "consumption-price-sensor", + "production-price-sensor", + "inflexible-device-sensors", +] + class SensorAPI(FlaskView): @@ -206,8 +214,14 @@ def get_data(self, response: dict): {"sensor": SensorIdField(data_key="id")}, location="path", ) + # TODO: Everything other than start_of_schedule, prior, flex_model and flex_context is to be deprecated in 0.13. We let the scheduler decide (flex model) or nest (portfolio) @use_kwargs( { + "start_of_schedule": AwareDateTimeField( + data_key="start", format="iso", required=True + ), + "belief_time": AwareDateTimeField(format="iso", data_key="prior"), + "flex_model": fields.Dict(data_key="flex-model"), "soc_sensor_id": fields.Str(data_key="soc-sensor", required=False), "roundtrip_efficiency": QuantityField( "%", @@ -217,9 +231,6 @@ def get_data(self, response: dict): "start_value": fields.Float(data_key="soc-at-start"), "soc_min": fields.Float(data_key="soc-min"), "soc_max": fields.Float(data_key="soc-max"), - "start_of_schedule": AwareDateTimeField( - data_key="start", format="iso", required=False - ), "unit": fields.Str( data_key="soc-unit", validate=OneOf( @@ -229,11 +240,15 @@ def get_data(self, response: dict): ] ), ), # todo: allow unit to be set per field, using QuantityField("%", validate=validate.Range(min=0, max=1)) - "targets": fields.List(fields.Nested(TargetSchema), data_key="soc-targets"), + "targets": fields.List( + 
fields.Nested(SOCTargetSchema), data_key="soc-targets" + ), "prefer_charging_sooner": fields.Bool( data_key="prefer-charging-sooner", required=False ), - # todo: add a duration parameter, instead of falling back to FLEXMEASURES_PLANNING_HORIZON + "flex_context": fields.Nested( + FlexContextSchema, required=False, data_key="flex-context" + ), "consumption_price_sensor": SensorIdField( data_key="consumption-price-sensor", required=False ), @@ -246,18 +261,23 @@ def get_data(self, response: dict): }, location="json", ) - @optional_prior_accepted() def trigger_schedule( # noqa: C901 self, sensor: Sensor, start_of_schedule: datetime, - unit: str, - prior: datetime, + belief_time: Optional[datetime] = None, + start_value: Optional[float] = None, + soc_min: Optional[float] = None, + soc_max: Optional[float] = None, + unit: Optional[str] = None, roundtrip_efficiency: Optional[ur.Quantity] = None, prefer_charging_sooner: Optional[bool] = True, consumption_price_sensor: Optional[Sensor] = None, production_price_sensor: Optional[Sensor] = None, inflexible_device_sensors: Optional[List[Sensor]] = None, + soc_sensor_id: Optional[int] = None, + flex_model: Optional[dict] = None, + flex_context: Optional[dict] = None, **kwargs, ): """ @@ -270,28 +290,25 @@ def trigger_schedule( # noqa: C901 In this request, you can describe: - - the schedule (start, unit, prior) - - the flexibility model for the sensor (see below, only storage models are supported at the moment) - - the EMS the sensor operates in (inflexible device sensors, and sensors that put a price on consumption and/or production) + - the schedule's main features (when does it start, what unit should it report, prior to what time can we assume knowledge) + - the flexibility model for the sensor (state and constraint variables, e.g. current state of charge of a battery, or connection capacity) + - the flexibility context which the sensor operates in (other sensors under the same EMS which are relevant, e.g. 
prices) - Note: This endpoint does not support to schedule an EMS with multiple flexible sensors at once. This will happen in another endpoint. - See https://github.com/FlexMeasures/flexmeasures/issues/485. Until then, it is possible to call this endpoint for one flexible endpoint at a time - (considering already scheduled sensors as inflexible). + For details on flexibility model and context, see :ref:`describing_flexibility`. + Below, we'll also list some examples. - Flexibility models apply to the sensor's asset type: + .. note:: This endpoint does not support to schedule an EMS with multiple flexible sensors at once. This will happen in another endpoint. + See https://github.com/FlexMeasures/flexmeasures/issues/485. Until then, it is possible to call this endpoint for one flexible endpoint at a time + (considering already scheduled sensors as inflexible). - 1) For storage sensors (e.g. battery, charge points), the schedule deals with the state of charge (SOC). - The possible flexibility parameters are: + The length of schedules is set by the config setting :ref:`planning_horizon_config`, defaulting to 12 hours. - - soc-at-start (defaults to 0) - - soc-unit (kWh or MWh) - - soc-min (defaults to 0) - - soc-max (defaults to max soc target) - - soc-targets (defaults to NaN values) - - roundtrip-efficiency (defaults to 100%) - - prefer-charging-sooner (defaults to True, also signals a preference to discharge later) + .. todo:: add a schedule duration parameter, instead of always falling back to FLEXMEASURES_PLANNING_HORIZON - 2) Heat pump sensors are work in progress. + The appropriate algorithm is chosen by FlexMeasures (based on asset type). + It's also possible to use custom schedulers and custom flexibility models, see :ref:`plugin_customization`. 
+ + If you have ideas for algorithms that should be part of FlexMeasures, let us know: https://flexmeasures.io/get-in-touch/ **Example request A** @@ -301,13 +318,15 @@ def trigger_schedule( # noqa: C901 { "start": "2015-06-02T10:00:00+00:00", - "soc-at-start": 12.1, - "soc-unit": "kWh" + "flex-model": { + "soc-at-start": 12.1, + "soc-unit": "kWh" + } } **Example request B** - This message triggers a schedule starting at 10.00am, at which the state of charge (soc) is 12.1 kWh, + This message triggers a schedule for a storage asset, starting at 10.00am, at which the state of charge (soc) is 12.1 kWh, with a target state of charge of 25 kWh at 4.00pm. The minimum and maximum soc are set to 10 and 25 kWh, respectively. Roundtrip efficiency for use in scheduling is set to 98%. @@ -321,20 +340,24 @@ def trigger_schedule( # noqa: C901 { "start": "2015-06-02T10:00:00+00:00", - "soc-at-start": 12.1, - "soc-unit": "kWh", - "soc-targets": [ - { - "value": 25, - "datetime": "2015-06-02T16:00:00+00:00" - } - ], - "soc-min": 10, - "soc-max": 25, - "roundtrip-efficiency": 0.98, - "consumption-price-sensor": 9, - "production-price-sensor": 10, - "inflexible-device-sensors": [13, 14, 15] + "flex-model": { + "soc-at-start": 12.1, + "soc-unit": "kWh", + "soc-targets": [ + { + "value": 25, + "datetime": "2015-06-02T16:00:00+00:00" + } + ], + "soc-min": 10, + "soc-max": 25, + "roundtrip-efficiency": 0.98, + }, + "flex-context": { + "consumption-price-sensor": 9, + "production-price-sensor": 10, + "inflexible-device-sensors": [13, 14, 15] + } } **Example response** @@ -356,120 +379,108 @@ def trigger_schedule( # noqa: C901 :reqheader Content-Type: application/json :resheader Content-Type: application/json :status 200: PROCESSED - :status 400: INVALID_TIMEZONE, INVALID_DATETIME, INVALID_DOMAIN, INVALID_UNIT, PTUS_INCOMPLETE + :status 400: INVALID_DATA :status 401: UNAUTHORIZED :status 403: INVALID_SENDER :status 405: INVALID_METHOD + :status 422: UNPROCESSABLE_ENTITY """ - # todo: if a 
soc-sensor entity address is passed, persist those values to the corresponding sensor - # (also update the note in posting_data.rst about flexibility states not being persisted). + # -- begin deprecation logic, can be removed after 0.13 + deprecate_fields( + DEPRECATED_FLEX_CONFIGURATION_FIELDS, + deprecation_date="2022-12-14", + deprecation_link="https://flexmeasures.readthedocs.io/en/latest/api/change_log.html#v3-0-5-2022-12-30", + sunset_date="2023-02-01", + sunset_link="https://flexmeasures.readthedocs.io/en/latest/api/change_log.html#v3-0-5-2022-12-30", + ) + found_fields: Dict[str, List[str]] = dict(model=[], context=[]) + deprecation_message = "" + # flex-model + for param, param_name in [ + (start_value, "soc-at-start"), + (soc_min, "soc-min"), + (soc_max, "soc-max"), + (unit, "soc-unit"), + (roundtrip_efficiency, "roundtrip-efficiency"), + ( + prefer_charging_sooner, + "prefer-charging-sooner", + ), + ]: + if flex_model is None: + flex_model = {} + if param is not None: + if param_name not in flex_model: + if param_name == "roundtrip-efficiency" and type(param) != float: + param = param.to(ur.Quantity("dimensionless")).magnitude # type: ignore + flex_model[param_name] = param + found_fields["model"].append(param_name) + # flex-context + for param, param_name in [ + ( + consumption_price_sensor, + "consumption-price-sensor", + ), + ( + production_price_sensor, + "production-price-sensor", + ), + ( + inflexible_device_sensors, + "inflexible-device-sensors", + ), + ]: + if flex_context is None: + flex_context = {} + if param is not None: + if param_name not in flex_context: + flex_context[param_name] = param + found_fields["context"].append(param_name) + if found_fields["model"] or found_fields["context"]: + deprecation_message = "The following fields you sent are deprecated and will be sunset in the next version:" + if found_fields["model"]: + deprecation_message += f" {', '.join(found_fields['model'])} (please pass as part of flex_model)." 
+ if found_fields["context"]: + deprecation_message += f" {', '.join(found_fields['context'])} (please pass as part of flex_context)." + + if soc_sensor_id is not None: + deprecation_message += ( + "The field soc-sensor-id is deprecated and will be sunset in v0.13." + ) + # -- end deprecation logic - # get starting value - if "start_value" not in kwargs: - return ptus_incomplete() - try: - start_value = float(kwargs.get("start_value")) # type: ignore - except ValueError: - extra_info = "Request includes empty or ill-formatted value(s)." - current_app.logger.warning(extra_info) - return ptus_incomplete(extra_info) - if unit == "kWh": - start_value = start_value / 1000.0 - - # Convert round-trip efficiency to dimensionless (to the (0,1] range) - if roundtrip_efficiency is not None: - roundtrip_efficiency = roundtrip_efficiency.to( - ur.Quantity("dimensionless") - ).magnitude - - # get optional min and max SOC - soc_min = kwargs.get("soc_min", None) - soc_max = kwargs.get("soc_max", None) - # TODO: review when we moved away from capacity having to be described in MWh - if soc_min is not None and unit == "kWh": - soc_min = soc_min / 1000.0 - if soc_max is not None and unit == "kWh": - soc_max = soc_max / 1000.0 - - # set soc targets end_of_schedule = start_of_schedule + current_app.config.get( # type: ignore "FLEXMEASURES_PLANNING_HORIZON" ) - resolution = sensor.event_resolution - soc_targets = initialize_series( - np.nan, + scheduler_kwargs = dict( + sensor=sensor, start=start_of_schedule, end=end_of_schedule, - resolution=resolution, - inclusive="right", # note that target values are indexed by their due date (i.e. 
inclusive="right") + resolution=sensor.event_resolution, + belief_time=belief_time, # server time if no prior time was sent + flex_model=flex_model, + flex_context=flex_context, ) - # todo: move this deserialization of targets into newly-created ScheduleTargetSchema - for target in kwargs.get("targets", []): - # get target value - if "value" not in target: - return ptus_incomplete("Target missing 'value' parameter.") - try: - target_value = float(target["value"]) - except ValueError: - extra_info = "Request includes empty or ill-formatted soc target(s)." - current_app.logger.warning(extra_info) - return ptus_incomplete(extra_info) - if unit == "kWh": - target_value = target_value / 1000.0 - - # get target datetime - if "datetime" not in target: - return invalid_datetime("Target missing datetime parameter.") - else: - target_datetime = target["datetime"] - if target_datetime is None: - return invalid_datetime( - "Cannot parse target datetime string %s as iso date" - % target["datetime"] - ) - if target_datetime.tzinfo is None: - current_app.logger.warning( - "Cannot parse timezone of target 'datetime' value %s" - % target["datetime"] - ) - return invalid_timezone( - "Target datetime should explicitly state a timezone." - ) - if target_datetime > end_of_schedule: - return invalid_datetime( - f'Target datetime exceeds {end_of_schedule}. Maximum scheduling horizon is {current_app.config.get("FLEXMEASURES_PLANNING_HORIZON")}.' 
- ) - target_datetime = target_datetime.astimezone( - soc_targets.index.tzinfo - ) # otherwise DST would be problematic - - # set target - soc_targets.loc[target_datetime] = target_value + try: + # We create a scheduler, so the flex config is also checked and errors are returned here + scheduler = find_scheduler_class(sensor)(**scheduler_kwargs) + scheduler.deserialize_config() + except ValidationError as err: + return invalid_flex_config(err.messages) + except ValueError as err: + return invalid_flex_config(str(err)) job = create_scheduling_job( - sensor, - start_of_schedule, - end_of_schedule, - resolution=resolution, - belief_time=prior, # server time if no prior time was sent - storage_specs=dict( - soc_at_start=start_value, - soc_targets=soc_targets, - soc_min=soc_min, - soc_max=soc_max, - roundtrip_efficiency=roundtrip_efficiency, - prefer_charging_sooner=prefer_charging_sooner, - ), - consumption_price_sensor=consumption_price_sensor, - production_price_sensor=production_price_sensor, - inflexible_device_sensors=inflexible_device_sensors, + **scheduler_kwargs, enqueue=True, ) - response = dict(schedule=job.id) + scheduler.persist_flex_model() + db.session.commit() - d, s = request_processed() + response = dict(schedule=job.id) + d, s = request_processed(deprecation_message) return dict(**response, **d), s @route("//schedules/", methods=["GET"]) @@ -563,7 +574,7 @@ def get_schedule(self, sensor: Sensor, job_id: str, duration: timedelta, **kwarg return unknown_schedule("Scheduling job has an unknown status.") schedule_start = job.kwargs["start"] - data_source = get_data_source_for_job(job, sensor=sensor) + data_source = get_data_source_for_job(job) if data_source is None: return unknown_schedule( error_message + f"no data source could be found for {data_source}." 
diff --git a/flexmeasures/api/v3_0/tests/conftest.py b/flexmeasures/api/v3_0/tests/conftest.py index be9ddd120..95447a7e1 100644 --- a/flexmeasures/api/v3_0/tests/conftest.py +++ b/flexmeasures/api/v3_0/tests/conftest.py @@ -54,6 +54,13 @@ def setup_inactive_user(db, setup_accounts, setup_roles_users): ) +@pytest.fixture(scope="function") +def keep_scheduling_queue_empty(app): + app.queues["scheduling"].empty() + yield + app.queues["scheduling"].empty() + + def add_incineration_line(db, test_supplier_user) -> dict[str, Sensor]: incineration_type = GenericAssetType( name="waste incinerator", diff --git a/flexmeasures/api/v3_0/tests/test_sensor_schedules.py b/flexmeasures/api/v3_0/tests/test_sensor_schedules.py index d15b38174..d26fabd06 100644 --- a/flexmeasures/api/v3_0/tests/test_sensor_schedules.py +++ b/flexmeasures/api/v3_0/tests/test_sensor_schedules.py @@ -1,14 +1,13 @@ from flask import url_for import pytest -from datetime import timedelta from isodate import parse_datetime import pandas as pd from rq.job import Job -from flexmeasures.api.tests.utils import get_auth_token +from flexmeasures.api.tests.utils import check_deprecation, get_auth_token from flexmeasures.api.v1_3.tests.utils import message_for_get_device_message -from flexmeasures.api.v3_0.tests.utils import message_for_post_udi_event +from flexmeasures.api.v3_0.tests.utils import message_for_trigger_schedule from flexmeasures.data.models.time_series import Sensor, TimedBelief from flexmeasures.data.tests.utils import work_on_rq from flexmeasures.data.services.scheduling import ( @@ -18,11 +17,65 @@ from flexmeasures.utils.calculations import integrate_time_series +@pytest.mark.parametrize( + "message, field, sent_value, err_msg", + [ + (message_for_trigger_schedule(), "soc-minn", 3, "Unknown field"), + ( + message_for_trigger_schedule(), + "soc-min", + "not-a-float", + "Not a valid number", + ), + (message_for_trigger_schedule(), "soc-unit", "MWH", "Must be one of"), + ], +) +def 
test_trigger_schedule_with_invalid_flexmodel( + app, + add_battery_assets, + keep_scheduling_queue_empty, + message, + field, + sent_value, + err_msg, +): + sensor = Sensor.query.filter(Sensor.name == "Test battery").one_or_none() + with app.test_client() as client: + if sent_value: # if None, field is a term we expect in the response, not more + message["flex-model"][field] = sent_value + + auth_token = get_auth_token(client, "test_prosumer_user@seita.nl", "testtest") + trigger_schedule_response = client.post( + url_for("SensorAPI:trigger_schedule", id=sensor.id), + json=message, + headers={"Authorization": auth_token}, + ) + print("Server responded with:\n%s" % trigger_schedule_response.json) + check_deprecation(trigger_schedule_response, deprecation=None, sunset=None) + assert trigger_schedule_response.status_code == 422 + assert field in trigger_schedule_response.json["message"]["json"] + if isinstance(trigger_schedule_response.json["message"]["json"], str): + # ValueError + assert err_msg in trigger_schedule_response.json["message"]["json"] + else: + # ValidationError (marshmallow) + assert ( + err_msg in trigger_schedule_response.json["message"]["json"][field][0] + ) + + @pytest.mark.parametrize( "message, asset_name", [ - (message_for_post_udi_event(), "Test battery"), - (message_for_post_udi_event(targets=True), "Test charging station"), + (message_for_trigger_schedule(deprecated_format_pre012=True), "Test battery"), + (message_for_trigger_schedule(), "Test battery"), + ( + message_for_trigger_schedule( + with_targets=True, deprecated_format_pre012=True + ), + "Test charging station", + ), + (message_for_trigger_schedule(with_targets=True), "Test charging station"), ], ) def test_trigger_and_get_schedule( @@ -31,16 +84,21 @@ def test_trigger_and_get_schedule( add_battery_assets, battery_soc_sensor, add_charging_station_assets, + keep_scheduling_queue_empty, message, asset_name, ): # trigger a schedule through the /sensors//schedules/trigger [POST] api 
endpoint message["roundtrip-efficiency"] = 0.98 message["soc-min"] = 0 - message["soc-max"] = 25 + message["soc-max"] = 4 + assert len(app.queues["scheduling"]) == 0 + + sensor = Sensor.query.filter(Sensor.name == asset_name).one_or_none() + # This makes sure we have fresh data. A hack we can remove after the deprecation cases are removed. + TimedBelief.query.filter(TimedBelief.sensor_id == sensor.id).delete() + with app.test_client() as client: - sensor = Sensor.query.filter(Sensor.name == asset_name).one_or_none() - message["soc-sensor"] = f"ea1.2018-06.localhost:fm1.{battery_soc_sensor.id}" auth_token = get_auth_token(client, "test_prosumer_user@seita.nl", "testtest") trigger_schedule_response = client.post( url_for("SensorAPI:trigger_schedule", id=sensor.id), @@ -48,7 +106,11 @@ def test_trigger_and_get_schedule( headers={"Authorization": auth_token}, ) print("Server responded with:\n%s" % trigger_schedule_response.json) + check_deprecation(trigger_schedule_response) assert trigger_schedule_response.status_code == 200 + assert ( + "soc-min" in trigger_schedule_response.json["message"] + ) # deprecation warning job_id = trigger_schedule_response.json["schedule"] # look for scheduling jobs in queue @@ -80,7 +142,7 @@ def test_trigger_and_get_schedule( .filter(TimedBelief.source_id == scheduler_source.id) .all() ) - resolution = timedelta(minutes=15) + resolution = sensor.event_resolution consumption_schedule = pd.Series( [-v.event_value for v in power_values], index=pd.DatetimeIndex([v.event_start for v in power_values], freq=resolution), diff --git a/flexmeasures/api/v3_0/tests/utils.py b/flexmeasures/api/v3_0/tests/utils.py index a44827861..d7875acfc 100644 --- a/flexmeasures/api/v3_0/tests/utils.py +++ b/flexmeasures/api/v3_0/tests/utils.py @@ -40,21 +40,35 @@ def get_asset_post_data(account_id: int = 1, asset_type_id: int = 1) -> dict: return post_data -def message_for_post_udi_event( +def message_for_trigger_schedule( unknown_prices: bool = False, - targets: 
bool = False, + with_targets: bool = False, + realistic_targets: bool = True, + deprecated_format_pre012: bool = False, ) -> dict: message = { "start": "2015-01-01T00:00:00+01:00", - "soc-at-start": 12.1, - "soc-unit": "kWh", } - if targets: - message["soc-targets"] = [ - {"value": 25, "datetime": "2015-01-02T23:00:00+01:00"} - ] if unknown_prices: message[ "start" ] = "2040-01-01T00:00:00+01:00" # We have no beliefs in our test database about 2040 prices + + if deprecated_format_pre012: + message["soc-at-start"] = 12.1 + message["soc-unit"] = "kWh" + else: + message["flex-model"] = {} + message["flex-model"]["soc-at-start"] = 12.1 + message["flex-model"]["soc-unit"] = "kWh" + if with_targets: + if realistic_targets: + targets = [{"value": 3500, "datetime": "2015-01-02T23:00:00+01:00"}] + else: + # this target is actually higher than soc_max_in_mwh on the battery's sensor attributes + targets = [{"value": 25000, "datetime": "2015-01-02T23:00:00+01:00"}] + if deprecated_format_pre012: + message["soc-targets"] = targets + else: + message["flex-model"]["soc-targets"] = targets return message diff --git a/flexmeasures/cli/data_add.py b/flexmeasures/cli/data_add.py index de89a99d5..bd8d809b0 100755 --- a/flexmeasures/cli/data_add.py +++ b/flexmeasures/cli/data_add.py @@ -6,7 +6,6 @@ import json from marshmallow import validate -import numpy as np import pandas as pd import pytz from flask import current_app as app @@ -19,6 +18,7 @@ import timely_beliefs.utils as tb_utils from workalendar.registry import registry as workalendar_registry +from flexmeasures.cli.utils import DeprecatedDefaultGroup from flexmeasures.data import db from flexmeasures.data.scripts.data_gen import ( add_transmission_zone_asset, @@ -28,7 +28,6 @@ from flexmeasures.data.services.forecasting import create_forecasting_jobs from flexmeasures.data.services.scheduling import make_schedule, create_scheduling_job from flexmeasures.data.services.users import create_user -from 
flexmeasures.data.models.planning.utils import initialize_series from flexmeasures.data.models.user import Account, AccountRole, RolesAccounts from flexmeasures.data.models.time_series import ( Sensor, @@ -823,14 +822,37 @@ def create_forecasts( ) -@fm_add_data.command("schedule") +# todo: repurpose `flexmeasures add schedule` (deprecated since v0.12), +# - see https://github.com/FlexMeasures/flexmeasures/pull/537#discussion_r1048680231 +# - hint for repurposing to invoke custom logic instead of a default subcommand: +# @fm_add_data.group("schedule", invoke_without_command=True) +# def create_schedule(): +# if ctx.invoked_subcommand: +# ... +@fm_add_data.group( + "schedule", + cls=DeprecatedDefaultGroup, + default="storage", + deprecation_message="The command 'flexmeasures add schedule' is deprecated. Please use `flexmeasures add schedule for-storage` instead.", +) +@click.pass_context +@with_appcontext +def create_schedule(ctx): + """(Deprecated) Create a new schedule for a given power sensor. + + THIS COMMAND HAS BEEN RENAMED TO `flexmeasures add schedule for-storage` + """ + pass + + +@create_schedule.command("for-storage") @with_appcontext @click.option( "--sensor-id", "power_sensor", type=SensorIdField(), required=True, - help="Create schedule for this sensor. Follow up with the sensor's ID.", + help="Create schedule for this sensor. Should be a power sensor. Follow up with the sensor's ID.", ) @click.option( "--consumption-price-sensor", @@ -914,7 +936,7 @@ def create_forecasts( help="Whether to queue a scheduling job instead of computing directly. " "To process the job, run a worker (on any computer, but configured to the same databases) to process the 'scheduling' queue. 
Defaults to False.", ) -def create_schedule( +def add_schedule_for_storage( power_sensor: Sensor, consumption_price_sensor: Sensor, production_price_sensor: Sensor, @@ -928,12 +950,12 @@ def create_schedule( roundtrip_efficiency: Optional[ur.Quantity] = None, as_job: bool = False, ): - """Create a new schedule for a given power sensor. + """Create a new schedule for a storage asset. Current limitations: - - only supports battery assets and Charge Points - - only supports datetimes on the hour or a multiple of the sensor resolution thereafter + - Limited to power sensors (probably possible to generalize to non-electric assets) + - Only supports datetimes on the hour or a multiple of the sensor resolution thereafter """ # todo: deprecate the 'optimization-context-id' argument in favor of 'consumption-price-sensor' (announced v0.11.0) @@ -944,36 +966,23 @@ def create_schedule( consumption_price_sensor, ) - # Parse input + # Parse input and required sensor attributes if not power_sensor.measures_power: click.echo(f"Sensor with ID {power_sensor.id} is not a power sensor.") raise click.Abort() if production_price_sensor is None: production_price_sensor = consumption_price_sensor end = start + duration - for attribute in ("min_soc_in_mwh", "max_soc_in_mwh"): - try: - check_required_attributes(power_sensor, [(attribute, float)]) - except MissingAttributeException: - click.echo(f"{power_sensor} has no {attribute} attribute.") - raise click.Abort() - soc_targets = initialize_series( - np.nan, - start=pd.Timestamp(start).tz_convert(power_sensor.timezone), - end=pd.Timestamp(end).tz_convert(power_sensor.timezone), - resolution=power_sensor.event_resolution, - inclusive="right", # note that target values are indexed by their due date (i.e. 
inclusive="right") - ) - - # Convert round-trip efficiency to dimensionless - if roundtrip_efficiency is not None: - roundtrip_efficiency = roundtrip_efficiency.to( - ur.Quantity("dimensionless") - ).magnitude - # Convert SoC units to MWh, given the storage capacity + # Convert SoC units (we ask for % in this CLI) to MWh, given the storage capacity + try: + check_required_attributes(power_sensor, [("max_soc_in_mwh", float)]) + except MissingAttributeException: + click.echo(f"Sensor {power_sensor} has no max_soc_in_mwh attribute.") + raise click.Abort() capacity_str = f"{power_sensor.get_attribute('max_soc_in_mwh')} MWh" soc_at_start = convert_units(soc_at_start.magnitude, soc_at_start.units, "MWh", capacity=capacity_str) # type: ignore + soc_targets = [] for soc_target_tuple in soc_target_strings: soc_target_value_str, soc_target_dt_str = soc_target_tuple soc_target_value = convert_units( @@ -983,48 +992,36 @@ def create_schedule( capacity=capacity_str, ) soc_target_datetime = pd.Timestamp(soc_target_dt_str) - soc_targets.loc[soc_target_datetime] = soc_target_value + soc_targets.append(dict(value=soc_target_value, datetime=soc_target_datetime)) if soc_min is not None: soc_min = convert_units(soc_min.magnitude, str(soc_min.units), "MWh", capacity=capacity_str) # type: ignore if soc_max is not None: soc_max = convert_units(soc_max.magnitude, str(soc_max.units), "MWh", capacity=capacity_str) # type: ignore + scheduling_kwargs = dict( + sensor=power_sensor, + start=start, + end=end, + belief_time=server_now(), + resolution=power_sensor.event_resolution, + flex_model={ + "soc-at-start": soc_at_start, + "soc-targets": soc_targets, + "soc-min": soc_min, + "soc-max": soc_max, + "roundtrip-efficiency": roundtrip_efficiency, + }, + flex_context={ + "consumption-price-sensor": consumption_price_sensor.id, + "production-price-sensor": production_price_sensor.id, + }, + ) if as_job: - job = create_scheduling_job( - sensor=power_sensor, - start_of_schedule=start, - 
end_of_schedule=end, - belief_time=server_now(), - resolution=power_sensor.event_resolution, - storage_specs=dict( - soc_at_start=soc_at_start, - soc_targets=soc_targets, - soc_min=soc_min, - soc_max=soc_max, - roundtrip_efficiency=roundtrip_efficiency, - ), - consumption_price_sensor=consumption_price_sensor, - production_price_sensor=production_price_sensor, - ) + job = create_scheduling_job(**scheduling_kwargs) if job: print(f"New scheduling job {job.id} has been added to the queue.") else: - success = make_schedule( - sensor_id=power_sensor.id, - start=start, - end=end, - belief_time=server_now(), - resolution=power_sensor.event_resolution, - storage_specs=dict( - soc_at_start=soc_at_start, - soc_targets=soc_targets, - soc_min=soc_min, - soc_max=soc_max, - roundtrip_efficiency=roundtrip_efficiency, - ), - consumption_price_sensor=consumption_price_sensor, - production_price_sensor=production_price_sensor, - ) + success = make_schedule(**scheduling_kwargs) if success: print("New schedule is stored.") diff --git a/flexmeasures/cli/utils.py b/flexmeasures/cli/utils.py new file mode 100644 index 000000000..cb5695ff9 --- /dev/null +++ b/flexmeasures/cli/utils.py @@ -0,0 +1,48 @@ +import click +from click_default_group import DefaultGroup + + +class DeprecatedDefaultGroup(DefaultGroup): + """Invokes a default subcommand, *and* shows a deprecation message. + + Also adds the `invoked_default` boolean attribute to the context. + A group callback can use this information to figure out if it's being executed directly + (invoking the default subcommand) or because the execution flow passes onwards to a subcommand. + By default it's None, but it can be the name of the default subcommand to execute. + + .. 
sourcecode:: python + + import click + from flexmeasures.cli.utils import DeprecatedDefaultGroup + + @click.group(cls=DeprecatedDefaultGroup, default="bar", deprecation_message="renamed to `foo bar`.") + def foo(ctx): + if ctx.invoked_default: + click.echo("foo") + + @foo.command() + def bar(): + click.echo("bar") + + .. sourcecode:: console + + $ flexmeasures foo + DeprecationWarning: renamed to `foo bar`. + foo + bar + $ flexmeasures foo bar + bar + """ + + def __init__(self, *args, **kwargs): + self.deprecation_message = "DeprecationWarning: " + kwargs.pop( + "deprecation_message", "" + ) + super().__init__(*args, **kwargs) + + def get_command(self, ctx, cmd_name): + ctx.invoked_default = None + if cmd_name not in self.commands: + click.echo(click.style(self.deprecation_message, fg="red"), err=True) + ctx.invoked_default = self.default_cmd_name + return super().get_command(ctx, cmd_name) diff --git a/flexmeasures/data/models/planning/__init__.py b/flexmeasures/data/models/planning/__init__.py index 622b2ccf4..4b4a1f14d 100644 --- a/flexmeasures/data/models/planning/__init__.py +++ b/flexmeasures/data/models/planning/__init__.py @@ -1,11 +1,144 @@ +from datetime import datetime, timedelta from typing import Optional + import pandas as pd +from flask import current_app + +from flexmeasures.data.models.time_series import Sensor class Scheduler: """ - Superclass for all FlexMeasures Schedulers + Superclass for all FlexMeasures Schedulers. + + A scheduler currently computes the schedule for one flexible asset. + TODO: extend to multiple flexible assets. + + The scheduler knows the power sensor of the flexible asset. + It also knows the basic timing parameter of the schedule (start, end, resolution), including the point in time when + knowledge can be assumed to be available (belief_time). + + Furthermore, the scheduler needs to have knowledge about the asset's flexibility model (under what constraints + can the schedule be optimized?) 
and the system's flexibility context (which other sensors are relevant, e.g. prices). + These two flexibility configurations are usually fed in from outside, so the scheduler should check them. + The deserialize_flex_config function can be used for that. + """ - def schedule(*args, **kwargs) -> Optional[pd.Series]: + __version__ = None + __author__ = None + + sensor: Sensor + start: datetime + end: datetime + resolution: timedelta + belief_time: datetime + round_to_decimals: int + flex_model: Optional[dict] = None + flex_context: Optional[dict] = None + + config_deserialized = False # This flag allows you to let the scheduler skip checking config, like timing, flex_model and flex_context + + def __init__( + self, + sensor, + start, + end, + resolution, + belief_time: Optional[datetime] = None, + round_to_decimals: Optional[int] = 6, + flex_model: Optional[dict] = None, + flex_context: Optional[dict] = None, + ): + """ + Initialize a new Scheduler. + + TODO: We might adapt the class design, so that A Scheduler object is initialized with configuration parameters, + and can then be used multiple times (via compute_schedule()) to compute schedules of different kinds, e.g. + If we started later (put in a later start), what would the schedule be? + If we could change set points less often (put in a coarser resolution), what would the schedule be? + If we knew what was going to happen (put in a later belief_time), what would the schedule have been? + For now, we don't see the best separation between config and state parameters (esp. within flex models) + E.g. start and flex_model[soc_at_start] are intertwined. 
+ """ + self.sensor = sensor + self.start = start + self.end = end + self.resolution = resolution + self.belief_time = belief_time + self.round_to_decimals = round_to_decimals + if flex_model is None: + flex_model = {} + self.flex_model = flex_model + if flex_context is None: + flex_context = {} + self.flex_context = flex_context + + def compute_schedule(self) -> Optional[pd.Series]: + """ + Overwrite with the actual computation of your schedule. + """ return None + + @classmethod + def get_data_source_info(cls: type) -> dict: + """ + Create and return the data source info, from which a data source lookup/creation is possible. + See for instance get_data_source_for_job(). + """ + source_info = dict( + model=cls.__name__, version="1", name="Unknown author" + ) # default + + if hasattr(cls, "__version__"): + source_info["version"] = str(cls.__version__) + else: + current_app.logger.warning( + f"Scheduler {cls.__name__} loaded, but has no __version__ attribute." + ) + if hasattr(cls, "__author__"): + source_info["name"] = str(cls.__author__) + else: + current_app.logger.warning( + f"Scheduler {cls.__name__} has no __author__ attribute." + ) + return source_info + + def persist_flex_model(self): + """ + If useful, (parts of) the flex model can be persisted (e.g. on the sensor) here, + e.g. as asset attributes, sensor attributes or as sensor data (beliefs). + """ + pass + + def deserialize_config(self): + """ + Check all configurations we have, throwing either ValidationErrors or ValueErrors. + Other code can decide if/how to handle those. + """ + self.deserialize_timing_config() + self.deserialize_flex_config() + self.config_deserialized = True + + def deserialize_timing_config(self): + """ + Check if the timing of the schedule is valid. + Raises ValueErrors. 
+ """ + if self.start > self.end: + raise ValueError(f"Start {self.start} cannot be after end {self.end}.") + # TODO: check if resolution times X fits into schedule length + # TODO: check if scheduled events would start "on the clock" w.r.t. resolution (see GH#10) + + def deserialize_flex_config(self): + """ + Check if the flex model and flex context are valid. Should be overwritten. + + Ideas: + - Apply a schema to check validity (see in-built flex model schemas) + - Check for inconsistencies between settings (can also happen in Marshmallow) + - fill in missing values from the scheduler's knowledge (e.g. sensor attributes) + + Raises ValidationErrors or ValueErrors. + """ + pass diff --git a/flexmeasures/data/models/planning/storage.py b/flexmeasures/data/models/planning/storage.py index 030c81d44..c29429c7f 100644 --- a/flexmeasures/data/models/planning/storage.py +++ b/flexmeasures/data/models/planning/storage.py @@ -1,9 +1,12 @@ +from __future__ import annotations + from datetime import datetime, timedelta -from typing import Optional, List, Union +from typing import List, Dict import pandas as pd +import numpy as np +from flask import current_app -from flexmeasures import Sensor from flexmeasures.data.models.planning import Scheduler from flexmeasures.data.models.planning.linear_optimization import device_scheduler from flexmeasures.data.models.planning.utils import ( @@ -14,6 +17,8 @@ get_power_values, fallback_charging_policy, ) +from flexmeasures.data.schemas.scheduling.storage import StorageFlexModelSchema +from flexmeasures.data.schemas.scheduling import FlexContextSchema class StorageScheduler(Scheduler): @@ -21,32 +26,35 @@ class StorageScheduler(Scheduler): __version__ = "1" __author__ = "Seita" - def schedule( + def compute_schedule( self, - sensor: Sensor, - start: datetime, - end: datetime, - resolution: timedelta, - storage_specs: dict, - consumption_price_sensor: Optional[Sensor] = None, - production_price_sensor: Optional[Sensor] = None, - 
inflexible_device_sensors: Optional[List[Sensor]] = None, - belief_time: Optional[datetime] = None, - round_to_decimals: Optional[int] = 6, - ) -> Union[pd.Series, None]: + ) -> pd.Series | None: """Schedule a battery or Charge Point based directly on the latest beliefs regarding market prices within the specified time window. For the resulting consumption schedule, consumption is defined as positive values. """ + if not self.config_deserialized: + self.deserialize_config() + + start = self.start + end = self.end + resolution = self.resolution + belief_time = self.belief_time + sensor = self.sensor + soc_at_start = self.flex_model.get("soc_at_start") + soc_targets = self.flex_model.get("soc_targets") + soc_min = self.flex_model.get("soc_min") + soc_max = self.flex_model.get("soc_max") + roundtrip_efficiency = self.flex_model.get("roundtrip_efficiency") + prefer_charging_sooner = self.flex_model.get("prefer_charging_sooner", True) - soc_at_start = storage_specs.get("soc_at_start") - soc_targets = storage_specs.get("soc_targets") - soc_min = storage_specs.get("soc_min") - soc_max = storage_specs.get("soc_max") - roundtrip_efficiency = storage_specs.get("roundtrip_efficiency") - prefer_charging_sooner = storage_specs.get("prefer_charging_sooner", True) + consumption_price_sensor = self.flex_context.get("consumption_price_sensor") + production_price_sensor = self.flex_context.get("production_price_sensor") + inflexible_device_sensors = self.flex_context.get( + "inflexible_device_sensors", [] + ) # Check for required Sensor attributes - sensor.check_required_attributes([("capacity_in_mw", (float, int))]) + self.sensor.check_required_attributes([("capacity_in_mw", (float, int))]) # Check for known prices or price forecasts, trimming planning window accordingly up_deviation_prices, (start, end) = get_prices( @@ -80,7 +88,7 @@ def schedule( ) # Set up commitments to optimise for - commitment_quantities = [initialize_series(0, start, end, resolution)] + 
commitment_quantities = [initialize_series(0, start, end, self.resolution)] # Todo: convert to EUR/(deviation of commitment, which is in MW) commitment_upwards_deviation_price = [ @@ -101,8 +109,6 @@ def schedule( "derivative down efficiency", "derivative up efficiency", ] - if inflexible_device_sensors is None: - inflexible_device_sensors = [] device_constraints = [ initialize_df(columns, start, end, resolution) for i in range(1 + len(inflexible_device_sensors)) @@ -114,15 +120,17 @@ def schedule( beliefs_before=belief_time, sensor=inflexible_sensor, ) - if soc_targets is not None and not soc_targets.empty: - soc_targets = soc_targets.tz_convert("UTC") - device_constraints[0]["equals"] = soc_targets.shift( - -1, freq=resolution - ).values * (timedelta(hours=1) / resolution) - soc_at_start * ( - timedelta(hours=1) / resolution - ) # shift "equals" constraint for target SOC by one resolution (the target defines a state at a certain time, - # while the "equals" constraint defines what the total stock should be at the end of a time slot, - # where the time slot is indexed by its starting time) + if soc_targets is not None: + # make an equality series with the SOC targets set in the flex model + # device_constraints[0] refers to the flexible device we are scheduling + device_constraints[0]["equals"] = build_device_soc_targets( + soc_targets, + soc_at_start, + start, + end, + resolution, + ) + device_constraints[0]["min"] = (soc_min - soc_at_start) * ( timedelta(hours=1) / resolution ) @@ -172,7 +180,192 @@ def schedule( battery_schedule = ems_schedule[0] # Round schedule - if round_to_decimals: - battery_schedule = battery_schedule.round(round_to_decimals) + if self.round_to_decimals: + battery_schedule = battery_schedule.round(self.round_to_decimals) return battery_schedule + + def persist_flex_model(self): + """Store new soc info as GenericAsset attributes""" + self.sensor.generic_asset.set_attribute("soc_datetime", self.start.isoformat()) + if 
self.flex_model.get("soc_unit") == "kWh": + self.sensor.generic_asset.set_attribute( + "soc_in_mwh", self.flex_model["soc_at_start"] / 1000 + ) + else: + self.sensor.generic_asset.set_attribute( + "soc_in_mwh", self.flex_model["soc_at_start"] + ) + + def deserialize_flex_config(self): + """ + Deserialize storage flex model and the flex context against schemas. + Before that, we fill in values from wider context, if possible. + Mostly, we allow several fields to come from sensor attributes. + + Note: Before we apply the flex config schemas, we need to use the flex config identifiers with hyphens, + (this is how they are represented to outside, e.g. by the API), after deserialization + we use internal schema names (with underscores). + """ + if self.flex_model is None: + self.flex_model = {} + + # Check state of charge. + # Preferably, a starting soc is given. + # Otherwise, we try to retrieve the current state of charge from the asset (if that is the valid one at the start). + # If that doesn't work, we set the starting soc to 0 (some assets don't use the concept of a state of charge, + # and without soc targets and limits the starting soc doesn't matter). 
+ if ( + "soc-at-start" not in self.flex_model + or self.flex_model["soc-at-start"] is None + ): + if ( + self.start == self.sensor.get_attribute("soc_datetime") + and self.sensor.get_attribute("soc_in_mwh") is not None + ): + self.flex_model["soc-at-start"] = self.sensor.get_attribute( + "soc_in_mwh" + ) + else: + self.flex_model["soc-at-start"] = 0 + + # Check for round-trip efficiency + if ( + "roundtrip-efficiency" not in self.flex_model + or self.flex_model["roundtrip-efficiency"] is None + ): + # Get default from sensor, or use 100% otherwise + self.flex_model["roundtrip-efficiency"] = self.sensor.get_attribute( + "roundtrip_efficiency", 1 + ) + if ( + self.flex_model["roundtrip-efficiency"] <= 0 + or self.flex_model["roundtrip-efficiency"] > 1 + ): + raise ValueError("roundtrip efficiency expected within the interval (0, 1]") + + self.ensure_soc_min_max() + + # Now it's time to check if our flex configurations holds up to schemas + self.flex_model = StorageFlexModelSchema().load(self.flex_model) + self.flex_context = FlexContextSchema().load(self.flex_context) + + return self.flex_model + + def get_min_max_targets( + self, deserialized_names: bool = True + ) -> tuple[float | None, float | None]: + min_target = None + max_target = None + soc_targets_label = "soc_targets" if deserialized_names else "soc-targets" + if ( + soc_targets_label in self.flex_model + and len(self.flex_model[soc_targets_label]) > 0 + ): + min_target = min( + [target["value"] for target in self.flex_model[soc_targets_label]] + ) + max_target = max( + [target["value"] for target in self.flex_model[soc_targets_label]] + ) + return min_target, max_target + + def get_min_max_soc_on_sensor( + self, adjust_unit: bool = False, deserialized_names: bool = True + ) -> tuple[float | None]: + soc_min_sensor = self.sensor.get_attribute("min_soc_in_mwh", None) + soc_max_sensor = self.sensor.get_attribute("max_soc_in_mwh", None) + soc_unit_label = "soc_unit" if deserialized_names else "soc-unit" + if 
adjust_unit: + if soc_min_sensor and self.flex_model.get(soc_unit_label) == "kWh": + soc_min_sensor *= 1000 # later steps assume soc data is kWh + if soc_max_sensor and self.flex_model.get(soc_unit_label) == "kWh": + soc_max_sensor *= 1000 + return soc_min_sensor, soc_max_sensor + + def ensure_soc_min_max(self): + """ + Make sure we have min and max SOC. + If not passed directly, then get default from sensor or targets. + """ + _, max_target = self.get_min_max_targets(deserialized_names=False) + soc_min_sensor, soc_max_sensor = self.get_min_max_soc_on_sensor( + adjust_unit=True, deserialized_names=False + ) + if "soc-min" not in self.flex_model or self.flex_model["soc-min"] is None: + # Default is 0 - can't drain the storage by more than it contains + self.flex_model["soc-min"] = soc_min_sensor if soc_min_sensor else 0 + if "soc-max" not in self.flex_model or self.flex_model["soc-max"] is None: + self.flex_model["soc-max"] = soc_max_sensor + # Lacking information about the battery's nominal capacity, we use the highest target value as the maximum state of charge + if self.flex_model["soc-max"] is None: + if max_target: + self.flex_model["soc-max"] = max_target + else: + raise ValueError( + "Need maximal permitted state of charge, please specify soc-max or some soc-targets." + ) + + +def build_device_soc_targets( + targets: List[Dict[datetime, float]] | pd.Series, + soc_at_start: float, + start_of_schedule: datetime, + end_of_schedule: datetime, + resolution: timedelta, +) -> pd.Series: + """ + Utility function to create a Pandas series from SOC targets we got from the flex-model. + + Should set NaN anywhere where there is no target. + + Target SOC values should be indexed by their due date. 
For example, for quarter-hourly targets between 5 and 6 AM: + >>> df = pd.Series(data=[1, 2, 2.5, 3], index=pd.date_range(datetime(2010,1,1,5), datetime(2010,1,1,6), freq=timedelta(minutes=15), inclusive="right")) + >>> print(df) + 2010-01-01 05:15:00 1.0 + 2010-01-01 05:30:00 2.0 + 2010-01-01 05:45:00 2.5 + 2010-01-01 06:00:00 3.0 + Freq: 15T, dtype: float64 + + TODO: this function could become the deserialization method of a new SOCTargetsSchema (targets, plural), which wraps SOCTargetSchema. + + """ + if isinstance(targets, pd.Series): # some teats prepare it this way + device_targets = targets + else: + device_targets = initialize_series( + np.nan, + start=start_of_schedule, + end=end_of_schedule, + resolution=resolution, + inclusive="right", # note that target values are indexed by their due date (i.e. inclusive="right") + ) + + for target in targets: + target_value = target["value"] + target_datetime = target["datetime"].astimezone( + device_targets.index.tzinfo + ) # otherwise DST would be problematic + if target_datetime > end_of_schedule: + raise ValueError( + f'Target datetime exceeds {end_of_schedule}. Maximum scheduling horizon is {current_app.config.get("FLEXMEASURES_PLANNING_HORIZON")}.' 
+ ) + + device_targets.loc[target_datetime] = target_value + + # soc targets are at the end of each time slot, while prices are indexed by the start of each time slot + device_targets = device_targets[ + start_of_schedule + resolution : end_of_schedule + ] + + device_targets = device_targets.tz_convert("UTC") + + # shift "equals" constraint for target SOC by one resolution (the target defines a state at a certain time, + # while the "equals" constraint defines what the total stock should be at the end of a time slot, + # where the time slot is indexed by its starting time) + device_targets = device_targets.shift(-1, freq=resolution).values * ( + timedelta(hours=1) / resolution + ) - soc_at_start * (timedelta(hours=1) / resolution) + + return device_targets diff --git a/flexmeasures/data/models/planning/tests/test_solver.py b/flexmeasures/data/models/planning/tests/test_solver.py index 08e15334c..25ce89ed4 100644 --- a/flexmeasures/data/models/planning/tests/test_solver.py +++ b/flexmeasures/data/models/planning/tests/test_solver.py @@ -6,9 +6,9 @@ import pandas as pd from flexmeasures.data.models.time_series import Sensor +from flexmeasures.data.models.planning import Scheduler from flexmeasures.data.models.planning.storage import StorageScheduler from flexmeasures.data.models.planning.utils import ( - ensure_storage_specs, initialize_series, ) from flexmeasures.utils.calculations import integrate_time_series @@ -29,19 +29,21 @@ def test_battery_solver_day_1( end = tz.localize(datetime(2015, 1, 2)) resolution = timedelta(minutes=15) soc_at_start = battery.get_attribute("soc_in_mwh") - storage_specs = ensure_storage_specs( - dict(soc_at_start=soc_at_start), battery, start, end, resolution - ) - schedule = StorageScheduler().schedule( + scheduler: Scheduler = StorageScheduler( battery, start, end, resolution, - storage_specs=storage_specs, - inflexible_device_sensors=add_inflexible_device_forecasts.keys() - if use_inflexible_device - else None, + 
flex_model={"soc-at-start": soc_at_start}, + flex_context={ + "inflexible-device-sensors": [ + s.id for s in add_inflexible_device_forecasts.keys() + ] + if use_inflexible_device + else [] + }, ) + schedule = scheduler.compute_schedule() soc_schedule = integrate_time_series(schedule, soc_at_start, decimal_precision=6) with pd.option_context("display.max_rows", None, "display.max_columns", 3): @@ -86,25 +88,19 @@ def test_battery_solver_day_2(add_battery_assets, roundtrip_efficiency: float): soc_at_start = battery.get_attribute("soc_in_mwh") soc_min = 0.5 soc_max = 4.5 - storage_specs = ensure_storage_specs( - dict( - soc_at_start=soc_at_start, - soc_min=soc_min, - soc_max=soc_max, - roundtrip_efficiency=roundtrip_efficiency, - ), + scheduler = StorageScheduler( battery, start, end, resolution, + flex_model={ + "soc-at-start": soc_at_start, + "soc-min": soc_min, + "soc-max": soc_max, + "roundtrip-efficiency": roundtrip_efficiency, + }, ) - schedule = StorageScheduler().schedule( - battery, - start, - end, - resolution, - storage_specs=storage_specs, - ) + schedule = scheduler.compute_schedule() soc_schedule = integrate_time_series( schedule, soc_at_start, @@ -176,16 +172,27 @@ def test_charging_station_solver_day_2(target_soc, charging_station_name): target_soc_datetime = start + duration_until_target soc_targets = initialize_series(np.nan, start, end, resolution, inclusive="right") soc_targets.loc[target_soc_datetime] = target_soc - storage_specs = ensure_storage_specs( - dict(soc_at_start=soc_at_start, soc_targets=soc_targets), + scheduler = StorageScheduler( charging_station, start, end, resolution, + flex_model={ + "soc_at_start": soc_at_start, + "soc_min": charging_station.get_attribute("min_soc_in_mwh", 0), + "soc_max": charging_station.get_attribute( + "max_soc_in_mwh", max(soc_targets.values) + ), + "roundtrip_efficiency": charging_station.get_attribute( + "roundtrip_efficiency", 1 + ), + "soc_targets": soc_targets, + }, ) - consumption_schedule = 
StorageScheduler().schedule( - charging_station, start, end, resolution, storage_specs=storage_specs + scheduler.config_deserialized = ( + True # soc targets are already a DataFrame, names get underscore ) + consumption_schedule = scheduler.compute_schedule() soc_schedule = integrate_time_series( consumption_schedule, soc_at_start, decimal_precision=6 ) @@ -238,20 +245,27 @@ def test_fallback_to_unsolvable_problem(target_soc, charging_station_name): target_soc_datetime = start + duration_until_target soc_targets = initialize_series(np.nan, start, end, resolution, inclusive="right") soc_targets.loc[target_soc_datetime] = target_soc - storage_specs = ensure_storage_specs( - dict(soc_at_start=soc_at_start, soc_targets=soc_targets), + scheduler = StorageScheduler( charging_station, start, end, resolution, + flex_model={ + "soc_at_start": soc_at_start, + "soc_min": charging_station.get_attribute("min_soc_in_mwh", 0), + "soc_max": charging_station.get_attribute( + "max_soc_in_mwh", max(soc_targets.values) + ), + "roundtrip_efficiency": charging_station.get_attribute( + "roundtrip_efficiency", 1 + ), + "soc_targets": soc_targets, + }, ) - consumption_schedule = StorageScheduler().schedule( - charging_station, - start, - end, - resolution, - storage_specs=storage_specs, + scheduler.config_deserialized = ( + True # soc targets are already a DataFrame, names get underscore ) + consumption_schedule = scheduler.compute_schedule() soc_schedule = integrate_time_series( consumption_schedule, soc_at_start, decimal_precision=6 ) @@ -325,27 +339,27 @@ def test_building_solver_day_2( soc_at_start = 2.5 soc_min = 0.5 soc_max = 4.5 - storage_specs = ensure_storage_specs( - dict( - soc_at_start=soc_at_start, - soc_min=soc_min, - soc_max=soc_max, - ), + scheduler = StorageScheduler( battery, start, end, resolution, + flex_model={ + "soc_at_start": soc_at_start, + "soc_min": soc_min, + "soc_max": soc_max, + "roundtrip_efficiency": battery.get_attribute("roundtrip_efficiency", 1), + }, + 
flex_context={ + "inflexible_device_sensors": inflexible_devices.values(), + "production_price_sensor": production_price_sensor, + "consumption_price_sensor": consumption_price_sensor, + }, ) - schedule = StorageScheduler().schedule( - battery, - start, - end, - resolution, - storage_specs=storage_specs, - consumption_price_sensor=consumption_price_sensor, - production_price_sensor=production_price_sensor, - inflexible_device_sensors=inflexible_devices.values(), + scheduler.config_deserialized = ( + True # inflexible device sensors are already objects, names get underscore ) + schedule = scheduler.compute_schedule() soc_schedule = integrate_time_series(schedule, soc_at_start, decimal_precision=6) with pd.option_context("display.max_rows", None, "display.max_columns", 3): diff --git a/flexmeasures/data/models/planning/utils.py b/flexmeasures/data/models/planning/utils.py index c8a6624ff..6fa1a3dad 100644 --- a/flexmeasures/data/models/planning/utils.py +++ b/flexmeasures/data/models/planning/utils.py @@ -64,71 +64,6 @@ def initialize_index( ) -def ensure_storage_specs( - specs: Optional[dict], - sensor: Sensor, - start_of_schedule: datetime, - end_of_schedule: datetime, - resolution: timedelta, -) -> dict: - """ - Check storage specs and fill in values from context, if possible. - - Storage specs are: - - soc_at_start - - soc_min - - soc_max - - soc_targets - - roundtrip_efficiency - - prefer_charging_sooner - """ - if specs is None: - specs = {} - - # Check state of charge - # Preferably, a starting soc is given. - # Otherwise, we try to retrieve the current state of charge from the asset (if that is the valid one at the start). - # Otherwise, we set the starting soc to 0 (some assets don't use the concept of a state of charge, - # and without soc targets and limits the starting soc doesn't matter). 
- if "soc_at_start" not in specs or specs["soc_at_start"] is None: - if ( - start_of_schedule == sensor.get_attribute("soc_datetime") - and sensor.get_attribute("soc_in_mwh") is not None - ): - specs["soc_at_start"] = sensor.get_attribute("soc_in_mwh") - else: - specs["soc_at_start"] = 0 - - # init default targets - if "soc_targets" not in specs or specs["soc_targets"] is None: - specs["soc_targets"] = initialize_series( - np.nan, start_of_schedule, end_of_schedule, resolution, inclusive="right" - ) - # soc targets are at the end of each time slot, while prices are indexed by the start of each time slot - specs["soc_targets"] = specs["soc_targets"][ - start_of_schedule + resolution : end_of_schedule - ] - - # Check for min and max SOC, or get default from sensor - if "soc_min" not in specs or specs["soc_min"] is None: - # Can't drain the storage by more than it contains - specs["soc_min"] = sensor.get_attribute("min_soc_in_mwh", 0) - if "soc_max" not in specs or specs["soc_max"] is None: - # Lacking information about the battery's nominal capacity, we use the highest target value as the maximum state of charge - specs["soc_max"] = sensor.get_attribute( - "max_soc_in_mwh", max(specs["soc_targets"].values) - ) - - # Check for round-trip efficiency - if "roundtrip_efficiency" not in specs or specs["roundtrip_efficiency"] is None: - # Get default from sensor, or use 100% otherwise - specs["roundtrip_efficiency"] = sensor.get_attribute("roundtrip_efficiency", 1) - if specs["roundtrip_efficiency"] <= 0 or specs["roundtrip_efficiency"] > 1: - raise ValueError("roundtrip_efficiency expected within the interval (0, 1]") - - return specs - - def add_tiny_price_slope( prices: pd.DataFrame, col_name: str = "event_value", d: float = 10**-3 ) -> pd.DataFrame: diff --git a/flexmeasures/data/schemas/scheduling/__init__.py b/flexmeasures/data/schemas/scheduling/__init__.py new file mode 100644 index 000000000..382f4430a --- /dev/null +++ 
b/flexmeasures/data/schemas/scheduling/__init__.py @@ -0,0 +1,15 @@ +from marshmallow import Schema, fields + +from flexmeasures.data.schemas.sensors import SensorIdField + + +class FlexContextSchema(Schema): + """ + This schema lists fields that can be used to describe sensors in the optimised portfolio + """ + + consumption_price_sensor = SensorIdField(data_key="consumption-price-sensor") + production_price_sensor = SensorIdField(data_key="production-price-sensor") + inflexible_device_sensors = fields.List( + SensorIdField(), data_key="inflexible-device-sensors" + ) diff --git a/flexmeasures/data/schemas/scheduling/storage.py b/flexmeasures/data/schemas/scheduling/storage.py new file mode 100644 index 000000000..45cda79eb --- /dev/null +++ b/flexmeasures/data/schemas/scheduling/storage.py @@ -0,0 +1,67 @@ +from marshmallow import Schema, post_load, validate, fields +from marshmallow.validate import OneOf + +from flexmeasures.data.schemas.times import AwareDateTimeField +from flexmeasures.data.schemas.units import QuantityField +from flexmeasures.utils.unit_utils import ur + + +class SOCTargetSchema(Schema): + """ + A point in time with a target value. + """ + + value = fields.Float(required=True) + datetime = AwareDateTimeField(required=True) + + +class StorageFlexModelSchema(Schema): + """ + This schema lists fields we require when scheduling storage assets. + Some fields are not required, as they might live on the Sensor.attributes. + You can use StorageScheduler.deserialize_flex_config to get that filled in. 
+ """ + + soc_at_start = fields.Float(required=True, data_key="soc-at-start") + soc_min = fields.Float(validate=validate.Range(min=0), data_key="soc-min") + soc_max = fields.Float(data_key="soc-max") + soc_unit = fields.Str( + validate=OneOf( + [ + "kWh", + "MWh", + ] + ), + data_key="soc-unit", + ) # todo: allow unit to be set per field, using QuantityField("%", validate=validate.Range(min=0, max=1)) + soc_targets = fields.List(fields.Nested(SOCTargetSchema()), data_key="soc-targets") + roundtrip_efficiency = QuantityField( + "%", + validate=validate.Range(min=0, max=1, min_inclusive=False, max_inclusive=True), + data_key="roundtrip-efficiency", + ) + prefer_charging_sooner = fields.Bool(data_key="prefer-charging-sooner") + + @post_load() + def post_load_sequence(self, data: dict, **kwargs) -> dict: + """Perform some checks and corrections after we loaded.""" + # currently we only handle MWh internally + # TODO: review when we moved away from capacity having to be described in MWh + if data.get("soc_unit") == "kWh": + data["soc_at_start"] /= 1000.0 + if data.get("soc_min") is not None: + data["soc_min"] /= 1000.0 + if data.get("soc_max") is not None: + data["soc_max"] /= 1000.0 + if data.get("soc_targets"): + for target in data["soc_targets"]: + target["value"] /= 1000.0 + data["soc_unit"] = "MWh" + + # Convert round-trip efficiency to dimensionless (to the (0,1] range) + if data.get("roundtrip_efficiency") is not None: + data["roundtrip_efficiency"] = ( + data["roundtrip_efficiency"].to(ur.Quantity("dimensionless")).magnitude + ) + + return data diff --git a/flexmeasures/data/services/scheduling.py b/flexmeasures/data/services/scheduling.py index 497bf7ba5..c5702da70 100644 --- a/flexmeasures/data/services/scheduling.py +++ b/flexmeasures/data/services/scheduling.py @@ -1,5 +1,6 @@ +from __future__ import annotations + from datetime import datetime, timedelta -from typing import List, Tuple, Optional import os import sys import importlib.util @@ -12,12 +13,12 @@ 
import timely_beliefs as tb from flexmeasures.data import db +from flexmeasures.data.models.planning import Scheduler from flexmeasures.data.models.planning.storage import StorageScheduler -from flexmeasures.data.models.planning.utils import ensure_storage_specs from flexmeasures.data.models.time_series import Sensor, TimedBelief from flexmeasures.data.models.data_sources import DataSource -from flexmeasures.data.models.planning import Scheduler from flexmeasures.data.utils import get_data_source, save_to_db +from flexmeasures.utils.time_utils import server_now """ The life cycle of a scheduling job: @@ -29,52 +30,26 @@ def create_scheduling_job( - sensor: Sensor, - start_of_schedule: datetime, - end_of_schedule: datetime, - belief_time: datetime, - resolution: timedelta, - consumption_price_sensor: Optional[Sensor] = None, - production_price_sensor: Optional[Sensor] = None, - inflexible_device_sensors: Optional[List[Sensor]] = None, - job_id: Optional[str] = None, + sensor: int | Sensor, + job_id: str | None = None, enqueue: bool = True, - storage_specs: Optional[dict] = None, + **scheduler_kwargs, ) -> Job: """ Create a new Job, which is queued for later execution. - Before enqueuing, we perform some checks on sensor type and specs, for errors we want to bubble up early. - To support quick retrieval of the scheduling job, the job id is the unique entity address of the UDI event. That means one event leads to one job (i.e. actions are event driven). - Target SOC values should be indexed by their due date. For example, for quarter-hourly targets between 5 and 6 AM: - >>> df = pd.Series(data=[1, 2, 2.5, 3], index=pd.date_range(datetime(2010,1,1,5), datetime(2010,1,1,6), freq=timedelta(minutes=15), inclusive="right")) - >>> print(df) - 2010-01-01 05:15:00 1.0 - 2010-01-01 05:30:00 2.0 - 2010-01-01 05:45:00 2.5 - 2010-01-01 06:00:00 3.0 - Freq: 15T, dtype: float64 + As a rule of thumb, keep arguments to the job simple, and deserializable. 
""" - storage_specs = ensure_storage_specs( - storage_specs, sensor, start_of_schedule, end_of_schedule, resolution - ) + # From here on, we handle IDs again, not objects + if isinstance(sensor, Sensor): + sensor = sensor.id job = Job.create( make_schedule, - kwargs=dict( - sensor_id=sensor.id, - start=start_of_schedule, - end=end_of_schedule, - belief_time=belief_time, - resolution=resolution, - storage_specs=storage_specs, - consumption_price_sensor=consumption_price_sensor, - production_price_sensor=production_price_sensor, - inflexible_device_sensors=inflexible_device_sensors, - ), # TODO: maybe also pass these sensors as IDs, to avoid potential db sessions confusion + kwargs=dict(sensor_id=sensor, **scheduler_kwargs), id=job_id, connection=current_app.queues["scheduling"].connection, ttl=int( @@ -97,19 +72,18 @@ def make_schedule( sensor_id: int, start: datetime, end: datetime, - belief_time: datetime, resolution: timedelta, - storage_specs: Optional[dict], - consumption_price_sensor: Optional[Sensor] = None, - production_price_sensor: Optional[Sensor] = None, - inflexible_device_sensors: Optional[List[Sensor]] = None, + belief_time: datetime | None = None, + flex_model: dict | None = None, + flex_context: dict | None = None, ) -> bool: """ - This function is meant to be queued as a job. - It thus potentially runs on a different FlexMeasures node than where the job is created. + This function is meant to be queued as a job. It returns True if it ran successfully. + + Note: This function thus potentially runs on a different FlexMeasures node than where the job is created. 
- - Choose which scheduling function can be used - - Compute schedule + This is what this function does + - Find out which scheduler should be used & compute the schedule - Turn scheduled values into beliefs and save them to db """ # https://docs.sqlalchemy.org/en/13/faq/connections.html#how-do-i-use-engines-connections-sessions-with-python-multiprocessing-or-os-fork @@ -124,39 +98,21 @@ def make_schedule( % (rq_job.id, sensor, start, end) ) - data_source_info = {} - # Choose which algorithm to use TODO: unify loading this into a func store concept - if "custom-scheduler" in sensor.attributes: - scheduler_specs = sensor.attributes.get("custom-scheduler") - scheduler, data_source_info = load_custom_scheduler(scheduler_specs) - elif sensor.generic_asset.generic_asset_type.name in ( - "battery", - "one-way_evse", - "two-way_evse", - ): - scheduler = StorageScheduler - data_source_info["model"] = scheduler.__name__ - data_source_info["name"] = scheduler.__author__ - data_source_info["version"] = scheduler.__version__ - else: - raise ValueError( - "Scheduling is not (yet) supported for asset type %s." - % sensor.generic_asset.generic_asset_type - ) - - storage_specs = ensure_storage_specs(storage_specs, sensor, start, end, resolution) + scheduler_class = find_scheduler_class(sensor) + data_source_info = scheduler_class.get_data_source_info() - consumption_schedule = scheduler().schedule( + if belief_time is None: + belief_time = server_now() + scheduler: Scheduler = scheduler_class( sensor, start, end, resolution, - storage_specs=storage_specs, - consumption_price_sensor=consumption_price_sensor, - production_price_sensor=production_price_sensor, - inflexible_device_sensors=inflexible_device_sensors, belief_time=belief_time, + flex_model=flex_model, + flex_context=flex_context, ) + consumption_schedule = scheduler.compute_schedule() if rq_job: click.echo("Job %s made schedule." 
% rq_job.id) @@ -190,10 +146,34 @@ def make_schedule( return True -def load_custom_scheduler(scheduler_specs: dict) -> Tuple[Scheduler, dict]: +def find_scheduler_class(sensor: Sensor) -> type: + """ + Find out which scheduler to use, given a sensor. + This will morph into a logic store utility, and schedulers should be registered for asset types there, + instead of this fixed lookup logic. + """ + # Choose which algorithm to use TODO: unify loading this into a func store concept + if "custom-scheduler" in sensor.attributes: + scheduler_specs = sensor.attributes.get("custom-scheduler") + scheduler_class = load_custom_scheduler(scheduler_specs) + elif sensor.generic_asset.generic_asset_type.name in ( + "battery", + "one-way_evse", + "two-way_evse", + ): + scheduler_class = StorageScheduler + else: + raise ValueError( + "Scheduling is not (yet) supported for asset type %s." + % sensor.generic_asset.generic_asset_type + ) + return scheduler_class + + +def load_custom_scheduler(scheduler_specs: dict) -> type: """ Read in custom scheduling spec. - Attempt to load the Callable, also derive data source info. + Attempt to load the Scheduler class to use. The scheduler class should be derived from flexmeasures.data.models.planning.Scheduler. The Callable is assumed to be named "schedule". 
@@ -213,9 +193,6 @@ def load_custom_scheduler(scheduler_specs: dict) -> Tuple[Scheduler, dict]: assert "class" in scheduler_specs, "scheduler specs have no 'class'" scheduler_name = scheduler_specs["class"] - source_info = dict( - model=scheduler_name, version="1", name="Unknown author" - ) # default # import module module_descr = scheduler_specs["module"] @@ -242,29 +219,15 @@ def load_custom_scheduler(scheduler_specs: dict) -> Tuple[Scheduler, dict]: # get scheduling function assert hasattr( module, scheduler_specs["class"] - ), "Module at {module_descr} has no class {scheduler_specs['class']}" + ), f"Module at {module_descr} has no class {scheduler_specs['class']}" scheduler_class = getattr(module, scheduler_specs["class"]) - - if hasattr(scheduler_class, "__version__"): - source_info["version"] = str(scheduler_class.__version__) - else: - current_app.logger.warning( - f"Scheduler {scheduler_class.__name__} loaded, but has no __version__ attribute." - ) - if hasattr(scheduler_class, "__author__"): - source_info["name"] = str(scheduler_class.__author__) - else: - current_app.logger.warning( - f"Scheduler {scheduler_class.__name__} loaded, but has no __author__ attribute." - ) - - schedule_function_name = "schedule" + schedule_function_name = "compute_schedule" if not hasattr(scheduler_class, schedule_function_name): raise NotImplementedError( f"No function {schedule_function_name} in {scheduler_class}. Cannot load custom scheduler." ) - return scheduler_class, source_info + return scheduler_class def handle_scheduling_exception(job, exc_type, exc_value, traceback): @@ -276,23 +239,22 @@ def handle_scheduling_exception(job, exc_type, exc_value, traceback): job.save_meta() -def get_data_source_for_job( - job: Optional[Job], sensor: Optional[Sensor] = None -) -> Optional[DataSource]: +def get_data_source_for_job(job: Job | None) -> DataSource | None: """ Try to find the data source linked by this scheduling job. 
We expect that enough info on the source was placed in the meta dict. - For a transition period, we might have to guess a bit. - TODO: Afterwards, this can be lighter. We should also expect a job and no sensor is needed, - once API v1.3 is deprecated. + This only happened with v0.12. For a transition period, we might have to support older jobs who haven't got that info. + TODO: We should expect a job, once API v1.3 is deprecated. """ data_source_info = None if job: data_source_info = job.meta.get("data_source_info") - if data_source_info and "id" in data_source_info: + if ( + data_source_info and "id" in data_source_info + ): # this is the expected outcome return DataSource.query.get(data_source_info["id"]) - if data_source_info is None and sensor: + if data_source_info is None: data_source_info = dict(name="Seita", model="StorageScheduler") # TODO: change to raise later (v0.13) - all scheduling jobs now get full info current_app.logger.warning( diff --git a/flexmeasures/data/tests/dummy_scheduler.py b/flexmeasures/data/tests/dummy_scheduler.py index 7a97c2b46..f75df8a04 100644 --- a/flexmeasures/data/tests/dummy_scheduler.py +++ b/flexmeasures/data/tests/dummy_scheduler.py @@ -1,6 +1,3 @@ -from datetime import datetime, timedelta - -from flexmeasures.data.models.time_series import Sensor from flexmeasures.data.models.planning import Scheduler from flexmeasures.data.models.planning.utils import initialize_series @@ -10,22 +7,18 @@ class DummyScheduler(Scheduler): __author__ = "Test Organization" __version__ = "3" - def schedule( - self, - sensor: Sensor, - start: datetime, - end: datetime, - resolution: timedelta, - *args, - **kwargs - ): + def compute_schedule(self): """ Just a dummy scheduler that always plans to consume at maximum capacity. 
(Schedulers return positive values for consumption, and negative values for production) """ return initialize_series( # simply creates a Pandas Series repeating one value - data=sensor.get_attribute("capacity_in_mw"), - start=start, - end=end, - resolution=resolution, + data=self.sensor.get_attribute("capacity_in_mw"), + start=self.start, + end=self.end, + resolution=self.resolution, ) + + def deserialize_config(self): + """Do not care about any config sent in.""" + self.config_deserialized = True diff --git a/flexmeasures/data/tests/test_scheduling_jobs.py b/flexmeasures/data/tests/test_scheduling_jobs.py index 357b3709a..517273c39 100644 --- a/flexmeasures/data/tests/test_scheduling_jobs.py +++ b/flexmeasures/data/tests/test_scheduling_jobs.py @@ -35,7 +35,11 @@ def test_scheduling_a_battery(db, app, add_battery_assets, setup_test_data): ) # Make sure the scheduler data source isn't there job = create_scheduling_job( - battery, start, end, belief_time=start, resolution=resolution + sensor=battery, + start=start, + end=end, + belief_time=start, + resolution=resolution, ) print("Job: %s" % job.id) @@ -78,12 +82,14 @@ def test_loading_custom_scheduler(is_path: bool): Simply check if loading a custom scheduler works. 
""" scheduler_specs["module"] = make_module_descr(is_path) - custom_scheduler, data_source_info = load_custom_scheduler(scheduler_specs) + custom_scheduler = load_custom_scheduler(scheduler_specs) + assert custom_scheduler.__name__ == "DummyScheduler" + assert "Just a dummy scheduler" in custom_scheduler.compute_schedule.__doc__ + + data_source_info = custom_scheduler.get_data_source_info() assert data_source_info["name"] == "Test Organization" assert data_source_info["version"] == "3" assert data_source_info["model"] == "DummyScheduler" - assert custom_scheduler.__name__ == "DummyScheduler" - assert "Just a dummy scheduler" in custom_scheduler.schedule.__doc__ @pytest.mark.parametrize("is_path", [False, True]) @@ -103,7 +109,11 @@ def test_assigning_custom_scheduler(db, app, add_battery_assets, is_path: bool): resolution = timedelta(minutes=15) job = create_scheduling_job( - battery, start, end, belief_time=start, resolution=resolution + sensor=battery, + start=start, + end=end, + belief_time=start, + resolution=resolution, ) print("Job: %s" % job.id) diff --git a/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py b/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py index 534435773..5b8e8dfcd 100644 --- a/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py +++ b/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py @@ -1,11 +1,9 @@ from datetime import timedelta, datetime import pytz -import numpy as np import pandas as pd from flexmeasures.data.models.data_sources import DataSource -from flexmeasures.data.models.planning.utils import initialize_series from flexmeasures.data.models.time_series import Sensor, TimedBelief from flexmeasures.data.services.scheduling import create_scheduling_job from flexmeasures.data.tests.utils import work_on_rq, exception_reporter @@ -30,10 +28,9 @@ def test_scheduling_a_charging_station( tz = pytz.timezone("Europe/Amsterdam") start = tz.localize(datetime(2015, 1, 2)) end = tz.localize(datetime(2015, 1, 3)) + 
target_datetime = start + duration_until_target resolution = timedelta(minutes=15) - target_soc_datetime = start + duration_until_target - soc_targets = initialize_series(np.nan, start, end, resolution, inclusive="right") - soc_targets.loc[target_soc_datetime] = target_soc + soc_targets = [dict(datetime=target_datetime.isoformat(), value=target_soc)] assert ( DataSource.query.filter_by(name="Seita", type="scheduling script").one_or_none() @@ -41,12 +38,12 @@ def test_scheduling_a_charging_station( ) # Make sure the scheduler data source isn't there job = create_scheduling_job( - charging_station, - start, - end, + sensor=charging_station, + start=start, + end=end, belief_time=start, resolution=resolution, - storage_specs=dict(soc_at_start=soc_at_start, soc_targets=soc_targets), + flex_model={"soc-at-start": soc_at_start, "soc-targets": soc_targets}, ) print("Job: %s" % job.id) diff --git a/requirements/app.in b/requirements/app.in index 32744fa5f..2459d0036 100644 --- a/requirements/app.in +++ b/requirements/app.in @@ -17,6 +17,7 @@ pytz numpy isodate click +click-default-group email_validator rq rq-dashboard diff --git a/requirements/app.txt b/requirements/app.txt index a04ed8704..9c3155861 100644 --- a/requirements/app.txt +++ b/requirements/app.txt @@ -31,8 +31,11 @@ charset-normalizer==2.1.1 click==8.1.3 # via # -r requirements/app.in + # click-default-group # flask # rq +click-default-group==1.2.2 + # via -r requirements/app.in colour==0.1.5 # via -r requirements/app.in convertdate==2.4.0