CLI resample sensor #360

Merged
merged 14 commits on Feb 12, 2022
1 change: 1 addition & 0 deletions documentation/changelog.rst
@@ -15,6 +15,7 @@ New features
* Add CLI commands ``flexmeasures add sensor``, ``flexmeasures add asset-type``, ``flexmeasures add beliefs`` (which were experimental features before). [see `PR #337 <http://www.github.com/FlexMeasures/flexmeasures/pull/337>`_]
* Add CLI commands for showing data [see `PR #339 <http://www.github.com/FlexMeasures/flexmeasures/pull/339>`_]
* Add CLI command for attaching annotations to assets: ``flexmeasures add holidays`` adds public holidays [see `PR #343 <http://www.github.com/FlexMeasures/flexmeasures/pull/343>`_]
* Add CLI command for resampling existing sensor data to new resolution [see `PR #360 <http://www.github.com/FlexMeasures/flexmeasures/pull/360>`_]
* Support for percent (%) and permille (‰) sensor units [see `PR #359 <http://www.github.com/FlexMeasures/flexmeasures/pull/359>`_]

Bugfixes
1 change: 1 addition & 0 deletions documentation/cli/change_log.rst
@@ -8,6 +8,7 @@ since v0.9.0 | January 26, 2022
=====================

* Add CLI commands for showing data: ``flexmeasures show accounts``, ``flexmeasures show account``, ``flexmeasures show roles``, ``flexmeasures show asset-types``, ``flexmeasures show asset`` and ``flexmeasures show data-sources``.
* Add ``flexmeasures db-ops resample-data`` CLI command to resample sensor data to a different resolution.


since v0.9.0 | January 26, 2022
99 changes: 96 additions & 3 deletions flexmeasures/cli/db_ops.py
@@ -1,15 +1,21 @@
"""CLI Tasks for saving, resetting, etc of the database"""

from datetime import datetime
from datetime import datetime, timedelta
import subprocess
from typing import List, Optional

from flask import current_app as app
from flask.cli import with_appcontext
import flask_migrate as migrate
import click
import pandas as pd

from flexmeasures.data import db
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.utils import save_to_db

BACKUP_PATH = app.config.get("FLEXMEASURES_DB_BACKUP_PATH")

BACKUP_PATH: str = app.config.get("FLEXMEASURES_DB_BACKUP_PATH") # type: ignore


@click.group("db-ops")
@@ -121,7 +127,7 @@ def restore(file: str):

"""

db_uri = app.config.get("SQLALCHEMY_DATABASE_URI")
db_uri: str = app.config.get("SQLALCHEMY_DATABASE_URI") # type: ignore
db_host_and_db_name = db_uri.split("@")[-1]
click.echo(f"Restoring {db_host_and_db_name} database from file {file}")
command_for_restoring = f"pg_restore -d {db_uri} {file}"
@@ -137,4 +143,91 @@ def restore(file: str):
click.echo("db restore unsuccessful")


@fm_db_ops.command("resample-data")
@with_appcontext
@click.option(
"--sensor-id",
"sensor_ids",
multiple=True,
required=True,
help="Resample data for this sensor. Follow up with the sensor's ID. This argument can be given multiple times.",
)
@click.option(
"--event-resolution",
"event_resolution_in_minutes",
type=int,
required=True,
help="New event resolution as an integer number of minutes.",
)
@click.option(
"--from",
"start_str",
required=False,
help="Resample only data from this datetime onwards. Follow up with a timezone-aware datetime in ISO 6801 format.",
)
@click.option(
"--until",
"end_str",
required=False,
help="Resample only data until this datetime. Follow up with a timezone-aware datetime in ISO 6801 format.",
)
@click.option(
"--skip-integrity-check",
is_flag=True,
help="Whether to skip checking the resampled time series data for each sensor."
" By default, an excerpt and the mean value of the original"
" and resampled data will be shown for manual approval.",
)
def resample_sensor_data(
sensor_ids: List[int],
event_resolution_in_minutes: int,
start_str: Optional[str] = None,
end_str: Optional[str] = None,
skip_integrity_check: bool = False,
):
"""Assign a new event resolution to an existing sensor and resample its data accordingly."""
event_resolution = timedelta(minutes=event_resolution_in_minutes)
event_starts_after = pd.Timestamp(start_str) # note that "" or None becomes NaT
event_ends_before = pd.Timestamp(end_str)
for sensor_id in sensor_ids:
sensor = Sensor.query.get(sensor_id)
if sensor.event_resolution == event_resolution:
print(f"{sensor} already has the desired event resolution.")
continue
df_original = sensor.search_beliefs(
most_recent_beliefs_only=False,
event_starts_after=event_starts_after,
event_ends_before=event_ends_before,
).sort_values("event_start")
df_resampled = df_original.resample_events(event_resolution).sort_values(
"event_start"
)
if not skip_integrity_check:
message = ""
if sensor.event_resolution < event_resolution:
message += f"Downsampling {sensor} to {event_resolution} will result in a loss of data. "
click.confirm(
message
+ f"Data before:\n{df_original}\nData after:\n{df_resampled}\nMean before: {df_original['event_value'].mean()}\nMean after: {df_resampled['event_value'].mean()}\nContinue?",
abort=True,
)

# Update sensor, keeping track of its original event resolution
old_event_resolution = sensor.event_resolution
sensor.event_resolution = event_resolution
db.session.add(sensor)

# Update sensor data: delete the original beliefs before saving the resampled ones
query = TimedBelief.query.filter(TimedBelief.sensor == sensor)
if not pd.isnull(event_starts_after):
query = query.filter(TimedBelief.event_start >= event_starts_after)
if not pd.isnull(event_ends_before):
query = query.filter(
# the beliefs being deleted still have the original event resolution
TimedBelief.event_start + old_event_resolution <= event_ends_before
)
query.delete()
save_to_db(df_resampled, bulk_save_objects=True)
db.session.commit()
print("Successfully resampled sensor data.")


app.cli.add_command(fm_db_ops)
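
A quick sketch (not part of the diff) of how the new command can be exercised from Python through Click's test runner, the same pattern the tests below use. The sensor ID and the 30-minute target resolution are made-up values, and ``app`` is assumed to be a configured FlexMeasures Flask app, e.g. the pytest ``app`` fixture:

from flexmeasures.cli.db_ops import resample_sensor_data

# Assumes `app` is a configured FlexMeasures Flask app (e.g. the pytest fixture);
# sensor ID 1 and the 30-minute resolution are placeholder values.
runner = app.test_cli_runner()
result = runner.invoke(
    resample_sensor_data,
    ["--sensor-id", "1", "--event-resolution", "30", "--skip-integrity-check"],
)
assert "Successfully resampled" in result.output
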
79 changes: 79 additions & 0 deletions flexmeasures/cli/tests/test_db_ops.py
@@ -0,0 +1,79 @@
import pytest

import pandas as pd
import timely_beliefs as tb

from flexmeasures.cli.tests.utils import to_flags
from flexmeasures.data.models.time_series import Sensor, TimedBelief


@pytest.mark.skip_github
@pytest.mark.parametrize(
"event_starts_after, event_ends_before",
(
["", ""],
["2021-03-28 15:00:00+00:00", "2021-03-28 16:00:00+00:00"],
),
)
def test_resample_sensor_data(
app, db, setup_beliefs, event_starts_after: str, event_ends_before: str
):
"""Check resampling market data from hourly to 30 minute resolution and back."""

from flexmeasures.cli.db_ops import resample_sensor_data

sensor = Sensor.query.filter(Sensor.name == "epex_da").one_or_none()
event_starts_after = pd.Timestamp(event_starts_after)
event_ends_before = pd.Timestamp(event_ends_before)
beliefs_before = sensor.search_beliefs(
most_recent_beliefs_only=False,
event_starts_after=event_starts_after,
event_ends_before=event_ends_before,
)

# Check whether fixtures have flushed
assert sensor.id is not None

# Check whether we have all desired beliefs
query = TimedBelief.query.filter(TimedBelief.sensor_id == sensor.id)
if not pd.isnull(event_starts_after):
query = query.filter(TimedBelief.event_start >= event_starts_after)
if not pd.isnull(event_ends_before):
query = query.filter(
TimedBelief.event_start + sensor.event_resolution <= event_ends_before
)
all_beliefs_for_given_sensor = query.all()
pd.testing.assert_frame_equal(
tb.BeliefsDataFrame(all_beliefs_for_given_sensor), beliefs_before
)

cli_input = {
"sensor-id": sensor.id,
"event-resolution": sensor.event_resolution.seconds / 60 / 2,
}
runner = app.test_cli_runner()
result = runner.invoke(
resample_sensor_data, to_flags(cli_input) + ["--skip-integrity-check"]
)

# Check result for success
assert "Successfully resampled" in result.output

# Check that we now have twice as much data for this sensor
sensor = Sensor.query.filter(Sensor.name == "epex_da").one_or_none()
beliefs_after = sensor.search_beliefs(
most_recent_beliefs_only=False,
event_starts_after=event_starts_after,
event_ends_before=event_ends_before,
)
assert len(beliefs_after) == 2 * len(beliefs_before)

# Checksum
assert beliefs_after["event_value"].sum() == 2 * beliefs_before["event_value"].sum()

# Resample back to original resolution (on behalf of the next test case)
cli_input["event-resolution"] = sensor.event_resolution.seconds / 60
result = runner.invoke(
resample_sensor_data, to_flags(cli_input) + ["--skip-integrity-check"]
)
assert "Successfully resampled" in result.output
5 changes: 5 additions & 0 deletions flexmeasures/data/utils.py
@@ -51,6 +51,7 @@ def get_data_source(

def save_to_db(
data: Union[BeliefsDataFrame, List[BeliefsDataFrame]],
bulk_save_objects: bool = False,
save_changed_beliefs_only: bool = True,
) -> str:
"""Save the timed beliefs to the database.
@@ -75,6 +76,9 @@ def save_to_db(
Servers in 'play' mode are exempt from this rule, to facilitate replaying simulations.

:param data: BeliefsDataFrame (or a list thereof) to be saved
:param bulk_save_objects: if True, objects are bulk saved with session.bulk_save_objects(),
which is quite fast but has several caveats, see:
https://docs.sqlalchemy.org/orm/persistence_techniques.html#bulk-operations-caveats
:param save_changed_beliefs_only: if True, unchanged beliefs are skipped (updated beliefs are only stored if they represent changed beliefs)
if False, all updated beliefs are stored
:returns: status string, one of the following:
@@ -122,6 +126,7 @@ def save_to_db(
TimedBelief.add_to_session(
session=db.session,
beliefs_data_frame=timed_values,
bulk_save_objects=bulk_save_objects,
allow_overwrite=current_app.config.get(
"FLEXMEASURES_ALLOW_DATA_OVERWRITE", False
),
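
For illustration, a minimal sketch of the new parameter in use, mirroring the call in db_ops.py above; ``df_resampled`` stands in for any BeliefsDataFrame to be persisted:

from flexmeasures.data.utils import save_to_db

# bulk_save_objects=True is faster but skips some ORM conveniences (see the SQLAlchemy
# caveats linked in the docstring above); `df_resampled` is a placeholder BeliefsDataFrame.
status = save_to_db(df_resampled, bulk_save_objects=True)
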
2 changes: 1 addition & 1 deletion requirements/app.in
@@ -36,7 +36,7 @@ siphon
tables
tabulate
timetomodel>=0.7.1
timely-beliefs>=1.11.0
timely-beliefs>=1.11.2
python-dotenv
# a backport, not needed in Python3.8
importlib_metadata
2 changes: 1 addition & 1 deletion requirements/app.txt
@@ -367,7 +367,7 @@ tabulate==0.8.9
# via -r requirements/app.in
threadpoolctl==3.0.0
# via scikit-learn
timely-beliefs==1.11.0
timely-beliefs==1.11.2
# via -r requirements/app.in
timetomodel==0.7.1
# via -r requirements/app.in