Skip to content

Commit

Permalink
Feature/cli/clear other job registries (#1037)
Browse files Browse the repository at this point in the history
* feature: tabulate queue info

Signed-off-by: F.N. Claessen <felix@seita.nl>

* feature: support clearing the deferred and scheduled registries in a given queue

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Changelog entry for #1037 (#1048)

---------

Signed-off-by: F.N. Claessen <felix@seita.nl>
Co-authored-by: Nicolas Höning <nicolas@seita.nl>
  • Loading branch information
Flix6x and nhoening committed Apr 27, 2024
1 parent 031d6d6 commit 8d3c778
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 23 deletions.
2 changes: 1 addition & 1 deletion documentation/changelog.rst
Expand Up @@ -19,7 +19,7 @@ Infrastructure / Support
----------------------

* Include started, deferred and scheduled jobs in the overview printed by the CLI command ``flexmeasures jobs show-queues`` [see `PR #1036 <https://github.com/FlexMeasures/flexmeasures/pull/1036/>`_]

* Make it as convenient to clear deferred or scheduled jobs from a queue as it was to clear failed jobs from a queue [see `PR #1037 <https://github.com/FlexMeasures/flexmeasures/pull/1037/>`_]

v0.20.1 | April XX, 2024
============================
Expand Down
67 changes: 45 additions & 22 deletions flexmeasures/cli/jobs.py
Expand Up @@ -124,34 +124,53 @@ def show_queues():
"--queue",
default=None,
required=True,
help="State which queue(s) to clear (using '|' as separator), e.g. 'forecasting', 'scheduling' or 'forecasting|scheduling'. 'failed' is also supported.",
help="State which queue(s) to clear (using '|' as separator), e.g. 'forecasting', 'scheduling' or 'forecasting|scheduling'.",
)
@click.option(
"--deferred",
is_flag=True,
default=False,
help="If True, the deferred registry of the queue(s) will be cleared (and not the jobs currently in queue to be done).",
)
@click.option(
"--scheduled",
is_flag=True,
default=False,
help="If True, the scheduled registry of the queue(s) will be cleared (and not the jobs currently in queue to be done).",
)
@click.option(
"--failed",
is_flag=True,
default=False,
help="If True, the failed registry of the queue(s) will be cleared (and not the jobs to be done).",
help="If True, the failed registry of the queue(s) will be cleared (and not the jobs currently in queue to be done).",
)
def clear_queue(queue: str, failed: bool):
def clear_queue(queue: str, deferred: bool, scheduled: bool, failed: bool):
"""
Clear a job queue (or its registry of failed jobs).
Clear a job queue (or its registry of deferred/scheduled/failed jobs).
We use the app context to find out which redis queues to use.
"""
q_list = parse_queue_list(queue)
registries = dict(
deferred=("deferred_job_registry", deferred),
scheduled=("scheduled_job_registry", scheduled),
failed=("failed_job_registry", failed),
)
configure_mappers()
for the_queue in q_list:
if failed:
reg = the_queue.failed_job_registry
count_before = reg.count
for job_id in reg.get_job_ids():
reg.remove(job_id) # not actually deleting the job
count_after = reg.count
click.secho(
f"Cleared {count_before - count_after} failed jobs from the registry at {the_queue}.",
**MsgStyle.WARN,
)
else:
for _type, (registry, needs_clearing) in registries.items():
if needs_clearing:
reg = getattr(the_queue, registry)
count_before = reg.count
for job_id in reg.get_job_ids():
reg.remove(job_id) # not actually deleting the job
count_after = reg.count
click.secho(
f"Cleared {count_before - count_after} {_type} jobs from the {registry} at {the_queue}.",
**MsgStyle.WARN,
)
wrap_up_message(count_after)
if not any([deferred, scheduled, failed]):
count_before = the_queue.count
if count_before > 0:
the_queue.empty()
Expand All @@ -160,13 +179,17 @@ def clear_queue(queue: str, failed: bool):
f"Cleared {count_before - count_after} jobs from {the_queue}.",
**MsgStyle.SUCCESS,
)
if count_after > 0:
click.secho(
f"There are {count_after} jobs which could not be removed for some reason.",
**MsgStyle.WARN,
)
else:
click.echo("No jobs left.")
wrap_up_message(count_after)


def wrap_up_message(count_after: int):
if count_after > 0:
click.secho(
f"There are {count_after} jobs which could not be removed for some reason.",
**MsgStyle.WARN,
)
else:
click.echo("No jobs left.")


def handle_worker_exception(job, exc_type, exc_value, traceback):
Expand Down

0 comments on commit 8d3c778

Please sign in to comment.