Skip to content

Commit

Permalink
Ability to provide a custom scheduling algorithm (#505)
Browse files Browse the repository at this point in the history
* First implementation of loading scheduler function from non-FM code

Signed-off-by: Nicolas Höning <nicolas@seita.nl>

* test loading of custom scheduler with dummy

Signed-off-by: Nicolas Höning <nicolas@seita.nl>

* changelog entry

Signed-off-by: Nicolas Höning <nicolas@seita.nl>

* complete the switch to default data source name for scheduling being 'FlexMeasures'

Signed-off-by: Nicolas Höning <nicolas@seita.nl>

* more complete dummy scheduler for testing

Signed-off-by: Nicolas Höning <nicolas@seita.nl>

* allow a custom scheduler to be present in an importable package

Signed-off-by: Nicolas Höning <nicolas@seita.nl>

* add documentation

Signed-off-by: Nicolas Höning <nicolas@seita.nl>

* one more data source renaming in tests

Signed-off-by: Nicolas Höning <nicolas@seita.nl>

* save custom data source name on job, so it can be looked up in get_schedule endpoint

Signed-off-by: Nicolas Höning <nicolas@seita.nl>

* one more data source renaming in tests

Signed-off-by: Nicolas Höning <nicolas@seita.nl>

* move back from renaming default script data source - we made issue #508 to work on this specifically

Signed-off-by: Nicolas Höning <nicolas@seita.nl>

* forgot one more data source renaming

Signed-off-by: Nicolas Höning <nicolas@seita.nl>

* implement review comments

Signed-off-by: Nicolas Höning <nicolas@seita.nl>

* we cannot use inclusive yet, while we require/support Pandas 1.25

Signed-off-by: Nicolas Höning <nicolas@seita.nl>

Signed-off-by: Nicolas Höning <nicolas@seita.nl>
  • Loading branch information
nhoening committed Oct 16, 2022
1 parent 35ede27 commit feb3d33
Show file tree
Hide file tree
Showing 9 changed files with 273 additions and 41 deletions.
1 change: 1 addition & 0 deletions documentation/changelog.rst
Expand Up @@ -9,6 +9,7 @@ New features
-------------

* Hit the replay button to replay what happened, available on the sensor and asset pages [see `PR #463 <http://www.github.com/FlexMeasures/flexmeasures/pull/463>`_]
* Ability to provide your own custom scheduling function [see `PR #505 <http://www.github.com/FlexMeasures/flexmeasures/pull/505>`_]
* Visually distinguish forecasts/schedules (dashed lines) from measurements (solid lines), and expand the tooltip with timing info regarding the forecast/schedule horizon or measurement lag [see `PR #503 <http://www.github.com/FlexMeasures/flexmeasures/pull/503>`_]
* The asset page also allows to show sensor data from other assets that belong to the same account [see `PR #500 <http://www.github.com/FlexMeasures/flexmeasures/pull/500>`_]
* Improved import of time series data from CSV file: 1) drop duplicate records with warning, and 2) allow configuring which column contains explicit recording times for each data point (use case: import forecasts) [see `PR #501 <http://www.github.com/FlexMeasures/flexmeasures/pull/501>`_]
Expand Down
65 changes: 65 additions & 0 deletions documentation/plugin/customisation.rst
Expand Up @@ -5,6 +5,71 @@ Plugin Customizations
=======================


Adding your own scheduling algorithm
-------------------------------------

FlexMeasures comes with in-built scheduling algorithms for often-used use cases. However, you can use your own algorithm, as well.

The idea is that you'd still use FlexMeasures' API to post flexibility states and trigger new schedules to be computed (see :ref:`posting_flex_states`),
but in the background your custom scheduling algorithm is being used.

Let's walk through an example!

First, we need to write a function which accepts arguments just like the in-built schedulers (their code is `here <https://github.com/FlexMeasures/flexmeasures/tree/main/flexmeasures/data/models/planning>`_).
The following minimal example gives you an idea of the inputs and outputs:

.. code-block:: python
from datetime import datetime, timedelta
import pandas as pd
from pandas.tseries.frequencies import to_offset
from flexmeasures.data.models.time_series import Sensor
def compute_a_schedule(
sensor: Sensor,
start: datetime,
end: datetime,
resolution: timedelta,
*args,
**kwargs
):
"""Just a dummy scheduler, advising to do nothing"""
return pd.Series(
0, index=pd.date_range(start, end, freq=resolution, closed="left")
)
.. note:: It's possible to add arguments that describe the asset flexibility and the EMS context in more detail. For example,
for storage assets we support various state-of-charge parameters. For now, the existing schedulers are the best documentation.


Finally, make your scheduler be the one that FlexMeasures will use for certain sensors:


.. code-block:: python
from flexmeasures.data.models.time_series import Sensor
scheduler_specs = {
"module": "flexmeasures.data.tests.dummy_scheduler", # or a file path, see note below
"function": "compute_a_schedule",
"source": "My Company"
}
my_sensor = Sensor.query.filter(Sensor.name == "My power sensor on a flexible asset").one_or_none()
my_sensor.attributes["custom-scheduler"] = scheduler_specs
From now on, all schedules (see :ref:`tut_forecasting_scheduling`) which are requested for this sensor should
get computed by your custom function! For later lookup, the data will be linked to a new data source with the name "My Opinion".

.. note:: To describe the module, we used an importable module here (actually a custom scheduling function we use to test this).
You can also provide a full file path to the module, e.g. "/path/to/my_file.py".


.. todo:: We're planning to use a similar approach to allow for custom forecasting algorithms, as well.


Adding your own style sheets
----------------------------

Expand Down
6 changes: 2 additions & 4 deletions documentation/plugin/introduction.rst
Expand Up @@ -3,13 +3,11 @@
Writing Plugins
====================

You can extend FlexMeasures with functionality like UI pages, API endpoints, or CLI functions.
You can extend FlexMeasures with functionality like UI pages, API endpoints, CLI functions and custom scheduling algorithms.
This is eventually how energy flexibility services are built on top of FlexMeasures!

In an nutshell, a FlexMeasures plugin adds functionality via one or more `Flask Blueprints <https://flask.palletsprojects.com/en/1.1.x/tutorial/views/>`_.

.. todo:: We'll use this to allow for custom forecasting and scheduling algorithms, as well.


How to make FlexMeasures load your plugin
------------------------------------------
Expand All @@ -34,4 +32,4 @@ To hit the ground running with that approach, we provide a `CookieCutter templat
It also includes a few Blueprint examples and best practices.


Continue reading the :ref:`plugin_showcase`.
Continue reading the :ref:`plugin_showcase` or possibilities to do :ref:`plugin_customization`.
2 changes: 2 additions & 0 deletions documentation/tut/forecasting_scheduling.rst
Expand Up @@ -12,6 +12,8 @@ Let's take a look at how FlexMeasures users can access information from these se

If you want to learn more about the actual algorithms used in the background, head over to :ref:`algorithms`.

.. note:: FlexMeasures comes with in-built scheduling algorithms. You can use your own algorithm, as well, see :ref:`plugin-customization`.


Maintaining the queues
------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/api/v1_3/implementations.py
Expand Up @@ -146,7 +146,7 @@ def get_device_message_response(generic_asset_name_groups, duration):

schedule_data_source_name = "Seita"
scheduler_source = DataSource.query.filter_by(
name="Seita", type="scheduling script"
name=schedule_data_source_name, type="scheduling script"
).one_or_none()
if scheduler_source is None:
return unknown_schedule(
Expand Down
4 changes: 3 additions & 1 deletion flexmeasures/api/v3_0/sensors.py
Expand Up @@ -519,8 +519,10 @@ def get_schedule(self, sensor: Sensor, job_id: str, duration: timedelta, **kwarg
schedule_start = job.kwargs["start"]

schedule_data_source_name = "Seita"
if "data_source_name" in job.meta:
schedule_data_source_name = job.meta["data_source_name"]
scheduler_source = DataSource.query.filter_by(
name="Seita", type="scheduling script"
name=schedule_data_source_name, type="scheduling script"
).one_or_none()
if scheduler_source is None:
return unknown_schedule(
Expand Down
123 changes: 90 additions & 33 deletions flexmeasures/data/services/scheduling.py
@@ -1,5 +1,9 @@
from datetime import datetime, timedelta
from typing import List, Optional
from typing import List, Tuple, Optional, Callable
import os
import sys
import importlib.util
from importlib.abc import Loader

from flask import current_app
import click
Expand Down Expand Up @@ -139,49 +143,47 @@ def make_schedule(
np.nan, index=pd.date_range(start, end, freq=resolution, closed="right")
)

if sensor.generic_asset.generic_asset_type.name == "battery":
consumption_schedule = schedule_battery(
sensor,
start,
end,
resolution,
soc_at_start,
soc_targets,
soc_min,
soc_max,
roundtrip_efficiency,
consumption_price_sensor=consumption_price_sensor,
production_price_sensor=production_price_sensor,
inflexible_device_sensors=inflexible_device_sensors,
belief_time=belief_time,
)
data_source_name = "Seita"

# Choose which algorithm to use
if "custom-scheduler" in sensor.attributes:
scheduler_specs = sensor.attributes.get("custom-scheduler")
scheduler, data_source_name = load_custom_scheduler(scheduler_specs)
if rq_job:
rq_job.meta["data_source_name"] = data_source_name
rq_job.save_meta()
elif sensor.generic_asset.generic_asset_type.name == "battery":
scheduler = schedule_battery
elif sensor.generic_asset.generic_asset_type.name in (
"one-way_evse",
"two-way_evse",
):
consumption_schedule = schedule_charging_station(
sensor,
start,
end,
resolution,
soc_at_start,
soc_targets,
soc_min,
soc_max,
roundtrip_efficiency,
consumption_price_sensor=consumption_price_sensor,
production_price_sensor=production_price_sensor,
inflexible_device_sensors=inflexible_device_sensors,
belief_time=belief_time,
)
scheduler = schedule_charging_station

else:
raise ValueError(
"Scheduling is not (yet) supported for asset type %s."
% sensor.generic_asset.generic_asset_type
)

consumption_schedule = scheduler(
sensor,
start,
end,
resolution,
soc_at_start,
soc_targets,
soc_min,
soc_max,
roundtrip_efficiency,
consumption_price_sensor=consumption_price_sensor,
production_price_sensor=production_price_sensor,
inflexible_device_sensors=inflexible_device_sensors,
belief_time=belief_time,
)

data_source = get_data_source(
data_source_name="Seita",
data_source_name=data_source_name,
data_source_type="scheduling script",
)
if rq_job:
Expand All @@ -204,6 +206,61 @@ def make_schedule(
return True


def load_custom_scheduler(scheduler_specs: dict) -> Tuple[Callable, str]:
"""
Read in custom scheduling spec.
Attempt to load the Callable, also derive a data source name.
Example specs:
{
"module": "/path/to/module.py", # or sthg importable, e.g. "package.module"
"function": "name_of_function",
"source": "source name"
}
"""
assert isinstance(
scheduler_specs, dict
), f"Scheduler specs is {type(scheduler_specs)}, should be a dict"
assert "module" in scheduler_specs, "scheduler specs have no 'module'."
assert "function" in scheduler_specs, "scheduler specs have no 'function'"

source_name = scheduler_specs.get(
"source", f"Custom scheduler - {scheduler_specs['function']}"
)
scheduler_name = scheduler_specs["function"]

# import module
module_descr = scheduler_specs["module"]
if os.path.exists(module_descr):
spec = importlib.util.spec_from_file_location(scheduler_name, module_descr)
assert spec, f"Could not load specs for scheduling module at {module_descr}."
module = importlib.util.module_from_spec(spec)
sys.modules[scheduler_name] = module
assert isinstance(spec.loader, Loader)
spec.loader.exec_module(module)
else: # assume importable module
try:
module = importlib.import_module(module_descr)
except TypeError as te:
current_app.log.error(f"Cannot load {module_descr}: {te}.")
raise
except ModuleNotFoundError:
current_app.logger.error(
f"Attempted to import module {module_descr} (as it is not a valid file path), but it is not installed."
)
raise
assert module, f"Module {module_descr} could not be loaded."

# get scheduling function
assert hasattr(
module, scheduler_specs["function"]
), "Module at {module_descr} has no function {scheduler_specs['function']}"

return getattr(module, scheduler_specs["function"]), source_name


def handle_scheduling_exception(job, exc_type, exc_value, traceback):
"""
Store exception as job meta data.
Expand Down
21 changes: 21 additions & 0 deletions flexmeasures/data/tests/dummy_scheduler.py
@@ -0,0 +1,21 @@
from datetime import datetime, timedelta

from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.models.planning.utils import initialize_series


def compute_a_schedule(
sensor: Sensor,
start: datetime,
end: datetime,
resolution: timedelta,
*args,
**kwargs
):
"""Just a dummy scheduler."""
return initialize_series( # simply creates a Pandas Series repeating one value
data=sensor.get_attribute("capacity_in_mw"),
start=start,
end=end,
resolution=resolution,
)

0 comments on commit feb3d33

Please sign in to comment.