From 286623740b7357d8500d74ced3821f3344af93d4 Mon Sep 17 00:00:00 2001
From: Victor
Date: Mon, 7 Aug 2023 09:55:18 +0200
Subject: [PATCH] feat: save/fetch `Reporter` as `DataSource` (#751)

* feat: revision to add `attributes` column to the `data_source` table Signed-off-by: Victor Garcia Reolid
* feat: add `attributes` column to the DataSource model Signed-off-by: Victor Garcia Reolid
* feat: add sensors relationship in DataSource Signed-off-by: Victor Garcia Reolid
* fix: make sensors relationship viewonly Signed-off-by: Victor Garcia Reolid
* feat: add report_config to Reporter class Signed-off-by: Victor Garcia Reolid
* feat: add PandasReporter report and reporter schemas Signed-off-by: Victor Garcia Reolid
* fix: update fixture by removing beliefs_search_configs and adding input_variables Signed-off-by: Victor Garcia Reolid
* feat: add report config to PandasReporter Signed-off-by: Victor Garcia Reolid
* feat: add helper methods to DataSource Signed-off-by: Victor Garcia Reolid
* fix: modernize AggregatorReporter Signed-off-by: Victor Garcia Reolid
* feat: add attributes hash Signed-off-by: Victor Garcia Reolid
* feat: add attributes to the function get_or_create_source Signed-off-by: Victor Garcia Reolid
* feat: add attribute hash to get_or_create_source Signed-off-by: Victor Garcia Reolid
* feat: save/fetch data generator to/from data source Signed-off-by: Victor Garcia Reolid
* refactor: adapt reporters to use new DataGenerator class Signed-off-by: Victor Garcia Reolid
* fix: use default method on load Signed-off-by: Victor Garcia Reolid
* fix: adapt tests of the schemas Signed-off-by: Victor Garcia Reolid
* fix: use a DataGenerator with a schema defined Signed-off-by: Victor Garcia Reolid
* changing backref from "dynamic" to "select" Signed-off-by: Victor Garcia Reolid
* feat: add hash_attributes static method Signed-off-by: Victor Garcia Reolid
* fix: use hash_attributes static method Signed-off-by: Victor Garcia Reolid
* feat: adding attributes_hash to the DataSource unique constraint list Signed-off-by: Victor Garcia Reolid
* fix: add constraint to migration and downgrade Signed-off-by: Victor Garcia Reolid
* fix: only returning keys from the attributes field Signed-off-by: Victor Garcia Reolid
* refactor: rename _inputs_schema to _input_schema Signed-off-by: Victor Garcia Reolid
* fix: typing Signed-off-by: Victor Garcia Reolid
* fix: avoid future data leakage Signed-off-by: Victor Garcia Reolid
* refactor: rename PandasReporterInputConfigSchema to PandasReporterInputSchema Signed-off-by: Victor Garcia Reolid
* docs: clarify description of the fake_data mock variable Signed-off-by: Victor Garcia Reolid
* docs: fix docstring Signed-off-by: Victor Garcia Reolid
* fix: use default value Signed-off-by: Victor Garcia Reolid
* fix: allow creating new attributes with the method `set_attributes` Signed-off-by: Victor Garcia Reolid
* docs: add changelog entry Signed-off-by: Victor Garcia Reolid
* docs: fix docstring Signed-off-by: Victor Garcia Reolid
* fix: use default value Signed-off-by: Victor Garcia Reolid
* fix: allow creating new attributes with the method `set_attributes` Signed-off-by: Victor Garcia Reolid
* docs: add changelog entry Signed-off-by: Victor Garcia Reolid
* fix: add reporters and schedulers into the data_generators attribute in the app context Signed-off-by: Victor Garcia Reolid
* fix: raise Exceptions instead of returning None Signed-off-by: Victor Garcia Reolid
* fix: move sensor attribute from config to inputs Signed-off-by: Victor Garcia Reolid
* fix: use same structure for data generators and add test Signed-off-by: Victor Garcia Reolid
* refactor: use input_resolution instead of resolution Signed-off-by: Victor Garcia Reolid
* doc: update schema docstring Signed-off-by: Victor Garcia Reolid
* refactor: rename input_resolution to resolution Signed-off-by: Victor Garcia Reolid
* fix: remove sensor from config Signed-off-by: Victor Garcia Reolid
* docs: add comment Signed-off-by: Victor Garcia Reolid
* fix: remove df_output Signed-off-by: Victor Garcia Reolid
* fix: for data in data["data"] Signed-off-by: Victor Garcia Reolid
* doc: add docstring to compute and __init__ in DataGenerator Signed-off-by: Victor Garcia Reolid
* refactor: rename inputs to input Signed-off-by: Victor Garcia Reolid
* fix: removing constructor Signed-off-by: Victor Garcia Reolid
* docs: improve docstring Signed-off-by: Victor Garcia Reolid
* test: add data to conftest Signed-off-by: Victor Garcia Reolid
* test: add test_dst_transition Signed-off-by: Victor Garcia Reolid
* fix: never returning None Signed-off-by: Victor Garcia Reolid
* test: add test to check timely-beliefs resampling and calling an aggregator between two dates with different offsets, in this case, daylight saving transitions. Signed-off-by: Victor Garcia Reolid
* test: change output sensor id Signed-off-by: Victor Garcia Reolid
* docs: add docstring to the data_source property of the class DataGenerator. Signed-off-by: Victor Garcia Reolid
* docs: edit data_source docstring Signed-off-by: Victor Garcia Reolid
* refactor: renaming input_sensors to input_variables Signed-off-by: Victor Garcia Reolid
* rename input to parameters Signed-off-by: Victor Garcia Reolid
* remove unnecessary import Signed-off-by: Victor Garcia Reolid
* add save_config attribute Signed-off-by: Victor Garcia Reolid
* remove leftover comment Signed-off-by: Victor Garcia Reolid
* add inline comments Signed-off-by: Victor Garcia Reolid
* deprecation message for app.reporters and app.schedulers Signed-off-by: Victor Garcia Reolid
* fix typo Signed-off-by: Victor Garcia Reolid
* use data source created by the data generator Signed-off-by: Victor Garcia Reolid
* feat: support YAML in `flexmeasures add report` command (#752)
* feat: add pyyaml to the requirements Signed-off-by: Victor Garcia Reolid
* feat: support YAML and add report_config Signed-off-by: Victor Garcia Reolid
* fix: move `types-PyYAML` dependency to the right place Signed-off-by: Victor Garcia Reolid
* fix: use a DataGenerator with defined schemas Signed-off-by: Victor Garcia Reolid
* fix: adapt tests of the schemas Signed-off-by: Victor Garcia Reolid
* feat: add option to open default editor Signed-off-by: Victor Garcia Reolid
* fix: move sensor to input Signed-off-by: Victor Garcia Reolid
* fix: parse resolution properly Signed-off-by: Victor Garcia Reolid
* fix: remove accidentally committed file Signed-off-by: Victor Garcia Reolid
* fix: avoid potential bug Signed-off-by: Victor Garcia Reolid
* rename input to parameters Signed-off-by: Victor Garcia Reolid
* add changelog entry Signed-off-by: Victor Garcia Reolid
* add pyyaml to app.txt Signed-off-by: Victor Garcia Reolid
* add --save-config to the add_report command Signed-off-by: Victor Garcia Reolid
* improve changelog files Signed-off-by: Victor Garcia Reolid

---------
Signed-off-by: Victor Garcia Reolid
Signed-off-by: Victor

* Polish `PandasReporter` schemas (#788)
* add required_input and required_output to PandasReporterConfigSchema and input & output to parameters Signed-off-by: Victor Garcia Reolid
* adapt tests Signed-off-by: Victor Garcia Reolid
* implement multiple outputs and simplify tibber reporter Signed-off-by: Victor Garcia Reolid
* fix example in the docstring Signed-off-by: Victor Garcia Reolid
* remove max=1 constraint Signed-off-by: Victor Garcia Reolid
* add example for _clean_parameters Signed-off-by: Victor Garcia Reolid
* remove time parameters in input (_clean_parameters method) Signed-off-by: Victor Garcia Reolid
* remove file added accidentally Signed-off-by: Victor Garcia Reolid
* improve assert Signed-off-by: Victor Garcia Reolid
* update changelog entry Signed-off-by: Victor Garcia Reolid
* fix changelog Signed-off-by: Victor Garcia Reolid

* Adapt `test_add_report` to use the new field of the `PandasReporter` schema (#789)
* fix: typo Signed-off-by: F.N. Claessen
* fix: fetch output sensor only from parameters dict Signed-off-by: F.N. Claessen
* adapt the CLI to deal with multiple outputs Signed-off-by: Victor Garcia Reolid
* fix typos Signed-off-by: Victor Garcia Reolid

---------
Signed-off-by: F.N. Claessen
Signed-off-by: Victor Garcia Reolid
Co-authored-by: F.N. Claessen

---------
Signed-off-by: Victor Garcia Reolid
Signed-off-by: F.N. Claessen
Co-authored-by: F.N. Claessen

---------
Signed-off-by: Victor Garcia Reolid
Signed-off-by: Victor
Signed-off-by: F.N. Claessen
Co-authored-by: F.N. Claessen
---
 ci/run_mypy.sh                                 |   2 +-
 documentation/changelog.rst                    |   2 +
 documentation/cli/change_log.rst               |   1 +
 flexmeasures/app.py                            |  23 +-
 flexmeasures/cli/data_add.py                   | 230 ++++++++++++------
 flexmeasures/cli/tests/conftest.py             |  37 +--
 flexmeasures/cli/tests/test_data_add.py        | 160 ++++++++++--
 flexmeasures/data/models/data_sources.py       | 215 ++++++++++++--
 .../data/models/reporting/__init__.py          | 186 ++++----------
 .../data/models/reporting/aggregator.py        |  68 ++++--
 .../data/models/reporting/pandas_reporter.py   | 164 +++++++++----
 .../data/models/reporting/tests/conftest.py    |  53 +++-
 .../models/reporting/tests/test_aggregator.py  | 123 ++++++++--
 .../reporting/tests/test_pandas_reporter.py    | 107 +++++---
 .../models/reporting/tests/test_reporter.py    |  40 ---
 .../reporting/tests/test_tibber_reporter.py    | 113 ++++-----
 flexmeasures/data/schemas/io.py                |  52 ++++
 .../data/schemas/reporting/__init__.py         |  45 +++-
 .../data/schemas/reporting/aggregation.py      |  78 +++---
 .../data/schemas/reporting/pandas_reporter.py  | 128 ++++++----
 .../data/schemas/tests/test_reporting.py       | 118 +++++----
 flexmeasures/data/tests/conftest.py            |  51 ++++
 flexmeasures/data/tests/test_data_source.py    | 145 +++++++++++
 flexmeasures/utils/plugin_utils.py             |  12 +-
 requirements/app.in                            |   1 +
 requirements/app.txt                           |   2 +
 26 files changed, 1507 insertions(+), 649 deletions(-)
 delete mode 100644 flexmeasures/data/models/reporting/tests/test_reporter.py
 create mode 100644 flexmeasures/data/schemas/io.py
 create mode 100644 flexmeasures/data/tests/test_data_source.py

diff --git a/ci/run_mypy.sh b/ci/run_mypy.sh
index 3b29de77b..88d5bd181 100755
--- a/ci/run_mypy.sh
+++ b/ci/run_mypy.sh
@@ -1,7 +1,7 @@
 #!/bin/bash
 set -e
 pip install --upgrade 'mypy>=0.902'
-pip install types-pytz types-requests types-Flask types-click types-redis types-tzlocal types-python-dateutil types-setuptools types-tabulate
+pip install types-pytz types-requests types-Flask types-click types-redis types-tzlocal types-python-dateutil types-setuptools types-tabulate types-PyYAML
 # We are checking python files which have type hints, and leave out bigger issues we made issues for
 # * data/scripts: We'll remove legacy code:
https://trello.com/c/1wEnHOkK/7-remove-custom-data-scripts
 # * data/models and data/services: https://trello.com/c/rGxZ9h2H/540-makequery-call-signature-is-incoherent

diff --git a/documentation/changelog.rst b/documentation/changelog.rst
index 2dd8a64cb..581a84610 100644
--- a/documentation/changelog.rst
+++ b/documentation/changelog.rst
@@ -25,6 +25,8 @@ New features
 * Added API endpoints `/sensors/<id>` for fetching a single sensor, `/sensors` (POST) for adding a sensor, `/sensors/<id>` (PATCH) for updating a sensor and `/sensors/<id>` (DELETE) for deleting a sensor. [see `PR #759 <https://github.com/FlexMeasures/flexmeasures/pull/759>`_] and [see `PR #767 <https://github.com/FlexMeasures/flexmeasures/pull/767>`_] and [see `PR #773 <https://github.com/FlexMeasures/flexmeasures/pull/773>`_] and [see `PR #784 <https://github.com/FlexMeasures/flexmeasures/pull/784>`_]
 * The CLI now allows to set lists and dicts as asset & sensor attributes (formerly only single values) [see `PR #762 <https://github.com/FlexMeasures/flexmeasures/pull/762>`_]
 * Add `ProcessScheduler` class to optimize the starting time of processes using one of the policies developed (INFLEXIBLE, SHIFTABLE and BREAKABLE), accessible via the CLI command `flexmeasures add schedule for-process` [see `PR #729 <https://github.com/FlexMeasures/flexmeasures/pull/729>`_ and `PR #768 <https://github.com/FlexMeasures/flexmeasures/pull/768>`_]
+* Users will be able to see (e.g. in the UI) exactly which reporter created the report (saved as sensor data), and hosts will be able to identify exactly which configuration was used to create a given report [see `PR #751 <https://github.com/FlexMeasures/flexmeasures/pull/751>`_, `PR #752 <https://github.com/FlexMeasures/flexmeasures/pull/752>`_ and `PR #788 <https://github.com/FlexMeasures/flexmeasures/pull/788>`_]
+* The CLI `flexmeasures add report` now allows passing `config` and `parameters` in YAML format as files or editable via the system's default editor [see `PR #788 <https://github.com/FlexMeasures/flexmeasures/pull/788>`_]
 
 Bugfixes
 -----------

diff --git a/documentation/cli/change_log.rst b/documentation/cli/change_log.rst
index 2d5112445..7ea243638 100644
--- a/documentation/cli/change_log.rst
+++ b/documentation/cli/change_log.rst
@@ -9,6 +9,7 @@ since v0.15.0 | July XX, 2023
 
 * Allow deleting multiple sensors with a single call to ``flexmeasures delete sensor`` by passing the ``--id`` option multiple times.
 * Add ``flexmeasures add schedule for-process`` to create a new process schedule for a given power sensor.
+* Add support for describing ``config`` and ``parameters`` in YAML for the command ``flexmeasures add report``, editable in the user's code editor using the flags ``--edit-config`` or ``--edit-parameters``.
 * Add ``--kind process`` option to create the asset and sensors for the ``ProcessScheduler`` tutorial.
 
 since v0.14.1 | June XX, 2023

diff --git a/flexmeasures/app.py b/flexmeasures/app.py
index b26c0aad3..64e745ff6 100644
--- a/flexmeasures/app.py
+++ b/flexmeasures/app.py
@@ -5,6 +5,7 @@ from __future__ import annotations
 
 import time
+from copy import copy
 import os
 from pathlib import Path
 from datetime import date
@@ -123,8 +124,26 @@ def create(  # noqa C901
     from flexmeasures.utils.coding_utils import get_classes_module
     from flexmeasures.data.models import reporting, planning
 
-    app.reporters = get_classes_module("flexmeasures.data.models", reporting.Reporter)
-    app.schedulers = get_classes_module("flexmeasures.data.models", planning.Scheduler)
+    reporters = get_classes_module("flexmeasures.data.models", reporting.Reporter)
+    schedulers = get_classes_module("flexmeasures.data.models", planning.Scheduler)
+
+    app.data_generators = dict()
+    app.data_generators["reporter"] = copy(
+        reporters
+    )  # use copy to avoid mutating app.reporters
+    app.data_generators["scheduler"] = schedulers
+
+    # deprecation of app.reporters
+    app.reporters = reporters
+    app.schedulers = schedulers
+
+    def get_reporters():
+        app.logger.warning(
+            '`app.reporters` is deprecated. Use `app.data_generators["reporter"]` instead.'
+        )
+        return app.data_generators["reporter"]
+
+    setattr(app, "reporters", property(get_reporters))
 
     # add auth policy

diff --git a/flexmeasures/cli/data_add.py b/flexmeasures/cli/data_add.py
index cc60491cf..9e48e5c88 100755
--- a/flexmeasures/cli/data_add.py
+++ b/flexmeasures/cli/data_add.py
@@ -8,8 +8,10 @@
 from typing import Type, List
 import isodate
 import json
+import yaml
 from pathlib import Path
 from io import TextIOBase
+from string import Template
 
 from marshmallow import validate
 import pandas as pd
@@ -57,6 +59,7 @@
 from flexmeasures.data.schemas.times import TimeIntervalSchema
 from flexmeasures.data.schemas.scheduling.storage import EfficiencyField
 from flexmeasures.data.schemas.sensors import SensorSchema
+from flexmeasures.data.schemas.io import Output
 from flexmeasures.data.schemas.units import QuantityField
 from flexmeasures.data.schemas.generic_assets import (
     GenericAssetSchema,
@@ -1305,19 +1308,18 @@ def add_schedule_process(
 @fm_add_data.command("report")
 @with_appcontext
 @click.option(
-    "--sensor-id",
-    "sensor",
-    type=SensorIdField(),
-    required=True,
-    help="Sensor used to save the report. Follow up with the sensor's ID. "
-    " If needed, use `flexmeasures add sensor` to create a new sensor first.",
+    "--config",
+    "config_file",
+    required=False,
+    type=click.File("r"),
+    help="Path to the JSON or YAML file with the configuration of the reporter.",
 )
 @click.option(
-    "--reporter-config",
-    "reporter_config",
-    required=True,
+    "--parameters",
+    "parameters_file",
+    required=False,
     type=click.File("r"),
-    help="Path to the JSON file with the reporter configuration.",
+    help="Path to the JSON or YAML file with the report parameters (passed to the compute step).",
 )
 @click.option(
     "--reporter",
@@ -1364,17 +1366,20 @@ def add_schedule_process(
 )
 @click.option(
     "--output-file",
-    "output_file",
+    "output_file_pattern",
     required=False,
     type=click.Path(),
-    help="Path to save the report to file. Will override any previous file contents."
-    " Use the `.csv` suffix to save the results as Comma Separated Values and `.xlsx` to export them as Excel sheets.",
+    help="Format of the output file. Use the dollar sign ($) to interpolate any of the following values:"
+    " now (current time), name (name of the output), sensor_id (id of the sensor) and column (column of the output)."
+    " Example: 'result_file_$name_$now.csv'. "
+    "Use the `.csv` suffix to save the results as Comma Separated Values and `.xlsx` to export them as Excel sheets.",
 )
 @click.option(
     "--timezone",
     "timezone",
     required=False,
-    help="Timezone as string, e.g. 'UTC' or 'Europe/Amsterdam' (defaults to the timezone of the sensor used to save the report).",
+    help="Timezone as string, e.g. 'UTC' or 'Europe/Amsterdam'."
+    " The timezone of the first output sensor (specified in the parameters) is taken as a default.",
 )
 @click.option(
     "--dry-run",
     "dry_run",
     is_flag=True,
     help="Add this flag to avoid saving the results to the database.",
 )
+@click.option(
+    "--edit-config",
+    "edit_config",
+    is_flag=True,
+    help="Add this flag to edit the configuration of the Reporter in your default text editor (e.g. nano).",
+)
+@click.option(
+    "--edit-parameters",
+    "edit_parameters",
+    is_flag=True,
+    help="Add this flag to edit the parameters passed to the Reporter in your default text editor (e.g.
nano).", +) +@click.option( + "--save-config", + "save_config", + is_flag=True, + help="Add this flag to save the `config` in the attributes of the DataSource for future reference.", +) def add_report( # noqa: C901 reporter_class: str, - sensor: Sensor, - reporter_config: TextIOBase, + config_file: TextIOBase | None = None, + parameters_file: TextIOBase | None = None, start: datetime | None = None, end: datetime | None = None, start_offset: str | None = None, end_offset: str | None = None, resolution: timedelta | None = None, - output_file: Path | None = None, + output_file_pattern: Path | None = None, dry_run: bool = False, + edit_config: bool = False, + edit_parameters: bool = False, + save_config: bool = False, timezone: str | None = None, ): """ @@ -1400,11 +1426,38 @@ def add_report( # noqa: C901 to the database or export them as CSV or Excel file. """ + config = dict() + + if config_file: + config = yaml.safe_load(config_file) + + if edit_config: + config = launch_editor("/tmp/config.yml") + + parameters = dict() + + if parameters_file: + parameters = yaml.safe_load(parameters_file) + + if edit_parameters: + parameters = launch_editor("/tmp/parameters.yml") + + # check if sensor is not provided in the `parameters` description + if "output" not in parameters or len(parameters["output"]) == 0: + click.secho( + "At least one output sensor needs to be specified in the parameters description.", + **MsgStyle.ERROR, + ) + raise click.Abort() + + output = [Output().load(o) for o in parameters["output"]] + # compute now in the timezone local to the output sensor if timezone is not None: check_timezone(timezone) + now = pytz.timezone( - zone=timezone if timezone is not None else sensor.timezone + zone=timezone if timezone is not None else output[0]["sensor"].timezone ).localize(datetime.now()) # apply offsets, if provided @@ -1425,9 +1478,11 @@ def add_report( # noqa: C901 " Trying to use the latest datapoint of the report sensor as the start time...", **MsgStyle.WARN, ) + + # todo: get the oldest last_value among all the sensors last_value_datetime = ( db.session.query(func.max(TimedBelief.event_start)) - .filter(TimedBelief.sensor_id == sensor.id) + .filter(TimedBelief.sensor_id == output[0]["sensor"].id) .one_or_none() ) @@ -1436,7 +1491,8 @@ def add_report( # noqa: C901 start = last_value_datetime[0] else: click.secho( - f"Could not find any data for the report sensor {sensor}.", + "Could not find any data for the output sensors provided. 
Such data is needed to compute" + " a sensible default start for the report, so setting a start explicitly would resolve this issue.", **MsgStyle.ERROR, ) raise click.Abort() @@ -1457,7 +1513,9 @@ def add_report( # noqa: C901 ) # get reporter class - ReporterClass: Type[Reporter] = app.reporters.get(reporter_class) + ReporterClass: Type[Reporter] = app.data_generators.get("reporter").get( + reporter_class + ) # check if it exists if ReporterClass is None: @@ -1469,70 +1527,102 @@ def add_report( # noqa: C901 click.secho(f"Reporter {reporter_class} found.", **MsgStyle.SUCCESS) - reporter_config_raw = json.load(reporter_config) - # initialize reporter class with the reporter sensor and reporter config - reporter: Reporter = ReporterClass( - sensor=sensor, reporter_config_raw=reporter_config_raw - ) + reporter: Reporter = ReporterClass(config=config, save_config=save_config) click.echo("Report computation is running...") - # compute the report - result: BeliefsDataFrame = reporter.compute( - start=start, end=end, input_resolution=resolution - ) + if ("start" not in parameters) and (start is not None): + parameters["start"] = start.isoformat() + if ("end" not in parameters) and (end is not None): + parameters["end"] = end.isoformat() + if ("resolution" not in parameters) and (resolution is not None): + parameters["resolution"] = pd.Timedelta(resolution).isoformat() - if not result.empty: - click.secho("Report computation done.", **MsgStyle.SUCCESS) - else: - click.secho( - "Report computation done, but the report is empty.", **MsgStyle.WARN - ) - - # save the report if it's not running in dry mode - if not dry_run: - click.echo("Saving report to the database...") - save_to_db(result.dropna()) - db.session.commit() - click.secho( - "Success. The report has been saved to the database.", - **MsgStyle.SUCCESS, - ) - else: - click.echo( - f"Not saving report to the database (because of --dry-run), but this is what I computed:\n{result}" - ) - - # if an output file path is provided, save the results - if output_file: - suffix = str(output_file).split(".")[-1] if "." in str(output_file) else "" + # compute the report + results: BeliefsDataFrame = reporter.compute(parameters=parameters) - if suffix == "xlsx": # save to EXCEL - result.to_excel(output_file) + for result in results: + data = result["data"] + sensor = result["sensor"] + if not data.empty: click.secho( - f"Success. The report has been exported as EXCEL to the file `{output_file}`", - **MsgStyle.SUCCESS, + f"Report computation done for sensor `{sensor}`.", **MsgStyle.SUCCESS + ) + else: + click.secho( + f"Report computation done for sensor `{sensor}`, but the report is empty.", + **MsgStyle.WARN, ) - elif suffix == "csv": # save to CSV - result.to_csv(output_file) + # save the report if it's not running in dry mode + if not dry_run: + click.echo(f"Saving report for sensor `{sensor}` to the database...") + save_to_db(data.dropna()) + db.session.commit() click.secho( - f"Success. The report has been exported as CSV to the file `{output_file}`", + f"Success. The report for sensor `{sensor}` has been saved to the database.", **MsgStyle.SUCCESS, ) + else: + click.echo( + f"Not saving report for sensor `{sensor}` to the database (because of --dry-run), but this is what I computed:\n{data}" + ) - else: # default output format: CSV. + # if an output file path is provided, save the data + if output_file_pattern: + suffix = ( + str(output_file_pattern).split(".")[-1] + if "." 
in str(output_file_pattern) + else "" + ) + template = Template(str(output_file_pattern)) + + filename = template.safe_substitute( + sensor_id=result["sensor"].id, + name=result.get("name", ""), + column=result.get("column", ""), + reporter_class=reporter_class, + now=now.strftime("%Y_%m_%dT%H%M%S"), + ) + + if suffix == "xlsx": # save to EXCEL + data.to_excel(filename) + click.secho( + f"Success. The report for sensor `{sensor}` has been exported as EXCEL to the file `{filename}`", + **MsgStyle.SUCCESS, + ) + + elif suffix == "csv": # save to CSV + data.to_csv(filename) + click.secho( + f"Success. The report for sensor `{sensor}` has been exported as CSV to the file `{filename}`", + **MsgStyle.SUCCESS, + ) + + else: # default output format: CSV. + click.secho( + f"File suffix not provided. Exporting results for sensor `{sensor}` as CSV to file {filename}", + **MsgStyle.WARN, + ) + data.to_csv(filename) + else: click.secho( - f"File suffix not provided. Exporting results as CSV to file {output_file}", - **MsgStyle.WARN, + "Success.", + **MsgStyle.SUCCESS, ) - result.to_csv(output_file) - else: - click.secho( - "Success.", - **MsgStyle.SUCCESS, - ) + + +def launch_editor(filename: str) -> dict: + """Launch editor to create/edit a json object""" + click.edit("{\n}", filename=filename) + + with open(filename, "r") as f: + content = yaml.safe_load(f) + if content is None: + return dict() + + return content @fm_add_data.command("toy-account") diff --git a/flexmeasures/cli/tests/conftest.py b/flexmeasures/cli/tests/conftest.py index e83b53f07..097efbda5 100644 --- a/flexmeasures/cli/tests/conftest.py +++ b/flexmeasures/cli/tests/conftest.py @@ -63,6 +63,13 @@ def setup_dummy_data(db, app, setup_dummy_asset): ) db.session.add(report_sensor) + report_sensor_2 = Sensor( + "report sensor 2", + generic_asset=pandas_report, + event_resolution=timedelta(hours=2), + ) + db.session.add(report_sensor_2) + # Create 1 DataSources source = DataSource("source1") @@ -83,35 +90,7 @@ def setup_dummy_data(db, app, setup_dummy_asset): db.session.add_all(beliefs) db.session.commit() - yield sensor1, sensor2, report_sensor - - -@pytest.fixture(scope="module") -@pytest.mark.skip_github -def reporter_config_raw(app, db, setup_dummy_data): - """ - This reporter_config defines the operations to add up the - values of the sensors 1 and 2 and resamples the result to a - two hour resolution. 
- """ - - sensor1, sensor2, report_sensor = setup_dummy_data - - reporter_config_raw = dict( - beliefs_search_configs=[dict(sensor=sensor1.id), dict(sensor=sensor2.id)], - transformations=[ - dict( - df_input="sensor_1", - method="add", - args=["@sensor_2"], - df_output="df_agg", - ), - dict(method="resample_events", args=["2h"]), - ], - final_df_output="df_agg", - ) - - return reporter_config_raw + yield sensor1.id, sensor2.id, report_sensor.id, report_sensor_2.id @pytest.mark.skip_github diff --git a/flexmeasures/cli/tests/test_data_add.py b/flexmeasures/cli/tests/test_data_add.py index 995cc706f..72647d494 100644 --- a/flexmeasures/cli/tests/test_data_add.py +++ b/flexmeasures/cli/tests/test_data_add.py @@ -1,7 +1,9 @@ import pytest import json +import yaml import os - +from datetime import datetime +import pytz from flexmeasures.cli.tests.utils import to_flags from flexmeasures.data.models.annotations import ( @@ -96,7 +98,7 @@ def test_cli_help(app): @pytest.mark.skip_github -def test_add_reporter(app, db, setup_dummy_data, reporter_config_raw): +def test_add_reporter(app, db, setup_dummy_data): """ The reporter aggregates input data from two sensors (both have 200 data points) to a two-hour resolution. @@ -113,30 +115,54 @@ def test_add_reporter(app, db, setup_dummy_data, reporter_config_raw): from flexmeasures.cli.data_add import add_report - sensor1, sensor2, report_sensor = setup_dummy_data - report_sensor_id = report_sensor.id + sensor1_id, sensor2_id, report_sensor_id, _ = setup_dummy_data + + reporter_config = dict( + required_input=[{"name": "sensor_1"}, {"name": "sensor_2"}], + required_output=[{"name": "df_agg"}], + transformations=[ + dict( + df_input="sensor_1", + method="add", + args=["@sensor_2"], + df_output="df_agg", + ), + dict(method="resample_events", args=["2h"]), + ], + ) # Running the command with start and end values. runner = app.test_cli_runner() cli_input_params = { - "sensor-id": report_sensor_id, - "reporter-config": "reporter_config.json", + "config": "reporter_config.yaml", + "parameters": "parameters.json", "reporter": "PandasReporter", "start": "2023-04-10T00:00:00 00:00", "end": "2023-04-10T10:00:00 00:00", "output-file": "test.csv", } + parameters = dict( + input=[ + dict(name="sensor_1", sensor=sensor1_id), + dict(name="sensor_2", sensor=sensor2_id), + ], + output=[dict(name="df_agg", sensor=report_sensor_id)], + ) + cli_input = to_flags(cli_input_params) # run test in an isolated file system with runner.isolated_filesystem(): # save reporter_config to a json file - with open("reporter_config.json", "w") as f: - json.dump(reporter_config_raw, f) + with open("reporter_config.yaml", "w") as f: + yaml.dump(reporter_config, f) + + with open("parameters.json", "w") as f: + json.dump(parameters, f) # call command result = runner.invoke(add_report, cli_input) @@ -145,15 +171,14 @@ def test_add_reporter(app, db, setup_dummy_data, reporter_config_raw): assert result.exit_code == 0 # run command without errors - assert "Reporter PandasReporter found" in result.output - assert "Report computation done." in result.output - - # Check report is saved to the database - report_sensor = Sensor.query.get( report_sensor_id ) # get fresh report sensor instance + assert "Reporter PandasReporter found" in result.output + assert f"Report computation done for sensor `{report_sensor}`." 
in result.output + + # Check report is saved to the database stored_report = report_sensor.search_beliefs( event_starts_after=cli_input_params.get("start").replace(" ", "+"), event_ends_before=cli_input_params.get("end").replace(" ", "+"), @@ -175,8 +200,8 @@ def test_add_reporter(app, db, setup_dummy_data, reporter_config_raw): previous_command_end = cli_input_params.get("end").replace(" ", "+") cli_input_params = { - "sensor-id": report_sensor_id, - "reporter-config": "reporter_config.json", + "config": "reporter_config.json", + "parameters": "parameters.json", "reporter": "PandasReporter", "output-file": "test.csv", "timezone": "UTC", @@ -188,7 +213,10 @@ def test_add_reporter(app, db, setup_dummy_data, reporter_config_raw): # save reporter_config to a json file with open("reporter_config.json", "w") as f: - json.dump(reporter_config_raw, f) + json.dump(reporter_config, f) + + with open("parameters.json", "w") as f: + json.dump(parameters, f) # call command result = runner.invoke(add_report, cli_input) @@ -197,14 +225,14 @@ def test_add_reporter(app, db, setup_dummy_data, reporter_config_raw): assert result.exit_code == 0 # run command without errors - assert "Reporter PandasReporter found" in result.output - assert "Report computation done." in result.output - # Check if the report is saved to the database report_sensor = Sensor.query.get( report_sensor_id ) # get fresh report sensor instance + assert "Reporter PandasReporter found" in result.output + assert f"Report computation done for sensor `{report_sensor}`." in result.output + stored_report = report_sensor.search_beliefs( event_starts_after=previous_command_end, event_ends_before=server_now(), @@ -213,6 +241,100 @@ def test_add_reporter(app, db, setup_dummy_data, reporter_config_raw): assert len(stored_report) == 95 +@pytest.mark.skip_github +def test_add_multiple_output(app, db, setup_dummy_data): + """ """ + + from flexmeasures.cli.data_add import add_report + + sensor_1_id, sensor_2_id, report_sensor_id, report_sensor_2_id = setup_dummy_data + + reporter_config = dict( + required_input=[{"name": "sensor_1"}, {"name": "sensor_2"}], + required_output=[{"name": "df_agg"}, {"name": "df_sub"}], + transformations=[ + dict( + df_input="sensor_1", + method="add", + args=["@sensor_2"], + df_output="df_agg", + ), + dict(method="resample_events", args=["2h"]), + dict( + df_input="sensor_1", + method="subtract", + args=["@sensor_2"], + df_output="df_sub", + ), + dict(method="resample_events", args=["2h"]), + ], + ) + + # Running the command with start and end values. 
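(A minimal sketch, for orientation only: the `test-$name.csv` value passed via `--output-file` below is expanded by `add_report` with Python's standard `string.Template`, imported in `data_add.py` above. `safe_substitute` is used so placeholders without a value, e.g. `$now`, are left intact rather than raising an error. The two output names come from the reporter config above.)

from string import Template

pattern = Template("test-$name.csv")
for name in ("df_agg", "df_sub"):
    # safe_substitute() tolerates missing placeholders, unlike substitute()
    print(pattern.safe_substitute(name=name))  # test-df_agg.csv, test-df_sub.csv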
+ + runner = app.test_cli_runner() + + cli_input_params = { + "config": "reporter_config.yaml", + "parameters": "parameters.json", + "reporter": "PandasReporter", + "start": "2023-04-10T00:00:00+00:00", + "end": "2023-04-10T10:00:00+00:00", + "output-file": "test-$name.csv", + } + + parameters = dict( + input=[ + dict(name="sensor_1", sensor=sensor_1_id), + dict(name="sensor_2", sensor=sensor_2_id), + ], + output=[ + dict(name="df_agg", sensor=report_sensor_id), + dict(name="df_sub", sensor=report_sensor_2_id), + ], + ) + + cli_input = to_flags(cli_input_params) + + # run test in an isolated file system + with runner.isolated_filesystem(): + + # save reporter_config to a json file + with open("reporter_config.yaml", "w") as f: + yaml.dump(reporter_config, f) + + with open("parameters.json", "w") as f: + json.dump(parameters, f) + + # call command + result = runner.invoke(add_report, cli_input) + + assert os.path.exists("test-df_agg.csv") + assert os.path.exists("test-df_sub.csv") + + print(result) + + assert result.exit_code == 0 # run command without errors + + report_sensor = Sensor.query.get(report_sensor_id) + report_sensor_2 = Sensor.query.get(report_sensor_2_id) + + assert "Reporter PandasReporter found" in result.output + assert f"Report computation done for sensor `{report_sensor}`." in result.output + assert ( + f"Report computation done for sensor `{report_sensor_2}`." in result.output + ) + + # check that the reports are saved + assert all( + report_sensor.search_beliefs( + event_ends_before=datetime(2023, 4, 10, 10, tzinfo=pytz.UTC) + ).values.flatten() + == [1, 5, 9, 13, 17] + ) + assert all(report_sensor_2.search_beliefs() == 0) + + @pytest.mark.skip_github @pytest.mark.parametrize("process_type", [("INFLEXIBLE"), ("SHIFTABLE"), ("BREAKABLE")]) def test_add_process(app, process_power_sensor, process_type): diff --git a/flexmeasures/data/models/data_sources.py b/flexmeasures/data/models/data_sources.py index 307b008d0..334ca6721 100644 --- a/flexmeasures/data/models/data_sources.py +++ b/flexmeasures/data/models/data_sources.py @@ -1,7 +1,7 @@ from __future__ import annotations import json -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, List, Dict from sqlalchemy.ext.mutable import MutableDict import timely_beliefs as tb @@ -10,14 +10,116 @@ from flask import current_app import hashlib +from marshmallow import Schema + if TYPE_CHECKING: from flexmeasures.data.models.user import User -class DataGeneratorMixin: +class DataGenerator: + __data_generator_base__: str | None = None _data_source: DataSource | None = None + _config: dict = None + _parameters: dict = None + + _parameters_schema: Schema | None = None + _config_schema: Schema | None = None + _save_config: bool = True + _save_parameters: bool = False + + def __init__( + self, + config: dict | None = None, + save_config=True, + save_parameters=False, + **kwargs, + ) -> None: + """Base class for the Schedulers, Reporters and Forecasters. + + The configuration `config` stores static parameters, parameters that, if + changed, trigger the creation of a new DataSource. Dynamic parameters, such as + the start date, can go into the `parameters`. See docstring of the method `DataGenerator.compute` for + more details. Nevertheless, the parameter `save_parameters` can be set to True if some `parameters` need + to be saved to the DB. In that case, the method `_clean_parameters` is called to remove any field that is not + to be persisted, e.g. 
time parameters which are already contained in the TimedBelief. + + Create a new DataGenerator with a certain configuration. There are two alternatives + to define the parameters: + + 1. Serialized through the keyword argument `config`. + 2. Deserialized, passing each parameter as keyword arguments. + + The configuration is validated using the schema `_config_schema`, to be defined by the subclass. + + `config` cannot contain the key `config` at its top level, otherwise it could conflict with the constructor keyword argument `config` + when passing the config as deserialized attributes. + + Example: + + The configuration requires two parameters for the PV and consumption sensors. + + Option 1: + dg = DataGenerator(config = { + "sensor_pv" : 1, + "sensor_consumption" : 2 + }) + + Option 2: + sensor_pv = Sensor.query.get(1) + sensor_consumption = Sensor.query.get(2) + + dg = DataGenerator(sensor_pv = sensor_pv, + sensor_consumption = sensor_consumption) + + + :param config: serialized `config` parameters, defaults to None + :param save_config: whether to save the config into the data source attributes + :param save_parameters: whether to save the parameters into the data source attributes + """ + + self._save_config = save_config + self._save_parameters = save_parameters + + if config is None and len(kwargs) > 0: + self._config = kwargs + DataGenerator.validate_deserialized(self._config, self._config_schema) + elif config is not None: + self._config = self._config_schema.load(config) + elif len(kwargs) == 0: + self._config = self._config_schema.load({}) + + def _compute(self, **kwargs) -> List[Dict[str, Any]]: + raise NotImplementedError() + + def compute(self, parameters: dict | None = None, **kwargs) -> List[Dict[str, Any]]: + """The configuration `parameters` stores dynamic parameters, parameters that, if + changed, DO NOT trigger the creation of a new DataSource. Static parameters, such as + the topology of an energy system, can go into `config`. + + `parameters` cannot contain the key `parameters` at its top level, otherwise it could conflict with keyword argument `parameters` + of the method compute when passing the `parameters` as deserialized attributes. + + :param parameters: serialized `parameters` parameters, defaults to None + """ + + if self._parameters is None: + self._parameters = {} + + if parameters is None: + self._parameters.update(self._parameters_schema.dump(kwargs)) + else: + self._parameters.update(parameters) + + self._parameters = self._parameters_schema.load(self._parameters) + + return self._compute(**self._parameters) + + @staticmethod + def validate_deserialized(values: dict, schema: Schema) -> bool: + schema.load(schema.dump(values)) + @classmethod def get_data_source_info(cls: type) -> dict: """ @@ -26,35 +128,68 @@ def get_data_source_info(cls: type) -> dict: See for instance get_data_source_for_job(). 
""" source_info = dict( - name=current_app.config.get("FLEXMEASURES_DEFAULT_DATASOURCE") + source=current_app.config.get("FLEXMEASURES_DEFAULT_DATASOURCE") ) # default - from flexmeasures.data.models.planning import Scheduler - from flexmeasures.data.models.reporting import Reporter - - if issubclass(cls, Reporter): - source_info["type"] = "reporter" - elif issubclass(cls, Scheduler): - source_info["type"] = "scheduler" - else: - source_info["type"] = "undefined" + source_info["source_type"] = cls.__data_generator_base__ + source_info["model"] = cls.__name__ return source_info @property - def data_source(self): + def data_source(self) -> "DataSource": + """DataSource property derived from the `source_info`: `source_type` (scheduler, forecaster or reporter), `model` (e.g AggregatorReporter) + and `attributes`. It looks for a data source in the database the marges the `source_info` and, in case of not finding any, it creates a new one. + This property gets created once and it's cached for the rest of the lifetime of the DataGenerator object. + """ + from flexmeasures.data.services.data_sources import get_or_create_source if self._data_source is None: data_source_info = self.get_data_source_info() - self._data_source = get_or_create_source( - source=data_source_info.get("name"), - source_type=data_source_info.get("type"), - ) + attributes = {"data_generator": {}} + + if self._save_config: + attributes["data_generator"]["config"] = self._config_schema.dump( + self._config + ) + + if self._save_parameters: + attributes["data_generator"]["parameters"] = self._clean_parameters( + self._parameters_schema.dump(self._parameters) + ) + + data_source_info["attributes"] = attributes + + self._data_source = get_or_create_source(**data_source_info) return self._data_source + def _clean_parameters(self, parameters: dict) -> dict: + """Use this function to clean up the parameters dictionary from the + fields that are not to be persisted to the DB as data source attributes (when save_parameters=True), + e.g. because they are already stored as TimedBelief properties, or otherwise. + + Example: + + An DataGenerator has the following parameters: ["start", "end", "field1", "field2"] and we want just "field1" and "field2" + to be persisted. + + Parameters provided to the `compute` method (input of the method `_clean_parameters`): + parameters = { + "start" : "2023-01-01T00:00:00+02:00", + "end" : "2023-01-02T00:00:00+02:00", + "field1" : 1, + "field2" : 2 + } + + Parameters persisted to the DB (output of the method `_clean_parameters`): + parameters = {"field1" : 1,"field2" : 2} + """ + + raise NotImplementedError() + class DataSource(db.Model, tb.BeliefSourceDBMixin): """Each data source is a data-providing entity.""" @@ -91,6 +226,8 @@ class DataSource(db.Model, tb.BeliefSourceDBMixin): viewonly=True, ) + _data_generator: DataGenerator | None = None + def __init__( self, name: str | None = None, @@ -116,6 +253,50 @@ def __init__( tb.BeliefSourceDBMixin.__init__(self, name=name) db.Model.__init__(self, **kwargs) + @property + def data_generator(self): + if self._data_generator: + return self._data_generator + + data_generator = None + + if self.type not in ["scheduler", "forecaster", "reporter"]: + raise NotImplementedError( + "Only the classes Scheduler, Forecaster and Reporters are DataGenerator's." + ) + + if not self.model: + raise NotImplementedError( + "There's no DataGenerator class defined in this DataSource." 
+            )
+
+        types = current_app.data_generators
+
+        if all(
+            [self.model not in current_app.data_generators[_type] for _type in types]
+        ):
+            raise NotImplementedError(
+                f"DataGenerator `{self.model}` not registered in this FlexMeasures instance."
+            )
+
+        # fetch DataGenerator details
+        data_generator_details = self.attributes.get("data_generator", {})
+        config = data_generator_details.get("config", {})
+        parameters = data_generator_details.get("parameters", {})
+
+        # create DataGenerator class and add the parameters
+        data_generator = current_app.data_generators[self.type][self.model](
+            config=config
+        )
+        data_generator._parameters = parameters
+
+        # assign the current DataSource (self) as its source
+        data_generator._data_source = self
+
+        self._data_generator = data_generator
+
+        return self._data_generator
+
     @property
     def label(self):
         """Human-readable label (preferably not starting with a capital letter, so it can be used in a sentence)."""

diff --git a/flexmeasures/data/models/reporting/__init__.py b/flexmeasures/data/models/reporting/__init__.py
index 0364dc678..12d996f9f 100644
--- a/flexmeasures/data/models/reporting/__init__.py
+++ b/flexmeasures/data/models/reporting/__init__.py
@@ -1,149 +1,55 @@
 from __future__ import annotations
 
-from typing import Optional, Union, Dict
-from datetime import datetime, timedelta
-import pandas as pd
+from copy import deepcopy
 
-from flexmeasures.data.schemas.reporting import ReporterConfigSchema
-from flexmeasures.data.models.time_series import Sensor
-from flexmeasures.data.models.data_sources import DataGeneratorMixin
+from typing import List, Dict, Any
+from flexmeasures.data.models.data_sources import DataGenerator
 
-import timely_beliefs as tb
+from flexmeasures.data.schemas.reporting import (
+    ReporterParametersSchema,
+    ReporterConfigSchema,
+)
 
 
-class Reporter(DataGeneratorMixin):
+class Reporter(DataGenerator):
     """Superclass for all FlexMeasures Reporters."""
 
     __version__ = None
     __author__ = None
-    __data_generator_base__ = "Reporter"
+    __data_generator_base__ = "reporter"
 
-    sensor: Sensor = None
+    _parameters_schema = ReporterParametersSchema()
+    _config_schema = ReporterConfigSchema()
 
-    reporter_config: Optional[dict] = None
-    reporter_config_raw: Optional[dict] = None
-    schema = ReporterConfigSchema()
-    data: Dict[str, Union[tb.BeliefsDataFrame, pd.DataFrame]] = None
-
-    def __init__(
-        self, sensor: Sensor, reporter_config_raw: Optional[dict] = None
-    ) -> None:
-        """
-        Initialize a new Reporter.
-
-        Attributes:
-        :param sensor: sensor where the output of the reporter will be saved to.
-        :param reporter_config_raw: dictionary with the serialized configuration of the reporter.
- """ - - self.sensor = sensor - - if not reporter_config_raw: - reporter_config_raw = {} - - self.reporter_config_raw = reporter_config_raw - - def fetch_data( - self, - start: datetime, - end: datetime, - input_resolution: timedelta = None, - belief_time: datetime = None, - ): - """ - Fetches the time_beliefs from the database - """ - - self.data = {} - for tb_query in self.beliefs_search_configs: - _tb_query = tb_query.copy() - # using start / end instead of event_starts_after/event_ends_before when not defined - event_starts_after = _tb_query.pop("event_starts_after", start) - event_ends_before = _tb_query.pop("event_ends_before", end) - resolution = _tb_query.pop("resolution", input_resolution) - belief_time = _tb_query.pop("belief_time", belief_time) - - sensor: Sensor = _tb_query.pop("sensor", None) - alias: str = _tb_query.pop("alias", None) - - bdf = sensor.search_beliefs( - event_starts_after=event_starts_after, - event_ends_before=event_ends_before, - resolution=resolution, - beliefs_before=belief_time, - **_tb_query, - ) - - # store data source as local variable - for source in bdf.sources.unique(): - self.data[f"source_{source.id}"] = source - - # store BeliefsDataFrame as local variable - if alias: - self.data[alias] = bdf - else: - self.data[f"sensor_{sensor.id}"] = bdf - - def update_attribute(self, attribute, default): - if default is not None: - setattr(self, attribute, default) - - def compute( - self, - start: datetime, - end: datetime, - input_resolution: timedelta | None = None, - belief_time: datetime | None = None, - **kwargs, - ) -> tb.BeliefsDataFrame: + def _compute(self, **kwargs) -> List[Dict[str, Any]]: """This method triggers the creation of a new report. - The same object can generate multiple reports with different start, end, input_resolution + The same object can generate multiple reports with different start, end, resolution and belief_time values. - - In the future, this function will parse arbitrary input arguments defined in a schema. """ - # deserialize configuration - if self.reporter_config is None: - self.deserialize_config() - - if input_resolution is None: - input_resolution = self.sensor.event_resolution - - # fetch data - self.fetch_data(start, end, input_resolution, belief_time) - - # Result - result: tb.BeliefsDataFrame = self._compute( - start, end, input_resolution, belief_time - ) + results: List[Dict[str, Any]] = self._compute_report(**kwargs) - # checking that the event_resolution of the output BeliefDataFrame is equal to the one of the output sensor - assert ( - self.sensor.event_resolution == result.event_resolution - ), f"The resolution of the results ({result.event_resolution}) should match that of the output sensor ({self.sensor.event_resolution}, ID {self.sensor.id})." + for result in results: + # checking that the event_resolution of the output BeliefDataFrame is equal to the one of the output sensor + assert ( + result["sensor"].event_resolution == result["data"].event_resolution + ), f"The resolution of the results ({result['data'].event_resolution}) should match that of the output sensor ({result['sensor'].event_resolution}, ID {result['sensor'].id})." 
- # Assign sensor to BeliefDataFrame - result.sensor = self.sensor + # Assign sensor to BeliefDataFrame + result["data"].sensor = result["sensor"] - if result.empty: - return result + if not result["data"].empty: + # update data source + result["data"].index = result["data"].index.set_levels( + [self.data_source] * len(result["data"]), + level="source", + verify_integrity=False, + ) - # update data source - result.index = result.index.set_levels( - [self.data_source] * len(result), level="source", verify_integrity=False - ) + return results - return result - - def _compute( - self, - start: datetime, - end: datetime, - input_resolution: timedelta = None, - belief_time: datetime = None, - ) -> tb.BeliefsDataFrame: + def _compute_report(self, **kwargs) -> List[Dict[str, Any]]: """ Overwrite with the actual computation of your report. @@ -151,20 +57,22 @@ def _compute( """ raise NotImplementedError() - def deserialize_config(self): - """ - Validate the report config against a Marshmallow Schema. - Ideas: - - Override this method - - Call superclass method to apply validation and common variables deserialization (see PandasReporter) - - (Partially) extract the relevant reporter_config parameters into class attributes. + def _clean_parameters(self, parameters: dict) -> dict: + _parameters = deepcopy(parameters) + fields_to_remove = ["start", "end", "resolution", "belief_time"] - Raises ValidationErrors or ValueErrors. - """ + for field in fields_to_remove: + _parameters.pop(field, None) + + fields_to_remove_input = [ + "event_starts_after", + "event_ends_before", + "belief_time", + "resolution", + ] + + for _input in _parameters["input"]: + for field in fields_to_remove_input: + _input.pop(field, None) - self.reporter_config = self.schema.load( - self.reporter_config_raw - ) # validate reporter config - self.beliefs_search_configs = self.reporter_config.get( - "beliefs_search_configs" - ) # extracting TimeBelief query configuration parameters + return _parameters diff --git a/flexmeasures/data/models/reporting/aggregator.py b/flexmeasures/data/models/reporting/aggregator.py index e74596ae0..8fb362129 100644 --- a/flexmeasures/data/models/reporting/aggregator.py +++ b/flexmeasures/data/models/reporting/aggregator.py @@ -1,12 +1,15 @@ from __future__ import annotations from datetime import datetime, timedelta +from typing import Any, List, Dict -import timely_beliefs as tb import pandas as pd from flexmeasures.data.models.reporting import Reporter -from flexmeasures.data.schemas.reporting.aggregation import AggregatorSchema +from flexmeasures.data.schemas.reporting.aggregation import ( + AggregatorConfigSchema, + AggregatorParametersSchema, +) from flexmeasures.utils.time_utils import server_now @@ -16,53 +19,63 @@ class AggregatorReporter(Reporter): __version__ = "1" __author__ = "Seita" - schema = AggregatorSchema() - weights: dict - method: str - def deserialize_config(self): - # call Reporter deserialize_config - super().deserialize_config() + _config_schema = AggregatorConfigSchema() + _parameters_schema = AggregatorParametersSchema() - # extract AggregatorReporter specific fields - self.method = self.reporter_config.get("method") - self.weights = self.reporter_config.get("weights", dict()) + weights: dict + method: str - def _compute( + def _compute_report( self, start: datetime, end: datetime, - input_resolution: timedelta | None = None, + input: List[Dict[str, Any]], + output: List[Dict[str, Any]], + resolution: timedelta | None = None, belief_time: datetime | None = None, - ) -> 
tb.BeliefsDataFrame: + ) -> List[Dict[str, Any]]: """ This method merges all the BeliefDataFrames into a single one, dropping all indexes but event_start, and applies an aggregation function over the columns. """ + method: str = self._config.get("method") + weights: list = self._config.get("weights", {}) + dataframes = [] if belief_time is None: belief_time = server_now() - for belief_search_config in self.beliefs_search_configs: - # if alias is not in belief_search_config, using the Sensor id instead - column_name = belief_search_config.get( - "alias", f"sensor_{belief_search_config['sensor'].id}" + for input_description in input: + # if name is not in belief_search_config, using the Sensor id instead + column_name = input_description.get( + "name", f"sensor_{input_description['sensor'].id}" + ) + + df = ( + input_description["sensor"] + .search_beliefs( + event_starts_after=start, + event_ends_before=end, + resolution=resolution, + beliefs_before=belief_time, + ) + .droplevel([1, 2, 3]) ) - data = self.data[column_name].droplevel([1, 2, 3]) # apply weight - if column_name in self.weights: - data *= self.weights[column_name] + if column_name in weights: + df *= weights[column_name] - dataframes.append(data) + dataframes.append(df) output_df = pd.concat(dataframes, axis=1) # apply aggregation method - output_df = output_df.aggregate(self.method, axis=1) + output_df = output_df.aggregate(method, axis=1) # convert BeliefsSeries into a BeliefsDataFrame output_df = output_df.to_frame("event_value") @@ -74,4 +87,11 @@ def _compute( ["belief_time", "source", "cumulative_probability"], append=True ) - return output_df + return [ + { + "name": "aggregate", + "column": "event_value", + "sensor": output[0]["sensor"], + "data": output_df, + } + ] diff --git a/flexmeasures/data/models/reporting/pandas_reporter.py b/flexmeasures/data/models/reporting/pandas_reporter.py index 1e1e98179..7c8fdaba8 100644 --- a/flexmeasures/data/models/reporting/pandas_reporter.py +++ b/flexmeasures/data/models/reporting/pandas_reporter.py @@ -1,15 +1,18 @@ from __future__ import annotations -from typing import Any +from typing import Any, Union, Dict, List from datetime import datetime, timedelta +from copy import deepcopy, copy from flask import current_app import timely_beliefs as tb - +import pandas as pd from flexmeasures.data.models.reporting import Reporter from flexmeasures.data.schemas.reporting.pandas_reporter import ( PandasReporterConfigSchema, + PandasReporterParametersSchema, ) +from flexmeasures.data.models.time_series import Sensor from flexmeasures.utils.time_utils import server_now @@ -18,70 +21,147 @@ class PandasReporter(Reporter): __version__ = "1" __author__ = "Seita" - schema = PandasReporterConfigSchema() + + _config_schema = PandasReporterConfigSchema() + _parameters_schema = PandasReporterParametersSchema() + + input: list[str] = None transformations: list[dict[str, Any]] = None final_df_output: str = None - def deserialize_config(self): - # call super class deserialize_config - super().deserialize_config() + data: Dict[str, Union[tb.BeliefsDataFrame, pd.DataFrame]] = None - # extract PandasReporter specific fields - self.transformations = self.reporter_config.get("transformations") - self.final_df_output = self.reporter_config.get("final_df_output") - - def _compute( + def fetch_data( self, start: datetime, end: datetime, - input_resolution: timedelta | None = None, + input: dict, + resolution: timedelta | None = None, belief_time: datetime | None = None, - ) -> tb.BeliefsDataFrame: + ): + """ + 
Fetches the time_beliefs from the database + """ + + self.data = {} + for input_search_parameters in input: + _input_search_parameters = input_search_parameters.copy() + + sensor: Sensor = _input_search_parameters.pop("sensor", None) + + name = _input_search_parameters.pop("name", f"sensor_{sensor.id}") + + # using start / end instead of event_starts_after/event_ends_before when not defined + event_starts_after = _input_search_parameters.pop( + "event_starts_after", start + ) + event_ends_before = _input_search_parameters.pop("event_ends_before", end) + resolution = _input_search_parameters.pop("resolution", resolution) + belief_time = _input_search_parameters.pop("belief_time", belief_time) + + bdf = sensor.search_beliefs( + event_starts_after=event_starts_after, + event_ends_before=event_ends_before, + resolution=resolution, + beliefs_before=belief_time, + **_input_search_parameters, + ) + + # store data source as local variable + for source in bdf.sources.unique(): + self.data[f"source_{source.id}"] = source + + # store BeliefsDataFrame as local variable + self.data[name] = bdf + + def _compute_report(self, **kwargs) -> List[Dict[str, Any]]: """ This method applies the transformations and outputs the dataframe defined in `final_df_output` field of the report_config. """ + # report configuration + start: datetime = kwargs.get("start") + end: datetime = kwargs.get("end") + input: dict = kwargs.get("input") + + resolution: timedelta | None = kwargs.get("resolution", None) + belief_time: datetime | None = kwargs.get("belief_time", None) + output: List[Dict[str, Any]] = kwargs.get("output") + + # by default, use the minimum resolution among the output sensors + if resolution is None: + resolution = min([o["sensor"].event_resolution for o in output]) + + # fetch sensor data + self.fetch_data(start, end, input, resolution, belief_time) + if belief_time is None: belief_time = server_now() # apply pandas transformations to the dataframes in `self.data` self._apply_transformations() - final_output = self.data[self.final_df_output] + results = [] - if isinstance(final_output, tb.BeliefsDataFrame): + for output_description in output: + result = copy(output_description) - # filing the missing indexes with default values: - # belief_time=belief_time, cummulative_probability=0.5, source=data_source - if "belief_time" not in final_output.index.names: - final_output["belief_time"] = [belief_time] * len(final_output) - final_output = final_output.set_index("belief_time", append=True) + name = output_description["name"] - if "cumulative_probability" not in final_output.index.names: - final_output["cumulative_probability"] = [0.5] * len(final_output) - final_output = final_output.set_index( - "cumulative_probability", append=True - ) + output_data = self.data[name] - if "source" not in final_output.index.names: - final_output["source"] = [self.data_source] * len(final_output) - final_output = final_output.set_index("source", append=True) + if isinstance(output_data, tb.BeliefsDataFrame): + # if column is missing, use the first column + column = output_description.get("column", output_data.columns[0]) + output_data = output_data.rename(columns={column: "event_value"})[ + ["event_value"] + ] + output_data = self._clean_belief_dataframe(output_data, belief_time) - final_output = final_output.reorder_levels( - tb.BeliefsDataFrame().index.names - ) + elif isinstance(output_data, tb.BeliefsSeries): + output_data = self._clean_belief_series(output_data, belief_time) - elif isinstance(final_output, 
tb.BeliefsSeries):
-            final_output = final_output.to_frame("event_value")
-            final_output["belief_time"] = belief_time
-            final_output["cumulative_probability"] = 0.5
-            final_output["source"] = self.data_source
-            final_output = final_output.set_index(
-                ["belief_time", "source", "cumulative_probability"], append=True
-            )
+            result["data"] = output_data
+
+            results.append(result)
+
+        return results
+
+    def _clean_belief_series(
+        self, belief_series: tb.BeliefsSeries, belief_time: datetime
+    ) -> tb.BeliefsDataFrame:
+        """Create a BeliefsDataFrame from a BeliefsSeries, creating the necessary indexes."""

-        return final_output
+        belief_series = belief_series.to_frame("event_value")
+        belief_series["belief_time"] = belief_time
+        belief_series["cumulative_probability"] = 0.5
+        belief_series["source"] = self.data_source
+        belief_series = belief_series.set_index(
+            ["belief_time", "source", "cumulative_probability"], append=True
+        )
+
+        return belief_series
+
+    def _clean_belief_dataframe(
+        self, bdf: tb.BeliefsDataFrame, belief_time: datetime
+    ) -> tb.BeliefsDataFrame:
+        """Add missing indexes to build a proper BeliefsDataFrame."""
+
+        # filling the missing indexes with default values:
+        if "belief_time" not in bdf.index.names:
+            bdf["belief_time"] = [belief_time] * len(bdf)
+            bdf = bdf.set_index("belief_time", append=True)
+
+        if "cumulative_probability" not in bdf.index.names:
+            bdf["cumulative_probability"] = [0.5] * len(bdf)
+            bdf = bdf.set_index("cumulative_probability", append=True)
+
+        if "source" not in bdf.index.names:
+            bdf["source"] = [self.data_source] * len(bdf)
+            bdf = bdf.set_index("source", append=True)
+
+        return bdf

    def get_object_or_literal(self, value: Any, method: str) -> Any:
        """This method allows using the dataframes as inputs of the Pandas methods that
@@ -154,7 +234,9 @@ def _apply_transformations(self):

        previous_df = None

-        for transformation in self.transformations:
+        for _transformation in self._config.get("transformations"):
+            transformation = deepcopy(_transformation)
+
            df_input = transformation.get(
                "df_input", previous_df
            )  # default is using the previous transformation output
diff --git a/flexmeasures/data/models/reporting/tests/conftest.py b/flexmeasures/data/models/reporting/tests/conftest.py
index a75a1a465..b8e9d74cc 100644
--- a/flexmeasures/data/models/reporting/tests/conftest.py
+++ b/flexmeasures/data/models/reporting/tests/conftest.py
@@ -33,10 +33,26 @@ def setup_dummy_data(db, app):
    db.session.add(sensor1)
    sensor2 = Sensor("sensor 2", generic_asset=dummy_asset, event_resolution="1h")
    db.session.add(sensor2)
+    sensor3 = Sensor(
+        "sensor 3",
+        generic_asset=dummy_asset,
+        event_resolution="1h",
+        timezone="Europe/Amsterdam",
+    )
+    db.session.add(sensor3)
+
    report_sensor = Sensor(
        "report sensor", generic_asset=pandas_report, event_resolution="1h"
    )
    db.session.add(report_sensor)
+    daily_report_sensor = Sensor(
+        "daily report sensor",
+        generic_asset=pandas_report,
+        event_resolution="1D",
+        timezone="Europe/Amsterdam",
+    )
+
+    db.session.add(daily_report_sensor)

    """
    Create 2 DataSources
@@ -77,18 +93,31 @@ def setup_dummy_data(db, app):
            )
        )

+    # add simple data for testing DST transition
+    for t in range(24 * 4):  # create data for 4 days
+        # UTC+1 -> UTC+2
+        beliefs.append(
+            TimedBelief(
+                event_start=datetime(2023, 3, 24, tzinfo=utc) + timedelta(hours=t),
+                belief_horizon=timedelta(hours=24),
+                event_value=t,
+                sensor=sensor3,
+                source=source1,
+            )
+        )
+
+        # UTC+2 -> UTC+1
+        beliefs.append(
+            TimedBelief(
+                event_start=datetime(2023, 10, 27, tzinfo=utc) +
timedelta(hours=t), + belief_horizon=timedelta(hours=24), + event_value=t, + sensor=sensor3, + source=source1, + ) + ) + db.session.add_all(beliefs) db.session.commit() - yield sensor1, sensor2, report_sensor - - db.session.delete(sensor1) - db.session.delete(sensor2) - - for b in beliefs: - db.session.delete(b) - - db.session.delete(dummy_asset) - db.session.delete(dummy_asset_type) - - db.session.commit() + yield sensor1, sensor2, sensor3, report_sensor, daily_report_sensor diff --git a/flexmeasures/data/models/reporting/tests/test_aggregator.py b/flexmeasures/data/models/reporting/tests/test_aggregator.py index 8cd287fa4..6c7b06237 100644 --- a/flexmeasures/data/models/reporting/tests/test_aggregator.py +++ b/flexmeasures/data/models/reporting/tests/test_aggregator.py @@ -1,9 +1,11 @@ import pytest from flexmeasures.data.models.reporting.aggregator import AggregatorReporter - +from flexmeasures.data.models.data_sources import DataSource from datetime import datetime -from pytz import utc +from pytz import utc, timezone + +import pandas as pd @pytest.mark.parametrize( @@ -34,27 +36,120 @@ def test_aggregator(setup_dummy_data, aggregation_method, expected_value): 7) prod: -1 = (1) * (-1) 8) median: even number of elements, mean of the most central elements, 0 = ((1) + (-1))/2 """ - s1, s2, reporter_sensor = setup_dummy_data + s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data - reporter_config_raw = dict( - beliefs_search_configs=[ - dict(sensor=s1.id, source=1), - dict(sensor=s2.id, source=2), - ], - method=aggregation_method, - ) + agg_reporter = AggregatorReporter(method=aggregation_method) - agg_reporter = AggregatorReporter( - reporter_sensor, reporter_config_raw=reporter_config_raw - ) + source_1 = DataSource.query.get(1) result = agg_reporter.compute( + input=[dict(sensor=s1, source=source_1), dict(sensor=s2, source=source_1)], + output=[dict(sensor=report_sensor)], start=datetime(2023, 5, 10, tzinfo=utc), end=datetime(2023, 5, 11, tzinfo=utc), - ) + )[0]["data"] # check that we got a result for 24 hours assert len(result) == 24 # check that the value is equal to expected_value assert (result == expected_value).all().event_value + + +@pytest.mark.parametrize( + "weight_1, weight_2, expected_result", + [(1, 1, 0), (1, -1, 2), (2, 0, 2), (0, 2, -2)], +) +def test_aggregator_reporter_weights( + setup_dummy_data, weight_1, weight_2, expected_result +): + s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data + + reporter_config = dict(method="sum", weights={"s1": weight_1, "sensor_2": weight_2}) + + source_1 = DataSource.query.get(1) + source_2 = DataSource.query.get(1) + + agg_reporter = AggregatorReporter(config=reporter_config) + + result = agg_reporter.compute( + input=[ + dict(name="s1", sensor=s1, source=source_1), + dict(sensor=s2, source=source_2), + ], + output=[dict(sensor=report_sensor)], + start=datetime(2023, 5, 10, tzinfo=utc), + end=datetime(2023, 5, 11, tzinfo=utc), + )[0]["data"] + + # check that we got a result for 24 hours + assert len(result) == 24 + + # check that the value is equal to expected_value + assert (result == expected_result).all().event_value + + +def test_dst_transition(setup_dummy_data): + s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data + + agg_reporter = AggregatorReporter() + + tz = timezone("Europe/Amsterdam") + + # transition from winter (CET) to summer (CEST) + result = agg_reporter.compute( + input=[dict(sensor=s3, source=DataSource.query.get(1))], + output=[dict(sensor=report_sensor)], + 
start=tz.localize(datetime(2023, 3, 26)), + end=tz.localize(datetime(2023, 3, 27)), + belief_time=tz.localize(datetime(2023, 12, 1)), + )[0]["data"] + + assert len(result) == 23 + + # transition from summer (CEST) to winter (CET) + result = agg_reporter.compute( + input=[dict(sensor=s3, source=DataSource.query.get(1))], + output=[dict(sensor=report_sensor)], + start=tz.localize(datetime(2023, 10, 29)), + end=tz.localize(datetime(2023, 10, 30)), + belief_time=tz.localize(datetime(2023, 12, 1)), + )[0]["data"] + + assert len(result) == 25 + + +def test_resampling(setup_dummy_data): + s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data + + agg_reporter = AggregatorReporter() + + tz = timezone("Europe/Amsterdam") + + # transition from winter (CET) to summer (CEST) + result = agg_reporter.compute( + start=tz.localize(datetime(2023, 3, 27)), + end=tz.localize(datetime(2023, 3, 28)), + input=[dict(sensor=s3, source=DataSource.query.get(1))], + output=[dict(sensor=daily_report_sensor, source=DataSource.query.get(1))], + belief_time=tz.localize(datetime(2023, 12, 1)), + resolution=pd.Timedelta("1D"), + )[0]["data"] + + assert result.event_starts[0] == pd.Timestamp( + year=2023, month=3, day=27, tz="Europe/Amsterdam" + ) + + # transition from summer (CEST) to winter (CET) + result = agg_reporter.compute( + start=tz.localize(datetime(2023, 10, 29)), + end=tz.localize(datetime(2023, 10, 30)), + input=[dict(sensor=s3, source=DataSource.query.get(1))], + output=[dict(sensor=daily_report_sensor, source=DataSource.query.get(1))], + belief_time=tz.localize(datetime(2023, 12, 1)), + resolution=pd.Timedelta("1D"), + )[0]["data"] + + assert result.event_starts[0] == pd.Timestamp( + year=2023, month=10, day=29, tz="Europe/Amsterdam" + ) diff --git a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py index 07f7dac9d..64c8ba7fd 100644 --- a/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_pandas_reporter.py @@ -6,10 +6,11 @@ def test_reporter(app, setup_dummy_data): - s1, s2, reporter_sensor = setup_dummy_data + s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data - reporter_config_raw = dict( - beliefs_search_configs=[dict(sensor=s1.id), dict(sensor=s2.id)], + reporter_config = dict( + required_input=[{"name": "sensor_1"}, {"name": "sensor_2"}], + required_output=[{"name": "df_merge"}], transformations=[ dict( df_input="sensor_1", @@ -36,19 +37,22 @@ def test_reporter(app, setup_dummy_data): dict(method="mean"), dict(method="sum", kwargs=dict(axis=1)), ], - final_df_output="df_merge", ) - reporter = PandasReporter(reporter_sensor, reporter_config_raw=reporter_config_raw) + reporter = PandasReporter(config=reporter_config) start = datetime(2023, 4, 10, tzinfo=utc) end = datetime(2023, 4, 10, 10, tzinfo=utc) - report1 = reporter.compute(start, end) + input = [dict(name="sensor_1", sensor=s1), dict(name="sensor_2", sensor=s2)] + output = [dict(name="df_merge", sensor=report_sensor)] - assert len(report1) == 5 - assert str(report1.event_starts[0]) == "2023-04-10 00:00:00+00:00" + report1 = reporter.compute(start=start, end=end, input=input, output=output) + result = report1[0]["data"] + + assert len(result) == 5 + assert str(result.event_starts[0]) == "2023-04-10 00:00:00+00:00" assert ( - report1.sensor == reporter_sensor + result.sensor == report_sensor ) # check that the output sensor is effectively assigned. 
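+    # each report item is a dict carrying the fields of the output description
+    # (e.g. `name`, `sensor`) plus a "data" entry holding the resulting BeliefsDataFrame
+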
data_source_name = app.config.get("FLEXMEASURES_DEFAULT_DATASOURCE")
@@ -56,33 +60,27 @@
    assert all(
        (source.name == data_source_name) and (source.type == data_source_type)
-        for source in report1.sources
+        for source in result.sources
    )  # check data source is assigned

    # check that calling compute with different parameters changes the result
-    report3 = reporter.compute(start=datetime(2023, 4, 10, 3, tzinfo=utc), end=end)
-    assert len(report3) == 4
-    assert str(report3.event_starts[0]) == "2023-04-10 02:00:00+00:00"
+    report2 = reporter.compute(
+        start=datetime(2023, 4, 10, 3, tzinfo=utc), end=end, input=input, output=output
+    )
+    result2 = report2[0]["data"]
+
+    assert len(result2) == 4
+    assert str(result2.event_starts[0]) == "2023-04-10 02:00:00+00:00"


def test_reporter_repeated(setup_dummy_data):
    """check that calling compute doesn't change the result"""
-    s1, s2, reporter_sensor = setup_dummy_data
+    s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data

-    reporter_config_raw = dict(
-        beliefs_search_configs=[
-            dict(
-                sensor=s1.id,
-                event_starts_after="2023-04-10T00:00:00 00:00",
-                event_ends_before="2023-04-10T10:00:00 00:00",
-            ),
-            dict(
-                sensor=s2.id,
-                event_starts_after="2023-04-10T00:00:00 00:00",
-                event_ends_before="2023-04-10T10:00:00 00:00",
-            ),
-        ],
+    reporter_config = dict(
+        required_input=[{"name": "sensor_1"}, {"name": "sensor_2"}],
+        required_output=[{"name": "df_merge"}],
        transformations=[
            dict(
                df_input="sensor_1",
@@ -109,14 +107,55 @@
            dict(method="mean"),
            dict(method="sum", kwargs=dict(axis=1)),
        ],
-        final_df_output="df_merge",
    )

-    reporter = PandasReporter(reporter_sensor, reporter_config_raw=reporter_config_raw)
-    start = datetime(2023, 4, 10, tzinfo=utc)
-    end = datetime(2023, 4, 10, 10, tzinfo=utc)
+    parameters = dict(
+        start="2023-04-10T00:00:00 00:00",
+        end="2023-04-10T10:00:00 00:00",
+        input=[
+            dict(name="sensor_1", sensor=s1.id),
+            dict(name="sensor_2", sensor=s2.id),
+        ],
+        output=[dict(name="df_merge", sensor=report_sensor.id)],
+    )

-    report1 = reporter.compute(start=start, end=end)
-    report2 = reporter.compute(start=start, end=end)
+    reporter = PandasReporter(config=reporter_config)
+
+    report1 = reporter.compute(parameters=parameters)
+    report2 = reporter.compute(parameters=parameters)
+
+    assert all(report2[0]["data"].values == report1[0]["data"].values)
+
+
+def test_reporter_empty(setup_dummy_data):
+    """check that calling compute with missing data returns an empty report"""
+    s1, s2, s3, report_sensor, daily_report_sensor = setup_dummy_data
+
+    config = dict(
+        required_input=[{"name": "sensor_1"}],
+        required_output=[{"name": "sensor_1"}],
+        transformations=[],
+    )
+
+    reporter = PandasReporter(config=config)
+
+    # compute report on available data
+    report = reporter.compute(
+        start=datetime(2023, 4, 10, tzinfo=utc),
+        end=datetime(2023, 4, 10, 10, tzinfo=utc),
+        input=[dict(name="sensor_1", sensor=s1)],
+        output=[dict(name="sensor_1", sensor=report_sensor)],
+    )
+
+    assert not report[0]["data"].empty
+
+    # compute report on dates with no data available
+    report = reporter.compute(
+        start=datetime(2021, 4, 10, tzinfo=utc),
+        end=datetime(2021, 4, 10, 10, tzinfo=utc),
+        input=[dict(name="sensor_1", sensor=s1)],
+        output=[dict(name="sensor_1", sensor=report_sensor)],
+    )

-    assert all(report2.values == report1.values)
+    assert report[0]["data"].empty
diff --git 
a/flexmeasures/data/models/reporting/tests/test_reporter.py b/flexmeasures/data/models/reporting/tests/test_reporter.py deleted file mode 100644 index 82bf7a7a3..000000000 --- a/flexmeasures/data/models/reporting/tests/test_reporter.py +++ /dev/null @@ -1,40 +0,0 @@ -from datetime import datetime -from pytz import utc -import timely_beliefs as tb - -from flexmeasures.data.models.reporting import Reporter -from flexmeasures.data.models.time_series import Sensor - - -def test_reporter_empty(setup_dummy_data): - """check that calling compute with missing data returns an empty report""" - - class DummyReporter(Reporter): - def __init__(self, sensor: Sensor, input_sensor: Sensor) -> None: - reporter_config_raw = dict( - beliefs_search_configs=[ - dict(sensor=input_sensor.id, alias="input_sensor") - ] - ) - super().__init__(sensor, reporter_config_raw) - - def _compute(self, *args, **kwargs) -> tb.BeliefsDataFrame: - return self.data["input_sensor"] - - s1, s2, reporter_sensor = setup_dummy_data - - reporter = DummyReporter(reporter_sensor, s1) - - # compute report on available data - report = reporter.compute( - datetime(2023, 4, 10, tzinfo=utc), datetime(2023, 4, 10, 10, tzinfo=utc) - ) - - assert not report.empty - - # compute report on dates with no data available - report = reporter.compute( - datetime(2021, 4, 10, tzinfo=utc), datetime(2021, 4, 10, 10, tzinfo=utc) - ) - - assert report.empty diff --git a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py index 30171da86..23b5f57b6 100644 --- a/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py +++ b/flexmeasures/data/models/reporting/tests/test_tibber_reporter.py @@ -1,3 +1,4 @@ +from __future__ import annotations import pytest from flexmeasures.data.models.reporting.pandas_reporter import PandasReporter @@ -67,70 +68,37 @@ ] # cents/kWh -class TibberReporter(PandasReporter): - def __init__(self, sensor) -> None: - """This class calculates the price of energy of a tariff indexed to the Day Ahead prices. 
- Energy Price = (1 + VAT) x ( EnergyTax + Tiber + DA Prices) - """ - - # search the sensors - EnergyTax = Sensor.query.filter(Sensor.name == "EnergyTax").one_or_none() - VAT = Sensor.query.filter(Sensor.name == "VAT").one_or_none() - tibber_tariff = Sensor.query.filter( - Sensor.name == "Tibber Tariff" - ).one_or_none() - - da_prices = Sensor.query.filter(Sensor.name == "DA prices").one_or_none() - - # create the PandasReporter reporter config - reporter_config = dict( - beliefs_search_configs=[ - dict(sensor=EnergyTax.id, alias="energy_tax_df"), - dict(sensor=VAT.id), - dict(sensor=tibber_tariff.id), - dict(sensor=da_prices.id), - ], - transformations=[ - dict( - df_input="sensor_1", - df_output="VAT", - method="droplevel", - args=[[1, 2, 3]], - ), - dict(method="add", args=[1]), # this is to get 1 + VAT - dict( - df_input="energy_tax_df", - df_output="EnergyTax", - method="droplevel", - args=[[1, 2, 3]], - ), - dict( - df_input="sensor_3", - df_output="tibber_tariff", - method="droplevel", - args=[[1, 2, 3]], - ), - dict( - df_input="sensor_4", - df_output="da_prices", - method="droplevel", - args=[[1, 2, 3]], - ), - dict( - method="add", args=["@tibber_tariff"] - ), # da_prices = da_prices + tibber_tariff - dict( - method="add", args=["@EnergyTax"] - ), # da_prices = da_prices + EnergyTax - dict( - method="multiply", args=["@VAT"] - ), # da_prices = da_price * VAT, VAT - dict(method="round"), - ], - final_df_output="da_prices", - ) - - super().__init__(sensor, reporter_config) +pandas_reporter_config = dict( + required_input=[{"name": v} for v in ["energy_tax", "VAT", "tariff", "da_prices"]], + required_output=[{"name": "da_prices"}], + transformations=[ + dict( + df_input="VAT", + method="droplevel", + args=[[1, 2, 3]], + ), + dict(method="add", args=[1]), # this is to get 1 + VAT + dict( + df_input="energy_tax", + method="droplevel", + args=[[1, 2, 3]], + ), + dict( + df_input="tariff", + method="droplevel", + args=[[1, 2, 3]], + ), + dict( + df_input="da_prices", + method="droplevel", + args=[[1, 2, 3]], + ), + dict(method="add", args=["@tariff"]), # da_prices = da_prices + tibber_tariff + dict(method="add", args=["@energy_tax"]), # da_prices = da_prices + energy_tax + dict(method="multiply", args=["@VAT"]), # da_prices = da_price * VAT, VAT + dict(method="round"), + ], +) def beliefs_from_timeseries(index, values, sensor, source): @@ -234,7 +202,9 @@ def tibber_test_data(fresh_db, app): ) db.session.add(tibber_report_sensor) - return tibber_report_sensor + db.session.commit() + + return tibber_report_sensor, EnergyTax, VAT, tibber_tariff, da_prices def test_tibber_reporter(tibber_test_data): @@ -243,14 +213,21 @@ def test_tibber_reporter(tibber_test_data): displayed in Tibber's App. 
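    The tariff is computed as Energy Price = (1 + VAT) x (EnergyTax + Tibber tariff + DA prices),
    which is what the transformations in `pandas_reporter_config` above implement.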
""" - tibber_report_sensor = tibber_test_data + tibber_report_sensor, EnergyTax, VAT, tibber_tariff, da_prices = tibber_test_data - tibber_reporter = TibberReporter(tibber_report_sensor) + tibber_reporter = PandasReporter(config=pandas_reporter_config) result = tibber_reporter.compute( + input=[ + {"name": "energy_tax", "sensor": EnergyTax}, + {"name": "VAT", "sensor": VAT}, + {"name": "tariff", "sensor": tibber_tariff}, + {"name": "da_prices", "sensor": da_prices}, + ], + output=[dict(sensor=tibber_report_sensor, name="da_prices")], start=datetime(2023, 4, 13, tzinfo=utc), end=datetime(2023, 4, 14, tzinfo=utc), - ) + )[0]["data"] # check that we got a result for 24 hours assert len(result) == 24 diff --git a/flexmeasures/data/schemas/io.py b/flexmeasures/data/schemas/io.py new file mode 100644 index 000000000..34a8317e1 --- /dev/null +++ b/flexmeasures/data/schemas/io.py @@ -0,0 +1,52 @@ +from marshmallow import fields, Schema + +from flexmeasures.data.schemas.sensors import SensorIdField +from flexmeasures.data.schemas import AwareDateTimeField, DurationField +from flexmeasures.data.schemas.sources import DataSourceIdField + + +class RequiredInput(Schema): + name = fields.Str(required=True) + + +class Input(Schema): + """ + This schema implements the required fields to perform a TimedBeliefs search + using the method flexmeasures.data.models.time_series:TimedBelief.search_beliefs. + + It includes the field `name`, which is not part of the search query, for later reference of the belief. + """ + + name = fields.Str(required=False) + + sensor = SensorIdField(required=True) + source = DataSourceIdField() + + event_starts_after = AwareDateTimeField() + event_ends_before = AwareDateTimeField() + + belief_time = AwareDateTimeField() + + horizons_at_least = DurationField() + horizons_at_most = DurationField() + + source_types = fields.List(fields.Str()) + exclude_source_types = fields.List(fields.Str()) + most_recent_beliefs_only = fields.Boolean() + most_recent_events_only = fields.Boolean() + + one_deterministic_belief_per_event = fields.Boolean() + one_deterministic_belief_per_event_per_source = fields.Boolean() + resolution = DurationField() + sum_multiple = fields.Boolean() + + +class Output(Schema): + name = fields.Str(required=False) + column = fields.Str(required=False) + sensor = SensorIdField(required=True) + + +class RequiredOutput(Schema): + name = fields.Str(required=True) + column = fields.Str(required=False) diff --git a/flexmeasures/data/schemas/reporting/__init__.py b/flexmeasures/data/schemas/reporting/__init__.py index 7ceaa7152..ed5f08b4f 100644 --- a/flexmeasures/data/schemas/reporting/__init__.py +++ b/flexmeasures/data/schemas/reporting/__init__.py @@ -4,6 +4,38 @@ from flexmeasures.data.schemas.sources import DataSourceIdField from flexmeasures.data.schemas import AwareDateTimeField, DurationField +from flexmeasures.data.schemas.io import Input, Output + + +class ReporterConfigSchema(Schema): + """ + This schema is used to validate Reporter class configurations (config). + Inherit from this class to extend this schema with your own parameters. + """ + + pass + + +class ReporterParametersSchema(Schema): + """ + This schema is used to validate the parameters to the method `compute` of + the Reporter class. + Inherit from this class to extend this schema with your own parameters. 
+ """ + + input = fields.List( + fields.Nested(Input()), + required=True, + validator=validate.Length(min=1), + ) + + output = fields.List(fields.Nested(Output()), validate=validate.Length(min=1)) + + start = AwareDateTimeField(required=True) + end = AwareDateTimeField(required=True) + + resolution = DurationField(required=False) + belief_time = AwareDateTimeField(required=False) class BeliefsSearchConfigSchema(Schema): @@ -34,16 +66,3 @@ class BeliefsSearchConfigSchema(Schema): one_deterministic_belief_per_event_per_source = fields.Boolean() resolution = DurationField() sum_multiple = fields.Boolean() - - -class ReporterConfigSchema(Schema): - """ - This schema is used to validate Reporter class configurations (reporter_config). - Inherit from this to extend this schema with your own parameters. - """ - - beliefs_search_configs = fields.List( - fields.Nested(BeliefsSearchConfigSchema()), - required=True, - validator=validate.Length(min=1), - ) diff --git a/flexmeasures/data/schemas/reporting/aggregation.py b/flexmeasures/data/schemas/reporting/aggregation.py index a42c6d7b9..c83cee1dd 100644 --- a/flexmeasures/data/schemas/reporting/aggregation.py +++ b/flexmeasures/data/schemas/reporting/aggregation.py @@ -1,58 +1,66 @@ -from marshmallow import fields, ValidationError, validates_schema +from marshmallow import fields, ValidationError, validates_schema, validate -from flexmeasures.data.schemas.reporting import ReporterConfigSchema +from flexmeasures.data.schemas.reporting import ( + ReporterConfigSchema, + ReporterParametersSchema, +) +from flexmeasures.data.schemas.io import Output -class AggregatorSchema(ReporterConfigSchema): - """Schema for the reporter_config of the AggregatorReporter + +class AggregatorConfigSchema(ReporterConfigSchema): + """Schema for the AggregatorReporter configuration + + Example: + .. code-block:: json + { + "method" : "sum", + "weights" : { + "pv" : 1.0, + "consumption" : -1.0 + } + } + """ + + method = fields.Str(required=False, dump_default="sum", load_default="sum") + weights = fields.Dict(fields.Str(), fields.Float(), required=False) + + +class AggregatorParametersSchema(ReporterParametersSchema): + """Schema for the AggregatorReporter parameters Example: .. 
code-block:: json

        {
-            "beliefs_search_configs": [
+            "input": [
                {
+                    "name" : "pv",
                    "sensor": 1,
                    "source" : 1,
-                    "alias" : "pv"
                },
                {
+                    "name" : "consumption",
                    "sensor": 1,
                    "source" : 2,
-                    "alias" : "consumption"
                }
            ],
-            "method" : "sum",
-            "weights" : {
-                "pv" : 1.0,
-                "consumption" : -1.0
-            }
+            "output": [
+                {
+                    "sensor": 3,
+                }
+            ],
+            "start" : "2023-01-01T00:00:00+00:00",
+            "end" : "2023-01-03T00:00:00+00:00",
        }
    """

-    method = fields.Str(required=False, dump_default="sum")
-    weights = fields.Dict(fields.Str(), fields.Float(), required=False)
+    # redefining output to restrict the output length to 1
+    output = fields.List(
+        fields.Nested(Output()), validate=validate.Length(min=1, max=1)
+    )

    @validates_schema
    def validate_source(self, data, **kwargs):
-
-        for beliefs_search_config in data["beliefs_search_configs"]:
-            if "source" not in beliefs_search_config:
+        for input_description in data["input"]:
+            if "source" not in input_description:
                raise ValidationError("`source` is a required field.")
-
-    @validates_schema
-    def validate_weights(self, data, **kwargs):
-        if "weights" not in data:
-            return
-
-        # get aliases
-        aliases = []
-        for beliefs_search_config in data["beliefs_search_configs"]:
-            if "alias" in beliefs_search_config:
-                aliases.append(beliefs_search_config.get("alias"))
-
-        # check that the aliases in weights are defined
-        for alias in data.get("weights").keys():
-            if alias not in aliases:
-                raise ValidationError(
-                    f"alias `{alias}` in `weights` is not defined in `beliefs_search_config`"
-                )
diff --git a/flexmeasures/data/schemas/reporting/pandas_reporter.py b/flexmeasures/data/schemas/reporting/pandas_reporter.py
index 3b076ddb2..6dcac010f 100644
--- a/flexmeasures/data/schemas/reporting/pandas_reporter.py
+++ b/flexmeasures/data/schemas/reporting/pandas_reporter.py
@@ -1,8 +1,13 @@
-from marshmallow import Schema, fields, ValidationError, validates_schema
+from marshmallow import Schema, fields, ValidationError, validates_schema, validate
 from inspect import signature

-from flexmeasures.data.schemas.reporting import ReporterConfigSchema
+from flexmeasures.data.schemas import AwareDateTimeField
+from flexmeasures.data.schemas.reporting import (
+    ReporterConfigSchema,
+    ReporterParametersSchema,
+)
+from flexmeasures.data.schemas.io import RequiredInput, RequiredOutput

 from timely_beliefs import BeliefsDataFrame

@@ -49,31 +54,39 @@ class PandasReporterConfigSchema(ReporterConfigSchema):

    Example:

-    {
-        "input_sensors" : [
-            {"sensor" : 1, "alias" : "df1"}
-        ],
-        "transformations" : [
-            {
-                "df_input" : "df1",
-                "df_output" : "df2",
-                "method" : "copy"
-            },
-            {
-                "df_input" : "df2",
-                "df_output" : "df2",
-                "method" : "sum"
-            },
-            {
-                "method" : "sum",
-                "kwargs" : {"axis" : 0}
-            }
-        ],
-        "final_df_output" : "df2"
+    {
+        "required_input" : [
+            {"name" : "df1"}
+        ],
+        "required_output" : [
+            {"name" : "df2"}
+        ],
+        "transformations" : [
+            {
+                "df_input" : "df1",
+                "df_output" : "df2",
+                "method" : "copy"
+            },
+            {
+                "df_input" : "df2",
+                "df_output" : "df2",
+                "method" : "sum"
+            },
+            {
+                "method" : "sum",
+                "kwargs" : {"axis" : 0}
+            }
+        ],
+    }
    """

+    required_input = fields.List(
+        fields.Nested(RequiredInput()), validate=validate.Length(min=1)
+    )
+    required_output = fields.List(
+        fields.Nested(RequiredOutput()), validate=validate.Length(min=1)
+    )
    transformations = fields.List(fields.Nested(PandasMethodCall()), required=True)
-    final_df_output = fields.Str(required=True)

    @validates_schema
    def validate_chaining(self, 
data, **kwargs):
        final_df_output is computed.
        """

-        # create dictionary data with objects of the types that is supposed to be generated
-        # loading the initial data, the sensors' data
+        # fake_data mocks the PandasReporter class attribute data. It contains empty BeliefsDataFrames
+        # to simulate the process of applying the transformations.
        fake_data = dict(
-            (f"sensor_{s['sensor'].id}", BeliefsDataFrame)
-            for s in data.get("beliefs_search_configs")
+            (_input["name"], BeliefsDataFrame) for _input in data.get("required_input")
        )
-        final_df_output = data.get("final_df_output")
+        output_names = [_output["name"] for _output in data.get("required_output")]

        previous_df = None
-        final_df_output_method = None
+        output_method = dict()

        for transformation in data.get("transformations"):

            df_input = transformation.get("df_input", previous_df)
            df_output = transformation.get("df_output", df_input)

-            if df_output == final_df_output:
-                final_df_output_method = transformation.get("method")
+            if df_output in output_names:
+                output_method[df_output] = transformation.get("method")

-            if not previous_df and not df_input:
+            if df_input not in fake_data:
                raise ValidationError("Cannot find the input DataFrame.")

            previous_df = df_output  # keeping last BeliefsDataFrame calculation

            fake_data[df_output] = BeliefsDataFrame

-        if final_df_output not in fake_data:
-            raise ValidationError(
-                "Cannot find final output DataFrame among the resulting DataFrames."
-            )
+        for _output in output_names:
+            if _output not in fake_data:
+                raise ValidationError(
+                    f"Cannot find final output `{_output}` DataFrame among the resulting DataFrames."
+                )

-        if final_df_output_method in ["resample", "groupby"]:
-            raise ValidationError(
-                "Final output type cannot by of type `Resampler` or `DataFrameGroupBy`"
-            )
+            if (_output in output_method) and (
+                output_method[_output] in ["resample", "groupby"]
+            ):
+                raise ValidationError(
+                    f"Final output (`{_output}`) type cannot be of type `Resampler` or `DataFrameGroupBy`"
+                )
+
+
+class PandasReporterParametersSchema(ReporterParametersSchema):
+    # make start and end optional, conditional on providing the time parameters
+    # per input sensor (`event_starts_after` / `event_ends_before`)
+    start = AwareDateTimeField(required=False)
+    end = AwareDateTimeField(required=False)
+
+    @validates_schema
+    def validate_time_parameters(self, data, **kwargs):
+        """This method validates that all input sensors have start
+        and end parameters available.
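+        Alternatively, each input can define its own `event_starts_after`
+        and `event_ends_before`.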
+ """ + + # it's enough to provide a common start and end + if ("start" in data) and ("end" in data): + return + + for input_description in data.get("input", []): + input_sensor = input_description["sensor"] + if ("event_starts_after" not in input_description) and ( + "start" not in data + ): + raise ValidationError( + f"Start parameter not provided for sensor {input_sensor}" + ) + + if ("event_ends_before" not in input_description) and ("end" not in data): + raise ValidationError( + f"End parameter not provided for sensor {input_sensor}" + ) diff --git a/flexmeasures/data/schemas/tests/test_reporting.py b/flexmeasures/data/schemas/tests/test_reporting.py index 8d30e0013..ceaab864b 100644 --- a/flexmeasures/data/schemas/tests/test_reporting.py +++ b/flexmeasures/data/schemas/tests/test_reporting.py @@ -1,5 +1,6 @@ from flexmeasures.data.schemas.reporting.pandas_reporter import ( PandasReporterConfigSchema, + PandasReporterParametersSchema, ) from marshmallow.exceptions import ValidationError @@ -7,17 +8,12 @@ @pytest.mark.parametrize( - "reporter_config, is_valid", + "config, is_valid", [ ( { # this checks that the final_df_output dataframe is actually generated at some point of the processing pipeline - "beliefs_search_configs": [ - { - "sensor": 1, - "event_starts_after": "2022-01-01T00:00:00 00:00", - "event_ends_before": "2022-01-01T23:00:00 00:00", - }, - ], + "required_input": [{"name": "sensor_1"}], + "required_output": [{"name": "final_output"}], "transformations": [ { "df_output": "final_output", @@ -25,85 +21,111 @@ "method": "copy", } ], - "final_df_output": "final_output", }, True, ), ( { # this checks that chaining works, applying the method copy on the previous dataframe - "beliefs_search_configs": [ - { - "sensor": 1, - "event_starts_after": "2022-01-01T00:00:00 00:00", - "event_ends_before": "2022-01-01T23:00:00 00:00", - }, - ], + "required_input": [{"name": "sensor_1"}], + "required_output": [{"name": "final_output"}], "transformations": [ {"df_output": "output1", "df_input": "sensor_1", "method": "copy"}, {"method": "copy"}, {"df_output": "final_output", "method": "copy"}, ], - "final_df_output": "final_output", }, True, ), ( { # this checks that resample cannot be the last method being applied - "beliefs_search_configs": [ - { - "sensor": 1, - "event_starts_after": "2022-01-01T00:00:00 00:00", - "event_ends_before": "2022-01-01T23:00:00 00:00", - }, - { - "sensor": 2, - "event_starts_after": "2022-01-01T00:00:00 00:00", - "event_ends_before": "2022-01-01T23:00:00 00:00", - }, - ], + "required_input": [{"name": "sensor_1"}, {"name": "sensor_2"}], + "required_output": [{"name": "final_output"}], "transformations": [ {"df_output": "output1", "df_input": "sensor_1", "method": "copy"}, {"method": "copy"}, {"df_output": "final_output", "method": "resample", "args": ["1h"]}, ], - "final_df_output": "final_output", }, False, ), ( { # this checks that resample cannot be the last method being applied - "beliefs_search_configs": [ - { - "sensor": 1, - "event_starts_after": "2022-01-01T00:00:00 00:00", - "event_ends_before": "2022-01-01T23:00:00 00:00", - }, - { - "sensor": 2, - "event_starts_after": "2022-01-01T00:00:00 00:00", - "event_ends_before": "2022-01-01T23:00:00 00:00", - }, - ], + "required_input": [{"name": "sensor_1"}, {"name": "sensor_2"}], + "required_output": [{"name": "final_output"}], "transformations": [ {"df_output": "output1", "df_input": "sensor_1", "method": "copy"}, {"method": "copy"}, {"df_output": "final_output", "method": "resample", "args": ["1h"]}, 
{"method": "sum"}, ], - "final_df_output": "final_output", }, True, ), ], ) -def test_pandas_reporter_schema( - reporter_config, is_valid, db, app, setup_dummy_sensors -): +def test_pandas_reporter_config_schema(config, is_valid, db, app, setup_dummy_sensors): schema = PandasReporterConfigSchema() if is_valid: - schema.load(reporter_config) + schema.load(config) + else: + with pytest.raises(ValidationError): + schema.load(config) + + +@pytest.mark.parametrize( + "parameters, is_valid", + [ + ( + { + "input": [ + { + "name": "sensor_1_df", + "sensor": 1, + } # we're describing how the named variables should be constructed, by defining search filters on the sensor data, rather than on the sensor + ], + "output": [ + {"name": "df2", "sensor": 2} + ], # sensor to save the output to + "start": "2023-06-06T00:00:00+02:00", + "end": "2023-06-06T00:00:00+02:00", + }, + True, + ), + ( # missing start and end + { + "input": [{"name": "sensor_1_df", "sensor": 1}], + "output": [{"name": "df2", "sensor": 2}], + }, + False, + ), + ( + { + "input": [ + { + "name": "sensor_1_df", + "sensor": 1, + "event_starts_after": "2023-06-07T00:00:00+02:00", + "event_ends_before": "2023-06-07T00:00:00+02:00", + } + ], + "output": [ + {"name": "df2", "sensor": 2} + ], # sensor to save the output to + }, + True, + ), + ], +) +def test_pandas_reporter_parameters_schema( + parameters, is_valid, db, app, setup_dummy_sensors +): + + schema = PandasReporterParametersSchema() + + if is_valid: + schema.load(parameters) else: with pytest.raises(ValidationError): - schema.load(reporter_config) + schema.load(parameters) diff --git a/flexmeasures/data/tests/conftest.py b/flexmeasures/data/tests/conftest.py index ebf057e39..8f2e37ad5 100644 --- a/flexmeasures/data/tests/conftest.py +++ b/flexmeasures/data/tests/conftest.py @@ -8,7 +8,10 @@ import numpy as np from flask_sqlalchemy import SQLAlchemy from statsmodels.api import OLS +import timely_beliefs as tb +from flexmeasures.data.models.reporting import Reporter +from flexmeasures.data.schemas.reporting import ReporterParametersSchema from flexmeasures.data.models.annotations import Annotation from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.time_series import TimedBelief, Sensor @@ -19,6 +22,9 @@ ) from flexmeasures.utils.time_utils import as_server_time +from marshmallow import fields +from marshmallow import Schema + @pytest.fixture(scope="module") def setup_test_data( @@ -174,3 +180,48 @@ def setup_annotations( asset=asset, sensor=sensor, ) + + +@pytest.fixture(scope="module") +def test_reporter(app, db, add_nearby_weather_sensors): + class TestReporterConfigSchema(Schema): + a = fields.Str() + + class TestReporterParametersSchema(ReporterParametersSchema): + b = fields.Str(required=False) + + class TestReporter(Reporter): + _config_schema = TestReporterConfigSchema() + _parameters_schema = TestReporterParametersSchema() + + def _compute_report(self, **kwargs) -> list: + start = kwargs.get("start") + end = kwargs.get("end") + sensor = kwargs["output"][0]["sensor"] + resolution = sensor.event_resolution + + index = pd.date_range(start=start, end=end, freq=resolution) + + r = pd.DataFrame() + r["event_start"] = index + r["belief_time"] = index + r["source"] = self.data_source + r["cumulative_probability"] = 0.5 + r["event_value"] = 0 + + bdf = tb.BeliefsDataFrame(r, sensor=sensor) + + return [{"data": bdf, "sensor": sensor}] + + app.data_generators["reporter"].update({"TestReporter": TestReporter}) + + config = dict(a="b") + + ds = 
TestReporter(config=config).data_source + + assert ds.name == app.config.get("FLEXMEASURES_DEFAULT_DATASOURCE") + + db.session.add(ds) + db.session.commit() + + return ds diff --git a/flexmeasures/data/tests/test_data_source.py b/flexmeasures/data/tests/test_data_source.py new file mode 100644 index 000000000..712174847 --- /dev/null +++ b/flexmeasures/data/tests/test_data_source.py @@ -0,0 +1,145 @@ +import pytest + +from flexmeasures.data.models.reporting import Reporter + +from datetime import datetime +from pytz import UTC + + +def test_get_reporter_from_source(db, app, test_reporter, add_nearby_weather_sensors): + + reporter = test_reporter.data_generator + + reporter_sensor = add_nearby_weather_sensors.get("farther_temperature") + + assert isinstance(reporter, Reporter) + assert reporter.__class__.__name__ == "TestReporter" + + res = reporter.compute( + input=[{"sensor": reporter_sensor}], + output=[{"sensor": reporter_sensor}], + start=datetime(2023, 1, 1, tzinfo=UTC), + end=datetime(2023, 1, 2, tzinfo=UTC), + )[0]["data"] + + assert res.lineage.sources[0] == reporter.data_source + + with pytest.raises(AttributeError): + reporter.compute( + input=[{"sensor": reporter_sensor}], + output=[{"sensor": reporter_sensor}], + start=datetime(2023, 1, 1, tzinfo=UTC), + end="not a date", + ) + + +def test_data_source(db, app, test_reporter): + # get TestReporter class from the data_generators registry + TestReporter = app.data_generators["reporter"].get("TestReporter") + + reporter1 = TestReporter(config={"a": "1"}) + + db.session.add(reporter1.data_source) + + reporter2 = TestReporter(config={"a": "1"}) + + # reporter1 and reporter2 have the same data_source because they share the same config + assert reporter1.data_source == reporter2.data_source + assert reporter1.data_source.attributes.get("data_generator").get( + "config" + ) == reporter2.data_source.attributes.get("data_generator").get("config") + + reporter3 = TestReporter(config={"a": "2"}) + + # reporter3 and reporter2 have different data sources because they have different config values + assert reporter3.data_source != reporter2.data_source + assert reporter3.data_source.attributes.get("data_generator").get( + "config" + ) != reporter2.data_source.attributes.get("data_generator").get("config") + + # recreate reporter3 from its data source + reporter4 = reporter3.data_source.data_generator + + # check that reporter3 and reporter4 share the same config values + assert reporter4._config == reporter3._config + + +def test_data_generator_save_config(db, app, test_reporter, add_nearby_weather_sensors): + TestReporter = app.data_generators["reporter"].get("TestReporter") + + reporter_sensor = add_nearby_weather_sensors.get("farther_temperature") + + reporter = TestReporter(config={"a": "1"}) + + res = reporter.compute( + input=[{"sensor": reporter_sensor}], + output=[{"sensor": reporter_sensor}], + start=datetime(2023, 1, 1, tzinfo=UTC), + end=datetime(2023, 1, 2, tzinfo=UTC), + )[0]["data"] + + assert res.lineage.sources[0].attributes.get("data_generator").get("config") == { + "a": "1" + } + + reporter = TestReporter(config={"a": "1"}, save_config=False) + + res = reporter.compute( + input=[{"sensor": reporter_sensor}], + output=[{"sensor": reporter_sensor}], + start=datetime(2023, 1, 1, tzinfo=UTC), + end=datetime(2023, 1, 2, tzinfo=UTC), + )[0]["data"] + + # check that the data_generator is not saving the config in the data_source attributes + assert res.lineage.sources[0].attributes.get("data_generator") == dict() + + +def 
test_data_generator_save_parameters( + db, app, test_reporter, add_nearby_weather_sensors +): + TestReporter = app.data_generators["reporter"].get("TestReporter") + + reporter_sensor = add_nearby_weather_sensors.get("farther_temperature") + + reporter = TestReporter(config={"a": "1"}, save_parameters=True) + + parameters = { + "input": [{"sensor": reporter_sensor.id}], + "output": [{"sensor": reporter_sensor.id}], + "start": "2023-01-01T00:00:00+00:00", + "end": "2023-01-02T00:00:00+00:00", + "b": "test", + } + + parameters_without_start_end = { + "input": [{"sensor": reporter_sensor.id}], + "output": [{"sensor": reporter_sensor.id}], + "b": "test", + } + + res = reporter.compute(parameters=parameters)[0]["data"] + + assert res.lineage.sources[0].attributes.get("data_generator").get("config") == { + "a": "1" + } + + assert ( + res.lineage.sources[0].attributes.get("data_generator").get("parameters") + == parameters_without_start_end + ) + + dg2 = reporter.data_source.data_generator + + parameters_2 = { + "start": "2023-01-01T10:00:00+00:00", + "end": "2023-01-02T00:00:00+00:00", + "b": "test2", + } + + res = dg2.compute(parameters=parameters_2)[0]["data"] + + # check that compute gets data stored in the DB (i.e. `input`/`output`) and updated data + # from the method call (e.g. field `b``) + assert dg2._parameters["b"] == parameters_2["b"] + assert dg2._parameters["start"].isoformat() == parameters_2["start"] diff --git a/flexmeasures/utils/plugin_utils.py b/flexmeasures/utils/plugin_utils.py index 19568b48a..4e9af8817 100644 --- a/flexmeasures/utils/plugin_utils.py +++ b/flexmeasures/utils/plugin_utils.py @@ -110,8 +110,16 @@ def register_plugins(app: Flask): from flexmeasures.data.models.reporting import Reporter from flexmeasures.data.models.planning import Scheduler - app.reporters.update(get_classes_module(module.__name__, Reporter)) - app.schedulers.update(get_classes_module(module.__name__, Scheduler)) + plugin_reporters = get_classes_module(module.__name__, Reporter) + plugin_schedulers = get_classes_module(module.__name__, Scheduler) + + # for legacy, we keep reporters and schedulers + app.reporters.update(plugin_reporters) + app.schedulers.update(plugin_schedulers) + + # add DataGenerators + app.data_generators["scheduler"].update(plugin_schedulers) + app.data_generators["reporter"].update(plugin_reporters) app.config["LOADED_PLUGINS"][plugin_name] = plugin_version app.logger.info(f"Loaded plugins: {app.config['LOADED_PLUGINS']}") diff --git a/requirements/app.in b/requirements/app.in index b26e2a80a..0babfb74b 100644 --- a/requirements/app.in +++ b/requirements/app.in @@ -1,4 +1,5 @@ # see ui/utils/plotting_utils: separate_legend() and create_hover_tool() +pyyaml altair colour pscript diff --git a/requirements/app.txt b/requirements/app.txt index 340a23f4d..53917b19d 100644 --- a/requirements/app.txt +++ b/requirements/app.txt @@ -253,6 +253,8 @@ pytz==2023.3 # pandas # timely-beliefs # timetomodel +pyyaml==6.0.1 + # via -r requirements/app.in redis==4.6.0 # via # -r requirements/app.in
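
To illustrate the round-trip this patch enables end to end, here is a minimal usage sketch based on the tests above (it assumes the `TestReporter` class registered in `app.data_generators["reporter"]` and some Sensor `my_sensor`, which stands in for any sensor fixture):

    from datetime import datetime
    from pytz import UTC

    # instantiate the data generator with a config;
    # save_parameters=True also stores the compute parameters on the data source
    reporter = TestReporter(config={"a": "1"}, save_parameters=True)

    # the new compute API takes input/output descriptions
    # instead of a sensor argument plus reporter_config
    result = reporter.compute(
        input=[{"sensor": my_sensor}],
        output=[{"sensor": my_sensor}],
        start=datetime(2023, 1, 1, tzinfo=UTC),
        end=datetime(2023, 1, 2, tzinfo=UTC),
    )
    bdf = result[0]["data"]  # a timely_beliefs BeliefsDataFrame

    # since the config (and parameters) live in the data source attributes,
    # an equivalent reporter can later be recreated from the data source
    same_reporter = reporter.data_source.data_generator
    assert same_reporter._config == reporter._config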