diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 71e72be7f..243054d9e 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -9,7 +9,7 @@ v0.21.0 | April XX, 2024 New features ------------- -* Add `asset//status` page to view asset statuses [see `PR #41 `_] +* Add `asset//status` page to view asset statuses [see `PR #41 `_ and `PR #1035 `_] * Support `start_date` and `end_date` query parameters for the asset page [see `PR #1030 `_] Bugfixes diff --git a/flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py b/flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py index 18618ed83..d04247465 100644 --- a/flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py +++ b/flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py @@ -1,5 +1,6 @@ -from datetime import timedelta +from datetime import timedelta, datetime import pytest +import pytz from marshmallow import ValidationError import pandas as pd @@ -9,12 +10,16 @@ PostSensorDataSchema, GetSensorDataSchema, ) +from flexmeasures.data.services.forecasting import create_forecasting_jobs +from flexmeasures.data.services.scheduling import create_scheduling_job from flexmeasures.data.services.sensors import ( get_staleness, get_status, build_sensor_status_data, + build_asset_jobs_data, ) from flexmeasures.data.schemas.reporting import StatusSchema +from flexmeasures.utils.time_utils import as_server_time @pytest.mark.parametrize( @@ -268,3 +273,46 @@ def test_build_asset_status_data(mock_get_status, add_weather_sensors): "asset_name": asset.name, }, ] + + +def custom_model_params(): + """little training as we have little data, turn off transformations until they let this test run (TODO)""" + return dict( + training_and_testing_period=timedelta(hours=2), + outcome_var_transformation=None, + regressor_transformation={}, + ) + + +def test_build_asset_jobs_data(db, app, add_battery_assets): + # """Test we get both types of jobs for a battery 
asset.""" + battery_asset = add_battery_assets["Test battery"] + battery = battery_asset.sensors[0] + tz = pytz.timezone("Europe/Amsterdam") + start, end = tz.localize(datetime(2015, 1, 2)), tz.localize(datetime(2015, 1, 3)) + + scheduling_job = create_scheduling_job( + asset_or_sensor=battery, + start=start, + end=end, + belief_time=start, + resolution=timedelta(minutes=15), + ) + forecasting_jobs = create_forecasting_jobs( + start_of_roll=as_server_time(datetime(2015, 1, 1, 6)), + end_of_roll=as_server_time(datetime(2015, 1, 1, 7)), + horizons=[timedelta(hours=1)], + sensor_id=battery.id, + custom_model_params=custom_model_params(), + ) + + jobs_data = build_asset_jobs_data(battery_asset) + assert sorted([j["queue"] for j in jobs_data]) == ["forecasting", "scheduling"] + for job_data in jobs_data: + if job_data["queue"] == "forecasting": + assert job_data["job_id"] == forecasting_jobs[0].id + else: + assert job_data["job_id"] == scheduling_job.id + assert job_data["status"] == "queued" + assert job_data["asset_or_sensor_type"] == "sensor" + assert job_data["asset_id"] == battery.id diff --git a/flexmeasures/app.py b/flexmeasures/app.py index 4b4013231..c807da8b1 100644 --- a/flexmeasures/app.py +++ b/flexmeasures/app.py @@ -20,6 +20,8 @@ from redis import Redis from rq import Queue +from flexmeasures.data.services.job_cache import JobCache + def create( # noqa C901 env: str | None = None, @@ -100,6 +102,7 @@ def create( # noqa C901 # labelling=Queue(connection=redis_conn, name="labelling"), # alerting=Queue(connection=redis_conn, name="alerting"), ) + app.job_cache = JobCache(app.redis_connection) # Some basic security measures diff --git a/flexmeasures/conftest.py b/flexmeasures/conftest.py index 64053317e..f37191172 100644 --- a/flexmeasures/conftest.py +++ b/flexmeasures/conftest.py @@ -1095,6 +1095,7 @@ def clean_redis(app): app.queues["forecasting"].empty() for job_id in failed.get_job_ids(): failed.remove(app.queues["forecasting"].fetch_job(job_id)) + 
app.redis_connection.flushdb() @pytest.fixture(scope="session", autouse=True) diff --git a/flexmeasures/data/services/forecasting.py b/flexmeasures/data/services/forecasting.py index 1e1a6e0f4..86a7a2229 100644 --- a/flexmeasures/data/services/forecasting.py +++ b/flexmeasures/data/services/forecasting.py @@ -119,6 +119,9 @@ def create_forecasting_jobs( jobs.append(job) if enqueue: current_app.queues["forecasting"].enqueue_job(job) + current_app.job_cache.add( + sensor_id, job.id, queue="forecasting", asset_or_sensor_type="sensor" + ) return jobs diff --git a/flexmeasures/data/services/job_cache.py b/flexmeasures/data/services/job_cache.py new file mode 100644 index 000000000..522ac66b7 --- /dev/null +++ b/flexmeasures/data/services/job_cache.py @@ -0,0 +1,78 @@ +""" +Logic around storing and retrieving jobs from redis cache. +""" + +from __future__ import annotations + +import redis + +from redis.exceptions import ConnectionError +from rq.job import Job, NoSuchJobError + + +class NoRedisConfigured(Exception): + def __init__(self, message="Redis not configured"): + super().__init__(message) + + +class JobCache: + """ + Class is used for storing jobs and retrieving them from redis cache. + Need it to be able to get jobs for particular asset (and display them on status page). + Keeps cache up to date by removing jobs that are not found in redis - were removed by TTL. 
+ Stores jobs by asset or sensor id, queue and asset or sensor type, cache key can look like this + - forecasting:sensor:1 (forecasting jobs can be stored by sensor only) + - scheduling:sensor:2 + - scheduling:asset:3 + """ + + def __init__(self, connection: redis.Redis): + self.connection = connection + + def _get_cache_key( + self, asset_or_sensor_id: int, queue: str, asset_or_sensor_type: str + ) -> str: + return f"{queue}:{asset_or_sensor_type}:{asset_or_sensor_id}" + + def _check_redis_connection(self): + try: + self.connection.ping() # Check if the Redis connection is okay + except (ConnectionError, ConnectionRefusedError): + raise NoRedisConfigured + + def add( + self, + asset_or_sensor_id: int, + job_id: str, + queue: str = None, + asset_or_sensor_type: str = None, + ): + self._check_redis_connection() + cache_key = self._get_cache_key(asset_or_sensor_id, queue, asset_or_sensor_type) + self.connection.sadd(cache_key, job_id) + + def _get_job(self, job_id: str) -> Job: + try: + job = Job.fetch(job_id, connection=self.connection) + except NoSuchJobError: + return None + return job + + def get( + self, asset_or_sensor_id: int, queue: str, asset_or_sensor_type: str + ) -> list[Job]: + self._check_redis_connection() + + job_ids_to_remove, jobs = list(), list() + cache_key = self._get_cache_key(asset_or_sensor_id, queue, asset_or_sensor_type) + for job_id in self.connection.smembers(cache_key): + job_id = job_id.decode("utf-8") + job = self._get_job(job_id) + # remove job from cache if it can't be found - was removed by TTL + if job is None: + job_ids_to_remove.append(job_id) + continue + jobs.append(job) + if job_ids_to_remove: + self.connection.srem(cache_key, *job_ids_to_remove) + return jobs diff --git a/flexmeasures/data/services/scheduling.py b/flexmeasures/data/services/scheduling.py index c3b4a3c4d..b3043a734 100644 --- a/flexmeasures/data/services/scheduling.py +++ b/flexmeasures/data/services/scheduling.py @@ -198,10 +198,11 @@ )
scheduler.deserialize_config() + asset_or_sensor = get_asset_or_sensor_ref(asset_or_sensor) job = Job.create( make_schedule, kwargs=dict( - asset_or_sensor=get_asset_or_sensor_ref(asset_or_sensor), + asset_or_sensor=asset_or_sensor, scheduler_specs=scheduler_specs, **scheduler_kwargs, ), @@ -220,7 +221,7 @@ def create_scheduling_job( on_failure=Callback(trigger_optional_fallback), ) - job.meta["asset_or_sensor"] = get_asset_or_sensor_ref(asset_or_sensor) + job.meta["asset_or_sensor"] = asset_or_sensor job.meta["scheduler_kwargs"] = scheduler_kwargs job.save_meta() @@ -230,6 +231,12 @@ def create_scheduling_job( # with job_status=None, we ensure that only fresh new jobs are enqueued (in the contrary they should be requeued) if enqueue and not job_status: current_app.queues["scheduling"].enqueue_job(job) + current_app.job_cache.add( + asset_or_sensor["id"], + job.id, + queue="scheduling", + asset_or_sensor_type=asset_or_sensor["class"].lower(), + ) return job diff --git a/flexmeasures/data/services/sensors.py b/flexmeasures/data/services/sensors.py index 9c921e5b5..f9662cbf9 100644 --- a/flexmeasures/data/services/sensors.py +++ b/flexmeasures/data/services/sensors.py @@ -1,6 +1,7 @@ from __future__ import annotations from datetime import datetime, timedelta +from flask import current_app from timely_beliefs import BeliefsDataFrame from humanize.time import naturaldelta @@ -171,7 +172,7 @@ def get_status( def build_sensor_status_data( asset: Asset, now: datetime = None, -) -> list: +) -> list[dict]: """Get data connectivity status information for each sensor in given asset and its children Returns a list of dictionaries, each containing the following keys: - id: sensor id @@ -197,3 +198,80 @@ def build_sensor_status_data( sensor_status["asset_name"] = asset.name sensors.append(sensor_status) return sensors + + +def build_asset_jobs_data( + asset: Asset, +) -> list[dict]: + """Get all jobs data for an asset + Returns a list of dictionaries, each containing the 
following keys: + - job_id: id of a job + - queue: job queue (scheduling or forecasting) + - asset_or_sensor_type: type of an asset that is linked to the job (asset or sensor) + - asset_id: id of sensor or asset + - status: job status (e.g. finished, failed, etc) + - err: job error (is None when there was no error for a job) + - enqueued_at: time when the job was enqueued + """ + + jobs = list() + + # try to get scheduling jobs for asset first (only scheduling jobs can be stored by asset id) + jobs.append( + ( + "scheduling", + "asset", + asset.id, + current_app.job_cache.get(asset.id, "scheduling", "asset"), + ) + ) + + for sensor in asset.sensors: + jobs.append( + ( + "scheduling", + "sensor", + sensor.id, + current_app.job_cache.get(sensor.id, "scheduling", "sensor"), + ) + ) + jobs.append( + ( + "forecasting", + "sensor", + sensor.id, + current_app.job_cache.get(sensor.id, "forecasting", "sensor"), + ) + ) + + jobs_data = list() + # Building the actual return list - we also unpack lists of jobs, each to its own entry, and we add error info + for queue, asset_or_sensor_type, asset_id, jobs in jobs: + for job in jobs: + e = job.meta.get( + "exception", + Exception( + "The job does not state why it failed. " + "The worker may be missing an exception handler, " + "or its exception handler is not storing the exception as job meta data."
+ ), + ) + job_err = ( + f"Scheduling job failed with {type(e).__name__}: {e}" + if job.is_failed + else None + ) + + jobs_data.append( + { + "job_id": job.id, + "queue": queue, + "asset_or_sensor_type": asset_or_sensor_type, + "asset_id": asset_id, + "status": job.get_status(), + "err": job_err, + "enqueued_at": job.enqueued_at, + } + ) + + return jobs_data diff --git a/flexmeasures/data/tests/test_job_cache.py b/flexmeasures/data/tests/test_job_cache.py new file mode 100644 index 000000000..14c464c39 --- /dev/null +++ b/flexmeasures/data/tests/test_job_cache.py @@ -0,0 +1,116 @@ +# flake8: noqa: E402 +from __future__ import annotations + +import pytest +import pytz +import unittest + +from datetime import datetime, timedelta +from unittest.mock import MagicMock, patch +from redis.exceptions import ConnectionError +from rq.job import NoSuchJobError + +from flexmeasures.data.models.time_series import Sensor +from flexmeasures.data.services.job_cache import JobCache, NoRedisConfigured +from flexmeasures.data.services.forecasting import create_forecasting_jobs +from flexmeasures.data.services.scheduling import create_scheduling_job +from flexmeasures.utils.time_utils import as_server_time + + +def custom_model_params(): + """little training as we have little data, turn off transformations until they let this test run (TODO)""" + return dict( + training_and_testing_period=timedelta(hours=2), + outcome_var_transformation=None, + regressor_transformation={}, + ) + + +def test_cache_on_create_forecasting_jobs(db, run_as_cli, app, setup_test_data): + """Test we add job to cache on creating forecasting job + get job from cache""" + wind_device_1: Sensor = setup_test_data["wind-asset-1"].sensors[0] + + job = create_forecasting_jobs( + start_of_roll=as_server_time(datetime(2015, 1, 1, 6)), + end_of_roll=as_server_time(datetime(2015, 1, 1, 7)), + horizons=[timedelta(hours=1)], + sensor_id=wind_device_1.id, + custom_model_params=custom_model_params(), + ) + + assert 
app.job_cache.get(wind_device_1.id, "forecasting", "sensor") == [job[0]] + + +def test_cache_on_create_scheduling_jobs(db, app, add_battery_assets, setup_test_data): + """Test we add job to cache on creating scheduling job + get job from cache""" + battery = add_battery_assets["Test battery"].sensors[0] + tz = pytz.timezone("Europe/Amsterdam") + start, end = tz.localize(datetime(2015, 1, 2)), tz.localize(datetime(2015, 1, 3)) + + job = create_scheduling_job( + asset_or_sensor=battery, + start=start, + end=end, + belief_time=start, + resolution=timedelta(minutes=15), + ) + + assert app.job_cache.get(battery.id, "scheduling", "sensor") == [job] + + +class TestJobCache(unittest.TestCase): + def setUp(self): + self.connection = MagicMock(spec_set=["sadd", "smembers", "srem", "ping"]) + self.job_cache = JobCache(self.connection) + self.cache_key = "forecasting:sensor:sensor_id" + self.mock_redis_job = MagicMock(spec_set=["fetch"]) + + def test_no_redis_configured(self): + """Test raising NoRedisConfigured""" + self.connection.ping.side_effect = ConnectionError + with pytest.raises(NoRedisConfigured): + self.job_cache.add( + "sensor_id", + "job_id", + queue="forecasting", + asset_or_sensor_type="sensor", + ) + self.connection.sadd.assert_not_called() + + with pytest.raises(NoRedisConfigured): + self.job_cache.get("sensor_id", "forecasting", "sensor") + self.connection.smembers.assert_not_called() + + def test_add(self): + """Test adding to cache""" + self.job_cache.add( + "sensor_id", "job_id", queue="forecasting", asset_or_sensor_type="sensor" + ) + self.connection.sadd.assert_called_with(self.cache_key, "job_id") + + def test_get_empty_queue(self): + """Test getting from cache with empty queue""" + self.job_cache.add( + "sensor_id", "job_id", queue="forecasting", asset_or_sensor_type="sensor" + ) + self.connection.smembers.return_value = [b"job_id"] + + self.mock_redis_job.fetch.side_effect = NoSuchJobError + with patch("flexmeasures.data.services.job_cache.Job", 
new=self.mock_redis_job): + assert self.job_cache.get("sensor_id", "forecasting", "sensor") == [] + assert self.connection.srem.call_count == 1 + + def test_get_non_empty_queue(self): + """Test getting from cache with non empty forecasting queue""" + self.job_cache.add( + "sensor_id", "job_id", queue="forecasting", asset_or_sensor_type="sensor" + ) + forecasting_job = MagicMock() + self.connection.smembers.return_value = [b"job_id"] + + self.mock_redis_job.fetch.return_value = forecasting_job + with patch("flexmeasures.data.services.job_cache.Job", new=self.mock_redis_job): + assert self.job_cache.get("sensor_id", "forecasting", "sensor") == [ + forecasting_job + ] + assert self.connection.srem.call_count == 0 diff --git a/flexmeasures/ui/crud/assets.py b/flexmeasures/ui/crud/assets.py index e101aa3d1..64e50bbb2 100644 --- a/flexmeasures/ui/crud/assets.py +++ b/flexmeasures/ui/crud/assets.py @@ -17,6 +17,7 @@ from flexmeasures.auth.error_handling import unauthorized_handler from flexmeasures.auth.policy import check_access from flexmeasures.data.schemas import StartEndTimeSchema +from flexmeasures.data.services.job_cache import NoRedisConfigured from flexmeasures.data.models.generic_assets import ( GenericAssetType, GenericAsset, @@ -26,7 +27,10 @@ from flexmeasures.data.models.time_series import Sensor from flexmeasures.ui.utils.view_utils import render_flexmeasures_template from flexmeasures.ui.crud.api_wrapper import InternalApi -from flexmeasures.data.services.sensors import build_sensor_status_data +from flexmeasures.data.services.sensors import ( + build_sensor_status_data, + build_asset_jobs_data, +) """ @@ -321,8 +325,28 @@ def status(self, id: str): asset = process_internal_api_response(asset_dict, int(id), make_obj=True) status_data = build_sensor_status_data(asset) + # add data about forecasting and scheduling jobs + redis_connection_err = None + scheduling_job_data, forecasting_job_data = list(), list() + try: + jobs_data = build_asset_jobs_data(asset) 
+ except NoRedisConfigured as e: + redis_connection_err = e.args[0] + else: + scheduling_job_data = [ + jd for jd in jobs_data if jd["queue"] == "scheduling" + ] + forecasting_job_data = [ + jd for jd in jobs_data if jd["queue"] == "forecasting" + ] + return render_flexmeasures_template( - "views/status.html", asset=asset, sensors=status_data + "views/status.html", + asset=asset, + sensors=status_data, + scheduling_job_data=scheduling_job_data, + forecasting_job_data=forecasting_job_data, + redis_connection_err=redis_connection_err, ) @login_required diff --git a/flexmeasures/ui/templates/views/status.html b/flexmeasures/ui/templates/views/status.html index fe9fb831b..972aa834c 100644 --- a/flexmeasures/ui/templates/views/status.html +++ b/flexmeasures/ui/templates/views/status.html @@ -2,12 +2,15 @@ {% set active_page = "assets" %} -{% block title %} {{asset.name}} {% endblock %} +{% block title %} {{asset.name}} - Status {% endblock %} {% block divs %}
+
+ +
@@ -55,6 +58,101 @@

Data connectivity for sensors of {{ asset.name }}

{% endfor %} +

Latest scheduling jobs of {{ asset.name }}

+ + + + + + + + + + + + + {% for job_data in scheduling_job_data: %} + + + + + + + + + {% endfor %} + + +

Latest forecasting jobs of {{ asset.name }}

+ + + + + + + + + + + + + {% for job_data in forecasting_job_data: %} + + + + + + + + + {% endfor %} + + + {% if redis_connection_err is not none %} +
{{ redis_connection_err }}
+ {% endif %}
@@ -63,5 +161,12 @@

Data connectivity for sensors of {{ asset.name }}

+ + {% endblock %} \ No newline at end of file