Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 273 add roundtrip efficiency to scheduler #291

Merged
merged 64 commits into from Jan 3, 2022
Merged
Show file tree
Hide file tree
Changes from 59 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
8f4b724
Query TimedBelief rather than Power in api v1.3 tests
Flix6x Dec 23, 2021
61e2fd8
Query TimedBelief rather than Power in api v1.3 implementations
Flix6x Dec 23, 2021
07a8059
Query TimedBelief rather than Power in user services tests
Flix6x Dec 23, 2021
0b94664
Query TimedBelief rather than Power in query tests
Flix6x Dec 23, 2021
f3504bd
Query TimedBelief rather than Power in forecasting tests
Flix6x Dec 23, 2021
d196847
Query TimedBelief rather than Power in scheduling tests
Flix6x Dec 23, 2021
9a7f32e
Query TimedBelief rather than Power in api v1 tests
Flix6x Dec 23, 2021
0651806
Simplify data deletion, like, by a lot
Flix6x Dec 23, 2021
923406e
Count ex-ante TimedBeliefs after populating time series forecasts
Flix6x Dec 23, 2021
acfbc8a
Query TimedBelief rather than Price in api v1_1 tests
Flix6x Dec 23, 2021
d892995
Query TimedBelief rather than Power/Price/Weather in Resource.load_se…
Flix6x Dec 23, 2021
70bed39
Query TimedBelief rather than Power/Price/Weather in api v2.0 tests
Flix6x Dec 23, 2021
8289bba
Refactor: simplify duplicate query construction
Flix6x Dec 23, 2021
78ab21d
Add custom join target to get rid of SA warning
Flix6x Dec 24, 2021
109d547
Filter criteria should work for both TimedBeliefs and TimedValues
Flix6x Dec 26, 2021
ff45672
Clarify docstring
Flix6x Dec 26, 2021
47ed019
Query TimedBelief rather than Power in api v1 implementations
Flix6x Dec 26, 2021
4eb0856
Schedules should contain one deterministic belief per event
Flix6x Dec 23, 2021
45f5bb5
Fix type annotation
Flix6x Dec 27, 2021
28041c6
flake8
Flix6x Dec 27, 2021
613f591
Query TimedBelief rather than Price/Weather for analytics
Flix6x Dec 27, 2021
3ca9a8f
Query deterministic TimedBelief rather than Price for planning queries
Flix6x Dec 27, 2021
27dc9ef
Forecast TimedBelief rather than Power/Price/Weather
Flix6x Dec 27, 2021
66e6da0
Schedule TimedBelief rather than Power
Flix6x Dec 27, 2021
fb58ec7
Apparently, to initialize a TimedBelief is to save a TimedBelief, too
Flix6x Dec 27, 2021
b15a445
Create TimedBelief rather than Power/Price/Weather in data generation…
Flix6x Dec 27, 2021
6af5027
Bump timely-beliefs dependency
Flix6x Dec 27, 2021
809c6d0
Fix latest state query
Flix6x Dec 27, 2021
4c94a97
Revert "Apparently, to initialize a TimedBelief is to save a TimedBel…
Flix6x Dec 28, 2021
9f618ee
Prevent saving TimedBelief to session upon updating Sensor or Source
Flix6x Dec 28, 2021
fb5d311
Create only TimedBeliefs in conftests
Flix6x Dec 27, 2021
23e42a1
Use session.add_all calls instead of session.bulk_save_objects or ind…
Flix6x Dec 27, 2021
b942fa0
API directly creates TimedBeliefs
Flix6x Dec 28, 2021
24a785e
CLI uses TimedBeliefs only
Flix6x Dec 28, 2021
d5f181d
Data scripts use TimedBeliefs only
Flix6x Dec 28, 2021
314f700
One more conftest switched to creating TimedBeliefs instead of Weathe…
Flix6x Dec 28, 2021
9ffd449
Expand docstring note on forbidden replacements
Flix6x Dec 30, 2021
9b1fb22
Clarify docstring note on saving changed beliefs only
Flix6x Dec 30, 2021
229e230
Remove redundant flush
Flix6x Dec 30, 2021
fa563f3
Catch forbidden belief replacements with more specific exception
Flix6x Dec 30, 2021
95ccb15
Rename variable
Flix6x Dec 30, 2021
57875e4
One transaction per request
Flix6x Dec 30, 2021
c514cb5
Only enqueue forecasting jobs upon successfully saving new data
Flix6x Dec 30, 2021
dda69a9
Flush instead of commit
Flix6x Dec 30, 2021
2d5f721
Expand test for forbidden data replacement
Flix6x Dec 30, 2021
cd6b8c6
Simplify play mode excemption for replacing beliefs
Flix6x Dec 30, 2021
d7c0512
Add note about potential session rollback
Flix6x Dec 30, 2021
bb7171a
Typo
Flix6x Dec 30, 2021
fec17b7
Move UniqueViolation catching logic to error handler
Flix6x Dec 30, 2021
31ec00a
flake8
Flix6x Dec 30, 2021
a2fc26b
Rewrite solver to deal with asymmetry in up and down commitment prices
Flix6x Jan 1, 2022
9f3ab24
Add optional roundtrip_efficiency field to UDI events, and use it to …
Flix6x Jan 1, 2022
4f9f5c8
Add test cases for various round-trip efficiencies
Flix6x Jan 1, 2022
32034eb
Add changelog entries
Flix6x Jan 1, 2022
ae0c9cf
Add documentation for the new API field
Flix6x Jan 1, 2022
981ff8f
Grammar corrections
Flix6x Jan 1, 2022
48d98e7
Fix return value for empty EMS
Flix6x Jan 1, 2022
6c86207
Allow efficiencies per device for multi-device EMS, by stopping the a…
Flix6x Jan 1, 2022
3c9d6bf
Relax tests using some tolerance
Flix6x Jan 1, 2022
ba12570
Fix mistake
Flix6x Jan 2, 2022
b3cea6f
Add test docstring
Flix6x Jan 2, 2022
a95936f
Check round-trip efficiency for acceptable range
Flix6x Jan 2, 2022
527d926
Expand docstring
Flix6x Jan 3, 2022
28109ea
Merge remote-tracking branch 'origin/project-9' into Issue-273_Add_ro…
Flix6x Jan 3, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions documentation/api/change_log.rst
Expand Up @@ -39,6 +39,14 @@ v2.0-0 | 2020-11-14
- REST endpoints for managing assets: `/assets/` (GET, POST) and `/asset/<id>` (GET, PATCH, DELETE).


v1.3-11 | 2022-01-01
""""""""""""""""""""

*Affects all versions since v1.3*.

- Extended the *postUdiEvent* endpoint with an optional "roundtrip_efficiency" field, for use in scheduling.


v1.3-10 | 2021-11-08
""""""""""""""""""""

Expand Down
1 change: 1 addition & 0 deletions documentation/changelog.rst
Expand Up @@ -11,6 +11,7 @@ v0.8.0 | November XX, 2021
New features
-----------
* Charts with sensor data can be requested in one of the supported [`vega-lite themes <https://github.com/vega/vega-themes#included-themes>`_] (incl. a dark theme) [see `PR #221 <http://www.github.com/FlexMeasures/flexmeasures/pull/221>`_]
* Schedulers take into account round-trip efficiency if set [see `PR #291 <http://www.github.com/FlexMeasures/flexmeasures/pull/291>`_]

Bugfixes
-----------
Expand Down
21 changes: 20 additions & 1 deletion flexmeasures/api/__init__.py
Expand Up @@ -2,14 +2,17 @@
from flask_security.utils import verify_password
from flask_json import as_json
from flask_login import current_user
from psycopg2.errors import UniqueViolation
from sqlalchemy.exc import IntegrityError

from flexmeasures import __version__ as flexmeasures_version
from flexmeasures.data.models.user import User
from flexmeasures.api.common.utils.args_parsing import (
validation_error_handler,
)
from flexmeasures.api.common.responses import invalid_sender
from flexmeasures.api.common.responses import invalid_replacement, invalid_sender
from flexmeasures.data.schemas.utils import FMValidationError
from flexmeasures.utils.error_utils import error_handling_router

# The api blueprint. It is registered with the Flask app (see app.py)
flexmeasures_api = Blueprint("flexmeasures_api", __name__)
Expand Down Expand Up @@ -84,6 +87,7 @@ def register_at(app: Flask):

# handle API specific errors
app.register_error_handler(FMValidationError, validation_error_handler)
app.register_error_handler(IntegrityError, catch_timed_belief_replacements)
app.unauthorized_handler_api = invalid_sender

app.register_blueprint(
Expand Down Expand Up @@ -115,3 +119,18 @@ def register_at(app: Flask):
v1_3_register_at(app)
v2_0_register_at(app)
dev_register_at(app)


def catch_timed_belief_replacements(error: IntegrityError):
"""Catch IntegrityErrors due to a UniqueViolation on the TimedBelief primary key.

Return a more informative message.
"""
if isinstance(error.orig, UniqueViolation) and "timed_belief_pkey" in str(
error.orig
):
# Some beliefs represented replacements, which was forbidden
return invalid_replacement()

# Forward to our generic error handler
return error_handling_router(error)
18 changes: 16 additions & 2 deletions flexmeasures/api/common/responses.py
Expand Up @@ -39,11 +39,25 @@ def deprecated_api_version(message: str) -> ResponseTuple:
def already_received_and_successfully_processed(message: str) -> ResponseTuple:
return (
dict(
results="Rejected",
results="PROCESSED",
status="ALREADY_RECEIVED_AND_SUCCESSFULLY_PROCESSED",
message=message,
),
400,
200,
)


@BaseMessage(
"Some of the data represents a replacement, which is reserved for servers in play mode. Enable play mode or update the prior in your request."
)
def invalid_replacement(message: str) -> ResponseTuple:
return (
dict(
results="Rejected",
status="INVALID_REPLACEMENT",
message=message,
),
403,
)


Expand Down
50 changes: 46 additions & 4 deletions flexmeasures/api/common/utils/api_utils.py
@@ -1,5 +1,4 @@
from timely_beliefs.beliefs.classes import BeliefsDataFrame
from flexmeasures.data.models.time_series import TimedBelief
from typing import List, Sequence, Tuple, Union
import copy
from datetime import datetime, timedelta
Expand All @@ -16,11 +15,12 @@
from flexmeasures.data.models.assets import Asset, Power
from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType
from flexmeasures.data.models.markets import Price
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.models.weather import WeatherSensor, Weather
from flexmeasures.data.services.time_series import drop_unchanged_beliefs
from flexmeasures.data.utils import save_to_session
from flexmeasures.data.utils import save_to_session, save_to_db as modern_save_to_db
from flexmeasures.api.common.responses import (
invalid_replacement,
unrecognized_sensor,
ResponseTuple,
request_processed,
Expand Down Expand Up @@ -340,6 +340,40 @@ def get_sensor_by_generic_asset_type_and_location(
return sensor


def enqueue_forecasting_jobs(
forecasting_jobs: List[Job] = None,
):
"""Enqueue forecasting jobs.

:param forecasting_jobs: list of forecasting Jobs for redis queues.
"""
if forecasting_jobs is not None:
[current_app.queues["forecasting"].enqueue_job(job) for job in forecasting_jobs]


def save_and_enqueue(
data: Union[BeliefsDataFrame, List[BeliefsDataFrame]],
forecasting_jobs: List[Job] = None,
save_changed_beliefs_only: bool = True,
) -> ResponseTuple:

# Attempt to save
status = modern_save_to_db(
data, save_changed_beliefs_only=save_changed_beliefs_only
)

# Only enqueue forecasting jobs upon successfully saving new data
if status[:7] == "success":
enqueue_forecasting_jobs(forecasting_jobs)

# Pick a response
if status == "success":
return request_processed()
elif status == "success_with_unchanged_beliefs_skipped":
return already_received_and_successfully_processed()
return invalid_replacement()


def save_to_db(
timed_values: Union[BeliefsDataFrame, List[Union[Power, Price, Weather]]],
forecasting_jobs: List[Job] = [],
Expand All @@ -349,14 +383,22 @@ def save_to_db(

Data can only be replaced on servers in play mode.

TODO: remove options for Power, Price and Weather if we only handle beliefs one day.
TODO: remove this legacy function in its entirety (announced v0.8.0)

:param timed_values: BeliefsDataFrame or a list of Power, Price or Weather values to be saved
:param forecasting_jobs: list of forecasting Jobs for redis queues.
:param save_changed_beliefs_only: if True, beliefs that are already stored in the database with an earlier belief time are dropped.
:returns: ResponseTuple
"""

import warnings

warnings.warn(
"The method api.common.utils.api_utils.save_to_db is deprecated. Check out the following replacements:"
"- [recommended option] to store BeliefsDataFrames only, switch to data.utils.save_to_db"
"- to store BeliefsDataFrames and enqueue jobs, switch to api.common.utils.api_utils.save_and_enqueue"
)

if isinstance(timed_values, BeliefsDataFrame):

if save_changed_beliefs_only:
Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/api/common/utils/validators.py
Expand Up @@ -271,7 +271,7 @@ def get_meter_data(user_source_ids):
}

The source ids then include the user's own id,
and ids of other users that are registered as a Prosumer and/or Energy Service Company.
and ids of other users whose organisation account is registered as a Prosumer and/or Energy Service Company.
"""

def wrapper(fn):
Expand Down
6 changes: 3 additions & 3 deletions flexmeasures/api/dev/sensor_data.py
@@ -1,7 +1,7 @@
from webargs.flaskparser import use_args

from flexmeasures.api.common.schemas.sensor_data import SensorDataSchema
from flexmeasures.api.common.utils.api_utils import save_to_db
from flexmeasures.api.common.utils.api_utils import save_and_enqueue


@use_args(
Expand All @@ -15,13 +15,13 @@ def post_data(sensor_data):
to create and save the data structure.
"""
beliefs = SensorDataSchema.load_bdf(sensor_data)
response, code = save_to_db(beliefs)
response, code = save_and_enqueue(beliefs)
response.update(type="PostSensorDataResponse")
return response, code


def get_data():
""" GET from /sensorData"""
"""GET from /sensorData"""
# - use data.models.time_series.Sensor::search_beliefs() - might need to add a belief_horizon parameter
# - create the serialize method on the schema, to turn the resulting BeliefsDataFrame
# to the JSON the API should respond with.
Expand Down
17 changes: 16 additions & 1 deletion flexmeasures/api/dev/tests/test_sensor_data.py
Expand Up @@ -65,17 +65,32 @@ def test_post_invalid_sensor_data(
def test_post_sensor_data_twice(client, setup_api_test_data):
auth_token = get_auth_token(client, "test_prosumer_user@seita.nl", "testtest")
post_data = make_sensor_data_request()

# Check that 1st time posting the data succeeds
response = client.post(
url_for("post_sensor_data"),
json=post_data,
headers={"Authorization": auth_token},
)
assert response.status_code == 200

# Check that 2nd time posting the same data succeeds informatively
response = client.post(
url_for("post_sensor_data"),
json=post_data,
headers={"Authorization": auth_token},
)
print(response.json)
assert response.status_code == 400
assert response.status_code == 200
assert "data has already been received" in response.json["message"]

# Check that replacing data fails informatively
post_data["values"][0] = 100
response = client.post(
url_for("post_sensor_data"),
json=post_data,
headers={"Authorization": auth_token},
)
print(response.json)
assert response.status_code == 403
assert "data represents a replacement" in response.json["message"]
30 changes: 17 additions & 13 deletions flexmeasures/api/v1/implementations.py
Expand Up @@ -11,9 +11,9 @@
parse_entity_address,
EntityAddressException,
)
from flexmeasures.data.models.assets import Power
from flexmeasures.data import db
from flexmeasures.data.models.data_sources import get_or_create_source
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.services.resources import get_sensors
from flexmeasures.data.services.forecasting import create_forecasting_jobs
from flexmeasures.api.common.responses import (
Expand All @@ -26,7 +26,7 @@
)
from flexmeasures.api.common.utils.api_utils import (
groups_to_dict,
save_to_db,
save_and_enqueue,
)
from flexmeasures.api.common.utils.validators import (
type_accepted,
Expand Down Expand Up @@ -199,8 +199,8 @@ def collect_connection_and_value_groups(

# Get the power values
# TODO: fill NaN for non-existing values
power_bdf_dict: Dict[str, tb.BeliefsDataFrame] = Power.search(
old_sensor_names=sensor_names,
power_bdf_dict: Dict[str, tb.BeliefsDataFrame] = TimedBelief.search(
sensor_names,
event_starts_after=start,
event_ends_before=end,
resolution=resolution,
Expand All @@ -210,6 +210,8 @@ def collect_connection_and_value_groups(
beliefs_before=belief_time_window[1],
user_source_ids=user_source_ids,
source_types=source_types,
most_recent_beliefs_only=True,
one_deterministic_belief_per_event=True,
sum_multiple=False,
)
# Todo: parse time window of power_bdf_dict, which will be different for requests that are not of the form:
Expand Down Expand Up @@ -251,7 +253,7 @@ def create_connection_and_value_groups( # noqa: C901
if not user_sensors:
current_app.logger.info("User doesn't seem to have any assets")
user_sensor_ids = [sensor.id for sensor in user_sensors]
power_measurements = []
power_df_per_connection = []
forecasting_jobs = []
for connection_group, value_group in zip(generic_asset_name_groups, value_groups):
for connection in connection_group:
Expand Down Expand Up @@ -291,7 +293,8 @@ def create_connection_and_value_groups( # noqa: C901
)
return power_value_too_big(extra_info)

# Create new Power objects
# Create a new BeliefsDataFrame
beliefs = []
for j, value in enumerate(value_group):
dt = start + j * duration / len(value_group)
if rolling:
Expand All @@ -300,30 +303,31 @@ def create_connection_and_value_groups( # noqa: C901
h = horizon - (
(start + duration) - (dt + duration / len(value_group))
)
p = Power(
use_legacy_kwargs=False,
p = TimedBelief(
event_start=dt,
event_value=value
* -1, # Reverse sign for FlexMeasures specs with positive production and negative consumption
belief_horizon=h,
sensor=sensor,
source=data_source,
)
power_measurements.append(p)

assert p not in db.session
beliefs.append(p)
power_df_per_connection.append(tb.BeliefsDataFrame(beliefs))

# make forecasts, but only if the sent-in values are not forecasts themselves
if horizon <= timedelta(
hours=0
): # Todo: replace 0 hours with whatever the moment of switching from ex-ante to ex-post is for this sensor
forecasting_jobs.extend(
create_forecasting_jobs(
Power,
sensor_id,
start,
start + duration,
resolution=duration / len(value_group),
enqueue=False,
enqueue=False, # will enqueue later, after saving data
)
)

return save_to_db(power_measurements, forecasting_jobs)
return save_and_enqueue(power_df_per_connection, forecasting_jobs)