diff --git a/documentation/changelog.rst b/documentation/changelog.rst index f203ec026..b3f57f390 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -3,12 +3,15 @@ FlexMeasures Changelog ********************** + v0.22.0 | June XX, 2024 ============================ New features ------------- +* Add unit conversion to the input and output data of the `PandasReporter` [see `PR #1044 `_] + main * Add option `droplevels` to the `PandasReporter` to drop all the levels except the `event_start` and `event_value` [see `PR #1043 `_] * `PandasReporter` accepts the parameter `use_latest_version_only` to filter input data [see `PR #1045 `_] diff --git a/flexmeasures/data/models/reporting/pandas_reporter.py b/flexmeasures/data/models/reporting/pandas_reporter.py index 2977b0e89..c418d0978 100644 --- a/flexmeasures/data/models/reporting/pandas_reporter.py +++ b/flexmeasures/data/models/reporting/pandas_reporter.py @@ -5,6 +5,7 @@ from copy import deepcopy, copy from flask import current_app +from flexmeasures.utils.unit_utils import convert_units import timely_beliefs as tb import pandas as pd from flexmeasures.data.models.reporting import Reporter @@ -32,6 +33,18 @@ class PandasReporter(Reporter): data: dict[str, tb.BeliefsDataFrame | pd.DataFrame] = None + def _get_input_target_unit(self, name: str) -> str | None: + for required_input in self._config["required_input"]: + if name in required_input.get("name"): + return required_input.get("unit") + return None + + def _get_output_target_unit(self, name: str) -> str | None: + for required_output in self._config["required_output"]: + if name in required_output.get("name"): + return required_output.get("unit") + return None + def fetch_data( self, start: datetime, @@ -91,6 +104,14 @@ def fetch_data( for source in bdf.sources.unique(): self.data[f"source_{source.id}"] = source + unit = self._get_input_target_unit(name) + if unit is not None: + bdf *= convert_units( + 1, + from_unit=sensor.unit, + to_unit=unit, + event_resolution=sensor.event_resolution, + ) if droplevels: # dropping belief_time, source and cummulative_probability columns bdf = bdf.droplevel([1, 2, 3]) @@ -129,7 +150,6 @@ def _compute_report(self, **kwargs) -> list[dict[str, Any]]: if belief_time is None: belief_time = server_now() - # apply pandas transformations to the dataframes in `self.data` self._apply_transformations() @@ -156,6 +176,14 @@ def _compute_report(self, **kwargs) -> list[dict[str, Any]]: output_data = self._clean_belief_series( output_data, belief_time, belief_horizon ) + output_unit = self._get_output_target_unit(name) + if output_unit is not None: + output_data *= convert_units( + 1, + from_unit=output_unit, + to_unit=output_data.sensor.unit, + event_resolution=output_data.sensor.event_resolution, + ) result["data"] = output_data diff --git a/flexmeasures/data/models/reporting/tests/conftest.py b/flexmeasures/data/models/reporting/tests/conftest.py index a9f3516f7..abfeca507 100644 --- a/flexmeasures/data/models/reporting/tests/conftest.py +++ b/flexmeasures/data/models/reporting/tests/conftest.py @@ -123,26 +123,43 @@ def setup_dummy_data(db, app, generic_report): db.session.add(dummy_asset) - sensor1 = Sensor("sensor 1", generic_asset=dummy_asset, event_resolution="1h") + sensor1 = Sensor( + "sensor 1", + generic_asset=dummy_asset, + event_resolution=timedelta(hours=1), + unit="kW", + ) db.session.add(sensor1) - sensor2 = Sensor("sensor 2", generic_asset=dummy_asset, event_resolution="1h") + sensor2 = Sensor( + "sensor 2", generic_asset=dummy_asset, event_resolution=timedelta(hours=1) + ) db.session.add(sensor2) sensor3 = Sensor( "sensor 3", generic_asset=dummy_asset, - event_resolution="1h", + event_resolution=timedelta(hours=1), timezone="Europe/Amsterdam", ) db.session.add(sensor3) + sensor4 = Sensor( + "sensor 4", + generic_asset=dummy_asset, + event_resolution=timedelta(minutes=15), + timezone="Europe/Amsterdam", + unit="kW", + ) + db.session.add(sensor4) report_sensor = Sensor( - "report sensor", generic_asset=generic_report, event_resolution="1h" + "report sensor", + generic_asset=generic_report, + event_resolution=timedelta(hours=1), ) db.session.add(report_sensor) daily_report_sensor = Sensor( "daily report sensor", generic_asset=generic_report, - event_resolution="1D", + event_resolution=timedelta(days=1), timezone="Europe/Amsterdam", ) @@ -161,7 +178,6 @@ def setup_dummy_data(db, app, generic_report): for sensor in [sensor1, sensor2]: for si, source in enumerate([source1, source2]): for t in range(10): - print(si) beliefs.append( TimedBelief( event_start=datetime(2023, 4, 10, tzinfo=utc) @@ -246,7 +262,19 @@ def setup_dummy_data(db, app, generic_report): ) ) + # add data for sensor 4 + for t in range(24 * 3): + beliefs.append( + TimedBelief( + event_start=datetime(2023, 1, 1, tzinfo=utc) + timedelta(hours=t), + belief_horizon=timedelta(hours=24), + event_value=1, + sensor=sensor4, + source=source1, + ) + ) + db.session.add_all(beliefs) db.session.commit() - yield sensor1, sensor2, sensor3, report_sensor, daily_report_sensor + yield sensor1, sensor2, sensor3, sensor4, report_sensor, daily_report_sensor diff --git a/flexmeasures/data/models/reporting/tests/test_aggregator.py b/flexmeasures/data/models/reporting/tests/test_aggregator.py index 6d1a85554..cee177bcb 100644 --- a/flexmeasures/data/models/reporting/tests/test_aggregator.py +++ b/flexmeasures/data/models/reporting/tests/test_aggregator.py @@ -36,7 +36,7 @@ def test_aggregator(setup_dummy_data, aggregation_method, expected_value, db): 7) prod: -1 = (1) * (-1) 8) median: even number of elements, mean of the most central elements, 0 = ((1) + (-1))/2 """ - s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data + s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data agg_reporter = AggregatorReporter(method=aggregation_method) @@ -64,7 +64,7 @@ def test_aggregator(setup_dummy_data, aggregation_method, expected_value, db): def test_aggregator_reporter_weights( setup_dummy_data, weight_1, weight_2, expected_result, db ): - s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data + s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data reporter_config = dict(method="sum", weights={"s1": weight_1, "sensor_2": weight_2}) @@ -91,7 +91,7 @@ def test_aggregator_reporter_weights( def test_dst_transition(setup_dummy_data, db): - s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data + s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data agg_reporter = AggregatorReporter() @@ -121,7 +121,7 @@ def test_dst_transition(setup_dummy_data, db): def test_resampling(setup_dummy_data, db): - s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data + s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data agg_reporter = AggregatorReporter() @@ -165,7 +165,7 @@ def test_source_transition(setup_dummy_data, db): array is prioritized. """ - s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data + s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data agg_reporter = AggregatorReporter() diff --git a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py index 64c8ba7fd..d9898c64c 100644 --- a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py @@ -6,7 +6,7 @@ def test_reporter(app, setup_dummy_data): - s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data + s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data reporter_config = dict( required_input=[{"name": "sensor_1"}, {"name": "sensor_2"}], @@ -76,7 +76,7 @@ def test_reporter(app, setup_dummy_data): def test_reporter_repeated(setup_dummy_data): """check that calling compute doesn't change the result""" - s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data + s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data reporter_config = dict( required_input=[{"name": "sensor_1"}, {"name": "sensor_2"}], @@ -129,7 +129,7 @@ def test_reporter_repeated(setup_dummy_data): def test_reporter_empty(setup_dummy_data): """check that calling compute with missing data returns an empty report""" - s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data + s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data config = dict( required_input=[{"name": "sensor_1"}], @@ -159,3 +159,68 @@ def test_reporter_empty(setup_dummy_data): ) assert report[0]["data"].empty + + +def test_pandas_reporter_unit_conversion(app, setup_dummy_data): + """ + Check that the unit conversion feature can handle the following cases: + - kW -> kW + - kW -> MW + - kW -> MWh + - kW -> W -> kW + """ + s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data + + reporter_config = dict( + required_input=[ + {"name": "sensor_4"}, + {"name": "sensor_4_kw"}, + {"name": "sensor_4_mw", "unit": "MW"}, + {"name": "sensor_4_mwh", "unit": "MWh"}, + ], + required_output=[ + {"name": "sensor_4_kw"}, + {"name": "sensor_4_mw"}, + {"name": "sensor_4_mwh"}, + # Assume that the internal operations that produce sensor_4_output_w have "W" + {"name": "sensor_4_output_w", "unit": "W"}, + ], + transformations=[ + {"df_input": "sensor_4", "method": "copy", "df_output": "sensor_4_output_w"} + ], + ) + + reporter = PandasReporter(config=reporter_config) + + start = datetime(2023, 1, 1, tzinfo=utc) + end = datetime(2023, 1, 2, tzinfo=utc) + input = [ + dict(name="sensor_4", sensor=s4), + dict(name="sensor_4_kw", sensor=s4), + dict(name="sensor_4_mw", sensor=s4), + dict(name="sensor_4_mwh", sensor=s4), + ] + output = [ + dict(name="sensor_4_kw", sensor=s4), + dict(name="sensor_4_mw", sensor=s4), + dict(name="sensor_4_mwh", sensor=s4), + dict(name="sensor_4_output_w", sensor=s4), + ] + + report = reporter.compute(start=start, end=end, input=input, output=output) + result_kw = report[0]["data"] + result_mw = report[1]["data"] + result_mwh = report[2]["data"] + result_output_w = report[3]["data"] + + # MW = kW / 1000 + assert (result_mw.event_value.values == result_kw.event_value.values / 1000).all() + + # MWh = MW * 0.25 (resolution = 15 min) + assert (result_mwh.event_value.values == result_mw.event_value.values * 0.25).all() + + # Input is in kW; the operations transform the data to produce values in W and it transforms the values to the output sensor unit (kW). + # In summary, Input = 1 kW -(copy the values)-> 1 W -> 0.001 kW + assert ( + result_output_w.event_value.values == result_kw.event_value.values * 0.001 + ).all() diff --git a/flexmeasures/data/schemas/io.py b/flexmeasures/data/schemas/io.py index 4c202d137..5d1043fa5 100644 --- a/flexmeasures/data/schemas/io.py +++ b/flexmeasures/data/schemas/io.py @@ -8,6 +8,7 @@ class RequiredInput(Schema): name = fields.Str(required=True) + unit = fields.Str(required=False) class Input(Schema): @@ -68,3 +69,4 @@ class Output(Schema): class RequiredOutput(Schema): name = fields.Str(required=True) column = fields.Str(required=False) + unit = fields.Str(required=False) diff --git a/flexmeasures/data/schemas/reporting/pandas_reporter.py b/flexmeasures/data/schemas/reporting/pandas_reporter.py index 68954b58e..c0b5e3374 100644 --- a/flexmeasures/data/schemas/reporting/pandas_reporter.py +++ b/flexmeasures/data/schemas/reporting/pandas_reporter.py @@ -56,10 +56,10 @@ class PandasReporterConfigSchema(ReporterConfigSchema): { "required_input" : [ - {"name" : "df1} + {"name" : "df1", "unit" : "MWh"} ], "required_output" : [ - {"name" : "df2"} + {"name" : "df2", "unit" : "kWh"} ], "transformations" : [ {