diff --git a/documentation/changelog.rst b/documentation/changelog.rst index ae323c1a9..3261f28a0 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -5,10 +5,12 @@ FlexMeasures Changelog v0.12.0 | October XX, 2022 ============================ +.. warning:: Upgrading to this version requires running ``flexmeasures db upgrade`` (you can create a backup first with ``flexmeasures db-ops dump``). + New features ------------- -* Hit the replay button to replay what happened, available on the sensor and asset pages [see `PR #463 `_] +* Hit the replay button to visually replay what happened, available on the sensor and asset pages [see `PR #463 `_] * Ability to provide your own custom scheduling function [see `PR #505 `_] * Visually distinguish forecasts/schedules (dashed lines) from measurements (solid lines), and expand the tooltip with timing info regarding the forecast/schedule horizon or measurement lag [see `PR #503 `_] * The asset page also allows to show sensor data from other assets that belong to the same account [see `PR #500 `_] @@ -28,6 +30,7 @@ Infrastructure / Support * Remove bokeh dependency and obsolete UI views [see `PR #476 `_] * Fix ``flexmeasures db-ops dump`` and ``flexmeasures db-ops restore`` not working in docker containers [see `PR #530 `_] and incorrectly reporting a success when `pg_dump` and `pg_restore` are not installed [see `PR #526 `_] * Plugins can save BeliefsSeries, too, instead of just BeliefsDataFrames [see `PR #523 `_] +* Improve documentation and code w.r.t. 
storage flexibility modelling ― prepare for handling other schedulers & merge battery and car charging schedulers [see `PR #511 `_] * Revised strategy for removing unchanged beliefs when saving data: retain the oldest measurement (ex-post belief), too [see `PR #518 `_] @@ -84,7 +87,7 @@ Bugfixes * The docker-based tutorial now works with UI on all platforms (port 5000 did not expose on MacOS) [see `PR #465 `_] * Fix interpretation of scheduling results in toy tutorial [see `PR #466 `_ and `PR #475 `_] * Avoid formatting datetime.timedelta durations as nominal ISO durations [see `PR #459 `_] -* Account admins cannot add assets to other accounts anymore; and they are shown a button for asset creation in UI [see `PR #488 `_] +* Account admins cannot add assets to other accounts any more; and they are shown a button for asset creation in UI [see `PR #488 `_] Infrastructure / Support ---------------------- diff --git a/documentation/plugin/customisation.rst b/documentation/plugin/customisation.rst index bbe51fafe..d13401d6b 100644 --- a/documentation/plugin/customisation.rst +++ b/documentation/plugin/customisation.rst @@ -15,8 +15,8 @@ but in the background your custom scheduling algorithm is being used. Let's walk through an example! -First, we need to write a function which accepts arguments just like the in-built schedulers (their code is `here `_). -The following minimal example gives you an idea of the inputs and outputs: +First, we need to write a class (inheriting from the Base Scheduler) with a `schedule` function which accepts arguments just like the in-built schedulers (their code is `here `_). +The following minimal example gives you an idea of some meta information you can add for labelling your data, as well as the inputs and outputs of such a scheduling function: .. 
code-block:: python @@ -24,23 +24,36 @@ The following minimal example gives you an idea of the inputs and outputs: import pandas as pd from pandas.tseries.frequencies import to_offset from flexmeasures.data.models.time_series import Sensor + from flexmeasures.data.models.planning import Scheduler - def compute_a_schedule( - sensor: Sensor, - start: datetime, - end: datetime, - resolution: timedelta, - *args, - **kwargs - ): - """Just a dummy scheduler, advising to do nothing""" - return pd.Series( - 0, index=pd.date_range(start, end, freq=resolution, closed="left") - ) + + class DummyScheduler(Scheduler): + + __author__ = "My Company" + __version__ = "2" + + def schedule( + self, + sensor: Sensor, + start: datetime, + end: datetime, + resolution: timedelta, + *args, + **kwargs + ): + """ + Just a dummy scheduler that always plans to consume at maximum capacity. + (Schedulers return positive values for consumption, and negative values for production) + """ + return pd.Series( + sensor.get_attribute("capacity_in_mw"), + index=pd.date_range(start, end, freq=resolution, closed="left"), + ) .. note:: It's possible to add arguments that describe the asset flexibility and the EMS context in more detail. For example, - for storage assets we support various state-of-charge parameters. For now, the existing schedulers are the best documentation. + for storage assets we support various state-of-charge parameters. For now, the existing in-built schedulers are the best documentation. + We are working on documenting this better, so the learning curve becomes easier. 
Finally, make your scheduler be the one that FlexMeasures will use for certain sensors: @@ -52,8 +65,7 @@ Finally, make your scheduler be the one that FlexMeasures will use for certain s scheduler_specs = { "module": "flexmeasures.data.tests.dummy_scheduler", # or a file path, see note below - "function": "compute_a_schedule", - "source": "My Company" + "class": "DummyScheduler", } my_sensor = Sensor.query.filter(Sensor.name == "My power sensor on a flexible asset").one_or_none() diff --git a/flexmeasures/api/common/schemas/sensor_data.py b/flexmeasures/api/common/schemas/sensor_data.py index de0fc0ae4..b44fc6738 100644 --- a/flexmeasures/api/common/schemas/sensor_data.py +++ b/flexmeasures/api/common/schemas/sensor_data.py @@ -14,6 +14,7 @@ from flexmeasures.data.models.time_series import Sensor from flexmeasures.api.common.schemas.sensors import SensorField from flexmeasures.api.common.utils.api_utils import upsample_values +from flexmeasures.data.models.planning.utils import initialize_index from flexmeasures.data.schemas.times import AwareDateTimeField, DurationField from flexmeasures.data.services.time_series import simplify_index from flexmeasures.utils.time_utils import duration_isoformat, server_now @@ -179,9 +180,7 @@ def dump_bdf(self, sensor_data_description: dict, **kwargs) -> dict: ) # Convert to desired time range - index = pd.date_range( - start=start, end=end, freq=df.event_resolution, closed="left" - ) + index = initialize_index(start=start, end=end, resolution=df.event_resolution) df = df.reindex(index) # Convert to desired unit diff --git a/flexmeasures/api/v1_2/implementations.py b/flexmeasures/api/v1_2/implementations.py index 5f5a9fef3..1ae9f964e 100644 --- a/flexmeasures/api/v1_2/implementations.py +++ b/flexmeasures/api/v1_2/implementations.py @@ -32,13 +32,14 @@ parse_isodate_str, ) from flexmeasures.data import db -from flexmeasures.data.models.planning.battery import schedule_battery +from flexmeasures.data.models.planning.storage import 
StorageScheduler from flexmeasures.data.models.planning.exceptions import ( UnknownMarketException, UnknownPricesException, ) from flexmeasures.data.models.time_series import Sensor from flexmeasures.data.services.resources import has_assets, can_access_asset +from flexmeasures.data.models.planning.utils import ensure_storage_specs from flexmeasures.utils.time_utils import duration_isoformat @@ -93,17 +94,20 @@ def get_device_message_response(generic_asset_name_groups, duration): start = datetime.fromisoformat( sensor.generic_asset.get_attribute("soc_datetime") ) + end = start + planning_horizon resolution = sensor.event_resolution # Schedule the asset + storage_specs = dict( + soc_at_start=sensor.generic_asset.get_attribute("soc_in_mwh"), + prefer_charging_sooner=False, + ) + storage_specs = ensure_storage_specs( + storage_specs, sensor, start, end, resolution + ) try: - schedule = schedule_battery( - sensor, - start, - start + planning_horizon, - resolution, - soc_at_start=sensor.generic_asset.get_attribute("soc_in_mwh"), - prefer_charging_sooner=False, + schedule = StorageScheduler().schedule( + sensor, start, end, resolution, storage_specs=storage_specs ) except UnknownPricesException: return unknown_prices() diff --git a/flexmeasures/api/v1_3/implementations.py b/flexmeasures/api/v1_3/implementations.py index 8b2a5b857..88996e16c 100644 --- a/flexmeasures/api/v1_3/implementations.py +++ b/flexmeasures/api/v1_3/implementations.py @@ -39,11 +39,14 @@ parse_isodate_str, ) from flexmeasures.data import db -from flexmeasures.data.models.data_sources import DataSource +from flexmeasures.data.models.planning.utils import initialize_series from flexmeasures.data.models.time_series import Sensor from flexmeasures.data.queries.utils import simplify_index from flexmeasures.data.services.resources import has_assets, can_access_asset -from flexmeasures.data.services.scheduling import create_scheduling_job +from flexmeasures.data.services.scheduling import ( + 
create_scheduling_job, + get_data_source_for_job, +) from flexmeasures.utils.time_utils import duration_isoformat @@ -99,6 +102,7 @@ def get_device_message_response(generic_asset_name_groups, duration): if event_type not in ("soc", "soc-with-targets"): return unrecognized_event_type(event_type) connection = current_app.queues["scheduling"].connection + job = None try: # First try the scheduling queue job = Job.fetch(event, connection=connection) except NoSuchJobError: # Then try the most recent event_id (stored as a generic asset attribute) @@ -144,19 +148,15 @@ def get_device_message_response(generic_asset_name_groups, duration): return unknown_schedule("Scheduling job has an unknown status.") schedule_start = job.kwargs["start"] - schedule_data_source_name = "Seita" - scheduler_source = DataSource.query.filter_by( - name=schedule_data_source_name, type="scheduling script" - ).one_or_none() - if scheduler_source is None: + data_source = get_data_source_for_job(job, sensor=sensor) + if data_source is None: return unknown_schedule( - message + f'no data is known from "{schedule_data_source_name}".' + message + f"no data source could be found for job {job}." ) - power_values = sensor.search_beliefs( event_starts_after=schedule_start, event_ends_before=schedule_start + planning_horizon, - source=scheduler_source, + source=data_source, most_recent_beliefs_only=True, one_deterministic_belief_per_event=True, ) @@ -301,11 +301,12 @@ def post_udi_event_response(unit: str, prior: datetime): start_of_schedule = datetime end_of_schedule = datetime + current_app.config.get("FLEXMEASURES_PLANNING_HORIZON") resolution = sensor.event_resolution - soc_targets = pd.Series( + soc_targets = initialize_series( np.nan, - index=pd.date_range( - start_of_schedule, end_of_schedule, freq=resolution, closed="right" - ), # note that target values are indexed by their due date (i.e. 
closed="right") + start=start_of_schedule, + end=end_of_schedule, + resolution=resolution, + inclusive="right", # note that target values are indexed by their due date (i.e. inclusive="right") ) if event_type == "soc-with-targets": @@ -359,16 +360,18 @@ def post_udi_event_response(unit: str, prior: datetime): soc_targets.loc[target_datetime] = target_value create_scheduling_job( - sensor_id, + sensor, start_of_schedule, end_of_schedule, resolution=resolution, belief_time=prior, # server time if no prior time was sent - soc_at_start=value, - soc_targets=soc_targets, - soc_min=soc_min, - soc_max=soc_max, - roundtrip_efficiency=roundtrip_efficiency, + storage_specs=dict( + soc_at_start=value, + soc_targets=soc_targets, + soc_min=soc_min, + soc_max=soc_max, + roundtrip_efficiency=roundtrip_efficiency, + ), job_id=form.get("event"), enqueue=True, ) diff --git a/flexmeasures/api/v1_3/tests/test_api_v1_3.py b/flexmeasures/api/v1_3/tests/test_api_v1_3.py index 1835e3f84..aae8fda77 100644 --- a/flexmeasures/api/v1_3/tests/test_api_v1_3.py +++ b/flexmeasures/api/v1_3/tests/test_api_v1_3.py @@ -88,9 +88,10 @@ def test_post_udi_event_and_get_device_message( ) # check results are in the database - resolution = timedelta(minutes=15) + job.refresh() # catch meta info that was added on this very instance + data_source_info = job.meta.get("data_source_info") scheduler_source = DataSource.query.filter_by( - name="Seita", type="scheduling script" + type="scheduling script", **data_source_info ).one_or_none() assert ( scheduler_source is not None @@ -100,6 +101,7 @@ def test_post_udi_event_and_get_device_message( .filter(TimedBelief.source_id == scheduler_source.id) .all() ) + resolution = timedelta(minutes=15) consumption_schedule = pd.Series( [-v.event_value for v in power_values], index=pd.DatetimeIndex([v.event_start for v in power_values], freq=resolution), diff --git a/flexmeasures/api/v3_0/sensors.py b/flexmeasures/api/v3_0/sensors.py index 35f0248b8..901d63bb2 100644 --- 
a/flexmeasures/api/v3_0/sensors.py +++ b/flexmeasures/api/v3_0/sensors.py @@ -10,7 +10,6 @@ from marshmallow import validate, fields, Schema from marshmallow.validate import OneOf import numpy as np -import pandas as pd from rq.job import Job, NoSuchJobError from timely_beliefs import BeliefsDataFrame from webargs.flaskparser import use_args, use_kwargs @@ -34,7 +33,7 @@ from flexmeasures.api.common.schemas.users import AccountIdField from flexmeasures.api.common.utils.api_utils import save_and_enqueue from flexmeasures.auth.decorators import permission_required_for_context -from flexmeasures.data.models.data_sources import DataSource +from flexmeasures.data.models.planning.utils import initialize_series from flexmeasures.data.models.user import Account from flexmeasures.data.models.time_series import Sensor from flexmeasures.data.queries.utils import simplify_index @@ -42,7 +41,10 @@ from flexmeasures.data.schemas.units import QuantityField from flexmeasures.data.schemas import AwareDateTimeField from flexmeasures.data.services.sensors import get_sensors -from flexmeasures.data.services.scheduling import create_scheduling_job +from flexmeasures.data.services.scheduling import ( + create_scheduling_job, + get_data_source_for_job, +) from flexmeasures.utils.time_utils import duration_isoformat from flexmeasures.utils.unit_utils import ur @@ -204,7 +206,7 @@ def get_data(self, response: dict): validate=validate.Range(min=0, max=1), data_key="roundtrip-efficiency", ), - "value": fields.Float(data_key="soc-at-start"), + "start_value": fields.Float(data_key="soc-at-start"), "soc_min": fields.Float(data_key="soc-min"), "soc_max": fields.Float(data_key="soc-max"), "start_of_schedule": AwareDateTimeField( @@ -220,6 +222,9 @@ def get_data(self, response: dict): ), ), # todo: allow unit to be set per field, using QuantityField("%", validate=validate.Range(min=0, max=1)) "targets": fields.List(fields.Nested(TargetSchema), data_key="soc-targets"), + "prefer_charging_sooner": 
fields.Bool( + data_key="prefer-charging-sooner", required=False + ), # todo: add a duration parameter, instead of falling back to FLEXMEASURES_PLANNING_HORIZON "consumption_price_sensor": SensorIdField( data_key="consumption-price-sensor", required=False @@ -241,6 +246,7 @@ def trigger_schedule( # noqa: C901 unit: str, prior: datetime, roundtrip_efficiency: Optional[ur.Quantity] = None, + prefer_charging_sooner: Optional[bool] = True, consumption_price_sensor: Optional[Sensor] = None, production_price_sensor: Optional[Sensor] = None, inflexible_device_sensors: Optional[List[Sensor]] = None, @@ -251,11 +257,37 @@ def trigger_schedule( # noqa: C901 .. :quickref: Schedule; Trigger scheduling job - The message should contain a flexibility model. + Trigger FlexMeasures to create a schedule for this sensor. + The assumption is that this sensor is the power sensor on a flexible asset. + + In this request, you can describe: + + - the schedule (start, unit, prior) + - the flexibility model for the sensor (see below, only storage models are supported at the moment) + - the EMS the sensor operates in (inflexible device sensors, and sensors that put a price on consumption and/or production) + + Note: This endpoint does not support scheduling an EMS with multiple flexible sensors at once. This will happen in another endpoint. + See https://github.com/FlexMeasures/flexmeasures/issues/485. Until then, it is possible to call this endpoint for one flexible sensor at a time + (considering already scheduled sensors as inflexible). + + Flexibility models apply to the sensor's asset type: + + 1) For storage sensors (e.g. battery, charge points), the schedule deals with the state of charge (SOC). 
+ The possible flexibility parameters are: + + - soc-at-start (defaults to 0) + - soc-unit (kWh or MWh) + - soc-min (defaults to 0) + - soc-max (defaults to max soc target) + - soc-targets (defaults to NaN values) + - roundtrip-efficiency (defaults to 100%) + - prefer-charging-sooner (defaults to True, also signals a preference to discharge later) + + 2) Heat pump sensors are work in progress. **Example request A** - This message triggers a schedule starting at 10.00am, at which the state of charge (soc) is 12.1 kWh. + This message triggers a schedule for a storage asset, starting at 10.00am, at which the state of charge (soc) is 12.1 kWh. .. code-block:: json @@ -324,19 +356,19 @@ def trigger_schedule( # noqa: C901 # todo: if a soc-sensor entity address is passed, persist those values to the corresponding sensor # (also update the note in posting_data.rst about flexibility states not being persisted). - # get value - if "value" not in kwargs: + # get starting value + if "start_value" not in kwargs: return ptus_incomplete() try: - value = float(kwargs.get("value")) # type: ignore + start_value = float(kwargs.get("start_value")) # type: ignore except ValueError: extra_info = "Request includes empty or ill-formatted value(s)." 
current_app.logger.warning(extra_info) return ptus_incomplete(extra_info) if unit == "kWh": - value = value / 1000.0 + start_value = start_value / 1000.0 - # Convert round-trip efficiency to dimensionless + # Convert round-trip efficiency to dimensionless (to the (0,1] range) if roundtrip_efficiency is not None: roundtrip_efficiency = roundtrip_efficiency.to( ur.Quantity("dimensionless") @@ -345,6 +377,7 @@ def trigger_schedule( # noqa: C901 # get optional min and max SOC soc_min = kwargs.get("soc_min", None) soc_max = kwargs.get("soc_max", None) + # TODO: review when we moved away from capacity having to be described in MWh if soc_min is not None and unit == "kWh": soc_min = soc_min / 1000.0 if soc_max is not None and unit == "kWh": @@ -355,13 +388,14 @@ def trigger_schedule( # noqa: C901 "FLEXMEASURES_PLANNING_HORIZON" ) resolution = sensor.event_resolution - soc_targets = pd.Series( + soc_targets = initialize_series( np.nan, - index=pd.date_range( - start_of_schedule, end_of_schedule, freq=resolution, closed="right" - ), # note that target values are indexed by their due date (i.e. closed="right") + start=start_of_schedule, + end=end_of_schedule, + resolution=resolution, + inclusive="right", # note that target values are indexed by their due date (i.e. 
inclusive="right") ) - # todo: move deserialization of targets into TargetSchema + # todo: move this deserialization of targets into newly-created ScheduleTargetSchema for target in kwargs.get("targets", []): # get target value @@ -406,16 +440,19 @@ def trigger_schedule( # noqa: C901 soc_targets.loc[target_datetime] = target_value job = create_scheduling_job( - sensor.id, + sensor, start_of_schedule, end_of_schedule, resolution=resolution, belief_time=prior, # server time if no prior time was sent - soc_at_start=value, - soc_targets=soc_targets, - soc_min=soc_min, - soc_max=soc_max, - roundtrip_efficiency=roundtrip_efficiency, + storage_specs=dict( + soc_at_start=start_value, + soc_targets=soc_targets, + soc_min=soc_min, + soc_max=soc_max, + roundtrip_efficiency=roundtrip_efficiency, + prefer_charging_sooner=prefer_charging_sooner, + ), consumption_price_sensor=consumption_price_sensor, production_price_sensor=production_price_sensor, inflexible_device_sensors=inflexible_device_sensors, @@ -518,21 +555,15 @@ def get_schedule(self, sensor: Sensor, job_id: str, duration: timedelta, **kwarg return unknown_schedule("Scheduling job has an unknown status.") schedule_start = job.kwargs["start"] - schedule_data_source_name = "Seita" - if "data_source_name" in job.meta: - schedule_data_source_name = job.meta["data_source_name"] - scheduler_source = DataSource.query.filter_by( - name=schedule_data_source_name, type="scheduling script" - ).one_or_none() - if scheduler_source is None: + data_source = get_data_source_for_job(job, sensor=sensor) + if data_source is None: return unknown_schedule( - error_message + f'no data is known from "{schedule_data_source_name}".' + error_message + f"no data source could be found for job {job}." 
) - power_values = sensor.search_beliefs( event_starts_after=schedule_start, event_ends_before=schedule_start + planning_horizon, - source=scheduler_source, + source=data_source, most_recent_beliefs_only=True, one_deterministic_belief_per_event=True, ) diff --git a/flexmeasures/api/v3_0/tests/test_sensor_schedules.py b/flexmeasures/api/v3_0/tests/test_sensor_schedules.py index d133671c3..d15b38174 100644 --- a/flexmeasures/api/v3_0/tests/test_sensor_schedules.py +++ b/flexmeasures/api/v3_0/tests/test_sensor_schedules.py @@ -9,10 +9,12 @@ from flexmeasures.api.tests.utils import get_auth_token from flexmeasures.api.v1_3.tests.utils import message_for_get_device_message from flexmeasures.api.v3_0.tests.utils import message_for_post_udi_event -from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.time_series import Sensor, TimedBelief from flexmeasures.data.tests.utils import work_on_rq -from flexmeasures.data.services.scheduling import handle_scheduling_exception +from flexmeasures.data.services.scheduling import ( + handle_scheduling_exception, + get_data_source_for_job, +) from flexmeasures.utils.calculations import integrate_time_series @@ -66,18 +68,19 @@ def test_trigger_and_get_schedule( ) # check results are in the database - resolution = timedelta(minutes=15) - scheduler_source = DataSource.query.filter_by( - name="Seita", type="scheduling script" - ).one_or_none() - assert ( - scheduler_source is not None - ) # Make sure the scheduler data source is now there + + # First, make sure the scheduler data source is now there + job.refresh() # catch meta info that was added on this very instance + scheduler_source = get_data_source_for_job(job) + assert scheduler_source is not None + + # Then, check if the data was created power_values = ( TimedBelief.query.filter(TimedBelief.sensor_id == sensor.id) .filter(TimedBelief.source_id == scheduler_source.id) .all() ) + resolution = timedelta(minutes=15) consumption_schedule = 
pd.Series( [-v.event_value for v in power_values], index=pd.DatetimeIndex([v.event_start for v in power_values], freq=resolution), diff --git a/flexmeasures/cli/data_add.py b/flexmeasures/cli/data_add.py index d044f4218..f91244a03 100755 --- a/flexmeasures/cli/data_add.py +++ b/flexmeasures/cli/data_add.py @@ -28,6 +28,7 @@ from flexmeasures.data.services.forecasting import create_forecasting_jobs from flexmeasures.data.services.scheduling import make_schedule, create_scheduling_job from flexmeasures.data.services.users import create_user +from flexmeasures.data.models.planning.utils import initialize_series from flexmeasures.data.models.user import Account, AccountRole, RolesAccounts from flexmeasures.data.models.time_series import ( Sensor, @@ -956,14 +957,12 @@ def create_schedule( except MissingAttributeException: click.echo(f"{power_sensor} has no {attribute} attribute.") raise click.Abort() - soc_targets = pd.Series( + soc_targets = initialize_series( np.nan, - index=pd.date_range( - pd.Timestamp(start).tz_convert(power_sensor.timezone), - pd.Timestamp(end).tz_convert(power_sensor.timezone), - freq=power_sensor.event_resolution, - closed="right", - ), # note that target values are indexed by their due date (i.e. closed="right") + start=pd.Timestamp(start).tz_convert(power_sensor.timezone), + end=pd.Timestamp(end).tz_convert(power_sensor.timezone), + resolution=power_sensor.event_resolution, + inclusive="right", # note that target values are indexed by their due date (i.e. 
inclusive="right") ) # Convert round-trip efficiency to dimensionless @@ -992,16 +991,18 @@ def create_schedule( if as_job: job = create_scheduling_job( - sensor_id=power_sensor.id, + sensor=power_sensor, start_of_schedule=start, end_of_schedule=end, belief_time=server_now(), resolution=power_sensor.event_resolution, - soc_at_start=soc_at_start, - soc_targets=soc_targets, - soc_min=soc_min, - soc_max=soc_max, - roundtrip_efficiency=roundtrip_efficiency, + storage_specs=dict( + soc_at_start=soc_at_start, + soc_targets=soc_targets, + soc_min=soc_min, + soc_max=soc_max, + roundtrip_efficiency=roundtrip_efficiency, + ), consumption_price_sensor=consumption_price_sensor, production_price_sensor=production_price_sensor, ) diff --git a/flexmeasures/conftest.py b/flexmeasures/conftest.py index b8455cb98..c6a613f9c 100644 --- a/flexmeasures/conftest.py +++ b/flexmeasures/conftest.py @@ -26,6 +26,7 @@ from flexmeasures.data.models.assets import AssetType, Asset from flexmeasures.data.models.generic_assets import GenericAssetType, GenericAsset from flexmeasures.data.models.data_sources import DataSource +from flexmeasures.data.models.planning.utils import initialize_index from flexmeasures.data.models.markets import Market, MarketType from flexmeasures.data.models.time_series import Sensor, TimedBelief from flexmeasures.data.models.user import User, Account, AccountRole @@ -494,12 +495,10 @@ def add_market_prices(db: SQLAlchemy, setup_assets, setup_markets, setup_sources """Add two days of market prices for the EPEX day-ahead market.""" # one day of test data (one complete sine curve) - time_slots = pd.date_range( - datetime(2015, 1, 1), - datetime(2015, 1, 2), - freq="1H", - closed="left", - tz="Europe/Amsterdam", + time_slots = initialize_index( + start=pd.Timestamp("2015-01-01").tz_localize("Europe/Amsterdam"), + end=pd.Timestamp("2015-01-02").tz_localize("Europe/Amsterdam"), + resolution="1H", ) values = [ random() * (1 + np.sin(x * 2 * np.pi / 24)) for x in 
range(len(time_slots)) @@ -517,12 +516,10 @@ def add_market_prices(db: SQLAlchemy, setup_assets, setup_markets, setup_sources db.session.add_all(day1_beliefs) # another day of test data (8 expensive hours, 8 cheap hours, and again 8 expensive hours) - time_slots = pd.date_range( - datetime(2015, 1, 2), - datetime(2015, 1, 3), - freq="1H", - closed="left", - tz="Europe/Amsterdam", + time_slots = initialize_index( + start=pd.Timestamp("2015-01-02").tz_localize("Europe/Amsterdam"), + end=pd.Timestamp("2015-01-03").tz_localize("Europe/Amsterdam"), + resolution="1H", ) values = [100] * 8 + [90] * 8 + [100] * 8 day2_beliefs = [ diff --git a/flexmeasures/data/migrations/versions/650b085c0ad3_consolidate_data_source_after_storage_.py b/flexmeasures/data/migrations/versions/650b085c0ad3_consolidate_data_source_after_storage_.py new file mode 100644 index 000000000..26fbe36b4 --- /dev/null +++ b/flexmeasures/data/migrations/versions/650b085c0ad3_consolidate_data_source_after_storage_.py @@ -0,0 +1,27 @@ +"""Consolidate data source after storage schedulers merged + +Revision ID: 650b085c0ad3 +Revises: 30f7b63069e1 +Create Date: 2022-11-16 07:07:44.281943 + +""" +from alembic import op + + +# revision identifiers, used by Alembic. 
+revision = "650b085c0ad3" +down_revision = "30f7b63069e1" +branch_labels = None +depends_on = None + + +def upgrade(): + op.execute( + "Update data_source set version='1', model='StorageScheduler' where name = 'Seita' and type='scheduling script';" + ) + + +def downgrade(): + op.execute( + "Update data_source set version=null, model=null where name = 'Seita' and type='scheduling script' and version='1' and model='StorageScheduler';" + ) diff --git a/flexmeasures/data/models/planning/__init__.py b/flexmeasures/data/models/planning/__init__.py index e69de29bb..622b2ccf4 100644 --- a/flexmeasures/data/models/planning/__init__.py +++ b/flexmeasures/data/models/planning/__init__.py @@ -0,0 +1,11 @@ +from typing import Optional +import pandas as pd + + +class Scheduler: + """ + Superclass for all FlexMeasures Schedulers + """ + + def schedule(*args, **kwargs) -> Optional[pd.Series]: + return None diff --git a/flexmeasures/data/models/planning/battery.py b/flexmeasures/data/models/planning/battery.py index d6e3380e0..c8004792a 100644 --- a/flexmeasures/data/models/planning/battery.py +++ b/flexmeasures/data/models/planning/battery.py @@ -1,180 +1,11 @@ -from typing import List, Optional, Union -from datetime import datetime, timedelta +from flexmeasures.data.models.planning.storage import StorageScheduler -import pandas as pd -from flexmeasures.data.models.time_series import Sensor -from flexmeasures.data.models.planning.solver import device_scheduler -from flexmeasures.data.models.planning.utils import ( - initialize_df, - initialize_series, - add_tiny_price_slope, - get_prices, - get_power_values, - fallback_charging_policy, -) +def schedule_battery(*args, **kwargs): + import warnings - -def schedule_battery( - sensor: Sensor, - start: datetime, - end: datetime, - resolution: timedelta, - soc_at_start: float, - soc_targets: Optional[pd.Series] = None, - soc_min: Optional[float] = None, - soc_max: Optional[float] = None, - roundtrip_efficiency: Optional[float] = None, - 
prefer_charging_sooner: bool = True, - consumption_price_sensor: Optional[Sensor] = None, - production_price_sensor: Optional[Sensor] = None, - inflexible_device_sensors: Optional[List[Sensor]] = None, - belief_time: Optional[datetime] = None, - round_to_decimals: Optional[int] = 6, -) -> Union[pd.Series, None]: - """Schedule a battery asset based directly on the latest beliefs regarding market prices within the specified time - window. - For the resulting consumption schedule, consumption is defined as positive values. - """ - - # Check for required Sensor attributes - sensor.check_required_attributes( - [ - ("capacity_in_mw", (float, int)), - ("max_soc_in_mwh", (float, int)), - ("min_soc_in_mwh", (float, int)), - ], - ) - - # Check for round-trip efficiency - if roundtrip_efficiency is None: - # Get default from sensor, or use 100% otherwise - roundtrip_efficiency = sensor.get_attribute("roundtrip_efficiency", 1) - if roundtrip_efficiency <= 0 or roundtrip_efficiency > 1: - raise ValueError("roundtrip_efficiency expected within the interval (0, 1]") - - # Check for min and max SOC, or get default from sensor - if soc_min is None: - soc_min = sensor.get_attribute("min_soc_in_mwh") - if soc_max is None: - soc_max = sensor.get_attribute("max_soc_in_mwh") - - # Check for known prices or price forecasts, trimming planning window accordingly - up_deviation_prices, (start, end) = get_prices( - (start, end), - resolution, - beliefs_before=belief_time, - price_sensor=consumption_price_sensor, - sensor=sensor, - allow_trimmed_query_window=True, + warnings.warn( + "The schedule_battery method is deprecated and will be removed from flexmeasures in a future version. 
Replace with StorageScheduler().schedule to suppress this warning.", + FutureWarning, ) - down_deviation_prices, (start, end) = get_prices( - (start, end), - resolution, - beliefs_before=belief_time, - price_sensor=production_price_sensor, - sensor=sensor, - allow_trimmed_query_window=True, - ) - - start = pd.Timestamp(start).tz_convert("UTC") - end = pd.Timestamp(end).tz_convert("UTC") - if soc_targets is not None: - # soc targets are at the end of each time slot, while prices are indexed by the start of each time slot - soc_targets = soc_targets.tz_convert("UTC") - soc_targets = soc_targets[start + resolution : end] - - # Add tiny price slope to prefer charging now rather than later, and discharging later rather than now. - # We penalise the future with at most 1 per thousand times the price spread. - if prefer_charging_sooner: - up_deviation_prices = add_tiny_price_slope(up_deviation_prices, "event_value") - down_deviation_prices = add_tiny_price_slope( - down_deviation_prices, "event_value" - ) - - # Set up commitments to optimise for - commitment_quantities = [initialize_series(0, start, end, resolution)] - - # Todo: convert to EUR/(deviation of commitment, which is in MW) - commitment_upwards_deviation_price = [ - up_deviation_prices.loc[start : end - resolution]["event_value"] - ] - commitment_downwards_deviation_price = [ - down_deviation_prices.loc[start : end - resolution]["event_value"] - ] - - # Set up device constraints: only one scheduled flexible device for this EMS (at index 0), plus the forecasted inflexible devices (at indices 1 to n). 
- columns = [ - "equals", - "max", - "min", - "derivative equals", - "derivative max", - "derivative min", - "derivative down efficiency", - "derivative up efficiency", - ] - if inflexible_device_sensors is None: - inflexible_device_sensors = [] - device_constraints = [ - initialize_df(columns, start, end, resolution) - for i in range(1 + len(inflexible_device_sensors)) - ] - for i, inflexible_sensor in enumerate(inflexible_device_sensors): - device_constraints[i + 1]["derivative equals"] = get_power_values( - query_window=(start, end), - resolution=resolution, - beliefs_before=belief_time, - sensor=inflexible_sensor, - ) - if soc_targets is not None: - device_constraints[0]["equals"] = soc_targets.shift( - -1, freq=resolution - ).values * (timedelta(hours=1) / resolution) - soc_at_start * ( - timedelta(hours=1) / resolution - ) # shift "equals" constraint for target SOC by one resolution (the target defines a state at a certain time, - # while the "equals" constraint defines what the total stock should be at the end of a time slot, - # where the time slot is indexed by its starting time) - device_constraints[0]["min"] = (soc_min - soc_at_start) * ( - timedelta(hours=1) / resolution - ) - device_constraints[0]["max"] = (soc_max - soc_at_start) * ( - timedelta(hours=1) / resolution - ) - device_constraints[0]["derivative min"] = ( - sensor.get_attribute("capacity_in_mw") * -1 - ) - device_constraints[0]["derivative max"] = sensor.get_attribute("capacity_in_mw") - - # Apply round-trip efficiency evenly to charging and discharging - device_constraints[0]["derivative down efficiency"] = roundtrip_efficiency**0.5 - device_constraints[0]["derivative up efficiency"] = roundtrip_efficiency**0.5 - - # Set up EMS constraints - columns = ["derivative max", "derivative min"] - ems_constraints = initialize_df(columns, start, end, resolution) - ems_capacity = sensor.generic_asset.get_attribute("capacity_in_mw") - if ems_capacity is not None: - ems_constraints["derivative min"] = 
ems_capacity * -1 - ems_constraints["derivative max"] = ems_capacity - - ems_schedule, expected_costs, scheduler_results = device_scheduler( - device_constraints, - ems_constraints, - commitment_quantities, - commitment_downwards_deviation_price, - commitment_upwards_deviation_price, - ) - if scheduler_results.solver.termination_condition == "infeasible": - # Fallback policy if the problem was unsolvable - battery_schedule = fallback_charging_policy( - sensor, device_constraints[0], start, end, resolution - ) - else: - battery_schedule = ems_schedule[0] - - # Round schedule - if round_to_decimals: - battery_schedule = battery_schedule.round(round_to_decimals) - - return battery_schedule + return StorageScheduler().schedule(*args, **kwargs) diff --git a/flexmeasures/data/models/planning/charging_station.py b/flexmeasures/data/models/planning/charging_station.py index a53df9610..32ef204cf 100644 --- a/flexmeasures/data/models/planning/charging_station.py +++ b/flexmeasures/data/models/planning/charging_station.py @@ -1,180 +1,11 @@ -from typing import List, Optional, Union -from datetime import datetime, timedelta +from flexmeasures.data.models.planning.storage import StorageScheduler -import pandas as pd -from flexmeasures.data.models.time_series import Sensor -from flexmeasures.data.models.planning.solver import device_scheduler -from flexmeasures.data.models.planning.utils import ( - initialize_df, - initialize_series, - add_tiny_price_slope, - get_prices, - get_power_values, - fallback_charging_policy, -) +def schedule_charging_station(*args, **kwargs): + import warnings - -def schedule_charging_station( - sensor: Sensor, - start: datetime, - end: datetime, - resolution: timedelta, - soc_at_start: float, - soc_targets: pd.Series, - soc_min: Optional[float] = None, - soc_max: Optional[float] = None, - roundtrip_efficiency: Optional[float] = None, - prefer_charging_sooner: bool = True, - consumption_price_sensor: Optional[Sensor] = None, - production_price_sensor: 
Optional[Sensor] = None, - inflexible_device_sensors: Optional[List[Sensor]] = None, - belief_time: Optional[datetime] = None, - round_to_decimals: Optional[int] = 6, -) -> Union[pd.Series, None]: - """Schedule a charging station asset based directly on the latest beliefs regarding market prices within the specified time - window. - For the resulting consumption schedule, consumption is defined as positive values. - Todo: handle uni-directional charging by setting the "min" or "derivative min" constraint to 0 - """ - - # Check for required Sensor attributes - sensor.check_required_attributes([("capacity_in_mw", (float, int))]) - - # Check for round-trip efficiency - if roundtrip_efficiency is None: - # Get default from sensor, or use 100% otherwise - roundtrip_efficiency = sensor.get_attribute("roundtrip_efficiency", 1) - if roundtrip_efficiency <= 0 or roundtrip_efficiency > 1: - raise ValueError("roundtrip_efficiency expected within the interval (0, 1]") - - # Check for min and max SOC, or get default from sensor - if soc_min is None: - # Can't drain the EV battery by more than it contains - soc_min = sensor.get_attribute("min_soc_in_mwh", 0) - if soc_max is None: - # Lacking information about the battery's nominal capacity, we use the highest target value as the maximum state of charge - soc_max = sensor.get_attribute("max_soc_in_mwh", max(soc_targets.values)) - - # Check for known prices or price forecasts, trimming planning window accordingly - up_deviation_prices, (start, end) = get_prices( - (start, end), - resolution, - beliefs_before=belief_time, - price_sensor=consumption_price_sensor, - sensor=sensor, - allow_trimmed_query_window=True, - ) - down_deviation_prices, (start, end) = get_prices( - (start, end), - resolution, - beliefs_before=belief_time, - price_sensor=production_price_sensor, - sensor=sensor, - allow_trimmed_query_window=True, - ) - - # soc targets are at the end of each time slot, while prices are indexed by the start of each time slot - 
soc_targets = soc_targets.tz_convert("UTC") - start = pd.Timestamp(start).tz_convert("UTC") - end = pd.Timestamp(end).tz_convert("UTC") - soc_targets = soc_targets[start + resolution : end] - - # Add tiny price slope to prefer charging now rather than later, and discharging later rather than now. - # We penalise the future with at most 1 per thousand times the price spread. - if prefer_charging_sooner: - up_deviation_prices = add_tiny_price_slope(up_deviation_prices, "event_value") - down_deviation_prices = add_tiny_price_slope( - down_deviation_prices, "event_value" - ) - - # Set up commitments to optimise for - commitment_quantities = [initialize_series(0, start, end, resolution)] - - # Todo: convert to EUR/(deviation of commitment, which is in MW) - commitment_upwards_deviation_price = [ - up_deviation_prices.loc[start : end - resolution]["event_value"] - ] - commitment_downwards_deviation_price = [ - down_deviation_prices.loc[start : end - resolution]["event_value"] - ] - - # Set up device constraints: only one scheduled flexible device for this EMS (at index 0), plus the forecasted inflexible devices (at indices 1 to n). 
- columns = [ - "equals", - "max", - "min", - "derivative equals", - "derivative max", - "derivative min", - ] - if inflexible_device_sensors is None: - inflexible_device_sensors = [] - device_constraints = [ - initialize_df(columns, start, end, resolution) - for i in range(1 + len(inflexible_device_sensors)) - ] - for i, inflexible_sensor in enumerate(inflexible_device_sensors): - device_constraints[i + 1]["derivative equals"] = get_power_values( - query_window=(start, end), - resolution=resolution, - beliefs_before=belief_time, - sensor=inflexible_sensor, - ) - device_constraints[0]["equals"] = soc_targets.shift(-1, freq=resolution).values * ( - timedelta(hours=1) / resolution - ) - soc_at_start * ( - timedelta(hours=1) / resolution - ) # shift "equals" constraint for target SOC by one resolution (the target defines a state at a certain time, - # while the "equals" constraint defines what the total stock should be at the end of a time slot, - # where the time slot is indexed by its starting time) - device_constraints[0]["min"] = (soc_min - soc_at_start) * ( - timedelta(hours=1) / resolution - ) - device_constraints[0]["max"] = (soc_max - soc_at_start) * ( - timedelta(hours=1) / resolution - ) - - if sensor.get_attribute("is_strictly_non_positive"): - device_constraints[0]["derivative min"] = 0 - else: - device_constraints[0]["derivative min"] = ( - sensor.get_attribute("capacity_in_mw") * -1 - ) - if sensor.get_attribute("is_strictly_non_negative"): - device_constraints[0]["derivative max"] = 0 - else: - device_constraints[0]["derivative max"] = sensor.get_attribute("capacity_in_mw") - - # Apply round-trip efficiency evenly to charging and discharging - device_constraints[0]["derivative down efficiency"] = roundtrip_efficiency**0.5 - device_constraints[0]["derivative up efficiency"] = roundtrip_efficiency**0.5 - - # Set up EMS constraints - columns = ["derivative max", "derivative min"] - ems_constraints = initialize_df(columns, start, end, resolution) - 
ems_capacity = sensor.generic_asset.get_attribute("capacity_in_mw") - if ems_capacity is not None: - ems_constraints["derivative min"] = ems_capacity * -1 - ems_constraints["derivative max"] = ems_capacity - - ems_schedule, expected_costs, scheduler_results = device_scheduler( - device_constraints, - ems_constraints, - commitment_quantities, - commitment_downwards_deviation_price, - commitment_upwards_deviation_price, + warnings.warn( + "The schedule_charging_station method is deprecated and will be removed from flexmeasures in a future version. Replace with StorageScheduler().schedule to suppress this warning.", + FutureWarning, ) - if scheduler_results.solver.termination_condition == "infeasible": - # Fallback policy if the problem was unsolvable - charging_station_schedule = fallback_charging_policy( - sensor, device_constraints[0], start, end, resolution - ) - else: - charging_station_schedule = ems_schedule[0] - - # Round schedule - if round_to_decimals: - charging_station_schedule = charging_station_schedule.round(round_to_decimals) - - return charging_station_schedule + return StorageScheduler().schedule(*args, **kwargs) diff --git a/flexmeasures/data/models/planning/solver.py b/flexmeasures/data/models/planning/linear_optimization.py similarity index 98% rename from flexmeasures/data/models/planning/solver.py rename to flexmeasures/data/models/planning/linear_optimization.py index 42c8acf29..32d8b7456 100644 --- a/flexmeasures/data/models/planning/solver.py +++ b/flexmeasures/data/models/planning/linear_optimization.py @@ -323,11 +323,11 @@ def cost_function(m): for d in model.d: planned_device_power = [model.ems_power[d, j].value for j in model.j] planned_power_per_device.append( - pd.Series( - index=pd.date_range( - start=start, end=end, freq=to_offset(resolution), closed="left" - ), + initialize_series( data=planned_device_power, + start=start, + end=end, + resolution=to_offset(resolution), ) ) diff --git a/flexmeasures/data/models/planning/storage.py 
b/flexmeasures/data/models/planning/storage.py new file mode 100644 index 000000000..ddc5ce793 --- /dev/null +++ b/flexmeasures/data/models/planning/storage.py @@ -0,0 +1,178 @@ +from datetime import datetime, timedelta +from typing import Optional, List, Union + +import pandas as pd + +from flexmeasures import Sensor +from flexmeasures.data.models.planning import Scheduler +from flexmeasures.data.models.planning.linear_optimization import device_scheduler +from flexmeasures.data.models.planning.utils import ( + get_prices, + add_tiny_price_slope, + initialize_series, + initialize_df, + get_power_values, + fallback_charging_policy, +) + + +class StorageScheduler(Scheduler): + + __version__ = "1" + __author__ = "Seita" + + def schedule( + self, + sensor: Sensor, + start: datetime, + end: datetime, + resolution: timedelta, + storage_specs: dict, + consumption_price_sensor: Optional[Sensor] = None, + production_price_sensor: Optional[Sensor] = None, + inflexible_device_sensors: Optional[List[Sensor]] = None, + belief_time: Optional[datetime] = None, + round_to_decimals: Optional[int] = 6, + ) -> Union[pd.Series, None]: + """Schedule a battery or Charge Point based directly on the latest beliefs regarding market prices within the specified time window. + For the resulting consumption schedule, consumption is defined as positive values. 
+ """ + + soc_at_start = storage_specs.get("soc_at_start") + soc_targets = storage_specs.get("soc_targets") + soc_min = storage_specs.get("soc_min") + soc_max = storage_specs.get("soc_max") + roundtrip_efficiency = storage_specs.get("roundtrip_efficiency") + prefer_charging_sooner = storage_specs.get("prefer_charging_sooner", True) + + # Check for required Sensor attributes + sensor.check_required_attributes([("capacity_in_mw", (float, int))]) + + # Check for known prices or price forecasts, trimming planning window accordingly + up_deviation_prices, (start, end) = get_prices( + (start, end), + resolution, + beliefs_before=belief_time, + price_sensor=consumption_price_sensor, + sensor=sensor, + allow_trimmed_query_window=True, + ) + down_deviation_prices, (start, end) = get_prices( + (start, end), + resolution, + beliefs_before=belief_time, + price_sensor=production_price_sensor, + sensor=sensor, + allow_trimmed_query_window=True, + ) + + start = pd.Timestamp(start).tz_convert("UTC") + end = pd.Timestamp(end).tz_convert("UTC") + + # Add tiny price slope to prefer charging now rather than later, and discharging later rather than now. + # We penalise the future with at most 1 per thousand times the price spread. 
+ if prefer_charging_sooner: + up_deviation_prices = add_tiny_price_slope( + up_deviation_prices, "event_value" + ) + down_deviation_prices = add_tiny_price_slope( + down_deviation_prices, "event_value" + ) + + # Set up commitments to optimise for + commitment_quantities = [initialize_series(0, start, end, resolution)] + + # Todo: convert to EUR/(deviation of commitment, which is in MW) + commitment_upwards_deviation_price = [ + up_deviation_prices.loc[start : end - resolution]["event_value"] + ] + commitment_downwards_deviation_price = [ + down_deviation_prices.loc[start : end - resolution]["event_value"] + ] + + # Set up device constraints: only one scheduled flexible device for this EMS (at index 0), plus the forecasted inflexible devices (at indices 1 to n). + columns = [ + "equals", + "max", + "min", + "derivative equals", + "derivative max", + "derivative min", + "derivative down efficiency", + "derivative up efficiency", + ] + if inflexible_device_sensors is None: + inflexible_device_sensors = [] + device_constraints = [ + initialize_df(columns, start, end, resolution) + for i in range(1 + len(inflexible_device_sensors)) + ] + for i, inflexible_sensor in enumerate(inflexible_device_sensors): + device_constraints[i + 1]["derivative equals"] = get_power_values( + query_window=(start, end), + resolution=resolution, + beliefs_before=belief_time, + sensor=inflexible_sensor, + ) + if soc_targets is not None and not soc_targets.empty: + soc_targets = soc_targets.tz_convert("UTC") + device_constraints[0]["equals"] = soc_targets.shift( + -1, freq=resolution + ).values * (timedelta(hours=1) / resolution) - soc_at_start * ( + timedelta(hours=1) / resolution + ) # shift "equals" constraint for target SOC by one resolution (the target defines a state at a certain time, + # while the "equals" constraint defines what the total stock should be at the end of a time slot, + # where the time slot is indexed by its starting time) + device_constraints[0]["min"] = (soc_min - 
soc_at_start) * ( + timedelta(hours=1) / resolution + ) + device_constraints[0]["max"] = (soc_max - soc_at_start) * ( + timedelta(hours=1) / resolution + ) + if sensor.get_attribute("is_strictly_non_positive"): + device_constraints[0]["derivative min"] = 0 + else: + device_constraints[0]["derivative min"] = ( + sensor.get_attribute("capacity_in_mw") * -1 + ) + if sensor.get_attribute("is_strictly_non_negative"): + device_constraints[0]["derivative max"] = 0 + else: + device_constraints[0]["derivative max"] = sensor.get_attribute( + "capacity_in_mw" + ) + + # Apply round-trip efficiency evenly to charging and discharging + device_constraints[0]["derivative down efficiency"] = ( + roundtrip_efficiency**0.5 + ) + device_constraints[0]["derivative up efficiency"] = roundtrip_efficiency**0.5 + + # Set up EMS constraints + columns = ["derivative max", "derivative min"] + ems_constraints = initialize_df(columns, start, end, resolution) + ems_capacity = sensor.generic_asset.get_attribute("capacity_in_mw") + if ems_capacity is not None: + ems_constraints["derivative min"] = ems_capacity * -1 + ems_constraints["derivative max"] = ems_capacity + + ems_schedule, expected_costs, scheduler_results = device_scheduler( + device_constraints, + ems_constraints, + commitment_quantities, + commitment_downwards_deviation_price, + commitment_upwards_deviation_price, + ) + if scheduler_results.solver.termination_condition == "infeasible": + # Fallback policy if the problem was unsolvable + battery_schedule = fallback_charging_policy( + sensor, device_constraints[0], start, end, resolution + ) + else: + battery_schedule = ems_schedule[0] + + # Round schedule + if round_to_decimals: + battery_schedule = battery_schedule.round(round_to_decimals) + + return battery_schedule diff --git a/flexmeasures/data/models/planning/tests/conftest.py b/flexmeasures/data/models/planning/tests/conftest.py index 89d39ccaf..e59db120f 100644 --- a/flexmeasures/data/models/planning/tests/conftest.py +++ 
b/flexmeasures/data/models/planning/tests/conftest.py @@ -1,11 +1,12 @@ from __future__ import annotations -from datetime import datetime, timedelta +from datetime import timedelta import pytest import pandas as pd from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType +from flexmeasures.data.models.planning.utils import initialize_index from flexmeasures.data.models.time_series import Sensor, TimedBelief @@ -98,12 +99,10 @@ def add_inflexible_device_forecasts( Set up inflexible devices and forecasts. """ # 2 days of test data - time_slots = pd.date_range( - datetime(2015, 1, 1), - datetime(2015, 1, 3), - freq="15T", - closed="left", - tz="Europe/Amsterdam", + time_slots = initialize_index( + start=pd.Timestamp("2015-01-01").tz_localize("Europe/Amsterdam"), + end=pd.Timestamp("2015-01-03").tz_localize("Europe/Amsterdam"), + resolution="15T", ) # PV (8 hours at zero capacity, 8 hours at 90% capacity, and again 8 hours at zero capacity) diff --git a/flexmeasures/data/models/planning/tests/test_solver.py b/flexmeasures/data/models/planning/tests/test_solver.py index 0295079eb..ef2b9711f 100644 --- a/flexmeasures/data/models/planning/tests/test_solver.py +++ b/flexmeasures/data/models/planning/tests/test_solver.py @@ -6,8 +6,11 @@ import pandas as pd from flexmeasures.data.models.time_series import Sensor -from flexmeasures.data.models.planning.battery import schedule_battery -from flexmeasures.data.models.planning.charging_station import schedule_charging_station +from flexmeasures.data.models.planning.storage import StorageScheduler +from flexmeasures.data.models.planning.utils import ( + ensure_storage_specs, + initialize_series, +) from flexmeasures.utils.calculations import integrate_time_series @@ -26,12 +29,15 @@ def test_battery_solver_day_1( end = tz.localize(datetime(2015, 1, 2)) resolution = timedelta(minutes=15) soc_at_start = battery.get_attribute("soc_in_mwh") - schedule = schedule_battery( + storage_specs = 
ensure_storage_specs( + dict(soc_at_start=soc_at_start), battery, start, end, resolution + ) + schedule = StorageScheduler().schedule( battery, start, end, resolution, - soc_at_start, + storage_specs=storage_specs, inflexible_device_sensors=add_inflexible_device_forecasts.keys() if use_inflexible_device else None, @@ -80,15 +86,24 @@ def test_battery_solver_day_2(add_battery_assets, roundtrip_efficiency: float): soc_at_start = battery.get_attribute("soc_in_mwh") soc_min = 0.5 soc_max = 4.5 - schedule = schedule_battery( + storage_specs = ensure_storage_specs( + dict( + soc_at_start=soc_at_start, + soc_min=soc_min, + soc_max=soc_max, + roundtrip_efficiency=roundtrip_efficiency, + ), battery, start, end, resolution, - soc_at_start, - soc_min=soc_min, - soc_max=soc_max, - roundtrip_efficiency=roundtrip_efficiency, + ) + schedule = StorageScheduler().schedule( + battery, + start, + end, + resolution, + storage_specs=storage_specs, ) soc_schedule = integrate_time_series( schedule, @@ -159,12 +174,17 @@ def test_charging_station_solver_day_2(target_soc, charging_station_name): end = tz.localize(datetime(2015, 1, 3)) resolution = timedelta(minutes=15) target_soc_datetime = start + duration_until_target - soc_targets = pd.Series( - np.nan, index=pd.date_range(start, end, freq=resolution, closed="right") - ) + soc_targets = initialize_series(np.nan, start, end, resolution, inclusive="right") soc_targets.loc[target_soc_datetime] = target_soc - consumption_schedule = schedule_charging_station( - charging_station, start, end, resolution, soc_at_start, soc_targets + storage_specs = ensure_storage_specs( + dict(soc_at_start=soc_at_start, soc_targets=soc_targets), + charging_station, + start, + end, + resolution, + ) + consumption_schedule = StorageScheduler().schedule( + charging_station, start, end, resolution, storage_specs=storage_specs ) soc_schedule = integrate_time_series( consumption_schedule, soc_at_start, decimal_precision=6 @@ -216,12 +236,21 @@ def 
test_fallback_to_unsolvable_problem(target_soc, charging_station_name): end = tz.localize(datetime(2015, 1, 3)) resolution = timedelta(minutes=15) target_soc_datetime = start + duration_until_target - soc_targets = pd.Series( - np.nan, index=pd.date_range(start, end, freq=resolution, closed="right") - ) + soc_targets = initialize_series(np.nan, start, end, resolution, inclusive="right") soc_targets.loc[target_soc_datetime] = target_soc - consumption_schedule = schedule_charging_station( - charging_station, start, end, resolution, soc_at_start, soc_targets + storage_specs = ensure_storage_specs( + dict(soc_at_start=soc_at_start, soc_targets=soc_targets), + charging_station, + start, + end, + resolution, + ) + consumption_schedule = StorageScheduler().schedule( + charging_station, + start, + end, + resolution, + storage_specs=storage_specs, ) soc_schedule = integrate_time_series( consumption_schedule, soc_at_start, decimal_precision=6 @@ -269,14 +298,23 @@ def test_building_solver_day_2( soc_at_start = 2.5 soc_min = 0.5 soc_max = 4.5 - schedule = schedule_battery( + storage_specs = ensure_storage_specs( + dict( + soc_at_start=soc_at_start, + soc_min=soc_min, + soc_max=soc_max, + ), battery, start, end, resolution, - soc_at_start, - soc_min=soc_min, - soc_max=soc_max, + ) + schedule = StorageScheduler().schedule( + battery, + start, + end, + resolution, + storage_specs=storage_specs, inflexible_device_sensors=inflexible_devices.values(), ) soc_schedule = integrate_time_series(schedule, soc_at_start, decimal_precision=6) @@ -290,7 +328,7 @@ def test_building_solver_day_2( columns=["inflexible"], ).tail( -4 * 24 - ) # remove first 96 quarterhours (the schedule is about the 2nd day) + ) # remove first 96 quarter-hours (the schedule is about the 2nd day) capacity["max"] = building.get_attribute("capacity_in_mw") capacity["min"] = -building.get_attribute("capacity_in_mw") capacity["production headroom"] = capacity["max"] - capacity["inflexible"] diff --git 
a/flexmeasures/data/models/planning/utils.py b/flexmeasures/data/models/planning/utils.py index 23a9c347e..d4442583d 100644 --- a/flexmeasures/data/models/planning/utils.py +++ b/flexmeasures/data/models/planning/utils.py @@ -1,3 +1,4 @@ +from packaging import version from typing import List, Optional, Tuple, Union from datetime import date, datetime, timedelta @@ -9,7 +10,6 @@ from flexmeasures.data.models.time_series import Sensor, TimedBelief from flexmeasures.data.models.planning.exceptions import ( - UnknownForecastException, UnknownMarketException, UnknownPricesException, ) @@ -17,9 +17,15 @@ def initialize_df( - columns: List[str], start: datetime, end: datetime, resolution: timedelta + columns: List[str], + start: datetime, + end: datetime, + resolution: timedelta, + inclusive: str = "left", ) -> pd.DataFrame: - df = pd.DataFrame(index=initialize_index(start, end, resolution), columns=columns) + df = pd.DataFrame( + index=initialize_index(start, end, resolution, inclusive), columns=columns + ) return df @@ -28,18 +34,99 @@ def initialize_series( start: datetime, end: datetime, resolution: timedelta, + inclusive: str = "left", ) -> pd.Series: - s = pd.Series(index=initialize_index(start, end, resolution), data=data) + s = pd.Series(index=initialize_index(start, end, resolution, inclusive), data=data) return s def initialize_index( - start: Union[date, datetime], end: Union[date, datetime], resolution: timedelta + start: Union[date, datetime, str], + end: Union[date, datetime, str], + resolution: Union[timedelta, str], + inclusive: str = "left", ) -> pd.DatetimeIndex: - i = pd.date_range( - start=start, end=end, freq=to_offset(resolution), closed="left", name="datetime" - ) - return i + if version.parse(pd.__version__) >= version.parse("1.4.0"): + return pd.date_range( + start=start, + end=end, + freq=to_offset(resolution), + inclusive=inclusive, + name="datetime", + ) + else: + return pd.date_range( + start=start, + end=end, + freq=to_offset(resolution), + 
closed=inclusive, + name="datetime", + ) + + +def ensure_storage_specs( + specs: Optional[dict], + sensor: Sensor, + start_of_schedule: datetime, + end_of_schedule: datetime, + resolution: timedelta, +) -> dict: + """ + Check storage specs and fill in values from context, if possible. + + Storage specs are: + - soc_at_start + - soc_min + - soc_max + - soc_targets + - roundtrip_efficiency + - prefer_charging_sooner + """ + if specs is None: + specs = {} + + # Check state of charge + # Preferably, a starting soc is given. + # Otherwise, we try to retrieve the current state of charge from the asset (if that is the valid one at the start). + # Otherwise, we set the starting soc to 0 (some assets don't use the concept of a state of charge, + # and without soc targets and limits the starting soc doesn't matter). + if "soc_at_start" not in specs or specs["soc_at_start"] is None: + if ( + start_of_schedule == sensor.get_attribute("soc_datetime") + and sensor.get_attribute("soc_in_mwh") is not None + ): + specs["soc_at_start"] = sensor.get_attribute("soc_in_mwh") + else: + specs["soc_at_start"] = 0 + + # init default targets + if "soc_targets" not in specs or specs["soc_targets"] is None: + specs["soc_targets"] = initialize_series( + np.nan, start_of_schedule, end_of_schedule, resolution, inclusive="right" + ) + # soc targets are at the end of each time slot, while prices are indexed by the start of each time slot + specs["soc_targets"] = specs["soc_targets"][ + start_of_schedule + resolution : end_of_schedule + ] + + # Check for min and max SOC, or get default from sensor + if "soc_min" not in specs or specs["soc_min"] is None: + # Can't drain the storage by more than it contains + specs["soc_min"] = sensor.get_attribute("min_soc_in_mwh", 0) + if "soc_max" not in specs or specs["soc_max"] is None: + # Lacking information about the battery's nominal capacity, we use the highest target value as the maximum state of charge + specs["soc_max"] = sensor.get_attribute( + 
"max_soc_in_mwh", max(specs["soc_targets"].values) + ) + + # Check for round-trip efficiency + if "roundtrip_efficiency" not in specs or specs["roundtrip_efficiency"] is None: + # Get default from sensor, or use 100% otherwise + specs["roundtrip_efficiency"] = sensor.get_attribute("roundtrip_efficiency", 1) + if specs["roundtrip_efficiency"] <= 0 or specs["roundtrip_efficiency"] > 1: + raise ValueError("roundtrip_efficiency expected within the interval (0, 1]") + + return specs def add_tiny_price_slope( @@ -150,11 +237,17 @@ def get_power_values( one_deterministic_belief_per_event=True, ) # consumption is negative, production is positive df = simplify_index(bdf) + df = df.reindex(initialize_index(query_window[0], query_window[1], resolution)) nan_values = df.isnull().values if nan_values.any() or df.empty: - raise UnknownForecastException( - f"Forecasts unknown for planning window. (sensor {sensor.id})" + current_app.logger.warning( + f"Assuming zero power values for (partially) unknown power values for planning window. 
(sensor {sensor.id})" ) + df = df.fillna(0) + if sensor.get_attribute( + "consumption_is_positive", False + ): # FlexMeasures default is to store consumption as negative power values + return df.values return -df.values diff --git a/flexmeasures/data/services/scheduling.py b/flexmeasures/data/services/scheduling.py index 0e9d2524b..510c59b4a 100644 --- a/flexmeasures/data/services/scheduling.py +++ b/flexmeasures/data/services/scheduling.py @@ -1,22 +1,22 @@ from datetime import datetime, timedelta -from typing import List, Tuple, Optional, Callable +from typing import List, Tuple, Optional import os import sys import importlib.util from importlib.abc import Loader +from rq.job import Job from flask import current_app import click -import numpy as np -import pandas as pd from rq import get_current_job -from rq.job import Job import timely_beliefs as tb from flexmeasures.data import db -from flexmeasures.data.models.planning.battery import schedule_battery -from flexmeasures.data.models.planning.charging_station import schedule_charging_station +from flexmeasures.data.models.planning.storage import StorageScheduler +from flexmeasures.data.models.planning.utils import ensure_storage_specs from flexmeasures.data.models.time_series import Sensor, TimedBelief +from flexmeasures.data.models.data_sources import DataSource +from flexmeasures.data.models.planning import Scheduler from flexmeasures.data.utils import get_data_source, save_to_db """ @@ -28,31 +28,29 @@ """ -DEFAULT_RESOLUTION = timedelta(minutes=15) - - def create_scheduling_job( - sensor_id: int, + sensor: Sensor, start_of_schedule: datetime, end_of_schedule: datetime, belief_time: datetime, - resolution: timedelta = DEFAULT_RESOLUTION, - soc_at_start: Optional[float] = None, - soc_targets: Optional[pd.Series] = None, - soc_min: Optional[float] = None, - soc_max: Optional[float] = None, - roundtrip_efficiency: Optional[float] = None, + resolution: timedelta, consumption_price_sensor: Optional[Sensor] = None, 
production_price_sensor: Optional[Sensor] = None, inflexible_device_sensors: Optional[List[Sensor]] = None, job_id: Optional[str] = None, enqueue: bool = True, + storage_specs: Optional[dict] = None, ) -> Job: - """Supporting quick retrieval of the scheduling job, the job id is the unique entity address of the UDI event. + """ + Create a new Job, which is queued for later execution. + + Before enqueuing, we perform some checks on sensor type and specs, for errors we want to bubble up early. + + To support quick retrieval of the scheduling job, the job id is the unique entity address of the UDI event. That means one event leads to one job (i.e. actions are event driven). Target SOC values should be indexed by their due date. For example, for quarter-hourly targets between 5 and 6 AM: - >>> df = pd.Series(data=[1, 2, 2.5, 3], index=pd.date_range(datetime(2010,1,1,5), datetime(2010,1,1,6), freq=timedelta(minutes=15), closed="right")) + >>> df = pd.Series(data=[1, 2, 2.5, 3], index=pd.date_range(datetime(2010,1,1,5), datetime(2010,1,1,6), freq=timedelta(minutes=15), inclusive="right")) >>> print(df) 2010-01-01 05:15:00 1.0 2010-01-01 05:30:00 2.0 @@ -60,23 +58,23 @@ def create_scheduling_job( 2010-01-01 06:00:00 3.0 Freq: 15T, dtype: float64 """ + storage_specs = ensure_storage_specs( + storage_specs, sensor, start_of_schedule, end_of_schedule, resolution + ) + job = Job.create( make_schedule, kwargs=dict( - sensor_id=sensor_id, + sensor_id=sensor.id, start=start_of_schedule, end=end_of_schedule, belief_time=belief_time, resolution=resolution, - soc_at_start=soc_at_start, - soc_targets=soc_targets, - soc_min=soc_min, - soc_max=soc_max, - roundtrip_efficiency=roundtrip_efficiency, + storage_specs=storage_specs, consumption_price_sensor=consumption_price_sensor, production_price_sensor=production_price_sensor, inflexible_device_sensors=inflexible_device_sensors, - ), + ), # TODO: maybe also pass these sensors as IDs, to avoid potential db sessions confusion id=job_id, 
connection=current_app.queues["scheduling"].connection, ttl=int( @@ -101,93 +99,77 @@ def make_schedule( end: datetime, belief_time: datetime, resolution: timedelta, - soc_at_start: Optional[float] = None, - soc_targets: Optional[pd.Series] = None, - soc_min: Optional[float] = None, - soc_max: Optional[float] = None, - roundtrip_efficiency: Optional[float] = None, + storage_specs: Optional[dict], consumption_price_sensor: Optional[Sensor] = None, production_price_sensor: Optional[Sensor] = None, inflexible_device_sensors: Optional[List[Sensor]] = None, ) -> bool: - """Preferably, a starting soc is given. - Otherwise, we try to retrieve the current state of charge from the asset (if that is the valid one at the start). - Otherwise, we set the starting soc to 0 (some assets don't use the concept of a state of charge, - and without soc targets and limits the starting soc doesn't matter). + """ + This function is meant to be queued as a job. + It thus potentially runs on a different FlexMeasures node than where the job is created. 
+ + - Choose which scheduling function can be used + - Compute schedule + - Turn scheduled values into beliefs and save them to db """ # https://docs.sqlalchemy.org/en/13/faq/connections.html#how-do-i-use-engines-connections-sessions-with-python-multiprocessing-or-os-fork db.engine.dispose() - rq_job = get_current_job() - - # find sensor sensor = Sensor.query.filter_by(id=sensor_id).one_or_none() + rq_job = get_current_job() if rq_job: click.echo( "Running Scheduling Job %s: %s, from %s to %s" % (rq_job.id, sensor, start, end) ) - if soc_at_start is None: - if ( - start == sensor.get_attribute("soc_datetime") - and sensor.get_attribute("soc_in_mwh") is not None - ): - soc_at_start = sensor.get_attribute("soc_in_mwh") - else: - soc_at_start = 0 - - if soc_targets is None: - soc_targets = pd.Series( - np.nan, index=pd.date_range(start, end, freq=resolution, closed="right") - ) - - data_source_name = "Seita" - - # Choose which algorithm to use + data_source_info = {} + # Choose which algorithm to use TODO: unify loading this into a func store concept if "custom-scheduler" in sensor.attributes: scheduler_specs = sensor.attributes.get("custom-scheduler") - scheduler, data_source_name = load_custom_scheduler(scheduler_specs) - if rq_job: - rq_job.meta["data_source_name"] = data_source_name - rq_job.save_meta() - elif sensor.generic_asset.generic_asset_type.name == "battery": - scheduler = schedule_battery + scheduler, data_source_info = load_custom_scheduler(scheduler_specs) elif sensor.generic_asset.generic_asset_type.name in ( + "battery", "one-way_evse", "two-way_evse", ): - scheduler = schedule_charging_station - + scheduler = StorageScheduler + data_source_info["model"] = scheduler.__name__ + data_source_info["name"] = scheduler.__author__ + data_source_info["version"] = scheduler.__version__ else: raise ValueError( "Scheduling is not (yet) supported for asset type %s." 
% sensor.generic_asset.generic_asset_type ) - consumption_schedule = scheduler( + consumption_schedule = scheduler().schedule( sensor, start, end, resolution, - soc_at_start, - soc_targets, - soc_min, - soc_max, - roundtrip_efficiency, + storage_specs=storage_specs, consumption_price_sensor=consumption_price_sensor, production_price_sensor=production_price_sensor, inflexible_device_sensors=inflexible_device_sensors, belief_time=belief_time, ) + if rq_job: + click.echo("Job %s made schedule." % rq_job.id) data_source = get_data_source( - data_source_name=data_source_name, + data_source_name=data_source_info["name"], + data_source_model=data_source_info["model"], + data_source_version=data_source_info["version"], data_source_type="scheduling script", ) + + # saving info on the job, so the API for a job can look the data up + data_source_info["id"] = data_source.id if rq_job: - click.echo("Job %s made schedule." % rq_job.id) + rq_job.meta["data_source_info"] = data_source_info + rq_job.save_meta() ts_value_schedule = [ TimedBelief( @@ -206,17 +188,19 @@ def make_schedule( return True -def load_custom_scheduler(scheduler_specs: dict) -> Tuple[Callable, str]: +def load_custom_scheduler(scheduler_specs: dict) -> Tuple[Scheduler, dict]: """ Read in custom scheduling spec. - Attempt to load the Callable, also derive a data source name. + Attempt to load the Callable, also derive data source info. + + The scheduler class should be derived from flexmeasures.data.models.planning.Scheduler. + The Callable is assumed to be named "schedule". Example specs: { "module": "/path/to/module.py", # or sthg importable, e.g. 
"package.module" - "function": "name_of_function", - "source": "source name" + "class": "NameOfSchedulerClass", } """ @@ -224,12 +208,12 @@ def load_custom_scheduler(scheduler_specs: dict) -> Tuple[Callable, str]: scheduler_specs, dict ), f"Scheduler specs is {type(scheduler_specs)}, should be a dict" assert "module" in scheduler_specs, "scheduler specs have no 'module'." - assert "function" in scheduler_specs, "scheduler specs have no 'function'" + assert "class" in scheduler_specs, "scheduler specs have no 'class'" - source_name = scheduler_specs.get( - "source", f"Custom scheduler - {scheduler_specs['function']}" - ) - scheduler_name = scheduler_specs["function"] + scheduler_name = scheduler_specs["class"] + source_info = dict( + model=scheduler_name, version="1", name="Unknown author" + ) # default # import module module_descr = scheduler_specs["module"] @@ -255,10 +239,30 @@ def load_custom_scheduler(scheduler_specs: dict) -> Tuple[Callable, str]: # get scheduling function assert hasattr( - module, scheduler_specs["function"] - ), "Module at {module_descr} has no function {scheduler_specs['function']}" + module, scheduler_specs["class"] + ), "Module at {module_descr} has no class {scheduler_specs['class']}" + + scheduler_class = getattr(module, scheduler_specs["class"]) + + if hasattr(scheduler_class, "__version__"): + source_info["version"] = str(scheduler_class.__version__) + else: + current_app.logger.warning( + f"Scheduler {scheduler_class.__name__} loaded, but has no __version__ attribute." + ) + if hasattr(scheduler_class, "__author__"): + source_info["name"] = str(scheduler_class.__author__) + else: + current_app.logger.warning( + f"Scheduler {scheduler_class.__name__} loaded, but has no __author__ attribute." 
+ ) - return getattr(module, scheduler_specs["function"]), source_name + schedule_function_name = "schedule" + if not hasattr(scheduler_class, schedule_function_name): + raise NotImplementedError( + f"No function {schedule_function_name} in {scheduler_class}. Cannot load custom scheduler." + ) + return scheduler_class, source_info def handle_scheduling_exception(job, exc_type, exc_value, traceback): @@ -268,3 +272,38 @@ def handle_scheduling_exception(job, exc_type, exc_value, traceback): click.echo("HANDLING RQ WORKER EXCEPTION: %s:%s\n" % (exc_type, exc_value)) job.meta["exception"] = exc_value job.save_meta() + + +def get_data_source_for_job( + job: Optional[Job], sensor: Optional[Sensor] = None +) -> Optional[DataSource]: + """ + Try to find the data source linked by this scheduling job. + + We expect that enough info on the source was placed in the meta dict. + For a transition period, we might have to guess a bit. + TODO: Afterwards, this can be lighter. We should also expect a job and no sensor is needed, + once API v1.3 is deprecated. + """ + data_source_info = None + if job: + data_source_info = job.meta.get("data_source_info") + if data_source_info and "id" in data_source_info: + return DataSource.query.get(data_source_info["id"]) + if data_source_info is None and sensor: + data_source_info = dict(name="Seita", model="StorageScheduler") + # TODO: change to raise later (v0.13) - all scheduling jobs now get full info + current_app.logger.warning( + "Looking up scheduling data without knowing full data_source_info (version). This is deprecated soon. Please specify a job id as event or switch to API v3." + ) + scheduler_sources = ( + DataSource.query.filter_by( + type="scheduling script", + **data_source_info, + ) + .order_by(DataSource.version.desc()) + .all() + ) # Might still be more than one, e.g. 
per user + if len(scheduler_sources) == 0: + return None + return scheduler_sources[0] diff --git a/flexmeasures/data/tests/dummy_scheduler.py b/flexmeasures/data/tests/dummy_scheduler.py index cf4f0ec89..7a97c2b46 100644 --- a/flexmeasures/data/tests/dummy_scheduler.py +++ b/flexmeasures/data/tests/dummy_scheduler.py @@ -1,21 +1,31 @@ from datetime import datetime, timedelta from flexmeasures.data.models.time_series import Sensor +from flexmeasures.data.models.planning import Scheduler from flexmeasures.data.models.planning.utils import initialize_series -def compute_a_schedule( - sensor: Sensor, - start: datetime, - end: datetime, - resolution: timedelta, - *args, - **kwargs -): - """Just a dummy scheduler.""" - return initialize_series( # simply creates a Pandas Series repeating one value - data=sensor.get_attribute("capacity_in_mw"), - start=start, - end=end, - resolution=resolution, - ) +class DummyScheduler(Scheduler): + + __author__ = "Test Organization" + __version__ = "3" + + def schedule( + self, + sensor: Sensor, + start: datetime, + end: datetime, + resolution: timedelta, + *args, + **kwargs + ): + """ + Just a dummy scheduler that always plans to consume at maximum capacity. 
+ (Schedulers return positive values for consumption, and negative values for production) + """ + return initialize_series( # simply creates a Pandas Series repeating one value + data=sensor.get_attribute("capacity_in_mw"), + start=start, + end=end, + resolution=resolution, + ) diff --git a/flexmeasures/data/tests/test_queries.py b/flexmeasures/data/tests/test_queries.py index 85068475a..109cc0219 100644 --- a/flexmeasures/data/tests/test_queries.py +++ b/flexmeasures/data/tests/test_queries.py @@ -7,6 +7,7 @@ import timely_beliefs as tb from flexmeasures.data.models.data_sources import DataSource +from flexmeasures.data.models.planning.utils import initialize_index from flexmeasures.data.models.time_series import Sensor, TimedBelief from flexmeasures.data.queries.utils import ( multiply_dataframe_with_deterministic_beliefs, @@ -108,16 +109,12 @@ def test_collect_power_resampled( def test_multiplication(): df1 = pd.DataFrame( [[30.0, timedelta(hours=3)]], - index=pd.date_range( - "2000-01-01 10:00", "2000-01-01 15:00", freq="1h", closed="left" - ), + index=initialize_index("2000-01-01 10:00", "2000-01-01 15:00", resolution="1h"), columns=["event_value", "belief_horizon"], ) df2 = pd.DataFrame( [[10.0, timedelta(hours=1)]], - index=pd.date_range( - "2000-01-01 13:00", "2000-01-01 18:00", freq="1h", closed="left" - ), + index=initialize_index("2000-01-01 13:00", "2000-01-01 18:00", resolution="1h"), columns=["event_value", "belief_horizon"], ) df = multiply_dataframe_with_deterministic_beliefs(df1, df2) @@ -125,22 +122,22 @@ def test_multiplication(): [ pd.DataFrame( [[np.nan, timedelta(hours=3)]], - index=pd.date_range( - "2000-01-01 10:00", "2000-01-01 13:00", freq="1h", closed="left" + index=initialize_index( + "2000-01-01 10:00", "2000-01-01 13:00", resolution="1h" ), columns=["event_value", "belief_horizon"], ), pd.DataFrame( [[300.0, timedelta(hours=1)]], - index=pd.date_range( - "2000-01-01 13:00", "2000-01-01 15:00", freq="1h", closed="left" + 
index=initialize_index( + "2000-01-01 13:00", "2000-01-01 15:00", resolution="1h" ), columns=["event_value", "belief_horizon"], ), pd.DataFrame( [[np.nan, timedelta(hours=1)]], - index=pd.date_range( - "2000-01-01 15:00", "2000-01-01 18:00", freq="1h", closed="left" + index=initialize_index( + "2000-01-01 15:00", "2000-01-01 18:00", resolution="1h" ), columns=["event_value", "belief_horizon"], ), @@ -161,17 +158,13 @@ def test_multiplication_with_one_empty_dataframe(): df2 = pd.DataFrame( [[10.0, timedelta(hours=1)]], - index=pd.date_range( - "2000-01-01 13:00", "2000-01-01 18:00", freq="1h", closed="left" - ), + index=initialize_index("2000-01-01 13:00", "2000-01-01 18:00", resolution="1h"), columns=["event_value", "belief_horizon"], ) df_compare = pd.DataFrame( [[np.nan, timedelta(hours=1)]], - index=pd.date_range( - "2000-01-01 13:00", "2000-01-01 18:00", freq="1h", closed="left" - ), + index=initialize_index("2000-01-01 13:00", "2000-01-01 18:00", resolution="1h"), columns=["event_value", "belief_horizon"], ) # set correct types diff --git a/flexmeasures/data/tests/test_scheduling_jobs.py b/flexmeasures/data/tests/test_scheduling_jobs.py index a0111a445..357b3709a 100644 --- a/flexmeasures/data/tests/test_scheduling_jobs.py +++ b/flexmeasures/data/tests/test_scheduling_jobs.py @@ -35,7 +35,7 @@ def test_scheduling_a_battery(db, app, add_battery_assets, setup_test_data): ) # Make sure the scheduler data source isn't there job = create_scheduling_job( - battery.id, start, end, belief_time=start, resolution=resolution + battery, start, end, belief_time=start, resolution=resolution ) print("Job: %s" % job.id) @@ -60,8 +60,7 @@ def test_scheduling_a_battery(db, app, add_battery_assets, setup_test_data): scheduler_specs = { "module": None, # use make_module_descr, see below - "function": "compute_a_schedule", - "source": "Test Source", + "class": "DummyScheduler", } @@ -79,10 +78,12 @@ def test_loading_custom_scheduler(is_path: bool): Simply check if loading a custom 
scheduler works. """ scheduler_specs["module"] = make_module_descr(is_path) - custom_scheduler, data_source = load_custom_scheduler(scheduler_specs) - assert data_source == "Test Source" - assert custom_scheduler.__name__ == "compute_a_schedule" - assert custom_scheduler.__doc__ == "Just a dummy scheduler." + custom_scheduler, data_source_info = load_custom_scheduler(scheduler_specs) + assert data_source_info["name"] == "Test Organization" + assert data_source_info["version"] == "3" + assert data_source_info["model"] == "DummyScheduler" + assert custom_scheduler.__name__ == "DummyScheduler" + assert "Just a dummy scheduler" in custom_scheduler.schedule.__doc__ @pytest.mark.parametrize("is_path", [False, True]) @@ -102,7 +103,7 @@ def test_assigning_custom_scheduler(db, app, add_battery_assets, is_path: bool): resolution = timedelta(minutes=15) job = create_scheduling_job( - battery.id, start, end, belief_time=start, resolution=resolution + battery, start, end, belief_time=start, resolution=resolution ) print("Job: %s" % job.id) @@ -111,10 +112,11 @@ def test_assigning_custom_scheduler(db, app, add_battery_assets, is_path: bool): # make sure we saved the data source for later lookup redis_connection = app.queues["scheduling"].connection finished_job = Job.fetch(job.id, connection=redis_connection) - assert finished_job.meta["data_source_name"] == scheduler_specs["source"] + assert finished_job.meta["data_source_info"]["model"] == scheduler_specs["class"] scheduler_source = DataSource.query.filter_by( - name=finished_job.meta["data_source_name"], type="scheduling script" + type="scheduling script", + **finished_job.meta["data_source_info"], ).one_or_none() assert ( scheduler_source is not None diff --git a/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py b/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py index f1f4bd5ea..534435773 100644 --- a/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py +++ 
b/flexmeasures/data/tests/test_scheduling_jobs_fresh_db.py @@ -5,6 +5,7 @@ import pandas as pd from flexmeasures.data.models.data_sources import DataSource +from flexmeasures.data.models.planning.utils import initialize_series from flexmeasures.data.models.time_series import Sensor, TimedBelief from flexmeasures.data.services.scheduling import create_scheduling_job from flexmeasures.data.tests.utils import work_on_rq, exception_reporter @@ -31,9 +32,7 @@ def test_scheduling_a_charging_station( end = tz.localize(datetime(2015, 1, 3)) resolution = timedelta(minutes=15) target_soc_datetime = start + duration_until_target - soc_targets = pd.Series( - np.nan, index=pd.date_range(start, end, freq=resolution, closed="right") - ) + soc_targets = initialize_series(np.nan, start, end, resolution, inclusive="right") soc_targets.loc[target_soc_datetime] = target_soc assert ( @@ -42,13 +41,12 @@ def test_scheduling_a_charging_station( ) # Make sure the scheduler data source isn't there job = create_scheduling_job( - charging_station.id, + charging_station, start, end, belief_time=start, resolution=resolution, - soc_at_start=soc_at_start, - soc_targets=soc_targets, + storage_specs=dict(soc_at_start=soc_at_start, soc_targets=soc_targets), ) print("Job: %s" % job.id) diff --git a/flexmeasures/utils/calculations.py b/flexmeasures/utils/calculations.py index 84cb8dd79..f85813fa7 100644 --- a/flexmeasures/utils/calculations.py +++ b/flexmeasures/utils/calculations.py @@ -44,13 +44,13 @@ def integrate_time_series( down_efficiency: float | pd.Series = 1, decimal_precision: int | None = None, ) -> pd.Series: - """Integrate time series of length n and closed="left" (representing a flow) - to a time series of length n+1 and closed="both" (representing a stock), + """Integrate time series of length n and inclusive="left" (representing a flow) + to a time series of length n+1 and inclusive="both" (representing a stock), given an initial stock (i.e. the constant of integration). 
The unit of time is hours: i.e. the stock unit is flow unit times hours (e.g. a flow in kW becomes a stock in kWh). Optionally, set a decimal precision to round off the results (useful for tests failing over machine precision). - >>> s = pd.Series([1, 2, 3, 4], index=pd.date_range(datetime(2001, 1, 1, 5), datetime(2001, 1, 1, 6), freq=timedelta(minutes=15), closed="left")) + >>> s = pd.Series([1, 2, 3, 4], index=pd.date_range(datetime(2001, 1, 1, 5), datetime(2001, 1, 1, 6), freq=timedelta(minutes=15), inclusive="left")) >>> integrate_time_series(s, 10) 2001-01-01 05:00:00 10.00 2001-01-01 05:15:00 10.25 @@ -59,7 +59,7 @@ def integrate_time_series( 2001-01-01 06:00:00 12.50 Freq: D, dtype: float64 - >>> s = pd.Series([1, 2, 3, 4], index=pd.date_range(datetime(2001, 1, 1, 5), datetime(2001, 1, 1, 7), freq=timedelta(minutes=30), closed="left")) + >>> s = pd.Series([1, 2, 3, 4], index=pd.date_range(datetime(2001, 1, 1, 5), datetime(2001, 1, 1, 7), freq=timedelta(minutes=30), inclusive="left")) >>> integrate_time_series(s, 10) 2001-01-01 05:00:00 10.0 2001-01-01 05:30:00 10.5 diff --git a/flexmeasures/utils/config_utils.py b/flexmeasures/utils/config_utils.py index bc7cb5fbe..519203d77 100644 --- a/flexmeasures/utils/config_utils.py +++ b/flexmeasures/utils/config_utils.py @@ -8,6 +8,7 @@ from flask import Flask from inflection import camelize +import pandas as pd from flexmeasures.utils.config_defaults import ( Config as DefaultConfig, @@ -45,6 +46,7 @@ def configure_logging(): """Configure and register logging""" + pd.options.display.expand_frame_repr = False # Don't wrap DataFrame representations loggingDictConfig(flexmeasures_logging_config)