Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CLI command to delete unchanged beliefs #328

Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
91 changes: 85 additions & 6 deletions flexmeasures/cli/data_delete.py
@@ -1,12 +1,16 @@
from datetime import timedelta
from itertools import chain
from typing import Optional

import click
from flask import current_app as app
from flask.cli import with_appcontext
from timely_beliefs.beliefs.queries import query_unchanged_beliefs

from flexmeasures.data import db
from flexmeasures.data.models.user import Account, AccountRole, RolesAccounts, User
from flexmeasures.data.models.generic_assets import GenericAsset
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.scripts.data_gen import get_affected_classes
from flexmeasures.data.services.users import find_user_by_email, delete_user

Expand All @@ -16,6 +20,11 @@ def fm_delete_data():
"""FlexMeasures: Delete data."""


@click.group("dev-delete")
def fm_dev_delete_data():
"""Developer CLI commands not yet meant for users: Delete data."""


@fm_delete_data.command("account-role")
@with_appcontext
@click.option("--name", required=True)
Expand Down Expand Up @@ -66,8 +75,7 @@ def delete_account(id: int, force: bool):
+ ",".join([ga.name for ga in generic_assets])
+ "\n"
)
if not click.confirm(prompt):
raise click.Abort()
click.confirm(prompt, abort=True)
for user in account.users:
print(f"Deleting user {user} (and assets & data) ...")
delete_user(user)
Expand Down Expand Up @@ -101,8 +109,7 @@ def delete_user_and_data(email: str, force: bool):
if not force:
# TODO: later, when assets belong to accounts, remove this.
prompt = f"Delete user '{email}', including all their assets and data?"
if not click.confirm(prompt):
raise click.Abort()
click.confirm(prompt, abort=True)
the_user = find_user_by_email(email)
if the_user is None:
print(f"Could not find user with email address '{email}' ...")
Expand All @@ -127,8 +134,7 @@ def confirm_deletion(
)
if is_by_id:
prompt = prompt.replace(" all ", " ")
if not click.confirm(prompt):
raise click.Abort()
click.confirm(prompt, abort=True)


@fm_delete_data.command("structure")
Expand Down Expand Up @@ -196,4 +202,77 @@ def delete_prognoses(
depopulate_prognoses(app.db, sensor_id)


@fm_dev_delete_data.command("unchanged_beliefs")
@with_appcontext
@click.option(
"--sensor-id",
type=int,
help="Delete unchanged (time series) data for a single sensor only. Follow up with the sensor's ID. ",
)
@click.option(
"--delete-forecasts/--keep-forecasts",
"delete_unchanged_forecasts",
default=True,
help="Use the --keep-forecasts flag to keep unchanged beliefs with a positive belief horizon (forecasts).",
)
@click.option(
"--delete-measurements/--keep-measurements",
"delete_unchanged_measurements",
default=True,
help="Use the --keep-measurements flag to keep beliefs with a zero or negative belief horizon (measurements, nowcasts and backcasts).",
)
def delete_unchanged_beliefs(
sensor_id: Optional[int] = None,
delete_unchanged_forecasts: bool = False,
Flix6x marked this conversation as resolved.
Show resolved Hide resolved
delete_unchanged_measurements: bool = False,
):
"""Delete unchanged beliefs (i.e. updated beliefs with a later belief time, but with the same event value)."""
q = db.session.query(TimedBelief)
if sensor_id:
sensor = Sensor.query.filter(Sensor.id == sensor_id).one_or_none()
if sensor is None:
print(f"Failed to delete any beliefs: no sensor found with id {sensor_id}.")
return
q = q.filter(TimedBelief.sensor_id == sensor.id)
num_beliefs_before = q.count()

unchanged_queries = []
if delete_unchanged_forecasts:
q_unchanged_forecasts = query_unchanged_beliefs(
db.session,
TimedBelief,
q.filter(
TimedBelief.belief_horizon > timedelta(0),
),
include_non_positive_horizons=False,
)
unchanged_queries.append(q_unchanged_forecasts)
if delete_unchanged_measurements:
q_unchanged_measurements = query_unchanged_beliefs(
db.session,
TimedBelief,
q.filter(
TimedBelief.belief_horizon <= timedelta(0),
),
include_positive_horizons=False,
)
unchanged_queries.append(q_unchanged_measurements)

num_beliefs_up_for_deletion = sum([q.count() for q in unchanged_queries])
prompt = f"Delete {num_beliefs_up_for_deletion} unchanged beliefs out of {num_beliefs_before} beliefs?"
Flix6x marked this conversation as resolved.
Show resolved Hide resolved
click.confirm(prompt, abort=True)

beliefs_up_for_deletion = list(chain(*[q.all() for q in unchanged_queries]))
batch_size = 10000
for i, b in enumerate(beliefs_up_for_deletion, start=1):
if i % batch_size == 0 or i == num_beliefs_up_for_deletion:
print(f"{i} beliefs processed ...")
db.session.delete(b)
print(f"Removing {num_beliefs_up_for_deletion} beliefs ...")
db.session.commit()
num_beliefs_after = q.count()
print(f"Done! {num_beliefs_after} beliefs left")


app.cli.add_command(fm_delete_data)
app.cli.add_command(fm_dev_delete_data)