Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop unchanged beliefs from BeliefsDataFrame before saving to db #194

1 change: 1 addition & 0 deletions documentation/changelog.rst
Expand Up @@ -21,6 +21,7 @@ Infrastructure / Support
----------------------
* FlexMeasures plugins can be Python packages now. We provide `a cookie-cutter template <https://github.com/SeitaBV/flexmeasures-plugin-template>`_ for this approach. [see `PR #182 <http://www.github.com/SeitaBV/flexmeasures/pull/182>`_]
* Set default timezone for new users using the FLEXMEASURES_TIMEZONE config setting [see `PR #190 <http://www.github.com/SeitaBV/flexmeasures/pull/190>`_]
* To prevent databases from filling up with irrelevant information, only beliefs data representing *changed beliefs are saved*, and *unchanged beliefs are dropped* [see `PR #194 <http://www.github.com/SeitaBV/flexmeasures/pull/194>`_]
* Monitored CLI tasks can get better names for identification [see `PR #193 <http://www.github.com/SeitaBV/flexmeasures/pull/193>`_]
* Less custom logfile location, document logging for devs [see `PR #196 <http://www.github.com/SeitaBV/flexmeasures/pull/196>`_]

Expand Down
22 changes: 22 additions & 0 deletions flexmeasures/api/common/utils/api_utils.py
Expand Up @@ -16,6 +16,7 @@
from flexmeasures.data.models.assets import Asset, Power
from flexmeasures.data.models.markets import Price
from flexmeasures.data.models.weather import WeatherSensor, Weather
from flexmeasures.data.services.time_series import drop_unchanged_beliefs
from flexmeasures.data.utils import save_to_session
from flexmeasures.api.common.responses import (
unrecognized_sensor,
Expand Down Expand Up @@ -338,6 +339,7 @@ def get_weather_sensor_by(
def save_to_db(
timed_values: Union[BeliefsDataFrame, List[Union[Power, Price, Weather]]],
forecasting_jobs: List[Job] = [],
save_changed_beliefs_only: bool = True,
) -> ResponseTuple:
"""Put the timed values into the database and enqueue forecasting jobs.

Expand All @@ -347,8 +349,28 @@ def save_to_db(

:param timed_values: BeliefsDataFrame or a list of Power, Price or Weather values to be saved
:param forecasting_jobs: list of forecasting Jobs for redis queues.
:param save_changed_beliefs_only: if True, beliefs that are already stored in the database with an earlier belief time are dropped.
:returns: ResponseTuple
"""

if isinstance(timed_values, BeliefsDataFrame):

if save_changed_beliefs_only:
# Drop beliefs that haven't changed
timed_values = (
timed_values.convert_index_from_belief_horizon_to_time()
.groupby(level=["belief_time", "source"], as_index=False)
.apply(drop_unchanged_beliefs)
)

# Work around bug in which groupby still introduces an index level, even though we asked it not to
if None in timed_values.index.names:
timed_values.index = timed_values.index.droplevel(None)

if timed_values.empty:
current_app.logger.debug("Nothing new to save")
return already_received_and_successfully_processed()

current_app.logger.info("SAVING TO DB AND QUEUEING...")
try:
if isinstance(timed_values, BeliefsDataFrame):
Expand Down
25 changes: 24 additions & 1 deletion flexmeasures/conftest.py
Expand Up @@ -303,7 +303,30 @@ def setup_beliefs(db: SQLAlchemy, setup_markets, setup_sources) -> int:
event_value=21,
event_start="2021-03-28 16:00+01",
belief_horizon=timedelta(0),
)
),
TimedBelief(
sensor=sensor,
source=setup_sources["Seita"],
event_value=21,
event_start="2021-03-28 17:00+01",
belief_horizon=timedelta(0),
),
TimedBelief(
sensor=sensor,
source=setup_sources["Seita"],
event_value=20,
event_start="2021-03-28 17:00+01",
belief_horizon=timedelta(hours=2),
cp=0.2,
),
TimedBelief(
sensor=sensor,
source=setup_sources["Seita"],
event_value=21,
event_start="2021-03-28 17:00+01",
belief_horizon=timedelta(hours=2),
cp=0.5,
),
]
db.session.add_all(beliefs)
return len(beliefs)
Expand Down
21 changes: 13 additions & 8 deletions flexmeasures/data/services/time_series.py
Expand Up @@ -337,13 +337,18 @@ def drop_unchanged_beliefs(bdf: tb.BeliefsDataFrame) -> tb.BeliefsDataFrame:
compare_fields = ["event_start", "source", "cumulative_probability", "event_value"]
a = bdf.reset_index().set_index(compare_fields)
b = previous_most_recent_beliefs_in_db.reset_index().set_index(compare_fields)
bdf = (
a.drop(
b.index,
errors="ignore",
axis=0,
)
.reset_index()
.set_index(["event_start", "belief_time", "source", "cumulative_probability"])
bdf = a.drop(
b.index,
errors="ignore",
axis=0,
)

# Keep whole probabilistic beliefs, not just the parts that changed
c = bdf.reset_index().set_index(["event_start", "source"])
d = a.reset_index().set_index(["event_start", "source"])
bdf = d[d.index.isin(c.index)]

bdf = bdf.reset_index().set_index(
["event_start", "belief_time", "source", "cumulative_probability"]
)
return bdf
96 changes: 96 additions & 0 deletions flexmeasures/data/tests/test_time_series_services.py
@@ -0,0 +1,96 @@
import pandas as pd
from timely_beliefs import utils as tb_utils

from flexmeasures.api.common.utils.api_utils import save_to_db
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.time_series import Sensor


def test_drop_unchanged_beliefs(setup_beliefs):
    """Saving beliefs that already exist in the database should not raise an error.

    Even after updating the belief time, we expect only the older belief time
    to be persisted.
    """

    # Record how many beliefs are stored, and when they were formed
    sensor = Sensor.query.filter_by(name="epex_da").one_or_none()
    stored = sensor.search_beliefs()
    initial_count = len(stored)
    initial_belief_times = stored.belief_times

    # Attempt to store the exact same beliefs again
    save_to_db(stored)

    # No new beliefs should have been persisted
    stored = sensor.search_beliefs()
    assert len(stored) == initial_count

    # Attempt to store the same beliefs, but formed one hour later
    shifted = tb_utils.replace_multi_index_level(
        stored, "belief_time", stored.belief_times + pd.Timedelta("1H")
    )
    save_to_db(shifted)

    # Still no new beliefs, and the original belief times remain untouched
    stored = sensor.search_beliefs()
    assert len(stored) == initial_count
    assert list(stored.belief_times) == list(initial_belief_times)


def test_do_not_drop_beliefs_copied_by_another_source(setup_beliefs):
    """Copying beliefs from one source to another should double the number of beliefs."""

    # Record how many beliefs are stored
    sensor = Sensor.query.filter_by(name="epex_da").one_or_none()
    original_beliefs = sensor.search_beliefs()
    initial_count = len(original_beliefs)

    # Store the same beliefs, but attributed to a different source
    new_source = DataSource(name="Not Seita", type="demo script")
    copied_beliefs = tb_utils.replace_multi_index_level(
        original_beliefs, "source", pd.Index([new_source] * initial_count)
    )
    save_to_db(copied_beliefs)

    # Every copy should have been added alongside the originals
    num_beliefs_after = len(sensor.search_beliefs())
    assert num_beliefs_after == 2 * initial_count


def test_do_not_drop_changed_probabilistic_belief(setup_beliefs):
    """Saving a changed probabilistic belief should result in saving the whole belief.

    For example, given a belief that defines both cp=0.2 and cp=0.5,
    if that belief becomes more certain (e.g. cp=0.3 and cp=0.5),
    we expect to see the full new belief stored, rather than just the cp=0.3 value.
    """

    # Record how many beliefs are stored for this source
    sensor = Sensor.query.filter_by(name="epex_da").one_or_none()
    stored = sensor.search_beliefs(source="Seita")
    initial_count = len(stored)

    # Select the probabilistic belief that was set up with cp=0.2 and cp=0.5
    event_starts = stored.index.get_level_values("event_start")
    belief_times = stored.index.get_level_values("belief_time")
    old_belief = stored.loc[
        (event_starts == pd.Timestamp("2021-03-28 16:00:00+00:00"))
        & (belief_times == pd.Timestamp("2021-03-28 14:00:00+00:00"))
    ]

    # Form a more certain belief (cp=0.3 and cp=0.5) one hour later
    new_belief = tb_utils.replace_multi_index_level(
        old_belief, "cumulative_probability", pd.Index([0.3, 0.5])
    )
    new_belief = tb_utils.replace_multi_index_level(
        new_belief, "belief_time", new_belief.belief_times + pd.Timedelta("1H")
    )
    save_to_db(new_belief)

    # Both probabilistic parts of the new belief should have been added
    stored = sensor.search_beliefs(source="Seita")
    assert len(stored) == initial_count + len(new_belief)
2 changes: 1 addition & 1 deletion requirements/app.in
Expand Up @@ -32,7 +32,7 @@ netCDF4
siphon
tables
timetomodel>=0.6.8
timely-beliefs>=1.4.4
timely-beliefs>=1.4.5
python-dotenv
# a backport, not needed in Python3.8
importlib_metadata
Expand Down
2 changes: 1 addition & 1 deletion requirements/app.txt
Expand Up @@ -319,7 +319,7 @@ tables==3.6.1
# via -r requirements/app.in
threadpoolctl==2.1.0
# via scikit-learn
timely-beliefs==1.4.4
timely-beliefs==1.4.5
# via -r requirements/app.in
timetomodel==0.6.9
# via -r requirements/app.in
Expand Down