Polish PandasReporter schemas #788

4 changes: 2 additions & 2 deletions flexmeasures/cli/tests/conftest.py
@@ -98,7 +98,8 @@ def reporter_config(app, db, setup_dummy_data):
sensor1, sensor2, report_sensor = setup_dummy_data

reporter_config = dict(
input_variables=["sensor_1", "sensor_2"],
required_input=[{"name": "sensor_1"}, {"name": "sensor_2"}],
required_output=[{"name": "df_agg"}],
transformations=[
dict(
df_input="sensor_1",
@@ -108,7 +109,6 @@ def reporter_config(app, db, setup_dummy_data):
),
dict(method="resample_events", args=["2h"]),
],
final_df_output="df_agg",
)

return reporter_config
7 changes: 4 additions & 3 deletions flexmeasures/cli/tests/test_data_add.py
@@ -131,9 +131,10 @@ def test_add_reporter(app, db, setup_dummy_data, reporter_config):
}

parameters = dict(
input_variables=dict(
sensor_1=dict(sensor=sensor1.id), sensor_2=dict(sensor=sensor2.id)
),
input=[
dict(name="sensor_1", sensor=sensor1.id),
dict(name="sensor_2", sensor=sensor2.id),
],
sensor=report_sensor_id,
)

69 changes: 49 additions & 20 deletions flexmeasures/data/models/data_sources.py
@@ -1,7 +1,7 @@
from __future__ import annotations

import json
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, List, Dict
from sqlalchemy.ext.mutable import MutableDict

import timely_beliefs as tb
@@ -22,12 +22,20 @@ class DataGenerator:
_data_source: DataSource | None = None

_config: dict = None
_parameters: dict = None

_parameters_schema: Schema | None = None
_config_schema: Schema | None = None
_save_config: bool = True
_save_parameters: bool = False

def __init__(self, config: dict | None = None, save_config=True, **kwargs) -> None:
def __init__(
self,
config: dict | None = None,
save_config=True,
save_parameters=False,
**kwargs,
) -> None:
"""Base class for the Schedulers, Reporters and Forecasters.

The configuration `config` stores static parameters, parameters that, if
@@ -69,19 +77,20 @@ def __init__(self, config: dict | None = None, save_config=True, **kwargs) -> None:
"""

self._save_config = save_config
self._save_parameters = save_parameters

if config is None:
if config is None and len(kwargs) > 0:
self._config = kwargs
DataGenerator.validate_deserialized(self._config, self._config_schema)
elif self._config_schema:
elif config is not None:
self._config = self._config_schema.load(config)
else:
self._config = config
elif len(kwargs) == 0:
self._config = self._config_schema.load({})
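
For illustration (a sketch, not part of the diff; the subclass and schema names are made up): a DataGenerator can now be built from a serialized `config` dict, which goes through `_config_schema.load()`, or from already-deserialized keyword arguments, which are checked with `validate_deserialized()`; with neither, an empty config is loaded so that schema defaults still apply.

    class DummyGenerator(DataGenerator):  # hypothetical subclass, for illustration
        _config_schema = DummyConfigSchema()  # assumed Marshmallow schema

        def _compute(self, **kwargs):
            return []

    gen_a = DummyGenerator(config={"method": "sum"})  # serialized path: _config_schema.load()
    gen_b = DummyGenerator(method="sum")              # deserialized path: validate_deserialized()
    gen_c = DummyGenerator()                          # neither: loads {} so schema defaults apply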

def _compute(self, **kwargs):
def _compute(self, **kwargs) -> List[Dict[str, Any]]:
raise NotImplementedError()

def compute(self, parameters: dict | None = None, **kwargs):
def compute(self, parameters: dict | None = None, **kwargs) -> List[Dict[str, Any]]:
"""The configuration `parameters` stores dynamic parameters, parameters that, if
changed, DO NOT trigger the creation of a new DataSource. Static parameters, such as
the topology of an energy system, can go into `config`.
@@ -91,15 +100,18 @@ def compute(self, parameters: dict | None = None, **kwargs):

:param parameters: serialized dynamic parameters, defaults to None
"""

if self._parameters is None:
self._parameters = {}

if parameters is None:
_parameters = kwargs
DataGenerator.validate_deserialized(_parameters, self._parameters_schema)
elif self._parameters_schema:
_parameters = self._parameters_schema.load(parameters)
else: # skip validation
_parameters = parameters
self._parameters.update(self._parameters_schema.dump(kwargs))
else:
self._parameters.update(parameters)

self._parameters = self._parameters_schema.load(self._parameters)

return self._compute(**_parameters)
return self._compute(**self._parameters)
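
A sketch of the new accumulation behaviour (the reporter instance, sensor ids and field values are assumptions): later calls only need to pass what changed, because `compute()` now updates `self._parameters` instead of replacing it.

    # First call: pass the full set of dynamic parameters (serialized form).
    results = reporter.compute(
        parameters={
            "start": "2023-01-01T00:00:00+02:00",
            "end": "2023-01-02T00:00:00+02:00",
            "input": [{"name": "sensor_1", "sensor": 1}],
            "output": [{"sensor": 2}],
        }
    )

    # Second call: override only the time window; input/output carry over,
    # since self._parameters is updated rather than replaced.
    results = reporter.compute(start=new_start, end=new_end)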

@staticmethod
def validate_deserialized(values: dict, schema: Schema) -> bool:
@@ -133,18 +145,31 @@ def data_source(self) -> "DataSource":
if self._data_source is None:
data_source_info = self.get_data_source_info()

attributes = {}
attributes = {"data_generator": {}}

if self._save_config:
attributes = {
"data_generator": {"config": self._config_schema.dump(self._config)}
}
attributes["data_generator"]["config"] = self._config_schema.dump(
self._config
)

if self._save_parameters:
attributes["data_generator"]["parameters"] = self._clean_parameters(
self._parameters_schema.dump(self._parameters)
)

data_source_info["attributes"] = attributes

self._data_source = get_or_create_source(**data_source_info)

return self._data_source

def _clean_parameters(self, parameters: dict) -> dict:
"""Use this method to strip from the `parameters` dictionary the fields
that should not be persisted to the DB when save_parameters=True.
"""

raise NotImplementedError()
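
As a sketch (exact contents depend on the subclass schemas), with both save flags enabled the stored attributes would look roughly like:

    attributes = {
        "data_generator": {
            "config": {"method": "sum"},  # self._config_schema.dump(self._config)
            "parameters": {               # dumped, then passed through _clean_parameters()
                "input": [{"name": "sensor_1", "sensor": 1}],
                "output": [{"sensor": 2}],
            },
        }
    }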


class DataSource(db.Model, tb.BeliefSourceDBMixin):
"""Each data source is a data-providing entity."""
@@ -237,11 +262,15 @@ def data_generator(self):
# fetch DataGenerator details
data_generator_details = self.attributes.get("data_generator", {})
config = data_generator_details.get("config", {})
parameters = data_generator_details.get("parameters", {})

# create DataGenerator class and assign the current DataSource (self) as its source
# create DataGenerator class and add the parameters
data_generator = current_app.data_generators[self.type][self.model](
config=config
)
data_generator._parameters = parameters

# assign the current DataSource (self) as its source
data_generator._data_source = self

self._data_generator = data_generator
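
A sketch of the round trip this enables (the lookup and variable names are assumptions): a DataGenerator recovered from a stored DataSource already carries its saved config and parameters, so a follow-up report only needs the fields that changed.

    source = DataSource.query.get(source_id)  # assumed lookup of a stored source
    generator = source.data_generator         # rebuilt from attributes["data_generator"]
    # The stashed parameters seed the next compute() call, so e.g. only a
    # new time window needs to be passed:
    new_results = generator.compute(start=new_start, end=new_end)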
55 changes: 30 additions & 25 deletions flexmeasures/data/models/reporting/__init__.py
@@ -1,15 +1,15 @@
from __future__ import annotations

from flexmeasures.data.models.time_series import Sensor
from copy import deepcopy

from typing import List, Dict, Any
from flexmeasures.data.models.data_sources import DataGenerator

from flexmeasures.data.schemas.reporting import (
ReporterParametersSchema,
ReporterConfigSchema,
)

import timely_beliefs as tb


class Reporter(DataGenerator):
"""Superclass for all FlexMeasures Reporters."""
@@ -18,45 +18,50 @@ class Reporter(DataGenerator):
__author__ = None
__data_generator_base__ = "reporter"

sensor: Sensor = None

_parameters_schema = ReporterParametersSchema()
_config_schema = ReporterConfigSchema()

def _compute(self, **kwargs) -> tb.BeliefsDataFrame:
def _compute(self, **kwargs) -> List[Dict[str, Any]]:
"""This method triggers the creation of a new report.

The same object can generate multiple reports with different start, end, resolution
and belief_time values.
"""

self.sensor = kwargs["sensor"]

# Result
result: tb.BeliefsDataFrame = self._compute_report(**kwargs)
results: List[Dict[str, Any]] = self._compute_report(**kwargs)

# checking that the event_resolution of the output BeliefsDataFrame matches that of the output sensor
assert (
self.sensor.event_resolution == result.event_resolution
), f"The resolution of the results ({result.event_resolution}) should match that of the output sensor ({self.sensor.event_resolution}, ID {self.sensor.id})."
for result in results:
# checking that the event_resolution of the output BeliefsDataFrame matches that of the output sensor
assert (
result["sensor"].event_resolution == result["data"].event_resolution
), f"The resolution of the results ({result['data'].event_resolution}) should match that of the output sensor ({result['sensor'].event_resolution}, ID {result['sensor'].id})."

# Assign sensor to BeliefsDataFrame
result.sensor = self.sensor
# Assign sensor to BeliefsDataFrame
result["data"].sensor = result["sensor"]

if result.empty:
return result
if not result["data"].empty:
# update data source
result["data"].index = result["data"].index.set_levels(
[self.data_source] * len(result["data"]),
level="source",
verify_integrity=False,
)

# update data source
result.index = result.index.set_levels(
[self.data_source] * len(result), level="source", verify_integrity=False
)
return results

return result

def _compute_report(self, **kwargs) -> tb.BeliefsDataFrame:
def _compute_report(self, **kwargs) -> List[Dict[str, Any]]:
"""
Overwrite with the actual computation of your report.

:returns: a list of dictionaries, each holding an output sensor (`sensor`) and its report (`data`, a BeliefsDataFrame).
"""
raise NotImplementedError()

def _clean_parameters(self, parameters: dict) -> dict:
_parameters = deepcopy(parameters)
fields_to_remove = ["start", "end", "resolution"]

for field in fields_to_remove:
_parameters.pop(field, None)

return _parameters
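
A sketch of the effect (field values made up): the per-report time window is stripped before persisting, while the input/output descriptions are kept.

    params = {
        "start": "2023-01-01T00:00:00+02:00",
        "end": "2023-01-02T00:00:00+02:00",
        "input": [{"name": "sensor_1", "sensor": 1}],
        "output": [{"sensor": 2}],
    }
    cleaned = reporter._clean_parameters(params)
    # cleaned == {"input": [{"name": "sensor_1", "sensor": 1}], "output": [{"sensor": 2}]}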
34 changes: 23 additions & 11 deletions flexmeasures/data/models/reporting/aggregator.py
@@ -1,13 +1,15 @@
from __future__ import annotations

from datetime import datetime, timedelta
from typing import Any, List, Dict

import timely_beliefs as tb
import pandas as pd

from flexmeasures.data.models.reporting import Reporter
from flexmeasures.data.schemas.reporting.aggregation import AggregatorConfigSchema
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.schemas.reporting.aggregation import (
AggregatorConfigSchema,
AggregatorParametersSchema,
)

from flexmeasures.utils.time_utils import server_now

@@ -19,18 +21,20 @@ class AggregatorReporter(Reporter):
__author__ = "Seita"

_config_schema = AggregatorConfigSchema()
_parameters_schema = AggregatorParametersSchema()

weights: dict
method: str

def _compute_report(
self,
sensor: Sensor,
start: datetime,
end: datetime,
input: List[Dict[str, Any]],
output: List[Dict[str, Any]],
resolution: timedelta | None = None,
belief_time: datetime | None = None,
) -> tb.BeliefsDataFrame:
) -> List[Dict[str, Any]]:
"""
This method merges all the BeliefsDataFrames into a single one, dropping
all indexes but event_start, and applies an aggregation function over the
@@ -39,19 +43,20 @@ def _compute_report(

method: str = self._config.get("method")
weights: list = self._config.get("weights", {})
data: list = self._config.get("data")

dataframes = []

if belief_time is None:
belief_time = server_now()

for d in data:
# if alias is not in belief_search_config, use the Sensor id instead
column_name = d.get("alias", f"sensor_{d['sensor'].id}")
for input_description in input:
# if no name is given in the input description, fall back to the Sensor id
column_name = input_description.get(
"name", f"sensor_{input_description['sensor'].id}"
)

df = (
d["sensor"]
input_description["sensor"]
.search_beliefs(
event_starts_after=start,
event_ends_before=end,
@@ -82,4 +87,11 @@
["belief_time", "source", "cumulative_probability"], append=True
)

return output_df
return [
{
"name": "aggregate",
"column": "event_value",
"sensor": output[0]["sensor"],
"data": output_df,
}
]
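
Tying it together, a hedged usage sketch of the reworked AggregatorReporter (the sensor objects and exact schema field names are assumptions, not confirmed by this diff):

    reporter = AggregatorReporter(
        config={"method": "sum", "weights": {"sensor_1": 1, "sensor_2": -1}}
    )
    results = reporter.compute(
        input=[
            {"name": "sensor_1", "sensor": sensor1},
            {"name": "sensor_2", "sensor": sensor2},
        ],
        output=[{"sensor": report_sensor}],
        start=start,
        end=end,
    )
    bdf = results[0]["data"]  # a BeliefsDataFrame, assigned to output[0]["sensor"]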