diff --git a/documentation/changelog.rst b/documentation/changelog.rst
index 34587cafd..e406f77e6 100644
--- a/documentation/changelog.rst
+++ b/documentation/changelog.rst
@@ -10,7 +10,6 @@ New features
 -------------
 * Add multiple maxima and minima constraints into `StorageScheduler` [see `PR #680 `_]
-* Introduction of the classes `Reporter` and `PandasReporter` [see `PR #641 `_]
 * Add CLI command ``flexmeasures add report`` [see `PR #659 `_]
 * Add CLI command ``flexmeasures show reporters`` [see `PR #686 `_]
 * Add CLI command ``flexmeasures show schedulers`` [see `PR #708 `_]
 
@@ -22,6 +21,7 @@ Bugfixes
 
 Infrastructure / Support
 ----------------------
+* Introduction of the classes `Reporter`, `PandasReporter` and `AggregatorReporter` to help customize your own reporter functions (experimental) [see `PR #641 `_ and `PR #712 `_]
 * The setting FLEXMEASURES_PLUGINS can be set as environment variable now (as a comma-separated list) [see `PR #660 `_]
 * Packaging was modernized to stop calling setup.py directly [see `PR #671 `_]
 * Remove API versions 1.0, 1.1, 1.2, 1.3 and 2.0, while allowing hosts to switch between ``HTTP status 410 (Gone)`` and ``HTTP status 404 (Not Found)`` responses [see `PR #667 `_]
diff --git a/flexmeasures/data/models/reporting/aggregator.py b/flexmeasures/data/models/reporting/aggregator.py
new file mode 100644
index 000000000..e74596ae0
--- /dev/null
+++ b/flexmeasures/data/models/reporting/aggregator.py
@@ -0,0 +1,77 @@
+from __future__ import annotations
+
+from datetime import datetime, timedelta
+
+import timely_beliefs as tb
+import pandas as pd
+
+from flexmeasures.data.models.reporting import Reporter
+from flexmeasures.data.schemas.reporting.aggregation import AggregatorSchema
+
+from flexmeasures.utils.time_utils import server_now
+
+
+class AggregatorReporter(Reporter):
+    """This reporter applies an aggregation function to the data of multiple sensors."""
+
+    __version__ = "1"
+    __author__ = "Seita"
+    schema = AggregatorSchema()
+    weights: dict
+    method: str
+
+    def deserialize_config(self):
+        # call Reporter deserialize_config
+        super().deserialize_config()
+
+        # extract AggregatorReporter-specific fields
+        self.method = self.reporter_config.get("method")
+        self.weights = self.reporter_config.get("weights", dict())
+
+    def _compute(
+        self,
+        start: datetime,
+        end: datetime,
+        input_resolution: timedelta | None = None,
+        belief_time: datetime | None = None,
+    ) -> tb.BeliefsDataFrame:
+        """
+        This method merges all the BeliefsDataFrames into a single one, dropping
+        all index levels except event_start, and applies an aggregation function
+        over the columns.
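+
+        For example, with method "sum" and weights {"pv": 1.0, "consumption": -1.0}
+        (the aliases used in the AggregatorSchema example), each output event value
+        is 1.0 * pv + (-1.0) * consumption, i.e. pv minus consumption.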
+ """ + + dataframes = [] + + if belief_time is None: + belief_time = server_now() + + for belief_search_config in self.beliefs_search_configs: + # if alias is not in belief_search_config, using the Sensor id instead + column_name = belief_search_config.get( + "alias", f"sensor_{belief_search_config['sensor'].id}" + ) + data = self.data[column_name].droplevel([1, 2, 3]) + + # apply weight + if column_name in self.weights: + data *= self.weights[column_name] + + dataframes.append(data) + + output_df = pd.concat(dataframes, axis=1) + + # apply aggregation method + output_df = output_df.aggregate(self.method, axis=1) + + # convert BeliefsSeries into a BeliefsDataFrame + output_df = output_df.to_frame("event_value") + output_df["belief_time"] = belief_time + output_df["cumulative_probability"] = 0.5 + output_df["source"] = self.data_source + + output_df = output_df.set_index( + ["belief_time", "source", "cumulative_probability"], append=True + ) + + return output_df diff --git a/flexmeasures/data/models/reporting/pandas_reporter.py b/flexmeasures/data/models/reporting/pandas_reporter.py index 3b98bb2f6..4169cfd69 100644 --- a/flexmeasures/data/models/reporting/pandas_reporter.py +++ b/flexmeasures/data/models/reporting/pandas_reporter.py @@ -18,7 +18,7 @@ class PandasReporter(Reporter): """This reporter applies a series of pandas methods on""" __version__ = "1" - __author__ = None + __author__ = "Seita" schema = PandasReporterConfigSchema() transformations: list[dict[str, Any]] = None final_df_output: str = None diff --git a/flexmeasures/data/models/reporting/tests/conftest.py b/flexmeasures/data/models/reporting/tests/conftest.py index 347196842..a75a1a465 100644 --- a/flexmeasures/data/models/reporting/tests/conftest.py +++ b/flexmeasures/data/models/reporting/tests/conftest.py @@ -63,6 +63,20 @@ def setup_dummy_data(db, app): ) ) + # add simple data for testing the AggregatorReporter: + # 24 hourly events with value 1 for sensor1 and value -1 for sensor2 + for sensor, source, value in zip([sensor1, sensor2], [source1, source2], [1, -1]): + for t in range(24): + beliefs.append( + TimedBelief( + event_start=datetime(2023, 5, 10, tzinfo=utc) + timedelta(hours=t), + belief_horizon=timedelta(hours=24), + event_value=value, + sensor=sensor, + source=source, + ) + ) + db.session.add_all(beliefs) db.session.commit() diff --git a/flexmeasures/data/models/reporting/tests/test_aggregator.py b/flexmeasures/data/models/reporting/tests/test_aggregator.py new file mode 100644 index 000000000..8cd287fa4 --- /dev/null +++ b/flexmeasures/data/models/reporting/tests/test_aggregator.py @@ -0,0 +1,60 @@ +import pytest + +from flexmeasures.data.models.reporting.aggregator import AggregatorReporter + +from datetime import datetime +from pytz import utc + + +@pytest.mark.parametrize( + "aggregation_method, expected_value", + [ + ("sum", 0), + ("mean", 0), + ("var", 2), + ("std", 2**0.5), + ("max", 1), + ("min", -1), + ("prod", -1), + ("median", 0), + ], +) +def test_aggregator(setup_dummy_data, aggregation_method, expected_value): + """ + This test computes the aggregation of two sensors containing 24 entries + with value 1 and -1, respectively, for sensors 1 and 2. 
+
+    Test cases:
+    1) sum: 0 = 1 + (-1)
+    2) mean: 0 = ((1) + (-1))/2
+    3) var: 2 = ((1)^2 + (-1)^2)/(2-1), since pandas computes the sample variance (ddof=1)
+    4) std: sqrt(2) = sqrt(((1)^2 + (-1)^2)/(2-1))
+    5) max: 1 = max(1, -1)
+    6) min: -1 = min(1, -1)
+    7) prod: -1 = (1) * (-1)
+    8) median: 0 = ((1) + (-1))/2, the mean of the two central elements (even count)
+    """
+    s1, s2, reporter_sensor = setup_dummy_data
+
+    reporter_config_raw = dict(
+        beliefs_search_configs=[
+            dict(sensor=s1.id, source=1),
+            dict(sensor=s2.id, source=2),
+        ],
+        method=aggregation_method,
+    )
+
+    agg_reporter = AggregatorReporter(
+        reporter_sensor, reporter_config_raw=reporter_config_raw
+    )
+
+    result = agg_reporter.compute(
+        start=datetime(2023, 5, 10, tzinfo=utc),
+        end=datetime(2023, 5, 11, tzinfo=utc),
+    )
+
+    # check that we got a result for 24 hours
+    assert len(result) == 24
+
+    # check that all event values equal the expected value
+    assert (result == expected_value).all().event_value
diff --git a/flexmeasures/data/schemas/reporting/aggregation.py b/flexmeasures/data/schemas/reporting/aggregation.py
new file mode 100644
index 000000000..a42c6d7b9
--- /dev/null
+++ b/flexmeasures/data/schemas/reporting/aggregation.py
@@ -0,0 +1,58 @@
+from marshmallow import fields, ValidationError, validates_schema
+
+from flexmeasures.data.schemas.reporting import ReporterConfigSchema
+
+
+class AggregatorSchema(ReporterConfigSchema):
+    """Schema for the reporter_config of the AggregatorReporter
+
+    Example:
+
+    .. code-block:: json
+
+        {
+            "beliefs_search_configs": [
+                {
+                    "sensor": 1,
+                    "source" : 1,
+                    "alias" : "pv"
+                },
+                {
+                    "sensor": 1,
+                    "source" : 2,
+                    "alias" : "consumption"
+                }
+            ],
+            "method" : "sum",
+            "weights" : {
+                "pv" : 1.0,
+                "consumption" : -1.0
+            }
+        }
+    """
+
+    method = fields.Str(required=False, dump_default="sum")
+    weights = fields.Dict(fields.Str(), fields.Float(), required=False)
+
+    @validates_schema
+    def validate_source(self, data, **kwargs):
+        # the AggregatorReporter needs a source for each belief search
+        for beliefs_search_config in data["beliefs_search_configs"]:
+            if "source" not in beliefs_search_config:
+                raise ValidationError("`source` is a required field.")
+
+    @validates_schema
+    def validate_weights(self, data, **kwargs):
+        if "weights" not in data:
+            return
+
+        # get aliases
+        aliases = []
+        for beliefs_search_config in data["beliefs_search_configs"]:
+            if "alias" in beliefs_search_config:
+                aliases.append(beliefs_search_config.get("alias"))
+
+        # check that the aliases in weights are defined
+        for alias in data.get("weights").keys():
+            if alias not in aliases:
+                raise ValidationError(
+                    f"alias `{alias}` in `weights` is not defined in `beliefs_search_configs`"
+                )
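
For context, a minimal usage sketch of the new AggregatorReporter, mirroring the test above. The sensor ids and the `reporter_sensor` (an existing Sensor under which the report would be saved) are assumptions for illustration; the aliases and weights follow the `AggregatorSchema` docstring example:

    from datetime import datetime
    from pytz import utc

    from flexmeasures.data.models.reporting.aggregator import AggregatorReporter

    # assumed setup: sensors 1 and 2 hold the input data (aliased below),
    # and reporter_sensor is an existing Sensor that stores the report
    reporter_config_raw = dict(
        beliefs_search_configs=[
            dict(sensor=1, source=1, alias="pv"),
            dict(sensor=2, source=2, alias="consumption"),
        ],
        method="sum",
        weights={"pv": 1.0, "consumption": -1.0},  # report pv minus consumption
    )

    agg_reporter = AggregatorReporter(
        reporter_sensor, reporter_config_raw=reporter_config_raw
    )

    # compute returns a BeliefsDataFrame with a single "event_value" column
    report = agg_reporter.compute(
        start=datetime(2023, 5, 10, tzinfo=utc),
        end=datetime(2023, 5, 11, tzinfo=utc),
    )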