From feb3d3336970dd9b9795a13ab6765b2d7989f31c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20H=C3=B6ning?= Date: Sun, 16 Oct 2022 12:35:18 +0200 Subject: [PATCH] Ability to provide a custom scheduling algorithm (#505) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * First implementation of loading scheduler function from non-FM code Signed-off-by: Nicolas Höning * test loading of custom scheduler with dummy Signed-off-by: Nicolas Höning * changelog entry Signed-off-by: Nicolas Höning * complete the switch to default data source name for scheduling being 'FlexMeasures' Signed-off-by: Nicolas Höning * more complete dummy scheduler for testing Signed-off-by: Nicolas Höning * allow a custom scheduler to be present in an importable package Signed-off-by: Nicolas Höning * add documentation Signed-off-by: Nicolas Höning * one more data source renaming in tests Signed-off-by: Nicolas Höning * save custom data source name on job, so it can be looked up in get_schedule endpoint Signed-off-by: Nicolas Höning * one more data source renaming in tests Signed-off-by: Nicolas Höning * move back from renaming default script data source - we made issue #508 to work on this specifically Signed-off-by: Nicolas Höning * forgot one more data source renaming Signed-off-by: Nicolas Höning * implement review comments Signed-off-by: Nicolas Höning * we cannot use inclusive yet, while we require/support Pandas 1.25 Signed-off-by: Nicolas Höning Signed-off-by: Nicolas Höning --- documentation/changelog.rst | 1 + documentation/plugin/customisation.rst | 65 +++++++++ documentation/plugin/introduction.rst | 6 +- documentation/tut/forecasting_scheduling.rst | 2 + flexmeasures/api/v1_3/implementations.py | 2 +- flexmeasures/api/v3_0/sensors.py | 4 +- flexmeasures/data/services/scheduling.py | 123 +++++++++++++----- flexmeasures/data/tests/dummy_scheduler.py | 21 +++ .../data/tests/test_scheduling_jobs.py | 90 ++++++++++++- 9 files changed, 273 insertions(+), 41 deletions(-) create mode 100644 flexmeasures/data/tests/dummy_scheduler.py diff --git a/documentation/changelog.rst b/documentation/changelog.rst index d09248dd0..96e45bcfd 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -9,6 +9,7 @@ New features ------------- * Hit the replay button to replay what happened, available on the sensor and asset pages [see `PR #463 `_] +* Ability to provide your own custom scheduling function [see `PR #505 `_] * Visually distinguish forecasts/schedules (dashed lines) from measurements (solid lines), and expand the tooltip with timing info regarding the forecast/schedule horizon or measurement lag [see `PR #503 `_] * The asset page also allows to show sensor data from other assets that belong to the same account [see `PR #500 `_] * Improved import of time series data from CSV file: 1) drop duplicate records with warning, and 2) allow configuring which column contains explicit recording times for each data point (use case: import forecasts) [see `PR #501 `_] diff --git a/documentation/plugin/customisation.rst b/documentation/plugin/customisation.rst index 1841082d4..bbe51fafe 100644 --- a/documentation/plugin/customisation.rst +++ b/documentation/plugin/customisation.rst @@ -5,6 +5,71 @@ Plugin Customizations ======================= +Adding your own scheduling algorithm +------------------------------------- + +FlexMeasures comes with in-built scheduling algorithms for often-used use cases. However, you can use your own algorithm, as well. + +The idea is that you'd still use FlexMeasures' API to post flexibility states and trigger new schedules to be computed (see :ref:`posting_flex_states`), +but in the background your custom scheduling algorithm is being used. + +Let's walk through an example! + +First, we need to write a function which accepts arguments just like the in-built schedulers (their code is `here `_). +The following minimal example gives you an idea of the inputs and outputs: + +.. code-block:: python + + from datetime import datetime, timedelta + import pandas as pd + from pandas.tseries.frequencies import to_offset + from flexmeasures.data.models.time_series import Sensor + + def compute_a_schedule( + sensor: Sensor, + start: datetime, + end: datetime, + resolution: timedelta, + *args, + **kwargs + ): + """Just a dummy scheduler, advising to do nothing""" + return pd.Series( + 0, index=pd.date_range(start, end, freq=resolution, closed="left") + ) + + +.. note:: It's possible to add arguments that describe the asset flexibility and the EMS context in more detail. For example, + for storage assets we support various state-of-charge parameters. For now, the existing schedulers are the best documentation. + + +Finally, make your scheduler be the one that FlexMeasures will use for certain sensors: + + +.. code-block:: python + + from flexmeasures.data.models.time_series import Sensor + + scheduler_specs = { + "module": "flexmeasures.data.tests.dummy_scheduler", # or a file path, see note below + "function": "compute_a_schedule", + "source": "My Company" + } + + my_sensor = Sensor.query.filter(Sensor.name == "My power sensor on a flexible asset").one_or_none() + my_sensor.attributes["custom-scheduler"] = scheduler_specs + + +From now on, all schedules (see :ref:`tut_forecasting_scheduling`) which are requested for this sensor should +get computed by your custom function! For later lookup, the data will be linked to a new data source with the name "My Opinion". + +.. note:: To describe the module, we used an importable module here (actually a custom scheduling function we use to test this). + You can also provide a full file path to the module, e.g. "/path/to/my_file.py". + + +.. todo:: We're planning to use a similar approach to allow for custom forecasting algorithms, as well. + + Adding your own style sheets ---------------------------- diff --git a/documentation/plugin/introduction.rst b/documentation/plugin/introduction.rst index 3d9c573a9..8a0eff851 100644 --- a/documentation/plugin/introduction.rst +++ b/documentation/plugin/introduction.rst @@ -3,13 +3,11 @@ Writing Plugins ==================== -You can extend FlexMeasures with functionality like UI pages, API endpoints, or CLI functions. +You can extend FlexMeasures with functionality like UI pages, API endpoints, CLI functions and custom scheduling algorithms. This is eventually how energy flexibility services are built on top of FlexMeasures! In an nutshell, a FlexMeasures plugin adds functionality via one or more `Flask Blueprints `_. -.. todo:: We'll use this to allow for custom forecasting and scheduling algorithms, as well. - How to make FlexMeasures load your plugin ------------------------------------------ @@ -34,4 +32,4 @@ To hit the ground running with that approach, we provide a `CookieCutter templat It also includes a few Blueprint examples and best practices. -Continue reading the :ref:`plugin_showcase`. \ No newline at end of file +Continue reading the :ref:`plugin_showcase` or possibilities to do :ref:`plugin_customization`. \ No newline at end of file diff --git a/documentation/tut/forecasting_scheduling.rst b/documentation/tut/forecasting_scheduling.rst index d7f8836dc..69bc3fc2b 100644 --- a/documentation/tut/forecasting_scheduling.rst +++ b/documentation/tut/forecasting_scheduling.rst @@ -12,6 +12,8 @@ Let's take a look at how FlexMeasures users can access information from these se If you want to learn more about the actual algorithms used in the background, head over to :ref:`algorithms`. +.. note:: FlexMeasures comes with in-built scheduling algorithms. You can use your own algorithm, as well, see :ref:`plugin-customization`. + Maintaining the queues ------------------------------------ diff --git a/flexmeasures/api/v1_3/implementations.py b/flexmeasures/api/v1_3/implementations.py index c62442e00..8b2a5b857 100644 --- a/flexmeasures/api/v1_3/implementations.py +++ b/flexmeasures/api/v1_3/implementations.py @@ -146,7 +146,7 @@ def get_device_message_response(generic_asset_name_groups, duration): schedule_data_source_name = "Seita" scheduler_source = DataSource.query.filter_by( - name="Seita", type="scheduling script" + name=schedule_data_source_name, type="scheduling script" ).one_or_none() if scheduler_source is None: return unknown_schedule( diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index 4449658f8..35f0248b8 100644 --- a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -519,8 +519,10 @@ def get_schedule(self, sensor: Sensor, job_id: str, duration: timedelta, **kwarg schedule_start = job.kwargs["start"] schedule_data_source_name = "Seita" + if "data_source_name" in job.meta: + schedule_data_source_name = job.meta["data_source_name"] scheduler_source = DataSource.query.filter_by( - name="Seita", type="scheduling script" + name=schedule_data_source_name, type="scheduling script" ).one_or_none() if scheduler_source is None: return unknown_schedule( diff --git a/flexmeasures/data/services/scheduling.py b/flexmeasures/data/services/scheduling.py index 24915ffaf..0e9d2524b 100644 --- a/flexmeasures/data/services/scheduling.py +++ b/flexmeasures/data/services/scheduling.py @@ -1,5 +1,9 @@ from datetime import datetime, timedelta -from typing import List, Optional +from typing import List, Tuple, Optional, Callable +import os +import sys +import importlib.util +from importlib.abc import Loader from flask import current_app import click @@ -139,49 +143,47 @@ def make_schedule( np.nan, index=pd.date_range(start, end, freq=resolution, closed="right") ) - if sensor.generic_asset.generic_asset_type.name == "battery": - consumption_schedule = schedule_battery( - sensor, - start, - end, - resolution, - soc_at_start, - soc_targets, - soc_min, - soc_max, - roundtrip_efficiency, - consumption_price_sensor=consumption_price_sensor, - production_price_sensor=production_price_sensor, - inflexible_device_sensors=inflexible_device_sensors, - belief_time=belief_time, - ) + data_source_name = "Seita" + + # Choose which algorithm to use + if "custom-scheduler" in sensor.attributes: + scheduler_specs = sensor.attributes.get("custom-scheduler") + scheduler, data_source_name = load_custom_scheduler(scheduler_specs) + if rq_job: + rq_job.meta["data_source_name"] = data_source_name + rq_job.save_meta() + elif sensor.generic_asset.generic_asset_type.name == "battery": + scheduler = schedule_battery elif sensor.generic_asset.generic_asset_type.name in ( "one-way_evse", "two-way_evse", ): - consumption_schedule = schedule_charging_station( - sensor, - start, - end, - resolution, - soc_at_start, - soc_targets, - soc_min, - soc_max, - roundtrip_efficiency, - consumption_price_sensor=consumption_price_sensor, - production_price_sensor=production_price_sensor, - inflexible_device_sensors=inflexible_device_sensors, - belief_time=belief_time, - ) + scheduler = schedule_charging_station + else: raise ValueError( "Scheduling is not (yet) supported for asset type %s." % sensor.generic_asset.generic_asset_type ) + consumption_schedule = scheduler( + sensor, + start, + end, + resolution, + soc_at_start, + soc_targets, + soc_min, + soc_max, + roundtrip_efficiency, + consumption_price_sensor=consumption_price_sensor, + production_price_sensor=production_price_sensor, + inflexible_device_sensors=inflexible_device_sensors, + belief_time=belief_time, + ) + data_source = get_data_source( - data_source_name="Seita", + data_source_name=data_source_name, data_source_type="scheduling script", ) if rq_job: @@ -204,6 +206,61 @@ def make_schedule( return True +def load_custom_scheduler(scheduler_specs: dict) -> Tuple[Callable, str]: + """ + Read in custom scheduling spec. + Attempt to load the Callable, also derive a data source name. + + Example specs: + + { + "module": "/path/to/module.py", # or sthg importable, e.g. "package.module" + "function": "name_of_function", + "source": "source name" + } + + """ + assert isinstance( + scheduler_specs, dict + ), f"Scheduler specs is {type(scheduler_specs)}, should be a dict" + assert "module" in scheduler_specs, "scheduler specs have no 'module'." + assert "function" in scheduler_specs, "scheduler specs have no 'function'" + + source_name = scheduler_specs.get( + "source", f"Custom scheduler - {scheduler_specs['function']}" + ) + scheduler_name = scheduler_specs["function"] + + # import module + module_descr = scheduler_specs["module"] + if os.path.exists(module_descr): + spec = importlib.util.spec_from_file_location(scheduler_name, module_descr) + assert spec, f"Could not load specs for scheduling module at {module_descr}." + module = importlib.util.module_from_spec(spec) + sys.modules[scheduler_name] = module + assert isinstance(spec.loader, Loader) + spec.loader.exec_module(module) + else: # assume importable module + try: + module = importlib.import_module(module_descr) + except TypeError as te: + current_app.log.error(f"Cannot load {module_descr}: {te}.") + raise + except ModuleNotFoundError: + current_app.logger.error( + f"Attempted to import module {module_descr} (as it is not a valid file path), but it is not installed." + ) + raise + assert module, f"Module {module_descr} could not be loaded." + + # get scheduling function + assert hasattr( + module, scheduler_specs["function"] + ), "Module at {module_descr} has no function {scheduler_specs['function']}" + + return getattr(module, scheduler_specs["function"]), source_name + + def handle_scheduling_exception(job, exc_type, exc_value, traceback): """ Store exception as job meta data. diff --git a/flexmeasures/data/tests/dummy_scheduler.py b/flexmeasures/data/tests/dummy_scheduler.py new file mode 100644 index 000000000..cf4f0ec89 --- /dev/null +++ b/flexmeasures/data/tests/dummy_scheduler.py @@ -0,0 +1,21 @@ +from datetime import datetime, timedelta + +from flexmeasures.data.models.time_series import Sensor +from flexmeasures.data.models.planning.utils import initialize_series + + +def compute_a_schedule( + sensor: Sensor, + start: datetime, + end: datetime, + resolution: timedelta, + *args, + **kwargs +): + """Just a dummy scheduler.""" + return initialize_series( # simply creates a Pandas Series repeating one value + data=sensor.get_attribute("capacity_in_mw"), + start=start, + end=end, + resolution=resolution, + ) diff --git a/flexmeasures/data/tests/test_scheduling_jobs.py b/flexmeasures/data/tests/test_scheduling_jobs.py index 1e100cd40..a0111a445 100644 --- a/flexmeasures/data/tests/test_scheduling_jobs.py +++ b/flexmeasures/data/tests/test_scheduling_jobs.py @@ -1,11 +1,18 @@ # flake8: noqa: E402 from datetime import datetime, timedelta +import os + import pytz +import pytest +from rq.job import Job from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.time_series import Sensor, TimedBelief from flexmeasures.data.tests.utils import work_on_rq, exception_reporter -from flexmeasures.data.services.scheduling import create_scheduling_job +from flexmeasures.data.services.scheduling import ( + create_scheduling_job, + load_custom_scheduler, +) def test_scheduling_a_battery(db, app, add_battery_assets, setup_test_data): @@ -21,7 +28,9 @@ def test_scheduling_a_battery(db, app, add_battery_assets, setup_test_data): resolution = timedelta(minutes=15) assert ( - DataSource.query.filter_by(name="Seita", type="scheduling script").one_or_none() + DataSource.query.filter_by( + name="FlexMeasures", type="scheduling script" + ).one_or_none() is None ) # Make sure the scheduler data source isn't there @@ -47,3 +56,80 @@ def test_scheduling_a_battery(db, app, add_battery_assets, setup_test_data): ) print([v.event_value for v in power_values]) assert len(power_values) == 96 + + +scheduler_specs = { + "module": None, # use make_module_descr, see below + "function": "compute_a_schedule", + "source": "Test Source", +} + + +def make_module_descr(is_path): + if is_path: + path_to_here = os.path.dirname(__file__) + return os.path.join(path_to_here, "dummy_scheduler.py") + else: + return "flexmeasures.data.tests.dummy_scheduler" + + +@pytest.mark.parametrize("is_path", [False, True]) +def test_loading_custom_scheduler(is_path: bool): + """ + Simply check if loading a custom scheduler works. + """ + scheduler_specs["module"] = make_module_descr(is_path) + custom_scheduler, data_source = load_custom_scheduler(scheduler_specs) + assert data_source == "Test Source" + assert custom_scheduler.__name__ == "compute_a_schedule" + assert custom_scheduler.__doc__ == "Just a dummy scheduler." + + +@pytest.mark.parametrize("is_path", [False, True]) +def test_assigning_custom_scheduler(db, app, add_battery_assets, is_path: bool): + """ + Test if the custom scheduler is picked up when we assign it to a Sensor, + and that its dummy values are saved. + """ + scheduler_specs["module"] = make_module_descr(is_path) + + battery = Sensor.query.filter(Sensor.name == "Test battery").one_or_none() + battery.attributes["custom-scheduler"] = scheduler_specs + + tz = pytz.timezone("Europe/Amsterdam") + start = tz.localize(datetime(2015, 1, 2)) + end = tz.localize(datetime(2015, 1, 3)) + resolution = timedelta(minutes=15) + + job = create_scheduling_job( + battery.id, start, end, belief_time=start, resolution=resolution + ) + print("Job: %s" % job.id) + + work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter) + + # make sure we saved the data source for later lookup + redis_connection = app.queues["scheduling"].connection + finished_job = Job.fetch(job.id, connection=redis_connection) + assert finished_job.meta["data_source_name"] == scheduler_specs["source"] + + scheduler_source = DataSource.query.filter_by( + name=finished_job.meta["data_source_name"], type="scheduling script" + ).one_or_none() + assert ( + scheduler_source is not None + ) # Make sure the scheduler data source is now there + + power_values = ( + TimedBelief.query.filter(TimedBelief.sensor_id == battery.id) + .filter(TimedBelief.source_id == scheduler_source.id) + .all() + ) + assert len(power_values) == 96 + # test for negative value as we schedule consumption + assert all( + [ + v.event_value == -1 * battery.get_attribute("capacity_in_mw") + for v in power_values + ] + )