Skip to content

Commit

Permalink
Cli csv upload (#85)
Browse files Browse the repository at this point in the history
Add CLI command to add beliefs from csv, optionally allowing existing data to be overwritten.


* Add CLI command to add beliefs from csv

* Refactor if-else block

* Add docstring note about belief timing

* Overwrite beliefs (#98)

* Try tb upsert functionality

* Try to bulk insert without overwrite (faster) before attempting to merge with overwrite (slower)

* Make overwriting data a CLI option

* Copy docstring from tb

* Simplification

* UX improvement

* Add bulk saving as class method option, but not as CLI option; change class method default to not commit transaction

* Update dependency

Co-authored-by: F.N. Claessen <felix@seita.nl>

* Print useful error message if sensor is not found

* Changelog entry

* Move CLI commands to dev-add group

* Update changelog entry

Co-authored-by: F.N. Claessen <felix@seita.nl>
  • Loading branch information
Flix6x and Flix6x committed Apr 16, 2021
1 parent 134fb69 commit b46c78b
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 11 deletions.
3 changes: 2 additions & 1 deletion documentation/changelog.rst
Expand Up @@ -8,7 +8,6 @@ v0.4.0 | April XX, 2021

New features
------------
* Add sensors with CLI command [see `PR #83 <https://github.com/SeitaBV/flexmeasures/pull/83>`_]
* Configure views with ``FLEXMEASURES_LISTED_VIEWS`` [see `PR #91 <https://github.com/SeitaBV/flexmeasures/pull/91>`_]
* Allow for views and CLI functions to come from plugins [see also `PR #91 <https://github.com/SeitaBV/flexmeasures/pull/91>`_]

Expand All @@ -20,6 +19,8 @@ Infrastructure / Support
------------------------
* Updated dependencies, including Flask-Security-Too [see `PR #82 <https://github.com/SeitaBV/flexmeasures/pull/82>`_]
* Integration with `timely beliefs <https://github.com/SeitaBV/timely-beliefs>`_ lib: Sensor data as TimedBeliefs [see `PR #79 <https://github.com/SeitaBV/flexmeasures/pull/79>`_]
* Add sensors with a CLI command (currently meant for developers only) [see `PR #83 <https://github.com/SeitaBV/flexmeasures/pull/83>`_]
* Add data (beliefs about sensor events) with a CLI command (currently meant for developers only) [see `PR #85 <https://github.com/SeitaBV/flexmeasures/pull/85>`_]


v0.3.1 | April 9, 2021
Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/data/models/data_sources.py
Expand Up @@ -46,7 +46,7 @@ def label(self):
return f"schedule by {self.name}"
elif self.type == "crawling script":
return f"data retrieved from {self.name}"
elif self.type == "demo script":
elif self.type in ("demo script", "CLI script"):
return f"demo data entered by {self.name}"
else:
return f"data from {self.name}"
Expand Down
28 changes: 24 additions & 4 deletions flexmeasures/data/models/time_series.py
Expand Up @@ -110,17 +110,37 @@ def search(
)

@classmethod
def add(
    cls,
    bdf: tb.BeliefsDataFrame,
    expunge_session: bool = False,
    allow_overwrite: bool = False,
    bulk_save_objects: bool = False,
    commit_transaction: bool = False,
):
    """Add a BeliefsDataFrame as timed beliefs in the database.

    Thin wrapper that forwards all options to ``cls.add_to_session``,
    using ``db.session`` as the session.

    :param bdf:                 the BeliefsDataFrame to be persisted
    :param expunge_session:     if True, all non-flushed instances are removed from the session before adding beliefs.
                                Expunging can resolve problems you might encounter with states of objects in your session.
                                When using this option, you might want to flush newly-created objects which are not beliefs
                                (e.g. a sensor or data source object).
    :param allow_overwrite:     if True, new objects are merged
                                if False, objects are added to the session or bulk saved
    :param bulk_save_objects:   if True, objects are bulk saved with session.bulk_save_objects(),
                                which is quite fast but has several caveats, see:
                                https://docs.sqlalchemy.org/orm/persistence_techniques.html#bulk-operations-caveats
                                if False, objects are added to the session with session.add_all()
    :param commit_transaction:  if True, the session is committed
                                if False, you can still add other data to the session
                                and commit it all within an atomic transaction
    :returns:                   whatever ``cls.add_to_session`` returns
    """
    return cls.add_to_session(
        session=db.session,
        beliefs_data_frame=bdf,
        expunge_session=expunge_session,
        allow_overwrite=allow_overwrite,
        bulk_save_objects=bulk_save_objects,
        commit_transaction=commit_transaction,
    )

Expand Down
114 changes: 111 additions & 3 deletions flexmeasures/data/scripts/cli_tasks/data_add.py
@@ -1,7 +1,7 @@
"""CLI Tasks for (de)populating the database - most useful in development"""

from datetime import timedelta
from typing import List
from typing import List, Optional

import pandas as pd
import pytz
Expand All @@ -10,20 +10,30 @@
from flask_security.utils import hash_password
import click
import getpass
from sqlalchemy.exc import IntegrityError
import timely_beliefs as tb

from flexmeasures.data import db
from flexmeasures.data.services.forecasting import create_forecasting_jobs
from flexmeasures.data.services.users import create_user
from flexmeasures.data.models.time_series import Sensor, SensorSchema
from flexmeasures.data.models.time_series import Sensor, SensorSchema, TimedBelief
from flexmeasures.data.models.assets import Asset, AssetSchema
from flexmeasures.data.models.markets import Market
from flexmeasures.data.models.weather import WeatherSensor, WeatherSensorSchema
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.utils.time_utils import server_now


# Click command group named "add"; commands attach via @fm_add_data.command(...).
@click.group("add")
def fm_add_data():
    """FlexMeasures: Add data."""


# Click command group named "dev-add"; commands attach via @fm_dev_add_data.command(...).
@click.group("dev-add")
def fm_dev_add_data():
    """Developer CLI commands not yet meant for users: Add data."""


@fm_add_data.command("user")
@with_appcontext
@click.option("--username", required=True)
Expand Down Expand Up @@ -63,7 +73,7 @@ def new_user(username: str, email: str, roles: List[str], timezone: str):
print(f"Successfully created user {created_user}")


@fm_add_data.command("sensor")
@fm_dev_add_data.command("sensor")
@with_appcontext
@click.option("--name", required=True)
@click.option("--unit", required=True, help="e.g. °C, m/s, kW/m²")
Expand Down Expand Up @@ -201,6 +211,103 @@ def add_initial_structure():
populate_structure(app.db)


@fm_dev_add_data.command("beliefs")
@with_appcontext
@click.argument("file", type=click.Path(exists=True))
@click.option(
    "--sensor-id",
    required=True,
    type=click.IntRange(min=1),
    help="Sensor to which the beliefs pertain.",
)
@click.option(
    "--horizon",
    required=False,
    type=int,
    help="Belief horizon in minutes (use positive horizon for ex-ante beliefs or negative horizon for ex-post beliefs).",
)
@click.option(
    "--cp",
    required=False,
    type=click.FloatRange(0, 1),
    help="Cumulative probability in the range [0, 1].",
)
@click.option(
    "--allow-overwrite/--do-not-allow-overwrite",
    default=False,
    help="Allow overwriting possibly already existing data.\n"
    "Not allowing overwriting can be much more efficient",
)
def add_beliefs(
    file: str,
    sensor_id: int,
    horizon: Optional[int] = None,
    cp: Optional[float] = None,
    allow_overwrite: bool = False,
):
    """Add sensor data from a csv file.

    Structure your csv file as follows:

    \b
    - One header line (will be ignored!)
    - UTC datetimes in 1st column
    - values in 2nd column

    For example:

    \b
        Date,Inflow (cubic meter)
        2020-12-03 14:00,212
        2020-12-03 14:10,215.6
        2020-12-03 14:20,203.8

    In case no --horizon is specified, the moment of executing this CLI command is taken
    as the time at which the beliefs were recorded.
    """
    # NOTE: the docstring above doubles as the command's --help text; the "\b"
    # markers keep Click from re-wrapping the list and the csv example.

    sensor = Sensor.query.filter(Sensor.id == sensor_id).one_or_none()
    if sensor is None:
        print(f"Failed to create beliefs: no sensor found with id {sensor_id}.")
        # Abort instead of returning, so the process exits with a non-zero
        # status and calling scripts can detect the failure.
        raise click.Abort()

    # Look up the data source used for CLI uploads, creating it on first use.
    source = (
        DataSource.query.filter(DataSource.name == "Seita")
        .filter(DataSource.type == "CLI script")
        .one_or_none()
    )
    if not source:
        print("SETTING UP CLI SCRIPT AS NEW DATA SOURCE...")
        source = DataSource(name="Seita", type="CLI script")
        db.session.add(source)
        # Flush before TimedBelief.add runs with expunge_session=True below.
        db.session.flush()  # assigns id

    # Beliefs are timed either by a fixed horizon, or by one shared belief
    # time (now, expressed in the sensor's timezone).
    if horizon is not None:
        belief_timing = dict(belief_horizon=timedelta(minutes=horizon))
    else:
        belief_timing = dict(
            belief_time=server_now().astimezone(pytz.timezone(sensor.timezone))
        )

    bdf = tb.read_csv(
        file,
        sensor,
        source=source,
        cumulative_probability=cp,
        parse_dates=True,
        infer_datetime_format=True,
        **belief_timing,
    )

    try:
        TimedBelief.add(
            bdf,
            expunge_session=True,
            allow_overwrite=allow_overwrite,
            bulk_save_objects=True,
            commit_transaction=True,
        )
    except IntegrityError as e:
        db.session.rollback()
        print(f"Failed to create beliefs due to the following error: {e.orig}")
        if not allow_overwrite:
            print("As a possible workaround, use the --allow-overwrite flag.")
        # Signal the failure through the exit status as well as the message.
        raise click.Abort() from e
    print(f"Successfully created beliefs\n{bdf}")


@fm_add_data.command("forecasts")
@with_appcontext
@click.option(
Expand Down Expand Up @@ -338,6 +445,7 @@ def collect_weather_data(region, location, num_cells, method, store_in_db):


# Attach both command groups to the app's CLI (`app` is defined outside this excerpt).
app.cli.add_command(fm_add_data)
app.cli.add_command(fm_dev_add_data)


def check_timezone(timezone):
Expand Down
2 changes: 1 addition & 1 deletion requirements/app.in
Expand Up @@ -32,7 +32,7 @@ netCDF4
siphon
tables
timetomodel>=0.6.8
timely-beliefs>=1.3.0
timely-beliefs>=1.4.0
python-dotenv
# a backport, not needed in Python3.8
importlib_metadata
Expand Down
2 changes: 1 addition & 1 deletion requirements/app.txt
Expand Up @@ -317,7 +317,7 @@ tables==3.6.1
# via -r requirements/app.in
threadpoolctl==2.1.0
# via scikit-learn
timely-beliefs==1.3.0
timely-beliefs==1.4.0
# via -r requirements/app.in
timetomodel==0.6.9
# via -r requirements/app.in
Expand Down

0 comments on commit b46c78b

Please sign in to comment.