From b46c78b7210bde8d5d8b295e0466d049ffc9c53d Mon Sep 17 00:00:00 2001 From: Felix Claessen <30658763+Flix6x@users.noreply.github.com> Date: Fri, 16 Apr 2021 16:36:09 +0200 Subject: [PATCH] Cli csv upload (#85) Add CLI command to add beliefs from csv, optionally allowing to overwrite existing data. * Add CLI command to add beliefs from csv * Refactor if-else block * Add docstring note about belief timing * Overwrite beliefs (#98) * Try tb upsert functionality * Try to bulk insert without overwrite (faster) before attempting to merge with overwrite (slower) * Make overwriting data a CLI option * Copy docstring from tb * Simplification * UX improvement * Add bulk saving as class method option, but not as CLI option; change class method default to not commit transaction * Update dependency Co-authored-by: F.N. Claessen * Print useful error message if sensor is not found * Changelog entry * Move CLI commands to dev-add group * Update changelog entry Co-authored-by: F.N. Claessen --- documentation/changelog.rst | 3 +- flexmeasures/data/models/data_sources.py | 2 +- flexmeasures/data/models/time_series.py | 28 ++++- .../data/scripts/cli_tasks/data_add.py | 114 +++++++++++++++++- requirements/app.in | 2 +- requirements/app.txt | 2 +- 6 files changed, 140 insertions(+), 11 deletions(-) diff --git a/documentation/changelog.rst b/documentation/changelog.rst index e02b4c351..eeec74783 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -8,7 +8,6 @@ v0.4.0 | April XX, 2021 New features ----------- -* Add sensors with CLI command [see `PR #83 `_] * Configure views with ``FLEXMEASURES_LISTED_VIEWS`` [see `PR #91 `_] * Allow for views and CLI functions to come from plugins [see also `PR #91 `_] @@ -20,6 +19,8 @@ Infrastructure / Support ---------------------- * Updated dependencies, including Flask-Security-Too [see `PR #82 `_] * Integration with `timely beliefs `_ lib: Sensor data as TimedBeliefs [see `PR #79 `_] +* Add sensors with CLI command currently meant for developers only [see `PR #83 `_] +* Add data (beliefs about sensor events) with CLI command currently meant for developers only [see `PR #85 `_] v0.3.1 | April 9, 2021 diff --git a/flexmeasures/data/models/data_sources.py b/flexmeasures/data/models/data_sources.py index 430dc4de9..0ebed3eb5 100644 --- a/flexmeasures/data/models/data_sources.py +++ b/flexmeasures/data/models/data_sources.py @@ -46,7 +46,7 @@ def label(self): return f"schedule by {self.name}" elif self.type == "crawling script": return f"data retrieved from {self.name}" - elif self.type == "demo script": + elif self.type in ("demo script", "CLI script"): return f"demo data entered by {self.name}" else: return f"data from {self.name}" diff --git a/flexmeasures/data/models/time_series.py b/flexmeasures/data/models/time_series.py index f77d3285f..873e7088b 100644 --- a/flexmeasures/data/models/time_series.py +++ b/flexmeasures/data/models/time_series.py @@ -110,17 +110,37 @@ def search( ) @classmethod - def add(cls, bdf: tb.BeliefsDataFrame, commit_transaction: bool = True): + def add( + cls, + bdf: tb.BeliefsDataFrame, + expunge_session: bool = False, + allow_overwrite: bool = False, + bulk_save_objects: bool = False, + commit_transaction: bool = False, + ): """Add a BeliefsDataFrame as timed beliefs in the database. :param bdf: the BeliefsDataFrame to be persisted - :param commit_transaction: if True, the session is committed - if False, you can still add other data to the session - and commit it all within an atomic transaction + :param expunge_session: if True, all non-flushed instances are removed from the session before adding beliefs. + Expunging can resolve problems you might encounter with states of objects in your session. + When using this option, you might want to flush newly-created objects which are not beliefs + (e.g. a sensor or data source object). + :param allow_overwrite: if True, new objects are merged + if False, objects are added to the session or bulk saved + :param bulk_save_objects: if True, objects are bulk saved with session.bulk_save_objects(), + which is quite fast but has several caveats, see: + https://docs.sqlalchemy.org/orm/persistence_techniques.html#bulk-operations-caveats + if False, objects are added to the session with session.add_all() + :param commit_transaction: if True, the session is committed + if False, you can still add other data to the session + and commit it all within an atomic transaction """ return cls.add_to_session( session=db.session, beliefs_data_frame=bdf, + expunge_session=expunge_session, + allow_overwrite=allow_overwrite, + bulk_save_objects=bulk_save_objects, commit_transaction=commit_transaction, ) diff --git a/flexmeasures/data/scripts/cli_tasks/data_add.py b/flexmeasures/data/scripts/cli_tasks/data_add.py index ff8b7e4ed..ab8f0a9f7 100644 --- a/flexmeasures/data/scripts/cli_tasks/data_add.py +++ b/flexmeasures/data/scripts/cli_tasks/data_add.py @@ -1,7 +1,7 @@ """CLI Tasks for (de)populating the database - most useful in development""" from datetime import timedelta -from typing import List +from typing import List, Optional import pandas as pd import pytz @@ -10,13 +10,18 @@ from flask_security.utils import hash_password import click import getpass +from sqlalchemy.exc import IntegrityError +import timely_beliefs as tb +from flexmeasures.data import db from flexmeasures.data.services.forecasting import create_forecasting_jobs from flexmeasures.data.services.users import create_user -from flexmeasures.data.models.time_series import Sensor, SensorSchema +from flexmeasures.data.models.time_series import Sensor, SensorSchema, TimedBelief from flexmeasures.data.models.assets import Asset, AssetSchema from flexmeasures.data.models.markets import Market from flexmeasures.data.models.weather import WeatherSensor, WeatherSensorSchema +from flexmeasures.data.models.data_sources import DataSource +from flexmeasures.utils.time_utils import server_now @click.group("add") @@ -24,6 +29,11 @@ def fm_add_data(): """FlexMeasures: Add data.""" +@click.group("dev-add") +def fm_dev_add_data(): + """Developer CLI commands not yet meant for users: Add data.""" + + @fm_add_data.command("user") @with_appcontext @click.option("--username", required=True) @@ -63,7 +73,7 @@ def new_user(username: str, email: str, roles: List[str], timezone: str): print(f"Successfully created user {created_user}") -@fm_add_data.command("sensor") +@fm_dev_add_data.command("sensor") @with_appcontext @click.option("--name", required=True) @click.option("--unit", required=True, help="e.g. °C, m/s, kW/m²") @@ -201,6 +211,103 @@ def add_initial_structure(): populate_structure(app.db) +@fm_dev_add_data.command("beliefs") +@with_appcontext +@click.argument("file", type=click.Path(exists=True)) +@click.option( + "--sensor-id", + required=True, + type=click.IntRange(min=1), + help="Sensor to which the beliefs pertain.", +) +@click.option( + "--horizon", + required=False, + type=int, + help="Belief horizon in minutes (use positive horizon for ex-ante beliefs or negative horizon for ex-post beliefs).", +) +@click.option( + "--cp", + required=False, + type=click.FloatRange(0, 1), + help="Cumulative probability in the range [0, 1].", +) +@click.option( + "--allow-overwrite/--do-not-allow-overwrite", + default=False, + help="Allow overwriting possibly already existing data.\n" + "Not allowing overwriting can be much more efficient", +) +def add_beliefs( + file: str, + sensor_id: int, + horizon: Optional[int] = None, + cp: Optional[float] = None, + allow_overwrite: bool = False, +): + """Add sensor data from a csv file. + + Structure your csv file as follows: + + - One header line (will be ignored!) + - UTC datetimes in 1st column + - values in 2nd column + + For example: + + Date,Inflow (cubic meter) + 2020-12-03 14:00,212 + 2020-12-03 14:10,215.6 + 2020-12-03 14:20,203.8 + + In case no --horizon is specified, the moment of executing this CLI command is taken + as the time at which the beliefs were recorded. + """ + sensor = Sensor.query.filter(Sensor.id == sensor_id).one_or_none() + if sensor is None: + print(f"Failed to create beliefs: no sensor found with id {sensor_id}.") + return + source = ( + DataSource.query.filter(DataSource.name == "Seita") + .filter(DataSource.type == "CLI script") + .one_or_none() + ) + if not source: + print("SETTING UP CLI SCRIPT AS NEW DATA SOURCE...") + source = DataSource(name="Seita", type="CLI script") + db.session.add(source) + db.session.flush() # assigns id + bdf = tb.read_csv( + file, + sensor, + source=source, + cumulative_probability=cp, + parse_dates=True, + infer_datetime_format=True, + **( + dict(belief_horizon=timedelta(minutes=horizon)) + if horizon is not None + else dict( + belief_time=server_now().astimezone(pytz.timezone(sensor.timezone)) + ) + ), + ) + try: + TimedBelief.add( + bdf, + expunge_session=True, + allow_overwrite=allow_overwrite, + bulk_save_objects=True, + commit_transaction=True, + ) + print(f"Successfully created beliefs\n{bdf}") + except IntegrityError as e: + db.session.rollback() + print(f"Failed to create beliefs due to the following error: {e.orig}") + if not allow_overwrite: + print("As a possible workaround, use the --allow-overwrite flag.") + + @fm_add_data.command("forecasts") @with_appcontext @click.option( @@ -338,6 +445,7 @@ def collect_weather_data(region, location, num_cells, method, store_in_db): app.cli.add_command(fm_add_data) +app.cli.add_command(fm_dev_add_data) def check_timezone(timezone): diff --git a/requirements/app.in b/requirements/app.in index 09d7da32f..37e6ee956 100644 --- a/requirements/app.in +++ b/requirements/app.in @@ -32,7 +32,7 @@ netCDF4 siphon tables timetomodel>=0.6.8 -timely-beliefs>=1.3.0 +timely-beliefs>=1.4.0 python-dotenv # a backport, not needed in Python3.8 importlib_metadata diff --git a/requirements/app.txt b/requirements/app.txt index 35570ffd7..8827feb91 100644 --- a/requirements/app.txt +++ b/requirements/app.txt @@ -317,7 +317,7 @@ tables==3.6.1 # via -r requirements/app.in threadpoolctl==2.1.0 # via scikit-learn -timely-beliefs==1.3.0 +timely-beliefs==1.4.0 # via -r requirements/app.in timetomodel==0.6.9 # via -r requirements/app.in