Polish PandasReporter schemas #788

Merged
4 changes: 2 additions & 2 deletions documentation/changelog.rst
@@ -25,8 +25,8 @@ New features
* Added API endpoints `/sensors/<id>` for fetching a single sensor, `/sensors` (POST) for adding a sensor, `/sensors/<id>` (PATCH) for updating a sensor and `/sensors/<id>` (DELETE) for deleting a sensor. [see `PR #759 <https://www.github.com/FlexMeasures/flexmeasures/pull/759>`_] and [see `PR #767 <https://www.github.com/FlexMeasures/flexmeasures/pull/767>`_] and [see `PR #773 <https://www.github.com/FlexMeasures/flexmeasures/pull/773>`_] and [see `PR #784 <https://www.github.com/FlexMeasures/flexmeasures/pull/784>`_]
* The CLI now allows to set lists and dicts as asset & sensor attributes (formerly only single values) [see `PR #762 <https://www.github.com/FlexMeasures/flexmeasures/pull/762>`_]
* Add `ProcessScheduler` class to optimize the starting time of processes using one of the policies developed (INFLEXIBLE, SHIFTABLE and BREAKABLE), accessible via the CLI command `flexmeasures add schedule for-process` [see `PR #729 <https://www.github.com/FlexMeasures/flexmeasures/pull/729>`_ and `PR #768 <https://www.github.com/FlexMeasures/flexmeasures/pull/768>`_]
* Users will be able to see (e.g. in the UI) exactly which reporter created the report (saved as sensor data), and hosts will be able to identify exactly which configuration was used to create a given report [see `PR #751 <https://www.github.com/FlexMeasures/flexmeasures/pull/751>`_]
* The CLI `flexmeasures add report` now allows passing `config` and `parameters` in YAML format as files or editable via the system's default editor [see `PR #752 <https://www.github.com/FlexMeasures/flexmeasures/pull/752>`_]
* Users will be able to see (e.g. in the UI) exactly which reporter created the report (saved as sensor data), and hosts will be able to identify exactly which configuration was used to create a given report [see `PR #751 <https://www.github.com/FlexMeasures/flexmeasures/pull/751>`_ and `PR #788 <https://www.github.com/FlexMeasures/flexmeasures/pull/788>`_]
* The CLI `flexmeasures add report` now allows passing `config` and `parameters` in YAML format as files or editable via the system's default editor [see `PR #788 <https://www.github.com/FlexMeasures/flexmeasures/pull/788>`_]

Bugfixes
-----------
4 changes: 2 additions & 2 deletions flexmeasures/cli/tests/conftest.py
@@ -98,7 +98,8 @@ def reporter_config(app, db, setup_dummy_data):
sensor1, sensor2, report_sensor = setup_dummy_data

reporter_config = dict(
input_variables=["sensor_1", "sensor_2"],
required_input=[{"name": "sensor_1"}, {"name": "sensor_2"}],
required_output=[{"name": "df_agg"}],
transformations=[
dict(
df_input="sensor_1",
@@ -108,7 +109,6 @@ def reporter_config(app, db, setup_dummy_data):
),
dict(method="resample_events", args=["2h"]),
],
final_df_output="df_agg",
)

return reporter_config
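For orientation (not part of the diff): a minimal sketch of how a reporter configured with the new `required_input`/`required_output` fields could be instantiated. The `PandasReporter` import path is an assumption based on this repository's layout.

```python
# A minimal sketch, assuming this import path for PandasReporter;
# `reporter_config` is the fixture above, with `required_input`/`required_output`
# replacing the former `input_variables`/`final_df_output` fields.
from flexmeasures.data.models.reporting.pandas_reporter import PandasReporter

reporter = PandasReporter(config=reporter_config)  # config is validated on construction
```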
7 changes: 4 additions & 3 deletions flexmeasures/cli/tests/test_data_add.py
@@ -131,9 +131,10 @@ def test_add_reporter(app, db, setup_dummy_data, reporter_config):
}

parameters = dict(
input_variables=dict(
sensor_1=dict(sensor=sensor1.id), sensor_2=dict(sensor=sensor2.id)
),
input=[
dict(name="sensor_1", sensor=sensor1.id),
dict(name="sensor_2", sensor=sensor2.id),
],
sensor=report_sensor_id,
)

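Continuing the sketch above (again not part of the diff; the parameter names `input`, `output`, `start` and `end` are assumptions based on the schemas touched in this PR): the reworked `parameters` couple named inputs and outputs to sensors, while `start` and `end` delimit the reported period.

```python
# Hedged sketch: computing a report with the new list-style input/output format.
results = reporter.compute(
    parameters=dict(
        input=[
            dict(name="sensor_1", sensor=sensor1.id),
            dict(name="sensor_2", sensor=sensor2.id),
        ],
        output=[dict(name="df_agg", sensor=report_sensor_id)],
        start="2023-01-01T00:00:00+02:00",
        end="2023-01-02T00:00:00+02:00",
    )
)
for result in results:
    # each entry couples an output sensor with the computed BeliefsDataFrame
    print(result["sensor"], result["data"].event_resolution)
```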
91 changes: 70 additions & 21 deletions flexmeasures/data/models/data_sources.py
@@ -1,7 +1,7 @@
from __future__ import annotations

import json
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, List, Dict
from sqlalchemy.ext.mutable import MutableDict

import timely_beliefs as tb
@@ -22,18 +22,28 @@ class DataGenerator:
_data_source: DataSource | None = None

_config: dict = None
_parameters: dict = None

_parameters_schema: Schema | None = None
_config_schema: Schema | None = None
_save_config: bool = True
_save_parameters: bool = False

def __init__(self, config: dict | None = None, save_config=True, **kwargs) -> None:
def __init__(
self,
config: dict | None = None,
save_config=True,
save_parameters=False,
**kwargs,
) -> None:
"""Base class for the Schedulers, Reporters and Forecasters.

The configuration `config` stores static parameters: parameters that, if
changed, trigger the creation of a new DataSource. Dynamic parameters, such as
the start date, go into `parameters` instead; see the docstring of the method `DataGenerator.compute` for
more details. Still, `save_parameters` can be set to True if some `parameters` need
to be saved to the DB. In that case, the method `_clean_parameters` is called to remove any fields that are not
to be persisted, e.g. time parameters which are already contained in the TimedBelief.

Create a new DataGenerator with a certain configuration. There are two alternatives
to define the parameters:
@@ -66,22 +76,24 @@ def __init__(self, config: dict | None = None, save_config=True, **kwargs) -> No

:param config: serialized `config` parameters, defaults to None
:param save_config: whether to save the config into the data source attributes
:param save_parameters: whether to save the parameters into the data source attributes
"""

self._save_config = save_config
self._save_parameters = save_parameters

if config is None:
if config is None and len(kwargs) > 0:
self._config = kwargs
DataGenerator.validate_deserialized(self._config, self._config_schema)
elif self._config_schema:
elif config is not None:
self._config = self._config_schema.load(config)
else:
self._config = config
elif len(kwargs) == 0:
self._config = self._config_schema.load({})

def _compute(self, **kwargs):
def _compute(self, **kwargs) -> List[Dict[str, Any]]:
raise NotImplementedError()

def compute(self, parameters: dict | None = None, **kwargs):
def compute(self, parameters: dict | None = None, **kwargs) -> List[Dict[str, Any]]:
"""The configuration `parameters` stores dynamic parameters, parameters that, if
changed, DO NOT trigger the creation of a new DataSource. Static parameters, such as
the topology of an energy system, can go into `config`.
@@ -91,15 +103,18 @@ def compute(self, parameters: dict | None = None, **kwargs):

:param parameters: serialized `parameters`, defaults to None
"""

if self._parameters is None:
self._parameters = {}

if parameters is None:
_parameters = kwargs
DataGenerator.validate_deserialized(_parameters, self._parameters_schema)
elif self._parameters_schema:
_parameters = self._parameters_schema.load(parameters)
else: # skip validation
_parameters = parameters
self._parameters.update(self._parameters_schema.dump(kwargs))
else:
self._parameters.update(parameters)

self._parameters = self._parameters_schema.load(self._parameters)

return self._compute(**_parameters)
return self._compute(**self._parameters)

@staticmethod
def validate_deserialized(values: dict, schema: Schema) -> bool:
@@ -133,18 +148,48 @@ def data_source(self) -> "DataSource":
if self._data_source is None:
data_source_info = self.get_data_source_info()

attributes = {}
attributes = {"data_generator": {}}

if self._save_config:
attributes = {
"data_generator": {"config": self._config_schema.dump(self._config)}
}
attributes["data_generator"]["config"] = self._config_schema.dump(
self._config
)

if self._save_parameters:
attributes["data_generator"]["parameters"] = self._clean_parameters(
self._parameters_schema.dump(self._parameters)
)

data_source_info["attributes"] = attributes

self._data_source = get_or_create_source(**data_source_info)

return self._data_source

def _clean_parameters(self, parameters: dict) -> dict:
"""Use this function to clean up the parameters dictionary from the
fields that are not to be persisted to the DB as data source attributes (when save_parameters=True),
e.g. because they are already stored as TimedBelief properties, or otherwise.

Example:

A DataGenerator has the parameters ["start", "end", "field1", "field2"], and we want just "field1" and "field2"
to be persisted.

Parameters provided to the `compute` method (input of the method `_clean_parameters`):
parameters = {
"start" : "2023-01-01T00:00:00+02:00",
"end" : "2023-01-02T00:00:00+02:00",
"field1" : 1,
"field2" : 2
}

Parameters persisted to the DB (output of the method `_clean_parameters`):
parameters = {"field1" : 1,"field2" : 2}
"""

raise NotImplementedError()


class DataSource(db.Model, tb.BeliefSourceDBMixin):
"""Each data source is a data-providing entity."""
@@ -237,11 +282,15 @@ def data_generator(self):
# fetch DataGenerator details
data_generator_details = self.attributes.get("data_generator", {})
config = data_generator_details.get("config", {})
parameters = data_generator_details.get("parameters", {})

# create DataGenerator class and assign the current DataSource (self) as its source
# create DataGenerator class and add the parameters
data_generator = current_app.data_generators[self.type][self.model](
config=config
)
data_generator._parameters = parameters

# assign the current DataSource (self) as its source
data_generator._data_source = self

self._data_generator = data_generator
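Taken together, the new flow is: a DataGenerator persists its dumped `config` (and, with `save_parameters=True`, its cleaned `parameters`) under the `data_generator` key of its DataSource attributes, and `DataSource.data_generator` rebuilds the generator from them. A sketch with a toy subclass follows; `ToyGenerator` and `ToySchema` are invented for illustration, and persisting requires a running FlexMeasures app context and database.

```python
# Hedged sketch of the save/restore cycle; ToyGenerator and ToySchema are
# hypothetical, not part of FlexMeasures.
from marshmallow import Schema, fields

from flexmeasures.data.models.data_sources import DataGenerator


class ToySchema(Schema):
    factor = fields.Float()


class ToyGenerator(DataGenerator):
    _config_schema = ToySchema()
    _parameters_schema = ToySchema()

    def _compute(self, **kwargs):
        return []  # a real generator would return a list of result dicts

    def _clean_parameters(self, parameters: dict) -> dict:
        return parameters  # nothing transient to strip in this toy case


# Two equivalent ways to configure: a serialized `config` dict, or kwargs
# (i.e. ToyGenerator(factor=2.0)); here we also opt in to saving parameters.
gen = ToyGenerator(config={"factor": 2.0}, save_parameters=True)
gen.compute(parameters={"factor": 3.0})

# Accessing `data_source` stores both the dumped config and the cleaned
# parameters under attributes["data_generator"] (needs an app context).
source = gen.data_source
```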
66 changes: 41 additions & 25 deletions flexmeasures/data/models/reporting/__init__.py
@@ -1,15 +1,15 @@
from __future__ import annotations

from flexmeasures.data.models.time_series import Sensor
from copy import deepcopy

from typing import List, Dict, Any
from flexmeasures.data.models.data_sources import DataGenerator

from flexmeasures.data.schemas.reporting import (
ReporterParametersSchema,
ReporterConfigSchema,
)

import timely_beliefs as tb


class Reporter(DataGenerator):
"""Superclass for all FlexMeasures Reporters."""
@@ -18,45 +18,61 @@ class Reporter(DataGenerator):
__author__ = None
__data_generator_base__ = "reporter"

sensor: Sensor = None

_parameters_schema = ReporterParametersSchema()
_config_schema = ReporterConfigSchema()

def _compute(self, **kwargs) -> tb.BeliefsDataFrame:
def _compute(self, **kwargs) -> List[Dict[str, Any]]:
"""This method triggers the creation of a new report.

The same object can generate multiple reports with different start, end, resolution
and belief_time values.
"""

self.sensor = kwargs["sensor"]

# Result
result: tb.BeliefsDataFrame = self._compute_report(**kwargs)

# checking that the event_resolution of the output BeliefDataFrame is equal to the one of the output sensor
assert (
self.sensor.event_resolution == result.event_resolution
), f"The resolution of the results ({result.event_resolution}) should match that of the output sensor ({self.sensor.event_resolution}, ID {self.sensor.id})."
results: List[Dict[str, Any]] = self._compute_report(**kwargs)

# Assign sensor to BeliefDataFrame
result.sensor = self.sensor
for result in results:
# check that the event_resolution of the output BeliefsDataFrame equals that of the output sensor
assert (
result["sensor"].event_resolution == result["data"].event_resolution
), f"The resolution of the results ({result['data'].event_resolution}) should match that of the output sensor ({result['sensor'].event_resolution}, ID {result['sensor'].id})."

if result.empty:
return result
# Assign sensor to BeliefsDataFrame
result["data"].sensor = result["sensor"]

# update data source
result.index = result.index.set_levels(
[self.data_source] * len(result), level="source", verify_integrity=False
)
if not result["data"].empty:
# update data source
result["data"].index = result["data"].index.set_levels(
[self.data_source] * len(result["data"]),
level="source",
verify_integrity=False,
)

return result
return results

def _compute_report(self, **kwargs) -> tb.BeliefsDataFrame:
def _compute_report(self, **kwargs) -> List[Dict[str, Any]]:
"""
Overwrite with the actual computation of your report.

:returns: a list of report dicts, each coupling an output sensor with the computed BeliefsDataFrame.
"""
raise NotImplementedError()

def _clean_parameters(self, parameters: dict) -> dict:
_parameters = deepcopy(parameters)
fields_to_remove = ["start", "end", "resolution", "belief_time"]

for field in fields_to_remove:
_parameters.pop(field, None)

fields_to_remove_input = [
"event_starts_after",
"event_ends_before",
"belief_time",
"resolution",
]

for _input in _parameters["input"]:
for field in fields_to_remove_input:
_input.pop(field, None)

return _parameters
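To illustrate what this cleaning does (the field values below are invented): given a `Reporter` instance `reporter`, the top-level time fields and the belief search fields of each input are stripped, while the `output` descriptions are left untouched.

```python
# Hedged illustration of Reporter._clean_parameters; values are made up.
parameters = {
    "start": "2023-01-01T00:00:00+02:00",
    "end": "2023-01-02T00:00:00+02:00",
    "input": [
        {
            "name": "sensor_1",
            "sensor": 1,
            "event_starts_after": "2023-01-01T00:00:00+02:00",
        }
    ],
    "output": [{"name": "df_agg", "sensor": 3}],
}

cleaned = reporter._clean_parameters(parameters)
# cleaned == {
#     "input": [{"name": "sensor_1", "sensor": 1}],
#     "output": [{"name": "df_agg", "sensor": 3}],
# }
```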
34 changes: 23 additions & 11 deletions flexmeasures/data/models/reporting/aggregator.py
@@ -1,13 +1,15 @@
from __future__ import annotations

from datetime import datetime, timedelta
from typing import Any, List, Dict

import timely_beliefs as tb
import pandas as pd

from flexmeasures.data.models.reporting import Reporter
from flexmeasures.data.schemas.reporting.aggregation import AggregatorConfigSchema
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.schemas.reporting.aggregation import (
AggregatorConfigSchema,
AggregatorParametersSchema,
)

from flexmeasures.utils.time_utils import server_now

@@ -19,18 +21,20 @@ class AggregatorReporter(Reporter):
__author__ = "Seita"

_config_schema = AggregatorConfigSchema()
_parameters_schema = AggregatorParametersSchema()

weights: dict
method: str

def _compute_report(
self,
sensor: Sensor,
start: datetime,
end: datetime,
input: List[Dict[str, Any]],
output: List[Dict[str, Any]],
resolution: timedelta | None = None,
belief_time: datetime | None = None,
) -> tb.BeliefsDataFrame:
) -> List[Dict[str, Any]]:
"""
This method merges all the BeliefsDataFrames into a single one, dropping
all indexes but event_start, and applies an aggregation function over the
@@ -39,19 +43,20 @@

method: str = self._config.get("method")
weights: list = self._config.get("weights", {})
data: list = self._config.get("data")

dataframes = []

if belief_time is None:
belief_time = server_now()

for d in data:
# if alias is not in belief_search_config, using the Sensor id instead
column_name = d.get("alias", f"sensor_{d['sensor'].id}")
for input_description in input:
# if name is not in belief_search_config, use the sensor id instead
column_name = input_description.get(
"name", f"sensor_{input_description['sensor'].id}"
)

df = (
d["sensor"]
input_description["sensor"]
.search_beliefs(
event_starts_after=start,
event_ends_before=end,
@@ -82,4 +87,11 @@
["belief_time", "source", "cumulative_probability"], append=True
)

return output_df
return [
{
"name": "aggregate",
"column": "event_value",
"sensor": output[0]["sensor"],
"data": output_df,
}
]
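A hedged usage sketch for the updated reporter (import path, weight semantics and parameter names are assumptions based on this diff; the sensors are hypothetical):

```python
from flexmeasures.data.models.reporting.aggregator import AggregatorReporter

reporter = AggregatorReporter(
    config={"method": "sum", "weights": {"pv": 1.0, "consumption": -1.0}}
)
results = reporter.compute(
    parameters={
        "input": [
            {"name": "pv", "sensor": pv_sensor.id},
            {"name": "consumption", "sensor": consumption_sensor.id},
        ],
        "output": [{"sensor": report_sensor.id}],
        "start": "2023-01-01T00:00:00+02:00",
        "end": "2023-01-02T00:00:00+02:00",
    }
)
# A single result named "aggregate" is returned, holding the weighted
# combination as a BeliefsDataFrame for the first output sensor.
```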