Skip to content

Commit

Permalink
Drop unchanged beliefs from BeliefsDataFrame before saving to db (#194)
Browse files Browse the repository at this point in the history
To keep databases from filling up with irrelevant information, only data representing *changed beliefs* is saved, and *unchanged beliefs* are dropped.


* Create draft PR for #186

* Drop unchanged beliefs before saving BeliefsDataFrame to database

* Add test for updating belief time

* Add test for updating source

* Add a probabilistic belief

* Add workaround for groupby bug (copied from flexmeasures-entsoe)

* Drop probabilistic beliefs in their entirety or not at all

* Document tests

* Use timely beliefs dev release

* Use timely beliefs release

* Changelog entry

* Make dropping unchanged beliefs optional

Co-authored-by: Flix6x <Flix6x@users.noreply.github.com>
  • Loading branch information
Flix6x and Flix6x committed Oct 1, 2021
1 parent e42fa6e commit 664c958
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 11 deletions.
1 change: 1 addition & 0 deletions documentation/changelog.rst
Expand Up @@ -21,6 +21,7 @@ Infrastructure / Support
----------------------
* FlexMeasures plugins can be Python packages now. We provide `a cookie-cutter template <https://github.com/SeitaBV/flexmeasures-plugin-template>`_ for this approach. [see `PR #182 <http://www.github.com/SeitaBV/flexmeasures/pull/182>`_]
* Set default timezone for new users using the FLEXMEASURES_TIMEZONE config setting [see `PR #190 <http://www.github.com/SeitaBV/flexmeasures/pull/190>`_]
* To keep databases from filling up with irrelevant information, only data representing *changed beliefs* is saved, and *unchanged beliefs* are dropped [see `PR #194 <http://www.github.com/SeitaBV/flexmeasures/pull/194>`_]
* Monitored CLI tasks can get better names for identification [see `PR #193 <http://www.github.com/SeitaBV/flexmeasures/pull/193>`_]
* Less custom logfile location, document logging for devs [see `PR #196 <http://www.github.com/SeitaBV/flexmeasures/pull/196>`_]

Expand Down
22 changes: 22 additions & 0 deletions flexmeasures/api/common/utils/api_utils.py
Expand Up @@ -16,6 +16,7 @@
from flexmeasures.data.models.assets import Asset, Power
from flexmeasures.data.models.markets import Price
from flexmeasures.data.models.weather import WeatherSensor, Weather
from flexmeasures.data.services.time_series import drop_unchanged_beliefs
from flexmeasures.data.utils import save_to_session
from flexmeasures.api.common.responses import (
unrecognized_sensor,
Expand Down Expand Up @@ -338,6 +339,7 @@ def get_weather_sensor_by(
def save_to_db(
timed_values: Union[BeliefsDataFrame, List[Union[Power, Price, Weather]]],
forecasting_jobs: List[Job] = [],
save_changed_beliefs_only: bool = True,
) -> ResponseTuple:
"""Put the timed values into the database and enqueue forecasting jobs.
Expand All @@ -347,8 +349,28 @@ def save_to_db(
:param timed_values: BeliefsDataFrame or a list of Power, Price or Weather values to be saved
:param forecasting_jobs: list of forecasting Jobs for redis queues.
:param save_changed_beliefs_only: if True, beliefs that are already stored in the database with an earlier belief time are dropped.
:returns: ResponseTuple
"""

if isinstance(timed_values, BeliefsDataFrame):

if save_changed_beliefs_only:
# Drop beliefs that haven't changed
timed_values = (
timed_values.convert_index_from_belief_horizon_to_time()
.groupby(level=["belief_time", "source"], as_index=False)
.apply(drop_unchanged_beliefs)
)

# Work around bug in which groupby still introduces an index level, even though we asked it not to
if None in timed_values.index.names:
timed_values.index = timed_values.index.droplevel(None)

if timed_values.empty:
current_app.logger.debug("Nothing new to save")
return already_received_and_successfully_processed()

current_app.logger.info("SAVING TO DB AND QUEUEING...")
try:
if isinstance(timed_values, BeliefsDataFrame):
Expand Down
25 changes: 24 additions & 1 deletion flexmeasures/conftest.py
Expand Up @@ -303,7 +303,30 @@ def setup_beliefs(db: SQLAlchemy, setup_markets, setup_sources) -> int:
event_value=21,
event_start="2021-03-28 16:00+01",
belief_horizon=timedelta(0),
)
),
TimedBelief(
sensor=sensor,
source=setup_sources["Seita"],
event_value=21,
event_start="2021-03-28 17:00+01",
belief_horizon=timedelta(0),
),
TimedBelief(
sensor=sensor,
source=setup_sources["Seita"],
event_value=20,
event_start="2021-03-28 17:00+01",
belief_horizon=timedelta(hours=2),
cp=0.2,
),
TimedBelief(
sensor=sensor,
source=setup_sources["Seita"],
event_value=21,
event_start="2021-03-28 17:00+01",
belief_horizon=timedelta(hours=2),
cp=0.5,
),
]
db.session.add_all(beliefs)
return len(beliefs)
Expand Down
21 changes: 13 additions & 8 deletions flexmeasures/data/services/time_series.py
Expand Up @@ -337,13 +337,18 @@ def drop_unchanged_beliefs(bdf: tb.BeliefsDataFrame) -> tb.BeliefsDataFrame:
compare_fields = ["event_start", "source", "cumulative_probability", "event_value"]
a = bdf.reset_index().set_index(compare_fields)
b = previous_most_recent_beliefs_in_db.reset_index().set_index(compare_fields)
bdf = (
a.drop(
b.index,
errors="ignore",
axis=0,
)
.reset_index()
.set_index(["event_start", "belief_time", "source", "cumulative_probability"])
bdf = a.drop(
b.index,
errors="ignore",
axis=0,
)

# Keep whole probabilistic beliefs, not just the parts that changed
c = bdf.reset_index().set_index(["event_start", "source"])
d = a.reset_index().set_index(["event_start", "source"])
bdf = d[d.index.isin(c.index)]

bdf = bdf.reset_index().set_index(
["event_start", "belief_time", "source", "cumulative_probability"]
)
return bdf
96 changes: 96 additions & 0 deletions flexmeasures/data/tests/test_time_series_services.py
@@ -0,0 +1,96 @@
import pandas as pd
from timely_beliefs import utils as tb_utils

from flexmeasures.api.common.utils.api_utils import save_to_db
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.time_series import Sensor


def test_drop_unchanged_beliefs(setup_beliefs):
    """Saving beliefs that are already in the database shouldn't raise an error.

    Even after updating the belief time, we expect only the older belief time
    to be persisted, because the belief itself is unchanged.
    """

    # Record how many beliefs are stored, and at which belief times
    sensor = Sensor.query.filter_by(name="epex_da").one_or_none()
    stored = sensor.search_beliefs()
    n_before = len(stored)
    times_before = stored.belief_times

    # Re-saving the exact same beliefs should be a no-op
    save_to_db(stored)
    stored = sensor.search_beliefs()
    assert len(stored) == n_before

    # Shift every belief time one hour later and save again
    shifted = tb_utils.replace_multi_index_level(
        stored, "belief_time", stored.belief_times + pd.Timedelta("1H")
    )
    save_to_db(shifted)

    # Still nothing new was stored: the earlier belief times win
    stored = sensor.search_beliefs()
    assert len(stored) == n_before
    assert list(stored.belief_times) == list(times_before)


def test_do_not_drop_beliefs_copied_by_another_source(setup_beliefs):
    """Copying beliefs from one source to another should double the number of beliefs.

    Beliefs recorded by a different source count as changed beliefs, so they
    must not be dropped by the unchanged-beliefs filter in save_to_db.
    """

    # Set a reference for the number of beliefs stored
    sensor = Sensor.query.filter_by(name="epex_da").one_or_none()
    bdf = sensor.search_beliefs()
    num_beliefs_before = len(bdf)

    # See what happens when storing all beliefs with their source updated
    new_source = DataSource(name="Not Seita", type="demo script")
    bdf = tb_utils.replace_multi_index_level(
        bdf, "source", pd.Index([new_source] * num_beliefs_before)
    )
    save_to_db(bdf)

    # Verify that all the new beliefs were added (none were dropped)
    bdf = sensor.search_beliefs()
    num_beliefs_after = len(bdf)
    assert num_beliefs_after == 2 * num_beliefs_before


def test_do_not_drop_changed_probabilistic_belief(setup_beliefs):
    """A changed probabilistic belief should be saved in its entirety.

    For example, given a belief that defines both cp=0.2 and cp=0.5,
    if that belief becomes more certain (e.g. cp=0.3 and cp=0.5),
    we expect to see the full new belief stored, rather than just the cp=0.3 value.
    """

    # Take stock of the beliefs currently stored for this sensor and source
    sensor = Sensor.query.filter_by(name="epex_da").one_or_none()
    stored = sensor.search_beliefs(source="Seita")
    n_before = len(stored)

    # Select the probabilistic belief about the 16:00 event, formed at 14:00
    event_mask = stored.index.get_level_values("event_start") == pd.Timestamp(
        "2021-03-28 16:00:00+00:00"
    )
    time_mask = stored.index.get_level_values("belief_time") == pd.Timestamp(
        "2021-03-28 14:00:00+00:00"
    )
    old_belief = stored[event_mask & time_mask]

    # Make the belief more certain (cp 0.2 -> 0.3) and form it one hour later
    new_belief = tb_utils.replace_multi_index_level(
        old_belief, "cumulative_probability", pd.Index([0.3, 0.5])
    )
    new_belief = tb_utils.replace_multi_index_level(
        new_belief, "belief_time", new_belief.belief_times + pd.Timedelta("1H")
    )
    save_to_db(new_belief)

    # Verify that the whole probabilistic belief was added
    stored = sensor.search_beliefs(source="Seita")
    assert len(stored) == n_before + len(new_belief)
2 changes: 1 addition & 1 deletion requirements/app.in
Expand Up @@ -32,7 +32,7 @@ netCDF4
siphon
tables
timetomodel>=0.6.8
timely-beliefs>=1.4.4
timely-beliefs>=1.4.5
python-dotenv
# a backport, not needed in Python3.8
importlib_metadata
Expand Down
2 changes: 1 addition & 1 deletion requirements/app.txt
Expand Up @@ -319,7 +319,7 @@ tables==3.6.1
# via -r requirements/app.in
threadpoolctl==2.1.0
# via scikit-learn
timely-beliefs==1.4.4
timely-beliefs==1.4.5
# via -r requirements/app.in
timetomodel==0.6.9
# via -r requirements/app.in
Expand Down

0 comments on commit 664c958

Please sign in to comment.