Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unchanged ex-ante and ex-post beliefs #518

Merged
merged 12 commits into from Nov 17, 2022
1 change: 1 addition & 0 deletions documentation/changelog.rst
Expand Up @@ -27,6 +27,7 @@ Infrastructure / Support
* Remove bokeh dependency and obsolete UI views [see `PR #476 <http://www.github.com/FlexMeasures/flexmeasures/pull/476>`_]
* Fix ``flexmeasures db-ops dump`` and ``flexmeasures db-ops restore`` incorrectly reporting a success when `pg_dump` and `pg_restore` are not installed [see `PR #526 <http://www.github.com/FlexMeasures/flexmeasures/pull/526>`_]
* Plugins can save BeliefsSeries, too, instead of just BeliefsDataFrames [see `PR #523 <http://www.github.com/FlexMeasures/flexmeasures/pull/523>`_]
* Revised strategy for removing unchanged beliefs when saving data: retain the oldest measurement (ex-post belief), too [see `PR #518 <http://www.github.com/FlexMeasures/flexmeasures/pull/518>`_]


v0.11.3 | November 2, 2022
Expand Down
6 changes: 1 addition & 5 deletions flexmeasures/api/common/utils/api_utils.py
Expand Up @@ -411,11 +411,7 @@ def save_to_db(

if save_changed_beliefs_only:
# Drop beliefs that haven't changed
timed_values = (
timed_values.convert_index_from_belief_horizon_to_time()
.groupby(level=["belief_time", "source"], as_index=False)
.apply(drop_unchanged_beliefs)
)
timed_values = drop_unchanged_beliefs(timed_values)

# Work around bug in which groupby still introduces an index level, even though we asked it not to
if None in timed_values.index.names:
Expand Down
57 changes: 51 additions & 6 deletions flexmeasures/data/services/time_series.py
Expand Up @@ -315,23 +315,68 @@ def set_bdf_source(bdf: tb.BeliefsDataFrame, source_name: str) -> tb.BeliefsData
def drop_unchanged_beliefs(bdf: tb.BeliefsDataFrame) -> tb.BeliefsDataFrame:
"""Drop beliefs that are already stored in the database with an earlier belief time.

Also drop beliefs that are already in the data with an earlier belief time.

Quite useful function to prevent cluttering up your database with beliefs that remain unchanged over time.
Only works on BeliefsDataFrames with a unique belief time and unique source.
"""
if bdf.empty:
return bdf
if len(bdf.lineage.belief_times) > 1:
nhoening marked this conversation as resolved.
Show resolved Hide resolved
raise NotImplementedError("Beliefs should share a unique belief time.")
if len(bdf.lineage.sources) > 1:
raise NotImplementedError("Beliefs should share a unique source.")

# Save the oldest ex-post beliefs explicitly, even if they do not deviate from the most recent ex-ante beliefs
ex_ante_bdf = bdf[bdf.belief_horizons > timedelta(0)]
ex_post_bdf = bdf[bdf.belief_horizons <= timedelta(0)]
if not ex_ante_bdf.empty and not ex_post_bdf.empty:
# We treat each part separately to avoid the ex-post knowledge would be lost
ex_ante_bdf = drop_unchanged_beliefs(ex_ante_bdf)
ex_post_bdf = drop_unchanged_beliefs(ex_post_bdf)
bdf = pd.concat([ex_ante_bdf, ex_post_bdf])
return bdf

# Remove unchanged beliefs from within the new data itself
index_names = bdf.index.names
bdf = (
bdf.sort_index()
.reset_index()
.drop_duplicates(
["event_start", "source", "cumulative_probability", "event_value"],
keep="first",
)
.set_index(index_names)
)

nhoening marked this conversation as resolved.
Show resolved Hide resolved
# Remove unchanged beliefs with respect to what is already stored in the database
return (
bdf.convert_index_from_belief_horizon_to_time()
.groupby(level=["belief_time", "source"], as_index=False)
.apply(_drop_unchanged_beliefs_compared_to_db)
)


def _drop_unchanged_beliefs_compared_to_db(
bdf: tb.BeliefsDataFrame,
) -> tb.BeliefsDataFrame:
"""Drop beliefs that are already stored in the database with an earlier belief time.

Assumes a BeliefsDataFrame with a unique belief time and unique source,
and either all ex-ante beliefs or all ex-post beliefs.

It is preferable to call the public function drop_unchanged_beliefs instead.
"""
if bdf.belief_horizons[0] > timedelta(0):
# Look up only ex-ante beliefs (horizon > 0)
kwargs = dict(horizons_at_least=timedelta(0))
else:
# Look up only ex-post beliefs (horizon <= 0)
kwargs = dict(horizons_at_most=timedelta(0))
previous_beliefs_in_db = bdf.sensor.search_beliefs(
event_starts_after=bdf.event_starts[0],
event_ends_before=bdf.event_ends[-1],
beliefs_before=bdf.lineage.belief_times[0], # unique belief time
source=bdf.lineage.sources[0], # unique source
most_recent_beliefs_only=False,
**kwargs,
)
# todo: delete next line and set most_recent_beliefs_only=True when this is resolved: https://github.com/SeitaBV/timely-beliefs/issues/97
# todo: delete next line and set most_recent_beliefs_only=True when this is resolved: https://github.com/SeitaBV/timely-beliefs/pull/117
previous_most_recent_beliefs_in_db = belief_utils.select_most_recent_belief(
previous_beliefs_in_db
)
Expand Down
6 changes: 1 addition & 5 deletions flexmeasures/data/utils.py
Expand Up @@ -109,11 +109,7 @@ def save_to_db(
if save_changed_beliefs_only:

# Drop beliefs that haven't changed
timed_values = (
timed_values.convert_index_from_belief_horizon_to_time()
.groupby(level=["belief_time", "source"], as_index=False)
.apply(drop_unchanged_beliefs)
)
timed_values = drop_unchanged_beliefs(timed_values)
len_after = len(timed_values)
if len_after < len_before:
status = "success_with_unchanged_beliefs_skipped"
Expand Down