Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Docker compose redis worker #455

Merged
merged 18 commits into from Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion documentation/changelog.rst
Expand Up @@ -8,7 +8,8 @@ v0.11.0 | June XX, 2022
New features
-------------
* Individual sensor charts show available annotations [see `PR #428 <http://www.github.com/FlexMeasures/flexmeasures/pull/428>`_]
* Collapsible sidepanel (hover/swipe) used for date selection on sensor charts, and various styling improvements [see `PR #447 <http://www.github.com/FlexMeasures/flexmeasures/pull/447>`_]
* Collapsible side-panel (hover/swipe) used for date selection on sensor charts, and various styling improvements [see `PR #447 <http://www.github.com/FlexMeasures/flexmeasures/pull/447>`_]
* Add CLI command ``flexmeasures jobs show-queues`` [see `PR #455 <http://www.github.com/FlexMeasures/flexmeasures/pull/455>`_]

Bugfixes
-----------
Expand All @@ -17,6 +18,7 @@ Infrastructure / Support
----------------------
* Docker compose stack now with Redis worker queue [see `PR #455 <http://www.github.com/FlexMeasures/flexmeasures/pull/455>`_]
* Allow access tokens to be passed as env vars as well [see `PR #443 <http://www.github.com/FlexMeasures/flexmeasures/pull/443>`_]
* Queue workers can get initialised without a custom name and name collisions are handled [see `PR #455 <http://www.github.com/FlexMeasures/flexmeasures/pull/455>`_]

v0.10.1 | June XX, 2022
===========================
Expand Down
1 change: 1 addition & 0 deletions documentation/cli/commands.rst
Expand Up @@ -86,6 +86,7 @@ of which some are referred to in this documentation.

================================================= =======================================
``flexmeasures jobs run-worker`` Start a worker process for forecasting and/or scheduling jobs.
``flexmeasures jobs show queues`` List job queues.
``flexmeasures jobs clear-queue`` Clear a job queue.
================================================= =======================================

Expand Down
29 changes: 25 additions & 4 deletions flexmeasures/cli/jobs.py
Expand Up @@ -74,13 +74,30 @@ def run_worker(queue: str, name: Optional[str]):
worker.work()


@fm_jobs.command("show-queues")
@with_appcontext
def show_queues():
"""
Show the job queues and their job counts (including "failed" queue).

To inspect contents, go to the RQ-Dashboard at <flexmeasures-URL>/tasks
We use the app context to find out which redis queues to use.
"""

configure_mappers()
for q in list(app.queues.values()) + [
Queue(connection=app.queues["forecasting"].connection, name="failed")
]:
click.echo(f"Queue {q.name} has {q.count} jobs.")


@fm_jobs.command("clear-queue")
@with_appcontext
@click.option(
"--queue",
default=None,
required=True,
help="State which queue(s) to clear (using '|' as separator), e.g. 'forecasting', 'scheduling' or 'forecasting|scheduling'.",
help="State which queue(s) to clear (using '|' as separator), e.g. 'forecasting', 'scheduling' or 'forecasting|scheduling'. 'failed' is also supported.",
)
def clear_queue(queue: str):
"""
Expand All @@ -89,7 +106,7 @@ def clear_queue(queue: str):
We use the app context to find out which redis queues to use.
"""

q_list = parse_queue_list(queue)
q_list = parse_queue_list(queue, allow_failed=True)
configure_mappers()
for q in q_list:
count_before = q.count
Expand All @@ -100,7 +117,7 @@ def clear_queue(queue: str):
)


def parse_queue_list(queue_names_str: str) -> List[Queue]:
def parse_queue_list(queue_names_str: str, allow_failed: bool = False) -> List[Queue]:
"""Parse a | separated string of queue names against the app.queues dict.

The app.queues dict is expected to have queue names as keys, and rq.Queue objects as values.
Expand All @@ -110,7 +127,11 @@ def parse_queue_list(queue_names_str: str) -> List[Queue]:
"""
q_list = []
for q_name in queue_names_str.split("|"):
if q_name in app.queues:
if allow_failed and q_name == "failed":
q_list.append(
Queue(connection=app.queues["forecasting"].connection, name="failed")
nhoening marked this conversation as resolved.
Show resolved Hide resolved
)
elif q_name in app.queues:
q_list.append(app.queues[q_name])
else:
raise ValueError(f"Unknown queue '{q_name}'.")
Expand Down