Polish PandasReporter schemas #788

10 changes: 6 additions & 4 deletions flexmeasures/data/models/reporting/aggregator.py
@@ -39,19 +39,21 @@ def _compute_report(

method: str = self._config.get("method")
weights: list = self._config.get("weights", {})
data: list = self._config.get("data")
input: list = self._config.get("input")

dataframes = []

if belief_time is None:
belief_time = server_now()

for d in data:
for input_description in input:
# if alias is not in belief_search_config, use the Sensor id instead
column_name = d.get("alias", f"sensor_{d['sensor'].id}")
column_name = input_description.get(
"alias", f"sensor_{input_description['sensor'].id}"
)

df = (
d["sensor"]
input_description["sensor"]
.search_beliefs(
event_starts_after=start,
event_ends_before=end,
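For context, this rename changes the shape of an aggregator config: the old `data` key is now `input`, mirroring the updated tests further down. A minimal sketch of the new shape; the `AggregatorReporter` class name and the sensor ids are illustrative assumptions, only the config keys appear in this diff:

reporter_config = dict(
    method="sum",  # aggregation method read from the config (see _compute_report above)
    input=[
        dict(sensor=1, source=1, alias="sensor_1"),  # `alias` is optional; the column name falls back to "sensor_<id>"
        dict(sensor=2, source=2),
    ],
)
reporter = AggregatorReporter(config=reporter_config)  # class name assumed, not shown in this diff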
138 changes: 92 additions & 46 deletions flexmeasures/data/models/reporting/pandas_reporter.py
@@ -2,7 +2,7 @@

from typing import Any, Union, Dict
from datetime import datetime, timedelta
from copy import deepcopy
from copy import deepcopy, copy

from flask import current_app
import timely_beliefs as tb
@@ -25,7 +25,7 @@ class PandasReporter(Reporter):
_config_schema = PandasReporterConfigSchema()
_parameters_schema = PandasReporterParametersSchema()

input_variables: list[str] = None
input: list[str] = None
transformations: list[dict[str, Any]] = None
final_df_output: str = None

@@ -35,7 +35,7 @@ def fetch_data(
self,
start: datetime,
end: datetime,
input_variables: dict,
input: dict,
resolution: timedelta | None = None,
belief_time: datetime | None = None,
):
@@ -44,31 +44,35 @@
"""

self.data = {}
for alias, tb_query in input_variables.items():
_tb_query = tb_query.copy()
for input_search_parameters in input:
_input_search_parameters = input_search_parameters.copy()

# using start / end instead of event_starts_after/event_ends_before when not defined
event_starts_after = _tb_query.pop("event_starts_after", start)
event_ends_before = _tb_query.pop("event_ends_before", end)
resolution = _tb_query.pop("resolution", resolution)
belief_time = _tb_query.pop("belief_time", belief_time)
sensor: Sensor = _input_search_parameters.pop("sensor", None)

name = _input_search_parameters.pop("name", f"sensor_{sensor.id}")

sensor: Sensor = _tb_query.pop("sensor", None)
# using start / end instead of event_starts_after/event_ends_before when not defined
event_starts_after = _input_search_parameters.pop(
"event_starts_after", start
)
event_ends_before = _input_search_parameters.pop("event_ends_before", end)
resolution = _input_search_parameters.pop("resolution", resolution)
belief_time = _input_search_parameters.pop("belief_time", belief_time)

bdf = sensor.search_beliefs(
event_starts_after=event_starts_after,
event_ends_before=event_ends_before,
resolution=resolution,
beliefs_before=belief_time,
**_tb_query,
**_input_search_parameters,
)

# store data source as local variable
for source in bdf.sources.unique():
self.data[f"source_{source.id}"] = source

# store BeliefsDataFrame as local variable
self.data[alias] = bdf
self.data[name] = bdf

def _compute_report(self, **kwargs) -> tb.BeliefsDataFrame:
"""
@@ -79,7 +83,8 @@ def _compute_report(self, **kwargs) -> tb.BeliefsDataFrame:
# report configuration
start: datetime = kwargs.get("start")
end: datetime = kwargs.get("end")
input_variables: dict = kwargs.get("input_variables")
input: dict = kwargs.get("input")
output_sensor: Sensor | None = kwargs.get("sensor")

resolution: timedelta | None = kwargs.get("resolution", None)
belief_time: datetime | None = kwargs.get("belief_time", None)
@@ -88,48 +93,89 @@ def _compute_report(self, **kwargs) -> tb.BeliefsDataFrame:
resolution = self.sensor.event_resolution

# fetch sensor data
self.fetch_data(start, end, input_variables, resolution, belief_time)
self.fetch_data(start, end, input, resolution, belief_time)

if belief_time is None:
belief_time = server_now()

# apply pandas transformations to the dataframes in `self.data`
self._apply_transformations()

final_output = self.data[self._config.get("final_df_output")]

if isinstance(final_output, tb.BeliefsDataFrame):

# filling the missing indexes with default values:
# belief_time=belief_time, cumulative_probability=0.5, source=data_source
if "belief_time" not in final_output.index.names:
final_output["belief_time"] = [belief_time] * len(final_output)
final_output = final_output.set_index("belief_time", append=True)

if "cumulative_probability" not in final_output.index.names:
final_output["cumulative_probability"] = [0.5] * len(final_output)
final_output = final_output.set_index(
"cumulative_probability", append=True
)

if "source" not in final_output.index.names:
final_output["source"] = [self.data_source] * len(final_output)
final_output = final_output.set_index("source", append=True)

final_output = final_output.reorder_levels(
tb.BeliefsDataFrame().index.names
)
output = kwargs.get("output", [])

elif isinstance(final_output, tb.BeliefsSeries):
final_output = final_output.to_frame("event_value")
final_output["belief_time"] = belief_time
final_output["cumulative_probability"] = 0.5
final_output["source"] = self.data_source
final_output = final_output.set_index(
["belief_time", "source", "cumulative_probability"], append=True
if len(output) == 0 and output_sensor is None:
raise ValueError(
"No output sensor defined. At least define an output sensor in the `sensor` or `output` fields in `parameters`."
)

return final_output
if len(output) == 0:
output = [
{
"name": self._config["required_output"][0]["name"],
"sensor": output_sensor,
}
]

results = []

for output_description in output:
result = copy(output_description)

# TODO: use sensor to store multiple outputs
# sensor = output_description["sensor"]
name = output_description["name"]

output_data = self.data[name]

if isinstance(output_data, tb.BeliefsDataFrame):
# if column is missing, use the first column
column = output_description.get("column", output_data.columns[0])
output_data = output_data.rename(columns={column: "event_value"})[
["event_value"]
]
output_data = self._clean_belief_dataframe(output_data, belief_time)

elif isinstance(output_data, tb.BeliefsSeries):
output_data = self._clean_belief_series(output_data, belief_time)

result["data"] = output_data

results.append(result)

return results[0].get("data")

def _clean_belief_series(
self, belief_series: tb.BeliefsSeries, belief_time: datetime
) -> tb.BeliefsDataFrame:
belief_series = belief_series.to_frame("event_value")
belief_series["belief_time"] = belief_time
belief_series["cumulative_probability"] = 0.5
belief_series["source"] = self.data_source
belief_series = belief_series.set_index(
["belief_time", "source", "cumulative_probability"], append=True
)

return belief_series

def _clean_belief_dataframe(
self, bdf: tb.BeliefsDataFrame, belief_time: datetime
) -> tb.BeliefsDataFrame:
# filling the missing indexes with default values:
# belief_time=belief_time, cumulative_probability=0.5, source=data_source
if "belief_time" not in bdf.index.names:
bdf["belief_time"] = [belief_time] * len(bdf)
bdf = bdf.set_index("belief_time", append=True)

if "cumulative_probability" not in bdf.index.names:
bdf["cumulative_probability"] = [0.5] * len(bdf)
bdf = bdf.set_index("cumulative_probability", append=True)

if "source" not in bdf.index.names:
bdf["source"] = [self.data_source] * len(bdf)
bdf = bdf.set_index("source", append=True)

bdf = bdf.reorder_levels(tb.BeliefsDataFrame().index.names)
return bdf

def get_object_or_literal(self, value: Any, method: str) -> Any:
"""This method allows using the dataframes as inputs of the Pandas methods that
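To see the renamed schema fields end to end, here is a minimal sketch of a PandasReporter run under the fixtures used in the tests below (s1 and report_sensor come from setup_dummy_data; the empty transformation list mirrors test_reporter_empty; the `utc` import is assumed to be pytz):

from datetime import datetime
from pytz import utc  # assumption: the `utc` object used in the tests

from flexmeasures.data.models.reporting.pandas_reporter import PandasReporter

reporter_config = dict(
    required_input=[{"name": "sensor_1"}],   # replaces the old `input_variables` list of names
    required_output=[{"name": "sensor_1"}],  # replaces the old `final_df_output` string
    transformations=[],                      # kept empty for brevity
)
reporter = PandasReporter(config=reporter_config)
report = reporter.compute(
    sensor=report_sensor,  # fallback output sensor, used because no explicit `output` list is passed
    start=datetime(2023, 4, 10, tzinfo=utc),
    end=datetime(2023, 4, 10, 10, tzinfo=utc),
    input=[dict(name="sensor_1", sensor=s1)],  # was: input_variables=dict(sensor_1=dict(sensor=s1))
)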
6 changes: 3 additions & 3 deletions flexmeasures/data/models/reporting/tests/test_aggregator.py
@@ -39,7 +39,7 @@ def test_aggregator(setup_dummy_data, aggregation_method, expected_value):
s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data

reporter_config = dict(
data=[
input=[
dict(sensor=s1.id, source=1),
dict(sensor=s2.id, source=2),
],
@@ -65,7 +65,7 @@ def test_dst_transition(setup_dummy_data):
s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data

reporter_config = dict(
data=[
input=[
dict(sensor=s3.id, source=1),
],
)
@@ -99,7 +99,7 @@ def test_resampling(setup_dummy_data):
s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data

reporter_config = dict(
data=[
input=[
dict(sensor=s3.id, source=1),
],
)
32 changes: 15 additions & 17 deletions flexmeasures/data/models/reporting/tests/test_pandas_reporter.py
@@ -9,7 +9,8 @@ def test_reporter(app, setup_dummy_data):
s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data

reporter_config = dict(
input_variables=["sensor_1", "sensor_2"],
required_input=[{"name": "sensor_1"}, {"name": "sensor_2"}],
required_output=[{"name": "df_merge"}],
transformations=[
dict(
df_input="sensor_1",
Expand All @@ -36,18 +37,15 @@ def test_reporter(app, setup_dummy_data):
dict(method="mean"),
dict(method="sum", kwargs=dict(axis=1)),
],
final_df_output="df_merge",
)

reporter = PandasReporter(config=reporter_config)

start = datetime(2023, 4, 10, tzinfo=utc)
end = datetime(2023, 4, 10, 10, tzinfo=utc)
input_variables = dict(sensor_1=dict(sensor=s1), sensor_2=dict(sensor=s2))
input = [dict(name="sensor_1", sensor=s1), dict(name="sensor_2", sensor=s2)]

report1 = reporter.compute(
sensor=report_sensor, start=start, end=end, input_variables=input_variables
)
report1 = reporter.compute(sensor=report_sensor, start=start, end=end, input=input)

assert len(report1) == 5
assert str(report1.event_starts[0]) == "2023-04-10 00:00:00+00:00"
@@ -68,7 +66,7 @@ def test_reporter(app, setup_dummy_data):
sensor=report_sensor,
start=datetime(2023, 4, 10, 3, tzinfo=utc),
end=end,
input_variables=input_variables,
input=input,
)
assert len(report2) == 4
assert str(report2.event_starts[0]) == "2023-04-10 02:00:00+00:00"
@@ -80,7 +78,8 @@ def test_reporter_repeated(setup_dummy_data):
s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data

reporter_config = dict(
input_variables=["sensor_1", "sensor_2"],
required_input=[{"name": "sensor_1"}, {"name": "sensor_2"}],
required_output=[{"name": "df_merge"}],
transformations=[
dict(
df_input="sensor_1",
@@ -107,17 +106,16 @@ def test_reporter_repeated(setup_dummy_data):
dict(method="mean"),
dict(method="sum", kwargs=dict(axis=1)),
],
final_df_output="df_merge",
)

parameters = dict(
sensor=report_sensor.id,
start="2023-04-10T00:00:00 00:00",
end="2023-04-10T10:00:00 00:00",
input_variables=dict(
sensor_1=dict(sensor=s1.id),
sensor_2=dict(sensor=s2.id),
),
input=[
dict(name="sensor_1", sensor=s1.id),
dict(name="sensor_2", sensor=s2.id),
],
)

reporter = PandasReporter(config=reporter_config)
@@ -133,9 +131,9 @@ def test_reporter_empty(setup_dummy_data):
s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data

config = dict(
input_variables=["sensor_1"],
required_input=[{"name": "sensor_1"}],
required_output=[{"name": "sensor_1"}],
transformations=[],
final_df_output="sensor_1",
)

reporter = PandasReporter(config=config)
@@ -145,7 +143,7 @@ def test_reporter_empty(setup_dummy_data):
sensor=report_sensor,
start=datetime(2023, 4, 10, tzinfo=utc),
end=datetime(2023, 4, 10, 10, tzinfo=utc),
input_variables=dict(sensor_1=dict(sensor=s1)),
input=[dict(name="sensor_1", sensor=s1)],
)

assert not report.empty
Expand All @@ -155,7 +153,7 @@ def test_reporter_empty(setup_dummy_data):
sensor=report_sensor,
start=datetime(2021, 4, 10, tzinfo=utc),
end=datetime(2021, 4, 10, 10, tzinfo=utc),
input_variables=dict(sensor_1=dict(sensor=s1)),
input=[dict(name="sensor_1", sensor=s1)],
)

assert report.empty
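The tests above only exercise the `sensor` fallback; `_compute_report` also accepts an explicit `output` list. A hedged sketch of that call shape, reusing the config and fixtures from test_reporter_empty and assuming `compute` forwards `output` like the other parameters (only the first output's data is returned for now, per the TODO in the diff):

report = reporter.compute(
    start=datetime(2023, 4, 10, tzinfo=utc),
    end=datetime(2023, 4, 10, 10, tzinfo=utc),
    input=[dict(name="sensor_1", sensor=s1)],
    output=[
        dict(
            name="sensor_1",       # must name a dataframe available after the transformations (here: the raw input)
            sensor=report_sensor,  # target sensor; storing multiple outputs per sensor is a TODO in this PR
            column="event_value",  # optional; defaults to the first column of the BeliefsDataFrame
        )
    ],
)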