diff --git a/documentation/api/notation.rst b/documentation/api/notation.rst
index ff1f44b22..0e5a6c8a7 100644
--- a/documentation/api/notation.rst
+++ b/documentation/api/notation.rst
@@ -202,9 +202,14 @@ Here are the three types of flexibility models you can expect to be built-in:
 
    For some examples, see the `[POST] /sensors/(id)/schedules/trigger <../api/v3_0.html#post--api-v3_0-sensors-(id)-schedules-trigger>`_ endpoint docs.
 
-2) For **shiftable processes**
+2) For **processes**
 
-   .. todo:: A simple and proven algorithm exists, but is awaiting proper integration into FlexMeasures, see `PR 729 `_.
+    - ``power``: nominal power of the load.
+    - ``duration``: time that the load lasts.
+    - ``optimization_sense``: objective of the scheduler, to maximize or minimize.
+    - ``time_restrictions``: time periods in which the load cannot be scheduled.
+    - ``process_type``: INFLEXIBLE, BREAKABLE or SHIFTABLE.
+
 
 
 3) For **buffer devices** (e.g. thermal energy storage systems connected to heat pumps), use the same flexibility parameters described above for storage devices. Here are some tips to model a buffer with these parameters:
diff --git a/documentation/changelog.rst b/documentation/changelog.rst
index b1b72be2f..363273181 100644
--- a/documentation/changelog.rst
+++ b/documentation/changelog.rst
@@ -17,6 +17,7 @@ New features
 * DataSource table now allows storing arbitrary attributes as a JSON (without content validation), similar to the Sensor and GenericAsset tables [see `PR #750 `_]
 * Added API endpoint `/sensor/` for fetching a single sensor. [see `PR #759 `_]
 * The CLI now allows to set lists and dicts as asset & sensor attributes (formerly only single values) [see `PR #762 `_]
+* Added `ProcessScheduler` class, which optimizes the starting time of processes using one of the following policies: INFLEXIBLE, SHIFTABLE and BREAKABLE [see `PR #729 `_]
 
 Bugfixes
 -----------
diff --git a/flexmeasures/data/models/planning/process.py b/flexmeasures/data/models/planning/process.py
new file mode 100644
index 000000000..ac92f37bc
--- /dev/null
+++ b/flexmeasures/data/models/planning/process.py
@@ -0,0 +1,272 @@
+from __future__ import annotations
+
+from math import ceil
+from datetime import timedelta
+import pytz
+
+import pandas as pd
+
+from flexmeasures.data.models.planning import Scheduler
+
+from flexmeasures.data.queries.utils import simplify_index
+from flexmeasures.data.models.time_series import Sensor
+from flexmeasures.data.schemas.scheduling.process import (
+    ProcessSchedulerFlexModelSchema,
+    ProcessType,
+    OptimizationDirection,
+)
+from flexmeasures.data.schemas.scheduling import FlexContextSchema
+
+
+class ProcessScheduler(Scheduler):
+
+    __version__ = "1"
+    __author__ = "Seita"
+
+    def compute(self) -> pd.Series | None:
+        """Schedule a process, defined by a `power` and a `duration`, within the specified time window.
+        To schedule a battery, please refer to the StorageScheduler.
+
+        For example, this scheduler can plan the start of a process of type `SHIFTABLE` that lasts 5h and requires a power of 10kW.
+        In that case, the scheduler will find the best hour to start the process, i.e. the one that minimizes (or maximizes) the cost.
+
+        This scheduler supports three types of `process_types`:
+        - INFLEXIBLE: this process needs to be scheduled as soon as possible.
+        - BREAKABLE: this process can be divided into smaller consumption periods.
+        - SHIFTABLE: this process can start at any time within the specified time window.
+
+        The resulting schedule provides the power flow at each time period.
+
+        Parameters
+        ==========
+
+        consumption_price_sensor: defines the utility (economic, environmental, ...) in each
+            time period. It has units of quantity/energy, for example, EUR/kWh.
+        power: nominal power of the process.
+        duration: time that the process lasts.
+
+        optimization_direction: objective of the scheduler, to maximize or minimize.
+        time_restrictions: time periods in which the process cannot be scheduled.
+        process_type: INFLEXIBLE, BREAKABLE or SHIFTABLE.
+
+        :returns: The computed schedule.
+        """
+
+        if not self.config_deserialized:
+            self.deserialize_config()
+
+        start = self.start.astimezone(pytz.utc)
+        end = self.end.astimezone(pytz.utc)
+        resolution = self.resolution
+        belief_time = self.belief_time
+        sensor = self.sensor
+
+        consumption_price_sensor: Sensor = self.flex_context.get(
+            "consumption_price_sensor"
+        )
+        duration: timedelta = self.flex_model.get("duration")
+        power = self.flex_model.get("power")
+        optimization_direction = self.flex_model.get("optimization_direction")
+        process_type: ProcessType = self.flex_model.get("process_type")
+        time_restrictions = self.flex_model.get("time_restrictions")
+
+        # get cost data
+        cost = consumption_price_sensor.search_beliefs(
+            event_starts_after=start,
+            event_ends_before=end,
+            resolution=resolution,
+            one_deterministic_belief_per_event=True,
+            beliefs_before=belief_time,
+        )
+        cost = simplify_index(cost)
+
+        # create an empty schedule
+        schedule = pd.Series(
+            index=pd.date_range(
+                start,
+                end,
+                freq=sensor.event_resolution,
+                inclusive="left",
+                name="event_start",
+            ),
+            data=0,
+            name="event_value",
+        )
+
+        # convert power to energy using the event resolution of the price sensor,
+        # e.g. resolution=15min, power=1kW -> energy=0.25kWh
+        energy = power * consumption_price_sensor.event_resolution / timedelta(hours=1)
+
+        # we can fill duration/resolution rows or, if the duration is larger than the schedule
+        # window, fill the entire window.
+        rows_to_fill = min(
+            ceil(duration / consumption_price_sensor.event_resolution), len(schedule)
+        )
+
+        # duration of the process exceeds the scheduling window
+        if rows_to_fill == len(schedule):
+            if time_restrictions.sum() > 0:
+                raise ValueError(
+                    "Cannot handle time restrictions if the duration of the process exceeds that of the schedule window."
+                )
+
+            schedule[:] = energy
+            return schedule
+
+        if process_type in [ProcessType.INFLEXIBLE, ProcessType.SHIFTABLE]:
+            start_time_restrictions = (
+                self.block_invalid_starting_times_for_whole_process_scheduling(
+                    process_type, time_restrictions, duration, rows_to_fill
+                )
+            )
+        else:  # ProcessType.BREAKABLE
+            if (~time_restrictions).sum() < rows_to_fill:
+                raise ValueError(
+                    f"Cannot allocate a block of time {duration} given the time restrictions provided."
+ ) + + # create schedule + if process_type == ProcessType.INFLEXIBLE: + self.compute_inflexible( + schedule, start_time_restrictions, rows_to_fill, energy + ) + elif process_type == ProcessType.BREAKABLE: + self.compute_breakable( + schedule, + optimization_direction, + time_restrictions, + cost, + rows_to_fill, + energy, + ) + elif process_type == ProcessType.SHIFTABLE: + self.compute_shiftable( + schedule, + optimization_direction, + start_time_restrictions, + cost, + rows_to_fill, + energy, + ) + else: + raise ValueError(f"Unknown process type '{process_type}'") + + return schedule.tz_convert(self.start.tzinfo) + + def block_invalid_starting_times_for_whole_process_scheduling( + self, + process_type: ProcessType, + time_restrictions: pd.Series, + duration: timedelta, + rows_to_fill: int, + ) -> pd.Series: + """Blocks time periods where the process cannot be schedule into, making + sure no other time restrictions runs in the middle of the activation of the process + + More technically, this function applying an erosion of the time_restrictions array with a block of length duration. + + Then, the condition if time_restrictions.sum() == len(time_restrictions):, makes sure that at least we have a spot to place the process. + + For example: + + time_restriction = [1 0 0 1 1 1 0 0 1 0] + + # applying a dilation with duration = 2 + time_restriction = [1 0 1 1 1 1 0 1 1 1] + + We can only fit a block of duration = 2 in the positions 1 and 6. sum(start_time_restrictions) == 8, + while the len(time_restriction) == 10, which means we have 10-8=2 positions. + + :param process_type: INFLEXIBLE, SHIFTABLE or BREAKABLE + :param time_restrictions: boolean time series indicating time periods in which the process cannot be scheduled. + :param duration: (datetime) duration of the length + :param rows_to_fill: (int) time periods that the process lasts + :return: filtered time restrictions + """ + + # get start time instants that are not feasible, i.e. some time during the ON period goes through + # a time restriction interval + start_time_restrictions = ( + time_restrictions.rolling(duration).max().shift(-rows_to_fill + 1) + ) + start_time_restrictions = ( + start_time_restrictions == 1 + ) | start_time_restrictions.isna() + + if (~start_time_restrictions).sum() == 0: + raise ValueError( + "Cannot allocate a block of time {duration} given the time restrictions provided." 
+ ) + + return start_time_restrictions + + def compute_inflexible( + self, + schedule: pd.Series, + time_restrictions: pd.Series, + rows_to_fill: int, + energy: float, + ) -> None: + """Schedule process as early as possible.""" + start = time_restrictions[~time_restrictions].index[0] + + schedule.loc[start : start + self.resolution * (rows_to_fill - 1)] = energy + + def compute_breakable( + self, + schedule: pd.Series, + optimization_direction: OptimizationDirection, + time_restrictions: pd.Series, + cost: pd.DataFrame, + rows_to_fill: int, + energy: float, + ) -> None: + """Break up schedule and divide it over the time slots with the largest utility (max/min cost depending on optimization_direction).""" + cost = cost[~time_restrictions].reset_index() + + if optimization_direction == OptimizationDirection.MIN: + cost_ranking = cost.sort_values( + by=["event_value", "event_start"], ascending=[True, True] + ) + else: + cost_ranking = cost.sort_values( + by=["event_value", "event_start"], ascending=[False, True] + ) + + schedule.loc[cost_ranking.head(rows_to_fill).event_start] = energy + + def compute_shiftable( + self, + schedule: pd.Series, + optimization_direction: OptimizationDirection, + time_restrictions: pd.Series, + cost: pd.DataFrame, + rows_to_fill: int, + energy: float, + ) -> None: + """Schedules a block of consumption/production of `rows_to_fill` periods to maximize a utility.""" + block_cost = simplify_index( + cost.rolling(rows_to_fill).sum().shift(-rows_to_fill + 1) + ) + + if optimization_direction == OptimizationDirection.MIN: + start = block_cost[~time_restrictions].idxmin() + else: + start = block_cost[~time_restrictions].idxmax() + + start = start.iloc[0] + + schedule.loc[start : start + self.resolution * (rows_to_fill - 1)] = energy + + def deserialize_flex_config(self): + """Deserialize flex_model using the schema ProcessSchedulerFlexModelSchema and + flex_context using FlexContextSchema + """ + if self.flex_model is None: + self.flex_model = {} + + self.flex_model = ProcessSchedulerFlexModelSchema( + start=self.start, end=self.end, sensor=self.sensor + ).load(self.flex_model) + + self.flex_context = FlexContextSchema().load(self.flex_context) diff --git a/flexmeasures/data/models/planning/tests/conftest.py b/flexmeasures/data/models/planning/tests/conftest.py index acf8ca4ab..2d7981baf 100644 --- a/flexmeasures/data/models/planning/tests/conftest.py +++ b/flexmeasures/data/models/planning/tests/conftest.py @@ -187,6 +187,22 @@ def add_inflexible_device_forecasts( } +@pytest.fixture(scope="module") +def process(db, building, setup_sources) -> dict[str, Sensor]: + """ + Set up a process sensor where the output of the optimization is stored. 
+    """
+    _process = Sensor(
+        name="Process",
+        generic_asset=building,
+        event_resolution=timedelta(hours=1),
+        unit="kWh",
+    )
+    db.session.add(_process)
+
+    return _process
+
+
 def add_as_beliefs(db, sensor, values, time_slots, source):
     beliefs = [
         TimedBelief(
diff --git a/flexmeasures/data/models/planning/tests/test_process.py b/flexmeasures/data/models/planning/tests/test_process.py
new file mode 100644
index 000000000..be6210084
--- /dev/null
+++ b/flexmeasures/data/models/planning/tests/test_process.py
@@ -0,0 +1,235 @@
+from datetime import datetime, timedelta
+import pytest
+import pytz
+
+from flexmeasures.data.models.time_series import Sensor
+from flexmeasures.data.models.planning.process import ProcessScheduler
+
+
+tz = pytz.timezone("Europe/Amsterdam")
+start = tz.localize(datetime(2015, 1, 2))
+end = tz.localize(datetime(2015, 1, 3))
+resolution = timedelta(hours=1)
+
+
+@pytest.mark.parametrize(
+    "process_type, optimal_start",
+    [("INFLEXIBLE", datetime(2015, 1, 2, 0)), ("SHIFTABLE", datetime(2015, 1, 2, 8))],
+)
+def test_shiftable_scheduler(add_battery_assets, process, process_type, optimal_start):
+    """
+    Test scheduling a process of 4kW of power that lasts 4h using the ProcessScheduler
+    without time restrictions.
+    """
+
+    # get the sensors from the database
+    epex_da = Sensor.query.filter(Sensor.name == "epex_da").one_or_none()
+
+    flex_model = {
+        "duration": "PT4H",
+        "process-type": process_type,
+        "power": 4,
+    }
+
+    flex_context = {
+        "consumption-price-sensor": epex_da.id,
+    }
+
+    scheduler = ProcessScheduler(
+        process,
+        start,
+        end,
+        resolution,
+        flex_model=flex_model,
+        flex_context=flex_context,
+    )
+    schedule = scheduler.compute()
+
+    optimal_start = tz.localize(optimal_start)
+
+    mask = (optimal_start <= schedule.index) & (
+        schedule.index < optimal_start + timedelta(hours=4)
+    )
+
+    assert (schedule[mask] == 4).all()
+    assert (schedule[~mask] == 0).all()
+
+
+@pytest.mark.parametrize(
+    "process_type, optimal_start",
+    [("INFLEXIBLE", datetime(2015, 1, 2, 0)), ("SHIFTABLE", datetime(2015, 1, 2, 8))],
+)
+def test_duration_exceeds_planning_window(
+    add_battery_assets, process, process_type, optimal_start
+):
+    """
+    Test scheduling a process that lasts longer than the planning window.
+    """
+
+    # get the sensors from the database
+    epex_da = Sensor.query.filter(Sensor.name == "epex_da").one_or_none()
+
+    flex_model = {
+        "duration": "PT48H",
+        "process-type": process_type,
+        "power": 4,
+    }
+
+    flex_context = {
+        "consumption-price-sensor": epex_da.id,
+    }
+
+    scheduler = ProcessScheduler(
+        process,
+        start,
+        end,
+        resolution,
+        flex_model=flex_model,
+        flex_context=flex_context,
+    )
+    schedule = scheduler.compute()
+
+    optimal_start = tz.localize(optimal_start)
+
+    assert (schedule == 4).all()
+
+
+def test_shiftable_scheduler_time_restrictions(add_battery_assets, process):
+    """
+    Test the ProcessScheduler with a time restriction consisting of a 2h block starting
+    at 8am. The resulting schedule avoids the 8am-10am period and is placed in a valid period.
+ """ + + # get the sensors from the database + epex_da = Sensor.query.filter(Sensor.name == "epex_da").one_or_none() + + # time parameters + + flex_model = { + "duration": "PT4H", + "process-type": "SHIFTABLE", + "power": 4, + "time-restrictions": [ + {"start": "2015-01-02T08:00:00+01:00", "duration": "PT2H"} + ], + } + flex_context = { + "consumption-price-sensor": epex_da.id, + } + + scheduler = ProcessScheduler( + process, + start, + end, + resolution, + flex_model=flex_model, + flex_context=flex_context, + ) + schedule = scheduler.compute() + + optimal_start = tz.localize(datetime(2015, 1, 2, 10)) + + mask = (optimal_start <= schedule.index) & ( + schedule.index < optimal_start + timedelta(hours=4) + ) + + assert (schedule[mask] == 4).all() + assert (schedule[~mask] == 0).all() + + # check that the time restrictions are fulfilled + time_restrictions = scheduler.flex_model["time_restrictions"] + time_restrictions = time_restrictions.tz_convert(tz) + + assert (schedule[time_restrictions] == 0).all() + + +def test_breakable_scheduler_time_restrictions(add_battery_assets, process): + """ + Test BREAKABLE process_type of ProcessScheduler by introducing four 1-hour restrictions + interspaced by 1 hour. The equivalent mask would be the following: [0,...,0,1,0,1,0,1,0,1,0, ...,0]. + Trying to get the best prices (between 9am and 4pm), his makes the schedule choose time periods between + the time restrictions. + """ + + # get the sensors from the database + epex_da = Sensor.query.filter(Sensor.name == "epex_da").one_or_none() + + # time parameters + + flex_model = { + "duration": "PT4H", + "process-type": "BREAKABLE", + "power": 4, + "time-restrictions": [ + {"start": "2015-01-02T09:00:00+01:00", "duration": "PT1H"}, + {"start": "2015-01-02T11:00:00+01:00", "duration": "PT1H"}, + {"start": "2015-01-02T13:00:00+01:00", "duration": "PT1H"}, + {"start": "2015-01-02T15:00:00+01:00", "duration": "PT1H"}, + ], + } + + flex_context = { + "consumption-price-sensor": epex_da.id, + } + + scheduler = ProcessScheduler( + process, + start, + end, + resolution, + flex_model=flex_model, + flex_context=flex_context, + ) + schedule = scheduler.compute() + + expected_schedule = [0] * 8 + [4, 0, 4, 0, 4, 0, 4, 0] + [0] * 8 + + assert (schedule == expected_schedule).all() + + # check that the time restrictions are fulfilled + time_restrictions = scheduler.flex_model["time_restrictions"] + time_restrictions = time_restrictions.tz_convert(tz) + + assert (schedule[time_restrictions] == 0).all() + + +@pytest.mark.parametrize( + "process_type, time_restrictions", + [ + ("BREAKABLE", [{"start": "2015-01-02T00:00:00+01:00", "duration": "PT24H"}]), + ("INFLEXIBLE", [{"start": "2015-01-02T03:00:00+01:00", "duration": "PT21H"}]), + ("SHIFTABLE", [{"start": "2015-01-02T03:00:00+01:00", "duration": "PT21H"}]), + ], +) +def test_impossible_schedules( + add_battery_assets, process, process_type, time_restrictions +): + """ + Test schedules with time restrictions that make a 4h block not fit anytime during the + planned window. 
+ """ + + # get the sensors from the database + epex_da = Sensor.query.filter(Sensor.name == "epex_da").one_or_none() + + flex_model = { + "duration": "PT4H", + "process-type": process_type, + "power": 4, + "time-restrictions": time_restrictions, + } + flex_context = { + "consumption-price-sensor": epex_da.id, + } + + scheduler = ProcessScheduler( + process, + start, + end, + resolution, + flex_model=flex_model, + flex_context=flex_context, + ) + + with pytest.raises(ValueError): + scheduler.compute() diff --git a/flexmeasures/data/schemas/__init__.py b/flexmeasures/data/schemas/__init__.py index 64be51f77..5965fb501 100644 --- a/flexmeasures/data/schemas/__init__.py +++ b/flexmeasures/data/schemas/__init__.py @@ -6,4 +6,4 @@ from .generic_assets import GenericAssetIdField as AssetIdField # noqa F401 from .sensors import SensorIdField # noqa F401 from .sources import DataSourceIdField as SourceIdField # noqa F401 -from .times import AwareDateTimeField, DurationField # noqa F401 +from .times import AwareDateTimeField, DurationField, TimeIntervalField # noqa F401 diff --git a/flexmeasures/data/schemas/scheduling/process.py b/flexmeasures/data/schemas/scheduling/process.py new file mode 100644 index 000000000..48906b135 --- /dev/null +++ b/flexmeasures/data/schemas/scheduling/process.py @@ -0,0 +1,120 @@ +from __future__ import annotations + +from datetime import datetime +import pytz +import pandas as pd + +from marshmallow import ( + Schema, + post_load, + fields, + pre_load, +) + +from flexmeasures.data.models.time_series import Sensor +from flexmeasures.data.schemas.times import ( + DurationField, + TimeIntervalSchema, +) + + +from enum import Enum + + +class ProcessType(Enum): + INFLEXIBLE = "INFLEXIBLE" + BREAKABLE = "BREAKABLE" + SHIFTABLE = "SHIFTABLE" + + +class OptimizationDirection(Enum): + MAX = "MAX" + MIN = "MIN" + + +class ProcessSchedulerFlexModelSchema(Schema): + # time that the process last. + duration = DurationField(required=True) + # nominal power of the process. + power = fields.Float(required=True) + # policy to schedule a process: INFLEXIBLE, SHIFTABLE, BREAKABLE + process_type = fields.Enum( + ProcessType, load_default=ProcessType.INFLEXIBLE, data_key="process-type" + ) + # time_restrictions will be turned into a Series with Boolean values (where True means restricted for scheduling). + time_restrictions = fields.List( + fields.Nested(TimeIntervalSchema()), + data_key="time-restrictions", + load_default=[], + ) + # objective of the scheduler, to maximize or minimize. 
+ optimization_direction = fields.Enum( + OptimizationDirection, + load_default=OptimizationDirection.MIN, + data_key="optimization-sense", + ) + + def __init__(self, sensor: Sensor, start: datetime, end: datetime, *args, **kwargs): + """Pass start and end to convert time_restrictions into a time series and sensor + as a fallback mechanism for the process_type + """ + self.start = start.astimezone(pytz.utc) + self.end = end.astimezone(pytz.utc) + self.sensor = sensor + super().__init__(*args, **kwargs) + + def get_mask_from_events(self, events: list[dict[str, str]] | None) -> pd.Series: + """Convert events to a mask of the time periods that are valid + + :param events: list of events defined as dictionaries with a start and duration + :return: mask of the allowed time periods + """ + series = pd.Series( + index=pd.date_range( + self.start, + self.end, + freq=self.sensor.event_resolution, + inclusive="left", + name="event_start", + tz=self.start.tzinfo, + ), + data=False, + ) + + if events is None: + return series + + for event in events: + start = event["start"] + duration = event["duration"] + end = start + duration + series[(series.index >= start) & (series.index < end)] = True + + return series + + @post_load + def post_load_time_restrictions(self, data: dict, **kwargs) -> dict: + """Convert events (list of [start, duration] pairs) into a mask (pandas Series)""" + + data["time_restrictions"] = self.get_mask_from_events(data["time_restrictions"]) + + return data + + @pre_load + def pre_load_process_type(self, data: dict, **kwargs) -> dict: + """Fallback mechanism for the process_type variable. If not found in data, + it tries to find it in among the sensor or asset attributes and, if it's not found + there either, it defaults to "INFLEXIBLE". + """ + if "process-type" not in data or data["process-type"] is None: + process_type = self.sensor.get_attribute("process-type") + + if process_type is None: + process_type = self.sensor.generic_asset.get_attribute("process-type") + + if process_type is None: + process_type = "INFLEXIBLE" + + data["process-type"] = process_type + + return data diff --git a/flexmeasures/data/schemas/tests/conftest.py b/flexmeasures/data/schemas/tests/conftest.py new file mode 100644 index 000000000..58f947ec3 --- /dev/null +++ b/flexmeasures/data/schemas/tests/conftest.py @@ -0,0 +1,30 @@ +import pytest +from datetime import timedelta + +from flexmeasures.data.models.time_series import Sensor +from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType + + +@pytest.fixture(scope="module") +def setup_dummy_sensors(db, app): + + dummy_asset_type = GenericAssetType(name="DummyGenericAssetType") + db.session.add(dummy_asset_type) + + dummy_asset = GenericAsset( + name="DummyGenericAsset", generic_asset_type=dummy_asset_type + ) + db.session.add(dummy_asset) + + sensor1 = Sensor( + "sensor 1", generic_asset=dummy_asset, event_resolution=timedelta(hours=1) + ) + db.session.add(sensor1) + sensor2 = Sensor( + "sensor 2", generic_asset=dummy_asset, event_resolution=timedelta(hours=1) + ) + db.session.add(sensor2) + + db.session.commit() + + yield sensor1, sensor2 diff --git a/flexmeasures/data/schemas/tests/test_reporting.py b/flexmeasures/data/schemas/tests/test_reporting.py index cb5490052..8d30e0013 100644 --- a/flexmeasures/data/schemas/tests/test_reporting.py +++ b/flexmeasures/data/schemas/tests/test_reporting.py @@ -1,6 +1,3 @@ -from flexmeasures.data.models.time_series import Sensor -from flexmeasures.data.models.generic_assets import 
GenericAsset, GenericAssetType - from flexmeasures.data.schemas.reporting.pandas_reporter import ( PandasReporterConfigSchema, ) @@ -9,32 +6,6 @@ import pytest -@pytest.fixture(scope="module") -def setup_dummy_sensors(db, app): - - dummy_asset_type = GenericAssetType(name="DummyGenericAssetType") - db.session.add(dummy_asset_type) - - dummy_asset = GenericAsset( - name="DummyGenericAsset", generic_asset_type=dummy_asset_type - ) - db.session.add(dummy_asset) - - sensor1 = Sensor("sensor 1", generic_asset=dummy_asset) - db.session.add(sensor1) - sensor2 = Sensor("sensor 2", generic_asset=dummy_asset) - db.session.add(sensor2) - - db.session.commit() - - yield sensor1, sensor2 - - db.session.delete(sensor1) - db.session.delete(sensor2) - - db.session.commit() - - @pytest.mark.parametrize( "reporter_config, is_valid", [ diff --git a/flexmeasures/data/schemas/tests/test_scheduling.py b/flexmeasures/data/schemas/tests/test_scheduling.py new file mode 100644 index 000000000..ec24eb3a0 --- /dev/null +++ b/flexmeasures/data/schemas/tests/test_scheduling.py @@ -0,0 +1,75 @@ +from flexmeasures.data.schemas.scheduling.process import ( + ProcessSchedulerFlexModelSchema, + ProcessType, +) + +from datetime import datetime +import pytz + + +def test_process_scheduler_flex_model_load(db, app, setup_dummy_sensors): + + sensor1, _ = setup_dummy_sensors + + schema = ProcessSchedulerFlexModelSchema( + sensor=sensor1, + start=datetime(2023, 1, 1, tzinfo=pytz.UTC), + end=datetime(2023, 1, 2, tzinfo=pytz.UTC), + ) + + process_scheduler_flex_model = schema.load( + { + "duration": "PT4H", + "power": 30.0, + "time-restrictions": [ + {"start": "2023-01-01T00:00:00+00:00", "duration": "PT3H"} + ], + } + ) + + assert process_scheduler_flex_model["process_type"] == ProcessType.INFLEXIBLE + + +def test_process_scheduler_flex_model_process_type(db, app, setup_dummy_sensors): + + sensor1, _ = setup_dummy_sensors + + # checking default + + schema = ProcessSchedulerFlexModelSchema( + sensor=sensor1, + start=datetime(2023, 1, 1, tzinfo=pytz.UTC), + end=datetime(2023, 1, 2, tzinfo=pytz.UTC), + ) + + process_scheduler_flex_model = schema.load( + { + "duration": "PT4H", + "power": 30.0, + "time-restrictions": [ + {"start": "2023-01-01T00:00:00+00:00", "duration": "PT3H"} + ], + } + ) + + assert process_scheduler_flex_model["process_type"] == ProcessType.INFLEXIBLE + + sensor1.attributes["process-type"] = "SHIFTABLE" + + schema = ProcessSchedulerFlexModelSchema( + sensor=sensor1, + start=datetime(2023, 1, 1, tzinfo=pytz.UTC), + end=datetime(2023, 1, 2, tzinfo=pytz.UTC), + ) + + process_scheduler_flex_model = schema.load( + { + "duration": "PT4H", + "power": 30.0, + "time-restrictions": [ + {"start": "2023-01-01T00:00:00+00:00", "duration": "PT3H"} + ], + } + ) + + assert process_scheduler_flex_model["process_type"] == ProcessType.SHIFTABLE diff --git a/flexmeasures/data/schemas/times.py b/flexmeasures/data/schemas/times.py index f2f931356..b56d70352 100644 --- a/flexmeasures/data/schemas/times.py +++ b/flexmeasures/data/schemas/times.py @@ -1,9 +1,11 @@ from __future__ import annotations +import json from datetime import datetime, timedelta from flask import current_app -from marshmallow import fields +from marshmallow import fields, Schema +from marshmallow.exceptions import ValidationError import isodate from isodate.isoerror import ISO8601Error import pandas as pd @@ -84,3 +86,20 @@ def _deserialize(self, value: str, attr, obj, **kwargs) -> datetime: """ value = value.replace(" ", "+") return 
fields.AwareDateTime._deserialize(self, value, attr, obj, **kwargs)
+
+
+class TimeIntervalSchema(Schema):
+    start = AwareDateTimeField(required=True)
+    duration = DurationField(required=True)
+
+
+class TimeIntervalField(MarshmallowClickMixin, fields.Dict):
+    """Field that de-serializes to a TimeInterval, defined by a start and a duration."""
+
+    def _deserialize(self, value: str, attr, obj, **kwargs) -> dict:
+        try:
+            v = json.loads(value)
+        except json.JSONDecodeError:
+            raise ValidationError("Unable to parse value as a JSON object.")
+
+        return TimeIntervalSchema().load(v)
diff --git a/flexmeasures/data/services/scheduling.py b/flexmeasures/data/services/scheduling.py
index 526085874..df56e3c9a 100644
--- a/flexmeasures/data/services/scheduling.py
+++ b/flexmeasures/data/services/scheduling.py
@@ -19,6 +19,7 @@
 from flexmeasures.data import db
 from flexmeasures.data.models.planning import Scheduler
 from flexmeasures.data.models.planning.storage import StorageScheduler
+from flexmeasures.data.models.planning.process import ProcessScheduler
 from flexmeasures.data.models.time_series import Sensor, TimedBelief
 from flexmeasures.data.models.data_sources import DataSource
 from flexmeasures.data.utils import get_data_source, save_to_db
@@ -194,6 +195,8 @@ def find_scheduler_class(sensor: Sensor) -> type:
         "two-way_evse",
     ):
         scheduler_class = StorageScheduler
+    elif sensor.generic_asset.generic_asset_type.name in ("process", "load"):
+        scheduler_class = ProcessScheduler
    else:
        raise ValueError(
            "Scheduling is not (yet) supported for asset type %s."
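Reviewer note: below is a minimal usage sketch of the new ProcessScheduler, adapted from the tests in this PR. The names ``power_sensor`` and ``price_sensor_id`` are placeholders for a sensor to schedule and a consumption price sensor assumed to already exist in your database; the flex model keys follow the ProcessSchedulerFlexModelSchema introduced above::

    from datetime import datetime, timedelta

    import pytz

    from flexmeasures.data.models.planning.process import ProcessScheduler

    tz = pytz.timezone("Europe/Amsterdam")

    # Placeholders (assumptions): the power sensor holding the schedule and the id of a price sensor.
    power_sensor = ...  # a flexmeasures Sensor instance with unit "kWh" and 1h resolution
    price_sensor_id = ...  # int, id of the consumption price sensor (e.g. a day-ahead market)

    scheduler = ProcessScheduler(
        power_sensor,
        tz.localize(datetime(2015, 1, 2)),  # start of the planning window
        tz.localize(datetime(2015, 1, 3)),  # end of the planning window
        timedelta(hours=1),  # resolution of the schedule
        flex_model={
            "duration": "PT4H",  # the process runs for 4 hours in total
            "power": 4,  # nominal power of 4 kW
            "process-type": "SHIFTABLE",  # INFLEXIBLE, BREAKABLE or SHIFTABLE
            "time-restrictions": [
                {"start": "2015-01-02T08:00:00+01:00", "duration": "PT2H"}
            ],
        },
        flex_context={"consumption-price-sensor": price_sensor_id},
    )

    # Returns a pd.Series of energy values indexed by event_start (kWh per hour here):
    # 4 during the cheapest feasible 4-hour block outside the restriction, 0 elsewhere.
    schedule = scheduler.compute()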