Skip to content

Commit

Permalink
Add a CLI command to resample sensor data to a different resolution.
Browse files Browse the repository at this point in the history
Cli resample sensor (#360)

* Expose possibility of bulk saving beliefs

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Add CLI to assign a new resolution to a sensor and resample its data accordingly

Signed-off-by: F.N. Claessen <felix@seita.nl>

* CLI changelog entry

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Changelog entry

Signed-off-by: F.N. Claessen <felix@seita.nl>

* mypy

Signed-off-by: F.N. Claessen <felix@seita.nl>

* ignore mypy

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Upgrade tb dependency

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Add test

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Resample all data, not just the most recent beliefs

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Allow to pick a time window for resampling, and test it, too

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Try test on GH Actions

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Revert "Try test on GH Actions"

This reverts commit bd8f1fa.

* Remove print statement

Signed-off-by: F.N. Claessen <felix@seita.nl>
  • Loading branch information
Flix6x committed Feb 12, 2022
1 parent 7610bb6 commit 17acc1c
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 5 deletions.
1 change: 1 addition & 0 deletions documentation/changelog.rst
Expand Up @@ -15,6 +15,7 @@ New features
* Add CLI commands ``flexmeasures add sensor``, ``flexmeasures add asset-type``, ``flexmeasures add beliefs`` (which were experimental features before). [see `PR #337 <http://www.github.com/FlexMeasures/flexmeasures/pull/337>`_]
* Add CLI commands for showing data [see `PR #339 <http://www.github.com/FlexMeasures/flexmeasures/pull/339>`_]
* Add CLI command for attaching annotations to assets: ``flexmeasures add holidays`` adds public holidays [see `PR #343 <http://www.github.com/FlexMeasures/flexmeasures/pull/343>`_]
* Add CLI command for resampling existing sensor data to new resolution [see `PR #360 <http://www.github.com/FlexMeasures/flexmeasures/pull/360>`_]
* Support for percent (%) and permille (‰) sensor units [see `PR #359 <http://www.github.com/FlexMeasures/flexmeasures/pull/359>`_]

Bugfixes
Expand Down
1 change: 1 addition & 0 deletions documentation/cli/change_log.rst
Expand Up @@ -8,6 +8,7 @@ since v0.9.0 | January 26, 2022
=====================

* add CLI comands for showing data ``flexmeasures show accounts``, ``flexmeasures show account``, ``flexmeasures show roles``, ``flexmeasures show asset-types``, ``flexmeasures show asset`` and ``flexmeasures show data-sources``.
* Add ``flexmeasures db-ops resample-data`` CLI command to resample sensor data to a different resolution.


since v0.9.0 | January 26, 2022
Expand Down
99 changes: 96 additions & 3 deletions flexmeasures/cli/db_ops.py
@@ -1,15 +1,21 @@
"""CLI Tasks for saving, resetting, etc of the database"""

from datetime import datetime
from datetime import datetime, timedelta
import subprocess
from typing import List, Optional

from flask import current_app as app
from flask.cli import with_appcontext
import flask_migrate as migrate
import click
import pandas as pd

from flexmeasures.data import db
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.utils import save_to_db

BACKUP_PATH = app.config.get("FLEXMEASURES_DB_BACKUP_PATH")

BACKUP_PATH: str = app.config.get("FLEXMEASURES_DB_BACKUP_PATH") # type: ignore


@click.group("db-ops")
Expand Down Expand Up @@ -121,7 +127,7 @@ def restore(file: str):
"""

db_uri = app.config.get("SQLALCHEMY_DATABASE_URI")
db_uri: str = app.config.get("SQLALCHEMY_DATABASE_URI") # type: ignore
db_host_and_db_name = db_uri.split("@")[-1]
click.echo(f"Restoring {db_host_and_db_name} database from file {file}")
command_for_restoring = f"pg_restore -d {db_uri} {file}"
Expand All @@ -137,4 +143,91 @@ def restore(file: str):
click.echo("db restore unsuccessful")


@fm_db_ops.command("resample-data")
@with_appcontext
@click.option(
"--sensor-id",
"sensor_ids",
multiple=True,
required=True,
help="Resample data for this sensor. Follow up with the sensor's ID. This argument can be given multiple times.",
)
@click.option(
"--event-resolution",
"event_resolution_in_minutes",
type=int,
required=True,
help="New event resolution as an integer number of minutes.",
)
@click.option(
"--from",
"start_str",
required=False,
help="Resample only data from this datetime onwards. Follow up with a timezone-aware datetime in ISO 6801 format.",
)
@click.option(
"--until",
"end_str",
required=False,
help="Resample only data until this datetime. Follow up with a timezone-aware datetime in ISO 6801 format.",
)
@click.option(
"--skip-integrity-check",
is_flag=True,
help="Whether to skip checking the resampled time series data for each sensor."
" By default, an excerpt and the mean value of the original"
" and resampled data will be shown for manual approval.",
)
def resample_sensor_data(
sensor_ids: List[int],
event_resolution_in_minutes: int,
start_str: Optional[str] = None,
end_str: Optional[str] = None,
skip_integrity_check: bool = False,
):
"""Assign a new event resolution to an existing sensor and resample its data accordingly."""
event_resolution = timedelta(minutes=event_resolution_in_minutes)
event_starts_after = pd.Timestamp(start_str) # note that "" or None becomes NaT
event_ends_before = pd.Timestamp(end_str)
for sensor_id in sensor_ids:
sensor = Sensor.query.get(sensor_id)
if sensor.event_resolution == event_resolution:
print(f"{sensor} already has the desired event resolution.")
continue
df_original = sensor.search_beliefs(
most_recent_beliefs_only=False,
event_starts_after=event_starts_after,
event_ends_before=event_ends_before,
).sort_values("event_start")
df_resampled = df_original.resample_events(event_resolution).sort_values(
"event_start"
)
if not skip_integrity_check:
message = ""
if sensor.event_resolution < event_resolution:
message += f"Downsampling {sensor} to {event_resolution} will result in a loss of data. "
click.confirm(
message
+ f"Data before:\n{df_original}\nData after:\n{df_resampled}\nMean before: {df_original['event_value'].mean()}\nMean after: {df_resampled['event_value'].mean()}\nContinue?",
abort=True,
)

# Update sensor
sensor.event_resolution = event_resolution
db.session.add(sensor)

# Update sensor data
query = TimedBelief.query.filter(TimedBelief.sensor == sensor)
if not pd.isnull(event_starts_after):
query = query.filter(TimedBelief.event_start >= event_starts_after)
if not pd.isnull(event_ends_before):
query = query.filter(
TimedBelief.event_start + sensor.event_resolution <= event_ends_before
)
query.delete()
save_to_db(df_resampled, bulk_save_objects=True)
db.session.commit()
print("Successfully resampled sensor data.")


app.cli.add_command(fm_db_ops)
79 changes: 79 additions & 0 deletions flexmeasures/cli/tests/test_db_ops.py
@@ -0,0 +1,79 @@
import pytest

import pandas as pd
import timely_beliefs as tb

from flexmeasures.cli.tests.utils import to_flags
from flexmeasures.data.models.time_series import Sensor, TimedBelief


@pytest.mark.skip_github
@pytest.mark.parametrize(
"event_starts_after, event_ends_before",
(
["", ""],
["2021-03-28 15:00:00+00:00", "2021-03-28 16:00:00+00:00"],
),
)
def test_resample_sensor_data(
app, db, setup_beliefs, event_starts_after: str, event_ends_before: str
):
"""Check resampling market data from hourly to 30 minute resolution and back."""

from flexmeasures.cli.db_ops import resample_sensor_data

sensor = Sensor.query.filter(Sensor.name == "epex_da").one_or_none()
event_starts_after = pd.Timestamp(event_starts_after)
event_ends_before = pd.Timestamp(event_ends_before)
beliefs_before = sensor.search_beliefs(
most_recent_beliefs_only=False,
event_starts_after=event_starts_after,
event_ends_before=event_ends_before,
)

# Check whether fixtures have flushed
assert sensor.id is not None

# Check whether we have all desired beliefs
query = TimedBelief.query.filter(TimedBelief.sensor_id == sensor.id)
if not pd.isnull(event_starts_after):
query = query.filter(TimedBelief.event_start >= event_starts_after)
if not pd.isnull(event_ends_before):
query = query.filter(
TimedBelief.event_start + sensor.event_resolution <= event_ends_before
)
all_beliefs_for_given_sensor = query.all()
pd.testing.assert_frame_equal(
tb.BeliefsDataFrame(all_beliefs_for_given_sensor), beliefs_before
)

cli_input = {
"sensor-id": sensor.id,
"event-resolution": sensor.event_resolution.seconds / 60 / 2,
}
runner = app.test_cli_runner()
result = runner.invoke(
resample_sensor_data, to_flags(cli_input) + ["--skip-integrity-check"]
)

# Check result for success
assert "Successfully resampled" in result.output

# Check that we now have twice as much data for this sensor
sensor = Sensor.query.filter(Sensor.name == "epex_da").one_or_none()
beliefs_after = sensor.search_beliefs(
most_recent_beliefs_only=False,
event_starts_after=event_starts_after,
event_ends_before=event_ends_before,
)
assert len(beliefs_after) == 2 * len(beliefs_before)

# Checksum
assert beliefs_after["event_value"].sum() == 2 * beliefs_before["event_value"].sum()

# Resample back to original resolution (on behalf of the next test case)
cli_input["event-resolution"] = sensor.event_resolution.seconds / 60
result = runner.invoke(
resample_sensor_data, to_flags(cli_input) + ["--skip-integrity-check"]
)
assert "Successfully resampled" in result.output
5 changes: 5 additions & 0 deletions flexmeasures/data/utils.py
Expand Up @@ -51,6 +51,7 @@ def get_data_source(

def save_to_db(
data: Union[BeliefsDataFrame, List[BeliefsDataFrame]],
bulk_save_objects: bool = False,
save_changed_beliefs_only: bool = True,
) -> str:
"""Save the timed beliefs to the database.
Expand All @@ -75,6 +76,9 @@ def save_to_db(
Servers in 'play' mode are exempt from this rule, to facilitate replaying simulations.
:param data: BeliefsDataFrame (or a list thereof) to be saved
:param bulk_save_objects: if True, objects are bulk saved with session.bulk_save_objects(),
which is quite fast but has several caveats, see:
https://docs.sqlalchemy.org/orm/persistence_techniques.html#bulk-operations-caveats
:param save_changed_beliefs_only: if True, unchanged beliefs are skipped (updated beliefs are only stored if they represent changed beliefs)
if False, all updated beliefs are stored
:returns: status string, one of the following:
Expand Down Expand Up @@ -122,6 +126,7 @@ def save_to_db(
TimedBelief.add_to_session(
session=db.session,
beliefs_data_frame=timed_values,
bulk_save_objects=bulk_save_objects,
allow_overwrite=current_app.config.get(
"FLEXMEASURES_ALLOW_DATA_OVERWRITE", False
),
Expand Down
2 changes: 1 addition & 1 deletion requirements/app.in
Expand Up @@ -36,7 +36,7 @@ siphon
tables
tabulate
timetomodel>=0.7.1
timely-beliefs>=1.11.0
timely-beliefs>=1.11.2
python-dotenv
# a backport, not needed in Python3.8
importlib_metadata
Expand Down
2 changes: 1 addition & 1 deletion requirements/app.txt
Expand Up @@ -367,7 +367,7 @@ tabulate==0.8.9
# via -r requirements/app.in
threadpoolctl==3.0.0
# via scikit-learn
timely-beliefs==1.11.0
timely-beliefs==1.11.2
# via -r requirements/app.in
timetomodel==0.7.1
# via -r requirements/app.in
Expand Down

0 comments on commit 17acc1c

Please sign in to comment.