Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Docker compose redis worker #455

Merged
merged 18 commits into from Jul 15, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
71 changes: 43 additions & 28 deletions flexmeasures/cli/jobs.py
Expand Up @@ -18,18 +18,18 @@ def fm_jobs():

@fm_jobs.command("run-worker")
@with_appcontext
@click.option(
"--name",
default=None,
required=False,
help="Give your worker a recognizable name. Defaults to random string. Defaults to fm-worker-<randomstring>",
)
@click.option(
"--queue",
default=None,
required=True,
help="State which queue(s) to work on (using '|' as separator), e.g. 'forecasting', 'scheduling' or 'forecasting|scheduling'.",
)
@click.option(
"--name",
default=None,
required=False,
help="Give your worker a recognizable name. Defaults to random string. Defaults to fm-worker-<randomstring>",
)
def run_worker(queue: str, name: Optional[str]):
"""
Start a worker process for forecasting and/or scheduling jobs.
Expand Down Expand Up @@ -78,17 +78,17 @@ def run_worker(queue: str, name: Optional[str]):
@with_appcontext
def show_queues():
"""
Show the job queues and their job counts (including "failed" queue).
Show the job queues and their job counts (including the "failed" registry).

To inspect contents, go to the RQ-Dashboard at <flexmeasures-URL>/tasks
We use the app context to find out which redis queues to use.
"""

configure_mappers()
for q in list(app.queues.values()) + [
Queue(connection=app.queues["forecasting"].connection, name="failed")
]:
click.echo(f"Queue {q.name} has {q.count} jobs.")
for q in app.queues.values():
click.echo(
f"Queue {q.name} has {q.count} jobs (and {q.failed_job_registry.count} jobs have failed)."
)


@fm_jobs.command("clear-queue")
Expand All @@ -99,25 +99,44 @@ def show_queues():
required=True,
help="State which queue(s) to clear (using '|' as separator), e.g. 'forecasting', 'scheduling' or 'forecasting|scheduling'. 'failed' is also supported.",
)
def clear_queue(queue: str):
@click.option(
"--failed",
is_flag=True,
default=False,
help="If True, the failed registry of the queue(s) will be cleared (and not the jobs to be done).",
)
def clear_queue(queue: str, failed: bool):
"""
Clear a job queue.
Clear a job queue (or its registry of failed jobs).

We use the app context to find out which redis queues to use.
"""

q_list = parse_queue_list(queue, allow_failed=True)
q_list = parse_queue_list(queue)
configure_mappers()
for q in q_list:
count_before = q.count
q.empty()
count_after = q.count
click.echo(
f"Cleared {count_before - count_after} jobs from {q}. Queue now contains {count_after} jobs."
)
for the_queue in q_list:
if failed:
reg = the_queue.failed_job_registry
count_before = reg.count
for job_id in reg.get_job_ids():
reg.remove(job_id) # not actually deleting the job
count_after = reg.count
click.echo(
f"Cleared all {count_before - count_after} failed jobs from the registry at {the_queue}."
)
else:
count_before = the_queue.count
the_queue.empty()
nhoening marked this conversation as resolved.
Show resolved Hide resolved
count_after = the_queue.count
click.echo(
f"Cleared all {count_before - count_after} jobs from {the_queue}."
nhoening marked this conversation as resolved.
Show resolved Hide resolved
)
if count_after > 0:
click.echo(
f"There are {count_after} jobs which could not be removed for some reason."
)


def parse_queue_list(queue_names_str: str, allow_failed: bool = False) -> List[Queue]:
def parse_queue_list(queue_names_str: str) -> List[Queue]:
"""Parse a | separated string of queue names against the app.queues dict.

The app.queues dict is expected to have queue names as keys, and rq.Queue objects as values.
Expand All @@ -127,11 +146,7 @@ def parse_queue_list(queue_names_str: str, allow_failed: bool = False) -> List[Q
"""
q_list = []
for q_name in queue_names_str.split("|"):
if allow_failed and q_name == "failed":
q_list.append(
Queue(connection=app.queues["forecasting"].connection, name="failed")
)
elif q_name in app.queues:
if q_name in app.queues:
q_list.append(app.queues[q_name])
else:
raise ValueError(f"Unknown queue '{q_name}'.")
Expand Down