Skip to content

Commit

Permalink
feat: add ProcessScheduler (#729)
Browse files Browse the repository at this point in the history
* test: add shiftable_load fixture

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* test: move fixture setup_dummy_sensors from test_reporting.py to conftest.py

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* feat: add ShiftableLoadFlexModelSchema

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* test: add ShiftableLoadFlexModelSchema tests

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* feat: add ShiftableLoadScheduler

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* tests: add ShiftableLoadScheduler tests

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* test: add required parameter

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* docs: improve docstrings

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* fix: pandas 2.0 deprecated argument

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* docs: add changelog

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* refactor: move TimeIntervalSchema to data.schemas.time

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* refactor: rename cost_sensor to consumption_price_sensor

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* docs: add attribute description

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* address change requests

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* use consumption_price_sensor from flex_context

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* making block_invalid_starting_times_for_whole_process_scheduling work only when optimizing a INFLEXIBLE or SHIFTABLE load type.

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* remove consumption_price_sensor from flex_model

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* refactor: rename shiftable_load to process

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* rename optimization_sense to optimization_direction

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* consistent capitalization of INFLEXIBLE, SHIFTABLE AND BREAKABLE

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* fix typo

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* fix capitalizationof `inflexible-device-sensors`

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* rename OptimizationSense to OptimizationDirection

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* fix missing optimization direction renaming

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

---------

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>
  • Loading branch information
victorgarcia98 committed Jul 31, 2023
1 parent 654623e commit 1b62bd7
Show file tree
Hide file tree
Showing 12 changed files with 780 additions and 33 deletions.
9 changes: 7 additions & 2 deletions documentation/api/notation.rst
Expand Up @@ -202,9 +202,14 @@ Here are the three types of flexibility models you can expect to be built-in:
For some examples, see the `[POST] /sensors/(id)/schedules/trigger <../api/v3_0.html#post--api-v3_0-sensors-(id)-schedules-trigger>`_ endpoint docs.

2) For **shiftable processes**
2) For **processes**

.. todo:: A simple and proven algorithm exists, but is awaiting proper integration into FlexMeasures, see `PR 729 <https://github.com/FlexMeasures/flexmeasures/pull/729>`_.
- ``power``: nominal power of the load.
- ``duration``: time that the load last.
- ``optimization_sense``: objective of the scheduler, to maximize or minimize.
- ``time_restrictions``: time periods in which the load cannot be schedule to.
- ``process_type``: INFLEXIBLE, BREAKABLE or SHIFTABLE.


3) For **buffer devices** (e.g. thermal energy storage systems connected to heat pumps), use the same flexibility parameters described above for storage devices. Here are some tips to model a buffer with these parameters:

Expand Down
1 change: 1 addition & 0 deletions documentation/changelog.rst
Expand Up @@ -17,6 +17,7 @@ New features
* DataSource table now allows storing arbitrary attributes as a JSON (without content validation), similar to the Sensor and GenericAsset tables [see `PR #750 <https://www.github.com/FlexMeasures/flexmeasures/pull/750>`_]
* Added API endpoint `/sensor/<id>` for fetching a single sensor. [see `PR #759 <https://www.github.com/FlexMeasures/flexmeasures/pull/759>`_]
* The CLI now allows to set lists and dicts as asset & sensor attributes (formerly only single values) [see `PR #762 <https://www.github.com/FlexMeasures/flexmeasures/pull/762>`_]
* Add `ProcessScheduler` class, which optimizes the starting time of processes using one of the following policies: INFLEXIBLE, SHIFTABLE and BREAKABLE [see `PR #729 <https://www.github.com/FlexMeasures/flexmeasures/pull/729>`_]

Bugfixes
-----------
Expand Down
272 changes: 272 additions & 0 deletions flexmeasures/data/models/planning/process.py
@@ -0,0 +1,272 @@
from __future__ import annotations

from math import ceil
from datetime import timedelta
import pytz

import pandas as pd

from flexmeasures.data.models.planning import Scheduler

from flexmeasures.data.queries.utils import simplify_index
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.schemas.scheduling.process import (
ProcessSchedulerFlexModelSchema,
ProcessType,
OptimizationDirection,
)
from flexmeasures.data.schemas.scheduling import FlexContextSchema


class ProcessScheduler(Scheduler):

__version__ = "1"
__author__ = "Seita"

def compute(self) -> pd.Series | None:
"""Schedule a process, defined as a `power` and a `duration`, within the specified time window.
To schedule a battery, please, refer to the StorageScheduler.
For example, this scheduler can plan the start of a process of type `SHIFTABLE` that lasts 5h and requires a power of 10kW.
In that case, the scheduler will find the best (as to minimize/maximize the cost) hour to start the process.
This scheduler supports three types of `process_types`:
- INFLEXIBLE: this process needs to be scheduled as soon as possible.
- BREAKABLE: this process can be divisible in smaller consumption periods.
- SHIFTABLE: this process can start at any time within the specified time window.
The resulting schedule provides the power flow at each time period.
Parameters
==========
consumption_price_sensor: it defines the utility (economic, environmental, ) in each
time period. It has units of quantity/energy, for example, EUR/kWh.
power: nominal power of the process.
duration: time that the process last.
optimization_direction: objective of the scheduler, to maximize or minimize.
time_restrictions: time periods in which the process cannot be schedule to.
process_type: INFLEXIBLE, BREAKABLE or SHIFTABLE.
:returns: The computed schedule.
"""

if not self.config_deserialized:
self.deserialize_config()

start = self.start.astimezone(pytz.utc)
end = self.end.astimezone(pytz.utc)
resolution = self.resolution
belief_time = self.belief_time
sensor = self.sensor

consumption_price_sensor: Sensor = self.flex_context.get(
"consumption_price_sensor"
)
duration: timedelta = self.flex_model.get("duration")
power = self.flex_model.get("power")
optimization_direction = self.flex_model.get("optimization_direction")
process_type: ProcessType = self.flex_model.get("process_type")
time_restrictions = self.flex_model.get("time_restrictions")

# get cost data
cost = consumption_price_sensor.search_beliefs(
event_starts_after=start,
event_ends_before=end,
resolution=resolution,
one_deterministic_belief_per_event=True,
beliefs_before=belief_time,
)
cost = simplify_index(cost)

# create an empty schedule
schedule = pd.Series(
index=pd.date_range(
start,
end,
freq=sensor.event_resolution,
inclusive="left",
name="event_start",
),
data=0,
name="event_value",
)

# convert power to energy using the resolution of the sensor.
# e.g. resolution=15min, power=1kW -> energy=250W
energy = power * consumption_price_sensor.event_resolution / timedelta(hours=1)

# we can fill duration/resolution rows or, if the duration is larger than the schedule
# window, fill the entire window.
rows_to_fill = min(
ceil(duration / consumption_price_sensor.event_resolution), len(schedule)
)

# duration of the process exceeds the scheduling window
if rows_to_fill == len(schedule):
if time_restrictions.sum() > 0:
raise ValueError(
"Cannot handle time restrictions if the duration of the process exceeds that of the schedule window."
)

schedule[:] = energy
return schedule

if process_type in [ProcessType.INFLEXIBLE, ProcessType.SHIFTABLE]:
start_time_restrictions = (
self.block_invalid_starting_times_for_whole_process_scheduling(
process_type, time_restrictions, duration, rows_to_fill
)
)
else: # ProcessType.BREAKABLE
if (~time_restrictions).sum() < rows_to_fill:
raise ValueError(
"Cannot allocate a block of time {duration} given the time restrictions provided."
)

# create schedule
if process_type == ProcessType.INFLEXIBLE:
self.compute_inflexible(
schedule, start_time_restrictions, rows_to_fill, energy
)
elif process_type == ProcessType.BREAKABLE:
self.compute_breakable(
schedule,
optimization_direction,
time_restrictions,
cost,
rows_to_fill,
energy,
)
elif process_type == ProcessType.SHIFTABLE:
self.compute_shiftable(
schedule,
optimization_direction,
start_time_restrictions,
cost,
rows_to_fill,
energy,
)
else:
raise ValueError(f"Unknown process type '{process_type}'")

return schedule.tz_convert(self.start.tzinfo)

def block_invalid_starting_times_for_whole_process_scheduling(
self,
process_type: ProcessType,
time_restrictions: pd.Series,
duration: timedelta,
rows_to_fill: int,
) -> pd.Series:
"""Blocks time periods where the process cannot be schedule into, making
sure no other time restrictions runs in the middle of the activation of the process
More technically, this function applying an erosion of the time_restrictions array with a block of length duration.
Then, the condition if time_restrictions.sum() == len(time_restrictions):, makes sure that at least we have a spot to place the process.
For example:
time_restriction = [1 0 0 1 1 1 0 0 1 0]
# applying a dilation with duration = 2
time_restriction = [1 0 1 1 1 1 0 1 1 1]
We can only fit a block of duration = 2 in the positions 1 and 6. sum(start_time_restrictions) == 8,
while the len(time_restriction) == 10, which means we have 10-8=2 positions.
:param process_type: INFLEXIBLE, SHIFTABLE or BREAKABLE
:param time_restrictions: boolean time series indicating time periods in which the process cannot be scheduled.
:param duration: (datetime) duration of the length
:param rows_to_fill: (int) time periods that the process lasts
:return: filtered time restrictions
"""

# get start time instants that are not feasible, i.e. some time during the ON period goes through
# a time restriction interval
start_time_restrictions = (
time_restrictions.rolling(duration).max().shift(-rows_to_fill + 1)
)
start_time_restrictions = (
start_time_restrictions == 1
) | start_time_restrictions.isna()

if (~start_time_restrictions).sum() == 0:
raise ValueError(
"Cannot allocate a block of time {duration} given the time restrictions provided."
)

return start_time_restrictions

def compute_inflexible(
self,
schedule: pd.Series,
time_restrictions: pd.Series,
rows_to_fill: int,
energy: float,
) -> None:
"""Schedule process as early as possible."""
start = time_restrictions[~time_restrictions].index[0]

schedule.loc[start : start + self.resolution * (rows_to_fill - 1)] = energy

def compute_breakable(
self,
schedule: pd.Series,
optimization_direction: OptimizationDirection,
time_restrictions: pd.Series,
cost: pd.DataFrame,
rows_to_fill: int,
energy: float,
) -> None:
"""Break up schedule and divide it over the time slots with the largest utility (max/min cost depending on optimization_direction)."""
cost = cost[~time_restrictions].reset_index()

if optimization_direction == OptimizationDirection.MIN:
cost_ranking = cost.sort_values(
by=["event_value", "event_start"], ascending=[True, True]
)
else:
cost_ranking = cost.sort_values(
by=["event_value", "event_start"], ascending=[False, True]
)

schedule.loc[cost_ranking.head(rows_to_fill).event_start] = energy

def compute_shiftable(
self,
schedule: pd.Series,
optimization_direction: OptimizationDirection,
time_restrictions: pd.Series,
cost: pd.DataFrame,
rows_to_fill: int,
energy: float,
) -> None:
"""Schedules a block of consumption/production of `rows_to_fill` periods to maximize a utility."""
block_cost = simplify_index(
cost.rolling(rows_to_fill).sum().shift(-rows_to_fill + 1)
)

if optimization_direction == OptimizationDirection.MIN:
start = block_cost[~time_restrictions].idxmin()
else:
start = block_cost[~time_restrictions].idxmax()

start = start.iloc[0]

schedule.loc[start : start + self.resolution * (rows_to_fill - 1)] = energy

def deserialize_flex_config(self):
"""Deserialize flex_model using the schema ProcessSchedulerFlexModelSchema and
flex_context using FlexContextSchema
"""
if self.flex_model is None:
self.flex_model = {}

self.flex_model = ProcessSchedulerFlexModelSchema(
start=self.start, end=self.end, sensor=self.sensor
).load(self.flex_model)

self.flex_context = FlexContextSchema().load(self.flex_context)
16 changes: 16 additions & 0 deletions flexmeasures/data/models/planning/tests/conftest.py
Expand Up @@ -187,6 +187,22 @@ def add_inflexible_device_forecasts(
}


@pytest.fixture(scope="module")
def process(db, building, setup_sources) -> dict[str, Sensor]:
"""
Set up a process sensor where the output of the optimization is stored.
"""
_process = Sensor(
name="Process",
generic_asset=building,
event_resolution=timedelta(hours=1),
unit="kWh",
)
db.session.add(_process)

return _process


def add_as_beliefs(db, sensor, values, time_slots, source):
beliefs = [
TimedBelief(
Expand Down

0 comments on commit 1b62bd7

Please sign in to comment.