Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#574 Avoid Recomputing Schedules #616

Merged
merged 39 commits into from Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
21d7b3b
Issue #575
victorgarcia98 Mar 27, 2023
c715030
Fixing typos and changing hashing function.
victorgarcia98 Mar 28, 2023
d1d2f5d
Updating hash function: now classes with the method make_hashable can…
victorgarcia98 Mar 28, 2023
fcc3345
The decorator allows creating new jobs in case that the previous call…
victorgarcia98 Mar 28, 2023
d24ef38
Importing annotations from __future__ for typing of functions arguments.
victorgarcia98 Mar 28, 2023
c39d96e
Add more tests to handle ordering in dictionaries and in arrays.
victorgarcia98 Mar 28, 2023
61b8d40
Add test to check if serialized dictionaries (json strings) order yie…
victorgarcia98 Mar 28, 2023
8c28fe0
Adding PR to changelog.rst
victorgarcia98 Mar 28, 2023
f80bb9d
Adding force_new_job_creation into @redis_cache decorator to force th…
victorgarcia98 Mar 30, 2023
fbc350d
Simplyfying access to optional keyword argument.
victorgarcia98 Mar 30, 2023
495a03b
Merge branch 'main' into 574-avoid-recomputing-schedules
victorgarcia98 Mar 30, 2023
7c1271e
Main changes:
victorgarcia98 Mar 31, 2023
688b53e
Merge branch 'main' into 574-avoid-recomputing-schedules
victorgarcia98 Mar 31, 2023
d5c6340
Fixing some typos.
victorgarcia98 Mar 31, 2023
8429438
Merge remote-tracking branch 'origin/574-avoid-recomputing-schedules'…
victorgarcia98 Mar 31, 2023
d4c59a0
Merge branch 'main' into 574-avoid-recomputing-schedules
victorgarcia98 Mar 31, 2023
eddfa59
Getting generic_asset attributes right
victorgarcia98 Apr 4, 2023
9a75316
Fixing comments grammar and adding type to the @job_cache decorator.
victorgarcia98 Apr 4, 2023
7dd0df3
Adding a better changelog entry for this PR.
victorgarcia98 Apr 4, 2023
31f6228
Implementation + Tests of the requeueing feature.
victorgarcia98 Apr 5, 2023
371df95
Adding clarifications and argument description in docstrings.
victorgarcia98 Apr 6, 2023
33800a1
Merge branch 'main' into 574-avoid-recomputing-schedules
victorgarcia98 Apr 6, 2023
7f471f0
Clarify arguments in docstring
Flix6x Apr 8, 2023
cce93db
Decorator docstring should list decorator arguments, rather than argu…
Flix6x Apr 8, 2023
b78c9f9
Refactor: simpler lines of code
Flix6x Apr 8, 2023
7d41613
Remove redundant lines
Flix6x Apr 8, 2023
e1e1e88
Make the @job_cache decorator agnostic to whichever queue is passed
Flix6x Apr 8, 2023
1f4b624
Clarification: "requeue" instead of "re-enqueue", because "enqueuing"…
Flix6x Apr 8, 2023
aee155b
Add missing argument and explain how function arguments are used
Flix6x Apr 8, 2023
6acd9cf
Hashes are not stored under queues
Flix6x Apr 8, 2023
b6c60a6
Remove redundant queue (name) argument from @job_cache
Flix6x Apr 8, 2023
478fe5c
Adding TTL to job caching keys configured through the config variable…
victorgarcia98 Apr 10, 2023
86a865c
Adding function and queue names to the hash of the Job creation call.
victorgarcia98 Apr 10, 2023
e07b302
Merge branch 'main' into 574-avoid-recomputing-schedules
victorgarcia98 Apr 11, 2023
5a6fe4d
Adding FLEXMEASURES_JOB_CACHE_TTL into the config settings docs.
victorgarcia98 Apr 11, 2023
0a316ba
Merge remote-tracking branch 'origin/main' into 574-avoid-recomputing…
victorgarcia98 Apr 12, 2023
570fabf
Merge branch 'main' into 574-avoid-recomputing-schedules
victorgarcia98 Apr 13, 2023
a694443
PR: Avoid redundantly recomputing jobs that are triggered without a r…
victorgarcia98 Apr 13, 2023
bec16e2
Revert "PR: Avoid redundantly recomputing jobs that are triggered wit…
victorgarcia98 Apr 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions flexmeasures/data/services/scheduling.py
Expand Up @@ -19,10 +19,10 @@
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.utils import get_data_source, save_to_db
from flexmeasures.utils.time_utils import server_now
from flexmeasures.data.services.utils import redis_cache
from flexmeasures.data.services.utils import job_cache


@redis_cache("scheduling")
@job_cache("scheduling")
def create_scheduling_job(
sensor: Sensor,
job_id: str | None = None,
Expand Down
14 changes: 10 additions & 4 deletions flexmeasures/data/services/utils.py
Expand Up @@ -55,10 +55,16 @@ def hash_function_arguments(args, kwags):
) # concat two hashes


def redis_cache(queue):
def job_cache(queue):
victorgarcia98 marked this conversation as resolved.
Show resolved Hide resolved
"""
Decorator that checks if a function has already been called with the same arguments and
fetches the job using the job id, which is stored in Redis.
To avoid recomputing the same task multiple times, this decorator checks if the function has already been called with the
victorgarcia98 marked this conversation as resolved.
Show resolved Hide resolved
same arguments. Input arguments are hashed and stored as Redis keys whose values are the job ids (`input_arguments_hash:job_id`).
victorgarcia98 marked this conversation as resolved.
Show resolved Hide resolved

The benefits of using redis to store the input arguments over a local cache, such as LRU Cache, are:
1) It will work in distributed environments (in computing clusters), where multiple workers will avoid repeating
work as the cache will be shared across them.
2) Cached calls are logged, which means that we can easily debug.
3) Cache will still be there on restarts.
"""

def decorator(func):
Expand All @@ -77,7 +83,7 @@ def wrapper(*args, **kwargs):
and not force_new_job_creation
):
current_app.logger.info(
f"The function {func.__name__} has been called alread with the same arguments. Skipping..."
f"The function {func.__name__} has been called already with the same arguments. Skipping..."
)

# get job id
Expand Down
79 changes: 28 additions & 51 deletions flexmeasures/data/tests/test_scheduling_repeated_jobs.py
Expand Up @@ -12,7 +12,7 @@
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.tests.utils import work_on_rq, exception_reporter
from flexmeasures.data.services.scheduling import create_scheduling_job
from flexmeasures.data.services.utils import hash_function_arguments, redis_cache
from flexmeasures.data.services.utils import hash_function_arguments, job_cache


@pytest.mark.parametrize(
Expand Down Expand Up @@ -178,7 +178,6 @@ def test_scheduling_multiple_triggers(
logging.INFO
) # setting the logging level of the log capture fixture

soc_at_start = 1
target_soc = 5
duration_until_target = timedelta(hours=2)

Expand All @@ -199,57 +198,35 @@ def test_scheduling_multiple_triggers(
is None
) # Make sure the scheduler data source isn't there

# schedule 1 job
job1 = create_scheduling_job(
sensor=charging_station,
start=start,
end=end,
belief_time=start,
resolution=resolution,
flex_model={"soc-at-start": soc_at_start, "soc-targets": soc_targets},
)
work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)

# clear logs
caplog.clear()

# Schedule same job
job2 = create_scheduling_job(
sensor=charging_station,
start=start,
end=end,
belief_time=start,
resolution=resolution,
flex_model={"soc-at-start": soc_at_start, "soc-targets": soc_targets},
)
work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)

# checking that the decorator is detecting that the job is repeated
assert (
sum(
[
"The function create_scheduling_job has been called alread with the same arguments. Skipping..."
in rec.message
for rec in caplog.records
]
jobs = []

# create jobs
for soc_start in [1, 1, 3]:
job = create_scheduling_job(
sensor=charging_station,
start=start,
end=end,
belief_time=start,
resolution=resolution,
flex_model={"soc-at-start": soc_start, "soc-targets": soc_targets},
enqueue=False,
)
== 1
)

# checking that they have the same job id
assert job1.id == job2.id
# enqueue & run job
app.queues["scheduling"].enqueue_job(job)
work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)

# checking that a different schedule trigger is actually computed when a nested field is changed
soc_at_start = 2
job3 = create_scheduling_job(
sensor=charging_station,
start=start,
end=end,
belief_time=start,
resolution=resolution,
flex_model={"soc-at-start": soc_at_start, "soc-targets": soc_targets},
)
work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)
jobs.append(job)

job1, job2, job3 = jobs

# checking that jobs 1 & 2 have the same job id
assert job1.id == job2.id

# checking that job3 has a different id
assert job3.id != job1.id


Expand All @@ -259,14 +236,14 @@ def test_allow_trigger_failed_jobs(
def failing_function(kwarg1, kwarg2):
raise Exception()

@redis_cache("scheduling")
@job_cache("scheduling")
def create_failing_job(
arg1: int,
kwarg1: int | None = None,
kwarg2: int | None = None,
) -> Job:
"""
This function creates and enques a failing job.
This function creates and enqueues a failing job.
"""

job = Job.create(
Expand Down Expand Up @@ -294,15 +271,15 @@ def test_force_new_job_creation(db, app, add_charging_station_assets, setup_test
def successful_function(kwarg1, kwarg2):
pass

@redis_cache("scheduling")
@job_cache("scheduling")
def create_successful_job(
arg1: int,
kwarg1: int | None = None,
kwarg2: int | None = None,
force_new_job_creation=False,
) -> Job:
"""
This function creates and enques a successful job.
This function creates and enqueues a successful job.
"""

job = Job.create(
Expand Down