diff --git a/documentation/changelog.rst b/documentation/changelog.rst
index 806621ae6..c4bbe0d96 100644
--- a/documentation/changelog.rst
+++ b/documentation/changelog.rst
@@ -9,6 +9,9 @@ v0.14.0 | June XX, 2023
 New features
 -------------
+* Introduction of the classes `Reporter` and `PandasReporter` [see `PR #641 `_]
+
+
 Bugfixes
 -----------
diff --git a/documentation/configuration.rst b/documentation/configuration.rst
index d6980a481..825e3f738 100644
--- a/documentation/configuration.rst
+++ b/documentation/configuration.rst
@@ -268,6 +268,15 @@ Set a negative value to persist forever.
 Default: ``3600``
 
+.. _datasource_config:
+
+FLEXMEASURES_DEFAULT_DATASOURCE
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The default `DataSource` name for data generated by `DataGenerator` classes, such as reporters.
+
+Default: ``"FlexMeasures"``
+
 .. _planning_horizon_config:
diff --git a/flexmeasures/app.py b/flexmeasures/app.py
index 06d463fd0..70f1c89cd 100644
--- a/flexmeasures/app.py
+++ b/flexmeasures/app.py
@@ -99,6 +99,12 @@ def create(  # noqa C901
 
     register_db_at(app)
 
+    # Register Reporters
+    from flexmeasures.utils.coding_utils import get_classes_module
+    from flexmeasures.data.models.reporting import Reporter
+
+    app.reporters = get_classes_module("flexmeasures.data.models.reporting", Reporter)
+
     # add auth policy
     from flexmeasures.auth import register_at as register_auth_at
diff --git a/flexmeasures/data/models/data_sources.py b/flexmeasures/data/models/data_sources.py
index c53545979..a2b349fe1 100644
--- a/flexmeasures/data/models/data_sources.py
+++ b/flexmeasures/data/models/data_sources.py
@@ -5,11 +5,27 @@ import timely_beliefs as tb
 
 from flexmeasures.data import db
+from flask import current_app
 
 if TYPE_CHECKING:
     from flexmeasures.data.models.user import User
 
 
+class DataGeneratorMixin:
+    @classmethod
+    def get_data_source_info(cls: type) -> dict:
+        """
+        Create and return the data source info, from which a data source lookup/creation is possible.
+
+        See for instance get_data_source_for_job().
+        """
+        source_info = dict(
+            name=current_app.config.get("FLEXMEASURES_DEFAULT_DATASOURCE")
+        )  # default
+
+        return source_info
+
+
 class DataSource(db.Model, tb.BeliefSourceDBMixin):
     """Each data source is a data-providing entity."""
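For plugin authors, the following sketch shows how a `DataGenerator` subclass might extend this default source info. It is a hypothetical subclass, for illustration only; the `model` and `version` keys assume `DataSource` has such columns:

from flexmeasures.data.models.data_sources import DataGeneratorMixin


class MyDataGenerator(DataGeneratorMixin):
    """Hypothetical data generator, for illustration only."""

    __version__ = "1"

    @classmethod
    def get_data_source_info(cls) -> dict:
        # start from the default info (just the FLEXMEASURES_DEFAULT_DATASOURCE name) ...
        source_info = super().get_data_source_info()
        # ... and add class-specific fields (assumed to exist as DataSource columns)
        source_info["model"] = cls.__name__
        source_info["version"] = cls.__version__
        return source_info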
diff --git a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py
new file mode 100644
index 000000000..b28c322bc
--- /dev/null
+++ b/flexmeasures/data/models/reporting/__init__.py
@@ -0,0 +1,157 @@
+from __future__ import annotations
+from typing import Optional, Union, Dict
+
+import pandas as pd
+
+from flexmeasures.data.schemas.reporting import ReporterConfigSchema
+from flexmeasures.data.models.time_series import Sensor
+from flexmeasures.data.models.data_sources import DataGeneratorMixin
+
+
+from datetime import datetime, timedelta
+
+import timely_beliefs as tb
+
+
+class Reporter(DataGeneratorMixin):
+    """Superclass for all FlexMeasures Reporters."""
+
+    __version__ = None
+    __author__ = None
+    __data_generator_base__ = "Reporter"
+
+    sensor: Sensor = None
+
+    reporter_config: Optional[dict] = None
+    reporter_config_raw: Optional[dict] = None
+    schema = ReporterConfigSchema
+    data: Dict[str, Union[tb.BeliefsDataFrame, pd.DataFrame]] = None
+
+    def __init__(
+        self, sensor: Sensor, reporter_config_raw: Optional[dict] = None
+    ) -> None:
+        """
+        Initialize a new Reporter.
+
+        Attributes:
+        :param sensor: sensor to which the output of the reporter will be saved.
+        :param reporter_config_raw: dictionary with the serialized configuration of the reporter.
+        """
+
+        self.sensor = sensor
+
+        if not reporter_config_raw:
+            reporter_config_raw = {}
+
+        self.reporter_config_raw = reporter_config_raw
+
+    def fetch_data(
+        self,
+        start: datetime,
+        end: datetime,
+        input_resolution: timedelta = None,
+        belief_time: datetime = None,
+    ):
+        """
+        Fetches the timed beliefs from the database.
+        """
+
+        self.data = {}
+        for tb_query in self.beliefs_search_configs:
+            _tb_query = tb_query.copy()
+            # using start / end instead of event_starts_after / event_ends_before when not defined
+            event_starts_after = _tb_query.pop("event_starts_after", start)
+            event_ends_before = _tb_query.pop("event_ends_before", end)
+            resolution = _tb_query.pop("resolution", input_resolution)
+            belief_time = _tb_query.pop("belief_time", belief_time)
+
+            sensor: Sensor = _tb_query.pop("sensor", None)
+            alias: str = _tb_query.pop("alias", None)
+
+            bdf = sensor.search_beliefs(
+                event_starts_after=event_starts_after,
+                event_ends_before=event_ends_before,
+                resolution=resolution,
+                beliefs_before=belief_time,
+                **_tb_query,
+            )
+
+            # store each data source in self.data, under the key `source_<id>`
+            for source in bdf.sources.unique():
+                self.data[f"source_{source.id}"] = source
+
+            # store the BeliefsDataFrame in self.data, under the given alias or else under `sensor_<id>`
+            if alias:
+                self.data[alias] = bdf
+            else:
+                self.data[f"sensor_{sensor.id}"] = bdf
+
+    def update_attribute(self, attribute, default):
+        if default is not None:
+            setattr(self, attribute, default)
+
+    def compute(
+        self,
+        start: datetime,
+        end: datetime,
+        input_resolution: timedelta | None = None,
+        belief_time: datetime | None = None,
+        **kwargs,
+    ) -> tb.BeliefsDataFrame:
+        """This method triggers the creation of a new report.
+
+        The same object can generate multiple reports with different start, end, input_resolution
+        and belief_time values.
+
+        In the future, this function will parse arbitrary input arguments defined in a schema.
+        """
+
+        # deserialize configuration
+        if self.reporter_config is None:
+            self.deserialize_config()
+
+        # fetch data
+        self.fetch_data(start, end, input_resolution, belief_time)
+
+        # compute the report
+        result = self._compute(start, end, input_resolution, belief_time)
+
+        # check that the event_resolution of the output BeliefsDataFrame equals that of the output sensor
+        assert self.sensor.event_resolution == result.event_resolution
+
+        # assign the output sensor to the BeliefsDataFrame
+        result.sensor = self.sensor
+
+        return result
+
+    def _compute(
+        self,
+        start: datetime,
+        end: datetime,
+        input_resolution: timedelta = None,
+        belief_time: datetime = None,
+    ) -> tb.BeliefsDataFrame:
+        """
+        Overwrite with the actual computation of your report.
+
+        :returns BeliefsDataFrame: report as a BeliefsDataFrame.
+        """
+        raise NotImplementedError()
+
+    def deserialize_config(self):
+        """
+        Validate the report config against a Marshmallow Schema.
+        Ideas:
+        - Override this method
+        - Call superclass method to apply validation and common variables deserialization (see PandasReporter)
+        - (Partially) extract the relevant reporter_config parameters into class attributes.
+
+        Raises ValidationErrors or ValueErrors.
+        """
+
+        self.reporter_config = self.schema.load(
+            self.reporter_config_raw
+        )  # validate reporter config
+        self.beliefs_search_configs = self.reporter_config.get(
+            "beliefs_search_configs"
+        )  # extract the TimedBelief search configuration parameters
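A minimal sketch of how this base class is meant to be subclassed, under the assumptions in the code above (the subclass name and sensor ids are illustrative): `fetch_data` stores each fetched BeliefsDataFrame under `self.data["sensor_<id>"]` (or under its alias) plus each data source under `self.data["source_<id>"]`, and `_compute` turns those into the report.

from flexmeasures.data.models.reporting import Reporter


class PassThroughReporter(Reporter):
    """Hypothetical reporter: report the beliefs of the first configured sensor, unchanged."""

    __version__ = "1"

    def _compute(self, start, end, input_resolution=None, belief_time=None):
        # Reporter.compute() already called fetch_data(), so self.data is filled
        first_sensor = self.beliefs_search_configs[0]["sensor"]
        return self.data[f"sensor_{first_sensor.id}"]


# Usage sketch: output_sensor is where results are saved; sensor 1 is assumed to exist.
# reporter = PassThroughReporter(output_sensor, {"beliefs_search_configs": [{"sensor": 1}]})
# report = reporter.compute(start, end)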
+ """ + + self.reporter_config = self.schema.load( + self.reporter_config_raw + ) # validate reporter config + self.beliefs_search_configs = self.reporter_config.get( + "beliefs_search_configs" + ) # extracting TimeBelief query configuration parameters diff --git a/flexmeasures/data/models/reporting/pandas_reporter.py b/flexmeasures/data/models/reporting/pandas_reporter.py new file mode 100644 index 000000000..7c82fa0f7 --- /dev/null +++ b/flexmeasures/data/models/reporting/pandas_reporter.py @@ -0,0 +1,138 @@ +from __future__ import annotations + +from typing import Any +from datetime import datetime, timedelta + +from flask import current_app +import timely_beliefs as tb + +from flexmeasures.data.models.reporting import Reporter +from flexmeasures.data.schemas.reporting.pandas_reporter import ( + PandasReporterConfigSchema, +) + + +class PandasReporter(Reporter): + """This reporter applies a series of pandas methods on""" + + __version__ = "1" + __author__ = None + schema = PandasReporterConfigSchema() + transformations: list[dict[str, Any]] = None + final_df_output: str = None + + def deserialize_config(self): + # call super class deserialize_config + super().deserialize_config() + + # extract PandasReporter specific fields + self.transformations = self.reporter_config.get("transformations") + self.final_df_output = self.reporter_config.get("final_df_output") + + def _compute( + self, + start: datetime, + end: datetime, + input_resolution: timedelta | None = None, + belief_time: datetime | None = None, + ) -> tb.BeliefsDataFrame: + """ + This method applies the transformations and outputs the dataframe + defined in `final_df_output` field of the report_config. + """ + + # apply pandas transformations to the dataframes in `self.data` + self._apply_transformations() + + final_output = self.data[self.final_df_output] + + return final_output + + def get_object_or_literal(self, value: Any, method: str) -> Any: + """This method allows using the dataframes as inputs of the Pandas methods that + are run in the transformations. Make sure that they have been created before accessed. + + This works by putting the symbol `@` in front of the name of the dataframe that we want to reference. + For instance, to reference the dataframe test_df, which lives in self.data, we would do `@test_df`. + + This functionality is disabled for methods `eval`and `query` to avoid interfering their internal behaviour + given that they also use `@` to allow using local variables. + + Example: + >>> self.get_object_or_literal(["@df_wind", "@df_solar"], "sum") + [, ] + """ + + if method in ["eval", "query"]: + if isinstance(value, str) and value.startswith("@"): + current_app.logger.debug( + "Cannot reference objects in self.data using the method eval or query. That is because these methods use the symbol `@` to make reference to local variables." + ) + return value + + if isinstance(value, str) and value.startswith("@"): + value = value.replace("@", "") + return self.data[value] + + if isinstance(value, list): + return [self.get_object_or_literal(v, method) for v in value] + + return value + + def _process_pandas_args(self, args: list, method: str) -> list: + """This method applies the function get_object_or_literal to all the arguments + to detect where to replace a string "@" with the actual object stored in `self.data[""]`. 
+ """ + for i in range(len(args)): + args[i] = self.get_object_or_literal(args[i], method) + return args + + def _process_pandas_kwargs(self, kwargs: dict, method: str) -> dict: + """This method applies the function get_object_or_literal to all the keyword arguments + to detect where to replace a string "@" with the actual object stored in `self.data[""]`. + """ + for k, v in kwargs.items(): + kwargs[k] = self.get_object_or_literal(v, method) + return kwargs + + def _apply_transformations(self): + """Convert the series using the given list of transformation specs, which is called in the order given. + + Each transformation specs should include a 'method' key specifying a method name of a Pandas DataFrame. + + Optionally, 'args' and 'kwargs' keys can be specified to pass on arguments or keyword arguments to the given method. + + All data exchange is made through the dictionary `self.data`. The superclass Reporter already fetches BeliefsDataFrames of + the sensors and saves them in the self.data dictionary fields `sensor_`. In case you need to perform complex operations on dataframes, you can + split the operations in several steps and saving the intermediate results using the parameters `df_input` and `df_output` for the + input and output dataframes, respectively. + + Example: + + The example below converts from hourly meter readings in kWh to electricity demand in kW. + transformations = [ + {"method": "diff"}, + {"method": "shift", "kwargs": {"periods": -1}}, + {"method": "head", "args": [-1]}, + ], + """ + + previous_df = None + + for transformation in self.transformations: + df_input = transformation.get( + "df_input", previous_df + ) # default is using the previous transformation output + df_output = transformation.get( + "df_output", df_input + ) # default is OUTPUT = INPUT.method() + + method = transformation.get("method") + args = self._process_pandas_args(transformation.get("args", []), method) + kwargs = self._process_pandas_kwargs( + transformation.get("kwargs", {}), method + ) + + self.data[df_output] = getattr(self.data[df_input], method)(*args, **kwargs) + + previous_df = df_output diff --git a/flexmeasures/data/models/reporting/tests/__init__.py b/flexmeasures/data/models/reporting/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py new file mode 100644 index 000000000..2b99263fe --- /dev/null +++ b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py @@ -0,0 +1,192 @@ +import pytest +from datetime import datetime, timedelta + +from pytz import utc + +import pandas as pd + +from flexmeasures.data.models.reporting.pandas_reporter import PandasReporter +from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType +from flexmeasures.data.models.data_sources import DataSource +from flexmeasures.data.models.time_series import Sensor, TimedBelief + + +@pytest.fixture(scope="module") +def setup_dummy_data(db, app): + """ + Create 2 Sensors, 1 Asset and 1 AssetType + """ + + dummy_asset_type = GenericAssetType(name="DummyGenericAssetType") + report_asset_type = GenericAssetType(name="ReportAssetType") + + db.session.add_all([dummy_asset_type, report_asset_type]) + + dummy_asset = GenericAsset( + name="DummyGenericAsset", generic_asset_type=dummy_asset_type + ) + + pandas_report = GenericAsset( + name="PandasReport", generic_asset_type=report_asset_type + ) + + db.session.add_all([dummy_asset, 
pandas_report]) + + sensor1 = Sensor("sensor 1", generic_asset=dummy_asset, event_resolution="1h") + db.session.add(sensor1) + sensor2 = Sensor("sensor 2", generic_asset=dummy_asset, event_resolution="1h") + db.session.add(sensor2) + report_sensor = Sensor( + "report sensor", generic_asset=pandas_report, event_resolution="1h" + ) + db.session.add(report_sensor) + + """ + Create 2 DataSources + """ + source1 = DataSource("source1") + source2 = DataSource("source2") + + """ + Create TimedBeliefs + """ + beliefs = [] + for sensor in [sensor1, sensor2]: + for si, source in enumerate([source1, source2]): + for t in range(10): + print(si) + beliefs.append( + TimedBelief( + event_start=datetime(2023, 4, 10, tzinfo=utc) + + timedelta(hours=t + si), + belief_horizon=timedelta(hours=24), + event_value=t, + sensor=sensor, + source=source, + ) + ) + + db.session.add_all(beliefs) + db.session.commit() + + yield sensor1, sensor2, report_sensor + + db.session.delete(sensor1) + db.session.delete(sensor2) + + for b in beliefs: + db.session.delete(b) + + db.session.delete(dummy_asset) + db.session.delete(dummy_asset_type) + + db.session.commit() + + +def test_reporter(setup_dummy_data): + s1, s2, reporter_sensor = setup_dummy_data + + reporter_config_raw = dict( + beliefs_search_configs=[dict(sensor=s1.id), dict(sensor=s2.id)], + transformations=[ + dict( + df_input="sensor_1", + df_output="sensor_1_source_1", + method="xs", + args=["@source_1"], + kwargs=dict(level=2), + ), + dict( + df_input="sensor_2", + df_output="sensor_2_source_1", + method="xs", + args=["@source_1"], + kwargs=dict(level=2), + ), + dict( + df_output="df_merge", + df_input="sensor_1_source_1", + method="merge", + args=["@sensor_2_source_1"], + kwargs=dict(on="event_start", suffixes=("_sensor1", "_sensor2")), + ), + dict(method="resample", args=["2h"]), + dict(method="mean"), + dict(method="sum", kwargs=dict(axis=1)), + ], + final_df_output="df_merge", + ) + + reporter = PandasReporter(reporter_sensor, reporter_config_raw=reporter_config_raw) + + start = datetime(2023, 4, 10, tzinfo=utc) + end = datetime(2023, 4, 10, 10, tzinfo=utc) + report1 = reporter.compute(start, end) + + assert len(report1) == 5 + assert str(report1.index[0]) == "2023-04-10 00:00:00+00:00" + assert ( + report1.sensor == reporter_sensor + ) # check that the output sensor is effectively assigned. 
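The `xs` transformations in these tests rely on the BeliefsDataFrame index layout, (event_start, belief_time, source, cumulative_probability): selecting level 2 picks out one source's beliefs. A toy pandas illustration of the same index operations (not FlexMeasures code):

import pandas as pd

# Mimic a BeliefsDataFrame's 4-level index with plain pandas.
idx = pd.MultiIndex.from_tuples(
    [("2023-04-10 00:00", "2023-04-09 00:00", "source1", 0.5)],
    names=["event_start", "belief_time", "source", "cumulative_probability"],
)
bdf = pd.DataFrame({"event_value": [1.0]}, index=idx)

per_source = bdf.xs("source1", level=2)  # select one source, dropping that index level
flat = bdf.droplevel([1, 2, 3])  # keep only the event_start level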
diff --git a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py
new file mode 100644
index 000000000..432a197ca
--- /dev/null
+++ b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py
@@ -0,0 +1,267 @@
+import pytest
+
+from flexmeasures.data.models.reporting.pandas_reporter import PandasReporter
+from flexmeasures.data.models.time_series import Sensor, DataSource, TimedBelief
+from flexmeasures.data.models.generic_assets import GenericAssetType, GenericAsset
+
+import pandas as pd
+from datetime import datetime, timedelta
+from pytz import utc
+
+index = pd.date_range(
+    datetime(2023, 4, 13), datetime(2023, 4, 13, 23), freq="H", tz=utc
+)
+
+entsoe_prices = [
+    97.23,
+    85.09,
+    79.49,
+    72.86,
+    71.12,
+    82.50,
+    102.06,
+    115.04,
+    122.15,
+    105.39,
+    83.40,
+    34.90,
+    -4.50,
+    -50.00,
+    -50.07,
+    -50.00,
+    -0.90,
+    108.10,
+    128.10,
+    151.00,
+    155.20,
+    152.00,
+    134.04,
+    120.10,
+]  # EUR/MWh
+
+tibber_app_price = [
+    29.2,
+    27.7,
+    27.0,
+    26.2,
+    26.0,
+    27.4,
+    29.8,
+    31.3,
+    32.2,
+    30.2,
+    27.5,
+    21.6,  # originally 21.7 due to a rounding error from 216.4569 EUR/MWh (probably via 216.5) to 217 EUR/MWh
+    16.9,
+    11.4,
+    11.4,
+    11.4,
+    17.3,
+    30.5,
+    32.9,
+    35.7,
+    36.2,
+    35.8,
+    33.6,
+    32.0,
+]  # cents/kWh
+
+
+class TibberReporter(PandasReporter):
+    def __init__(self, sensor) -> None:
+        """This class calculates the price of energy of a tariff indexed to the Day Ahead prices.
+        Energy Price = (1 + VAT) x (EnergyTax + Tibber Tariff + DA Prices)
+        """
+
+        # search the sensors
+        EnergyTax = Sensor.query.filter(Sensor.name == "EnergyTax").one_or_none()
+        VAT = Sensor.query.filter(Sensor.name == "VAT").one_or_none()
+        tibber_tariff = Sensor.query.filter(
+            Sensor.name == "Tibber Tariff"
+        ).one_or_none()
+
+        da_prices = Sensor.query.filter(Sensor.name == "DA prices").one_or_none()
+
+        # create the PandasReporter reporter config
+        reporter_config = dict(
+            beliefs_search_configs=[
+                dict(sensor=EnergyTax.id, alias="energy_tax_df"),
+                dict(sensor=VAT.id),
+                dict(sensor=tibber_tariff.id),
+                dict(sensor=da_prices.id),
+            ],
+            transformations=[
+                dict(
+                    df_input="sensor_1",
+                    df_output="VAT",
+                    method="droplevel",
+                    args=[[1, 2, 3]],
+                ),
+                dict(method="add", args=[1]),  # this is to get 1 + VAT
+                dict(
+                    df_input="energy_tax_df",
+                    df_output="EnergyTax",
+                    method="droplevel",
+                    args=[[1, 2, 3]],
+                ),
+                dict(
+                    df_input="sensor_3",
+                    df_output="tibber_tariff",
+                    method="droplevel",
+                    args=[[1, 2, 3]],
+                ),
+                dict(
+                    df_input="sensor_4",
+                    df_output="da_prices",
+                    method="droplevel",
+                    args=[[1, 2, 3]],
+                ),
+                dict(
+                    method="add", args=["@tibber_tariff"]
+                ),  # da_prices = da_prices + tibber_tariff
+                dict(
+                    method="add", args=["@EnergyTax"]
+                ),  # da_prices = da_prices + EnergyTax
+                dict(
+                    method="multiply", args=["@VAT"]
+                ),  # da_prices = da_prices * (1 + VAT)
+                dict(method="round"),
+            ],
+            final_df_output="da_prices",
+        )
+
+        super().__init__(sensor, reporter_config)
+
+
+def beliefs_from_timeseries(index, values, sensor, source):
+    beliefs = []
+    for dt, value in zip(index, values):
+        beliefs.append(
+            TimedBelief(
+                event_start=dt,
+                belief_horizon=timedelta(hours=24),
+                event_value=value,
+                sensor=sensor,
+                source=source,
+            )
+        )
+
+    return beliefs
+
+
+@pytest.fixture()
+def tibber_test_data(fresh_db, app):
+    db = fresh_db
+
+    tax = GenericAssetType(name="Tax")
+    price = GenericAssetType(name="Price")
+    report = GenericAssetType(name="Report")
+
+    db.session.add_all([tax, price, report])
+
+    # Assets
+    electricity_price = GenericAsset(name="Electricity Price", generic_asset_type=price)
+
+    VAT_asset = GenericAsset(name="VAT", generic_asset_type=tax)
+
+    electricity_tax = GenericAsset(name="Energy Tax", generic_asset_type=tax)
+
+    tibber_report = GenericAsset(name="TibberReport", generic_asset_type=report)
+
+    db.session.add_all([electricity_price, VAT_asset, electricity_tax, tibber_report])
+
+    # Taxes
+    VAT = Sensor(
+        "VAT",
+        generic_asset=VAT_asset,
+        event_resolution=timedelta(days=365),
+        unit="",
+    )
+    EnergyTax = Sensor(
+        "EnergyTax",
+        generic_asset=electricity_tax,
+        event_resolution=timedelta(days=365),
+        unit="EUR/MWh",
+    )
+
+    # Tibber Tariff
+    tibber_tariff = Sensor(
+        "Tibber Tariff",
+        generic_asset=electricity_price,
+        event_resolution=timedelta(days=365),
+        unit="EUR/MWh",
+    )
+
+    db.session.add_all([VAT, EnergyTax, tibber_tariff])
+
+    """
+    Saving TimedBeliefs to the DB
+    """
+
+    # Add EnergyTax, VAT and Tibber Tariff beliefs to the DB
+    for sensor, source_name, value in [
+        (VAT, "Tax Authority", 0.21),
+        (EnergyTax, "Tax Authority", 125.99),  # EUR/MWh
+        (tibber_tariff, "Tibber", 18.0),  # EUR/MWh
+    ]:
+        belief = TimedBelief(
+            sensor=sensor,
+            source=DataSource(source_name),
+            event_value=value,
+            event_start=datetime(2023, 1, 1, tzinfo=utc),
+            belief_time=datetime(2023, 1, 1, tzinfo=utc),
+        )
+
+        db.session.add(belief)
+
+    # DA Prices
+    entsoe = DataSource("ENTSOE")
+    da_prices = Sensor(
+        "DA prices",
+        generic_asset=electricity_price,
+        event_resolution=timedelta(hours=1),
+    )
+    db.session.add(da_prices)
+    da_prices_beliefs = beliefs_from_timeseries(index, entsoe_prices, da_prices, entsoe)
+    db.session.add_all(da_prices_beliefs)
+
+    tibber_report_sensor = Sensor(
+        "TibberReportSensor",
+        generic_asset=tibber_report,
+        event_resolution=timedelta(hours=1),
+        unit="EUR/MWh",
+    )
+    db.session.add(tibber_report_sensor)
+
+    return tibber_report_sensor
+
+
+def test_tibber_reporter(tibber_test_data):
+    """
+    This test checks if the calculation of the energy prices gets close enough to the ones
+    displayed in Tibber's app.
+    """
+
+    tibber_report_sensor = tibber_test_data
+
+    tibber_reporter = TibberReporter(tibber_report_sensor)
+
+    result = tibber_reporter.compute(
+        start=datetime(2023, 4, 13, tzinfo=utc),
+        end=datetime(2023, 4, 14, tzinfo=utc),
+        input_resolution=timedelta(hours=1),
+    )
+
+    # check that we got a result for 24 hours
+    assert len(result) == 24
+
+    tibber_app_price_df = (
+        pd.DataFrame(tibber_app_price, index=index, columns=["event_value"])
+        * 10  # convert cents/kWh to EUR/MWh
+    )
+
+    error = abs(result - tibber_app_price_df)
+
+    # check that (DA price + EnergyTax + Tibber Tariff) * (1 + VAT) = Tibber app price
+    assert error.sum(min_count=1).event_value == 0
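As a sanity check on these fixtures, take the first hour of the test data (DA price 97.23 EUR/MWh, EnergyTax 125.99 EUR/MWh, Tibber tariff 18.0 EUR/MWh, VAT 21%) and apply the tariff formula from the reporter above:

# (DA price + EnergyTax + Tibber tariff) * (1 + VAT), rounded as in the reporter:
assert round((97.23 + 125.99 + 18.0) * 1.21) == 292  # EUR/MWh
# which matches the first Tibber app price of 29.2 cents/kWh (x 10 = 292 EUR/MWh).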
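For reference, the smallest reporter_config that the schema below accepts is a single beliefs search config (sensor id 1 is a placeholder; SensorIdField deserializes it to a Sensor):

reporter_config_raw = {
    "beliefs_search_configs": [
        {"sensor": 1}  # only `sensor` is required; all other search fields are optional
    ]
}
# ReporterConfigSchema().load(reporter_config_raw) returns the validated config.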
+ """ + + beliefs_search_configs = fields.List( + fields.Nested(BeliefsSearchConfigSchema()), + required=True, + validator=validate.Length(min=1), + ) diff --git a/flexmeasures/data/schemas/reporting/pandas_reporter.py b/flexmeasures/data/schemas/reporting/pandas_reporter.py new file mode 100644 index 000000000..3b076ddb2 --- /dev/null +++ b/flexmeasures/data/schemas/reporting/pandas_reporter.py @@ -0,0 +1,119 @@ +from marshmallow import Schema, fields, ValidationError, validates_schema +from inspect import signature + +from flexmeasures.data.schemas.reporting import ReporterConfigSchema + +from timely_beliefs import BeliefsDataFrame + + +class PandasMethodCall(Schema): + + df_input = fields.Str() + df_output = fields.Str() + + method = fields.Str(required=True) + args = fields.List(fields.Raw()) + kwargs = fields.Dict() + + @validates_schema + def validate_method_call(self, data, **kwargs): + + method = data["method"] + method_callable = getattr( + BeliefsDataFrame, method, None + ) # what if the object which is applied to is not a BeliefsDataFrame... + + if not callable(method_callable): + raise ValidationError( + f"method {method} is not a valid BeliefsDataFrame method." + ) + + method_signature = signature(method_callable) + + try: + args = data.get("args", []).copy() + _kwargs = data.get("kwargs", {}).copy() + + args.insert(0, BeliefsDataFrame) + + method_signature.bind(*args, **_kwargs) + except TypeError: + raise ValidationError( + f"Bad arguments or keyword arguments for method {method}" + ) + + +class PandasReporterConfigSchema(ReporterConfigSchema): + """ + This schema lists fields that can be used to describe sensors in the optimised portfolio + + Example: + + { + "input_sensors" : [ + {"sensor" : 1, "alias" : "df1"} + ], + "transformations" : [ + { + "df_input" : "df1", + "df_output" : "df2", + "method" : "copy" + }, + { + "df_input" : "df2", + "df_output" : "df2", + "method" : "sum" + }, + { + "method" : "sum", + "kwargs" : {"axis" : 0} + } + ], + "final_df_output" : "df2" + """ + + transformations = fields.List(fields.Nested(PandasMethodCall()), required=True) + final_df_output = fields.Str(required=True) + + @validates_schema + def validate_chaining(self, data, **kwargs): + """ + This validator ensures that we are always given an input and that the + final_df_output is computed. + """ + + # create dictionary data with objects of the types that is supposed to be generated + # loading the initial data, the sensors' data + fake_data = dict( + (f"sensor_{s['sensor'].id}", BeliefsDataFrame) + for s in data.get("beliefs_search_configs") + ) + final_df_output = data.get("final_df_output") + + previous_df = None + final_df_output_method = None + + for transformation in data.get("transformations"): + + df_input = transformation.get("df_input", previous_df) + df_output = transformation.get("df_output", df_input) + + if df_output == final_df_output: + final_df_output_method = transformation.get("method") + + if not previous_df and not df_input: + raise ValidationError("Cannot find the input DataFrame.") + + previous_df = df_output # keeping last BeliefsDataFrame calculation + + fake_data[df_output] = BeliefsDataFrame + + if final_df_output not in fake_data: + raise ValidationError( + "Cannot find final output DataFrame among the resulting DataFrames." 
+ ) + + if final_df_output_method in ["resample", "groupby"]: + raise ValidationError( + "Final output type cannot by of type `Resampler` or `DataFrameGroupBy`" + ) diff --git a/flexmeasures/data/schemas/tests/test_reporting.py b/flexmeasures/data/schemas/tests/test_reporting.py new file mode 100644 index 000000000..cb5490052 --- /dev/null +++ b/flexmeasures/data/schemas/tests/test_reporting.py @@ -0,0 +1,138 @@ +from flexmeasures.data.models.time_series import Sensor +from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType + +from flexmeasures.data.schemas.reporting.pandas_reporter import ( + PandasReporterConfigSchema, +) +from marshmallow.exceptions import ValidationError + +import pytest + + +@pytest.fixture(scope="module") +def setup_dummy_sensors(db, app): + + dummy_asset_type = GenericAssetType(name="DummyGenericAssetType") + db.session.add(dummy_asset_type) + + dummy_asset = GenericAsset( + name="DummyGenericAsset", generic_asset_type=dummy_asset_type + ) + db.session.add(dummy_asset) + + sensor1 = Sensor("sensor 1", generic_asset=dummy_asset) + db.session.add(sensor1) + sensor2 = Sensor("sensor 2", generic_asset=dummy_asset) + db.session.add(sensor2) + + db.session.commit() + + yield sensor1, sensor2 + + db.session.delete(sensor1) + db.session.delete(sensor2) + + db.session.commit() + + +@pytest.mark.parametrize( + "reporter_config, is_valid", + [ + ( + { # this checks that the final_df_output dataframe is actually generated at some point of the processing pipeline + "beliefs_search_configs": [ + { + "sensor": 1, + "event_starts_after": "2022-01-01T00:00:00 00:00", + "event_ends_before": "2022-01-01T23:00:00 00:00", + }, + ], + "transformations": [ + { + "df_output": "final_output", + "df_input": "sensor_1", + "method": "copy", + } + ], + "final_df_output": "final_output", + }, + True, + ), + ( + { # this checks that chaining works, applying the method copy on the previous dataframe + "beliefs_search_configs": [ + { + "sensor": 1, + "event_starts_after": "2022-01-01T00:00:00 00:00", + "event_ends_before": "2022-01-01T23:00:00 00:00", + }, + ], + "transformations": [ + {"df_output": "output1", "df_input": "sensor_1", "method": "copy"}, + {"method": "copy"}, + {"df_output": "final_output", "method": "copy"}, + ], + "final_df_output": "final_output", + }, + True, + ), + ( + { # this checks that resample cannot be the last method being applied + "beliefs_search_configs": [ + { + "sensor": 1, + "event_starts_after": "2022-01-01T00:00:00 00:00", + "event_ends_before": "2022-01-01T23:00:00 00:00", + }, + { + "sensor": 2, + "event_starts_after": "2022-01-01T00:00:00 00:00", + "event_ends_before": "2022-01-01T23:00:00 00:00", + }, + ], + "transformations": [ + {"df_output": "output1", "df_input": "sensor_1", "method": "copy"}, + {"method": "copy"}, + {"df_output": "final_output", "method": "resample", "args": ["1h"]}, + ], + "final_df_output": "final_output", + }, + False, + ), + ( + { # this checks that resample cannot be the last method being applied + "beliefs_search_configs": [ + { + "sensor": 1, + "event_starts_after": "2022-01-01T00:00:00 00:00", + "event_ends_before": "2022-01-01T23:00:00 00:00", + }, + { + "sensor": 2, + "event_starts_after": "2022-01-01T00:00:00 00:00", + "event_ends_before": "2022-01-01T23:00:00 00:00", + }, + ], + "transformations": [ + {"df_output": "output1", "df_input": "sensor_1", "method": "copy"}, + {"method": "copy"}, + {"df_output": "final_output", "method": "resample", "args": ["1h"]}, + {"method": "sum"}, + ], + 
"final_df_output": "final_output", + }, + True, + ), + ], +) +def test_pandas_reporter_schema( + reporter_config, is_valid, db, app, setup_dummy_sensors +): + + schema = PandasReporterConfigSchema() + + if is_valid: + schema.load(reporter_config) + else: + with pytest.raises(ValidationError): + schema.load(reporter_config) diff --git a/flexmeasures/utils/coding_utils.py b/flexmeasures/utils/coding_utils.py index eece574da..e3264c671 100644 --- a/flexmeasures/utils/coding_utils.py +++ b/flexmeasures/utils/coding_utils.py @@ -3,6 +3,8 @@ import functools import time import inspect +import importlib +import pkgutil from flask import current_app @@ -169,3 +171,36 @@ def wrapper(*args, **kwargs): return wrapper return decorator + + +def find_classes_module(module, superclass, skiptest=True): + classes = [] + reporting_module = importlib.import_module(module) + + for submodule in pkgutil.iter_modules(reporting_module.__path__): + + if skiptest and ("test" in f"{module}.{submodule.name}"): + continue + + if submodule.ispkg: + classes.extend( + find_classes_module( + f"{module}.{submodule.name}", superclass, skiptest=skiptest + ) + ) + else: + module_object = importlib.import_module(f"{module}.{submodule.name}") + module_classes = inspect.getmembers(module_object, inspect.isclass) + classes.extend( + [ + (class_name, klass) + for class_name, klass in module_classes + if issubclass(klass, superclass) and klass != superclass + ] + ) + + return classes + + +def get_classes_module(module, superclass, skiptest=True) -> dict: + return dict(find_classes_module(module, superclass, skiptest=skiptest)) diff --git a/flexmeasures/utils/config_defaults.py b/flexmeasures/utils/config_defaults.py index 575a66d5b..a8f10180a 100644 --- a/flexmeasures/utils/config_defaults.py +++ b/flexmeasures/utils/config_defaults.py @@ -113,6 +113,7 @@ class Config(object): FLEXMEASURES_PLANNING_TTL: timedelta = timedelta( days=7 ) # Time to live for UDI event ids of successful scheduling jobs. Set a negative timedelta to persist forever. + FLEXMEASURES_DEFAULT_DATASOURCE: str = "FlexMeasures" FLEXMEASURES_JOB_CACHE_TTL: int = 3600 # Time to live for the job caching keys in seconds. Set a negative timedelta to persist forever. FLEXMEASURES_TASK_CHECK_AUTH_TOKEN: str | None = None FLEXMEASURES_REDIS_URL: str = "localhost" diff --git a/flexmeasures/utils/plugin_utils.py b/flexmeasures/utils/plugin_utils.py index 5acfea029..6cab82062 100644 --- a/flexmeasures/utils/plugin_utils.py +++ b/flexmeasures/utils/plugin_utils.py @@ -9,6 +9,8 @@ import sentry_sdk from flask import Flask, Blueprint +from flexmeasures.utils.coding_utils import get_classes_module + def register_plugins(app: Flask): """ @@ -100,6 +102,11 @@ def register_plugins(app: Flask): app.logger.debug(f"Registering {plugin_blueprint} ...") app.register_blueprint(plugin_blueprint) + # Loading reporters + from flexmeasures.data.models.reporting import Reporter + + app.reporters.update(get_classes_module(module.__name__, Reporter)) + app.config["LOADED_PLUGINS"][plugin_name] = plugin_version app.logger.info(f"Loaded plugins: {app.config['LOADED_PLUGINS']}") sentry_sdk.set_context("plugins", app.config.get("LOADED_PLUGINS", {}))