diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 72d46010b..c1763f372 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -20,7 +20,7 @@ New features * DataSource table now allows storing arbitrary attributes as a JSON (without content validation), similar to the Sensor and GenericAsset tables [see `PR #750 `_] * Added API endpoints `/sensors/` for fetching a single sensor and `/sensors` (POST) for adding a sensor. [see `PR #759 `_] and [see `PR #767 `_] * The CLI now allows to set lists and dicts as asset & sensor attributes (formerly only single values) [see `PR #762 `_] -* Add `ProcessScheduler` class, which optimizes the starting time of processes using one of the following policies: INFLEXIBLE, SHIFTABLE and BREAKABLE [see `PR #729 `_] +* Add `ProcessScheduler` class to optimize the starting time of processes one of the policies developed (INFLEXIBLE, SHIFTABLE and BREAKABLE), accessible via the CLI command `flexmeasures add schedule for-process` [see `PR #729 `_ and `PR #768 `_] Bugfixes ----------- diff --git a/documentation/cli/change_log.rst b/documentation/cli/change_log.rst index 2787c7e51..92f2175bc 100644 --- a/documentation/cli/change_log.rst +++ b/documentation/cli/change_log.rst @@ -8,6 +8,8 @@ since v0.15.0 | July XX, 2023 ================================= * Allow deleting multiple sensors with a single call to ``flexmeasures delete sensor`` by passing the ``--id`` option multiple times. +* Add ``flexmeasures add schedule for-process`` to create a new process schedule for a given power sensor. + since v0.14.1 | June XX, 2023 ================================= diff --git a/documentation/cli/commands.rst b/documentation/cli/commands.rst index ee31d2d54..a050fb3e8 100644 --- a/documentation/cli/commands.rst +++ b/documentation/cli/commands.rst @@ -36,6 +36,7 @@ of which some are referred to in this documentation. ``flexmeasures add source`` Add a new data source. ``flexmeasures add forecasts`` Create forecasts. ``flexmeasures add schedule for-storage`` Create a charging schedule for a storage asset. +``flexmeasures add schedule for-process`` Create a schedule for a process asset. ``flexmeasures add holidays`` Add holiday annotations to accounts and/or assets. ``flexmeasures add annotation`` Add annotation to accounts, assets and/or sensors. ``flexmeasures add toy-account`` Create a toy account, for tutorials and trying things. diff --git a/flexmeasures/api/dev/tests/conftest.py b/flexmeasures/api/dev/tests/conftest.py index 9b98647af..540859eb3 100644 --- a/flexmeasures/api/dev/tests/conftest.py +++ b/flexmeasures/api/dev/tests/conftest.py @@ -1,5 +1,6 @@ import pytest +from flexmeasures import User from flexmeasures.api.v3_0.tests.conftest import add_incineration_line from flexmeasures.data.models.time_series import Sensor @@ -10,7 +11,7 @@ def setup_api_test_data(db, setup_roles_users, setup_generic_assets): Set up data for API dev tests. """ print("Setting up data for API dev tests on %s" % db.engine) - add_incineration_line(db, setup_roles_users["Test Supplier User"]) + add_incineration_line(db, User.query.get(setup_roles_users["Test Supplier User"])) @pytest.fixture(scope="function") @@ -23,4 +24,6 @@ def setup_api_fresh_test_data( print("Setting up fresh data for API dev tests on %s" % fresh_db.engine) for sensor in Sensor.query.all(): fresh_db.delete(sensor) - add_incineration_line(fresh_db, setup_roles_users_fresh_db["Test Supplier User"]) + add_incineration_line( + fresh_db, User.query.get(setup_roles_users_fresh_db["Test Supplier User"]) + ) diff --git a/flexmeasures/api/v3_0/tests/conftest.py b/flexmeasures/api/v3_0/tests/conftest.py index a1b13a246..59b32532a 100644 --- a/flexmeasures/api/v3_0/tests/conftest.py +++ b/flexmeasures/api/v3_0/tests/conftest.py @@ -17,7 +17,9 @@ def setup_api_test_data( Set up data for API v3.0 tests. """ print("Setting up data for API v3.0 tests on %s" % db.engine) - sensors = add_incineration_line(db, setup_roles_users["Test Supplier User"]) + sensors = add_incineration_line( + db, User.query.get(setup_roles_users["Test Supplier User"]) + ) return sensors @@ -32,7 +34,7 @@ def setup_api_fresh_test_data( for sensor in Sensor.query.all(): fresh_db.delete(sensor) sensors = add_incineration_line( - fresh_db, setup_roles_users_fresh_db["Test Supplier User"] + fresh_db, User.query.get(setup_roles_users_fresh_db["Test Supplier User"]) ) return sensors diff --git a/flexmeasures/api/v3_0/tests/test_sensor_data.py b/flexmeasures/api/v3_0/tests/test_sensor_data.py index 5af1ec722..e570a33be 100644 --- a/flexmeasures/api/v3_0/tests/test_sensor_data.py +++ b/flexmeasures/api/v3_0/tests/test_sensor_data.py @@ -44,7 +44,9 @@ def test_get_sensor_data( ): """Check the /sensors/data endpoint for fetching 1 hour of data of a 10-minute resolution sensor.""" sensor = setup_api_test_data["some gas sensor"] - source: Source = setup_roles_users["Test Supplier User"].data_source[0] + source: Source = User.query.get( + setup_roles_users["Test Supplier User"] + ).data_source[0] assert sensor.event_resolution == timedelta(minutes=10) message = { "sensor": f"ea1.2021-01.io.flexmeasures:fm1.{sensor.id}", @@ -76,7 +78,9 @@ def test_get_instantaneous_sensor_data( ): """Check the /sensors/data endpoint for fetching 1 hour of data of an instantaneous sensor.""" sensor = setup_api_test_data["some temperature sensor"] - source: Source = setup_roles_users["Test Supplier User"].data_source[0] + source: Source = User.query.get( + setup_roles_users["Test Supplier User"] + ).data_source[0] assert sensor.event_resolution == timedelta(minutes=0) message = { "sensor": f"ea1.2021-01.io.flexmeasures:fm1.{sensor.id}", diff --git a/flexmeasures/cli/data_add.py b/flexmeasures/cli/data_add.py index 86c47ded3..c43add28c 100755 --- a/flexmeasures/cli/data_add.py +++ b/flexmeasures/cli/data_add.py @@ -5,8 +5,8 @@ from __future__ import annotations from datetime import datetime, timedelta +from typing import Type, List import isodate -from typing import Type import json from pathlib import Path from io import TextIOBase @@ -52,7 +52,9 @@ LatitudeField, LongitudeField, SensorIdField, + TimeIntervalField, ) +from flexmeasures.data.schemas.times import TimeIntervalSchema from flexmeasures.data.schemas.scheduling.storage import EfficiencyField from flexmeasures.data.schemas.sensors import SensorSchema from flexmeasures.data.schemas.units import QuantityField @@ -1169,6 +1171,137 @@ def add_schedule_for_storage( click.secho("New schedule is stored.", **MsgStyle.SUCCESS) +@create_schedule.command("for-process") +@with_appcontext +@click.option( + "--sensor-id", + "power_sensor", + type=SensorIdField(), + required=True, + help="Create schedule for this sensor. Should be a power sensor. Follow up with the sensor's ID.", +) +@click.option( + "--consumption-price-sensor", + "consumption_price_sensor", + type=SensorIdField(), + required=False, + help="Optimize consumption against this sensor. The sensor typically records an electricity price (e.g. in EUR/kWh), but this field can also be used to optimize against some emission intensity factor (e.g. in kg CO₂ eq./kWh). Follow up with the sensor's ID.", +) +@click.option( + "--start", + "start", + type=AwareDateTimeField(format="iso"), + required=True, + help="Schedule starts at this datetime. Follow up with a timezone-aware datetime in ISO 6801 format.", +) +@click.option( + "--duration", + "duration", + type=DurationField(), + required=True, + help="Duration of schedule, after --start. Follow up with a duration in ISO 6801 format, e.g. PT1H (1 hour) or PT45M (45 minutes).", +) +@click.option( + "--process-duration", + "process_duration", + type=DurationField(), + required=True, + help="Duration of the process. Follow up with a duration in ISO 6801 format, e.g. PT1H (1 hour) or PT45M (45 minutes).", +) +@click.option( + "--process-type", + "process_type", + type=click.Choice(["INFLEXIBLE", "BREAKABLE", "SHIFTABLE"], case_sensitive=False), + required=False, + default="SHIFTABLE", + help="Process schedule policy: INFLEXIBLE, BREAKABLE or SHIFTABLE.", +) +@click.option( + "--process-power", + "process_power", + type=ur.Quantity, + required=True, + help="Constant power of the process during the activation period, e.g. 4kW.", +) +@click.option( + "--forbid", + type=TimeIntervalField(), + multiple=True, + required=False, + help="Add time restrictions to the optimization, where the load will not be scheduled into." + 'Use the following format to define the restrictions: `{"start":, "duration":}`' + "This options allows to define multiple time restrictions by using the --forbid for different periods.", +) +@click.option( + "--as-job", + is_flag=True, + help="Whether to queue a scheduling job instead of computing directly. " + "To process the job, run a worker (on any computer, but configured to the same databases) to process the 'scheduling' queue. Defaults to False.", +) +def add_schedule_process( + power_sensor: Sensor, + consumption_price_sensor: Sensor, + start: datetime, + duration: timedelta, + process_duration: timedelta, + process_type: str, + process_power: ur.Quantity, + forbid: List | None = None, + as_job: bool = False, +): + """Create a new schedule for a process asset. + + Current limitations: + - Only supports consumption blocks. + - Not taking into account grid constraints or other processes. + """ + + if forbid is None: + forbid = [] + + # Parse input and required sensor attributes + if not power_sensor.measures_power: + click.secho( + f"Sensor with ID {power_sensor.id} is not a power sensor.", + **MsgStyle.ERROR, + ) + raise click.Abort() + + end = start + duration + + process_power = convert_units(process_power.magnitude, process_power.units, "MW") # type: ignore + + scheduling_kwargs = dict( + start=start, + end=end, + belief_time=server_now(), + resolution=power_sensor.event_resolution, + flex_model={ + "duration": pd.Timedelta(process_duration).isoformat(), + "process-type": process_type, + "power": process_power, + "time-restrictions": [TimeIntervalSchema().dump(f) for f in forbid], + }, + ) + + if consumption_price_sensor is not None: + scheduling_kwargs["flex_context"] = { + "consumption-price-sensor": consumption_price_sensor.id, + } + + if as_job: + job = create_scheduling_job(sensor=power_sensor, **scheduling_kwargs) + if job: + click.secho( + f"New scheduling job {job.id} has been added to the queue.", + **MsgStyle.SUCCESS, + ) + else: + success = make_schedule(sensor_id=power_sensor.id, **scheduling_kwargs) + if success: + click.secho("New schedule is stored.", **MsgStyle.SUCCESS) + + @fm_add_data.command("report") @with_appcontext @click.option( diff --git a/flexmeasures/cli/tests/conftest.py b/flexmeasures/cli/tests/conftest.py index 1e502150e..e83b53f07 100644 --- a/flexmeasures/cli/tests/conftest.py +++ b/flexmeasures/cli/tests/conftest.py @@ -10,26 +10,42 @@ @pytest.fixture(scope="module") @pytest.mark.skip_github -def setup_dummy_data(db, app): +def setup_dummy_asset(db, app): """ - Create an asset with two sensors (1 and 2), and add the same set of 200 beliefs with an hourly resolution to each of them. - Return the two sensors and a result sensor (which has no data). + Create an Asset to add sensors to and return the id. """ - dummy_asset_type = GenericAssetType(name="DummyGenericAssetType") - report_asset_type = GenericAssetType(name="ReportAssetType") - db.session.add_all([dummy_asset_type, report_asset_type]) + db.session.add(dummy_asset_type) dummy_asset = GenericAsset( name="DummyGenericAsset", generic_asset_type=dummy_asset_type ) + db.session.add(dummy_asset) + db.session.commit() + + return dummy_asset.id + + +@pytest.fixture(scope="module") +@pytest.mark.skip_github +def setup_dummy_data(db, app, setup_dummy_asset): + """ + Create an asset with two sensors (1 and 2), and add the same set of 200 beliefs with an hourly resolution to each of them. + Return the two sensors and a result sensor (which has no data). + """ + + report_asset_type = GenericAssetType(name="ReportAssetType") + + db.session.add(report_asset_type) pandas_report = GenericAsset( name="PandasReport", generic_asset_type=report_asset_type ) - db.session.add_all([dummy_asset, pandas_report]) + db.session.add(pandas_report) + + dummy_asset = GenericAsset.query.get(setup_dummy_asset) sensor1 = Sensor( "sensor 1", generic_asset=dummy_asset, event_resolution=timedelta(hours=1) @@ -98,20 +114,34 @@ def reporter_config_raw(app, db, setup_dummy_data): return reporter_config_raw -@pytest.fixture(scope="module") @pytest.mark.skip_github -def setup_dummy_asset(db, app): +@pytest.fixture(scope="module") +def process_power_sensor(db, app, add_market_prices): """ - Create an Asset to add sensors to and return the id. + Create an asset of type "process", power sensor to hold the result of + the scheduler and price data consisting of 8 expensive hours, 8 cheap hours, and again 8 expensive hours- + """ - dummy_asset_type = GenericAssetType(name="DummyGenericAssetType") - db.session.add(dummy_asset_type) + process_asset_type = GenericAssetType(name="process") - dummy_asset = GenericAsset( - name="DummyGenericAsset", generic_asset_type=dummy_asset_type + db.session.add(process_asset_type) + + process_asset = GenericAsset( + name="Test Process Asset", generic_asset_type=process_asset_type ) - db.session.add(dummy_asset) + + db.session.add(process_asset) + + power_sensor = Sensor( + "power", + generic_asset=process_asset, + event_resolution=timedelta(hours=1), + unit="MW", + ) + + db.session.add(power_sensor) + db.session.commit() - return dummy_asset.id + yield power_sensor.id diff --git a/flexmeasures/cli/tests/test_data_add.py b/flexmeasures/cli/tests/test_data_add.py index 7ca6f9e36..995cc706f 100644 --- a/flexmeasures/cli/tests/test_data_add.py +++ b/flexmeasures/cli/tests/test_data_add.py @@ -213,6 +213,49 @@ def test_add_reporter(app, db, setup_dummy_data, reporter_config_raw): assert len(stored_report) == 95 +@pytest.mark.skip_github +@pytest.mark.parametrize("process_type", [("INFLEXIBLE"), ("SHIFTABLE"), ("BREAKABLE")]) +def test_add_process(app, process_power_sensor, process_type): + """ + Schedule a 4h of consumption block at a constant power of 400kW in a day using + the three process policies: INFLEXIBLE, SHIFTABLE and BREAKABLE. + """ + + from flexmeasures.cli.data_add import add_schedule_process + + epex_da = Sensor.query.filter(Sensor.name == "epex_da").one_or_none() + + process_power_sensor_id = process_power_sensor + + cli_input_params = { + "sensor-id": process_power_sensor_id, + "start": "2015-01-02T00:00:00+01:00", + "duration": "PT24H", + "process-duration": "PT4H", + "process-power": "0.4MW", + "process-type": process_type, + "consumption-price-sensor": epex_da.id, + "forbid": '{"start" : "2015-01-02T00:00:00+01:00", "duration" : "PT2H"}', + } + + cli_input = to_flags(cli_input_params) + runner = app.test_cli_runner() + + # call command + result = runner.invoke(add_schedule_process, cli_input) + + print(result) + + assert result.exit_code == 0 # run command without errors + + process_power_sensor = Sensor.query.get(process_power_sensor_id) + + schedule = process_power_sensor.search_beliefs() + # check if the schedule is not empty more detailed testing can be found + # in data/models/planning/tests/test_processs.py. + assert (schedule == -0.4).event_value.sum() == 4 + + @pytest.mark.skip_github @pytest.mark.parametrize( "event_resolution, name, success", diff --git a/flexmeasures/conftest.py b/flexmeasures/conftest.py index 552a74420..ba23b61c8 100644 --- a/flexmeasures/conftest.py +++ b/flexmeasures/conftest.py @@ -227,7 +227,7 @@ def create_roles_users(db, test_accounts) -> dict[str, User]: ), ) ) - return {user.username: user for user in new_users} + return {user.username: user.id for user in new_users} @pytest.fixture(scope="module") @@ -401,12 +401,12 @@ def setup_assets( ) -> dict[str, Asset]: """Add assets to known test users. Deprecated. Remove with Asset model.""" - + # db.session.refresh(setup_roles_users["Test Prosumer User"]) assets = [] for asset_name in ["wind-asset-1", "wind-asset-2", "solar-asset-1"]: asset = Asset( name=asset_name, - owner_id=setup_roles_users["Test Prosumer User"].id, + owner_id=setup_roles_users["Test Prosumer User"], asset_type_name="wind" if "wind" in asset_name else "solar", event_resolution=timedelta(minutes=15), capacity_in_mw=1, @@ -440,6 +440,7 @@ def setup_assets( for dt, val in zip(time_slots, values) ] db.session.add_all(beliefs) + db.session.commit() return {asset.name: asset for asset in assets} @@ -591,7 +592,7 @@ def create_test_battery_assets( test_battery = Asset( name="Test battery", - owner_id=setup_roles_users["Test Prosumer User"].id, + owner_id=setup_roles_users["Test Prosumer User"], asset_type_name="battery", event_resolution=timedelta(minutes=15), capacity_in_mw=2, @@ -609,7 +610,7 @@ def create_test_battery_assets( test_battery_no_prices = Asset( name="Test battery with no known prices", - owner_id=setup_roles_users["Test Prosumer User"].id, + owner_id=setup_roles_users["Test Prosumer User"], asset_type_name="battery", event_resolution=timedelta(minutes=15), capacity_in_mw=2, @@ -677,7 +678,7 @@ def create_charging_station_assets( charging_station = Asset( name="Test charging station", - owner_id=setup_roles_users["Test Prosumer User"].id, + owner_id=setup_roles_users["Test Prosumer User"], asset_type_name="one-way_evse", event_resolution=timedelta(minutes=15), capacity_in_mw=2, @@ -695,7 +696,7 @@ def create_charging_station_assets( bidirectional_charging_station = Asset( name="Test charging station (bidirectional)", - owner_id=setup_roles_users["Test Prosumer User"].id, + owner_id=setup_roles_users["Test Prosumer User"], asset_type_name="two-way_evse", event_resolution=timedelta(minutes=15), capacity_in_mw=2, diff --git a/flexmeasures/data/models/planning/process.py b/flexmeasures/data/models/planning/process.py index ac92f37bc..d2afd7be6 100644 --- a/flexmeasures/data/models/planning/process.py +++ b/flexmeasures/data/models/planning/process.py @@ -95,13 +95,11 @@ def compute(self) -> pd.Series | None: # convert power to energy using the resolution of the sensor. # e.g. resolution=15min, power=1kW -> energy=250W - energy = power * consumption_price_sensor.event_resolution / timedelta(hours=1) + energy = power * self.resolution / timedelta(hours=1) # we can fill duration/resolution rows or, if the duration is larger than the schedule # window, fill the entire window. - rows_to_fill = min( - ceil(duration / consumption_price_sensor.event_resolution), len(schedule) - ) + rows_to_fill = min(ceil(duration / self.resolution), len(schedule)) # duration of the process exceeds the scheduling window if rows_to_fill == len(schedule): diff --git a/flexmeasures/data/models/planning/tests/test_process.py b/flexmeasures/data/models/planning/tests/test_process.py index be6210084..aba8b3308 100644 --- a/flexmeasures/data/models/planning/tests/test_process.py +++ b/flexmeasures/data/models/planning/tests/test_process.py @@ -16,7 +16,7 @@ "process_type, optimal_start", [("INFLEXIBLE", datetime(2015, 1, 2, 0)), ("SHIFTABLE", datetime(2015, 1, 2, 8))], ) -def test_shiftable_scheduler(add_battery_assets, process, process_type, optimal_start): +def test_process_scheduler(add_battery_assets, process, process_type, optimal_start): """ Test scheduling a process of 4kW of power that last 4h using the ProcessScheduler without time restrictions. @@ -94,7 +94,7 @@ def test_duration_exceeds_planning_window( assert (schedule == 4).all() -def test_shiftable_scheduler_time_restrictions(add_battery_assets, process): +def test_process_scheduler_time_restrictions(add_battery_assets, process): """ Test ProcessScheduler with a time restrictions consisting of a block of 2h starting at 8am. The resulting schedules avoid the 8am-10am period and schedules for a valid period. diff --git a/flexmeasures/data/tests/conftest.py b/flexmeasures/data/tests/conftest.py index 98540df7b..41e065f29 100644 --- a/flexmeasures/data/tests/conftest.py +++ b/flexmeasures/data/tests/conftest.py @@ -8,6 +8,7 @@ from flask_sqlalchemy import SQLAlchemy from statsmodels.api import OLS +from flexmeasures import User from flexmeasures.data.models.annotations import Annotation from flexmeasures.data.models.assets import Asset from flexmeasures.data.models.data_sources import DataSource @@ -70,7 +71,7 @@ def setup_fresh_test_data( unit="MW", market_id=setup_markets["epex_da"].id, ) - asset.owner = setup_roles_users["Test Prosumer User"] + asset.owner = User.query.get(setup_roles_users["Test Prosumer User"]) db.session.add(asset) time_slots = pd.date_range(