diff --git a/documentation/changelog.rst b/documentation/changelog.rst
index f203ec026..b3f57f390 100644
--- a/documentation/changelog.rst
+++ b/documentation/changelog.rst
@@ -3,12 +3,15 @@
FlexMeasures Changelog
**********************
+
v0.22.0 | June XX, 2024
============================
New features
-------------
+* Add unit conversion to the input and output data of the `PandasReporter` [see `PR #1044 `_]
+ main
* Add option `droplevels` to the `PandasReporter` to drop all the levels except the `event_start` and `event_value` [see `PR #1043 `_]
* `PandasReporter` accepts the parameter `use_latest_version_only` to filter input data [see `PR #1045 `_]
diff --git a/flexmeasures/data/models/reporting/pandas_reporter.py b/flexmeasures/data/models/reporting/pandas_reporter.py
index 2977b0e89..c418d0978 100644
--- a/flexmeasures/data/models/reporting/pandas_reporter.py
+++ b/flexmeasures/data/models/reporting/pandas_reporter.py
@@ -5,6 +5,7 @@
from copy import deepcopy, copy
from flask import current_app
+from flexmeasures.utils.unit_utils import convert_units
import timely_beliefs as tb
import pandas as pd
from flexmeasures.data.models.reporting import Reporter
@@ -32,6 +33,18 @@ class PandasReporter(Reporter):
data: dict[str, tb.BeliefsDataFrame | pd.DataFrame] = None
+ def _get_input_target_unit(self, name: str) -> str | None:
+ for required_input in self._config["required_input"]:
+ if name in required_input.get("name"):
+ return required_input.get("unit")
+ return None
+
+ def _get_output_target_unit(self, name: str) -> str | None:
+ for required_output in self._config["required_output"]:
+ if name in required_output.get("name"):
+ return required_output.get("unit")
+ return None
+
def fetch_data(
self,
start: datetime,
@@ -91,6 +104,14 @@ def fetch_data(
for source in bdf.sources.unique():
self.data[f"source_{source.id}"] = source
+ unit = self._get_input_target_unit(name)
+ if unit is not None:
+ bdf *= convert_units(
+ 1,
+ from_unit=sensor.unit,
+ to_unit=unit,
+ event_resolution=sensor.event_resolution,
+ )
if droplevels:
# dropping belief_time, source and cummulative_probability columns
bdf = bdf.droplevel([1, 2, 3])
@@ -129,7 +150,6 @@ def _compute_report(self, **kwargs) -> list[dict[str, Any]]:
if belief_time is None:
belief_time = server_now()
-
# apply pandas transformations to the dataframes in `self.data`
self._apply_transformations()
@@ -156,6 +176,14 @@ def _compute_report(self, **kwargs) -> list[dict[str, Any]]:
output_data = self._clean_belief_series(
output_data, belief_time, belief_horizon
)
+ output_unit = self._get_output_target_unit(name)
+ if output_unit is not None:
+ output_data *= convert_units(
+ 1,
+ from_unit=output_unit,
+ to_unit=output_data.sensor.unit,
+ event_resolution=output_data.sensor.event_resolution,
+ )
result["data"] = output_data
diff --git a/flexmeasures/data/models/reporting/tests/conftest.py b/flexmeasures/data/models/reporting/tests/conftest.py
index a9f3516f7..abfeca507 100644
--- a/flexmeasures/data/models/reporting/tests/conftest.py
+++ b/flexmeasures/data/models/reporting/tests/conftest.py
@@ -123,26 +123,43 @@ def setup_dummy_data(db, app, generic_report):
db.session.add(dummy_asset)
- sensor1 = Sensor("sensor 1", generic_asset=dummy_asset, event_resolution="1h")
+ sensor1 = Sensor(
+ "sensor 1",
+ generic_asset=dummy_asset,
+ event_resolution=timedelta(hours=1),
+ unit="kW",
+ )
db.session.add(sensor1)
- sensor2 = Sensor("sensor 2", generic_asset=dummy_asset, event_resolution="1h")
+ sensor2 = Sensor(
+ "sensor 2", generic_asset=dummy_asset, event_resolution=timedelta(hours=1)
+ )
db.session.add(sensor2)
sensor3 = Sensor(
"sensor 3",
generic_asset=dummy_asset,
- event_resolution="1h",
+ event_resolution=timedelta(hours=1),
timezone="Europe/Amsterdam",
)
db.session.add(sensor3)
+ sensor4 = Sensor(
+ "sensor 4",
+ generic_asset=dummy_asset,
+ event_resolution=timedelta(minutes=15),
+ timezone="Europe/Amsterdam",
+ unit="kW",
+ )
+ db.session.add(sensor4)
report_sensor = Sensor(
- "report sensor", generic_asset=generic_report, event_resolution="1h"
+ "report sensor",
+ generic_asset=generic_report,
+ event_resolution=timedelta(hours=1),
)
db.session.add(report_sensor)
daily_report_sensor = Sensor(
"daily report sensor",
generic_asset=generic_report,
- event_resolution="1D",
+ event_resolution=timedelta(days=1),
timezone="Europe/Amsterdam",
)
@@ -161,7 +178,6 @@ def setup_dummy_data(db, app, generic_report):
for sensor in [sensor1, sensor2]:
for si, source in enumerate([source1, source2]):
for t in range(10):
- print(si)
beliefs.append(
TimedBelief(
event_start=datetime(2023, 4, 10, tzinfo=utc)
@@ -246,7 +262,19 @@ def setup_dummy_data(db, app, generic_report):
)
)
+ # add data for sensor 4
+ for t in range(24 * 3):
+ beliefs.append(
+ TimedBelief(
+ event_start=datetime(2023, 1, 1, tzinfo=utc) + timedelta(hours=t),
+ belief_horizon=timedelta(hours=24),
+ event_value=1,
+ sensor=sensor4,
+ source=source1,
+ )
+ )
+
db.session.add_all(beliefs)
db.session.commit()
- yield sensor1, sensor2, sensor3, report_sensor, daily_report_sensor
+ yield sensor1, sensor2, sensor3, sensor4, report_sensor, daily_report_sensor
diff --git a/flexmeasures/data/models/reporting/tests/test_aggregator.py b/flexmeasures/data/models/reporting/tests/test_aggregator.py
index 6d1a85554..cee177bcb 100644
--- a/flexmeasures/data/models/reporting/tests/test_aggregator.py
+++ b/flexmeasures/data/models/reporting/tests/test_aggregator.py
@@ -36,7 +36,7 @@ def test_aggregator(setup_dummy_data, aggregation_method, expected_value, db):
7) prod: -1 = (1) * (-1)
8) median: even number of elements, mean of the most central elements, 0 = ((1) + (-1))/2
"""
- s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data
+ s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data
agg_reporter = AggregatorReporter(method=aggregation_method)
@@ -64,7 +64,7 @@ def test_aggregator(setup_dummy_data, aggregation_method, expected_value, db):
def test_aggregator_reporter_weights(
setup_dummy_data, weight_1, weight_2, expected_result, db
):
- s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data
+ s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data
reporter_config = dict(method="sum", weights={"s1": weight_1, "sensor_2": weight_2})
@@ -91,7 +91,7 @@ def test_aggregator_reporter_weights(
def test_dst_transition(setup_dummy_data, db):
- s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data
+ s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data
agg_reporter = AggregatorReporter()
@@ -121,7 +121,7 @@ def test_dst_transition(setup_dummy_data, db):
def test_resampling(setup_dummy_data, db):
- s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data
+ s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data
agg_reporter = AggregatorReporter()
@@ -165,7 +165,7 @@ def test_source_transition(setup_dummy_data, db):
array is prioritized.
"""
- s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data
+ s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data
agg_reporter = AggregatorReporter()
diff --git a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py
index 64c8ba7fd..d9898c64c 100644
--- a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py
+++ b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py
@@ -6,7 +6,7 @@
def test_reporter(app, setup_dummy_data):
- s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data
+ s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data
reporter_config = dict(
required_input=[{"name": "sensor_1"}, {"name": "sensor_2"}],
@@ -76,7 +76,7 @@ def test_reporter(app, setup_dummy_data):
def test_reporter_repeated(setup_dummy_data):
"""check that calling compute doesn't change the result"""
- s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data
+ s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data
reporter_config = dict(
required_input=[{"name": "sensor_1"}, {"name": "sensor_2"}],
@@ -129,7 +129,7 @@ def test_reporter_repeated(setup_dummy_data):
def test_reporter_empty(setup_dummy_data):
"""check that calling compute with missing data returns an empty report"""
- s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data
+ s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data
config = dict(
required_input=[{"name": "sensor_1"}],
@@ -159,3 +159,68 @@ def test_reporter_empty(setup_dummy_data):
)
assert report[0]["data"].empty
+
+
+def test_pandas_reporter_unit_conversion(app, setup_dummy_data):
+ """
+ Check that the unit conversion feature can handle the following cases:
+ - kW -> kW
+ - kW -> MW
+ - kW -> MWh
+ - kW -> W -> kW
+ """
+ s1, s2, s3, s4, report_sensor, daily_report_sensor = setup_dummy_data
+
+ reporter_config = dict(
+ required_input=[
+ {"name": "sensor_4"},
+ {"name": "sensor_4_kw"},
+ {"name": "sensor_4_mw", "unit": "MW"},
+ {"name": "sensor_4_mwh", "unit": "MWh"},
+ ],
+ required_output=[
+ {"name": "sensor_4_kw"},
+ {"name": "sensor_4_mw"},
+ {"name": "sensor_4_mwh"},
+ # Assume that the internal operations that produce sensor_4_output_w have "W"
+ {"name": "sensor_4_output_w", "unit": "W"},
+ ],
+ transformations=[
+ {"df_input": "sensor_4", "method": "copy", "df_output": "sensor_4_output_w"}
+ ],
+ )
+
+ reporter = PandasReporter(config=reporter_config)
+
+ start = datetime(2023, 1, 1, tzinfo=utc)
+ end = datetime(2023, 1, 2, tzinfo=utc)
+ input = [
+ dict(name="sensor_4", sensor=s4),
+ dict(name="sensor_4_kw", sensor=s4),
+ dict(name="sensor_4_mw", sensor=s4),
+ dict(name="sensor_4_mwh", sensor=s4),
+ ]
+ output = [
+ dict(name="sensor_4_kw", sensor=s4),
+ dict(name="sensor_4_mw", sensor=s4),
+ dict(name="sensor_4_mwh", sensor=s4),
+ dict(name="sensor_4_output_w", sensor=s4),
+ ]
+
+ report = reporter.compute(start=start, end=end, input=input, output=output)
+ result_kw = report[0]["data"]
+ result_mw = report[1]["data"]
+ result_mwh = report[2]["data"]
+ result_output_w = report[3]["data"]
+
+ # MW = kW / 1000
+ assert (result_mw.event_value.values == result_kw.event_value.values / 1000).all()
+
+ # MWh = MW * 0.25 (resolution = 15 min)
+ assert (result_mwh.event_value.values == result_mw.event_value.values * 0.25).all()
+
+ # Input is in kW; the operations transform the data to produce values in W and it transforms the values to the output sensor unit (kW).
+ # In summary, Input = 1 kW -(copy the values)-> 1 W -> 0.001 kW
+ assert (
+ result_output_w.event_value.values == result_kw.event_value.values * 0.001
+ ).all()
diff --git a/flexmeasures/data/schemas/io.py b/flexmeasures/data/schemas/io.py
index 4c202d137..5d1043fa5 100644
--- a/flexmeasures/data/schemas/io.py
+++ b/flexmeasures/data/schemas/io.py
@@ -8,6 +8,7 @@
class RequiredInput(Schema):
name = fields.Str(required=True)
+ unit = fields.Str(required=False)
class Input(Schema):
@@ -68,3 +69,4 @@ class Output(Schema):
class RequiredOutput(Schema):
name = fields.Str(required=True)
column = fields.Str(required=False)
+ unit = fields.Str(required=False)
diff --git a/flexmeasures/data/schemas/reporting/pandas_reporter.py b/flexmeasures/data/schemas/reporting/pandas_reporter.py
index 68954b58e..c0b5e3374 100644
--- a/flexmeasures/data/schemas/reporting/pandas_reporter.py
+++ b/flexmeasures/data/schemas/reporting/pandas_reporter.py
@@ -56,10 +56,10 @@ class PandasReporterConfigSchema(ReporterConfigSchema):
{
"required_input" : [
- {"name" : "df1}
+ {"name" : "df1", "unit" : "MWh"}
],
"required_output" : [
- {"name" : "df2"}
+ {"name" : "df2", "unit" : "kWh"}
],
"transformations" : [
{