Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 284 Move sensor data from Power/Price/Weather to TimedBelief #286

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions documentation/changelog.rst
Expand Up @@ -23,6 +23,7 @@ Infrastructure / Support
* Allow plugins to register their custom config settings, so that FlexMeasures can check whether they are set up correctly [see `PR #230 <http://www.github.com/FlexMeasures/flexmeasures/pull/230>`_ and `PR #237 <http://www.github.com/FlexMeasures/flexmeasures/pull/237>`_]
* Add sensor method to obtain just its latest state (excl. forecasts) [see `PR #235 <http://www.github.com/FlexMeasures/flexmeasures/pull/235>`_]
* Migrate attributes of assets, markets and weather sensors to our new sensor model [see `PR #254 <http://www.github.com/FlexMeasures/flexmeasures/pull/254>`_ and `project 9 <http://www.github.com/FlexMeasures/flexmeasures/projects/9>`_]
* Migrate all time series data to our new sensor data model based on the `timely beliefs <https://github.com/SeitaBV/timely-beliefs>`_ lib [see `PR #286 <http://www.github.com/FlexMeasures/flexmeasures/pull/286>`_ and `project 9 <http://www.github.com/FlexMeasures/flexmeasures/projects/9>`_]


v0.7.1 | November 08, 2021
Expand Down
3 changes: 3 additions & 0 deletions flexmeasures/api/__init__.py
Expand Up @@ -2,8 +2,10 @@
from flask_security.utils import verify_password
from flask_json import as_json
from flask_login import current_user
from sqlalchemy.exc import IntegrityError

from flexmeasures import __version__ as flexmeasures_version
from flexmeasures.api.common.utils.api_utils import catch_timed_belief_replacements
from flexmeasures.data.models.user import User
from flexmeasures.api.common.utils.args_parsing import (
validation_error_handler,
Expand Down Expand Up @@ -84,6 +86,7 @@ def register_at(app: Flask):

# handle API specific errors
app.register_error_handler(FMValidationError, validation_error_handler)
app.register_error_handler(IntegrityError, catch_timed_belief_replacements)
app.unauthorized_handler_api = invalid_sender

app.register_blueprint(
Expand Down
18 changes: 16 additions & 2 deletions flexmeasures/api/common/responses.py
Expand Up @@ -39,11 +39,25 @@ def deprecated_api_version(message: str) -> ResponseTuple:
def already_received_and_successfully_processed(message: str) -> ResponseTuple:
return (
dict(
results="Rejected",
results="PROCESSED",
status="ALREADY_RECEIVED_AND_SUCCESSFULLY_PROCESSED",
message=message,
),
400,
200,
)


@BaseMessage(
"Some of the data represents a replacement, which is reserved for servers in play mode. Enable play mode or update the prior in your request."
)
def invalid_replacement(message: str) -> ResponseTuple:
return (
dict(
results="Rejected",
status="INVALID_REPLACEMENT",
message=message,
),
403,
)


Expand Down
67 changes: 63 additions & 4 deletions flexmeasures/api/common/utils/api_utils.py
@@ -1,5 +1,4 @@
from timely_beliefs.beliefs.classes import BeliefsDataFrame
from flexmeasures.data.models.time_series import TimedBelief
from typing import List, Sequence, Tuple, Union
import copy
from datetime import datetime, timedelta
Expand All @@ -8,6 +7,7 @@
from flask import current_app
from inflection import pluralize
from numpy import array
from psycopg2.errors import UniqueViolation
from rq.job import Job
from sqlalchemy.exc import IntegrityError
import timely_beliefs as tb
Expand All @@ -16,16 +16,18 @@
from flexmeasures.data.models.assets import Asset, Power
from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType
from flexmeasures.data.models.markets import Price
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.models.weather import WeatherSensor, Weather
from flexmeasures.data.services.time_series import drop_unchanged_beliefs
from flexmeasures.data.utils import save_to_session
from flexmeasures.data.utils import save_to_session, save_to_db as modern_save_to_db
from flexmeasures.api.common.responses import (
invalid_replacement,
unrecognized_sensor,
ResponseTuple,
request_processed,
already_received_and_successfully_processed,
)
from flexmeasures.utils.error_utils import error_handling_router


def list_access(service_listing, service_name):
Expand Down Expand Up @@ -340,6 +342,40 @@ def get_sensor_by_generic_asset_type_and_location(
return sensor


def enqueue_forecasting_jobs(
forecasting_jobs: List[Job] = None,
):
"""Enqueue forecasting jobs.

:param forecasting_jobs: list of forecasting Jobs for redis queues.
"""
if forecasting_jobs is not None:
[current_app.queues["forecasting"].enqueue_job(job) for job in forecasting_jobs]


def save_and_enqueue(
data: Union[BeliefsDataFrame, List[BeliefsDataFrame]],
forecasting_jobs: List[Job] = None,
save_changed_beliefs_only: bool = True,
) -> ResponseTuple:

# Attempt to save
status = modern_save_to_db(
data, save_changed_beliefs_only=save_changed_beliefs_only
)

# Only enqueue forecasting jobs upon successfully saving new data
if status[:7] == "success":
enqueue_forecasting_jobs(forecasting_jobs)

# Pick a response
if status == "success":
return request_processed()
elif status == "success_with_unchanged_beliefs_skipped":
return already_received_and_successfully_processed()
return invalid_replacement()


def save_to_db(
timed_values: Union[BeliefsDataFrame, List[Union[Power, Price, Weather]]],
forecasting_jobs: List[Job] = [],
Expand All @@ -349,14 +385,22 @@ def save_to_db(

Data can only be replaced on servers in play mode.

TODO: remove options for Power, Price and Weather if we only handle beliefs one day.
TODO: remove this legacy function in its entirety (announced v0.8.0)

:param timed_values: BeliefsDataFrame or a list of Power, Price or Weather values to be saved
:param forecasting_jobs: list of forecasting Jobs for redis queues.
:param save_changed_beliefs_only: if True, beliefs that are already stored in the database with an earlier belief time are dropped.
:returns: ResponseTuple
"""

import warnings

warnings.warn(
"The method api.common.utils.api_utils.save_to_db is deprecated. Check out the following replacements:"
"- [recommended option] to store BeliefsDataFrames only, switch to data.utils.save_to_db"
"- to store BeliefsDataFrames and enqueue jobs, switch to api.common.utils.api_utils.save_and_enqueue"
)

if isinstance(timed_values, BeliefsDataFrame):

if save_changed_beliefs_only:
Expand Down Expand Up @@ -450,3 +494,18 @@ def determine_belief_timing(
]
return event_starts, belief_horizons
raise ValueError("Missing horizon or prior.")


def catch_timed_belief_replacements(error: IntegrityError):
"""Catch IntegrityErrors due to a UniqueViolation on the TimedBelief primary key.

Return a more informative message.
"""
if isinstance(error.orig, UniqueViolation) and "timed_belief_pkey" in str(
error.orig
):
# Some beliefs represented replacements, which was forbidden
return invalid_replacement()

# Forward to our generic error handler
return error_handling_router(error)
2 changes: 1 addition & 1 deletion flexmeasures/api/common/utils/validators.py
Expand Up @@ -271,7 +271,7 @@ def get_meter_data(user_source_ids):
}

The source ids then include the user's own id,
and ids of other users that are registered as a Prosumer and/or Energy Service Company.
and ids of other users whose organisation account is registered as a Prosumer and/or Energy Service Company.
"""

def wrapper(fn):
Expand Down
6 changes: 3 additions & 3 deletions flexmeasures/api/dev/sensor_data.py
@@ -1,7 +1,7 @@
from webargs.flaskparser import use_args

from flexmeasures.api.common.schemas.sensor_data import SensorDataSchema
from flexmeasures.api.common.utils.api_utils import save_to_db
from flexmeasures.api.common.utils.api_utils import save_and_enqueue


@use_args(
Expand All @@ -15,13 +15,13 @@ def post_data(sensor_data):
to create and save the data structure.
"""
beliefs = SensorDataSchema.load_bdf(sensor_data)
response, code = save_to_db(beliefs)
response, code = save_and_enqueue(beliefs)
response.update(type="PostSensorDataResponse")
return response, code


def get_data():
""" GET from /sensorData"""
"""GET from /sensorData"""
# - use data.models.time_series.Sensor::search_beliefs() - might need to add a belief_horizon parameter
# - create the serialize method on the schema, to turn the resulting BeliefsDataFrame
# to the JSON the API should respond with.
Expand Down
17 changes: 16 additions & 1 deletion flexmeasures/api/dev/tests/test_sensor_data.py
Expand Up @@ -65,17 +65,32 @@ def test_post_invalid_sensor_data(
def test_post_sensor_data_twice(client, setup_api_test_data):
auth_token = get_auth_token(client, "test_prosumer_user@seita.nl", "testtest")
post_data = make_sensor_data_request_for_gas_sensor()

# Check that 1st time posting the data succeeds
response = client.post(
url_for("post_sensor_data"),
json=post_data,
headers={"Authorization": auth_token},
)
assert response.status_code == 200

# Check that 2nd time posting the same data succeeds informatively
response = client.post(
url_for("post_sensor_data"),
json=post_data,
headers={"Authorization": auth_token},
)
print(response.json)
assert response.status_code == 400
assert response.status_code == 200
assert "data has already been received" in response.json["message"]

# Check that replacing data fails informatively
post_data["values"][0] = 100
response = client.post(
url_for("post_sensor_data"),
json=post_data,
headers={"Authorization": auth_token},
)
print(response.json)
assert response.status_code == 403
assert "data represents a replacement" in response.json["message"]
39 changes: 22 additions & 17 deletions flexmeasures/api/v1/implementations.py
Expand Up @@ -11,9 +11,9 @@
parse_entity_address,
EntityAddressException,
)
from flexmeasures.data.models.assets import Power
from flexmeasures.data import db
from flexmeasures.data.models.data_sources import get_or_create_source
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.services.resources import get_sensors
from flexmeasures.data.services.forecasting import create_forecasting_jobs
from flexmeasures.api.common.responses import (
Expand All @@ -26,7 +26,7 @@
)
from flexmeasures.api.common.utils.api_utils import (
groups_to_dict,
save_to_db,
save_and_enqueue,
)
from flexmeasures.api.common.utils.validators import (
type_accepted,
Expand Down Expand Up @@ -199,8 +199,8 @@ def collect_connection_and_value_groups(

# Get the power values
# TODO: fill NaN for non-existing values
power_bdf_dict: Dict[str, tb.BeliefsDataFrame] = Power.search(
old_sensor_names=sensor_names,
power_bdf_dict: Dict[str, tb.BeliefsDataFrame] = TimedBelief.search(
sensor_names,
event_starts_after=start,
event_ends_before=end,
resolution=resolution,
Expand All @@ -210,6 +210,8 @@ def collect_connection_and_value_groups(
beliefs_before=belief_time_window[1],
user_source_ids=user_source_ids,
source_types=source_types,
most_recent_beliefs_only=True,
one_deterministic_belief_per_event=True,
sum_multiple=False,
)
# Todo: parse time window of power_bdf_dict, which will be different for requests that are not of the form:
Expand Down Expand Up @@ -251,7 +253,7 @@ def create_connection_and_value_groups( # noqa: C901
if not user_sensors:
current_app.logger.info("User doesn't seem to have any assets")
user_sensor_ids = [sensor.id for sensor in user_sensors]
power_measurements = []
power_df_per_connection = []
forecasting_jobs = []
for connection_group, value_group in zip(generic_asset_name_groups, value_groups):
for connection in connection_group:
Expand Down Expand Up @@ -291,7 +293,8 @@ def create_connection_and_value_groups( # noqa: C901
)
return power_value_too_big(extra_info)

# Create new Power objects
# Create a new BeliefsDataFrame
beliefs = []
for j, value in enumerate(value_group):
dt = start + j * duration / len(value_group)
if rolling:
Expand All @@ -300,29 +303,31 @@ def create_connection_and_value_groups( # noqa: C901
h = horizon - (
(start + duration) - (dt + duration / len(value_group))
)
p = Power(
datetime=dt,
value=value
p = TimedBelief(
event_start=dt,
event_value=value
* -1, # Reverse sign for FlexMeasures specs with positive production and negative consumption
horizon=h,
sensor_id=sensor_id,
data_source_id=data_source.id,
belief_horizon=h,
sensor=sensor,
source=data_source,
)
power_measurements.append(p)

assert p not in db.session
beliefs.append(p)
power_df_per_connection.append(tb.BeliefsDataFrame(beliefs))

# make forecasts, but only if the sent-in values are not forecasts themselves
if horizon <= timedelta(
hours=0
): # Todo: replace 0 hours with whatever the moment of switching from ex-ante to ex-post is for this sensor
forecasting_jobs.extend(
create_forecasting_jobs(
Power,
sensor_id,
start,
start + duration,
resolution=duration / len(value_group),
enqueue=False,
enqueue=False, # will enqueue later, after saving data
)
)

return save_to_db(power_measurements, forecasting_jobs)
return save_and_enqueue(power_df_per_connection, forecasting_jobs)