
#574 Avoid Recomputing Schedules #616

Merged
merged 39 commits on Apr 14, 2023
21d7b3b
Issue #575
victorgarcia98 Mar 27, 2023
c715030
Fixing typos and changing hashing function.
victorgarcia98 Mar 28, 2023
d1d2f5d
Updating hash function: now classes with the method make_hashable can…
victorgarcia98 Mar 28, 2023
fcc3345
The decorator allows creating new jobs in case that the previous call…
victorgarcia98 Mar 28, 2023
d24ef38
Importing annotations from __future__ for typing of functions arguments.
victorgarcia98 Mar 28, 2023
c39d96e
Add more tests to handle ordering in dictionaries and in arrays.
victorgarcia98 Mar 28, 2023
61b8d40
Add test to check if serialized dictionaries (json strings) order yie…
victorgarcia98 Mar 28, 2023
8c28fe0
Adding PR to changelog.rst
victorgarcia98 Mar 28, 2023
f80bb9d
Adding force_new_job_creation into @redis_cache decorator to force th…
victorgarcia98 Mar 30, 2023
fbc350d
Simplyfying access to optional keyword argument.
victorgarcia98 Mar 30, 2023
495a03b
Merge branch 'main' into 574-avoid-recomputing-schedules
victorgarcia98 Mar 30, 2023
7c1271e
Main changes:
victorgarcia98 Mar 31, 2023
688b53e
Merge branch 'main' into 574-avoid-recomputing-schedules
victorgarcia98 Mar 31, 2023
d5c6340
Fixing some typos.
victorgarcia98 Mar 31, 2023
8429438
Merge remote-tracking branch 'origin/574-avoid-recomputing-schedules'…
victorgarcia98 Mar 31, 2023
d4c59a0
Merge branch 'main' into 574-avoid-recomputing-schedules
victorgarcia98 Mar 31, 2023
eddfa59
Getting generic_asset attributes right
victorgarcia98 Apr 4, 2023
9a75316
Fixing comments grammar and adding type to the @job_cache decorator.
victorgarcia98 Apr 4, 2023
7dd0df3
Adding a better changelog entry for this PR.
victorgarcia98 Apr 4, 2023
31f6228
Implementation + Tests of the requeueing feature.
victorgarcia98 Apr 5, 2023
371df95
Adding clarifications and argument description in docstrings.
victorgarcia98 Apr 6, 2023
33800a1
Merge branch 'main' into 574-avoid-recomputing-schedules
victorgarcia98 Apr 6, 2023
7f471f0
Clarify arguments in docstring
Flix6x Apr 8, 2023
cce93db
Decorator docstring should list decorator arguments, rather than argu…
Flix6x Apr 8, 2023
b78c9f9
Refactor: simpler lines of code
Flix6x Apr 8, 2023
7d41613
Remove redundant lines
Flix6x Apr 8, 2023
e1e1e88
Make the @job_cache decorator agnostic to whichever queue is passed
Flix6x Apr 8, 2023
1f4b624
Clarification: "requeue" instead of "re-enqueue", because "enqueuing"…
Flix6x Apr 8, 2023
aee155b
Add missing argument and explain how function arguments are used
Flix6x Apr 8, 2023
6acd9cf
Hashes are not stored under queues
Flix6x Apr 8, 2023
b6c60a6
Remove redundant queue (name) argument from @job_cache
Flix6x Apr 8, 2023
478fe5c
Adding TTL to job caching keys configured through the config variable…
victorgarcia98 Apr 10, 2023
86a865c
Adding function and queue names to the hash of the Job creation call.
victorgarcia98 Apr 10, 2023
e07b302
Merge branch 'main' into 574-avoid-recomputing-schedules
victorgarcia98 Apr 11, 2023
5a6fe4d
Adding FLEXMEASURES_JOB_CACHE_TTL into the config settings docs.
victorgarcia98 Apr 11, 2023
0a316ba
Merge remote-tracking branch 'origin/main' into 574-avoid-recomputing…
victorgarcia98 Apr 12, 2023
570fabf
Merge branch 'main' into 574-avoid-recomputing-schedules
victorgarcia98 Apr 13, 2023
a694443
PR: Avoid redundantly recomputing jobs that are triggered without a r…
victorgarcia98 Apr 13, 2023
bec16e2
Revert "PR: Avoid redundantly recomputing jobs that are triggered wit…
victorgarcia98 Apr 14, 2023
2 changes: 2 additions & 0 deletions flexmeasures/data/services/scheduling.py
@@ -19,8 +19,10 @@
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.utils import get_data_source, save_to_db
from flexmeasures.utils.time_utils import server_now
from flexmeasures.data.services.utils import redis_cache


@redis_cache("scheduling")
def create_scheduling_job(
sensor: Sensor,
job_id: str | None = None,
90 changes: 90 additions & 0 deletions flexmeasures/data/services/utils.py
@@ -0,0 +1,90 @@
import functools
from flask import current_app
from rq.job import Job

import hashlib
import base64

from flexmeasures.data.models.time_series import Sensor


def make_hash_sha256(o):
"""
SHA256 instead of Python's hash function because apparently, python native hashing function
yields different results on restarts.
Source: https://stackoverflow.com/a/42151923
"""
hasher = hashlib.sha256()
hasher.update(repr(make_hashable(o)).encode())
return base64.b64encode(hasher.digest()).decode()


def make_hashable(o):
"""
Function to create hashes for dictionaries with nested objects
Source: https://stackoverflow.com/a/42151923
"""
if isinstance(o, (tuple, list)):
return tuple((make_hashable(e) for e in o))

if isinstance(o, dict):
return tuple(sorted((k, make_hashable(v)) for k, v in o.items()))

if isinstance(o, (set, frozenset)):
return tuple(sorted(make_hashable(e) for e in o))

if isinstance(o, Sensor):
return tuple(
(
make_hashable(getattr(o, attr, None))
for attr in ["attributes", "annotations", "id"]
)
)

return o
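To illustrate the normalization above, here is a minimal, self-contained sketch of `make_hashable` (restating the logic without the `Sensor` branch, which needs the FlexMeasures models): nested dicts and sets are turned into sorted tuples, so two dicts with different insertion orders normalize to the same value.

```python
def make_hashable(o):
    """Recursively convert nested containers into order-insensitive tuples."""
    if isinstance(o, (tuple, list)):
        return tuple(make_hashable(e) for e in o)
    if isinstance(o, dict):
        # Sorting the (key, value) pairs makes the result order-insensitive.
        return tuple(sorted((k, make_hashable(v)) for k, v in o.items()))
    if isinstance(o, (set, frozenset)):
        return tuple(sorted(make_hashable(e) for e in o))
    return o

# Dictionaries with different insertion orders normalize identically:
a = make_hashable({"b": 2, "a": [1, {"x": 1}]})
b = make_hashable({"a": [1, {"x": 1}], "b": 2})
assert a == b
```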


def hash_function_arguments(args, kwargs):
return make_hash_sha256(args) + make_hash_sha256(kwargs) # concat two hashes
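The point of SHA-256 over `repr()` (rather than the built-in `hash()`) is that the digest is stable across interpreter restarts, since Python salts string hashing per process (`PYTHONHASHSEED`). A small sketch, with a hypothetical `sha256_b64` helper standing in for `make_hash_sha256`:

```python
import base64
import hashlib

def sha256_b64(o) -> str:
    """Stable base64 digest of an object's repr (assumes o is already normalized)."""
    return base64.b64encode(hashlib.sha256(repr(o).encode()).digest()).decode()

# Deterministic across calls (and across processes), unlike str.__hash__:
h1 = sha256_b64(("a", 1))
h2 = sha256_b64(("a", 1))
assert h1 == h2

# Hashing args and kwargs separately and concatenating keeps positional
# and keyword arguments from colliding with each other:
def hash_call(args, kwargs):
    return sha256_b64(args) + sha256_b64(kwargs)

assert hash_call((1,), {}) != hash_call((), {"0": 1})
```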


def redis_cache(queue):
"""
Decorator that checks if a function has already been called with the same arguments and
fetches the job using the job id, which is stored in Redis.
"""

def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Creating a hash from args and kwargs
args_hash = hash_function_arguments(args, kwargs)

# Check whether the hash of the arguments exists as a key in Redis
if current_app.queues[queue].connection.exists(args_hash):
current_app.logger.info(
f"The function {func.__name__} has been called already with the same arguments. Skipping..."
)

# get job id
job_id = current_app.queues[queue].connection.get(args_hash).decode()

# get job object from job id
return Job.fetch(
job_id, connection=current_app.queues[queue].connection
)
else: # new call

# call function
job = func(*args, **kwargs)

# store function call in redis
current_app.queues[queue].connection.set(
args_hash, job.id
) # map the hash of the call arguments to the id of the created job

return job # todo: try catch, if it fails, don't hash it or impose a max retry number

return wrapper

return decorator
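The shape of the decorator above can be seen in isolation with a plain dict standing in for Redis (a sketch only: no TTL, no `force_new_job_creation`, and the job itself is cached rather than its id):

```python
import functools

def job_cache(cache: dict):
    """Memoize job creation: identical calls return the previously created job."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = (repr(args), repr(sorted(kwargs.items())))
            if key in cache:
                return cache[key]           # same arguments: reuse the stored job
            job = func(*args, **kwargs)     # new arguments: actually create the job
            cache[key] = job
            return job
        return wrapper
    return decorator

calls = []
store = {}

@job_cache(store)
def create_job(sensor_id: int) -> str:
    calls.append(sensor_id)
    return f"job-{sensor_id}-{len(calls)}"

assert create_job(1) == create_job(1)  # second call is served from the cache
assert create_job(2) != create_job(1)
assert calls == [1, 2]                 # the body only ran once per distinct input
```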
215 changes: 215 additions & 0 deletions flexmeasures/data/tests/test_scheduling_repeated_jobs.py
@@ -0,0 +1,215 @@
# flake8: noqa: E402
from datetime import datetime, timedelta
import os
import copy
import logging

import pytz
import pytest
from rq.job import Job

from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.tests.utils import work_on_rq, exception_reporter
from flexmeasures.data.services.scheduling import (
create_scheduling_job,
load_custom_scheduler,
)
from flexmeasures.data.services.utils import hash_function_arguments


@pytest.mark.parametrize(
"args_modified,kwargs_modified,equal",
[
(
[1, 2, "1"],
{
"key1": "value1",
"key2": "value2",
"key3": 3,
"key4": {"key1_nested": 1, "key2_nested": 2},
},
True,
),
(
[1, 2, 1],
{
"key1": "value1",
"key2": "value2",
"key3": 3,
"key4": {"key1_nested": 1, "key2_nested": 2},
},
False,
),
(
[1, 2, "1"],
{
"key1": "different",
"key2": "value2",
"key3": 3,
"key4": {"key1_nested": 1, "key2_nested": 2},
},
False,
),
(
[1, 2, "1"],
{
"different": "value1",
"key2": "value2",
"key3": 3,
"key4": {"key1_nested": 1, "key2_nested": 2},
},
False,
),
(
[1, 2, "1"],
{
"key1": "value1",
"key2": "value2",
"key3": 3,
"key4": {"key1_nested": "different", "key2_nested": 2},
},
False,
),
],
)
def test_hashing_simple(args_modified: list, kwargs_modified: dict, equal: bool):
args = [1, 2, "1"]
kwargs = {
"key1": "value1",
"key2": "value2",
"key3": 3,
"key4": {"key1_nested": 1, "key2_nested": 2},
}

hash_original = hash_function_arguments(args, kwargs)
hash_modified = hash_function_arguments(args_modified, kwargs_modified)

if equal:
assert hash_original == hash_modified
else:
assert hash_original != hash_modified


def test_hashing(db, app, add_charging_station_assets, setup_test_data):
soc_at_start = 1
target_soc = 5
duration_until_target = timedelta(hours=2)

charging_station = Sensor.query.filter(
Sensor.name == "Test charging station"
).one_or_none()
tz = pytz.timezone("Europe/Amsterdam")
start = tz.localize(datetime(2015, 1, 2))
end = tz.localize(datetime(2015, 1, 3))
target_datetime = start + duration_until_target
resolution = timedelta(minutes=15)
soc_targets = [dict(datetime=target_datetime.isoformat(), value=target_soc)]

kwargs = dict(
sensor=charging_station,
start=start,
end=end,
belief_time=start,
resolution=resolution,
flex_model={"soc-at-start": soc_at_start, "soc-targets": soc_targets},
)
args = []

hash = hash_function_arguments(args, kwargs)

# checks that hashes are consistent between calls
assert (
hash
== "LjjneyLDFKRJ6R+v7ZKkOCasaqQDrmqKy2z1gjn7r10=Q3FfKNrQYBgY/qWrw+lEjVEI/GWm9sThyBbOd+sGIKk="
)

# checks that different arguments yield different hashes
kwargs2 = copy.deepcopy(kwargs)
kwargs2["resolution"] = timedelta(minutes=12)

hash2 = hash_function_arguments(args, kwargs2)

assert hash != hash2


def test_scheduling_multiple_triggers(
caplog, db, app, add_charging_station_assets, setup_test_data
):
caplog.set_level(
logging.INFO
) # setting the logging level of the log capture fixture

soc_at_start = 1
target_soc = 5
duration_until_target = timedelta(hours=2)

charging_station = Sensor.query.filter(
Sensor.name == "Test charging station"
).one_or_none()
tz = pytz.timezone("Europe/Amsterdam")
start = tz.localize(datetime(2015, 1, 2))
end = tz.localize(datetime(2015, 1, 3))
target_datetime = start + duration_until_target
resolution = timedelta(minutes=15)
soc_targets = [dict(datetime=target_datetime.isoformat(), value=target_soc)]

assert (
DataSource.query.filter_by(name="FlexMeasures", type="scheduling script").one_or_none()
is None
) # Make sure the scheduler data source isn't there

# schedule 1 job
job1 = create_scheduling_job(
sensor=charging_station,
start=start,
end=end,
belief_time=start,
resolution=resolution,
flex_model={"soc-at-start": soc_at_start, "soc-targets": soc_targets},
)
work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)

caplog.clear()

# Schedule same job
job2 = create_scheduling_job(
sensor=charging_station,
start=start,
end=end,
belief_time=start,
resolution=resolution,
flex_model={"soc-at-start": soc_at_start, "soc-targets": soc_targets},
)
work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)

# checking that the decorator is detecting that the job is repeated
assert (
sum(
[
"The function create_scheduling_job has been called already with the same arguments. Skipping..."
in rec.message
for rec in caplog.records
]
)
== 1
)

# checking that they have the same job id
assert job1.id == job2.id

# checking that a different schedule trigger is actually computed
soc_at_start = 2
job3 = create_scheduling_job(
sensor=charging_station,
start=start,
end=end,
belief_time=start,
resolution=resolution,
flex_model={"soc-at-start": soc_at_start, "soc-targets": soc_targets},
)
work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)

assert job3.id != job1.id