Scheduling jobs on status page (#1035)
* Add job cache by sensor id in create_forecasting_jobs

Signed-off-by: Nikolai Rozanov <nickolay.rozanov@gmail.com>

* Fixes after review + split cache key by queue type and sensor/asset + use job_cache for scheduling jobs

Signed-off-by: Nikolai Rozanov <nickolay.rozanov@gmail.com>

* Return job data on GET assets/id/status + add tables for forecasting and scheduling jobs on status page

Signed-off-by: Nikolai Rozanov <nickolay.rozanov@gmail.com>

* Styling + annotations

Signed-off-by: Nikolai Rozanov <nickolay.rozanov@gmail.com>

* Fix for job/job.id URL on status page

Signed-off-by: Nikolai Rozanov <nickolay.rozanov@gmail.com>

* Better redis connection exception handling

Signed-off-by: Nikolai Rozanov <nickolay.rozanov@gmail.com>

* Back to asset page button

Signed-off-by: Nikolai Rozanov <nickolay.rozanov@gmail.com>

* Review fixes + 'Created at' column to job tables + sort by this column

Signed-off-by: Nikolai Rozanov <nickolay.rozanov@gmail.com>

* Changelog

Signed-off-by: Nikolai Rozanov <nickolay.rozanov@gmail.com>

* Post review fixes

Signed-off-by: Nikolai Rozanov <nickolay.rozanov@gmail.com>

* Fix after spoiled main merge + fix annotations

Signed-off-by: Nikolai Rozanov <nickolay.rozanov@gmail.com>

* Remove redundant file

Signed-off-by: Nikolai Rozanov <nickolay.rozanov@gmail.com>

* Add exact enqueue time for users on hover, fix hover info for job status

Signed-off-by: Nicolas Höning <nicolas@seita.nl>

---------

Signed-off-by: Nikolai Rozanov <nickolay.rozanov@gmail.com>
Signed-off-by: Nicolas Höning <nicolas@seita.nl>
Co-authored-by: Nicolas Höning <nicolas@seita.nl>
nrozanov and nhoening committed Apr 19, 2024
1 parent 62a8cb0 commit 031d6d6
Showing 11 changed files with 471 additions and 8 deletions.
2 changes: 1 addition & 1 deletion documentation/changelog.rst
@@ -9,7 +9,7 @@ v0.21.0 | April XX, 2024
New features
-------------

-* Add `asset/<id>/status` page to view asset statuses [see `PR #941 <https://github.com/FlexMeasures/flexmeasures/pull/941/>`_]
+* Add `asset/<id>/status` page to view asset statuses [see `PR #941 <https://github.com/FlexMeasures/flexmeasures/pull/941/>`_ and `PR #1035 <https://github.com/FlexMeasures/flexmeasures/pull/1035/>`_]
* Support `start_date` and `end_date` query parameters for the asset page [see `PR #1030 <https://github.com/FlexMeasures/flexmeasures/pull/1030/>`_]

Bugfixes
50 changes: 49 additions & 1 deletion flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py
@@ -1,5 +1,6 @@
-from datetime import timedelta
+from datetime import timedelta, datetime
import pytest
import pytz

from marshmallow import ValidationError
import pandas as pd
@@ -9,12 +10,16 @@
PostSensorDataSchema,
GetSensorDataSchema,
)
from flexmeasures.data.services.forecasting import create_forecasting_jobs
from flexmeasures.data.services.scheduling import create_scheduling_job
from flexmeasures.data.services.sensors import (
get_staleness,
get_status,
build_sensor_status_data,
build_asset_jobs_data,
)
from flexmeasures.data.schemas.reporting import StatusSchema
from flexmeasures.utils.time_utils import as_server_time


@pytest.mark.parametrize(
@@ -268,3 +273,46 @@ def test_build_asset_status_data(mock_get_status, add_weather_sensors):
"asset_name": asset.name,
},
]


def custom_model_params():
"""little training as we have little data, turn off transformations until they let this test run (TODO)"""
return dict(
training_and_testing_period=timedelta(hours=2),
outcome_var_transformation=None,
regressor_transformation={},
)


def test_build_asset_jobs_data(db, app, add_battery_assets):
# """Test we get both types of jobs for a battery asset."""
battery_asset = add_battery_assets["Test battery"]
battery = battery_asset.sensors[0]
tz = pytz.timezone("Europe/Amsterdam")
start, end = tz.localize(datetime(2015, 1, 2)), tz.localize(datetime(2015, 1, 3))

scheduling_job = create_scheduling_job(
asset_or_sensor=battery,
start=start,
end=end,
belief_time=start,
resolution=timedelta(minutes=15),
)
forecasting_jobs = create_forecasting_jobs(
start_of_roll=as_server_time(datetime(2015, 1, 1, 6)),
end_of_roll=as_server_time(datetime(2015, 1, 1, 7)),
horizons=[timedelta(hours=1)],
sensor_id=battery.id,
custom_model_params=custom_model_params(),
)

jobs_data = build_asset_jobs_data(battery_asset)
assert sorted([j["queue"] for j in jobs_data]) == ["forecasting", "scheduling"]
for job_data in jobs_data:
if job_data["queue"] == "forecasting":
assert job_data["job_id"] == forecasting_jobs[0].id
else:
assert job_data["job_id"] == scheduling_job.id
assert job_data["status"] == "queued"
assert job_data["asset_or_sensor_type"] == "sensor"
assert job_data["asset_id"] == battery.id
3 changes: 3 additions & 0 deletions flexmeasures/app.py
@@ -20,6 +20,8 @@
from redis import Redis
from rq import Queue

from flexmeasures.data.services.job_cache import JobCache


def create( # noqa C901
env: str | None = None,
@@ -100,6 +102,7 @@ def create( # noqa C901
# labelling=Queue(connection=redis_conn, name="labelling"),
# alerting=Queue(connection=redis_conn, name="alerting"),
)
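    # cache jobs by asset or sensor, so the status page can list them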
app.job_cache = JobCache(app.redis_connection)

# Some basic security measures

1 change: 1 addition & 0 deletions flexmeasures/conftest.py
@@ -1095,6 +1095,7 @@ def clean_redis(app):
app.queues["forecasting"].empty()
for job_id in failed.get_job_ids():
failed.remove(app.queues["forecasting"].fetch_job(job_id))
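    # also wipe the rest of the Redis test db, incl. the job cache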
app.redis_connection.flushdb()


@pytest.fixture(scope="session", autouse=True)
3 changes: 3 additions & 0 deletions flexmeasures/data/services/forecasting.py
@@ -119,6 +119,9 @@ def create_forecasting_jobs(
jobs.append(job)
if enqueue:
current_app.queues["forecasting"].enqueue_job(job)
current_app.job_cache.add(
sensor_id, job.id, queue="forecasting", asset_or_sensor_type="sensor"
)
return jobs


78 changes: 78 additions & 0 deletions flexmeasures/data/services/job_cache.py
@@ -0,0 +1,78 @@
"""
Logic around storing and retrieving jobs from redis cache.
"""

from __future__ import annotations

import redis

from redis.exceptions import ConnectionError
from rq.job import Job, NoSuchJobError


class NoRedisConfigured(Exception):
def __init__(self, message="Redis not configured"):
super().__init__(message)


class JobCache:
"""
    Stores jobs in the Redis cache and retrieves them from it.
    We need this to get the jobs for a particular asset (and display them on its status page).
    The cache is kept up to date by removing jobs that can no longer be found in Redis (i.e. that were removed by their TTL).
    Jobs are stored by asset or sensor id, queue and asset or sensor type; a cache key can look like this:
- forecasting:sensor:1 (forecasting jobs can be stored by sensor only)
- scheduling:sensor:2
- scheduling:asset:3
"""

def __init__(self, connection: redis.Redis):
self.connection = connection

def _get_cache_key(
self, asset_or_sensor_id: int, queue: str, asset_or_sensor_type: str
) -> str:
return f"{queue}:{asset_or_sensor_type}:{asset_or_sensor_id}"

def _check_redis_connection(self):
try:
self.connection.ping() # Check if the Redis connection is okay
except (ConnectionError, ConnectionRefusedError):
raise NoRedisConfigured

def add(
self,
asset_or_sensor_id: int,
job_id: str,
        queue: str | None = None,
        asset_or_sensor_type: str | None = None,
):
self._check_redis_connection()
cache_key = self._get_cache_key(asset_or_sensor_id, queue, asset_or_sensor_type)
self.connection.sadd(cache_key, job_id)

    def _get_job(self, job_id: str) -> Job | None:
try:
job = Job.fetch(job_id, connection=self.connection)
except NoSuchJobError:
return None
return job

def get(
self, asset_or_sensor_id: int, queue: str, asset_or_sensor_type: str
) -> list[Job]:
self._check_redis_connection()

job_ids_to_remove, jobs = list(), list()
cache_key = self._get_cache_key(asset_or_sensor_id, queue, asset_or_sensor_type)
for job_id in self.connection.smembers(cache_key):
job_id = job_id.decode("utf-8")
job = self._get_job(job_id)
            # remove job from cache if it can't be found - it was removed by TTL
if job is None:
job_ids_to_remove.append(job_id)
continue
jobs.append(job)
if job_ids_to_remove:
self.connection.srem(cache_key, *job_ids_to_remove)
return jobs
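
To illustrate how this cache is meant to be used - a minimal sketch, not part of this diff; the Redis connection details and the print job are assumptions:

import redis
from rq import Queue

from flexmeasures.data.services.job_cache import JobCache

# Assumed local Redis instance - adjust host/port to your setup
connection = redis.Redis(host="localhost", port=6379)
queue = Queue("forecasting", connection=connection)
job = queue.enqueue(print, "hello")  # any callable will do for this sketch

cache = JobCache(connection)
# stored under the cache key "forecasting:sensor:1"
cache.add(1, job.id, queue="forecasting", asset_or_sensor_type="sensor")

# later (e.g. when building the status page), fetch the jobs back;
# entries whose jobs expired in Redis (TTL) are pruned from the cache
for cached_job in cache.get(1, "forecasting", "sensor"):
    print(cached_job.id, cached_job.get_status())
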
11 changes: 9 additions & 2 deletions flexmeasures/data/services/scheduling.py
@@ -198,10 +198,11 @@ def create_scheduling_job(
)
scheduler.deserialize_config()

asset_or_sensor = get_asset_or_sensor_ref(asset_or_sensor)
job = Job.create(
make_schedule,
kwargs=dict(
-            asset_or_sensor=get_asset_or_sensor_ref(asset_or_sensor),
+            asset_or_sensor=asset_or_sensor,
scheduler_specs=scheduler_specs,
**scheduler_kwargs,
),
@@ -220,7 +221,7 @@ def create_scheduling_job(
on_failure=Callback(trigger_optional_fallback),
)

job.meta["asset_or_sensor"] = get_asset_or_sensor_ref(asset_or_sensor)
job.meta["asset_or_sensor"] = asset_or_sensor
job.meta["scheduler_kwargs"] = scheduler_kwargs
job.save_meta()

@@ -230,6 +231,12 @@
    # with job_status=None, we ensure that only fresh new jobs are enqueued (otherwise, they should be requeued)
if enqueue and not job_status:
current_app.queues["scheduling"].enqueue_job(job)
current_app.job_cache.add(
asset_or_sensor["id"],
job.id,
queue="scheduling",
asset_or_sensor_type=asset_or_sensor["class"].lower(),
)

return job

80 changes: 79 additions & 1 deletion flexmeasures/data/services/sensors.py
@@ -1,6 +1,7 @@
from __future__ import annotations

from datetime import datetime, timedelta
from flask import current_app
from timely_beliefs import BeliefsDataFrame

from humanize.time import naturaldelta
@@ -171,7 +172,7 @@ def get_status(
def build_sensor_status_data(
asset: Asset,
now: datetime = None,
-) -> list:
+) -> list[dict]:
"""Get data connectivity status information for each sensor in given asset and its children
Returns a list of dictionaries, each containing the following keys:
- id: sensor id
@@ -197,3 +198,80 @@ def build_sensor_status_data(
sensor_status["asset_name"] = asset.name
sensors.append(sensor_status)
return sensors


def build_asset_jobs_data(
asset: Asset,
) -> list[dict]:
"""Get all jobs data for an asset
Returns a list of dictionaries, each containing the following keys:
- job_id: id of a job
- queue: job queue (scheduling or forecasting)
- asset_or_sensor_type: type of an asset that is linked to the job (asset or sensor)
- asset_id: id of sensor or asset
- status: job status (e.g finished, failed, etc)
- err: job error (equals to None when there was no error for a job)
- enqueued_at: time when the job was enqueued
"""

jobs = list()

# try to get scheduling jobs for asset first (only scheduling jobs can be stored by asset id)
jobs.append(
(
"scheduling",
"asset",
asset.id,
current_app.job_cache.get(asset.id, "scheduling", "asset"),
)
)

for sensor in asset.sensors:
jobs.append(
(
"scheduling",
"sensor",
sensor.id,
current_app.job_cache.get(sensor.id, "scheduling", "sensor"),
)
)
jobs.append(
(
"forecasting",
"sensor",
sensor.id,
current_app.job_cache.get(sensor.id, "forecasting", "sensor"),
)
)

jobs_data = list()
    # Build the actual return list - unpack each list of jobs into entries of its own, and add error info
    for queue, asset_or_sensor_type, asset_id, jobs_list in jobs:
        for job in jobs_list:
e = job.meta.get(
"exception",
Exception(
"The job does not state why it failed. "
"The worker may be missing an exception handler, "
"or its exception handler is not storing the exception as job meta data."
),
)
            job_err = (
                f"{queue.capitalize()} job failed with {type(e).__name__}: {e}"
                if job.is_failed
                else None
            )

jobs_data.append(
{
"job_id": job.id,
"queue": queue,
"asset_or_sensor_type": asset_or_sensor_type,
"asset_id": asset_id,
"status": job.get_status(),
"err": job_err,
"enqueued_at": job.enqueued_at,
}
)

return jobs_data
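
For illustration, a single entry of the list returned by build_asset_jobs_data could look like this (all values are hypothetical):

from datetime import datetime, timezone

# A hypothetical entry in the list returned by build_asset_jobs_data
example_job_data = {
    "job_id": "ab52cf5a-947e-4355-a3a8-4b4316b38622",  # rq's default UUID job id
    "queue": "scheduling",
    "asset_or_sensor_type": "sensor",
    "asset_id": 2,
    "status": "finished",
    "err": None,  # or e.g. a "Scheduling job failed with ValueError: <details>" message
    "enqueued_at": datetime(2024, 4, 19, 10, 0, tzinfo=timezone.utc),
}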
