Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CLI command to delete unchanged beliefs #328

Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
130 changes: 124 additions & 6 deletions flexmeasures/cli/data_delete.py
@@ -1,12 +1,16 @@
from datetime import timedelta
from itertools import chain
from typing import Optional

import click
from flask import current_app as app
from flask.cli import with_appcontext
from timely_beliefs.beliefs.queries import query_unchanged_beliefs

from flexmeasures.data import db
from flexmeasures.data.models.user import Account, AccountRole, RolesAccounts, User
from flexmeasures.data.models.generic_assets import GenericAsset
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.scripts.data_gen import get_affected_classes
from flexmeasures.data.services.users import find_user_by_email, delete_user

Expand All @@ -16,6 +20,11 @@ def fm_delete_data():
"""FlexMeasures: Delete data."""


# Separate click group, named "dev-delete", that collects deletion commands
# aimed at developers. Commands join the group by decorating themselves with
# @fm_dev_delete_data.command(...). The docstring below doubles as the group's
# help text on the command line, so keep it user-readable.
@click.group("dev-delete")
def fm_dev_delete_data() -> None:
    """Developer CLI commands not yet meant for users: Delete data."""


@fm_delete_data.command("account-role")
@with_appcontext
@click.option("--name", required=True)
Expand Down Expand Up @@ -66,8 +75,7 @@ def delete_account(id: int, force: bool):
+ ",".join([ga.name for ga in generic_assets])
+ "\n"
)
if not click.confirm(prompt):
raise click.Abort()
click.confirm(prompt, abort=True)
for user in account.users:
print(f"Deleting user {user} (and assets & data) ...")
delete_user(user)
Expand Down Expand Up @@ -101,8 +109,7 @@ def delete_user_and_data(email: str, force: bool):
if not force:
# TODO: later, when assets belong to accounts, remove this.
prompt = f"Delete user '{email}', including all their assets and data?"
if not click.confirm(prompt):
raise click.Abort()
click.confirm(prompt, abort=True)
the_user = find_user_by_email(email)
if the_user is None:
print(f"Could not find user with email address '{email}' ...")
Expand All @@ -127,8 +134,7 @@ def confirm_deletion(
)
if is_by_id:
prompt = prompt.replace(" all ", " ")
if not click.confirm(prompt):
raise click.Abort()
click.confirm(prompt, abort=True)


@fm_delete_data.command("structure")
Expand Down Expand Up @@ -196,4 +202,116 @@ def delete_prognoses(
depopulate_prognoses(app.db, sensor_id)


@fm_dev_delete_data.command("unchanged_beliefs")
@with_appcontext
@click.option(
    "--sensor-id",
    type=int,
    help="Delete unchanged (time series) data for a single sensor only. Follow up with the sensor's ID. ",
)
@click.option(
    "--delete-forecasts/--keep-forecasts",
    "delete_unchanged_forecasts",
    default=True,
    help="Use the --keep-forecasts flag to keep unchanged beliefs with a positive belief horizon (forecasts).",
)
@click.option(
    "--delete-measurements/--keep-measurements",
    "delete_unchanged_measurements",
    default=True,
    help="Use the --keep-measurements flag to keep beliefs with a zero or negative belief horizon (measurements, nowcasts and backcasts).",
)
def delete_unchanged_beliefs(
    sensor_id: Optional[int] = None,
    delete_unchanged_forecasts: bool = True,
    delete_unchanged_measurements: bool = True,
):
    """Delete unchanged beliefs (i.e. updated beliefs with a later belief time, but with the same event value)."""
    # Parameters (filled in by click from the options above):
    #   sensor_id: restrict the clean-up to this sensor; None means all sensors.
    #   delete_unchanged_forecasts: include beliefs with a positive belief horizon.
    #   delete_unchanged_measurements: include beliefs with a zero or negative
    #       belief horizon.
    # The user is asked to confirm before anything is deleted; declining aborts
    # the command (click.confirm with abort=True).
    q = db.session.query(TimedBelief)

    # Compare against None rather than relying on truthiness: an explicit id
    # such as 0 must be reported as "no sensor found" instead of silently
    # widening the deletion to the beliefs of every sensor.
    if sensor_id is not None:
        sensor = Sensor.query.filter(Sensor.id == sensor_id).one_or_none()
        if sensor is None:
            print(f"Failed to delete any beliefs: no sensor found with id {sensor_id}.")
            return
        q = q.filter(TimedBelief.sensor_id == sensor.id)
    num_beliefs_before = q.count()

    # Forecasts and measurements are queried separately, so each group can be
    # switched off independently and reported separately in the prompt.
    unchanged_queries = []
    num_forecasts_up_for_deletion = 0
    num_measurements_up_for_deletion = 0
    if delete_unchanged_forecasts:
        q_unchanged_forecasts = query_unchanged_beliefs(
            db.session,
            TimedBelief,
            q.filter(
                TimedBelief.belief_horizon > timedelta(0),
            ),
            include_non_positive_horizons=False,
        )
        unchanged_queries.append(q_unchanged_forecasts)
        num_forecasts_up_for_deletion = q_unchanged_forecasts.count()
    if delete_unchanged_measurements:
        q_unchanged_measurements = query_unchanged_beliefs(
            db.session,
            TimedBelief,
            q.filter(
                TimedBelief.belief_horizon <= timedelta(0),
            ),
            include_positive_horizons=False,
        )
        unchanged_queries.append(q_unchanged_measurements)
        num_measurements_up_for_deletion = q_unchanged_measurements.count()

    num_beliefs_up_for_deletion = (
        num_forecasts_up_for_deletion + num_measurements_up_for_deletion
    )
    prompt = f"Delete {num_beliefs_up_for_deletion} unchanged beliefs ({num_measurements_up_for_deletion} measurements and {num_forecasts_up_for_deletion} forecasts) out of {num_beliefs_before} beliefs?"
    click.confirm(prompt, abort=True)

    # Load the beliefs and mark them for deletion one by one, reporting
    # progress every `batch_size` beliefs and once more at the final belief.
    beliefs_up_for_deletion = list(
        chain.from_iterable(
            unchanged_query.all() for unchanged_query in unchanged_queries
        )
    )
    batch_size = 10000
    for i, b in enumerate(beliefs_up_for_deletion, start=1):
        if i % batch_size == 0 or i == num_beliefs_up_for_deletion:
            print(f"{i} beliefs processed ...")
        db.session.delete(b)
    print(f"Removing {num_beliefs_up_for_deletion} beliefs ...")
    db.session.commit()
    num_beliefs_after = q.count()
    print(f"Done! {num_beliefs_after} beliefs left")


@fm_dev_delete_data.command("nan_beliefs")
@with_appcontext
def delete_nan_beliefs():
    """Delete NaN beliefs."""
    all_beliefs = db.session.query(TimedBelief)
    # NOTE(review): matching NaN with `==` depends on how the database compares
    # NaN values — confirm this selects the intended rows on the backend in use.
    nan_beliefs = all_beliefs.filter(TimedBelief.event_value == float("NaN"))

    # Count first, so the user sees how much is about to be removed; declining
    # the confirmation aborts the command before anything is deleted.
    num_nan_beliefs = nan_beliefs.count()
    num_beliefs_before = all_beliefs.count()
    click.confirm(
        f"Delete {num_nan_beliefs} NaN beliefs out of {num_beliefs_before} beliefs?",
        abort=True,
    )

    nan_beliefs.delete()
    db.session.commit()
    num_beliefs_after = all_beliefs.count()
    print(f"Done! {num_beliefs_after} beliefs left")


@fm_delete_data.command("sensor")
@with_appcontext
@click.option(
    "--sensor-id",
    type=int,
    help="Delete a single sensor and its (time series) data. Follow up with the sensor's ID.",
)
def delete_sensor(
    sensor_id: Optional[int] = None,
):
    """Delete a sensor and all beliefs about it."""
    # Parameters (filled in by click from the option above):
    #   sensor_id: id of the sensor to delete. The option is not required, so
    #       it can be None when the flag is left out.
    # Nothing is deleted unless the user confirms; declining aborts the command.

    # Guard against a missing or unknown id: without these checks a None
    # sensor would be handed to db.session.delete(), which raises.
    if sensor_id is None:
        print("Failed to delete any sensor: no sensor id given (use --sensor-id).")
        return
    sensor = Sensor.query.get(sensor_id)
    if sensor is None:
        print(f"Failed to delete any sensor: no sensor found with id {sensor_id}.")
        return

    # Count the beliefs first and only issue the deletes after confirmation,
    # so declining the prompt leaves no pending deletions in the session.
    sensor_beliefs = TimedBelief.query.filter(TimedBelief.sensor_id == sensor_id)
    n = sensor_beliefs.count()
    click.confirm(
        f"Really delete sensor {sensor_id}, along with {n} beliefs?", abort=True
    )
    sensor_beliefs.delete()
    db.session.delete(sensor)
    db.session.commit()


# Register both command groups (the user-facing delete group and the
# developer-only "dev-delete" group) with the Flask app's command line interface.
app.cli.add_command(fm_delete_data)
app.cli.add_command(fm_dev_delete_data)
2 changes: 1 addition & 1 deletion requirements/app.in
Expand Up @@ -34,7 +34,7 @@ netCDF4
siphon
tables
timetomodel>=0.7.1
timely-beliefs>=1.10.0
timely-beliefs>=1.11.0
python-dotenv
# a backport, not needed in Python3.8
importlib_metadata
Expand Down
2 changes: 1 addition & 1 deletion requirements/app.txt
Expand Up @@ -356,7 +356,7 @@ tables==3.6.1
# via -r requirements/app.in
threadpoolctl==3.0.0
# via scikit-learn
timely-beliefs==1.10.0
timely-beliefs==1.11.0
# via -r requirements/app.in
timetomodel==0.7.1
# via -r requirements/app.in
Expand Down