Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add AggregatorReporter #712

Merged
merged 26 commits into from Jun 11, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
6bb6933
fix: get fresh sensor instance to avoid sqlalchemy.exc.InvalidRequest…
victorgarcia98 Jun 1, 2023
45d4887
fix: handle the case of not finding the sensor in the database
victorgarcia98 Jun 1, 2023
fbbd0c2
feat: add AggregatorReporter
victorgarcia98 Jun 6, 2023
28a8b55
feat: add fixture
victorgarcia98 Jun 6, 2023
0bc9450
fix: typo in file name
victorgarcia98 Jun 9, 2023
6a2b378
Set author in PandasReporter and AggregatorReporter
victorgarcia98 Jun 9, 2023
3447b3b
style: remove unnecessary class property
victorgarcia98 Jun 9, 2023
7644cd6
test: add description of test
victorgarcia98 Jun 9, 2023
03a91d7
fix: vectorized bdf creation
victorgarcia98 Jun 9, 2023
cfe2e49
style: improve Exception message
victorgarcia98 Jun 9, 2023
ad4684e
fix: lowercase enum value
victorgarcia98 Jun 9, 2023
a8cadb1
feat: make method and weights optional
victorgarcia98 Jun 9, 2023
20ac071
feat: set SUM as the default aggregation method
victorgarcia98 Jun 9, 2023
b12054d
typo
Flix6x Jun 9, 2023
7598b3a
refactor: simplify column value assignments
Flix6x Jun 9, 2023
13e00d7
typo
Flix6x Jun 9, 2023
fad8b33
fix: use aggregate function instead of getattr
victorgarcia98 Jun 9, 2023
fadae53
feat: allow users to pass any string
victorgarcia98 Jun 9, 2023
3983b2b
test: add more test cases
victorgarcia98 Jun 9, 2023
f9a876a
Merge branch 'main' into feature/aggregation-reporter
victorgarcia98 Jun 9, 2023
d3d55dc
feat: weights and method as class attribute
victorgarcia98 Jun 9, 2023
7780ecf
simplify float to int
Flix6x Jun 10, 2023
55ed175
Merge remote-tracking branch 'origin/main' into feature/aggregation-r…
Flix6x Jun 10, 2023
03cec94
changelog entry
Flix6x Jun 10, 2023
3375335
black
Flix6x Jun 10, 2023
33aca2e
Merge branch 'main' into feature/aggregation-reporter
victorgarcia98 Jun 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
77 changes: 77 additions & 0 deletions flexmeasures/data/models/reporting/aggregator.py
@@ -0,0 +1,77 @@
from __future__ import annotations

from datetime import datetime, timedelta

import timely_beliefs as tb
import pandas as pd

from flexmeasures.data.models.reporting import Reporter
from flexmeasures.data.schemas.reporting.aggregation import AggregatorSchema

from flexmeasures.utils.time_utils import server_now


class AggregatorReporter(Reporter):
    """This reporter applies an aggregation function to multiple sensors."""

    __version__ = "1"
    __author__ = "Seita"
    schema = AggregatorSchema()
    # per-column weights, keyed by alias (or "sensor_<id>" when no alias is given)
    weights: dict
    # name of the pandas aggregation method (e.g. "sum", "mean", "max")
    method: str

    def deserialize_config(self):
        """Deserialize the reporter config, extracting the aggregation
        `method` (defaulting to "sum") and the optional `weights` mapping.
        """
        # call Reporter deserialize_config
        super().deserialize_config()

        # extract AggregatorReporter specific fields;
        # default to "sum" so a missing `method` cannot yield None and
        # break pd.DataFrame.aggregate below (the schema's dump_default
        # does not apply when loading the config)
        self.method = self.reporter_config.get("method", "sum")
        self.weights = self.reporter_config.get("weights", dict())

    def _compute(
        self,
        start: datetime,
        end: datetime,
        input_resolution: timedelta | None = None,
        belief_time: datetime | None = None,
    ) -> tb.BeliefsDataFrame:
        """
        Merge all the BeliefsDataFrames into a single one, dropping
        all index levels but event_start, and apply an aggregation function
        over the sensor columns.

        :param start: start of the reporting period (data is fetched by the base class)
        :param end: end of the reporting period (data is fetched by the base class)
        :param input_resolution: resolution of the input data (handled by the base class)
        :param belief_time: recording time for the report; defaults to the current server time
        :returns: BeliefsDataFrame with one aggregated event_value per event_start
        """

        dataframes = []

        if belief_time is None:
            belief_time = server_now()

        for belief_search_config in self.beliefs_search_configs:
            # if alias is not in belief_search_config, use the sensor id instead
            column_name = belief_search_config.get(
                "alias", f"sensor_{belief_search_config['sensor'].id}"
            )
            # keep only the event_start index level (drop belief_time, source
            # and cumulative_probability) so the frames align per event
            data = self.data[column_name].droplevel([1, 2, 3])

            # apply weight, looked up by the column name chosen above
            if column_name in self.weights:
                data *= self.weights[column_name]

            dataframes.append(data)

        output_df = pd.concat(dataframes, axis=1)

        # apply aggregation method across the sensor columns, per event
        output_df = output_df.aggregate(self.method, axis=1)

        # convert BeliefsSeries into a BeliefsDataFrame
        output_df = output_df.to_frame("event_value")
        output_df["belief_time"] = belief_time
        output_df["cumulative_probability"] = 0.5
        output_df["source"] = self.data_source

        output_df = output_df.set_index(
            ["belief_time", "source", "cumulative_probability"], append=True
        )

        return output_df
2 changes: 1 addition & 1 deletion flexmeasures/data/models/reporting/pandas_reporter.py
Expand Up @@ -18,7 +18,7 @@ class PandasReporter(Reporter):
"""This reporter applies a series of pandas methods on"""

__version__ = "1"
__author__ = None
__author__ = "Seita"
schema = PandasReporterConfigSchema()
transformations: list[dict[str, Any]] = None
final_df_output: str = None
Expand Down
17 changes: 17 additions & 0 deletions flexmeasures/data/models/reporting/tests/conftest.py
Expand Up @@ -63,6 +63,23 @@ def setup_dummy_data(db, app):
)
)

# add simple data consisting of 24 hourly events with value 1.0 for
# sensor1 and -1.0 for sensor2
# to be used to test the AggregatorReporter
for sensor, source, value in zip(
[sensor1, sensor2], [source1, source2], [1.0, -1.0]
):
for t in range(24):
beliefs.append(
TimedBelief(
event_start=datetime(2023, 5, 10, tzinfo=utc) + timedelta(hours=t),
belief_horizon=timedelta(hours=24),
event_value=value,
sensor=sensor,
source=source,
)
)

db.session.add_all(beliefs)
db.session.commit()

Expand Down
60 changes: 60 additions & 0 deletions flexmeasures/data/models/reporting/tests/test_aggregator.py
@@ -0,0 +1,60 @@
import pytest

from flexmeasures.data.models.reporting.aggregator import AggregatorReporter

from datetime import datetime
from pytz import utc


@pytest.mark.parametrize(
    "aggregation_method, expected_value",
    [
        ("sum", 0),
        ("mean", 0),
        ("var", 2),
        ("std", 2**0.5),
        ("max", 1),
        ("min", -1),
        ("prod", -1),
        ("median", 0),
    ],
)
def test_aggregator(setup_dummy_data, aggregation_method, expected_value):
    """
    This test computes the aggregation of two sensors containing 24 entries
    with value 1.0 and -1.0, respectively, for sensors 1 and 2.

    Test cases (pandas defaults apply, e.g. ddof=1 for var/std):
        1) sum: 0 = 1 + (-1)
        2) mean: 0 = ((1) + (-1))/2
        3) var: 2 = ((1 - 0)^2 + (-1 - 0)^2) / (2 - 1)
        4) std: sqrt(2) = sqrt(((1 - 0)^2 + (-1 - 0)^2) / (2 - 1))
        5) max: 1 = max(1, -1)
        6) min: -1 = min(1, -1)
        7) prod: -1 = (1) * (-1)
        8) median: even number of elements, mean of the two most central elements, 0 = ((1) + (-1))/2
    """
    s1, s2, reporter_sensor = setup_dummy_data

    reporter_config_raw = dict(
        beliefs_search_configs=[
            dict(sensor=s1.id, source=1),
            dict(sensor=s2.id, source=2),
        ],
        method=aggregation_method,
    )

    agg_reporter = AggregatorReporter(
        reporter_sensor, reporter_config_raw=reporter_config_raw
    )

    result = agg_reporter.compute(
        start=datetime(2023, 5, 10, tzinfo=utc),
        end=datetime(2023, 5, 11, tzinfo=utc),
    )

    # check that we got a result for 24 hours
    assert len(result) == 24

    # check that the value is equal to expected_value
    assert (result == expected_value).all().event_value
victorgarcia98 marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 6 additions & 0 deletions flexmeasures/data/models/time_series.py
Expand Up @@ -584,6 +584,12 @@ def __init__(
source: tb.DBBeliefSource,
**kwargs,
):
# get a Sensor instance attached to the database session (input sensor is detached)
# check out Issue #683 for more details
_sensor = Sensor.query.get(sensor.id)
if _sensor:
sensor = _sensor

tb.TimedBeliefDBMixin.__init__(self, sensor, source, **kwargs)
tb_utils.remove_class_init_kwargs(tb.TimedBeliefDBMixin, kwargs)
db.Model.__init__(self, **kwargs)
Expand Down
58 changes: 58 additions & 0 deletions flexmeasures/data/schemas/reporting/aggregation.py
@@ -0,0 +1,58 @@
from marshmallow import fields, ValidationError, validates_schema

from flexmeasures.data.schemas.reporting import ReporterConfigSchema


class AggregatorSchema(ReporterConfigSchema):
    """Schema for the reporter_config of the AggregatorReporter

    Example:
    .. code-block:: json
        {
            "beliefs_search_configs": [
                {
                    "sensor": 1,
                    "source" : 1,
                    "alias" : "pv"
                },
                {
                    "sensor": 1,
                    "source" : 2,
                    "alias" : "consumption"
                }
            ],
            "method" : "sum",
            "weights" : {
                "pv" : 1.0,
                "consumption" : -1.0
            }
        }
    """

    # use load_default (not dump_default) so "sum" is actually applied when
    # the config is deserialized; dump_default only affects serialization
    method = fields.Str(required=False, load_default="sum")
    weights = fields.Dict(fields.Str(), fields.Float(), required=False)

    @validates_schema
    def validate_source(self, data, **kwargs):
        """Require an explicit `source` in every beliefs search config."""
        for beliefs_search_config in data["beliefs_search_configs"]:
            if "source" not in beliefs_search_config:
                raise ValidationError("`source` is a required field.")

    @validates_schema
    def validate_weights(self, data, **kwargs):
        """Check that every key in `weights` refers to a declared alias."""
        if "weights" not in data:
            return

        # collect the aliases declared in the search configs
        # NOTE(review): columns without an alias are named "sensor_<id>" by the
        # reporter; weights keyed that way would be rejected here — confirm intended
        aliases = []
        for beliefs_search_config in data["beliefs_search_configs"]:
            if "alias" in beliefs_search_config:
                aliases.append(beliefs_search_config.get("alias"))

        # check that the aliases in weights are defined
        for alias in data.get("weights").keys():
            if alias not in aliases:
                raise ValidationError(
                    f"alias `{alias}` in `weights` is not defined in `beliefs_search_configs`"
                )