Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CLI command to delete unchanged beliefs #328

Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
63 changes: 49 additions & 14 deletions flexmeasures/cli/data_delete.py
@@ -1,3 +1,5 @@
from datetime import timedelta
from itertools import chain
from typing import Optional

import click
Expand Down Expand Up @@ -73,8 +75,7 @@ def delete_account(id: int, force: bool):
+ ",".join([ga.name for ga in generic_assets])
+ "\n"
)
if not click.confirm(prompt):
raise click.Abort()
click.confirm(prompt, abort=True)
for user in account.users:
print(f"Deleting user {user} (and assets & data) ...")
delete_user(user)
Expand Down Expand Up @@ -108,8 +109,7 @@ def delete_user_and_data(email: str, force: bool):
if not force:
# TODO: later, when assets belong to accounts, remove this.
prompt = f"Delete user '{email}', including all their assets and data?"
if not click.confirm(prompt):
raise click.Abort()
click.confirm(prompt, abort=True)
the_user = find_user_by_email(email)
if the_user is None:
print(f"Could not find user with email address '{email}' ...")
Expand All @@ -134,8 +134,7 @@ def confirm_deletion(
)
if is_by_id:
prompt = prompt.replace(" all ", " ")
if not click.confirm(prompt):
raise click.Abort()
click.confirm(prompt, abort=True)


@fm_delete_data.command("structure")
Expand Down Expand Up @@ -210,8 +209,22 @@ def delete_prognoses(
type=int,
help="Delete unchanged (time series) data for a single sensor only. Follow up with the sensor's ID. ",
)
@click.option(
"--delete-forecasts/--keep-forecasts",
"delete_unchanged_forecasts",
default=True,
help="Use the --keep-forecasts flag to keep unchanged beliefs with a positive belief horizon (forecasts).",
)
@click.option(
"--delete-measurements/--keep-measurements",
"delete_unchanged_measurements",
default=True,
help="Use the --keep-measurements flag to keep beliefs with a zero or negative belief horizon (measurements, nowcasts and backcasts).",
)
def delete_unchanged_beliefs(
sensor_id: Optional[int] = None,
delete_unchanged_forecasts: bool = False,
Flix6x marked this conversation as resolved.
Show resolved Hide resolved
delete_unchanged_measurements: bool = False,
):
"""Delete unchanged beliefs (i.e. updated beliefs with a later belief time, but with the same event value)."""
q = db.session.query(TimedBelief)
Expand All @@ -221,21 +234,43 @@ def delete_unchanged_beliefs(
print(f"Failed to delete any beliefs: no sensor found with id {sensor_id}.")
return
q = q.filter(TimedBelief.sensor_id == sensor.id)
num_beliefs_before = len(q.all())
q_unchanged_beliefs = query_unchanged_beliefs(db.session, TimedBelief, q)
beliefs_up_for_deletion = q_unchanged_beliefs.all()
num_beliefs_up_for_deletion = len(beliefs_up_for_deletion)
num_beliefs_before = q.count()

unchanged_queries = []
if delete_unchanged_forecasts:
q_unchanged_forecasts = query_unchanged_beliefs(
db.session,
TimedBelief,
q.filter(
TimedBelief.belief_horizon > timedelta(0),
),
include_non_positive_horizons=False,
)
unchanged_queries.append(q_unchanged_forecasts)
if delete_unchanged_measurements:
q_unchanged_measurements = query_unchanged_beliefs(
db.session,
TimedBelief,
q.filter(
TimedBelief.belief_horizon <= timedelta(0),
),
include_positive_horizons=False,
)
unchanged_queries.append(q_unchanged_measurements)

num_beliefs_up_for_deletion = sum([q.count() for q in unchanged_queries])
prompt = f"Delete {num_beliefs_up_for_deletion} unchanged beliefs out of {num_beliefs_before} beliefs?"
Flix6x marked this conversation as resolved.
Show resolved Hide resolved
if not click.confirm(prompt):
raise click.Abort()
click.confirm(prompt, abort=True)

beliefs_up_for_deletion = list(chain(*[q.all() for q in unchanged_queries]))
batch_size = 10000
for i, b in enumerate(beliefs_up_for_deletion, start=1):
batch_size = 10000
if i % batch_size == 0 or i == num_beliefs_up_for_deletion:
print(f"{i} beliefs processed ...")
db.session.delete(b)
print(f"Removing {num_beliefs_up_for_deletion} beliefs ...")
db.session.commit()
num_beliefs_after = len(q.all())
num_beliefs_after = q.count()
print(f"Done! {num_beliefs_after} beliefs left")


Expand Down