diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 3ae875062..ae323c1a9 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -28,6 +28,7 @@ Infrastructure / Support * Remove bokeh dependency and obsolete UI views [see `PR #476 `_] * Fix ``flexmeasures db-ops dump`` and ``flexmeasures db-ops restore`` not working in docker containers [see `PR #530 `_] and incorrectly reporting a success when `pg_dump` and `pg_restore` are not installed [see `PR #526 `_] * Plugins can save BeliefsSeries, too, instead of just BeliefsDataFrames [see `PR #523 `_] +* Revised strategy for removing unchanged beliefs when saving data: retain the oldest measurement (ex-post belief), too [see `PR #518 `_] v0.11.3 | November 2, 2022 diff --git a/flexmeasures/api/common/utils/api_utils.py b/flexmeasures/api/common/utils/api_utils.py index 7d36299f0..4319c5a03 100644 --- a/flexmeasures/api/common/utils/api_utils.py +++ b/flexmeasures/api/common/utils/api_utils.py @@ -411,11 +411,7 @@ def save_to_db( if save_changed_beliefs_only: # Drop beliefs that haven't changed - timed_values = ( - timed_values.convert_index_from_belief_horizon_to_time() - .groupby(level=["belief_time", "source"], as_index=False) - .apply(drop_unchanged_beliefs) - ) + timed_values = drop_unchanged_beliefs(timed_values) # Work around bug in which groupby still introduces an index level, even though we asked it not to if None in timed_values.index.names: diff --git a/flexmeasures/data/services/time_series.py b/flexmeasures/data/services/time_series.py index 515fb8424..755fa6172 100644 --- a/flexmeasures/data/services/time_series.py +++ b/flexmeasures/data/services/time_series.py @@ -315,23 +315,68 @@ def set_bdf_source(bdf: tb.BeliefsDataFrame, source_name: str) -> tb.BeliefsData def drop_unchanged_beliefs(bdf: tb.BeliefsDataFrame) -> tb.BeliefsDataFrame: """Drop beliefs that are already stored in the database with an earlier belief time. + Also drop beliefs that are already in the data with an earlier belief time. + Quite useful function to prevent cluttering up your database with beliefs that remain unchanged over time. - Only works on BeliefsDataFrames with a unique belief time and unique source. """ if bdf.empty: return bdf - if len(bdf.lineage.belief_times) > 1: - raise NotImplementedError("Beliefs should share a unique belief time.") - if len(bdf.lineage.sources) > 1: - raise NotImplementedError("Beliefs should share a unique source.") + + # Save the oldest ex-post beliefs explicitly, even if they do not deviate from the most recent ex-ante beliefs + ex_ante_bdf = bdf[bdf.belief_horizons > timedelta(0)] + ex_post_bdf = bdf[bdf.belief_horizons <= timedelta(0)] + if not ex_ante_bdf.empty and not ex_post_bdf.empty: + # We treat each part separately to avoid the ex-post knowledge would be lost + ex_ante_bdf = drop_unchanged_beliefs(ex_ante_bdf) + ex_post_bdf = drop_unchanged_beliefs(ex_post_bdf) + bdf = pd.concat([ex_ante_bdf, ex_post_bdf]) + return bdf + + # Remove unchanged beliefs from within the new data itself + index_names = bdf.index.names + bdf = ( + bdf.sort_index() + .reset_index() + .drop_duplicates( + ["event_start", "source", "cumulative_probability", "event_value"], + keep="first", + ) + .set_index(index_names) + ) + + # Remove unchanged beliefs with respect to what is already stored in the database + return ( + bdf.convert_index_from_belief_horizon_to_time() + .groupby(level=["belief_time", "source"], as_index=False) + .apply(_drop_unchanged_beliefs_compared_to_db) + ) + + +def _drop_unchanged_beliefs_compared_to_db( + bdf: tb.BeliefsDataFrame, +) -> tb.BeliefsDataFrame: + """Drop beliefs that are already stored in the database with an earlier belief time. + + Assumes a BeliefsDataFrame with a unique belief time and unique source, + and either all ex-ante beliefs or all ex-post beliefs. + + It is preferable to call the public function drop_unchanged_beliefs instead. + """ + if bdf.belief_horizons[0] > timedelta(0): + # Look up only ex-ante beliefs (horizon > 0) + kwargs = dict(horizons_at_least=timedelta(0)) + else: + # Look up only ex-post beliefs (horizon <= 0) + kwargs = dict(horizons_at_most=timedelta(0)) previous_beliefs_in_db = bdf.sensor.search_beliefs( event_starts_after=bdf.event_starts[0], event_ends_before=bdf.event_ends[-1], beliefs_before=bdf.lineage.belief_times[0], # unique belief time source=bdf.lineage.sources[0], # unique source most_recent_beliefs_only=False, + **kwargs, ) - # todo: delete next line and set most_recent_beliefs_only=True when this is resolved: https://github.com/SeitaBV/timely-beliefs/issues/97 + # todo: delete next line and set most_recent_beliefs_only=True when this is resolved: https://github.com/SeitaBV/timely-beliefs/pull/117 previous_most_recent_beliefs_in_db = belief_utils.select_most_recent_belief( previous_beliefs_in_db ) diff --git a/flexmeasures/data/utils.py b/flexmeasures/data/utils.py index 90a83f9dc..194f8b10b 100644 --- a/flexmeasures/data/utils.py +++ b/flexmeasures/data/utils.py @@ -109,11 +109,7 @@ def save_to_db( if save_changed_beliefs_only: # Drop beliefs that haven't changed - timed_values = ( - timed_values.convert_index_from_belief_horizon_to_time() - .groupby(level=["belief_time", "source"], as_index=False) - .apply(drop_unchanged_beliefs) - ) + timed_values = drop_unchanged_beliefs(timed_values) len_after = len(timed_values) if len_after < len_before: status = "success_with_unchanged_beliefs_skipped"