Skip to content

Commit

Permalink
Merge branch 'main' into feature/cli/clear-other-job-registries
Browse files Browse the repository at this point in the history
  • Loading branch information
nhoening committed Apr 23, 2024
2 parents 11f995c + 031d6d6 commit 41dfb25
Show file tree
Hide file tree
Showing 17 changed files with 512 additions and 11 deletions.
4 changes: 4 additions & 0 deletions .vscode/spellright.dict
Expand Up @@ -278,3 +278,7 @@ balancer
url
HTTPS
Werkzeug
DSOs
BTM
FTM
EV
3 changes: 2 additions & 1 deletion Readme.md
Expand Up @@ -10,7 +10,8 @@
[![Coverage](https://coveralls.io/repos/github/FlexMeasures/flexmeasures/badge.svg)](https://coveralls.io/github/FlexMeasures/flexmeasures)
[![CII Best Practices](https://bestpractices.coreinfrastructure.org/projects/6095/badge)](https://bestpractices.coreinfrastructure.org/projects/6095)

The *FlexMeasures Platform* is the intelligent & developer-friendly EMS (energy management system) to support real-time energy flexibility apps, rapidly and scalable.
*FlexMeasures* is an intelligent EMS (energy management system) to optimize behind-the-meter energy flexibility.
Build your smart energy apps & services with FlexMeasures as backend for real-time orchestration!

In a nutshell, FlexMeasures turns data into optimized schedules for flexible assets like batteries and heat pumps, or for flexible industry processes:

Expand Down
12 changes: 11 additions & 1 deletion documentation/changelog.rst
Expand Up @@ -9,7 +9,7 @@ v0.21.0 | April XX, 2024
New features
-------------

* Add `asset/<id>/status` page to view asset statuses [see `PR #41 <https://github.com/FlexMeasures/flexmeasures/pull/941/>`_]
* Add `asset/<id>/status` page to view asset statuses [see `PR #41 <https://github.com/FlexMeasures/flexmeasures/pull/941/>`_ and `PR #1035 <https://github.com/FlexMeasures/flexmeasures/pull/1035/>`_]
* Support `start_date` and `end_date` query parameters for the asset page [see `PR #1030 <https://github.com/FlexMeasures/flexmeasures/pull/1030/>`_]

Bugfixes
Expand All @@ -18,6 +18,16 @@ Bugfixes
Infrastructure / Support
----------------------

* Include started, deferred and scheduled jobs in the overview printed by the CLI command ``flexmeasures jobs show-queues`` [see `PR #1036 <https://github.com/FlexMeasures/flexmeasures/pull/1036/>`_]


v0.20.1 | April XX, 2024
============================

Bugfixes
-----------

* Prevent **p**lay/**p**ause/**s**top of replays when editing a text field in the UI [see `PR #1024 <https://github.com/FlexMeasures/flexmeasures/pull/1024>`_]

v0.20.0 | March 26, 2024
Expand Down
6 changes: 6 additions & 0 deletions documentation/cli/change_log.rst
Expand Up @@ -4,6 +4,12 @@
FlexMeasures CLI Changelog
**********************


since v.0.21.0 | April 12, 2024
=================================

* Include started, deferred and scheduled jobs in the overview printed by the CLI command ``flexmeasures jobs show-queues`.
since v.0.20.0 | March 26, 2024
=================================
Expand Down
12 changes: 12 additions & 0 deletions documentation/concepts/flexibility.rst
Expand Up @@ -12,6 +12,18 @@ Here, we define a few terms around this idea, which come up in other parts of th
:depth: 2


Behind-the-meter and front-of-the-meter
----------------------------------------

In the energy sector, we draw a distinction between behind-the-meter (BTM) and front-of-the-meter (FTM) optimization. As usual, the distinction isn't always clear, but we can give the general definition and the focus for FlexMeasures (BTM).

BTM optimization describes the optimization of assets connected on a site behind the main meter (which has the connection to the rest of the electricity grid). Think of local solar, heating, EV charging and even batteries. A (dynamic) tariff and limits to the grid connection often complete the picture, which can become quite complex and also rewarding to get right.

On the other hand, there is front-of-the-meter (FTM) optimization, which relates to grid-level optimization as is the work of utilities, DSOs and TSOs. Think of large-scale generation and its role in wholesale markets, managing transmission lines. But also, flexible grid-level assets like batteries and solar parks might belong here, and you might find that FlexMeasures can help to optimize some of these assets if you model the circumstances correctly.

When we focus on the situation behind the meter, do we ignore everything else? Not at all. It simply means to prioritize the local orchestration modeling, and then add services which the site can offer to the grid. For instance, using a dynamic tariff can already help the grid. Obeying (flexible) grid capacity constraints, as well, of course. Going further, extra flexibility can be offered explicitly to congestion markets/auctions, which is part of `FlexMeasures' roadmap <https://flexmeasures.io/roadmap/>`_. (Note: For a distinction between implicit and explicit flexibility, read on below).


Flexibility opportunities and activation
-----------------------------------------

Expand Down
5 changes: 3 additions & 2 deletions documentation/index.rst
@@ -1,9 +1,10 @@
Welcome to the FlexMeasures documentation!
===================================================================

FlexMeasures is the intelligent & developer-friendly EMS to support real-time energy flexibility apps, rapidly and scalable.
*FlexMeasures* is an intelligent EMS to optimize behind-the-meter energy flexibility.
Build your smart energy apps & services with FlexMeasures as backend for real-time orchestration!

The problem it helps you to solve is: **What are the best times to run flexible assets, such as batteries or heat pumps?**
The problem FlexMeasures helps you to solve is: **What are the best times to power flexible assets, such as batteries or heat pumps?**

In a nutshell, FlexMeasures turns data into optimized schedules for flexible assets.
*Why?* Planning ahead allows flexible assets to serve the whole system with their flexibility, e.g. by shifting energy consumption to more optimal times. For the asset owners, this creates CO₂ savings but also monetary value (e.g. through self-consumption, dynamic tariffs and grid incentives).
Expand Down
50 changes: 49 additions & 1 deletion flexmeasures/api/common/schemas/tests/test_sensor_data_schema.py
@@ -1,5 +1,6 @@
from datetime import timedelta
from datetime import timedelta, datetime
import pytest
import pytz

from marshmallow import ValidationError
import pandas as pd
Expand All @@ -9,12 +10,16 @@
PostSensorDataSchema,
GetSensorDataSchema,
)
from flexmeasures.data.services.forecasting import create_forecasting_jobs
from flexmeasures.data.services.scheduling import create_scheduling_job
from flexmeasures.data.services.sensors import (
get_staleness,
get_status,
build_sensor_status_data,
build_asset_jobs_data,
)
from flexmeasures.data.schemas.reporting import StatusSchema
from flexmeasures.utils.time_utils import as_server_time


@pytest.mark.parametrize(
Expand Down Expand Up @@ -268,3 +273,46 @@ def test_build_asset_status_data(mock_get_status, add_weather_sensors):
"asset_name": asset.name,
},
]


def custom_model_params():
"""little training as we have little data, turn off transformations until they let this test run (TODO)"""
return dict(
training_and_testing_period=timedelta(hours=2),
outcome_var_transformation=None,
regressor_transformation={},
)


def test_build_asset_jobs_data(db, app, add_battery_assets):
# """Test we get both types of jobs for a battery asset."""
battery_asset = add_battery_assets["Test battery"]
battery = battery_asset.sensors[0]
tz = pytz.timezone("Europe/Amsterdam")
start, end = tz.localize(datetime(2015, 1, 2)), tz.localize(datetime(2015, 1, 3))

scheduling_job = create_scheduling_job(
asset_or_sensor=battery,
start=start,
end=end,
belief_time=start,
resolution=timedelta(minutes=15),
)
forecasting_jobs = create_forecasting_jobs(
start_of_roll=as_server_time(datetime(2015, 1, 1, 6)),
end_of_roll=as_server_time(datetime(2015, 1, 1, 7)),
horizons=[timedelta(hours=1)],
sensor_id=battery.id,
custom_model_params=custom_model_params(),
)

jobs_data = build_asset_jobs_data(battery_asset)
assert sorted([j["queue"] for j in jobs_data]) == ["forecasting", "scheduling"]
for job_data in jobs_data:
if job_data["queue"] == "forecasting":
assert job_data["job_id"] == forecasting_jobs[0].id
else:
assert job_data["job_id"] == scheduling_job.id
assert job_data["status"] == "queued"
assert job_data["asset_or_sensor_type"] == "sensor"
assert job_data["asset_id"] == battery.id
3 changes: 3 additions & 0 deletions flexmeasures/app.py
Expand Up @@ -20,6 +20,8 @@
from redis import Redis
from rq import Queue

from flexmeasures.data.services.job_cache import JobCache


def create( # noqa C901
env: str | None = None,
Expand Down Expand Up @@ -100,6 +102,7 @@ def create( # noqa C901
# labelling=Queue(connection=redis_conn, name="labelling"),
# alerting=Queue(connection=redis_conn, name="alerting"),
)
app.job_cache = JobCache(app.redis_connection)

# Some basic security measures

Expand Down
1 change: 1 addition & 0 deletions flexmeasures/conftest.py
Expand Up @@ -1095,6 +1095,7 @@ def clean_redis(app):
app.queues["forecasting"].empty()
for job_id in failed.get_job_ids():
failed.remove(app.queues["forecasting"].fetch_job(job_id))
app.redis_connection.flushdb()


@pytest.fixture(scope="session", autouse=True)
Expand Down
3 changes: 3 additions & 0 deletions flexmeasures/data/services/forecasting.py
Expand Up @@ -119,6 +119,9 @@ def create_forecasting_jobs(
jobs.append(job)
if enqueue:
current_app.queues["forecasting"].enqueue_job(job)
current_app.job_cache.add(
sensor_id, job.id, queue="forecasting", asset_or_sensor_type="sensor"
)
return jobs


Expand Down
78 changes: 78 additions & 0 deletions flexmeasures/data/services/job_cache.py
@@ -0,0 +1,78 @@
"""
Logic around storing and retrieving jobs from redis cache.
"""

from __future__ import annotations

import redis

from redis.exceptions import ConnectionError
from rq.job import Job, NoSuchJobError


class NoRedisConfigured(Exception):
def __init__(self, message="Redis not configured"):
super().__init__(message)


class JobCache:
"""
Class is used for storing jobs and retrieving them from redis cache.
Need it to be able to get jobs for particular asset (and display them on status page).
Keeps cache up to date by removing jobs that are not found in redis - were removed by TTL.
Stores jobs by asset or sensor id, queue and asset or sensor type, cache key can look like this
- forecasting:sensor:1 (forecasting jobs can be stored by sensor only)
- scheduling:sensor:2
- scheduling:asset:3
"""

def __init__(self, connection: redis.Redis):
self.connection = connection

def _get_cache_key(
self, asset_or_sensor_id: int, queue: str, asset_or_sensor_type: str
) -> str:
return f"{queue}:{asset_or_sensor_type}:{asset_or_sensor_id}"

def _check_redis_connection(self):
try:
self.connection.ping() # Check if the Redis connection is okay
except (ConnectionError, ConnectionRefusedError):
raise NoRedisConfigured

def add(
self,
asset_or_sensor_id: int,
job_id: str,
queue: str = None,
asset_or_sensor_type: str = None,
):
self._check_redis_connection()
cache_key = self._get_cache_key(asset_or_sensor_id, queue, asset_or_sensor_type)
self.connection.sadd(cache_key, job_id)

def _get_job(self, job_id: str) -> Job:
try:
job = Job.fetch(job_id, connection=self.connection)
except NoSuchJobError:
return None
return job

def get(
self, asset_or_sensor_id: int, queue: str, asset_or_sensor_type: str
) -> list[Job]:
self._check_redis_connection()

job_ids_to_remove, jobs = list(), list()
cache_key = self._get_cache_key(asset_or_sensor_id, queue, asset_or_sensor_type)
for job_id in self.connection.smembers(cache_key):
job_id = job_id.decode("utf-8")
job = self._get_job(job_id)
# remove job from cache if cant be found - was removed by TTL
if job is None:
job_ids_to_remove.append(job_id)
continue
jobs.append(job)
if job_ids_to_remove:
self.connection.srem(cache_key, *job_ids_to_remove)
return jobs
11 changes: 9 additions & 2 deletions flexmeasures/data/services/scheduling.py
Expand Up @@ -198,10 +198,11 @@ def create_scheduling_job(
)
scheduler.deserialize_config()

asset_or_sensor = get_asset_or_sensor_ref(asset_or_sensor)
job = Job.create(
make_schedule,
kwargs=dict(
asset_or_sensor=get_asset_or_sensor_ref(asset_or_sensor),
asset_or_sensor=asset_or_sensor,
scheduler_specs=scheduler_specs,
**scheduler_kwargs,
),
Expand All @@ -220,7 +221,7 @@ def create_scheduling_job(
on_failure=Callback(trigger_optional_fallback),
)

job.meta["asset_or_sensor"] = get_asset_or_sensor_ref(asset_or_sensor)
job.meta["asset_or_sensor"] = asset_or_sensor
job.meta["scheduler_kwargs"] = scheduler_kwargs
job.save_meta()

Expand All @@ -230,6 +231,12 @@ def create_scheduling_job(
# with job_status=None, we ensure that only fresh new jobs are enqueued (in the contrary they should be requeued)
if enqueue and not job_status:
current_app.queues["scheduling"].enqueue_job(job)
current_app.job_cache.add(
asset_or_sensor["id"],
job.id,
queue="scheduling",
asset_or_sensor_type=asset_or_sensor["class"].lower(),
)

return job

Expand Down

0 comments on commit 41dfb25

Please sign in to comment.