Skip to content

Commit

Permalink
1) prevent more redundant data from entering the database, and 2) ret…
Browse files Browse the repository at this point in the history
…ain data that one might usually consider not to be redundant: the oldest ex-post belief.

Unchanged ex-ante and ex-post beliefs (#518)

* Refactor: move groupby statement into util function, and use private function

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Remove unchanged beliefs from within new data itself

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Drop unchanged beliefs separately for ex-ante and ex-post beliefs

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Remove now false statement

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Remove explicit checks for private function

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Update todo

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Also apply separate ex-ante and ex-post look-ups to previously stored beliefs

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Update docstring

Signed-off-by: F.N. Claessen <felix@seita.nl>

* changelog entry

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Improve inline comments

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Clarify use of private util function

Signed-off-by: F.N. Claessen <felix@seita.nl>

Signed-off-by: F.N. Claessen <felix@seita.nl>
  • Loading branch information
Flix6x committed Nov 17, 2022
1 parent 06568b8 commit 35591a3
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 16 deletions.
1 change: 1 addition & 0 deletions documentation/changelog.rst
Expand Up @@ -28,6 +28,7 @@ Infrastructure / Support
* Remove bokeh dependency and obsolete UI views [see `PR #476 <http://www.github.com/FlexMeasures/flexmeasures/pull/476>`_]
* Fix ``flexmeasures db-ops dump`` and ``flexmeasures db-ops restore`` not working in docker containers [see `PR #530 <http://www.github.com/FlexMeasures/flexmeasures/pull/530>`_] and incorrectly reporting a success when `pg_dump` and `pg_restore` are not installed [see `PR #526 <http://www.github.com/FlexMeasures/flexmeasures/pull/526>`_]
* Plugins can save BeliefsSeries, too, instead of just BeliefsDataFrames [see `PR #523 <http://www.github.com/FlexMeasures/flexmeasures/pull/523>`_]
* Revised strategy for removing unchanged beliefs when saving data: retain the oldest measurement (ex-post belief), too [see `PR #518 <http://www.github.com/FlexMeasures/flexmeasures/pull/518>`_]


v0.11.3 | November 2, 2022
Expand Down
6 changes: 1 addition & 5 deletions flexmeasures/api/common/utils/api_utils.py
Expand Up @@ -411,11 +411,7 @@ def save_to_db(

if save_changed_beliefs_only:
# Drop beliefs that haven't changed
timed_values = (
timed_values.convert_index_from_belief_horizon_to_time()
.groupby(level=["belief_time", "source"], as_index=False)
.apply(drop_unchanged_beliefs)
)
timed_values = drop_unchanged_beliefs(timed_values)

# Work around bug in which groupby still introduces an index level, even though we asked it not to
if None in timed_values.index.names:
Expand Down
57 changes: 51 additions & 6 deletions flexmeasures/data/services/time_series.py
Expand Up @@ -315,23 +315,68 @@ def set_bdf_source(bdf: tb.BeliefsDataFrame, source_name: str) -> tb.BeliefsData
def drop_unchanged_beliefs(bdf: tb.BeliefsDataFrame) -> tb.BeliefsDataFrame:
"""Drop beliefs that are already stored in the database with an earlier belief time.
Also drop beliefs that are already in the data with an earlier belief time.
Quite useful function to prevent cluttering up your database with beliefs that remain unchanged over time.
Only works on BeliefsDataFrames with a unique belief time and unique source.
"""
if bdf.empty:
return bdf
if len(bdf.lineage.belief_times) > 1:
raise NotImplementedError("Beliefs should share a unique belief time.")
if len(bdf.lineage.sources) > 1:
raise NotImplementedError("Beliefs should share a unique source.")

# Save the oldest ex-post beliefs explicitly, even if they do not deviate from the most recent ex-ante beliefs
ex_ante_bdf = bdf[bdf.belief_horizons > timedelta(0)]
ex_post_bdf = bdf[bdf.belief_horizons <= timedelta(0)]
if not ex_ante_bdf.empty and not ex_post_bdf.empty:
# We treat each part separately to avoid the ex-post knowledge would be lost
ex_ante_bdf = drop_unchanged_beliefs(ex_ante_bdf)
ex_post_bdf = drop_unchanged_beliefs(ex_post_bdf)
bdf = pd.concat([ex_ante_bdf, ex_post_bdf])
return bdf

# Remove unchanged beliefs from within the new data itself
index_names = bdf.index.names
bdf = (
bdf.sort_index()
.reset_index()
.drop_duplicates(
["event_start", "source", "cumulative_probability", "event_value"],
keep="first",
)
.set_index(index_names)
)

# Remove unchanged beliefs with respect to what is already stored in the database
return (
bdf.convert_index_from_belief_horizon_to_time()
.groupby(level=["belief_time", "source"], as_index=False)
.apply(_drop_unchanged_beliefs_compared_to_db)
)


def _drop_unchanged_beliefs_compared_to_db(
bdf: tb.BeliefsDataFrame,
) -> tb.BeliefsDataFrame:
"""Drop beliefs that are already stored in the database with an earlier belief time.
Assumes a BeliefsDataFrame with a unique belief time and unique source,
and either all ex-ante beliefs or all ex-post beliefs.
It is preferable to call the public function drop_unchanged_beliefs instead.
"""
if bdf.belief_horizons[0] > timedelta(0):
# Look up only ex-ante beliefs (horizon > 0)
kwargs = dict(horizons_at_least=timedelta(0))
else:
# Look up only ex-post beliefs (horizon <= 0)
kwargs = dict(horizons_at_most=timedelta(0))
previous_beliefs_in_db = bdf.sensor.search_beliefs(
event_starts_after=bdf.event_starts[0],
event_ends_before=bdf.event_ends[-1],
beliefs_before=bdf.lineage.belief_times[0], # unique belief time
source=bdf.lineage.sources[0], # unique source
most_recent_beliefs_only=False,
**kwargs,
)
# todo: delete next line and set most_recent_beliefs_only=True when this is resolved: https://github.com/SeitaBV/timely-beliefs/issues/97
# todo: delete next line and set most_recent_beliefs_only=True when this is resolved: https://github.com/SeitaBV/timely-beliefs/pull/117
previous_most_recent_beliefs_in_db = belief_utils.select_most_recent_belief(
previous_beliefs_in_db
)
Expand Down
6 changes: 1 addition & 5 deletions flexmeasures/data/utils.py
Expand Up @@ -109,11 +109,7 @@ def save_to_db(
if save_changed_beliefs_only:

# Drop beliefs that haven't changed
timed_values = (
timed_values.convert_index_from_belief_horizon_to_time()
.groupby(level=["belief_time", "source"], as_index=False)
.apply(drop_unchanged_beliefs)
)
timed_values = drop_unchanged_beliefs(timed_values)
len_after = len(timed_values)
if len_after < len_before:
status = "success_with_unchanged_beliefs_skipped"
Expand Down

0 comments on commit 35591a3

Please sign in to comment.