Skip to content

Commit

Permalink
feat: add AggregatorReporter (#712)
Browse files Browse the repository at this point in the history
* fix: get fresh sensor instance to avoid sqlalchemy.exc.InvalidRequestError

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* fx: handle the case of not finding the sensor in the datbase

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* feat: add AggregatorReporter

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* feat: add fixture

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* fix: typo in file name

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* Set author in PandasReporter and AggregatorReporter

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* style: remove unnecesay class property

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* test: add description of test

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* fix: vectorized bdf creation

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* style: improve Exception message

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* fix: lowercase enum value

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* feat: make method and weights optional

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* feat: set SUM as the feault aggregation method

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* typo

Signed-off-by: F.N. Claessen <felix@seita.nl>

* refactor: simplify column value assignments

Signed-off-by: F.N. Claessen <felix@seita.nl>

* typo

Signed-off-by: F.N. Claessen <felix@seita.nl>

* fix: use aggregate function instead of getattr

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* feat: allow users to pass any string

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* test: add more test cases

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* feat: weights and method as class attribute

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>

* simplify float to int

Signed-off-by: F.N. Claessen <felix@seita.nl>

* changelog entry

Signed-off-by: F.N. Claessen <felix@seita.nl>

* black

Signed-off-by: F.N. Claessen <felix@seita.nl>

---------

Signed-off-by: Victor Garcia Reolid <victor@seita.nl>
Signed-off-by: F.N. Claessen <felix@seita.nl>
Co-authored-by: F.N. Claessen <felix@seita.nl>
  • Loading branch information
victorgarcia98 and Flix6x committed Jun 11, 2023
1 parent cfb662d commit 583fc87
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 2 deletions.
2 changes: 1 addition & 1 deletion documentation/changelog.rst
Expand Up @@ -10,7 +10,6 @@ New features
-------------

* Add multiple maxima and minima constraints into `StorageScheduler` [see `PR #680 <https://www.github.com/FlexMeasures/flexmeasures/pull/680>`_]
* Introduction of the classes `Reporter` and `PandasReporter` [see `PR #641 <https://www.github.com/FlexMeasures/flexmeasures/pull/641>`_]
* Add CLI command ``flexmeasures add report`` [see `PR #659 <https://www.github.com/FlexMeasures/flexmeasures/pull/659>`_]
* Add CLI command ``flexmeasures show reporters`` [see `PR #686 <https://www.github.com/FlexMeasures/flexmeasures/pull/686>`_]
* Add CLI command ``flexmeasures show schedulers`` [see `PR #708 <https://github.com/FlexMeasures/flexmeasures/pull/708>`_]
Expand All @@ -22,6 +21,7 @@ Bugfixes
Infrastructure / Support
----------------------

* Introduction of the classes `Reporter`, `PandasReporter` and `AggregatorReporter` to help customize your own reporter functions (experimental) [see `PR #641 <https://www.github.com/FlexMeasures/flexmeasures/pull/641>`_ and `PR #712 <https://www.github.com/FlexMeasures/flexmeasures/pull/712>`_]
* The setting FLEXMEASURES_PLUGINS can be set as environment variable now (as a comma-separated list) [see `PR #660 <https://www.github.com/FlexMeasures/flexmeasures/pull/660>`_]
* Packaging was modernized to stop calling setup.py directly [see `PR #671 <https://www.github.com/FlexMeasures/flexmeasures/pull/671>`_]
* Remove API versions 1.0, 1.1, 1.2, 1.3 and 2.0, while allowing hosts to switch between ``HTTP status 410 (Gone)`` and ``HTTP status 404 (Not Found)`` responses [see `PR #667 <https://www.github.com/FlexMeasures/flexmeasures/pull/667>`_]
Expand Down
77 changes: 77 additions & 0 deletions flexmeasures/data/models/reporting/aggregator.py
@@ -0,0 +1,77 @@
from __future__ import annotations

from datetime import datetime, timedelta

import timely_beliefs as tb
import pandas as pd

from flexmeasures.data.models.reporting import Reporter
from flexmeasures.data.schemas.reporting.aggregation import AggregatorSchema

from flexmeasures.utils.time_utils import server_now


class AggregatorReporter(Reporter):
"""This reporter applies an aggregation function to multiple sensors"""

__version__ = "1"
__author__ = "Seita"
schema = AggregatorSchema()
weights: dict
method: str

def deserialize_config(self):
# call Reporter deserialize_config
super().deserialize_config()

# extract AggregatorReporter specific fields
self.method = self.reporter_config.get("method")
self.weights = self.reporter_config.get("weights", dict())

def _compute(
self,
start: datetime,
end: datetime,
input_resolution: timedelta | None = None,
belief_time: datetime | None = None,
) -> tb.BeliefsDataFrame:
"""
This method merges all the BeliefDataFrames into a single one, dropping
all indexes but event_start, and applies an aggregation function over the
columns.
"""

dataframes = []

if belief_time is None:
belief_time = server_now()

for belief_search_config in self.beliefs_search_configs:
# if alias is not in belief_search_config, using the Sensor id instead
column_name = belief_search_config.get(
"alias", f"sensor_{belief_search_config['sensor'].id}"
)
data = self.data[column_name].droplevel([1, 2, 3])

# apply weight
if column_name in self.weights:
data *= self.weights[column_name]

dataframes.append(data)

output_df = pd.concat(dataframes, axis=1)

# apply aggregation method
output_df = output_df.aggregate(self.method, axis=1)

# convert BeliefsSeries into a BeliefsDataFrame
output_df = output_df.to_frame("event_value")
output_df["belief_time"] = belief_time
output_df["cumulative_probability"] = 0.5
output_df["source"] = self.data_source

output_df = output_df.set_index(
["belief_time", "source", "cumulative_probability"], append=True
)

return output_df
2 changes: 1 addition & 1 deletion flexmeasures/data/models/reporting/pandas_reporter.py
Expand Up @@ -18,7 +18,7 @@ class PandasReporter(Reporter):
"""This reporter applies a series of pandas methods on"""

__version__ = "1"
__author__ = None
__author__ = "Seita"
schema = PandasReporterConfigSchema()
transformations: list[dict[str, Any]] = None
final_df_output: str = None
Expand Down
14 changes: 14 additions & 0 deletions flexmeasures/data/models/reporting/tests/conftest.py
Expand Up @@ -63,6 +63,20 @@ def setup_dummy_data(db, app):
)
)

# add simple data for testing the AggregatorReporter:
# 24 hourly events with value 1 for sensor1 and value -1 for sensor2
for sensor, source, value in zip([sensor1, sensor2], [source1, source2], [1, -1]):
for t in range(24):
beliefs.append(
TimedBelief(
event_start=datetime(2023, 5, 10, tzinfo=utc) + timedelta(hours=t),
belief_horizon=timedelta(hours=24),
event_value=value,
sensor=sensor,
source=source,
)
)

db.session.add_all(beliefs)
db.session.commit()

Expand Down
60 changes: 60 additions & 0 deletions flexmeasures/data/models/reporting/tests/test_aggregator.py
@@ -0,0 +1,60 @@
import pytest

from flexmeasures.data.models.reporting.aggregator import AggregatorReporter

from datetime import datetime
from pytz import utc


@pytest.mark.parametrize(
"aggregation_method, expected_value",
[
("sum", 0),
("mean", 0),
("var", 2),
("std", 2**0.5),
("max", 1),
("min", -1),
("prod", -1),
("median", 0),
],
)
def test_aggregator(setup_dummy_data, aggregation_method, expected_value):
"""
This test computes the aggregation of two sensors containing 24 entries
with value 1 and -1, respectively, for sensors 1 and 2.
Test cases:
1) sum: 0 = 1 + (-1)
2) mean: 0 = ((1) + (-1))/2
3) var: 2 = (1)^2 + (-1)^2
4) std: sqrt(2) = sqrt((1)^2 + (-1)^2)
5) max: 1 = max(1, -1)
6) min: -1 = min(1, -1)
7) prod: -1 = (1) * (-1)
8) median: even number of elements, mean of the most central elements, 0 = ((1) + (-1))/2
"""
s1, s2, reporter_sensor = setup_dummy_data

reporter_config_raw = dict(
beliefs_search_configs=[
dict(sensor=s1.id, source=1),
dict(sensor=s2.id, source=2),
],
method=aggregation_method,
)

agg_reporter = AggregatorReporter(
reporter_sensor, reporter_config_raw=reporter_config_raw
)

result = agg_reporter.compute(
start=datetime(2023, 5, 10, tzinfo=utc),
end=datetime(2023, 5, 11, tzinfo=utc),
)

# check that we got a result for 24 hours
assert len(result) == 24

# check that the value is equal to expected_value
assert (result == expected_value).all().event_value
58 changes: 58 additions & 0 deletions flexmeasures/data/schemas/reporting/aggregation.py
@@ -0,0 +1,58 @@
from marshmallow import fields, ValidationError, validates_schema

from flexmeasures.data.schemas.reporting import ReporterConfigSchema


class AggregatorSchema(ReporterConfigSchema):
"""Schema for the reporter_config of the AggregatorReporter
Example:
.. code-block:: json
{
"beliefs_search_configs": [
{
"sensor": 1,
"source" : 1,
"alias" : "pv"
},
{
"sensor": 1,
"source" : 2,
"alias" : "consumption"
}
],
"method" : "sum",
"weights" : {
"pv" : 1.0,
"consumption" : -1.0
}
}
"""

method = fields.Str(required=False, dump_default="sum")
weights = fields.Dict(fields.Str(), fields.Float(), required=False)

@validates_schema
def validate_source(self, data, **kwargs):

for beliefs_search_config in data["beliefs_search_configs"]:
if "source" not in beliefs_search_config:
raise ValidationError("`source` is a required field.")

@validates_schema
def validate_weights(self, data, **kwargs):
if "weights" not in data:
return

# get aliases
aliases = []
for beliefs_search_config in data["beliefs_search_configs"]:
if "alias" in beliefs_search_config:
aliases.append(beliefs_search_config.get("alias"))

# check that the aliases in weights are defined
for alias in data.get("weights").keys():
if alias not in aliases:
raise ValidationError(
f"alias `{alias}` in `weights` is not defined in `beliefs_search_config`"
)

0 comments on commit 583fc87

Please sign in to comment.