Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to provide a custom scheduling algorithm #505

Merged
merged 16 commits into from Oct 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions documentation/changelog.rst
Expand Up @@ -9,6 +9,7 @@ New features
-------------

* Hit the replay button to replay what happened, available on the sensor and asset pages [see `PR #463 <http://www.github.com/FlexMeasures/flexmeasures/pull/463>`_]
* Ability to provide your own custom scheduling function [see `PR #505 <http://www.github.com/FlexMeasures/flexmeasures/pull/505>`_]
* Visually distinguish forecasts/schedules (dashed lines) from measurements (solid lines), and expand the tooltip with timing info regarding the forecast/schedule horizon or measurement lag [see `PR #503 <http://www.github.com/FlexMeasures/flexmeasures/pull/503>`_]
* The asset page also allows to show sensor data from other assets that belong to the same account [see `PR #500 <http://www.github.com/FlexMeasures/flexmeasures/pull/500>`_]
* Improved import of time series data from CSV file: 1) drop duplicate records with warning, and 2) allow configuring which column contains explicit recording times for each data point (use case: import forecasts) [see `PR #501 <http://www.github.com/FlexMeasures/flexmeasures/pull/501>`_]
Expand Down
65 changes: 65 additions & 0 deletions documentation/plugin/customisation.rst
Expand Up @@ -5,6 +5,71 @@ Plugin Customizations
=======================


Adding your own scheduling algorithm
-------------------------------------

FlexMeasures comes with in-built scheduling algorithms for often-used use cases. However, you can use your own algorithm, as well.

The idea is that you'd still use FlexMeasures' API to post flexibility states and trigger new schedules to be computed (see :ref:`posting_flex_states`),
but in the background your custom scheduling algorithm is being used.

Let's walk through an example!

First, we need to write a function which accepts arguments just like the in-built schedulers (their code is `here <https://github.com/FlexMeasures/flexmeasures/tree/main/flexmeasures/data/models/planning>`_).
The following minimal example gives you an idea of the inputs and outputs:

.. code-block:: python

from datetime import datetime, timedelta
import pandas as pd
from pandas.tseries.frequencies import to_offset
from flexmeasures.data.models.time_series import Sensor

def compute_a_schedule(
sensor: Sensor,
start: datetime,
end: datetime,
resolution: timedelta,
*args,
**kwargs
):
"""Just a dummy scheduler, advising to do nothing"""
return pd.Series(
0, index=pd.date_range(start, end, freq=resolution, closed="left")
)


.. note:: It's possible to add arguments that describe the asset flexibility and the EMS context in more detail. For example,
for storage assets we support various state-of-charge parameters. For now, the existing schedulers are the best documentation.


Finally, make your scheduler be the one that FlexMeasures will use for certain sensors:


.. code-block:: python

from flexmeasures.data.models.time_series import Sensor

scheduler_specs = {
"module": "flexmeasures.data.tests.dummy_scheduler", # or a file path, see note below
"function": "compute_a_schedule",
"source": "My Company"
}

my_sensor = Sensor.query.filter(Sensor.name == "My power sensor on a flexible asset").one_or_none()
my_sensor.attributes["custom-scheduler"] = scheduler_specs


From now on, all schedules (see :ref:`tut_forecasting_scheduling`) which are requested for this sensor should
get computed by your custom function! For later lookup, the data will be linked to a new data source with the name "My Opinion".

.. note:: To describe the module, we used an importable module here (actually a custom scheduling function we use to test this).
You can also provide a full file path to the module, e.g. "/path/to/my_file.py".


.. todo:: We're planning to use a similar approach to allow for custom forecasting algorithms, as well.


Adding your own style sheets
----------------------------

Expand Down
6 changes: 2 additions & 4 deletions documentation/plugin/introduction.rst
Expand Up @@ -3,13 +3,11 @@
Writing Plugins
====================

You can extend FlexMeasures with functionality like UI pages, API endpoints, or CLI functions.
You can extend FlexMeasures with functionality like UI pages, API endpoints, CLI functions and custom scheduling algorithms.
This is eventually how energy flexibility services are built on top of FlexMeasures!

In an nutshell, a FlexMeasures plugin adds functionality via one or more `Flask Blueprints <https://flask.palletsprojects.com/en/1.1.x/tutorial/views/>`_.

.. todo:: We'll use this to allow for custom forecasting and scheduling algorithms, as well.
nhoening marked this conversation as resolved.
Show resolved Hide resolved


How to make FlexMeasures load your plugin
------------------------------------------
Expand All @@ -34,4 +32,4 @@ To hit the ground running with that approach, we provide a `CookieCutter templat
It also includes a few Blueprint examples and best practices.


Continue reading the :ref:`plugin_showcase`.
Continue reading the :ref:`plugin_showcase` or possibilities to do :ref:`plugin_customization`.
2 changes: 2 additions & 0 deletions documentation/tut/forecasting_scheduling.rst
Expand Up @@ -12,6 +12,8 @@ Let's take a look at how FlexMeasures users can access information from these se

If you want to learn more about the actual algorithms used in the background, head over to :ref:`algorithms`.

.. note:: FlexMeasures comes with in-built scheduling algorithms. You can use your own algorithm, as well, see :ref:`plugin-customization`.


Maintaining the queues
------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/api/v1_3/implementations.py
Expand Up @@ -146,7 +146,7 @@ def get_device_message_response(generic_asset_name_groups, duration):

schedule_data_source_name = "Seita"
scheduler_source = DataSource.query.filter_by(
name="Seita", type="scheduling script"
name=schedule_data_source_name, type="scheduling script"
).one_or_none()
if scheduler_source is None:
return unknown_schedule(
Expand Down
4 changes: 3 additions & 1 deletion flexmeasures/api/v3_0/sensors.py
Expand Up @@ -519,8 +519,10 @@ def get_schedule(self, sensor: Sensor, job_id: str, duration: timedelta, **kwarg
schedule_start = job.kwargs["start"]

schedule_data_source_name = "Seita"
if "data_source_name" in job.meta:
schedule_data_source_name = job.meta["data_source_name"]
scheduler_source = DataSource.query.filter_by(
name="Seita", type="scheduling script"
name=schedule_data_source_name, type="scheduling script"
).one_or_none()
if scheduler_source is None:
return unknown_schedule(
Expand Down
123 changes: 90 additions & 33 deletions flexmeasures/data/services/scheduling.py
@@ -1,5 +1,9 @@
from datetime import datetime, timedelta
from typing import List, Optional
from typing import List, Tuple, Optional, Callable
import os
import sys
import importlib.util
from importlib.abc import Loader

from flask import current_app
import click
Expand Down Expand Up @@ -139,49 +143,47 @@ def make_schedule(
np.nan, index=pd.date_range(start, end, freq=resolution, closed="right")
)

if sensor.generic_asset.generic_asset_type.name == "battery":
consumption_schedule = schedule_battery(
sensor,
start,
end,
resolution,
soc_at_start,
soc_targets,
soc_min,
soc_max,
roundtrip_efficiency,
consumption_price_sensor=consumption_price_sensor,
production_price_sensor=production_price_sensor,
inflexible_device_sensors=inflexible_device_sensors,
belief_time=belief_time,
)
data_source_name = "Seita"

# Choose which algorithm to use
if "custom-scheduler" in sensor.attributes:
scheduler_specs = sensor.attributes.get("custom-scheduler")
scheduler, data_source_name = load_custom_scheduler(scheduler_specs)
if rq_job:
rq_job.meta["data_source_name"] = data_source_name
rq_job.save_meta()
elif sensor.generic_asset.generic_asset_type.name == "battery":
scheduler = schedule_battery
elif sensor.generic_asset.generic_asset_type.name in (
"one-way_evse",
"two-way_evse",
):
consumption_schedule = schedule_charging_station(
sensor,
start,
end,
resolution,
soc_at_start,
soc_targets,
soc_min,
soc_max,
roundtrip_efficiency,
consumption_price_sensor=consumption_price_sensor,
production_price_sensor=production_price_sensor,
inflexible_device_sensors=inflexible_device_sensors,
belief_time=belief_time,
)
scheduler = schedule_charging_station

else:
raise ValueError(
"Scheduling is not (yet) supported for asset type %s."
% sensor.generic_asset.generic_asset_type
)

consumption_schedule = scheduler(
sensor,
start,
end,
resolution,
soc_at_start,
soc_targets,
soc_min,
soc_max,
roundtrip_efficiency,
consumption_price_sensor=consumption_price_sensor,
production_price_sensor=production_price_sensor,
inflexible_device_sensors=inflexible_device_sensors,
belief_time=belief_time,
)

data_source = get_data_source(
data_source_name="Seita",
data_source_name=data_source_name,
data_source_type="scheduling script",
)
if rq_job:
Expand All @@ -204,6 +206,61 @@ def make_schedule(
return True


def load_custom_scheduler(scheduler_specs: dict) -> Tuple[Callable, str]:
"""
Read in custom scheduling spec.
Attempt to load the Callable, also derive a data source name.

Example specs:

{
"module": "/path/to/module.py", # or sthg importable, e.g. "package.module"
"function": "name_of_function",
"source": "source name"
}

"""
assert isinstance(
scheduler_specs, dict
), f"Scheduler specs is {type(scheduler_specs)}, should be a dict"
assert "module" in scheduler_specs, "scheduler specs have no 'module'."
assert "function" in scheduler_specs, "scheduler specs have no 'function'"

source_name = scheduler_specs.get(
"source", f"Custom scheduler - {scheduler_specs['function']}"
)
scheduler_name = scheduler_specs["function"]

# import module
module_descr = scheduler_specs["module"]
if os.path.exists(module_descr):
spec = importlib.util.spec_from_file_location(scheduler_name, module_descr)
assert spec, f"Could not load specs for scheduling module at {module_descr}."
module = importlib.util.module_from_spec(spec)
sys.modules[scheduler_name] = module
assert isinstance(spec.loader, Loader)
spec.loader.exec_module(module)
else: # assume importable module
try:
module = importlib.import_module(module_descr)
except TypeError as te:
current_app.log.error(f"Cannot load {module_descr}: {te}.")
raise
except ModuleNotFoundError:
current_app.logger.error(
f"Attempted to import module {module_descr} (as it is not a valid file path), but it is not installed."
)
raise
assert module, f"Module {module_descr} could not be loaded."

# get scheduling function
assert hasattr(
module, scheduler_specs["function"]
), "Module at {module_descr} has no function {scheduler_specs['function']}"

return getattr(module, scheduler_specs["function"]), source_name


def handle_scheduling_exception(job, exc_type, exc_value, traceback):
"""
Store exception as job meta data.
Expand Down
21 changes: 21 additions & 0 deletions flexmeasures/data/tests/dummy_scheduler.py
@@ -0,0 +1,21 @@
from datetime import datetime, timedelta

from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.models.planning.utils import initialize_series


def compute_a_schedule(
sensor: Sensor,
start: datetime,
end: datetime,
resolution: timedelta,
*args,
**kwargs
):
"""Just a dummy scheduler."""
return initialize_series( # simply creates a Pandas Series repeating one value
data=sensor.get_attribute("capacity_in_mw"),
start=start,
end=end,
resolution=resolution,
)