#574 Avoid Recomputing Schedules #616

Merged
merged 39 commits into from Apr 14, 2023
Changes from 13 commits
Commits
39 commits
21d7b3b
Issue #575
victorgarcia98 Mar 27, 2023
c715030
Fixing typos and changing hashing function.
victorgarcia98 Mar 28, 2023
d1d2f5d
Updating hash function: now classes with the method make_hashable can…
victorgarcia98 Mar 28, 2023
fcc3345
The decorator allows creating new jobs in case that the previous call…
victorgarcia98 Mar 28, 2023
d24ef38
Importing annotations from __future__ for typing of functions arguments.
victorgarcia98 Mar 28, 2023
c39d96e
Add more tests to handle ordering in dictionaries and in arrays.
victorgarcia98 Mar 28, 2023
61b8d40
Add test to check if serialized dictionaries (json strings) order yie…
victorgarcia98 Mar 28, 2023
8c28fe0
Adding PR to changelog.rst
victorgarcia98 Mar 28, 2023
f80bb9d
Adding force_new_job_creation into @redis_cache decorator to force th…
victorgarcia98 Mar 30, 2023
fbc350d
Simplifying access to optional keyword argument.
victorgarcia98 Mar 30, 2023
495a03b
Merge branch 'main' into 574-avoid-recomputing-schedules
victorgarcia98 Mar 30, 2023
7c1271e
Main changes:
victorgarcia98 Mar 31, 2023
688b53e
Merge branch 'main' into 574-avoid-recomputing-schedules
victorgarcia98 Mar 31, 2023
d5c6340
Fixing some typos.
victorgarcia98 Mar 31, 2023
8429438
Merge remote-tracking branch 'origin/574-avoid-recomputing-schedules'…
victorgarcia98 Mar 31, 2023
d4c59a0
Merge branch 'main' into 574-avoid-recomputing-schedules
victorgarcia98 Mar 31, 2023
eddfa59
Getting generic_asset attributes right
victorgarcia98 Apr 4, 2023
9a75316
Fixing comments grammar and adding type to the @job_cache decorator.
victorgarcia98 Apr 4, 2023
7dd0df3
Adding a better changelog entry for this PR.
victorgarcia98 Apr 4, 2023
31f6228
Implementation + Tests of the requeueing feature.
victorgarcia98 Apr 5, 2023
371df95
Adding clarifications and argument description in docstrings.
victorgarcia98 Apr 6, 2023
33800a1
Merge branch 'main' into 574-avoid-recomputing-schedules
victorgarcia98 Apr 6, 2023
7f471f0
Clarify arguments in docstring
Flix6x Apr 8, 2023
cce93db
Decorator docstring should list decorator arguments, rather than argu…
Flix6x Apr 8, 2023
b78c9f9
Refactor: simpler lines of code
Flix6x Apr 8, 2023
7d41613
Remove redundant lines
Flix6x Apr 8, 2023
e1e1e88
Make the @job_cache decorator agnostic to whichever queue is passed
Flix6x Apr 8, 2023
1f4b624
Clarification: "requeue" instead of "re-enqueue", because "enqueuing"…
Flix6x Apr 8, 2023
aee155b
Add missing argument and explain how function arguments are used
Flix6x Apr 8, 2023
6acd9cf
Hashes are not stored under queues
Flix6x Apr 8, 2023
b6c60a6
Remove redundant queue (name) argument from @job_cache
Flix6x Apr 8, 2023
478fe5c
Adding TTL to job caching keys configured through the config variable…
victorgarcia98 Apr 10, 2023
86a865c
Adding function and queue names to the hash of the Job creation call.
victorgarcia98 Apr 10, 2023
e07b302
Merge branch 'main' into 574-avoid-recomputing-schedules
victorgarcia98 Apr 11, 2023
5a6fe4d
Adding FLEXMEASURES_JOB_CACHE_TTL into the config settings docs.
victorgarcia98 Apr 11, 2023
0a316ba
Merge remote-tracking branch 'origin/main' into 574-avoid-recomputing…
victorgarcia98 Apr 12, 2023
570fabf
Merge branch 'main' into 574-avoid-recomputing-schedules
victorgarcia98 Apr 13, 2023
a694443
PR: Avoid redundantly recomputing jobs that are triggered without a r…
victorgarcia98 Apr 13, 2023
bec16e2
Revert "PR: Avoid redundantly recomputing jobs that are triggered wit…
victorgarcia98 Apr 14, 2023
1 change: 1 addition & 0 deletions documentation/changelog.rst
Expand Up @@ -13,6 +13,7 @@ New features
* Keyboard control over replay [see `PR #562 <https://www.github.com/FlexMeasures/flexmeasures/pull/562>`_]
* The ``FLEXMEASURES_MAX_PLANNING_HORIZON`` config setting can also be set as an integer number of planning steps rather than just as a fixed duration, which makes it possible to schedule further ahead in coarser time steps [see `PR #583 <https://www.github.com/FlexMeasures/flexmeasures/pull/583>`_]
* Different text styles for CLI output for errors, warnings or success messages. [see `PR #609 <https://www.github.com/FlexMeasures/flexmeasures/pull/609>`_]
* Avoid recomputing schedules [see `PR #616 <https://www.github.com/FlexMeasures/flexmeasures/pull/616>`_]

Bugfixes
-----------
Expand Down
11 changes: 11 additions & 0 deletions flexmeasures/data/models/time_series.py
Expand Up @@ -524,6 +524,17 @@ def find_closest(
else:
return query.limit(n).all()

def make_hashable(self) -> tuple:
"""Return a tuple of the properties subject to change.

In principle, all properties of a given sensor (except its ID) could change, but not every change is relevant enough to warrant reanalysis (e.g. rescheduling or reforecasting).
"""

generic_asset_attributes = getattr(self.generic_asset, "attributes", None)

parameters = [self.id, self.attributes, generic_asset_attributes]

return tuple(parameters)


class TimedBelief(db.Model, tb.TimedBeliefDBMixin):
"""A timed belief holds a precisely timed record of a belief about an event.
Expand Down
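The new `Sensor.make_hashable` method above can be illustrated with a minimal standalone sketch. The `AssetSketch` and `SensorSketch` classes here are hypothetical stand-ins for the real models, not part of the codebase:

```python
class AssetSketch:
    """Hypothetical stand-in for a generic asset holding an attributes dict."""

    def __init__(self, attributes: dict):
        self.attributes = attributes


class SensorSketch:
    """Hypothetical stand-in for the Sensor model extended above."""

    def __init__(self, id: int, attributes: dict, generic_asset):
        self.id = id
        self.attributes = attributes
        self.generic_asset = generic_asset

    def make_hashable(self) -> tuple:
        # expose only the properties whose changes warrant recomputation
        generic_asset_attributes = getattr(self.generic_asset, "attributes", None)
        return (self.id, self.attributes, generic_asset_attributes)


sensor = SensorSketch(1, {"unit": "kW"}, AssetSketch({"capacity_in_mw": 2}))
assert sensor.make_hashable() == (1, {"unit": "kW"}, {"capacity_in_mw": 2})
```

The tuple deliberately excludes properties whose changes should not invalidate cached jobs.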
2 changes: 2 additions & 0 deletions flexmeasures/data/services/scheduling.py
Expand Up @@ -19,8 +19,10 @@
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.utils import get_data_source, save_to_db
from flexmeasures.utils.time_utils import server_now
from flexmeasures.data.services.utils import job_cache


@job_cache("scheduling")
def create_scheduling_job(
sensor: Sensor,
job_id: str | None = None,
Expand Down
110 changes: 110 additions & 0 deletions flexmeasures/data/services/utils.py
@@ -0,0 +1,110 @@
import functools
from flask import current_app
from rq.job import Job

import hashlib
import base64


def make_hash_sha256(o):
"""
Use SHA-256 instead of Python's built-in hash(), because the built-in hash is salted per process and therefore yields different results across restarts.
Source: https://stackoverflow.com/a/42151923
"""
hasher = hashlib.sha256()
hasher.update(repr(make_hashable(o)).encode())
return base64.b64encode(hasher.digest()).decode()


def make_hashable(o):
"""
Recursively convert an object (including dictionaries with nested objects) into a hashable representation.
Source: https://stackoverflow.com/a/42151923
"""
if isinstance(o, (tuple, list)):
return tuple((make_hashable(e) for e in o))

if isinstance(o, dict):
return tuple(sorted((k, make_hashable(v)) for k, v in o.items()))

if isinstance(o, (set, frozenset)):
return tuple(sorted(make_hashable(e) for e in o))

if callable(
getattr(o, "make_hashable", None)
): # checks if the object o has the method make_hashable
return o.make_hashable()

return o
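The key property of this pair of helpers is that the hash is insensitive to insertion order in dictionaries and sets, which some of the commits above add tests for. A minimal self-contained sketch of the same functions makes that easy to check:

```python
import base64
import hashlib


def make_hashable(o):
    # recursively convert containers into order-normalized tuples
    if isinstance(o, (tuple, list)):
        return tuple(make_hashable(e) for e in o)
    if isinstance(o, dict):
        return tuple(sorted((k, make_hashable(v)) for k, v in o.items()))
    if isinstance(o, (set, frozenset)):
        return tuple(sorted(make_hashable(e) for e in o))
    # objects can opt in by providing their own make_hashable method
    if callable(getattr(o, "make_hashable", None)):
        return o.make_hashable()
    return o


def make_hash_sha256(o):
    hasher = hashlib.sha256()
    hasher.update(repr(make_hashable(o)).encode())
    return base64.b64encode(hasher.digest()).decode()


# dictionaries with the same items in a different insertion order hash equally
a = {"x": 1, "y": {"b": 2, "a": 3}}
b = {"y": {"a": 3, "b": 2}, "x": 1}
assert make_hash_sha256(a) == make_hash_sha256(b)

# different contents yield different hashes
assert make_hash_sha256(a) != make_hash_sha256({"x": 1})
```

Sorting the key-value pairs before tupling is what normalizes away the ordering of `dict` and `set` members.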


def hash_function_arguments(args, kwargs):
"""Combine the hashes of the args and kwargs.

We compute h(x, y) = hash(hash(x) || hash(y)) rather than hash(x || y), because:

1) h(x, y) = hash(x || y) can collide: moving the last n characters of x to the front of y leaves the concatenation unchanged, e.g. h("abc", "d") == h("ab", "cd").
2) we don't sort x and y, because we need h(x, y) != h(y, x).
3) hashing each input before combining also prevents decomposing the final digest back into the individual arguments; this is more of a defensive measure.

Source: https://crypto.stackexchange.com/questions/55162/best-way-to-hash-two-values-into-one
"""
return make_hash_sha256(
make_hash_sha256(args) + make_hash_sha256(kwargs)
) # concatenate the two hashes, then hash again
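The collision argument from the docstring can be demonstrated directly. This sketch contrasts naive concatenation with the nested scheme used above (`naive` and `nested` are illustrative names, not functions from the codebase):

```python
import hashlib


def naive(x: str, y: str) -> str:
    # h(x || y): vulnerable to boundary-shifting collisions
    return hashlib.sha256((x + y).encode()).hexdigest()


def nested(x: str, y: str) -> str:
    # h(h(x) || h(y)): hash each argument first, then combine
    hx = hashlib.sha256(x.encode()).hexdigest()
    hy = hashlib.sha256(y.encode()).hexdigest()
    return hashlib.sha256((hx + hy).encode()).hexdigest()


assert naive("abc", "d") == naive("ab", "cd")    # collision!
assert nested("abc", "d") != nested("ab", "cd")  # boundary is preserved
assert nested("a", "b") != nested("b", "a")      # argument order matters
```

Because each argument is hashed to a fixed-length digest before concatenation, the boundary between the two inputs can no longer shift.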


def job_cache(queue):
"""
To avoid recomputing the same task multiple times, this decorator checks whether the function has already been called with the
same arguments. Input arguments are hashed and stored as Redis keys with the job IDs as values (`input_arguments_hash:job_id`).

The benefits of using Redis to store the input arguments over a local cache, such as an LRU cache, are:
1) It works in distributed environments (e.g. computing clusters), where multiple workers avoid repeating
work, as the cache is shared across them.
2) Cached calls are logged, which makes debugging easier.
3) The cache survives restarts.
"""

def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):

# allow the caller to force the creation of a new job
force_new_job_creation = kwargs.pop("force_new_job_creation", False)

# creating a hash from args and kwargs
args_hash = hash_function_arguments(args, kwargs)

# check if the hash of the input arguments exists as a key in the queue's Redis connection
if (
current_app.queues[queue].connection.exists(args_hash)
and not force_new_job_creation
):
current_app.logger.info(
f"The function {func.__name__} has been called already with the same arguments. Skipping..."
)

# get job id
job_id = current_app.queues[queue].connection.get(args_hash).decode()

job = Job.fetch(
job_id, connection=current_app.queues[queue].connection
) # get job object from job id

return job # returning the same job regardless of the status (SUCCESS, FAILED, ...)
else:
# create a new job if the function hasn't been called with these arguments before, or if a new job is forced
job = func(*args, **kwargs)

# store function call in redis
# store the function call in Redis, mapping the hash of its inputs to the created job's ID
current_app.queues[queue].connection.set(
args_hash, job.id
)

return job

return wrapper

return decorator
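The caching behavior of `@job_cache` can be sketched without Redis or RQ by swapping the queue connection for a plain dictionary. Everything here (`job_cache_sketch`, `create_job`, the `store` dict) is a hypothetical in-memory analogue, not the production code:

```python
import functools


def job_cache_sketch(store: dict):
    # in-memory stand-in for the Redis connection used in the real decorator
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            force = kwargs.pop("force_new_job_creation", False)
            key = repr((args, tuple(sorted(kwargs.items()))))
            if key in store and not force:
                return store[key]  # reuse the previously created job
            job = func(*args, **kwargs)  # create a new job
            store[key] = job  # map the input hash to the job
            return job
        return wrapper
    return decorator


store = {}
calls = []


@job_cache_sketch(store)
def create_job(sensor_id):
    calls.append(sensor_id)
    return f"job-for-{sensor_id}"


create_job(1)
create_job(1)                                # cached: function body not re-run
create_job(1, force_new_job_creation=True)   # forced: body runs again
assert calls == [1, 1]
```

The real decorator follows the same shape, with `current_app.queues[queue].connection` playing the role of `store` and `hash_function_arguments` computing the key.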
1 change: 0 additions & 1 deletion flexmeasures/data/tests/test_scheduling_jobs.py
@@ -1,4 +1,3 @@
# flake8: noqa: E402
from datetime import datetime, timedelta
import os

Expand Down