From 32e300c37886b589d49ec663b44f7027bb8fb8ea Mon Sep 17 00:00:00 2001 From: victor Date: Fri, 14 Apr 2023 10:50:57 +0200 Subject: [PATCH 01/36] Creating Reporter and PandasReporter classes with their corresponding report_config schemas. Signed-off-by: victor --- .vscode/settings.json | 22 ++ .vscode/spellright.dict | 249 ++++++++++++++++++ .../data/models/reporting/__init__.py | 168 ++++++++++++ .../data/models/reporting/pandas_reporter.py | 114 ++++++++ .../data/models/reporting/tests/__init__.py | 0 .../reporting/tests/test_pandas_reporter.py | 121 +++++++++ .../data/schemas/reporting/__init__.py | 51 ++++ .../data/schemas/reporting/pandas_reporter.py | 122 +++++++++ .../data/schemas/tests/test_reporting.py | 135 ++++++++++ 9 files changed, 982 insertions(+) create mode 100644 .vscode/settings.json create mode 100644 .vscode/spellright.dict create mode 100644 flexmeasures/data/models/reporting/__init__.py create mode 100644 flexmeasures/data/models/reporting/pandas_reporter.py create mode 100644 flexmeasures/data/models/reporting/tests/__init__.py create mode 100644 flexmeasures/data/models/reporting/tests/test_pandas_reporter.py create mode 100644 flexmeasures/data/schemas/reporting/__init__.py create mode 100644 flexmeasures/data/schemas/reporting/pandas_reporter.py create mode 100644 flexmeasures/data/schemas/tests/test_reporting.py diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 000000000..9e744aa83 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,22 @@ +{ + "spellright.language": [ + "en_US" + ], + "spellright.documentTypes": [ + "markdown", + "latex", + "plaintext", + "restructuredtext", + "python" + ], + "python.linting.enabled": true, + "python.linting.pylintEnabled": false, + "python.linting.flake8Enabled": true, + "workbench.editor.wrapTabs": true, + "python.formatting.provider": "black", + "python.testing.pytestArgs": [ + "flexmeasures" + ], + "python.testing.unittestEnabled": false, + 
"python.testing.pytestEnabled": true +} diff --git a/.vscode/spellright.dict b/.vscode/spellright.dict new file mode 100644 index 000000000..fc03ab1a3 --- /dev/null +++ b/.vscode/spellright.dict @@ -0,0 +1,249 @@ +_cli +cli +db +Postgres +SQLALCHEMY +postgresql +env +ENV +urlsafe +cfg +Quickstart +MWh +todo +DSO +CO₂ +USEF +sha256_crypt +WAPE +int +ci +webargs +Cors +importlib +timetomodel +CDF +forecastiopy +utils +Dockerization +dotenv +sys +os +WSGI +hosters +Pyomo +Cbc +cbc +coinor +redis +Redis +flexdemo +MAPBOX +mapbox +timedelta +datetimes +RQ +cplex +glpk +FLEXMEASURES_PLUGIN_PATH +plugin +fmplugin +init +py +Jinja2 +bp +msg +appcontext +dirname +abspath +endblock +mw +epex +param +nnname +testtest +num_assets +pkey +fm +res +str +LatLngGrid +lat +lng +tl +br +dt +appid +OPENWEATHERMAP +tldextract +isodate +numpy +tzlocal +pytz +psycopg +xlrd +pscript +altair +'nt' +SSLify +lon +OWM +valuecol +datecol +cp +skiprows +nrows +TOU +CPO +tou +Jinja +csrf +teardown +setuptools +scm +ss +sshould +PTUS +PTU +bdf +GETting +PARAMS +uis +stringify +onreadystatechange +thead +tr +th +tbody +td +urlData +div +pv +src +txt +divs +OLS +configurator +requeue +PGPASSWORD +uri +pg +pgbackup +naturaldelta +func +multindex +color +colors +incl +javascript +datepicker +paa +wi +alt +SYSTEMROOT +LC +tzinfo +naturaltime +naturaldate +tz +sd +oclock +conftest +autouse +isoformat +deserialization +redislite +tmp +conn +SENTRY_DSN +linenr +xxxxxxxxxxxxxxx +DSN +dsn +pii +bobbydams +functools +etc +yhat +freqstr +loc +timerange +vnd.microsoft.icon +SENTRY_SDN +isnull +abcdefghijklmnopqrstuv +hex_md +pkg_name +_deserialize +__acl__ +stylesheet +rel +Wh +earthdistance +ACL +hr +MJ +ga_id +ga_name +ga_generic_asset_type_id +jde +s.obj +s_id_seq +seq +setval +xb +np +ts +mae +evse +Seita +datetime +descr +soc +SOC +roundtrip +noqa +flexmeasures +sthg +dict +te +timezone +iso +gunicorn +Plugins +CLEANUP +hardcoded +Prosumer +udi +config +quickref +sourcecode +json +reqheader +resheader 
+io +m³ +api +dtype +df +dir +uuid +Shiftable +rst +num +364bfd06-c1fa-430b-8d25-8f5a547651fb +inf +favor +Mixin +utc +deserializable +UNRECOGNIZED +len +cls +GH +deserialized +deserializing +kwargs +fsdomain +filepath +dataframes diff --git a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py new file mode 100644 index 000000000..f14e7f9fa --- /dev/null +++ b/flexmeasures/data/models/reporting/__init__.py @@ -0,0 +1,168 @@ +from typing import Optional, Union, Dict + +import pandas as pd +from flask import current_app + +from flexmeasures.data.schemas.reporting import ReporterConfigSchema +from flexmeasures.data.models.time_series import TimedBelief +from flexmeasures.data.queries.utils import simplify_index + +import timely_beliefs as tb + + +class Reporter: + """Superclass for all FlexMeasures Reporters.""" + + __version__ = None + __author__ = None + + reporter_config: Optional[dict] = None + reporter_config_raw: Optional[dict] = None + schema = ReporterConfigSchema + data: Dict[str, Union[tb.BeliefsDataFrame, pd.DataFrame]] = None + + def __init__(self, reporter_config_raw: Optional[dict] = None) -> None: + """ + Initialize a new Reporter. 
+ + + """ + + if not reporter_config_raw: + reporter_config_raw = {} + + self.reporter_config_raw = reporter_config_raw + + def fetch_data(self): + """ + Fetches the time_beliefs from the database + """ + + self.data = {} + for tb_query in self.tb_query_config: + + # using start / end instead of event_starts_after/event_ends_before when not defined + event_starts_after = tb_query.pop("event_starts_after", self.start) + event_ends_before = tb_query.pop("event_ends_before", self.end) + + sensor = tb_query.pop("sensor", None) + + bdf = TimedBelief.search( + sensors=sensor, + event_starts_after=event_starts_after, + event_ends_before=event_ends_before, + **tb_query, + ) + + # adding sources + for source in bdf.sources.unique(): + self.data[f"source_{source.id}"] = source + + # saving bdf + self.data[f"sensor_{sensor.id}"] = bdf + + def compute(self, *args, **kwargs) -> Optional[pd.DataFrame]: + """This method triggers the creation of a new report. This method allows to update the fields + in reporter_config_raw passing them as keyword arguments or the whole `reporter_config_raw` by + passing it in the kwarg `reporter_config_raw`. + + Overall, this method follows these steps: + 1) Updating the reporter_config with the kwargs of the method compute. + 2) Triggers config deserialization. + 3) Fetches the data of the sensors described by the field `tb_query_config`. 
+ 4) If the output is BeliefsDataFrame, it simplifies it into a DataFrame + + """ + # if report_config in kwargs + if "reporter_config_raw" in kwargs: + self.reporter_config_raw.update(kwargs.get("reporter_config_raw")) + else: # check for arguments in kwarg that could be potential fields of reporter config + for key, value in kwargs.items(): + if key in self.reporter_config_raw: + self.reporter_config_raw[key] = value + + # deserialize configuration + self.deserialize_config() + + # fetch data + self.fetch_data() + + # Result + result = self._compute() + + if isinstance(result, tb.BeliefsDataFrame): + result = simplify_index(result) + + return result + + def _compute(self) -> Optional[pd.DataFrame]: + """ + Overwrite with the actual computation of your report. + """ + raise NotImplementedError() + + @classmethod + def get_data_source_info(cls: type) -> dict: + """ + Create and return the data source info, from which a data source lookup/creation is possible. + See for instance get_data_source_for_job(). + """ + source_info = dict( + model=cls.__name__, version="1", name="Unknown author" + ) # default + + if hasattr(cls, "__version__"): + source_info["version"] = str(cls.__version__) + else: + current_app.logger.warning( + f"Scheduler {cls.__name__} loaded, but has no __version__ attribute." + ) + if hasattr(cls, "__author__"): + source_info["name"] = str(cls.__author__) + else: + current_app.logger.warning( + f"Scheduler {cls.__name__} has no __author__ attribute." + ) + return source_info + + def deserialize_config(self): + """ + Check all configurations we have, throwing either ValidationErrors or ValueErrors. + Other code can decide if/how to handle those. + """ + self.deserialize_report_config() + self.deserialize_timing_config() + + def deserialize_timing_config(self): + """ + Check if the timing of the report is valid. + + Raises ValueErrors. 
+ """ + + for tb_query in self.tb_query_config: + start = tb_query.get("event_starts_after", self.start) + end = tb_query.get("event_ends_before ", self.end) + + if end < start: + raise ValueError(f"Start {start} cannot be after end {end}.") + + def deserialize_report_config(self): + """ + Validate the report config against a Marshmallow Schema. + Ideas: + - Override this method + - Call superclass method to apply validation and common variables deserialization (see PandasReporter) + - (Partially) extract the relevant reporter_config parameters into class attributes. + + Raises ValidationErrors or ValueErrors. + """ + + self.reporter_config = self.schema.load( + self.reporter_config_raw + ) # validate reporter config + self.tb_query_config = self.reporter_config.get( + "tb_query_config" + ) # extracting TimeBelief query configuration parameters + self.start = self.reporter_config.get("start") + self.end = self.reporter_config.get("end") diff --git a/flexmeasures/data/models/reporting/pandas_reporter.py b/flexmeasures/data/models/reporting/pandas_reporter.py new file mode 100644 index 000000000..e5cf9c1a9 --- /dev/null +++ b/flexmeasures/data/models/reporting/pandas_reporter.py @@ -0,0 +1,114 @@ +import pandas as pd +from flexmeasures.data.models.reporting import Reporter +from flexmeasures.data.schemas.reporting.pandas_reporter import ( + PandasReporterConfigSchema, +) + + +class PandasReporter(Reporter): + """This reporter applies a series of pandas methods on""" + + __version__ = "1" + __author__ = None + schema = PandasReporterConfigSchema() + + def deserialize_report_config(self): + # call super class deserialize_report_config + super().deserialize_report_config() + + # extract PandasReporter specific fields + self.transformations = self.reporter_config.get("transformations") + self.final_df_output = self.reporter_config.get("final_df_output") + + def _compute(self) -> pd.Series: + """""" + # apply pandas transformations to the dataframes in `self.data`` + 
self.apply_transformations() + + final_output = self.data[self.final_df_output] + return final_output + + def get_object_or_literal(self, value, method): + """This method allows using the dataframes as inputs of the Pandas methods that + are run in the transformations. Make sure that they have been created before accessed. + + This works by putting the symbol `@` in front of the name of the dataframe that we want to reference. + For instance, to reference the dataframe test_df, which lives in self.data, we would do `@test_df`. + + This functionality is disabled for methods `eval`and `query` to avoid interfering their internal behaviour + given that they also use `@` to allow using local variables. + + Examples + >> self.get_object_or_literal(["@sensor_1", "@sensor_2"], "sum") + [[ ...n: 0:00:00, ...n: 0:00:00]] + """ + + if method in ["eval", "query"]: + return value + + if isinstance(value, str) and value.startswith("@"): + value = value.replace("@", "") + return self.data[value] + + if isinstance(value, list): + return [self.get_object_or_literal(v, method) for v in value] + + return value + + def process_pandas_args(self, args, method): + """This method applies the function get_object_or_literal to all the arguments + to detect where to replace a string "@" with the actual object stored in `self.data[""]`. + """ + for i in range(len(args)): + args[i] = self.get_object_or_literal(args[i], method) + return args + + def process_pandas_kwargs(self, kwargs, method): + """This method applies the function get_object_or_literal to all the keyword arguments + to detect where to replace a string "@" with the actual object stored in `self.data[""]`. + """ + for k, v in kwargs.items(): + kwargs[k] = self.get_object_or_literal(v, method) + return kwargs + + def apply_transformations(self) -> pd.Series: + """Convert the series using the given list of transformation specs, which is called in the order given. 
+ + Each transformation specs should include a 'method' key specifying a method name of a Pandas DataFrame. + + Optionally, 'args' and 'kwargs' keys can be specified to pass on arguments or keyword arguments to the given method. + + All data exchange is made through the dictionary `self.data`. The superclass Reporter already fetches and saves BeliefDataFrames + of the input sensors in the fields `sensor_`. In case you need to perform complex operations on dataframes, you can + split the operations in several steps and saving the intermediate results using the parameters `df_input` and `df_output` for the + input and output dataframes, respectively. + + Example: + + The example below converts from hourly meter readings in kWh to electricity demand in kW. + transformations = [ + {"mehod": "diff"}, + {"method": "shift", "kwargs": {"periods": -1}}, + {"method": "head", "args": [-1]}, + ], + """ + + previous_df = None + + for transformation in self.transformations: + df_input = transformation.get( + "df_input", previous_df + ) # default is using the previous transformation output + df_output = transformation.get( + "df_output", df_input + ) # default is OUTPUT = INPUT.method() + + method = transformation.get("method") + args = self.process_pandas_args(transformation.get("args", []), method) + kwargs = self.process_pandas_kwargs( + transformation.get("kwargs", {}), method + ) + + self.data[df_output] = getattr(self.data[df_input], method)(*args, **kwargs) + + previous_df = df_output diff --git a/flexmeasures/data/models/reporting/tests/__init__.py b/flexmeasures/data/models/reporting/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py new file mode 100644 index 000000000..64b14ff11 --- /dev/null +++ b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py @@ -0,0 +1,121 @@ +import pytest + +from datetime 
import datetime, timedelta + +from pytz import utc + +from flexmeasures.data.models.reporting.pandas_reporter import PandasReporter +from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType +from flexmeasures.data.models.data_sources import DataSource +from flexmeasures.data.models.time_series import Sensor, TimedBelief + + +@pytest.fixture +def setup_dummy_data(db, app): + + """ + Create Sensors 2, 1 Asset and 1 AssetType + """ + dummy_asset_type = GenericAssetType(name="DummyGenericAssetType") + db.session.add(dummy_asset_type) + + dummy_asset = GenericAsset( + name="DummyGenericAsset", generic_asset_type=dummy_asset_type + ) + db.session.add(dummy_asset) + + sensor1 = Sensor("sensor 1", generic_asset=dummy_asset) + db.session.add(sensor1) + sensor2 = Sensor("sensor 2", generic_asset=dummy_asset) + db.session.add(sensor2) + report_sensor = Sensor("report sensor", generic_asset=dummy_asset) + db.session.add(report_sensor) + + """ + Create 2 DataSources + """ + source1 = DataSource("source1") + source2 = DataSource("source2") + + """ + Create TimedBeliefs + """ + beliefs = [] + for sensor in [sensor1, sensor2]: + for si, source in enumerate([source1, source2]): + for t in range(10): + print(si) + beliefs.append( + TimedBelief( + event_start=datetime(2023, 4, 10, tzinfo=utc) + + timedelta(hours=t + si), + belief_horizon=timedelta(hours=24), + event_value=t, + sensor=sensor, + source=source, + ) + ) + + db.session.add_all(beliefs) + db.session.commit() + + yield sensor1, sensor2, report_sensor + + db.session.delete(sensor1) + db.session.delete(sensor2) + + for b in beliefs: + db.session.delete(b) + + db.session.delete(dummy_asset) + db.session.delete(dummy_asset_type) + + db.session.commit() + + +def test_reporter(setup_dummy_data): + s1, s2, reporter_sensor = setup_dummy_data + + reporter_config_raw = dict( + start=str(datetime(2023, 4, 10, tzinfo=utc)), + end=str(datetime(2023, 4, 10, 10, tzinfo=utc)), + 
tb_query_config=[dict(sensor=s1.id), dict(sensor=s2.id)], + transformations=[ + dict( + df_input="sensor_1", + df_output="sensor_1_source_1", + method="xs", + args=["@source_1"], + kwargs=dict(level=2), + ), + dict( + df_input="sensor_2", + df_output="sensor_2_source_1", + method="xs", + args=["@source_1"], + kwargs=dict(level=2), + ), + dict( + df_output="df_merge", + df_input="sensor_1_source_1", + method="merge", + args=["@sensor_2_source_1"], + kwargs=dict(on="event_start", suffixes=("_sensor1", "_sensor2")), + ), + dict(method="resample", args=["2h"]), + dict(method="mean"), + dict(method="sum", kwargs=dict(axis=1)), + ], + final_df_output="df_merge", + ) + + reporter = PandasReporter(reporter_config_raw=reporter_config_raw) + + report1 = reporter.compute() + + assert len(report1) == 5 + assert str(report1.index[0]) == "2023-04-10 00:00:00+00:00" + + report2 = reporter.compute(start=str(datetime(2023, 4, 10, 3, tzinfo=utc))) + assert len(report2) == 4 + assert str(report2.index[0]) == "2023-04-10 02:00:00+00:00" diff --git a/flexmeasures/data/schemas/reporting/__init__.py b/flexmeasures/data/schemas/reporting/__init__.py new file mode 100644 index 000000000..98a366587 --- /dev/null +++ b/flexmeasures/data/schemas/reporting/__init__.py @@ -0,0 +1,51 @@ +from marshmallow import Schema, fields + +from flexmeasures.data.schemas.sensors import SensorIdField +from flexmeasures.data.schemas.sources import DataSourceIdField + + +class TimeBeliefQueryConfigSchema(Schema): + """ + This schema implements the required fields to perform a TimeBeliefs search + using the method flexmeasures.data.models.time_series:TimedBelief.search + """ + + sensor = SensorIdField(required=True) + + event_starts_after = fields.DateTime() + event_ends_before = fields.DateTime() + + beliefs_after = fields.DateTime() + beliefs_before = fields.DateTime() + + horizons_at_least = fields.TimeDelta(precision="minutes") + horizons_at_most = fields.TimeDelta(precision="minutes") + + source = 
DataSourceIdField() + # user_source_ids: Optional[Union[int, List[int]]] = None, + + source_types = fields.List(fields.Str()) + exclude_source_types = fields.List(fields.Str()) + most_recent_beliefs_only = fields.Boolean() + most_recent_events_only = fields.Boolean() + + one_deterministic_belief_per_event = fields.Boolean() + one_deterministic_belief_per_event_per_source = fields.Boolean() + resolution = fields.TimeDelta() + sum_multiple = fields.Boolean() + + +class ReporterConfigSchema(Schema): + """ + This schema is used to validate Reporter class configurations (reporter_config). + Inherit from this to extend this schema with your own parameters. + + If the fields event_starts_after or event_ends_before are not present in `tb_query_config` + they will look up in the fields `start` and `end` + """ + + tb_query_config = fields.List( + fields.Nested(TimeBeliefQueryConfigSchema()), required=True + ) + start = fields.DateTime() + end = fields.DateTime() diff --git a/flexmeasures/data/schemas/reporting/pandas_reporter.py b/flexmeasures/data/schemas/reporting/pandas_reporter.py new file mode 100644 index 000000000..cf3114607 --- /dev/null +++ b/flexmeasures/data/schemas/reporting/pandas_reporter.py @@ -0,0 +1,122 @@ +from marshmallow import Schema, fields, ValidationError, validates_schema +from inspect import signature + +from flexmeasures.data.schemas.reporting import ReporterConfigSchema +from pandas import DataFrame + +from timely_beliefs import BeliefsDataFrame + + +class PandasMethodCall(Schema): + + df_input = fields.Str() + df_output = fields.Str() + + method = fields.Str(required=True) + args = fields.List(fields.Raw()) + kwargs = fields.Dict() + + @validates_schema + def validate_method_call(self, data, **kwargs): + + method = data["method"] + method_callable = getattr( + DataFrame, method, None + ) # what if the object which is applied to is not a DataFrame... 
+ + if not callable(method_callable): + raise ValidationError( + f"method {method} is not a valid Pandas DataFrame method." + ) + + method_signature = signature(method_callable) + + try: + args = data.get("args", []).copy() + _kwargs = data.get("kwargs", {}).copy() + + args.insert(0, DataFrame) + + method_signature.bind(*args, **_kwargs) + except TypeError: + raise ValidationError( + f"Bad arguments or keyword arguments for method {method}" + ) + + +class PandasReporterConfigSchema(ReporterConfigSchema): + """ + This schema lists fields that can be used to describe sensors in the optimised portfolio + + Example: + + { + "input_sensors" : [ + {'sensor' : 1} + ], + "transformations" : [ + { + "df_input" : "df2", + "df_output" : "df1", + "method" : "copy" + }, + { + "df_input" : "df2", + "df_output" : "df2", + "method" : "sum" + }, + { + "method" : "sum", + "kwargs" : {"axis" : 0} + } + + ] + } + + """ + + transformations = fields.List(fields.Nested(PandasMethodCall()), required=True) + final_df_output = fields.Str(required=True) + + @validates_schema + def validate_chainning(self, data, **kwargs): + """ + This validator ensures that we are always given an input and that the + final_df_output is computed. 
+ """ + + # create dictionary data with objects of the types that is supposed to be generated + # loading the initial data, the sensors' data + fake_data = dict( + (f"sensor_{s['sensor'].id}", BeliefsDataFrame) + for s in data.get("tb_query_config") + ) + final_df_output = data.get("final_df_output") + + previous_df = None + final_df_output_method = None + + for transformation in data.get("transformations"): + + df_input = transformation.get("df_input", previous_df) + df_output = transformation.get("df_output", df_input) + + if df_output == final_df_output: + final_df_output_method = transformation.get("method") + + if not previous_df and not df_input: + raise ValidationError("Cannot find the input DataFrame.") + + previous_df = df_output # keeping last dataframe calculation + + fake_data[df_output] = BeliefsDataFrame + + if final_df_output not in fake_data: + raise ValidationError( + "Cannot find final output DataFrame among the resulting DataFrames." + ) + + if final_df_output_method in ["resample", "groupby"]: + raise ValidationError( + "Final output type cannot by of type `Resampler` or `DataFrameGroupBy`" + ) diff --git a/flexmeasures/data/schemas/tests/test_reporting.py b/flexmeasures/data/schemas/tests/test_reporting.py new file mode 100644 index 000000000..7f94b8f24 --- /dev/null +++ b/flexmeasures/data/schemas/tests/test_reporting.py @@ -0,0 +1,135 @@ +from flexmeasures.data.models.time_series import Sensor +from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType + +from flexmeasures.data.schemas.reporting.pandas_reporter import ( + PandasReporterConfigSchema, +) +from marshmallow.exceptions import ValidationError + +import pytest + + +@pytest.fixture(scope="module") +def setup_dummy_sensors(db, app): + + dummy_asset_type = GenericAssetType(name="DummyGenericAssetType") + db.session.add(dummy_asset_type) + + dummy_asset = GenericAsset( + name="DummyGenericAsset", generic_asset_type=dummy_asset_type + ) + 
db.session.add(dummy_asset) + + sensor1 = Sensor("sensor 1", generic_asset=dummy_asset) + db.session.add(sensor1) + sensor2 = Sensor("sensor 2", generic_asset=dummy_asset) + db.session.add(sensor2) + + db.session.commit() + + yield sensor1, sensor2 + + db.session.delete(sensor1) + db.session.delete(sensor2) + + db.session.commit() + + +@pytest.mark.parametrize( + "report_config, is_valid", + [ + ( + { # this checks that the final_df_output dataframe is actually generated at some point of the processing pipeline + "tb_query_config": [ + { + "sensor": 1, + "event_starts_after": "2022-01-01T00:00:00", + "event_ends_before": "2022-01-01T23:00:00", + }, + ], + "transformations": [ + { + "df_output": "final_output", + "df_input": "sensor_1", + "method": "copy", + } + ], + "final_df_output": "final_output", + }, + True, + ), + ( + { # this checks that chaining works, applying the method copy on the previous dataframe + "tb_query_config": [ + { + "sensor": 1, + "event_starts_after": "2022-01-01T00:00:00", + "event_ends_before": "2022-01-01T23:00:00", + }, + ], + "transformations": [ + {"df_output": "output1", "df_input": "sensor_1", "method": "copy"}, + {"method": "copy"}, + {"df_output": "final_output", "method": "copy"}, + ], + "final_df_output": "final_output", + }, + True, + ), + ( + { # this checks that resample cannot be the last method being applied + "tb_query_config": [ + { + "sensor": 1, + "event_starts_after": "2022-01-01T00:00:00", + "event_ends_before": "2022-01-01T23:00:00", + }, + { + "sensor": 2, + "event_starts_after": "2022-01-01T00:00:00", + "event_ends_before": "2022-01-01T23:00:00", + }, + ], + "transformations": [ + {"df_output": "output1", "df_input": "sensor_1", "method": "copy"}, + {"method": "copy"}, + {"df_output": "final_output", "method": "resample"}, + ], + "final_df_output": "final_output", + }, + False, + ), + ( + { # this checks that resample cannot be the last method being applied + "tb_query_config": [ + { + "sensor": 1, + 
"event_starts_after": "2022-01-01T00:00:00", + "event_ends_before": "2022-01-01T23:00:00", + }, + { + "sensor": 2, + "event_starts_after": "2022-01-01T00:00:00", + "event_ends_before": "2022-01-01T23:00:00", + }, + ], + "transformations": [ + {"df_output": "output1", "df_input": "sensor_1", "method": "copy"}, + {"method": "copy"}, + {"df_output": "final_output", "method": "resample"}, + ], + "final_df_output": "final_output", + }, + False, + ), + ], +) +def test_pandas_reporter(report_config, is_valid, db, app, setup_dummy_sensors): + + schema = PandasReporterConfigSchema() + + if is_valid: + schema.load(report_config) + else: + with pytest.raises(ValidationError): + schema.load(report_config) From ce17568dcacf738f26f365ad9b53693bbbfdae0f Mon Sep 17 00:00:00 2001 From: victor Date: Mon, 17 Apr 2023 23:57:00 +0200 Subject: [PATCH 02/36] Added Tibber Reporter. Signed-off-by: victor --- .../data/models/reporting/__init__.py | 4 +- .../reporting/tests/test_tibber_reporter.py | 252 ++++++++++++++++++ .../data/schemas/reporting/__init__.py | 6 +- .../data/schemas/tests/test_reporting.py | 7 +- 4 files changed, 262 insertions(+), 7 deletions(-) create mode 100644 flexmeasures/data/models/reporting/tests/test_tibber_reporter.py diff --git a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py index f14e7f9fa..ee14a414e 100644 --- a/flexmeasures/data/models/reporting/__init__.py +++ b/flexmeasures/data/models/reporting/__init__.py @@ -59,7 +59,9 @@ def fetch_data(self): self.data[f"source_{source.id}"] = source # saving bdf - self.data[f"sensor_{sensor.id}"] = bdf + self.data[ + f"sensor_{sensor.id}" + ] = bdf # TODO: Add alias to reference dataframes easily. e.g: dict(sensor = 1, alias="power"), def compute(self, *args, **kwargs) -> Optional[pd.DataFrame]: """This method triggers the creation of a new report. 
This method allows to update the fields diff --git a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py new file mode 100644 index 000000000..2cf83f0b9 --- /dev/null +++ b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py @@ -0,0 +1,252 @@ +import pytest + +from flexmeasures.data.models.reporting.pandas_reporter import PandasReporter +from flexmeasures.data.models.time_series import Sensor, DataSource, TimedBelief +from flexmeasures.data.models.generic_assets import GenericAssetType, GenericAsset + +import pandas as pd +from datetime import datetime, timedelta +from pytz import utc + +index = pd.date_range( + datetime(2023, 4, 13), datetime(2023, 4, 13, 23), freq="H", tz=utc +) + +entsoe_prices = [ + 97.23, + 85.09, + 79.49, + 72.86, + 71.12, + 82.50, + 102.06, + 115.04, + 122.15, + 105.39, + 83.40, + 34.90, + -4.50, + -50.00, + -50.07, + -50.00, + -0.90, + 08.10, + 128.10, + 151.00, + 155.20, + 152.00, + 134.04, + 120.10, +] # EUR / MWh + +tibber_app_price = [ + 29.2, + 27.7, + 27.0, + 26.2, + 26.0, + 27.4, + 29.8, + 31.3, + 32.2, + 30.2, + 27.5, + 21.7, + 16.9, + 11.4, + 11.4, + 11.4, + 17.3, + 30.5, + 32.9, + 35.7, + 36.2, + 35.8, + 33.6, + 32.0, +] # EUR/MWh + + +class TibberReporter(PandasReporter): + def __init__(self, start: datetime, end: datetime) -> None: + """This class calculates the price of energy of a tariff indexed to the Day Ahead prices. 
+ Energy Price = (1 + VAT) x ( EB + Tiber + DA Prices) + """ + + # search the sensors + EB = Sensor.query.filter(Sensor.name == "EB").one_or_none() + BWV = Sensor.query.filter(Sensor.name == "BWV").one_or_none() + tibber_tariff = Sensor.query.filter( + Sensor.name == "Tibber Tariff" + ).one_or_none() + + da_prices = Sensor.query.filter(Sensor.name == "DA prices").one_or_none() + + tb_query_config_extra = dict( + resolution=3600, # 1h = 3600s + event_starts_after=str(start), + event_ends_before=str(end), + ) + + # creating the PandasReporter reporter config + reporter_config_raw = dict( + start=str(start), + end=str(end), + tb_query_config=[ + dict(sensor=EB.id, **tb_query_config_extra), + dict(sensor=BWV.id, **tb_query_config_extra), + dict(sensor=tibber_tariff.id, **tb_query_config_extra), + dict(sensor=da_prices.id, **tb_query_config_extra), + ], + transformations=[ + dict( + df_input="sensor_1", + df_output="BWV", + method="droplevel", + args=[[1, 2, 3]], + ), + dict(method="add", args=[1]), # this is to get 1 + BWV + dict( + df_input="sensor_2", + df_output="EB", + method="droplevel", + args=[[1, 2, 3]], + ), + dict( + df_input="sensor_3", + df_output="tibber_tariff", + method="droplevel", + args=[[1, 2, 3]], + ), + dict( + df_input="sensor_4", + df_output="da_prices", + method="droplevel", + args=[[1, 2, 3]], + ), + dict( + method="multiply", + args=[1 / 1000], + ), + dict( + df_output="energy_price", + df_input="EB", + method="add", + args=["@tibber_tariff"], + ), + dict(method="add", args=["@da_prices"]), + dict(method="multiply", args=["@BWV"]), + ], + final_df_output="energy_price", + ) + + super().__init__(reporter_config_raw) + + +def beliefs_from_timeseries(index, values, sensor, source): + beliefs = [] + for dt, value in zip(index, values): + beliefs.append( + TimedBelief( + event_start=dt, + belief_horizon=timedelta(hours=24), + event_value=value, + sensor=sensor, + source=source, + ) + ) + + return beliefs + + +@pytest.fixture() +def 
tibber_test_data(fresh_db, app): + db = fresh_db + + tax = GenericAssetType(name="Tax") + price = GenericAssetType(name="Price") + + db.session.add_all([tax, price]) + + # Belastingdienst + + electricity_price = GenericAsset(name="Electricity Price", generic_asset_type=price) + + VAT = GenericAsset(name="VAT", generic_asset_type=tax) + + electricity_tax = GenericAsset(name="Energy Tax", generic_asset_type=tax) + + db.session.add_all([electricity_price, VAT, electricity_tax]) + + # Taxes + BWV = Sensor("BWV", generic_asset=VAT, event_resolution=timedelta(days=365)) + EB = Sensor( + "EB", generic_asset=electricity_tax, event_resolution=timedelta(days=365) + ) + + # Tibber Tariff + tibber_tariff = Sensor( + "Tibber Tariff", + generic_asset=electricity_price, + event_resolution=timedelta(days=365), + ) + + db.session.add_all([BWV, EB, tibber_tariff]) + + """ + Saving TimeBeliefs to the DB + """ + + # Adding EB, BWV and Tibber Tarriff beliefs to the DB + for sensor, source_name, value in [ + (BWV, "Belastingdienst", 0.21), + (EB, "Belastingdienst", 0.12599), + (tibber_tariff, "Tibber", 0.018), + ]: + belief = TimedBelief( + sensor=sensor, + source=DataSource(source_name), + event_value=value, + event_start=datetime(2023, 1, 1, tzinfo=utc), + belief_time=datetime(2023, 1, 1, tzinfo=utc), + ) + + db.session.add(belief) + + # DA Prices + entsoe = DataSource("ENTSOE") + da_prices = Sensor( + "DA prices", + generic_asset=electricity_price, + event_resolution=timedelta(hours=1), + ) + db.session.add(da_prices) + da_prices_beliefs = beliefs_from_timeseries(index, entsoe_prices, da_prices, entsoe) + db.session.add_all(da_prices_beliefs) + + return + + +def test_tibber_reporter(tibber_test_data): + """ + This test checks if the calculation of the energy prices gets close enough to the ones + displayed in Tibber's App. 
+ """ + + tibber_reporter = TibberReporter( + datetime(2023, 4, 13, tzinfo=utc), datetime(2023, 4, 14, tzinfo=utc) + ) + + result = tibber_reporter.compute() + + # checking that we've get a result for 24 hours + assert len(result) == 24 + + tibber_app_price_df = ( + pd.DataFrame(tibber_app_price, index=index, columns=["event_value"]) / 100 + ) + + # checking that (EPEX+EB + Tibber Tariff)*(1+BWV) = Tibber App Price + assert ( + abs(result - tibber_app_price_df).mean().iloc[0] < 0.01 + ) # difference of less than 1 cent / kWh diff --git a/flexmeasures/data/schemas/reporting/__init__.py b/flexmeasures/data/schemas/reporting/__init__.py index 98a366587..61ff1aab0 100644 --- a/flexmeasures/data/schemas/reporting/__init__.py +++ b/flexmeasures/data/schemas/reporting/__init__.py @@ -18,8 +18,8 @@ class TimeBeliefQueryConfigSchema(Schema): beliefs_after = fields.DateTime() beliefs_before = fields.DateTime() - horizons_at_least = fields.TimeDelta(precision="minutes") - horizons_at_most = fields.TimeDelta(precision="minutes") + horizons_at_least = fields.TimeDelta(precision=fields.TimeDelta.SECONDS) + horizons_at_most = fields.TimeDelta(precision=fields.TimeDelta.SECONDS) source = DataSourceIdField() # user_source_ids: Optional[Union[int, List[int]]] = None, @@ -31,7 +31,7 @@ class TimeBeliefQueryConfigSchema(Schema): one_deterministic_belief_per_event = fields.Boolean() one_deterministic_belief_per_event_per_source = fields.Boolean() - resolution = fields.TimeDelta() + resolution = fields.TimeDelta(precision=fields.TimeDelta.SECONDS) sum_multiple = fields.Boolean() diff --git a/flexmeasures/data/schemas/tests/test_reporting.py b/flexmeasures/data/schemas/tests/test_reporting.py index 7f94b8f24..55c7f40a1 100644 --- a/flexmeasures/data/schemas/tests/test_reporting.py +++ b/flexmeasures/data/schemas/tests/test_reporting.py @@ -93,7 +93,7 @@ def setup_dummy_sensors(db, app): "transformations": [ {"df_output": "output1", "df_input": "sensor_1", "method": "copy"}, {"method": 
"copy"}, - {"df_output": "final_output", "method": "resample"}, + {"df_output": "final_output", "method": "resample", "args": ["1h"]}, ], "final_df_output": "final_output", }, @@ -116,11 +116,12 @@ def setup_dummy_sensors(db, app): "transformations": [ {"df_output": "output1", "df_input": "sensor_1", "method": "copy"}, {"method": "copy"}, - {"df_output": "final_output", "method": "resample"}, + {"df_output": "final_output", "method": "resample", "args": ["1h"]}, + {"method": "sum"}, ], "final_df_output": "final_output", }, - False, + True, ), ], ) From 848e39abf15309a56e6fc670bee28288a5f5872f Mon Sep 17 00:00:00 2001 From: victor Date: Thu, 20 Apr 2023 10:26:04 +0200 Subject: [PATCH 03/36] - Fixing wong DA Price value. - Renaming BWV and EB to english words. - Simplifying calculation (pandas pipeline). - Adding units to sensors. - Changing units from EUR/kWh to EUR/MWh - Adding assert to check maximum error - deserialize_report_config -> deserialize_reporter_config - Warning when a string starting with `@` is used in the method query or eval. - Making process_pandas_args, process_pandas_kwargs and apply_transformation private methods. Signed-off-by: victor --- .../data/models/reporting/__init__.py | 8 +- .../data/models/reporting/pandas_reporter.py | 40 +++++--- .../reporting/tests/test_tibber_reporter.py | 92 +++++++++++-------- 3 files changed, 82 insertions(+), 58 deletions(-) diff --git a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py index ee14a414e..6dd34a09c 100644 --- a/flexmeasures/data/models/reporting/__init__.py +++ b/flexmeasures/data/models/reporting/__init__.py @@ -117,13 +117,13 @@ def get_data_source_info(cls: type) -> dict: source_info["version"] = str(cls.__version__) else: current_app.logger.warning( - f"Scheduler {cls.__name__} loaded, but has no __version__ attribute." + f"Reporter {cls.__name__} loaded, but has no __version__ attribute." 
) if hasattr(cls, "__author__"): source_info["name"] = str(cls.__author__) else: current_app.logger.warning( - f"Scheduler {cls.__name__} has no __author__ attribute." + f"Reporter {cls.__name__} has no __author__ attribute." ) return source_info @@ -132,7 +132,7 @@ def deserialize_config(self): Check all configurations we have, throwing either ValidationErrors or ValueErrors. Other code can decide if/how to handle those. """ - self.deserialize_report_config() + self.deserialize_reporter_config() self.deserialize_timing_config() def deserialize_timing_config(self): @@ -149,7 +149,7 @@ def deserialize_timing_config(self): if end < start: raise ValueError(f"Start {start} cannot be after end {end}.") - def deserialize_report_config(self): + def deserialize_reporter_config(self): """ Validate the report config against a Marshmallow Schema. Ideas: diff --git a/flexmeasures/data/models/reporting/pandas_reporter.py b/flexmeasures/data/models/reporting/pandas_reporter.py index e5cf9c1a9..260c92e7d 100644 --- a/flexmeasures/data/models/reporting/pandas_reporter.py +++ b/flexmeasures/data/models/reporting/pandas_reporter.py @@ -1,4 +1,7 @@ import pandas as pd + +from flask import current_app + from flexmeasures.data.models.reporting import Reporter from flexmeasures.data.schemas.reporting.pandas_reporter import ( PandasReporterConfigSchema, @@ -12,20 +15,25 @@ class PandasReporter(Reporter): __author__ = None schema = PandasReporterConfigSchema() - def deserialize_report_config(self): - # call super class deserialize_report_config - super().deserialize_report_config() + def deserialize_reporter_config(self): + # call super class deserialize_reporter_config + super().deserialize_reporter_config() # extract PandasReporter specific fields self.transformations = self.reporter_config.get("transformations") self.final_df_output = self.reporter_config.get("final_df_output") def _compute(self) -> pd.Series: - """""" - # apply pandas transformations to the dataframes in `self.data`` 
- self.apply_transformations() + """ + This method applies the transformations and outputs the dataframe + defined in `final_df_output` field of the report_config. + """ + + # apply pandas transformations to the dataframes in `self.data` + self._apply_transformations() final_output = self.data[self.final_df_output] + return final_output def get_object_or_literal(self, value, method): @@ -40,10 +48,14 @@ def get_object_or_literal(self, value, method): Examples >> self.get_object_or_literal(["@sensor_1", "@sensor_2"], "sum") - [[ ...n: 0:00:00, ...n: 0:00:00]] + [[ , ]] """ if method in ["eval", "query"]: + if isinstance(value, str) and value.startswith("@"): + current_app.logger.warning( + "Cannot reference objects in self.data using the method eval or query. That is because these methods use the symbol `@` to make reference to local variables." + ) return value if isinstance(value, str) and value.startswith("@"): @@ -55,7 +67,7 @@ def get_object_or_literal(self, value, method): return value - def process_pandas_args(self, args, method): + def _process_pandas_args(self, args, method): """This method applies the function get_object_or_literal to all the arguments to detect where to replace a string "@" with the actual object stored in `self.data[""]`. """ @@ -63,7 +75,7 @@ def process_pandas_args(self, args, method): args[i] = self.get_object_or_literal(args[i], method) return args - def process_pandas_kwargs(self, kwargs, method): + def _process_pandas_kwargs(self, kwargs, method): """This method applies the function get_object_or_literal to all the keyword arguments to detect where to replace a string "@" with the actual object stored in `self.data[""]`. 
""" @@ -71,15 +83,15 @@ def process_pandas_kwargs(self, kwargs, method): kwargs[k] = self.get_object_or_literal(v, method) return kwargs - def apply_transformations(self) -> pd.Series: + def _apply_transformations(self) -> pd.Series: """Convert the series using the given list of transformation specs, which is called in the order given. Each transformation specs should include a 'method' key specifying a method name of a Pandas DataFrame. Optionally, 'args' and 'kwargs' keys can be specified to pass on arguments or keyword arguments to the given method. - All data exchange is made through the dictionary `self.data`. The superclass Reporter already fetches and saves BeliefDataFrames - of the input sensors in the fields `sensor_`. In case you need to perform complex operations on dataframes, you can + All data exchange is made through the dictionary `self.data`. The superclass Reporter already fetches BeliefsDataFrames of + the sensors and saves them in the self.data dictionary fields `sensor_`. In case you need to perform complex operations on dataframes, you can split the operations in several steps and saving the intermediate results using the parameters `df_input` and `df_output` for the input and output dataframes, respectively. 
@@ -104,8 +116,8 @@ def apply_transformations(self) -> pd.Series: ) # default is OUTPUT = INPUT.method() method = transformation.get("method") - args = self.process_pandas_args(transformation.get("args", []), method) - kwargs = self.process_pandas_kwargs( + args = self._process_pandas_args(transformation.get("args", []), method) + kwargs = self._process_pandas_kwargs( transformation.get("kwargs", {}), method ) diff --git a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py index 2cf83f0b9..c95ac819a 100644 --- a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py @@ -30,7 +30,7 @@ -50.07, -50.00, -0.90, - 08.10, + 108.10, 128.10, 151.00, 155.20, @@ -64,18 +64,18 @@ 35.8, 33.6, 32.0, -] # EUR/MWh +] # cents/kWh class TibberReporter(PandasReporter): def __init__(self, start: datetime, end: datetime) -> None: """This class calculates the price of energy of a tariff indexed to the Day Ahead prices. 
- Energy Price = (1 + VAT) x ( EB + Tiber + DA Prices) + Energy Price = (1 + VAT) x ( EnergyTax + Tiber + DA Prices) """ # search the sensors - EB = Sensor.query.filter(Sensor.name == "EB").one_or_none() - BWV = Sensor.query.filter(Sensor.name == "BWV").one_or_none() + EnergyTax = Sensor.query.filter(Sensor.name == "EnergyTax").one_or_none() + VAT = Sensor.query.filter(Sensor.name == "VAT").one_or_none() tibber_tariff = Sensor.query.filter( Sensor.name == "Tibber Tariff" ).one_or_none() @@ -84,31 +84,29 @@ def __init__(self, start: datetime, end: datetime) -> None: tb_query_config_extra = dict( resolution=3600, # 1h = 3600s - event_starts_after=str(start), - event_ends_before=str(end), ) # creating the PandasReporter reporter config - reporter_config_raw = dict( + reporter_config = dict( start=str(start), end=str(end), tb_query_config=[ - dict(sensor=EB.id, **tb_query_config_extra), - dict(sensor=BWV.id, **tb_query_config_extra), + dict(sensor=EnergyTax.id, **tb_query_config_extra), + dict(sensor=VAT.id, **tb_query_config_extra), dict(sensor=tibber_tariff.id, **tb_query_config_extra), dict(sensor=da_prices.id, **tb_query_config_extra), ], transformations=[ dict( df_input="sensor_1", - df_output="BWV", + df_output="VAT", method="droplevel", args=[[1, 2, 3]], ), - dict(method="add", args=[1]), # this is to get 1 + BWV + dict(method="add", args=[1]), # this is to get 1 + VAT dict( df_input="sensor_2", - df_output="EB", + df_output="EnergyTax", method="droplevel", args=[[1, 2, 3]], ), @@ -125,22 +123,21 @@ def __init__(self, start: datetime, end: datetime) -> None: args=[[1, 2, 3]], ), dict( - method="multiply", - args=[1 / 1000], - ), + method="add", args=["@tibber_tariff"] + ), # da_prices = da_prices + tibber_tariff dict( - df_output="energy_price", - df_input="EB", - method="add", - args=["@tibber_tariff"], - ), - dict(method="add", args=["@da_prices"]), - dict(method="multiply", args=["@BWV"]), + method="add", args=["@EnergyTax"] + ), # da_prices = da_prices + 
EnergyTax + dict( + method="multiply", args=["@VAT"] + ), # da_prices = da_price * VAT, VAT + dict(method="round", args=[2]), # round 2 decimals + dict(method="round", args=[1]), # round 1 decimal ], - final_df_output="energy_price", + final_df_output="da_prices", ) - super().__init__(reporter_config_raw) + super().__init__(reporter_config) def beliefs_from_timeseries(index, values, sensor, source): @@ -172,16 +169,24 @@ def tibber_test_data(fresh_db, app): electricity_price = GenericAsset(name="Electricity Price", generic_asset_type=price) - VAT = GenericAsset(name="VAT", generic_asset_type=tax) + VAT_asset = GenericAsset(name="VAT", generic_asset_type=tax) electricity_tax = GenericAsset(name="Energy Tax", generic_asset_type=tax) - db.session.add_all([electricity_price, VAT, electricity_tax]) + db.session.add_all([electricity_price, VAT_asset, electricity_tax]) # Taxes - BWV = Sensor("BWV", generic_asset=VAT, event_resolution=timedelta(days=365)) - EB = Sensor( - "EB", generic_asset=electricity_tax, event_resolution=timedelta(days=365) + VAT = Sensor( + "VAT", + generic_asset=VAT_asset, + event_resolution=timedelta(days=365), + unit="unit range", + ) + EnergyTax = Sensor( + "EnergyTax", + generic_asset=electricity_tax, + event_resolution=timedelta(days=365), + unit="EUR/MWh", ) # Tibber Tariff @@ -189,19 +194,20 @@ def tibber_test_data(fresh_db, app): "Tibber Tariff", generic_asset=electricity_price, event_resolution=timedelta(days=365), + unit="EUR/MWh", ) - db.session.add_all([BWV, EB, tibber_tariff]) + db.session.add_all([VAT, EnergyTax, tibber_tariff]) """ Saving TimeBeliefs to the DB """ - # Adding EB, BWV and Tibber Tarriff beliefs to the DB + # Adding EnergyTax, VAT and Tibber Tarriff beliefs to the DB for sensor, source_name, value in [ - (BWV, "Belastingdienst", 0.21), - (EB, "Belastingdienst", 0.12599), - (tibber_tariff, "Tibber", 0.018), + (VAT, "Tax Authority", 0.21), # unit interval + (EnergyTax, "Tax Authority", 125.99), # EUR / MWh + (tibber_tariff, 
"Tibber", 18.0), # EUR /MWh ]: belief = TimedBelief( sensor=sensor, @@ -243,10 +249,16 @@ def test_tibber_reporter(tibber_test_data): assert len(result) == 24 tibber_app_price_df = ( - pd.DataFrame(tibber_app_price, index=index, columns=["event_value"]) / 100 + pd.DataFrame(tibber_app_price, index=index, columns=["event_value"]) + * 10 # converting cents/kWh to EUR/MWh ) - # checking that (EPEX+EB + Tibber Tariff)*(1+BWV) = Tibber App Price - assert ( - abs(result - tibber_app_price_df).mean().iloc[0] < 0.01 - ) # difference of less than 1 cent / kWh + error = abs(result - tibber_app_price_df) + + # checking that (EPEX + EnergyTax + Tibber Tariff)*(1 + VAT) = Tibber App Price + + # mean error is low enough, i.e 1 EUR/MWh = 0.1 cent/kWh + assert error.mean().iloc[0] < 1 + + # max error is low enough, i.e 1 EUR/MWh = 0.1 cent/kWh + assert error.max().iloc[0] < 1 From d9867fc7dce6ae508d425a8e09e9dbb4cb8b1b19 Mon Sep 17 00:00:00 2001 From: victor Date: Thu, 20 Apr 2023 10:55:54 +0200 Subject: [PATCH 04/36] Updating VAT units. 
Signed-off-by: victor --- .../data/models/reporting/tests/test_tibber_reporter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py index c95ac819a..34673b0eb 100644 --- a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py @@ -180,7 +180,7 @@ def tibber_test_data(fresh_db, app): "VAT", generic_asset=VAT_asset, event_resolution=timedelta(days=365), - unit="unit range", + unit="", ) EnergyTax = Sensor( "EnergyTax", @@ -205,7 +205,7 @@ def tibber_test_data(fresh_db, app): # Adding EnergyTax, VAT and Tibber Tarriff beliefs to the DB for sensor, source_name, value in [ - (VAT, "Tax Authority", 0.21), # unit interval + (VAT, "Tax Authority", 0.21), (EnergyTax, "Tax Authority", 125.99), # EUR / MWh (tibber_tariff, "Tibber", 18.0), # EUR /MWh ]: From 1c9de43e3eff2ebf6f9809e45f934cd69b510fa3 Mon Sep 17 00:00:00 2001 From: victor Date: Fri, 21 Apr 2023 09:35:48 +0200 Subject: [PATCH 05/36] - Attaching report to sensor - Output type of compute is BeliefsDataFrame - Added a global input resolution to schema - ISO datetime and timedeltas - start, end and input_resolution are considered serialized when passed to the method compute - assert to check that result resolution = sensor resolution Signed-off-by: victor --- .../data/models/reporting/__init__.py | 44 ++++++++++++++----- .../data/models/reporting/pandas_reporter.py | 4 +- .../reporting/tests/test_pandas_reporter.py | 32 ++++++++++---- .../reporting/tests/test_tibber_reporter.py | 37 ++++++++++------ .../data/schemas/reporting/__init__.py | 21 +++++---- .../data/schemas/reporting/pandas_reporter.py | 2 +- .../data/schemas/tests/test_reporting.py | 24 +++++----- 7 files changed, 106 insertions(+), 58 deletions(-) diff --git a/flexmeasures/data/models/reporting/__init__.py
b/flexmeasures/data/models/reporting/__init__.py index 6dd34a09c..abea7c911 100644 --- a/flexmeasures/data/models/reporting/__init__.py +++ b/flexmeasures/data/models/reporting/__init__.py @@ -4,8 +4,7 @@ from flask import current_app from flexmeasures.data.schemas.reporting import ReporterConfigSchema -from flexmeasures.data.models.time_series import TimedBelief -from flexmeasures.data.queries.utils import simplify_index +from flexmeasures.data.models.time_series import Sensor import timely_beliefs as tb @@ -16,18 +15,26 @@ class Reporter: __version__ = None __author__ = None + sensor: Sensor = None + reporter_config: Optional[dict] = None reporter_config_raw: Optional[dict] = None schema = ReporterConfigSchema data: Dict[str, Union[tb.BeliefsDataFrame, pd.DataFrame]] = None - def __init__(self, reporter_config_raw: Optional[dict] = None) -> None: + def __init__( + self, sensor: Sensor, reporter_config_raw: Optional[dict] = None + ) -> None: """ Initialize a new Reporter. - + Attributes: + :param sensor: sensor where the output of the reporter will be saved to. + :param reporter_config_raw: unserialized configuration of the reporter. """ + self.sensor = sensor + if not reporter_config_raw: reporter_config_raw = {} @@ -44,13 +51,14 @@ def fetch_data(self): # using start / end instead of event_starts_after/event_ends_before when not defined event_starts_after = tb_query.pop("event_starts_after", self.start) event_ends_before = tb_query.pop("event_ends_before", self.end) + event_resolution = tb_query.pop("event_resolution", self.input_resolution) sensor = tb_query.pop("sensor", None) - bdf = TimedBelief.search( - sensors=sensor, + bdf = sensor.search_beliefs( event_starts_after=event_starts_after, event_ends_before=event_ends_before, + event_resolution=event_resolution, **tb_query, ) @@ -63,7 +71,7 @@ def fetch_data(self): f"sensor_{sensor.id}" ] = bdf # TODO: Add alias to reference dataframes easily. 
e.g: dict(sensor = 1, alias="power"), - def compute(self, *args, **kwargs) -> Optional[pd.DataFrame]: + def compute(self, *args, **kwargs) -> tb.BeliefsDataFrame: """This method triggers the creation of a new report. This method allows to update the fields in reporter_config_raw passing them as keyword arguments or the whole `reporter_config_raw` by passing it in the kwarg `reporter_config_raw`. @@ -80,9 +88,17 @@ def compute(self, *args, **kwargs) -> Optional[pd.DataFrame]: self.reporter_config_raw.update(kwargs.get("reporter_config_raw")) else: # check for arguments in kwarg that could be potential fields of reporter config for key, value in kwargs.items(): - if key in self.reporter_config_raw: + + if ( + key in self.reporter_config_raw + ): # update reporter_config_raw with inputs from the method self.reporter_config_raw[key] = value + elif key in ["start", "end"]: # convert datetime to string + self.reporter_config_raw[key] = value.isoformat().replace("+", " ") + elif key in ["input_resolution"]: # convert timedelta into string + self.reporter_config_raw[key] = pd.Timedelta(value).isoformat() + # deserialize configuration self.deserialize_config() @@ -92,14 +108,19 @@ def compute(self, *args, **kwargs) -> Optional[pd.DataFrame]: # Result result = self._compute() - if isinstance(result, tb.BeliefsDataFrame): - result = simplify_index(result) + # checking that the event_resolution of the output BeliefDataFrame is equal to the one of the output sensor + assert self.sensor.event_resolution == result.event_resolution + + # Assign sensor to BeliefDataFrame + result.sensor = self.sensor return result - def _compute(self) -> Optional[pd.DataFrame]: + def _compute(self) -> Optional[tb.BeliefsDataFrame]: """ Overwrite with the actual computation of your report. + + :returns BeliefsDataFrame: report as a BeliefsDataFrame. 
""" raise NotImplementedError() @@ -168,3 +189,4 @@ def deserialize_reporter_config(self): ) # extracting TimeBelief query configuration parameters self.start = self.reporter_config.get("start") self.end = self.reporter_config.get("end") + self.input_resolution = self.reporter_config.get("input_resolution") diff --git a/flexmeasures/data/models/reporting/pandas_reporter.py b/flexmeasures/data/models/reporting/pandas_reporter.py index 260c92e7d..fe8a977ff 100644 --- a/flexmeasures/data/models/reporting/pandas_reporter.py +++ b/flexmeasures/data/models/reporting/pandas_reporter.py @@ -47,8 +47,8 @@ def get_object_or_literal(self, value, method): given that they also use `@` to allow using local variables. Examples - >> self.get_object_or_literal(["@sensor_1", "@sensor_2"], "sum") - [[ , ]] + >> self.get_object_or_literal(["@df_wind", "@df_solar"], "sum") + [[ , ]] """ if method in ["eval", "query"]: diff --git a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py index 64b14ff11..978995a4d 100644 --- a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py @@ -17,18 +17,27 @@ def setup_dummy_data(db, app): Create Sensors 2, 1 Asset and 1 AssetType """ dummy_asset_type = GenericAssetType(name="DummyGenericAssetType") - db.session.add(dummy_asset_type) + report_asset_type = GenericAssetType(name="ReportAssetType") + + db.session.add_all([dummy_asset_type, report_asset_type]) dummy_asset = GenericAsset( name="DummyGenericAsset", generic_asset_type=dummy_asset_type ) - db.session.add(dummy_asset) - sensor1 = Sensor("sensor 1", generic_asset=dummy_asset) + pandas_report = GenericAsset( + name="PandasReport", generic_asset_type=report_asset_type + ) + + db.session.add_all([dummy_asset, pandas_report]) + + sensor1 = Sensor("sensor 1", generic_asset=dummy_asset, event_resolution="1h") db.session.add(sensor1) - 
sensor2 = Sensor("sensor 2", generic_asset=dummy_asset) + sensor2 = Sensor("sensor 2", generic_asset=dummy_asset, event_resolution="1h") db.session.add(sensor2) - report_sensor = Sensor("report sensor", generic_asset=dummy_asset) + report_sensor = Sensor( + "report sensor", generic_asset=pandas_report, event_resolution="1h" + ) db.session.add(report_sensor) """ @@ -77,8 +86,8 @@ def test_reporter(setup_dummy_data): s1, s2, reporter_sensor = setup_dummy_data reporter_config_raw = dict( - start=str(datetime(2023, 4, 10, tzinfo=utc)), - end=str(datetime(2023, 4, 10, 10, tzinfo=utc)), + start="2023-04-10T00:00:00 00:00", + end="2023-04-10T10:00:00 00:00", tb_query_config=[dict(sensor=s1.id), dict(sensor=s2.id)], transformations=[ dict( @@ -109,13 +118,18 @@ def test_reporter(setup_dummy_data): final_df_output="df_merge", ) - reporter = PandasReporter(reporter_config_raw=reporter_config_raw) + reporter = PandasReporter(reporter_sensor, reporter_config_raw=reporter_config_raw) report1 = reporter.compute() assert len(report1) == 5 assert str(report1.index[0]) == "2023-04-10 00:00:00+00:00" + assert ( + report1.sensor == reporter_sensor + ) # check that the output sensor is effectively assigned. 
- report2 = reporter.compute(start=str(datetime(2023, 4, 10, 3, tzinfo=utc))) + report2 = reporter.compute(start=datetime(2023, 4, 10, 3, tzinfo=utc)) assert len(report2) == 4 assert str(report2.index[0]) == "2023-04-10 02:00:00+00:00" + + # TODO: resample with BeliefDataFrame specific method (resample_event) diff --git a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py index 34673b0eb..4a61532a4 100644 --- a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py @@ -68,7 +68,7 @@ class TibberReporter(PandasReporter): - def __init__(self, start: datetime, end: datetime) -> None: + def __init__(self, sensor) -> None: """This class calculates the price of energy of a tariff indexed to the Day Ahead prices. Energy Price = (1 + VAT) x ( EnergyTax + Tiber + DA Prices) """ @@ -83,13 +83,11 @@ def __init__(self, start: datetime, end: datetime) -> None: da_prices = Sensor.query.filter(Sensor.name == "DA prices").one_or_none() tb_query_config_extra = dict( - resolution=3600, # 1h = 3600s + resolution="PT1H", ) # creating the PandasReporter reporter config reporter_config = dict( - start=str(start), - end=str(end), tb_query_config=[ dict(sensor=EnergyTax.id, **tb_query_config_extra), dict(sensor=VAT.id, **tb_query_config_extra), @@ -131,13 +129,11 @@ def __init__(self, start: datetime, end: datetime) -> None: dict( method="multiply", args=["@VAT"] ), # da_prices = da_price * VAT, VAT - dict(method="round", args=[2]), # round 2 decimals - dict(method="round", args=[1]), # round 1 decimal ], final_df_output="da_prices", ) - super().__init__(reporter_config) + super().__init__(sensor, reporter_config) def beliefs_from_timeseries(index, values, sensor, source): @@ -162,10 +158,11 @@ def tibber_test_data(fresh_db, app): tax = GenericAssetType(name="Tax") price = GenericAssetType(name="Price") + report = 
GenericAssetType(name="Report") db.session.add_all([tax, price]) - # Belastingdienst + # Taxes electricity_price = GenericAsset(name="Electricity Price", generic_asset_type=price) @@ -173,7 +170,9 @@ def tibber_test_data(fresh_db, app): electricity_tax = GenericAsset(name="Energy Tax", generic_asset_type=tax) - db.session.add_all([electricity_price, VAT_asset, electricity_tax]) + tibber_report = GenericAsset(name="TibberReport", generic_asset_type=report) + + db.session.add_all([electricity_price, VAT_asset, electricity_tax, tibber_report]) # Taxes VAT = Sensor( @@ -230,7 +229,15 @@ def tibber_test_data(fresh_db, app): da_prices_beliefs = beliefs_from_timeseries(index, entsoe_prices, da_prices, entsoe) db.session.add_all(da_prices_beliefs) - return + tibber_report_sensor = Sensor( + "TibberReportSensor", + generic_asset=tibber_report, + event_resolution=timedelta(hours=1), + unit="EUR/MWh", + ) + db.session.add(tibber_report_sensor) + + return tibber_report_sensor def test_tibber_reporter(tibber_test_data): @@ -239,11 +246,13 @@ def test_tibber_reporter(tibber_test_data): displayed in Tibber's App. 
""" - tibber_reporter = TibberReporter( - datetime(2023, 4, 13, tzinfo=utc), datetime(2023, 4, 14, tzinfo=utc) - ) + tibber_report_sensor = tibber_test_data - result = tibber_reporter.compute() + tibber_reporter = TibberReporter(tibber_report_sensor) + + result = tibber_reporter.compute( + start=datetime(2023, 4, 13, tzinfo=utc), end=datetime(2023, 4, 14, tzinfo=utc) + ) # checking that we've get a result for 24 hours assert len(result) == 24 diff --git a/flexmeasures/data/schemas/reporting/__init__.py b/flexmeasures/data/schemas/reporting/__init__.py index 61ff1aab0..c7f23eb13 100644 --- a/flexmeasures/data/schemas/reporting/__init__.py +++ b/flexmeasures/data/schemas/reporting/__init__.py @@ -3,6 +3,8 @@ from flexmeasures.data.schemas.sensors import SensorIdField from flexmeasures.data.schemas.sources import DataSourceIdField +from flexmeasures.data.schemas import AwareDateTimeField, DurationField + class TimeBeliefQueryConfigSchema(Schema): """ @@ -12,14 +14,14 @@ class TimeBeliefQueryConfigSchema(Schema): sensor = SensorIdField(required=True) - event_starts_after = fields.DateTime() - event_ends_before = fields.DateTime() + event_starts_after = AwareDateTimeField() + event_ends_before = AwareDateTimeField() - beliefs_after = fields.DateTime() - beliefs_before = fields.DateTime() + beliefs_after = AwareDateTimeField() + beliefs_before = AwareDateTimeField() - horizons_at_least = fields.TimeDelta(precision=fields.TimeDelta.SECONDS) - horizons_at_most = fields.TimeDelta(precision=fields.TimeDelta.SECONDS) + horizons_at_least = DurationField() + horizons_at_most = DurationField() source = DataSourceIdField() # user_source_ids: Optional[Union[int, List[int]]] = None, @@ -31,7 +33,7 @@ class TimeBeliefQueryConfigSchema(Schema): one_deterministic_belief_per_event = fields.Boolean() one_deterministic_belief_per_event_per_source = fields.Boolean() - resolution = fields.TimeDelta(precision=fields.TimeDelta.SECONDS) + resolution = DurationField() sum_multiple = 
fields.Boolean() @@ -47,5 +49,6 @@ class ReporterConfigSchema(Schema): tb_query_config = fields.List( fields.Nested(TimeBeliefQueryConfigSchema()), required=True ) - start = fields.DateTime() - end = fields.DateTime() + start = AwareDateTimeField() + end = AwareDateTimeField() + input_resolution = DurationField() diff --git a/flexmeasures/data/schemas/reporting/pandas_reporter.py b/flexmeasures/data/schemas/reporting/pandas_reporter.py index cf3114607..4330fd257 100644 --- a/flexmeasures/data/schemas/reporting/pandas_reporter.py +++ b/flexmeasures/data/schemas/reporting/pandas_reporter.py @@ -79,7 +79,7 @@ class PandasReporterConfigSchema(ReporterConfigSchema): final_df_output = fields.Str(required=True) @validates_schema - def validate_chainning(self, data, **kwargs): + def validate_chaining(self, data, **kwargs): """ This validator ensures that we are always given an input and that the final_df_output is computed. diff --git a/flexmeasures/data/schemas/tests/test_reporting.py b/flexmeasures/data/schemas/tests/test_reporting.py index 55c7f40a1..8e5000294 100644 --- a/flexmeasures/data/schemas/tests/test_reporting.py +++ b/flexmeasures/data/schemas/tests/test_reporting.py @@ -43,8 +43,8 @@ def setup_dummy_sensors(db, app): "tb_query_config": [ { "sensor": 1, - "event_starts_after": "2022-01-01T00:00:00", - "event_ends_before": "2022-01-01T23:00:00", + "event_starts_after": "2022-01-01T00:00:00 00:00", + "event_ends_before": "2022-01-01T23:00:00 00:00", }, ], "transformations": [ @@ -63,8 +63,8 @@ def setup_dummy_sensors(db, app): "tb_query_config": [ { "sensor": 1, - "event_starts_after": "2022-01-01T00:00:00", - "event_ends_before": "2022-01-01T23:00:00", + "event_starts_after": "2022-01-01T00:00:00 00:00", + "event_ends_before": "2022-01-01T23:00:00 00:00", }, ], "transformations": [ @@ -81,13 +81,13 @@ def setup_dummy_sensors(db, app): "tb_query_config": [ { "sensor": 1, - "event_starts_after": "2022-01-01T00:00:00", - "event_ends_before": "2022-01-01T23:00:00", 
+ "event_starts_after": "2022-01-01T00:00:00 00:00", + "event_ends_before": "2022-01-01T23:00:00 00:00", }, { "sensor": 2, - "event_starts_after": "2022-01-01T00:00:00", - "event_ends_before": "2022-01-01T23:00:00", + "event_starts_after": "2022-01-01T00:00:00 00:00", + "event_ends_before": "2022-01-01T23:00:00 00:00", }, ], "transformations": [ @@ -104,13 +104,13 @@ def setup_dummy_sensors(db, app): "tb_query_config": [ { "sensor": 1, - "event_starts_after": "2022-01-01T00:00:00", - "event_ends_before": "2022-01-01T23:00:00", + "event_starts_after": "2022-01-01T00:00:00 00:00", + "event_ends_before": "2022-01-01T23:00:00 00:00", }, { "sensor": 2, - "event_starts_after": "2022-01-01T00:00:00", - "event_ends_before": "2022-01-01T23:00:00", + "event_starts_after": "2022-01-01T00:00:00 00:00", + "event_ends_before": "2022-01-01T23:00:00 00:00", }, ], "transformations": [ From f61825bc5bd266f42be15c1d9b9a1b4ecbedb456 Mon Sep 17 00:00:00 2001 From: victor Date: Fri, 21 Apr 2023 10:25:10 +0200 Subject: [PATCH 06/36] Fixing wrong arguments to search_beliefs method. 
Signed-off-by: victor --- flexmeasures/data/models/reporting/__init__.py | 4 ++-- .../data/models/reporting/tests/test_tibber_reporter.py | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py index abea7c911..78dc64c7c 100644 --- a/flexmeasures/data/models/reporting/__init__.py +++ b/flexmeasures/data/models/reporting/__init__.py @@ -51,14 +51,14 @@ def fetch_data(self): # using start / end instead of event_starts_after/event_ends_before when not defined event_starts_after = tb_query.pop("event_starts_after", self.start) event_ends_before = tb_query.pop("event_ends_before", self.end) - event_resolution = tb_query.pop("event_resolution", self.input_resolution) + resolution = tb_query.pop("resolution", self.input_resolution) sensor = tb_query.pop("sensor", None) bdf = sensor.search_beliefs( event_starts_after=event_starts_after, event_ends_before=event_ends_before, - event_resolution=event_resolution, + resolution=resolution, **tb_query, ) diff --git a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py index 4a61532a4..394fc99d9 100644 --- a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py @@ -82,12 +82,11 @@ def __init__(self, sensor) -> None: da_prices = Sensor.query.filter(Sensor.name == "DA prices").one_or_none() - tb_query_config_extra = dict( - resolution="PT1H", - ) + tb_query_config_extra = dict() # creating the PandasReporter reporter config reporter_config = dict( + input_resolution="PT1H", tb_query_config=[ dict(sensor=EnergyTax.id, **tb_query_config_extra), dict(sensor=VAT.id, **tb_query_config_extra), From b01f5c32303ecb8d974a8bb2407f467f17491cc2 Mon Sep 17 00:00:00 2001 From: victor Date: Fri, 21 Apr 2023 11:24:30 +0200 Subject: [PATCH 07/36] FIxing wrong type conversion 
logic. Signed-off-by: victor --- flexmeasures/data/models/reporting/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py index 78dc64c7c..7f08315a5 100644 --- a/flexmeasures/data/models/reporting/__init__.py +++ b/flexmeasures/data/models/reporting/__init__.py @@ -94,9 +94,9 @@ def compute(self, *args, **kwargs) -> tb.BeliefsDataFrame: ): # update reporter_config_raw with inputs from the method self.reporter_config_raw[key] = value - elif key in ["start", "end"]: # convert datetime to string + if key in ["start", "end"]: # convert datetime to string self.reporter_config_raw[key] = value.isoformat().replace("+", " ") - elif key in ["input_resolution"]: # convert timedelta into string + if key in ["input_resolution"]: # convert timedelta into string self.reporter_config_raw[key] = pd.Timedelta(value).isoformat() # deserialize configuration From 487233c97e96b607977aeed1c78ced19bab9a19b Mon Sep 17 00:00:00 2001 From: Felix Claessen <30658763+Flix6x@users.noreply.github.com> Date: Mon, 24 Apr 2023 22:45:20 +0200 Subject: [PATCH 08/36] Small reporter fixes (#647) * No return value Signed-off-by: F.N. Claessen * typo Signed-off-by: F.N. Claessen * plural Signed-off-by: F.N. Claessen * indentation Signed-off-by: F.N. Claessen * fix return type annotations Signed-off-by: F.N. Claessen * format docstring example Signed-off-by: F.N. Claessen * add type annotations Signed-off-by: F.N. Claessen * predefine instance attributes Signed-off-by: F.N. Claessen * remove redundant variable Signed-off-by: F.N. Claessen * grammar Signed-off-by: F.N. Claessen * support aliases Signed-off-by: F.N. Claessen * grammar/typos Signed-off-by: F.N. Claessen * test exact match Signed-off-by: F.N. Claessen --------- Signed-off-by: F.N. 
Claessen --- .../data/models/reporting/__init__.py | 28 ++++++++-------- .../data/models/reporting/pandas_reporter.py | 25 +++++++++------ .../reporting/tests/test_tibber_reporter.py | 32 ++++++++----------- .../data/schemas/reporting/__init__.py | 1 + 4 files changed, 44 insertions(+), 42 deletions(-) diff --git a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py index 7f08315a5..53804e246 100644 --- a/flexmeasures/data/models/reporting/__init__.py +++ b/flexmeasures/data/models/reporting/__init__.py @@ -53,7 +53,8 @@ def fetch_data(self): event_ends_before = tb_query.pop("event_ends_before", self.end) resolution = tb_query.pop("resolution", self.input_resolution) - sensor = tb_query.pop("sensor", None) + sensor: Sensor = tb_query.pop("sensor", None) + alias: str = tb_query.pop("alias", None) bdf = sensor.search_beliefs( event_starts_after=event_starts_after, @@ -62,31 +63,32 @@ def fetch_data(self): **tb_query, ) - # adding sources + # store data source as local variable for source in bdf.sources.unique(): self.data[f"source_{source.id}"] = source - # saving bdf - self.data[ - f"sensor_{sensor.id}" - ] = bdf # TODO: Add alias to reference dataframes easily. e.g: dict(sensor = 1, alias="power"), + # store BeliefsDataFrame as local variable + if alias: + self.data[alias] = bdf + else: + self.data[f"sensor_{sensor.id}"] = bdf def compute(self, *args, **kwargs) -> tb.BeliefsDataFrame: """This method triggers the creation of a new report. This method allows to update the fields in reporter_config_raw passing them as keyword arguments or the whole `reporter_config_raw` by passing it in the kwarg `reporter_config_raw`. - Overall, this method follows these steps: - 1) Updating the reporter_config with the kwargs of the method compute. - 2) Triggers config deserialization. - 3) Fetches the data of the sensors described by the field `tb_query_config`. 
- 4) If the output is BeliefsDataFrame, it simplifies it into a DataFrame + Overall, this method follows these steps: + 1) Updating the reporter_config with the kwargs of the method compute. + 2) Triggers config deserialization. + 3) Fetches the data of the sensors described by the field `tb_query_config`. + 4) If the output is BeliefsDataFrame, it simplifies it into a DataFrame """ # if report_config in kwargs if "reporter_config_raw" in kwargs: self.reporter_config_raw.update(kwargs.get("reporter_config_raw")) - else: # check for arguments in kwarg that could be potential fields of reporter config + else: # check for arguments in kwargs that could be potential fields of reporter config for key, value in kwargs.items(): if ( @@ -116,7 +118,7 @@ def compute(self, *args, **kwargs) -> tb.BeliefsDataFrame: return result - def _compute(self) -> Optional[tb.BeliefsDataFrame]: + def _compute(self) -> tb.BeliefsDataFrame: """ Overwrite with the actual computation of your report. diff --git a/flexmeasures/data/models/reporting/pandas_reporter.py b/flexmeasures/data/models/reporting/pandas_reporter.py index fe8a977ff..93f52b9f6 100644 --- a/flexmeasures/data/models/reporting/pandas_reporter.py +++ b/flexmeasures/data/models/reporting/pandas_reporter.py @@ -1,6 +1,9 @@ -import pandas as pd +from __future__ import annotations + +from typing import Any from flask import current_app +import timely_beliefs as tb from flexmeasures.data.models.reporting import Reporter from flexmeasures.data.schemas.reporting.pandas_reporter import ( @@ -14,6 +17,8 @@ class PandasReporter(Reporter): __version__ = "1" __author__ = None schema = PandasReporterConfigSchema() + transformations: list[dict[str, Any]] = None + final_df_output: str = None def deserialize_reporter_config(self): # call super class deserialize_reporter_config @@ -23,7 +28,7 @@ def deserialize_reporter_config(self): self.transformations = self.reporter_config.get("transformations") self.final_df_output = 
self.reporter_config.get("final_df_output") - def _compute(self) -> pd.Series: + def _compute(self) -> tb.BeliefsDataFrame: """ This method applies the transformations and outputs the dataframe defined in `final_df_output` field of the report_config. @@ -36,7 +41,7 @@ def _compute(self) -> pd.Series: return final_output - def get_object_or_literal(self, value, method): + def get_object_or_literal(self, value: Any, method: str) -> Any: """This method allows using the dataframes as inputs of the Pandas methods that are run in the transformations. Make sure that they have been created before accessed. @@ -46,9 +51,9 @@ def get_object_or_literal(self, value, method): This functionality is disabled for methods `eval`and `query` to avoid interfering their internal behaviour given that they also use `@` to allow using local variables. - Examples - >> self.get_object_or_literal(["@df_wind", "@df_solar"], "sum") - [[ , ]] + Example: + >>> self.get_object_or_literal(["@df_wind", "@df_solar"], "sum") + [, ] """ if method in ["eval", "query"]: @@ -67,7 +72,7 @@ def get_object_or_literal(self, value, method): return value - def _process_pandas_args(self, args, method): + def _process_pandas_args(self, args: list, method: str) -> list: """This method applies the function get_object_or_literal to all the arguments to detect where to replace a string "@" with the actual object stored in `self.data[""]`. """ @@ -75,7 +80,7 @@ def _process_pandas_args(self, args, method): args[i] = self.get_object_or_literal(args[i], method) return args - def _process_pandas_kwargs(self, kwargs, method): + def _process_pandas_kwargs(self, kwargs: dict, method: str) -> dict: """This method applies the function get_object_or_literal to all the keyword arguments to detect where to replace a string "@" with the actual object stored in `self.data[""]`. 
""" @@ -83,7 +88,7 @@ def _process_pandas_kwargs(self, kwargs, method): kwargs[k] = self.get_object_or_literal(v, method) return kwargs - def _apply_transformations(self) -> pd.Series: + def _apply_transformations(self): """Convert the series using the given list of transformation specs, which is called in the order given. Each transformation specs should include a 'method' key specifying a method name of a Pandas DataFrame. @@ -99,7 +104,7 @@ def _apply_transformations(self) -> pd.Series: The example below converts from hourly meter readings in kWh to electricity demand in kW. transformations = [ - {"mehod": "diff"}, + {"method": "diff"}, {"method": "shift", "kwargs": {"periods": -1}}, {"method": "head", "args": [-1]}, ], diff --git a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py index 394fc99d9..859eaa765 100644 --- a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py @@ -51,7 +51,7 @@ 32.2, 30.2, 27.5, - 21.7, + 21.6, # originally 21.7 due to a rounding error from 216.4569 EUR/MWh (probably via 216.5) to 217 EUR/MWh 16.9, 11.4, 11.4, @@ -82,16 +82,14 @@ def __init__(self, sensor) -> None: da_prices = Sensor.query.filter(Sensor.name == "DA prices").one_or_none() - tb_query_config_extra = dict() - - # creating the PandasReporter reporter config + # create the PandasReporter reporter config reporter_config = dict( input_resolution="PT1H", tb_query_config=[ - dict(sensor=EnergyTax.id, **tb_query_config_extra), - dict(sensor=VAT.id, **tb_query_config_extra), - dict(sensor=tibber_tariff.id, **tb_query_config_extra), - dict(sensor=da_prices.id, **tb_query_config_extra), + dict(sensor=EnergyTax.id, alias="energy_tax_df"), + dict(sensor=VAT.id), + dict(sensor=tibber_tariff.id), + dict(sensor=da_prices.id), ], transformations=[ dict( @@ -102,7 +100,7 @@ def __init__(self, sensor) -> None: ), 
dict(method="add", args=[1]), # this is to get 1 + VAT dict( - df_input="sensor_2", + df_input="energy_tax_df", df_output="EnergyTax", method="droplevel", args=[[1, 2, 3]], @@ -128,6 +126,7 @@ def __init__(self, sensor) -> None: dict( method="multiply", args=["@VAT"] ), # da_prices = da_price * VAT, VAT + dict(method="round"), ], final_df_output="da_prices", ) @@ -201,7 +200,7 @@ def tibber_test_data(fresh_db, app): Saving TimeBeliefs to the DB """ - # Adding EnergyTax, VAT and Tibber Tarriff beliefs to the DB + # Add EnergyTax, VAT and Tibber Tariff beliefs to the DB for sensor, source_name, value in [ (VAT, "Tax Authority", 0.21), (EnergyTax, "Tax Authority", 125.99), # EUR / MWh @@ -253,20 +252,15 @@ def test_tibber_reporter(tibber_test_data): start=datetime(2023, 4, 13, tzinfo=utc), end=datetime(2023, 4, 14, tzinfo=utc) ) - # checking that we've get a result for 24 hours + # check that we got a result for 24 hours assert len(result) == 24 tibber_app_price_df = ( pd.DataFrame(tibber_app_price, index=index, columns=["event_value"]) - * 10 # converting cents/kWh to EUR/MWh + * 10 # convert cents/kWh to EUR/MWh ) error = abs(result - tibber_app_price_df) - # checking that (EPEX + EnergyTax + Tibber Tariff)*(1 + VAT) = Tibber App Price - - # mean error is low enough, i.e 1 EUR/MWh = 0.1 cent/kWh - assert error.mean().iloc[0] < 1 - - # max error is low enough, i.e 1 EUR/MWh = 0.1 cent/kWh - assert error.max().iloc[0] < 1 + # check that (EPEX + EnergyTax + Tibber Tariff)*(1 + VAT) = Tibber App Price + assert error.sum()[0] == 0 diff --git a/flexmeasures/data/schemas/reporting/__init__.py b/flexmeasures/data/schemas/reporting/__init__.py index c7f23eb13..0ec6f286f 100644 --- a/flexmeasures/data/schemas/reporting/__init__.py +++ b/flexmeasures/data/schemas/reporting/__init__.py @@ -13,6 +13,7 @@ class TimeBeliefQueryConfigSchema(Schema): """ sensor = SensorIdField(required=True) + alias = fields.Str() event_starts_after = AwareDateTimeField() event_ends_before = 
AwareDateTimeField() From f34f5e18bf7ad3bcf640f6b7917b63d7f3f604c6 Mon Sep 17 00:00:00 2001 From: victor Date: Mon, 24 Apr 2023 23:29:25 +0200 Subject: [PATCH 09/36] Add superclass to Reporter that will be common to all three data generator classes: Reporter, Scheduler, Forecaster. Signed-off-by: victor --- flexmeasures/data/models/data_sources.py | 28 ++++++++++++++++++ .../data/models/reporting/__init__.py | 29 ++----------------- 2 files changed, 31 insertions(+), 26 deletions(-) diff --git a/flexmeasures/data/models/data_sources.py b/flexmeasures/data/models/data_sources.py index 7705400c0..4d88398a1 100644 --- a/flexmeasures/data/models/data_sources.py +++ b/flexmeasures/data/models/data_sources.py @@ -4,11 +4,39 @@ import timely_beliefs as tb from flexmeasures.data import db +from flask import current_app if TYPE_CHECKING: from flexmeasures.data.models.user import User +class DataGenerator: + @classmethod + def get_data_source_info(cls: type) -> dict: + """ + Create and return the data source info, from which a data source lookup/creation is possible. + + See for instance get_data_source_for_job(). + """ + source_info = dict( + model=cls.__name__, version="1", name="Unknown author" + ) # default + + if hasattr(cls, "__version__"): + source_info["version"] = str(cls.__version__) + else: + current_app.logger.warning( + f"{cls.__data_generator_base__} {cls.__name__} loaded, but has no __version__ attribute." + ) + if hasattr(cls, "__author__"): + source_info["name"] = str(cls.__author__) + else: + current_app.logger.warning( + f"{cls.__data_generator_base__} {cls.__name__} has no __author__ attribute." 
+ ) + return source_info + + class DataSource(db.Model, tb.BeliefSourceDBMixin): """Each data source is a data-providing entity.""" diff --git a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py index 7f08315a5..0c0f5ddab 100644 --- a/flexmeasures/data/models/reporting/__init__.py +++ b/flexmeasures/data/models/reporting/__init__.py @@ -1,19 +1,20 @@ from typing import Optional, Union, Dict import pandas as pd -from flask import current_app from flexmeasures.data.schemas.reporting import ReporterConfigSchema from flexmeasures.data.models.time_series import Sensor +from flexmeasures.data.models.data_sources import DataGenerator import timely_beliefs as tb -class Reporter: +class Reporter(DataGenerator): """Superclass for all FlexMeasures Reporters.""" __version__ = None __author__ = None + __data_generator_base__ = "Reporter" sensor: Sensor = None @@ -124,30 +125,6 @@ def _compute(self) -> Optional[tb.BeliefsDataFrame]: """ raise NotImplementedError() - @classmethod - def get_data_source_info(cls: type) -> dict: - """ - Create and return the data source info, from which a data source lookup/creation is possible. - See for instance get_data_source_for_job(). - """ - source_info = dict( - model=cls.__name__, version="1", name="Unknown author" - ) # default - - if hasattr(cls, "__version__"): - source_info["version"] = str(cls.__version__) - else: - current_app.logger.warning( - f"Reporter {cls.__name__} loaded, but has no __version__ attribute." - ) - if hasattr(cls, "__author__"): - source_info["name"] = str(cls.__author__) - else: - current_app.logger.warning( - f"Reporter {cls.__name__} has no __author__ attribute." - ) - return source_info - def deserialize_config(self): """ Check all configurations we have, throwing either ValidationErrors or ValueErrors. 
From dd21c99c9801e2e629a8f96cf4b5fa5a99081e9e Mon Sep 17 00:00:00 2001 From: victor Date: Mon, 24 Apr 2023 23:55:48 +0200 Subject: [PATCH 10/36] Add start, end, resolution, beliefs_after and beliefs_before to the `compute` method signature. Signed-off-by: victor --- .../data/models/reporting/__init__.py | 49 +++++++++++++------ .../reporting/tests/test_tibber_reporter.py | 3 +- .../data/schemas/reporting/__init__.py | 2 + 3 files changed, 38 insertions(+), 16 deletions(-) diff --git a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py index 7f68c9344..f9b975194 100644 --- a/flexmeasures/data/models/reporting/__init__.py +++ b/flexmeasures/data/models/reporting/__init__.py @@ -6,6 +6,9 @@ from flexmeasures.data.models.time_series import Sensor from flexmeasures.data.models.data_sources import DataGenerator + +from datetime import datetime, timedelta + import timely_beliefs as tb @@ -22,6 +25,11 @@ class Reporter(DataGenerator): reporter_config_raw: Optional[dict] = None schema = ReporterConfigSchema data: Dict[str, Union[tb.BeliefsDataFrame, pd.DataFrame]] = None + start: datetime = None + end: datetime = None + input_resolution: timedelta = None + beliefs_after: datetime = None + beliefs_before: datetime = None def __init__( self, sensor: Sensor, reporter_config_raw: Optional[dict] = None @@ -53,6 +61,8 @@ def fetch_data(self): event_starts_after = tb_query.pop("event_starts_after", self.start) event_ends_before = tb_query.pop("event_ends_before", self.end) resolution = tb_query.pop("resolution", self.input_resolution) + beliefs_after = tb_query.pop("beliefs_after", self.beliefs_after) + beliefs_before = tb_query.pop("beliefs_before", self.beliefs_before) sensor: Sensor = tb_query.pop("sensor", None) alias: str = tb_query.pop("alias", None) @@ -61,6 +71,8 @@ def fetch_data(self): event_starts_after=event_starts_after, event_ends_before=event_ends_before, resolution=resolution, + beliefs_after=beliefs_after, + 
beliefs_before=beliefs_before, **tb_query, ) @@ -74,7 +86,16 @@ def fetch_data(self): else: self.data[f"sensor_{sensor.id}"] = bdf - def compute(self, *args, **kwargs) -> tb.BeliefsDataFrame: + def compute( + self, + *args, + start: datetime = None, + end: datetime = None, + input_resolution: datetime = None, + beliefs_after: datetime = None, + beliefs_before: datetime = None, + **kwargs, + ) -> tb.BeliefsDataFrame: """This method triggers the creation of a new report. This method allows to update the fields in reporter_config_raw passing them as keyword arguments or the whole `reporter_config_raw` by passing it in the kwarg `reporter_config_raw`. @@ -89,22 +110,16 @@ def compute(self, *args, **kwargs) -> tb.BeliefsDataFrame: # if report_config in kwargs if "reporter_config_raw" in kwargs: self.reporter_config_raw.update(kwargs.get("reporter_config_raw")) - else: # check for arguments in kwargs that could be potential fields of reporter config - for key, value in kwargs.items(): - - if ( - key in self.reporter_config_raw - ): # update reporter_config_raw with inputs from the method - self.reporter_config_raw[key] = value - - if key in ["start", "end"]: # convert datetime to string - self.reporter_config_raw[key] = value.isoformat().replace("+", " ") - if key in ["input_resolution"]: # convert timedelta into string - self.reporter_config_raw[key] = pd.Timedelta(value).isoformat() # deserialize configuration self.deserialize_config() + self.start = start + self.end = end + self.input_resolution = input_resolution + self.beliefs_after = beliefs_after + self.beliefs_before = beliefs_before + # fetch data self.fetch_data() @@ -146,8 +161,11 @@ def deserialize_timing_config(self): start = tb_query.get("event_starts_after", self.start) end = tb_query.get("event_ends_before ", self.end) - if end < start: - raise ValueError(f"Start {start} cannot be after end {end}.") + if ( + start is not None and end is not None + ): # not testing when start or end are missing + if end < 
start: + raise ValueError(f"Start {start} cannot be after end {end}.") def deserialize_reporter_config(self): """ @@ -169,3 +187,4 @@ def deserialize_reporter_config(self): self.start = self.reporter_config.get("start") self.end = self.reporter_config.get("end") self.input_resolution = self.reporter_config.get("input_resolution") + self.belief_time = self.reporter_config.get("belief_time") diff --git a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py index 859eaa765..563eabd71 100644 --- a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py @@ -249,7 +249,8 @@ def test_tibber_reporter(tibber_test_data): tibber_reporter = TibberReporter(tibber_report_sensor) result = tibber_reporter.compute( - start=datetime(2023, 4, 13, tzinfo=utc), end=datetime(2023, 4, 14, tzinfo=utc) + start=datetime(2023, 4, 13, tzinfo=utc), + end=datetime(2023, 4, 13, 23, tzinfo=utc), ) # check that we got a result for 24 hours diff --git a/flexmeasures/data/schemas/reporting/__init__.py b/flexmeasures/data/schemas/reporting/__init__.py index 0ec6f286f..59d90aadf 100644 --- a/flexmeasures/data/schemas/reporting/__init__.py +++ b/flexmeasures/data/schemas/reporting/__init__.py @@ -53,3 +53,5 @@ class ReporterConfigSchema(Schema): start = AwareDateTimeField() end = AwareDateTimeField() input_resolution = DurationField() + beliefs_after = AwareDateTimeField() + beliefs_before = AwareDateTimeField() From 58d405fd0de6d216c2f75c644cb30140b0474429 Mon Sep 17 00:00:00 2001 From: victor Date: Tue, 25 Apr 2023 00:07:26 +0200 Subject: [PATCH 11/36] Add FLEXMEASURES_DEFAULT_DATASOURCE config to be the feault datasource for data generators. 
Signed-off-by: victor --- documentation/configuration.rst | 9 +++++++++ flexmeasures/data/models/data_sources.py | 14 +------------- flexmeasures/utils/config_defaults.py | 1 + 3 files changed, 11 insertions(+), 13 deletions(-) diff --git a/documentation/configuration.rst b/documentation/configuration.rst index 8e60ee680..e0fea1328 100644 --- a/documentation/configuration.rst +++ b/documentation/configuration.rst @@ -247,6 +247,15 @@ Time to live for schedule UUIDs of successful scheduling jobs. Set a negative ti Default: ``timedelta(days=7)`` +.. _datasource_config: + +FLEXMEASURES_DEFAULT_DATASOURCE +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Data generation classes will have this data source name as its sensor's data source output default. + +Default: ``"FlexMeasures"`` + .. _planning_horizon_config: diff --git a/flexmeasures/data/models/data_sources.py b/flexmeasures/data/models/data_sources.py index 4d88398a1..61b74e980 100644 --- a/flexmeasures/data/models/data_sources.py +++ b/flexmeasures/data/models/data_sources.py @@ -19,21 +19,9 @@ def get_data_source_info(cls: type) -> dict: See for instance get_data_source_for_job(). """ source_info = dict( - model=cls.__name__, version="1", name="Unknown author" + name=current_app.config.get("FLEXMEASURES_DEFAULT_DATASOURCE") ) # default - if hasattr(cls, "__version__"): - source_info["version"] = str(cls.__version__) - else: - current_app.logger.warning( - f"{cls.__data_generator_base__} {cls.__name__} loaded, but has no __version__ attribute." - ) - if hasattr(cls, "__author__"): - source_info["name"] = str(cls.__author__) - else: - current_app.logger.warning( - f"{cls.__data_generator_base__} {cls.__name__} has no __author__ attribute." 
- ) return source_info diff --git a/flexmeasures/utils/config_defaults.py b/flexmeasures/utils/config_defaults.py index d38fe8c9e..c280edebc 100644 --- a/flexmeasures/utils/config_defaults.py +++ b/flexmeasures/utils/config_defaults.py @@ -119,6 +119,7 @@ class Config(object): FLEXMEASURES_PLANNING_TTL: timedelta = timedelta( days=7 ) # Time to live for UDI event ids of successful scheduling jobs. Set a negative timedelta to persist forever. + FLEXMEASURES_DEFAULT_DATASOURCE: str = "FlexMeasures" FLEXMEASURES_TASK_CHECK_AUTH_TOKEN: Optional[str] = None FLEXMEASURES_REDIS_URL: str = "localhost" FLEXMEASURES_REDIS_PORT: int = 6379 From bebc32017a0f591fb19b2ddd1dff50dac5135780 Mon Sep 17 00:00:00 2001 From: victor Date: Thu, 27 Apr 2023 10:27:38 +0200 Subject: [PATCH 12/36] Fixing wrong input type. Signed-off-by: victor --- flexmeasures/data/models/reporting/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py index f9b975194..fbcd05177 100644 --- a/flexmeasures/data/models/reporting/__init__.py +++ b/flexmeasures/data/models/reporting/__init__.py @@ -91,7 +91,7 @@ def compute( *args, start: datetime = None, end: datetime = None, - input_resolution: datetime = None, + input_resolution: timedelta = None, beliefs_after: datetime = None, beliefs_before: datetime = None, **kwargs, From 0dfe5f4945cf2f79b75ea7b02be26317a2ac3ee8 Mon Sep 17 00:00:00 2001 From: victor Date: Thu, 27 Apr 2023 11:24:00 +0200 Subject: [PATCH 13/36] Rename DataGenerator class to DataGeneratorMixin Signed-off-by: victor --- flexmeasures/data/models/data_sources.py | 2 +- flexmeasures/data/models/reporting/__init__.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flexmeasures/data/models/data_sources.py b/flexmeasures/data/models/data_sources.py index 61b74e980..ed446bc55 100644 --- a/flexmeasures/data/models/data_sources.py +++ 
b/flexmeasures/data/models/data_sources.py @@ -10,7 +10,7 @@ from flexmeasures.data.models.user import User -class DataGenerator: +class DataGeneratorMixin: @classmethod def get_data_source_info(cls: type) -> dict: """ diff --git a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py index fbcd05177..7db2a4898 100644 --- a/flexmeasures/data/models/reporting/__init__.py +++ b/flexmeasures/data/models/reporting/__init__.py @@ -4,7 +4,7 @@ from flexmeasures.data.schemas.reporting import ReporterConfigSchema from flexmeasures.data.models.time_series import Sensor -from flexmeasures.data.models.data_sources import DataGenerator +from flexmeasures.data.models.data_sources import DataGeneratorMixin from datetime import datetime, timedelta @@ -12,7 +12,7 @@ import timely_beliefs as tb -class Reporter(DataGenerator): +class Reporter(DataGeneratorMixin): """Superclass for all FlexMeasures Reporters.""" __version__ = None From 7830e338a0d4a59654d3784be771ec1ae8c707c7 Mon Sep 17 00:00:00 2001 From: victor Date: Thu, 27 Apr 2023 11:25:54 +0200 Subject: [PATCH 14/36] Reduce logging level from warning to debug. Signed-off-by: victor --- flexmeasures/data/models/reporting/pandas_reporter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/data/models/reporting/pandas_reporter.py b/flexmeasures/data/models/reporting/pandas_reporter.py index 93f52b9f6..a8efef2f3 100644 --- a/flexmeasures/data/models/reporting/pandas_reporter.py +++ b/flexmeasures/data/models/reporting/pandas_reporter.py @@ -58,7 +58,7 @@ def get_object_or_literal(self, value: Any, method: str) -> Any: if method in ["eval", "query"]: if isinstance(value, str) and value.startswith("@"): - current_app.logger.warning( + current_app.logger.debug( "Cannot reference objects in self.data using the method eval or query. That is because these methods use the symbol `@` to make reference to local variables." 
) return value From 6ab84df0b1e9399aa2031d66838d9c1bd7a6fe1d Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Fri, 28 Apr 2023 00:15:24 +0200 Subject: [PATCH 15/36] Register Reporter to the app context. Signed-off-by: Victor Garcia Reolid --- flexmeasures/app.py | 6 +++++ flexmeasures/utils/coding_utils.py | 35 ++++++++++++++++++++++++++++++ flexmeasures/utils/plugin_utils.py | 7 ++++++ 3 files changed, 48 insertions(+) diff --git a/flexmeasures/app.py b/flexmeasures/app.py index 77514c44d..baf658f85 100644 --- a/flexmeasures/app.py +++ b/flexmeasures/app.py @@ -100,6 +100,12 @@ def create( register_db_at(app) + # Register Reporters + from flexmeasures.utils.coding_utils import get_classes_module + from flexmeasures.data.models.reporting import Reporter + + app.reporters = get_classes_module("flexmeasures.data.models.reporting", Reporter) + # add auth policy from flexmeasures.auth import register_at as register_auth_at diff --git a/flexmeasures/utils/coding_utils.py b/flexmeasures/utils/coding_utils.py index 5c2f1b3c0..a3fc8dd9c 100644 --- a/flexmeasures/utils/coding_utils.py +++ b/flexmeasures/utils/coding_utils.py @@ -3,6 +3,8 @@ import functools import time import inspect +import importlib +import pkgutil from typing import Union from flask import current_app @@ -170,3 +172,36 @@ def wrapper(*args, **kwargs): return wrapper return decorator + + +def find_classes_module(module, superclass, skiptest=True): + classes = [] + reporting_module = importlib.import_module(module) + + for submodule in pkgutil.iter_modules(reporting_module.__path__): + + if skiptest and ("test" in f"{module}.{submodule.name}"): + continue + + if submodule.ispkg: + classes.extend( + find_classes_module( + f"{module}.{submodule.name}", superclass, skiptest=skiptest + ) + ) + else: + module_object = importlib.import_module(f"{module}.{submodule.name}") + module_classes = inspect.getmembers(module_object, inspect.isclass) + classes.extend( + [ + (class_name, klass) + for class_name, 
klass in module_classes + if issubclass(klass, superclass) and klass != superclass + ] + ) + + return classes + + +def get_classes_module(module, superclass, skiptest=True) -> dict: + return dict(find_classes_module(module, superclass, skiptest=skiptest)) diff --git a/flexmeasures/utils/plugin_utils.py b/flexmeasures/utils/plugin_utils.py index 43da08693..2dc35ca57 100644 --- a/flexmeasures/utils/plugin_utils.py +++ b/flexmeasures/utils/plugin_utils.py @@ -8,6 +8,8 @@ import sentry_sdk from flask import Flask, Blueprint +from flexmeasures.utils.coding_utils import get_classes_module + def register_plugins(app: Flask): """ @@ -96,6 +98,11 @@ def register_plugins(app: Flask): app.logger.debug(f"Registering {plugin_blueprint} ...") app.register_blueprint(plugin_blueprint) + # Loading reporters + from flexmeasures.data.models.reporting import Reporter + + app.reporters.extend(get_classes_module(module, Reporter)) + app.config["LOADED_PLUGINS"][plugin_name] = plugin_version app.logger.info(f"Loaded plugins: {app.config['LOADED_PLUGINS']}") sentry_sdk.set_context("plugins", app.config.get("LOADED_PLUGINS", {})) From 79fe12fd4926987fd5f48dd006c6011e3eeb34a4 Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Fri, 28 Apr 2023 00:18:28 +0200 Subject: [PATCH 16/36] Allowing to use BeliefsDataFrame specific method in the schema. 
Signed-off-by: Victor Garcia Reolid --- .../data/schemas/reporting/pandas_reporter.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/flexmeasures/data/schemas/reporting/pandas_reporter.py b/flexmeasures/data/schemas/reporting/pandas_reporter.py index 4330fd257..c34921b0a 100644 --- a/flexmeasures/data/schemas/reporting/pandas_reporter.py +++ b/flexmeasures/data/schemas/reporting/pandas_reporter.py @@ -2,7 +2,6 @@ from inspect import signature from flexmeasures.data.schemas.reporting import ReporterConfigSchema -from pandas import DataFrame from timely_beliefs import BeliefsDataFrame @@ -21,12 +20,12 @@ def validate_method_call(self, data, **kwargs): method = data["method"] method_callable = getattr( - DataFrame, method, None - ) # what if the object which is applied to is not a DataFrame... + BeliefsDataFrame, method, None + ) # what if the object which is applied to is not a BeliefsDataFrame... if not callable(method_callable): raise ValidationError( - f"method {method} is not a valid Pandas DataFrame method." + f"method {method} is not a valid BeliefsDataFrame method." ) method_signature = signature(method_callable) @@ -35,7 +34,7 @@ def validate_method_call(self, data, **kwargs): args = data.get("args", []).copy() _kwargs = data.get("kwargs", {}).copy() - args.insert(0, DataFrame) + args.insert(0, BeliefsDataFrame) method_signature.bind(*args, **_kwargs) except TypeError: @@ -107,7 +106,7 @@ def validate_chaining(self, data, **kwargs): if not previous_df and not df_input: raise ValidationError("Cannot find the input DataFrame.") - previous_df = df_output # keeping last dataframe calculation + previous_df = df_output # keeping last BeliefsDataFrame calculation fake_data[df_output] = BeliefsDataFrame From 4c86878ecc277dded8bfdfa2cb38bff5193c30f2 Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Fri, 28 Apr 2023 12:04:26 +0200 Subject: [PATCH 17/36] Fixed wrong method. TODO: test with a plugin. 
Signed-off-by: Victor Garcia Reolid --- flexmeasures/utils/plugin_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/utils/plugin_utils.py b/flexmeasures/utils/plugin_utils.py index 2dc35ca57..0f746a2e0 100644 --- a/flexmeasures/utils/plugin_utils.py +++ b/flexmeasures/utils/plugin_utils.py @@ -101,7 +101,7 @@ def register_plugins(app: Flask): # Loading reporters from flexmeasures.data.models.reporting import Reporter - app.reporters.extend(get_classes_module(module, Reporter)) + app.reporters.update(get_classes_module(module, Reporter)) app.config["LOADED_PLUGINS"][plugin_name] = plugin_version app.logger.info(f"Loaded plugins: {app.config['LOADED_PLUGINS']}") From 5ca52ebeee355840a1a7a13cac1e2d51713b45b4 Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Fri, 28 Apr 2023 12:07:35 +0200 Subject: [PATCH 18/36] Using module name instead of the module object. Signed-off-by: Victor Garcia Reolid --- flexmeasures/utils/plugin_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/utils/plugin_utils.py b/flexmeasures/utils/plugin_utils.py index 0f746a2e0..2e072046a 100644 --- a/flexmeasures/utils/plugin_utils.py +++ b/flexmeasures/utils/plugin_utils.py @@ -101,7 +101,7 @@ def register_plugins(app: Flask): # Loading reporters from flexmeasures.data.models.reporting import Reporter - app.reporters.update(get_classes_module(module, Reporter)) + app.reporters.update(get_classes_module(module.__name__, Reporter)) app.config["LOADED_PLUGINS"][plugin_name] = plugin_version app.logger.info(f"Loaded plugins: {app.config['LOADED_PLUGINS']}") From 12267d39fcc4fdcee4905b0046f472ab802e46ba Mon Sep 17 00:00:00 2001 From: Felix Claessen <30658763+Flix6x@users.noreply.github.com> Date: Mon, 1 May 2023 09:31:30 +0200 Subject: [PATCH 19/36] use belief_time instead of beliefs_before and beliefs_after (#652) Signed-off-by: F.N. 
Claessen --- flexmeasures/data/models/reporting/__init__.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py index 7db2a4898..245b18e7f 100644 --- a/flexmeasures/data/models/reporting/__init__.py +++ b/flexmeasures/data/models/reporting/__init__.py @@ -28,8 +28,7 @@ class Reporter(DataGeneratorMixin): start: datetime = None end: datetime = None input_resolution: timedelta = None - beliefs_after: datetime = None - beliefs_before: datetime = None + belief_time: datetime = None def __init__( self, sensor: Sensor, reporter_config_raw: Optional[dict] = None @@ -61,8 +60,7 @@ def fetch_data(self): event_starts_after = tb_query.pop("event_starts_after", self.start) event_ends_before = tb_query.pop("event_ends_before", self.end) resolution = tb_query.pop("resolution", self.input_resolution) - beliefs_after = tb_query.pop("beliefs_after", self.beliefs_after) - beliefs_before = tb_query.pop("beliefs_before", self.beliefs_before) + belief_time = tb_query.pop("belief_time", self.belief_time) sensor: Sensor = tb_query.pop("sensor", None) alias: str = tb_query.pop("alias", None) @@ -71,8 +69,7 @@ def fetch_data(self): event_starts_after=event_starts_after, event_ends_before=event_ends_before, resolution=resolution, - beliefs_after=beliefs_after, - beliefs_before=beliefs_before, + beliefs_before=belief_time, **tb_query, ) @@ -92,8 +89,7 @@ def compute( start: datetime = None, end: datetime = None, input_resolution: timedelta = None, - beliefs_after: datetime = None, - beliefs_before: datetime = None, + belief_time: datetime = None, **kwargs, ) -> tb.BeliefsDataFrame: """This method triggers the creation of a new report. 
This method allows to update the fields @@ -117,8 +113,7 @@ def compute( self.start = start self.end = end self.input_resolution = input_resolution - self.beliefs_after = beliefs_after - self.beliefs_before = beliefs_before + self.belief_time = belief_time # fetch data self.fetch_data() From cc477423a6270d6e66becc70b8a374636789ed9e Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Mon, 1 May 2023 09:57:38 +0200 Subject: [PATCH 20/36] Fixing example. Signed-off-by: Victor Garcia Reolid --- .../data/schemas/reporting/pandas_reporter.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/flexmeasures/data/schemas/reporting/pandas_reporter.py b/flexmeasures/data/schemas/reporting/pandas_reporter.py index c34921b0a..c897903fe 100644 --- a/flexmeasures/data/schemas/reporting/pandas_reporter.py +++ b/flexmeasures/data/schemas/reporting/pandas_reporter.py @@ -51,12 +51,12 @@ class PandasReporterConfigSchema(ReporterConfigSchema): { "input_sensors" : [ - {'sensor' : 1} + {"sensor" : 1, "alias" : "df1"} ], "transformations" : [ { - "df_input" : "df2", - "df_output" : "df1", + "df_input" : "df1", + "df_output" : "df2", "method" : "copy" }, { @@ -68,10 +68,8 @@ class PandasReporterConfigSchema(ReporterConfigSchema): "method" : "sum", "kwargs" : {"axis" : 0} } - - ] - } - + ], + "final_df_output" : "df2" """ transformations = fields.List(fields.Nested(PandasMethodCall()), required=True) From cc11809a1745d24433c328814869d4a1235b051f Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Mon, 1 May 2023 09:58:03 +0200 Subject: [PATCH 21/36] Fixing grammar. 
Signed-off-by: Victor Garcia Reolid --- documentation/configuration.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/documentation/configuration.rst b/documentation/configuration.rst index b44fff621..bcc23e369 100644 --- a/documentation/configuration.rst +++ b/documentation/configuration.rst @@ -269,7 +269,7 @@ Default: ``3600`` FLEXMEASURES_DEFAULT_DATASOURCE ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Data generation classes will have this data source name as its sensor's data source output default. +The default DataSource of the resulting data from `DataGeneration` classes. Default: ``"FlexMeasures"`` From c50df176cf3c82cb41c41bc2950805819e4e7529 Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Mon, 1 May 2023 12:16:54 +0200 Subject: [PATCH 22/36] Require at least 1 input sensor for the tb_query_config. Signed-off-by: Victor Garcia Reolid --- flexmeasures/data/schemas/reporting/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flexmeasures/data/schemas/reporting/__init__.py b/flexmeasures/data/schemas/reporting/__init__.py index 59d90aadf..66a17fdc7 100644 --- a/flexmeasures/data/schemas/reporting/__init__.py +++ b/flexmeasures/data/schemas/reporting/__init__.py @@ -1,4 +1,4 @@ -from marshmallow import Schema, fields +from marshmallow import Schema, fields, validate from flexmeasures.data.schemas.sensors import SensorIdField from flexmeasures.data.schemas.sources import DataSourceIdField @@ -48,7 +48,9 @@ class ReporterConfigSchema(Schema): """ tb_query_config = fields.List( - fields.Nested(TimeBeliefQueryConfigSchema()), required=True + fields.Nested(TimeBeliefQueryConfigSchema()), + required=True, + validator=validate.Length(min=1), ) start = AwareDateTimeField() end = AwareDateTimeField() From c36b904d5b967c43cfa32464418423a5b1c7f4b2 Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Tue, 2 May 2023 12:38:26 +0200 Subject: [PATCH 23/36] Bug fix: compute function was overriding the variables to the 
default value (None) Signed-off-by: Victor Garcia Reolid --- flexmeasures/data/models/reporting/__init__.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py index 245b18e7f..eb6cd2eca 100644 --- a/flexmeasures/data/models/reporting/__init__.py +++ b/flexmeasures/data/models/reporting/__init__.py @@ -83,6 +83,10 @@ def fetch_data(self): else: self.data[f"sensor_{sensor.id}"] = bdf + def update_attribute(self, attribute, default): + if default is not None: + setattr(self, attribute, default) + def compute( self, *args, @@ -110,10 +114,11 @@ def compute( # deserialize configuration self.deserialize_config() - self.start = start - self.end = end - self.input_resolution = input_resolution - self.belief_time = belief_time + # if provided, update the class attributes + self.update_attribute("start", start) + self.update_attribute("end", end) + self.update_attribute("input_resolution", input_resolution) + self.update_attribute("belief_time", belief_time) # fetch data self.fetch_data() From 3ad41131df105ef39a4b4f86e0e235c897934e08 Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Tue, 2 May 2023 12:39:21 +0200 Subject: [PATCH 24/36] Changing end to get 24h and fix assert condition to detect NaN. 
Signed-off-by: Victor Garcia Reolid --- .../data/models/reporting/tests/test_tibber_reporter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py index 563eabd71..43de9ae8d 100644 --- a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py @@ -250,7 +250,7 @@ def test_tibber_reporter(tibber_test_data): result = tibber_reporter.compute( start=datetime(2023, 4, 13, tzinfo=utc), - end=datetime(2023, 4, 13, 23, tzinfo=utc), + end=datetime(2023, 4, 14, tzinfo=utc), ) # check that we got a result for 24 hours @@ -264,4 +264,4 @@ def test_tibber_reporter(tibber_test_data): error = abs(result - tibber_app_price_df) # check that (EPEX + EnergyTax + Tibber Tariff)*(1 + VAT) = Tibber App Price - assert error.sum()[0] == 0 + assert error.sum(min_count=1).event_value == 0 From aba68eaedc6a8433a83e596c668259c0b0266a7f Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Tue, 2 May 2023 15:13:25 +0200 Subject: [PATCH 25/36] Adding belief time variable to schema. 
Signed-off-by: Victor Garcia Reolid --- flexmeasures/data/schemas/reporting/__init__.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/flexmeasures/data/schemas/reporting/__init__.py b/flexmeasures/data/schemas/reporting/__init__.py index 66a17fdc7..98935f0d9 100644 --- a/flexmeasures/data/schemas/reporting/__init__.py +++ b/flexmeasures/data/schemas/reporting/__init__.py @@ -18,8 +18,7 @@ class TimeBeliefQueryConfigSchema(Schema): event_starts_after = AwareDateTimeField() event_ends_before = AwareDateTimeField() - beliefs_after = AwareDateTimeField() - beliefs_before = AwareDateTimeField() + belief_time = AwareDateTimeField() horizons_at_least = DurationField() horizons_at_most = DurationField() @@ -55,5 +54,4 @@ class ReporterConfigSchema(Schema): start = AwareDateTimeField() end = AwareDateTimeField() input_resolution = DurationField() - beliefs_after = AwareDateTimeField() - beliefs_before = AwareDateTimeField() + belief_time = AwareDateTimeField() From cc545e1952633ef80493fc1bcae74203d25b8831 Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Tue, 2 May 2023 15:25:47 +0200 Subject: [PATCH 26/36] Avoid deserializing multiple times. 
Signed-off-by: Victor Garcia Reolid --- .../data/models/reporting/__init__.py | 34 ++++------ .../reporting/tests/test_pandas_reporter.py | 65 +++++++++++++++++-- 2 files changed, 74 insertions(+), 25 deletions(-) diff --git a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py index eb6cd2eca..e04b3dfcb 100644 --- a/flexmeasures/data/models/reporting/__init__.py +++ b/flexmeasures/data/models/reporting/__init__.py @@ -55,22 +55,22 @@ def fetch_data(self): self.data = {} for tb_query in self.tb_query_config: - + _tb_query = tb_query.copy() # using start / end instead of event_starts_after/event_ends_before when not defined - event_starts_after = tb_query.pop("event_starts_after", self.start) - event_ends_before = tb_query.pop("event_ends_before", self.end) - resolution = tb_query.pop("resolution", self.input_resolution) - belief_time = tb_query.pop("belief_time", self.belief_time) + event_starts_after = _tb_query.pop("event_starts_after", self.start) + event_ends_before = _tb_query.pop("event_ends_before", self.end) + resolution = _tb_query.pop("resolution", self.input_resolution) + belief_time = _tb_query.pop("belief_time", self.belief_time) - sensor: Sensor = tb_query.pop("sensor", None) - alias: str = tb_query.pop("alias", None) + sensor: Sensor = _tb_query.pop("sensor", None) + alias: str = _tb_query.pop("alias", None) bdf = sensor.search_beliefs( event_starts_after=event_starts_after, event_ends_before=event_ends_before, resolution=resolution, beliefs_before=belief_time, - **tb_query, + **_tb_query, ) # store data source as local variable @@ -96,23 +96,17 @@ def compute( belief_time: datetime = None, **kwargs, ) -> tb.BeliefsDataFrame: - """This method triggers the creation of a new report. This method allows to update the fields - in reporter_config_raw passing them as keyword arguments or the whole `reporter_config_raw` by - passing it in the kwarg `reporter_config_raw`. 
+ """This method triggers the creation of a new report. - Overall, this method follows these steps: - 1) Updating the reporter_config with the kwargs of the method compute. - 2) Triggers config deserialization. - 3) Fetches the data of the sensors described by the field `tb_query_config`. - 4) If the output is BeliefsDataFrame, it simplifies it into a DataFrame + The same object can generate multiple reports with different start, end, input_resolution + and belief_time values. + In the future, this function will parse arbitrary input arguments defined in a schema. """ - # if report_config in kwargs - if "reporter_config_raw" in kwargs: - self.reporter_config_raw.update(kwargs.get("reporter_config_raw")) # deserialize configuration - self.deserialize_config() + if self.reporter_config is None: + self.deserialize_config() # if provided, update the class attributes self.update_attribute("start", start) diff --git a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py index 978995a4d..bd2ce30e2 100644 --- a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py @@ -1,9 +1,9 @@ import pytest - from datetime import datetime, timedelta - from pytz import utc +import pandas as pd + from flexmeasures.data.models.reporting.pandas_reporter import PandasReporter from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType from flexmeasures.data.models.data_sources import DataSource @@ -128,8 +128,63 @@ def test_reporter(setup_dummy_data): report1.sensor == reporter_sensor ) # check that the output sensor is effectively assigned. 
- report2 = reporter.compute(start=datetime(2023, 4, 10, 3, tzinfo=utc)) - assert len(report2) == 4 - assert str(report2.index[0]) == "2023-04-10 02:00:00+00:00" + # check that calling compute with different parameters changes the result + report3 = reporter.compute(start=datetime(2023, 4, 10, 3, tzinfo=utc)) + assert len(report3) == 4 + assert str(report3.index[0]) == "2023-04-10 02:00:00+00:00" # TODO: resample with BeliefDataFrame specific method (resample_event) + + +def test_reporter_repeated(setup_dummy_data): + """check that calling compute doesn't change the result""" + + s1, s2, reporter_sensor = setup_dummy_data + + reporter_config_raw = dict( + tb_query_config=[ + dict( + sensor=s1.id, + event_starts_after="2023-04-10T00:00:00 00:00", + event_ends_before="2023-04-10T10:00:00 00:00", + ), + dict( + sensor=s2.id, + event_starts_after="2023-04-10T00:00:00 00:00", + event_ends_before="2023-04-10T10:00:00 00:00", + ), + ], + transformations=[ + dict( + df_input="sensor_1", + df_output="sensor_1_source_1", + method="xs", + args=["@source_1"], + kwargs=dict(level=2), + ), + dict( + df_input="sensor_2", + df_output="sensor_2_source_1", + method="xs", + args=["@source_1"], + kwargs=dict(level=2), + ), + dict( + df_output="df_merge", + df_input="sensor_1_source_1", + method="merge", + args=["@sensor_2_source_1"], + kwargs=dict(on="event_start", suffixes=("_sensor1", "_sensor2")), + ), + dict(method="resample", args=["2h"]), + dict(method="mean"), + dict(method="sum", kwargs=dict(axis=1)), + ], + final_df_output="df_merge", + ) + + reporter = PandasReporter(reporter_sensor, reporter_config_raw=reporter_config_raw) + + report1 = reporter.compute() + report2 = reporter.compute() + pd.testing.assert_series_equal(report1, report2) From 9d5dc5775a41ce537938d0d2796ebc5f95077477 Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Tue, 2 May 2023 15:37:12 +0200 Subject: [PATCH 27/36] Add scope="module" to avoid recreating objects in DB. 
Signed-off-by: Victor Garcia Reolid --- .../data/models/reporting/tests/test_pandas_reporter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py index bd2ce30e2..a53c60490 100644 --- a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py @@ -10,7 +10,7 @@ from flexmeasures.data.models.time_series import Sensor, TimedBelief -@pytest.fixture +@pytest.fixture(scope="module") def setup_dummy_data(db, app): """ From 801e358572d0de44a3f88bb8bb2ec73ea610a65c Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Fri, 5 May 2023 13:36:52 +0200 Subject: [PATCH 28/36] fix: remove time paramters (start, end, ...) from the Reporter class attributes. Signed-off-by: Victor Garcia Reolid --- .../data/models/reporting/__init__.py | 72 ++++++------------- .../data/models/reporting/pandas_reporter.py | 15 ++-- .../reporting/tests/test_pandas_reporter.py | 18 ++--- .../reporting/tests/test_tibber_reporter.py | 2 +- .../data/schemas/reporting/__init__.py | 7 -- .../data/schemas/tests/test_reporting.py | 10 +-- 6 files changed, 50 insertions(+), 74 deletions(-) diff --git a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py index e04b3dfcb..14a419843 100644 --- a/flexmeasures/data/models/reporting/__init__.py +++ b/flexmeasures/data/models/reporting/__init__.py @@ -25,10 +25,6 @@ class Reporter(DataGeneratorMixin): reporter_config_raw: Optional[dict] = None schema = ReporterConfigSchema data: Dict[str, Union[tb.BeliefsDataFrame, pd.DataFrame]] = None - start: datetime = None - end: datetime = None - input_resolution: timedelta = None - belief_time: datetime = None def __init__( self, sensor: Sensor, reporter_config_raw: Optional[dict] = None @@ -48,7 +44,13 @@ def __init__( self.reporter_config_raw = 
reporter_config_raw - def fetch_data(self): + def fetch_data( + self, + start: datetime, + end: datetime, + input_resolution: timedelta = None, + belief_time: datetime = None, + ): """ Fetches the time_beliefs from the database """ @@ -57,10 +59,10 @@ def fetch_data(self): for tb_query in self.tb_query_config: _tb_query = tb_query.copy() # using start / end instead of event_starts_after/event_ends_before when not defined - event_starts_after = _tb_query.pop("event_starts_after", self.start) - event_ends_before = _tb_query.pop("event_ends_before", self.end) - resolution = _tb_query.pop("resolution", self.input_resolution) - belief_time = _tb_query.pop("belief_time", self.belief_time) + event_starts_after = _tb_query.pop("event_starts_after", start) + event_ends_before = _tb_query.pop("event_ends_before", end) + resolution = _tb_query.pop("resolution", input_resolution) + belief_time = _tb_query.pop("belief_time", belief_time) sensor: Sensor = _tb_query.pop("sensor", None) alias: str = _tb_query.pop("alias", None) @@ -89,9 +91,8 @@ def update_attribute(self, attribute, default): def compute( self, - *args, - start: datetime = None, - end: datetime = None, + start: datetime, + end: datetime, input_resolution: timedelta = None, belief_time: datetime = None, **kwargs, @@ -108,17 +109,11 @@ def compute( if self.reporter_config is None: self.deserialize_config() - # if provided, update the class attributes - self.update_attribute("start", start) - self.update_attribute("end", end) - self.update_attribute("input_resolution", input_resolution) - self.update_attribute("belief_time", belief_time) - # fetch data - self.fetch_data() + self.fetch_data(start, end, input_resolution, belief_time) # Result - result = self._compute() + result = self._compute(start, end, input_resolution, belief_time) # checking that the event_resolution of the output BeliefDataFrame is equal to the one of the output sensor assert self.sensor.event_resolution == result.event_resolution @@ -128,7 
+123,13 @@ def compute( return result - def _compute(self) -> tb.BeliefsDataFrame: + def _compute( + self, + start: datetime, + end: datetime, + input_resolution: timedelta = None, + belief_time: datetime = None, + ) -> tb.BeliefsDataFrame: """ Overwrite with the actual computation of your report. @@ -137,31 +138,6 @@ def _compute(self) -> tb.BeliefsDataFrame: raise NotImplementedError() def deserialize_config(self): - """ - Check all configurations we have, throwing either ValidationErrors or ValueErrors. - Other code can decide if/how to handle those. - """ - self.deserialize_reporter_config() - self.deserialize_timing_config() - - def deserialize_timing_config(self): - """ - Check if the timing of the report is valid. - - Raises ValueErrors. - """ - - for tb_query in self.tb_query_config: - start = tb_query.get("event_starts_after", self.start) - end = tb_query.get("event_ends_before ", self.end) - - if ( - start is not None and end is not None - ): # not testing when start or end are missing - if end < start: - raise ValueError(f"Start {start} cannot be after end {end}.") - - def deserialize_reporter_config(self): """ Validate the report config against a Marshmallow Schema. 
Ideas: @@ -178,7 +154,3 @@ def deserialize_reporter_config(self): self.tb_query_config = self.reporter_config.get( "tb_query_config" ) # extracting TimeBelief query configuration parameters - self.start = self.reporter_config.get("start") - self.end = self.reporter_config.get("end") - self.input_resolution = self.reporter_config.get("input_resolution") - self.belief_time = self.reporter_config.get("belief_time") diff --git a/flexmeasures/data/models/reporting/pandas_reporter.py b/flexmeasures/data/models/reporting/pandas_reporter.py index a8efef2f3..ffda2f666 100644 --- a/flexmeasures/data/models/reporting/pandas_reporter.py +++ b/flexmeasures/data/models/reporting/pandas_reporter.py @@ -1,6 +1,7 @@ from __future__ import annotations from typing import Any +from datetime import datetime, timedelta from flask import current_app import timely_beliefs as tb @@ -20,15 +21,21 @@ class PandasReporter(Reporter): transformations: list[dict[str, Any]] = None final_df_output: str = None - def deserialize_reporter_config(self): - # call super class deserialize_reporter_config - super().deserialize_reporter_config() + def deserialize_config(self): + # call super class deserialize_config + super().deserialize_config() # extract PandasReporter specific fields self.transformations = self.reporter_config.get("transformations") self.final_df_output = self.reporter_config.get("final_df_output") - def _compute(self) -> tb.BeliefsDataFrame: + def _compute( + self, + start: datetime, + end: datetime, + input_resolution: timedelta = None, + belief_time: datetime = None, + ) -> tb.BeliefsDataFrame: """ This method applies the transformations and outputs the dataframe defined in `final_df_output` field of the report_config. 
diff --git a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py index a53c60490..94987041e 100644 --- a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py @@ -1,5 +1,6 @@ import pytest from datetime import datetime, timedelta + from pytz import utc import pandas as pd @@ -86,8 +87,6 @@ def test_reporter(setup_dummy_data): s1, s2, reporter_sensor = setup_dummy_data reporter_config_raw = dict( - start="2023-04-10T00:00:00 00:00", - end="2023-04-10T10:00:00 00:00", tb_query_config=[dict(sensor=s1.id), dict(sensor=s2.id)], transformations=[ dict( @@ -120,7 +119,9 @@ def test_reporter(setup_dummy_data): reporter = PandasReporter(reporter_sensor, reporter_config_raw=reporter_config_raw) - report1 = reporter.compute() + start = datetime(2023, 4, 10, tzinfo=utc) + end = datetime(2023, 4, 10, 10, tzinfo=utc) + report1 = reporter.compute(start, end) assert len(report1) == 5 assert str(report1.index[0]) == "2023-04-10 00:00:00+00:00" @@ -129,12 +130,10 @@ def test_reporter(setup_dummy_data): ) # check that the output sensor is effectively assigned. 
# check that calling compute with different parameters changes the result - report3 = reporter.compute(start=datetime(2023, 4, 10, 3, tzinfo=utc)) + report3 = reporter.compute(start=datetime(2023, 4, 10, 3, tzinfo=utc), end=end) assert len(report3) == 4 assert str(report3.index[0]) == "2023-04-10 02:00:00+00:00" - # TODO: resample with BeliefDataFrame specific method (resample_event) - def test_reporter_repeated(setup_dummy_data): """check that calling compute doesn't change the result""" @@ -184,7 +183,10 @@ def test_reporter_repeated(setup_dummy_data): ) reporter = PandasReporter(reporter_sensor, reporter_config_raw=reporter_config_raw) + start = datetime(2023, 4, 10, tzinfo=utc) + end = datetime(2023, 4, 10, 10, tzinfo=utc) + + report1 = reporter.compute(start=start, end=end) + report2 = reporter.compute(start=start, end=end) - report1 = reporter.compute() - report2 = reporter.compute() pd.testing.assert_series_equal(report1, report2) diff --git a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py index 43de9ae8d..13ebb0fab 100644 --- a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py @@ -84,7 +84,6 @@ def __init__(self, sensor) -> None: # create the PandasReporter reporter config reporter_config = dict( - input_resolution="PT1H", tb_query_config=[ dict(sensor=EnergyTax.id, alias="energy_tax_df"), dict(sensor=VAT.id), @@ -251,6 +250,7 @@ def test_tibber_reporter(tibber_test_data): result = tibber_reporter.compute( start=datetime(2023, 4, 13, tzinfo=utc), end=datetime(2023, 4, 14, tzinfo=utc), + input_resolution=timedelta(hours=1), ) # check that we got a result for 24 hours diff --git a/flexmeasures/data/schemas/reporting/__init__.py b/flexmeasures/data/schemas/reporting/__init__.py index 98935f0d9..67df93372 100644 --- a/flexmeasures/data/schemas/reporting/__init__.py +++ 
b/flexmeasures/data/schemas/reporting/__init__.py @@ -41,9 +41,6 @@ class ReporterConfigSchema(Schema): """ This schema is used to validate Reporter class configurations (reporter_config). Inherit from this to extend this schema with your own parameters. - - If the fields event_starts_after or event_ends_before are not present in `tb_query_config` - they will look up in the fields `start` and `end` """ tb_query_config = fields.List( @@ -51,7 +48,3 @@ class ReporterConfigSchema(Schema): required=True, validator=validate.Length(min=1), ) - start = AwareDateTimeField() - end = AwareDateTimeField() - input_resolution = DurationField() - belief_time = AwareDateTimeField() diff --git a/flexmeasures/data/schemas/tests/test_reporting.py b/flexmeasures/data/schemas/tests/test_reporting.py index 8e5000294..129c70579 100644 --- a/flexmeasures/data/schemas/tests/test_reporting.py +++ b/flexmeasures/data/schemas/tests/test_reporting.py @@ -36,7 +36,7 @@ def setup_dummy_sensors(db, app): @pytest.mark.parametrize( - "report_config, is_valid", + "reporter_config, is_valid", [ ( { # this checks that the final_df_output dataframe is actually generated at some point of the processing pipeline @@ -125,12 +125,14 @@ def setup_dummy_sensors(db, app): ), ], ) -def test_pandas_reporter(report_config, is_valid, db, app, setup_dummy_sensors): +def test_pandas_reporter_schema( + reporter_config, is_valid, db, app, setup_dummy_sensors +): schema = PandasReporterConfigSchema() if is_valid: - schema.load(report_config) + schema.load(reporter_config) else: with pytest.raises(ValidationError): - schema.load(report_config) + schema.load(reporter_config) From 99c47b418177fd5f7f4856f46b9b650e22e7d36e Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Mon, 8 May 2023 08:45:45 +0200 Subject: [PATCH 29/36] style: type hints improvements Signed-off-by: Victor Garcia Reolid --- flexmeasures/data/models/reporting/__init__.py | 5 +++-- flexmeasures/data/models/reporting/pandas_reporter.py | 4 ++-- 
flexmeasures/data/schemas/reporting/__init__.py | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py index 14a419843..04a328ff5 100644 --- a/flexmeasures/data/models/reporting/__init__.py +++ b/flexmeasures/data/models/reporting/__init__.py @@ -1,3 +1,4 @@ +from __future__ import annotations from typing import Optional, Union, Dict import pandas as pd @@ -93,8 +94,8 @@ def compute( self, start: datetime, end: datetime, - input_resolution: timedelta = None, - belief_time: datetime = None, + input_resolution: timedelta | None = None, + belief_time: datetime | None = None, **kwargs, ) -> tb.BeliefsDataFrame: """This method triggers the creation of a new report. diff --git a/flexmeasures/data/models/reporting/pandas_reporter.py b/flexmeasures/data/models/reporting/pandas_reporter.py index ffda2f666..7c82fa0f7 100644 --- a/flexmeasures/data/models/reporting/pandas_reporter.py +++ b/flexmeasures/data/models/reporting/pandas_reporter.py @@ -33,8 +33,8 @@ def _compute( self, start: datetime, end: datetime, - input_resolution: timedelta = None, - belief_time: datetime = None, + input_resolution: timedelta | None = None, + belief_time: datetime | None = None, ) -> tb.BeliefsDataFrame: """ This method applies the transformations and outputs the dataframe diff --git a/flexmeasures/data/schemas/reporting/__init__.py b/flexmeasures/data/schemas/reporting/__init__.py index 67df93372..44204692d 100644 --- a/flexmeasures/data/schemas/reporting/__init__.py +++ b/flexmeasures/data/schemas/reporting/__init__.py @@ -9,7 +9,7 @@ class TimeBeliefQueryConfigSchema(Schema): """ This schema implements the required fields to perform a TimeBeliefs search - using the method flexmeasures.data.models.time_series:TimedBelief.search + using the method flexmeasures.data.models.time_series:Sensor.search_beliefs """ sensor = SensorIdField(required=True) From 
83a776f7df68890ef3d5840b055079d95dc527df Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Mon, 8 May 2023 09:08:03 +0200 Subject: [PATCH 30/36] style: rename tb_query_config for beliefs_search_config_schema Signed-off-by: Victor Garcia Reolid --- flexmeasures/data/models/reporting/__init__.py | 6 +++--- .../data/models/reporting/tests/test_pandas_reporter.py | 4 ++-- .../data/models/reporting/tests/test_tibber_reporter.py | 2 +- flexmeasures/data/schemas/reporting/__init__.py | 6 +++--- flexmeasures/data/schemas/reporting/pandas_reporter.py | 2 +- flexmeasures/data/schemas/tests/test_reporting.py | 8 ++++---- 6 files changed, 14 insertions(+), 14 deletions(-) diff --git a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py index 04a328ff5..a9c5ba82d 100644 --- a/flexmeasures/data/models/reporting/__init__.py +++ b/flexmeasures/data/models/reporting/__init__.py @@ -57,7 +57,7 @@ def fetch_data( """ self.data = {} - for tb_query in self.tb_query_config: + for tb_query in self.beliefs_search_config_schema: _tb_query = tb_query.copy() # using start / end instead of event_starts_after/event_ends_before when not defined event_starts_after = _tb_query.pop("event_starts_after", start) @@ -152,6 +152,6 @@ def deserialize_config(self): self.reporter_config = self.schema.load( self.reporter_config_raw ) # validate reporter config - self.tb_query_config = self.reporter_config.get( - "tb_query_config" + self.beliefs_search_config_schema = self.reporter_config.get( + "beliefs_search_config_schema" ) # extracting TimeBelief query configuration parameters diff --git a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py index 94987041e..4af4a314c 100644 --- a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py @@ -87,7 +87,7 @@ def test_reporter(setup_dummy_data): s1, 
s2, reporter_sensor = setup_dummy_data reporter_config_raw = dict( - tb_query_config=[dict(sensor=s1.id), dict(sensor=s2.id)], + beliefs_search_config_schema=[dict(sensor=s1.id), dict(sensor=s2.id)], transformations=[ dict( df_input="sensor_1", @@ -141,7 +141,7 @@ def test_reporter_repeated(setup_dummy_data): s1, s2, reporter_sensor = setup_dummy_data reporter_config_raw = dict( - tb_query_config=[ + beliefs_search_config_schema=[ dict( sensor=s1.id, event_starts_after="2023-04-10T00:00:00 00:00", diff --git a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py index 13ebb0fab..346c52972 100644 --- a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py @@ -84,7 +84,7 @@ def __init__(self, sensor) -> None: # create the PandasReporter reporter config reporter_config = dict( - tb_query_config=[ + beliefs_search_config_schema=[ dict(sensor=EnergyTax.id, alias="energy_tax_df"), dict(sensor=VAT.id), dict(sensor=tibber_tariff.id), diff --git a/flexmeasures/data/schemas/reporting/__init__.py b/flexmeasures/data/schemas/reporting/__init__.py index 44204692d..a6cc067d7 100644 --- a/flexmeasures/data/schemas/reporting/__init__.py +++ b/flexmeasures/data/schemas/reporting/__init__.py @@ -6,7 +6,7 @@ from flexmeasures.data.schemas import AwareDateTimeField, DurationField -class TimeBeliefQueryConfigSchema(Schema): +class BeliefsSearchConfigSchema(Schema): """ This schema implements the required fields to perform a TimeBeliefs search using the method flexmeasures.data.models.time_series:Sensor.search_beliefs @@ -43,8 +43,8 @@ class ReporterConfigSchema(Schema): Inherit from this to extend this schema with your own parameters. 
""" - tb_query_config = fields.List( - fields.Nested(TimeBeliefQueryConfigSchema()), + beliefs_search_config_schema = fields.List( + fields.Nested(BeliefsSearchConfigSchema()), required=True, validator=validate.Length(min=1), ) diff --git a/flexmeasures/data/schemas/reporting/pandas_reporter.py b/flexmeasures/data/schemas/reporting/pandas_reporter.py index c897903fe..d04db0796 100644 --- a/flexmeasures/data/schemas/reporting/pandas_reporter.py +++ b/flexmeasures/data/schemas/reporting/pandas_reporter.py @@ -86,7 +86,7 @@ def validate_chaining(self, data, **kwargs): # loading the initial data, the sensors' data fake_data = dict( (f"sensor_{s['sensor'].id}", BeliefsDataFrame) - for s in data.get("tb_query_config") + for s in data.get("beliefs_search_config_schema") ) final_df_output = data.get("final_df_output") diff --git a/flexmeasures/data/schemas/tests/test_reporting.py b/flexmeasures/data/schemas/tests/test_reporting.py index 129c70579..81730a18a 100644 --- a/flexmeasures/data/schemas/tests/test_reporting.py +++ b/flexmeasures/data/schemas/tests/test_reporting.py @@ -40,7 +40,7 @@ def setup_dummy_sensors(db, app): [ ( { # this checks that the final_df_output dataframe is actually generated at some point of the processing pipeline - "tb_query_config": [ + "beliefs_search_config_schema": [ { "sensor": 1, "event_starts_after": "2022-01-01T00:00:00 00:00", @@ -60,7 +60,7 @@ def setup_dummy_sensors(db, app): ), ( { # this checks that chaining works, applying the method copy on the previous dataframe - "tb_query_config": [ + "beliefs_search_config_schema": [ { "sensor": 1, "event_starts_after": "2022-01-01T00:00:00 00:00", @@ -78,7 +78,7 @@ def setup_dummy_sensors(db, app): ), ( { # this checks that resample cannot be the last method being applied - "tb_query_config": [ + "beliefs_search_config_schema": [ { "sensor": 1, "event_starts_after": "2022-01-01T00:00:00 00:00", @@ -101,7 +101,7 @@ def setup_dummy_sensors(db, app): ), ( { # this checks that resample cannot be 
the last method being applied - "tb_query_config": [ + "beliefs_search_config_schema": [ { "sensor": 1, "event_starts_after": "2022-01-01T00:00:00 00:00", From 97f3b15c1912b026ea5672e9faef8b5109f0a3ed Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Mon, 8 May 2023 09:08:47 +0200 Subject: [PATCH 31/36] style: remove comment Signed-off-by: Victor Garcia Reolid --- flexmeasures/data/schemas/reporting/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/flexmeasures/data/schemas/reporting/__init__.py b/flexmeasures/data/schemas/reporting/__init__.py index a6cc067d7..1010cb2d3 100644 --- a/flexmeasures/data/schemas/reporting/__init__.py +++ b/flexmeasures/data/schemas/reporting/__init__.py @@ -24,7 +24,6 @@ class BeliefsSearchConfigSchema(Schema): horizons_at_most = DurationField() source = DataSourceIdField() - # user_source_ids: Optional[Union[int, List[int]]] = None, source_types = fields.List(fields.Str()) exclude_source_types = fields.List(fields.Str()) From 912aa4e5b5e59133bfe8f49b8f1ce7a5b2aaf63d Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Mon, 8 May 2023 13:14:01 +0200 Subject: [PATCH 32/36] docs: add entry to changelog Signed-off-by: Victor Garcia Reolid --- documentation/changelog.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 9f3c915eb..44fe81318 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -9,6 +9,9 @@ v0.14.0 | June XX, 2023 New features ------------- +* Introduction of the classes `Reporter` and `PandasReporter` [see `PR #641 <https://www.github.com/FlexMeasures/flexmeasures/pull/641>`_] + + Bugfixes ----------- From 9d8c737b65da17d34b70c39c5e5f4c0c981284f9 Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Mon, 8 May 2023 16:07:07 +0200 Subject: [PATCH 33/36] style: clarifying attribute Signed-off-by: Victor Garcia Reolid --- flexmeasures/data/models/reporting/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git
a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py index a9c5ba82d..8aa72028a 100644 --- a/flexmeasures/data/models/reporting/__init__.py +++ b/flexmeasures/data/models/reporting/__init__.py @@ -35,7 +35,7 @@ def __init__( Attributes: :param sensor: sensor where the output of the reporter will be saved to. - :param reporter_config_raw: unserialized configuration of the reporter. + :param reporter_config_raw: dictionary with the serialized configuration of the reporter. """ self.sensor = sensor From 1383b4a275d1e130556184bdb6dedadcd9921ab7 Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Mon, 8 May 2023 16:08:00 +0200 Subject: [PATCH 34/36] style: fix docstring Signed-off-by: Victor Garcia Reolid --- .../data/models/reporting/tests/test_pandas_reporter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py index 4af4a314c..a10cb9a61 100644 --- a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py @@ -13,10 +13,10 @@ @pytest.fixture(scope="module") def setup_dummy_data(db, app): - """ - Create Sensors 2, 1 Asset and 1 AssetType + Create 2 Sensors, 1 Asset and 1 AssetType """ + dummy_asset_type = GenericAssetType(name="DummyGenericAssetType") report_asset_type = GenericAssetType(name="ReportAssetType") From 08020f6e713e59bb30af6fa667bcea592fdd45fb Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Mon, 8 May 2023 16:09:28 +0200 Subject: [PATCH 35/36] refactor: rename beliefs_search_config_schema beliefs_search_configs Signed-off-by: Victor Garcia Reolid --- flexmeasures/data/models/reporting/__init__.py | 6 +++--- .../data/models/reporting/tests/test_pandas_reporter.py | 4 ++-- .../data/models/reporting/tests/test_tibber_reporter.py | 2 +- 
flexmeasures/data/schemas/reporting/__init__.py | 2 +- flexmeasures/data/schemas/reporting/pandas_reporter.py | 2 +- flexmeasures/data/schemas/tests/test_reporting.py | 8 ++++---- 6 files changed, 12 insertions(+), 12 deletions(-) diff --git a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py index 8aa72028a..b28c322bc 100644 --- a/flexmeasures/data/models/reporting/__init__.py +++ b/flexmeasures/data/models/reporting/__init__.py @@ -57,7 +57,7 @@ def fetch_data( """ self.data = {} - for tb_query in self.beliefs_search_config_schema: + for tb_query in self.beliefs_search_configs: _tb_query = tb_query.copy() # using start / end instead of event_starts_after/event_ends_before when not defined event_starts_after = _tb_query.pop("event_starts_after", start) @@ -152,6 +152,6 @@ def deserialize_config(self): self.reporter_config = self.schema.load( self.reporter_config_raw ) # validate reporter config - self.beliefs_search_config_schema = self.reporter_config.get( - "beliefs_search_config_schema" + self.beliefs_search_configs = self.reporter_config.get( + "beliefs_search_configs" ) # extracting TimeBelief query configuration parameters diff --git a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py index a10cb9a61..2b99263fe 100644 --- a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py @@ -87,7 +87,7 @@ def test_reporter(setup_dummy_data): s1, s2, reporter_sensor = setup_dummy_data reporter_config_raw = dict( - beliefs_search_config_schema=[dict(sensor=s1.id), dict(sensor=s2.id)], + beliefs_search_configs=[dict(sensor=s1.id), dict(sensor=s2.id)], transformations=[ dict( df_input="sensor_1", @@ -141,7 +141,7 @@ def test_reporter_repeated(setup_dummy_data): s1, s2, reporter_sensor = setup_dummy_data reporter_config_raw = dict( - beliefs_search_config_schema=[ + 
beliefs_search_configs=[ dict( sensor=s1.id, event_starts_after="2023-04-10T00:00:00 00:00", diff --git a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py index 346c52972..432a197ca 100644 --- a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py @@ -84,7 +84,7 @@ def __init__(self, sensor) -> None: # create the PandasReporter reporter config reporter_config = dict( - beliefs_search_config_schema=[ + beliefs_search_configs=[ dict(sensor=EnergyTax.id, alias="energy_tax_df"), dict(sensor=VAT.id), dict(sensor=tibber_tariff.id), diff --git a/flexmeasures/data/schemas/reporting/__init__.py b/flexmeasures/data/schemas/reporting/__init__.py index 1010cb2d3..af7e4e918 100644 --- a/flexmeasures/data/schemas/reporting/__init__.py +++ b/flexmeasures/data/schemas/reporting/__init__.py @@ -42,7 +42,7 @@ class ReporterConfigSchema(Schema): Inherit from this to extend this schema with your own parameters. 
""" - beliefs_search_config_schema = fields.List( + beliefs_search_configs = fields.List( fields.Nested(BeliefsSearchConfigSchema()), required=True, validator=validate.Length(min=1), diff --git a/flexmeasures/data/schemas/reporting/pandas_reporter.py b/flexmeasures/data/schemas/reporting/pandas_reporter.py index d04db0796..3b076ddb2 100644 --- a/flexmeasures/data/schemas/reporting/pandas_reporter.py +++ b/flexmeasures/data/schemas/reporting/pandas_reporter.py @@ -86,7 +86,7 @@ def validate_chaining(self, data, **kwargs): # loading the initial data, the sensors' data fake_data = dict( (f"sensor_{s['sensor'].id}", BeliefsDataFrame) - for s in data.get("beliefs_search_config_schema") + for s in data.get("beliefs_search_configs") ) final_df_output = data.get("final_df_output") diff --git a/flexmeasures/data/schemas/tests/test_reporting.py b/flexmeasures/data/schemas/tests/test_reporting.py index 81730a18a..cb5490052 100644 --- a/flexmeasures/data/schemas/tests/test_reporting.py +++ b/flexmeasures/data/schemas/tests/test_reporting.py @@ -40,7 +40,7 @@ def setup_dummy_sensors(db, app): [ ( { # this checks that the final_df_output dataframe is actually generated at some point of the processing pipeline - "beliefs_search_config_schema": [ + "beliefs_search_configs": [ { "sensor": 1, "event_starts_after": "2022-01-01T00:00:00 00:00", @@ -60,7 +60,7 @@ def setup_dummy_sensors(db, app): ), ( { # this checks that chaining works, applying the method copy on the previous dataframe - "beliefs_search_config_schema": [ + "beliefs_search_configs": [ { "sensor": 1, "event_starts_after": "2022-01-01T00:00:00 00:00", @@ -78,7 +78,7 @@ def setup_dummy_sensors(db, app): ), ( { # this checks that resample cannot be the last method being applied - "beliefs_search_config_schema": [ + "beliefs_search_configs": [ { "sensor": 1, "event_starts_after": "2022-01-01T00:00:00 00:00", @@ -101,7 +101,7 @@ def setup_dummy_sensors(db, app): ), ( { # this checks that resample cannot be the last method 
being applied - "beliefs_search_config_schema": [ + "beliefs_search_configs": [ { "sensor": 1, "event_starts_after": "2022-01-01T00:00:00 00:00", From 96c03b369513b978bcdcc6c9977f2c3617741795 Mon Sep 17 00:00:00 2001 From: Victor Garcia Reolid Date: Mon, 8 May 2023 16:17:52 +0200 Subject: [PATCH 36/36] style: typo Signed-off-by: Victor Garcia Reolid --- flexmeasures/data/schemas/reporting/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/data/schemas/reporting/__init__.py b/flexmeasures/data/schemas/reporting/__init__.py index af7e4e918..7ceaa7152 100644 --- a/flexmeasures/data/schemas/reporting/__init__.py +++ b/flexmeasures/data/schemas/reporting/__init__.py @@ -8,7 +8,7 @@ class BeliefsSearchConfigSchema(Schema): """ - This schema implements the required fields to perform a TimeBeliefs search + This schema implements the required fields to perform a TimedBeliefs search using the method flexmeasures.data.models.time_series:Sensor.search_beliefs """