#574 Avoid Recomputing Schedules (#616)
* Issue #575
This commit includes the definition of the decorator @redis_cache, which allows caching job creation when a function is called with the same arguments.

Signed-off-by: victor <victor@seita.nl>

* Fixing typos and changing hashing function.

Signed-off-by: victor <victor@seita.nl>

* Updating hash function: now classes with the method make_hashable can provide a tuple with the parameters to be considered for the hash. Added some extra info to the docstrings.

Signed-off-by: victor <victor@seita.nl>

* The decorator allows creating new jobs in case the previous call has failed.

Signed-off-by: victor <victor@seita.nl>

* Importing annotations from __future__ for typing of functions arguments.

Signed-off-by: victor <victor@seita.nl>

* Add more tests to handle ordering in dictionaries and in arrays.

Signed-off-by: victor <victor@seita.nl>

* Add test to check whether the ordering of serialized dictionaries (JSON strings) yields different hashes.

Signed-off-by: victor <victor@seita.nl>

* Adding PR to changelog.rst

Signed-off-by: victor <victor@seita.nl>

* Adding force_new_job_creation into @redis_cache decorator to force the creation of a new job.

Signed-off-by: victor <victor@seita.nl>

* Simplifying access to optional keyword argument.

Signed-off-by: victor <victor@seita.nl>

* Main changes:
Fixed some typos.
Simplified a test.
Renamed @redis_cache -> @job_cache.
Updated docstrings.

Signed-off-by: victor <victor@seita.nl>

* Fixing some typos.

Signed-off-by: victor <victor@seita.nl>

* Getting generic_asset attributes right

Signed-off-by: victor <victor@seita.nl>

* Fixing comment grammar and adding a type to the @job_cache decorator.

Signed-off-by: victor <victor@seita.nl>

* Adding a better changelog entry for this PR.

Signed-off-by: victor <victor@seita.nl>

* Implementation + tests of the requeuing feature.

Signed-off-by: victor <victor@seita.nl>

* Adding clarifications and argument description in docstrings.

Signed-off-by: victor <victor@seita.nl>

* Clarify arguments in docstring

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Decorator docstring should list decorator arguments, rather than arguments of the function being decorated

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Refactor: simpler lines of code

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Remove redundant lines

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Make the @job_cache decorator agnostic to whichever queue is passed

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Clarification: "requeue" instead of "re-enqueue", because "enqueuing" assigns a queue, and in case of "requeuing", a queue doesn't need to be reassigned; the job already knows which queue it should go in when it is requeued.

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Add missing argument and explain how function arguments are used

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Hashes are not stored under queues

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Remove redundant queue (name) argument from @job_cache

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Adding a TTL to job caching keys, configured through the config variable FLEXMEASURES_JOB_CACHE_TTL.

Signed-off-by: victor <victor@seita.nl>

* Adding function and queue names to the hash of the Job creation call.

Signed-off-by: victor <victor@seita.nl>

* Adding FLEXMEASURES_JOB_CACHE_TTL into the config settings docs.

Signed-off-by: victor <victor@seita.nl>

* PR: Avoid redundantly recomputing jobs that are triggered without a relevant state change.

Squashed commit of the following:

commit 79ef71a
Author: victor <victor@seita.nl>
Date:   Thu Mar 30 15:38:55 2023 +0200

    Fixing typos on the @deprecated decorator and triggering warnings through the logger.

    Signed-off-by: victor <victor@seita.nl>

commit 7faa6e6
Author: victor <victor@seita.nl>
Date:   Wed Mar 29 23:28:00 2023 +0200

    Adding the version in which a function/method will be sunset to the @deprecated decorator

    Signed-off-by: victor <victor@seita.nl>

commit fe228d2
Author: victor <victor@seita.nl>
Date:   Mon Mar 27 11:57:32 2023 +0200

    Getting the location of the new function directly from the importable object.

    Signed-off-by: victor <victor@seita.nl>

commit c1be8db
Author: victor <victor@seita.nl>
Date:   Sun Mar 26 23:26:28 2023 +0200

    Adding deprecation messages for the functions that were moved.

    Signed-off-by: victor <victor@seita.nl>

commit ac4a232
Author: victor <victor@seita.nl>
Date:   Thu Mar 23 22:29:48 2023 +0100

    Issue #599

    Forgot to add `from __future__ import annotations`. Local testing worked as I'm using Python v3.10.

    Signed-off-by: victor <victor@seita.nl>

commit 3ff0571
Author: victor <victor@seita.nl>
Date:   Thu Mar 23 22:02:11 2023 +0100

    Issue #599
    Moving get_asset_group_queries from data/services to data/queries

    Signed-off-by: victor <victor@seita.nl>

commit 53fc214
Author: victor <victor@seita.nl>
Date:   Thu Mar 23 20:30:31 2023 +0100

    Issue #599
    Moving DataSources fetching from queries to services.

    Signed-off-by: victor <victor@seita.nl>

Signed-off-by: victor <victor@seita.nl>

* Revert "PR: Avoid redundantly recomputing jobs that are triggered without a relevant state change."

This reverts commit a694443.

---------

Signed-off-by: victor <victor@seita.nl>
Signed-off-by: F.N. Claessen <felix@seita.nl>
Co-authored-by: F.N. Claessen <felix@seita.nl>
victorgarcia98 and Flix6x committed Apr 14, 2023
1 parent dc99c5b commit fd62ed3
Showing 9 changed files with 572 additions and 10 deletions.
1 change: 1 addition & 0 deletions documentation/changelog.rst
@@ -16,6 +16,7 @@ New features
* Overlay charts (e.g. power profiles) on the asset page using the `sensors_to_show` attribute, and distinguish plots by source (different trace), sensor (different color) and source type (different stroke dash) [see `PR #534 <https://www.github.com/FlexMeasures/flexmeasures/pull/534>`_]
* The ``FLEXMEASURES_MAX_PLANNING_HORIZON`` config setting can also be set as an integer number of planning steps rather than just as a fixed duration, which makes it possible to schedule further ahead in coarser time steps [see `PR #583 <https://www.github.com/FlexMeasures/flexmeasures/pull/583>`_]
* Different text styles for CLI output for errors, warnings or success messages. [see `PR #609 <https://www.github.com/FlexMeasures/flexmeasures/pull/609>`_]
* Avoid redundantly recomputing jobs that are triggered without a relevant state change. The ``FLEXMEASURES_JOB_CACHE_TTL`` config setting defines the time during which jobs with the same arguments are not recomputed. [see `PR #616 <https://www.github.com/FlexMeasures/flexmeasures/pull/616>`_]

Bugfixes
-----------
17 changes: 17 additions & 0 deletions documentation/configuration.rst
@@ -247,6 +247,23 @@ Time to live for schedule UUIDs of successful scheduling jobs. Set a negative ti

Default: ``timedelta(days=7)``

FLEXMEASURES_JOB_CACHE_TTL
^^^^^^^^^^^^^^^^^^^^^^^^^^

Time to live for the job caching keys, in seconds. The default value of 1 hour reflects that, within an hour, there is usually not
much change (other than the input arguments) that would justify recomputing the schedules.

After an hour, more accurate forecasts will be available and the situation of the power grid
might have changed (imbalance prices, distribution-level congestion, activation of FCR or aFRR reserves, ...).

Set a negative value to persist forever.

.. warning::
Keep in mind that, unless a proper clean-up mechanism is set up, the number of
caching keys will grow over time if the TTL is set to a negative value.

Default: ``3600``
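
For example, to cache job creation calls for a day instead of an hour (an illustrative value, not a recommendation):

.. code-block:: python

    FLEXMEASURES_JOB_CACHE_TTL = 60 * 60 * 24  # cache keys expire after 24 hours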


.. _planning_horizon_config:

14 changes: 6 additions & 8 deletions flexmeasures/app.py
@@ -67,10 +67,7 @@ def create(
if app.testing:
from fakeredis import FakeStrictRedis

app.queues = dict(
forecasting=Queue(connection=FakeStrictRedis(), name="forecasting"),
scheduling=Queue(connection=FakeStrictRedis(), name="scheduling"),
)
redis_conn = FakeStrictRedis()
else:
redis_conn = Redis(
app.config["FLEXMEASURES_REDIS_URL"],
@@ -83,10 +80,11 @@
redis_conn = Redis("MY-DB-NAME", unix_socket_path="/tmp/my-redis.socket",
)
"""
app.queues = dict(
forecasting=Queue(connection=redis_conn, name="forecasting"),
scheduling=Queue(connection=redis_conn, name="scheduling"),
)
app.redis_connection = redis_conn
app.queues = dict(
forecasting=Queue(connection=redis_conn, name="forecasting"),
scheduling=Queue(connection=redis_conn, name="scheduling"),
)

# Some basic security measures

7 changes: 7 additions & 0 deletions flexmeasures/data/models/time_series.py
@@ -525,6 +525,13 @@ def find_closest(
else:
return query.limit(n).all()

def make_hashable(self) -> tuple:
"""Returns a tuple with the properties subject to change
In principle all properties (except ID) of a given sensor could be changed, but not all changes are relevant to warrant reanalysis (e.g. scheduling or forecasting).
"""

return (self.id, self.attributes, self.generic_asset.attributes)


class TimedBelief(db.Model, tb.TimedBeliefDBMixin):
"""A timed belief holds a precisely timed record of a belief about an event.
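For context: the generic `make_hashable()` helper in flexmeasures/data/services/utils.py (shown further below) calls this method when an object provides one. A minimal sketch, with hypothetical attribute values, of what a sensor contributes to a cache key:

    # assuming sensor.id == 1, sensor.attributes == {"resolution": "PT15M"}
    # and sensor.generic_asset.attributes == {"capacity_in_mw": 0.5}
    sensor.make_hashable()
    # -> (1, {"resolution": "PT15M"}, {"capacity_in_mw": 0.5})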
23 changes: 22 additions & 1 deletion flexmeasures/data/services/scheduling.py
@@ -19,12 +19,16 @@
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.utils import get_data_source, save_to_db
from flexmeasures.utils.time_utils import server_now
from flexmeasures.data.services.utils import job_cache


@job_cache("scheduling")
def create_scheduling_job(
sensor: Sensor,
job_id: str | None = None,
enqueue: bool = True,
requeue: bool = False,
force_new_job_creation: bool = False,
**scheduler_kwargs,
) -> Job:
"""
@@ -39,6 +43,17 @@ def create_scheduling_job(
1. A scheduling job is born here (in create_scheduling_job).
2. It is run in make_schedule which writes results to the db.
3. If an error occurs (and the worker is configured accordingly), handle_scheduling_exception comes in.
Arguments:
:param sensor: sensor for which the schedule is computed
:param job_id: optionally, set a job id explicitly
:param enqueue: if True, enqueues the job in case it is new
:param requeue: if True, requeues the job in case it is not new and had previously failed
(this argument is used by the @job_cache decorator)
:param force_new_job_creation: if True, force a new job to be created (skipping the cache)
(this argument is used by the @job_cache decorator)
:returns: the job
"""
# We first create a scheduler and check if deserializing works, so the flex config is checked
# and errors are raised before the job is enqueued (so users get a meaningful response right away).
@@ -62,7 +77,12 @@
).total_seconds()
), # NB job.cleanup docs says a negative number of seconds means persisting forever
)
if enqueue:

# check the job's current status, in case it was already enqueued before
job_status = job.get_status(refresh=True)

# with job_status=None, we ensure that only fresh new jobs are enqueued (otherwise, they should be requeued instead)
if enqueue and not job_status:
current_app.queues["scheduling"].enqueue_job(job)

return job
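
A hedged usage sketch (the keyword arguments below are illustrative; anything beyond the documented parameters is passed through **scheduler_kwargs to the Scheduler):

    from datetime import timedelta

    # assuming `sensor` is a Sensor with a flex model configured.
    # The first call creates (and enqueues) a job; an identical second call
    # within FLEXMEASURES_JOB_CACHE_TTL returns the cached job instead.
    job1 = create_scheduling_job(sensor, resolution=timedelta(minutes=15))
    job2 = create_scheduling_job(sensor, resolution=timedelta(minutes=15))
    assert job1.id == job2.id  # cache hit

    # Skip the cache and force a fresh job:
    job3 = create_scheduling_job(
        sensor, resolution=timedelta(minutes=15), force_new_job_creation=True
    )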
@@ -78,6 +98,7 @@ def make_schedule(
flex_context: dict | None = None,
flex_config_has_been_deserialized: bool = False,
) -> bool:

"""
This function computes a schedule. It returns True if it ran successfully.
121 changes: 121 additions & 0 deletions flexmeasures/data/services/utils.py
@@ -1,9 +1,14 @@
from __future__ import annotations

import hashlib
import base64
from typing import Type
import functools

import click
from sqlalchemy import JSON, String, cast, literal
from flask import current_app
from rq.job import Job

from flexmeasures import Sensor
from flexmeasures.data import db
@@ -64,3 +69,119 @@ def get_or_create_model(
click.echo(f"Created {model}")
db.session.add(model)
return model


def make_hash_sha256(o):
"""
We use SHA256 instead of Python's built-in hash function, because the built-in hash
yields different results across interpreter restarts (due to hash randomization).
Source: https://stackoverflow.com/a/42151923
"""
hasher = hashlib.sha256()
hasher.update(repr(make_hashable(o)).encode())
return base64.b64encode(hasher.digest()).decode()


def make_hashable(o):
"""
Function to recursively turn dictionaries with nested objects into hashable tuples
Source: https://stackoverflow.com/a/42151923
"""
if isinstance(o, (tuple, list)):
return tuple((make_hashable(e) for e in o))

if isinstance(o, dict):
return tuple(sorted((k, make_hashable(v)) for k, v in o.items()))

if isinstance(o, (set, frozenset)):
return tuple(sorted(make_hashable(e) for e in o))

if callable(
getattr(o, "make_hashable", None)
): # checks if the object o has the method make_hashable
return o.make_hashable()

return o
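
For illustration, a sketch (not part of the shipped code) of what this normalization guarantees: dictionary key order does not affect the result, hence not the hash:

    nested_a = {"b": 2, "a": [1, {"x": 0}]}
    nested_b = {"a": [1, {"x": 0}], "b": 2}
    assert make_hashable(nested_a) == make_hashable(nested_b)
    # both normalize to (('a', (1, (('x', 0),))), ('b', 2))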


def hash_function_arguments(args, kwargs):
"""Combines the hashes of the args and kwargs.
The way to go is h(x,y) = hash(hash(x) || hash(y)), because it avoids the following:
1) h(x,y) = hash(x || y) might create a collision if we delete the last n characters of x and prepend them to y, e.g. h("abc", "d") = h("ab", "cd")
2) we don't want to sort x and y, because we need h(x,y) != h(y,x)
3) the extra outer hash prevents decomposing the result into the input arguments, so one cannot track whether the same args or kwargs are passed several times (more of a security measure)
source: https://crypto.stackexchange.com/questions/55162/best-way-to-hash-two-values-into-one
"""
return make_hash_sha256(
make_hash_sha256(args) + make_hash_sha256(kwargs)
)  # concatenate two hashes
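
A quick sanity check of point 1 (a sketch, not part of the shipped tests): shifting characters across the argument boundary changes the hash:

    h1 = hash_function_arguments(("abc", "d"), {})
    h2 = hash_function_arguments(("ab", "cd"), {})
    assert h1 != h2  # hashing each argument first keeps the boundary intact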


def job_cache(queue: str):
"""
To avoid recomputing the same task multiple times, this decorator checks if the function has already been called with the
same arguments. Input arguments are hashed and stored as Redis keys, with the values being the job IDs (`input_arguments_hash:job_id`).
The benefits of using Redis to store the input arguments over a local cache, such as an LRU cache, are:
1) It works in distributed environments (computing clusters), where multiple workers avoid repeating
work, as the cache is shared across them.
2) Cached calls are logged, which means we can easily debug.
3) The cache is still there after restarts.
Arguments:
:param queue: name of the queue
"""

def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Get the redis connection
connection = current_app.redis_connection

requeue = kwargs.pop("requeue", False)

# pop the flag that forces the creation of a new job, so it is not passed on to `func`
force_new_job_creation = kwargs.pop("force_new_job_creation", False)

# creating a hash from args and kwargs
args_hash = (
f"{queue}:{func.__name__}:{hash_function_arguments(args, kwargs)}"
)

# check the redis connection for whether the key hash exists
if connection.exists(args_hash) and not force_new_job_creation:
current_app.logger.info(
f"The function {func.__name__} has been called already with the same arguments. Skipping..."
)

# get job id
job_id = connection.get(args_hash).decode()

# check if the job still exists and, if it doesn't, skip fetching and create a new job
if Job.exists(job_id, connection=connection):
job = Job.fetch(
job_id, connection=connection
) # get job object from job id

# requeue if failed and requeue flag is true
if job.is_failed and requeue:
job.requeue()

return job # returning the same job regardless of the status (SUCCESS, FAILED, ...)

# if the job description is new -> create job
job = func(*args, **kwargs) # create a new job

# store function call in redis by mapping the hash of the function arguments to its job id
connection.set(
args_hash, job.id, ex=current_app.config["FLEXMEASURES_JOB_CACHE_TTL"]
)

return job

return wrapper

return decorator
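
A hedged usage sketch of the decorator (the function name and arguments below are illustrative, not actual FlexMeasures API):

    from rq.job import Job

    @job_cache("forecasting")  # the queue name becomes part of the cache key
    def create_forecasting_job(sensor_id: int, **kwargs) -> Job:
        """Hypothetical job factory; @job_cache expects it to return an rq Job."""
        ...

    # Identical calls within FLEXMEASURES_JOB_CACHE_TTL return the cached job.
    # Keyword arguments consumed by the decorator itself:
    #   force_new_job_creation=True  -> bypass the cache and create a new job
    #   requeue=True                 -> requeue a cached job that previously failed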
1 change: 0 additions & 1 deletion flexmeasures/data/tests/test_scheduling_jobs.py
@@ -1,4 +1,3 @@
# flake8: noqa: E402
from datetime import datetime, timedelta
import os

