Skip to content

Commit

Permalink
Merge pull request #286 from FlexMeasures/Issue-284_Move_sensor_data_…
Browse files Browse the repository at this point in the history
…from_Power/Price/Weather_to_TimedBelief

Issue 284 Move sensor data from Power/Price/Weather to TimedBelief
  • Loading branch information
Flix6x committed Jan 3, 2022
2 parents 2414292 + dd54ebc commit d788d9d
Show file tree
Hide file tree
Showing 54 changed files with 938 additions and 862 deletions.
1 change: 1 addition & 0 deletions documentation/changelog.rst
Expand Up @@ -23,6 +23,7 @@ Infrastructure / Support
* Allow plugins to register their custom config settings, so that FlexMeasures can check whether they are set up correctly [see `PR #230 <http://www.github.com/FlexMeasures/flexmeasures/pull/230>`_ and `PR #237 <http://www.github.com/FlexMeasures/flexmeasures/pull/237>`_]
* Add sensor method to obtain just its latest state (excl. forecasts) [see `PR #235 <http://www.github.com/FlexMeasures/flexmeasures/pull/235>`_]
* Migrate attributes of assets, markets and weather sensors to our new sensor model [see `PR #254 <http://www.github.com/FlexMeasures/flexmeasures/pull/254>`_ and `project 9 <http://www.github.com/FlexMeasures/flexmeasures/projects/9>`_]
* Migrate all time series data to our new sensor data model based on the `timely beliefs <https://github.com/SeitaBV/timely-beliefs>`_ lib [see `PR #286 <http://www.github.com/FlexMeasures/flexmeasures/pull/286>`_ and `project 9 <http://www.github.com/FlexMeasures/flexmeasures/projects/9>`_]


v0.7.1 | November 08, 2021
Expand Down
3 changes: 3 additions & 0 deletions flexmeasures/api/__init__.py
Expand Up @@ -2,8 +2,10 @@
from flask_security.utils import verify_password
from flask_json import as_json
from flask_login import current_user
from sqlalchemy.exc import IntegrityError

from flexmeasures import __version__ as flexmeasures_version
from flexmeasures.api.common.utils.api_utils import catch_timed_belief_replacements
from flexmeasures.data.models.user import User
from flexmeasures.api.common.utils.args_parsing import (
validation_error_handler,
Expand Down Expand Up @@ -84,6 +86,7 @@ def register_at(app: Flask):

# handle API specific errors
app.register_error_handler(FMValidationError, validation_error_handler)
app.register_error_handler(IntegrityError, catch_timed_belief_replacements)
app.unauthorized_handler_api = invalid_sender

app.register_blueprint(
Expand Down
18 changes: 16 additions & 2 deletions flexmeasures/api/common/responses.py
Expand Up @@ -39,11 +39,25 @@ def deprecated_api_version(message: str) -> ResponseTuple:
def already_received_and_successfully_processed(message: str) -> ResponseTuple:
return (
dict(
results="Rejected",
results="PROCESSED",
status="ALREADY_RECEIVED_AND_SUCCESSFULLY_PROCESSED",
message=message,
),
400,
200,
)


@BaseMessage(
"Some of the data represents a replacement, which is reserved for servers in play mode. Enable play mode or update the prior in your request."
)
def invalid_replacement(message: str) -> ResponseTuple:
return (
dict(
results="Rejected",
status="INVALID_REPLACEMENT",
message=message,
),
403,
)


Expand Down
67 changes: 63 additions & 4 deletions flexmeasures/api/common/utils/api_utils.py
@@ -1,5 +1,4 @@
from timely_beliefs.beliefs.classes import BeliefsDataFrame
from flexmeasures.data.models.time_series import TimedBelief
from typing import List, Sequence, Tuple, Union
import copy
from datetime import datetime, timedelta
Expand All @@ -8,6 +7,7 @@
from flask import current_app
from inflection import pluralize
from numpy import array
from psycopg2.errors import UniqueViolation
from rq.job import Job
from sqlalchemy.exc import IntegrityError
import timely_beliefs as tb
Expand All @@ -16,16 +16,18 @@
from flexmeasures.data.models.assets import Asset, Power
from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType
from flexmeasures.data.models.markets import Price
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.models.weather import WeatherSensor, Weather
from flexmeasures.data.services.time_series import drop_unchanged_beliefs
from flexmeasures.data.utils import save_to_session
from flexmeasures.data.utils import save_to_session, save_to_db as modern_save_to_db
from flexmeasures.api.common.responses import (
invalid_replacement,
unrecognized_sensor,
ResponseTuple,
request_processed,
already_received_and_successfully_processed,
)
from flexmeasures.utils.error_utils import error_handling_router


def list_access(service_listing, service_name):
Expand Down Expand Up @@ -340,6 +342,40 @@ def get_sensor_by_generic_asset_type_and_location(
return sensor


def enqueue_forecasting_jobs(
forecasting_jobs: List[Job] = None,
):
"""Enqueue forecasting jobs.
:param forecasting_jobs: list of forecasting Jobs for redis queues.
"""
if forecasting_jobs is not None:
[current_app.queues["forecasting"].enqueue_job(job) for job in forecasting_jobs]


def save_and_enqueue(
data: Union[BeliefsDataFrame, List[BeliefsDataFrame]],
forecasting_jobs: List[Job] = None,
save_changed_beliefs_only: bool = True,
) -> ResponseTuple:

# Attempt to save
status = modern_save_to_db(
data, save_changed_beliefs_only=save_changed_beliefs_only
)

# Only enqueue forecasting jobs upon successfully saving new data
if status[:7] == "success":
enqueue_forecasting_jobs(forecasting_jobs)

# Pick a response
if status == "success":
return request_processed()
elif status == "success_with_unchanged_beliefs_skipped":
return already_received_and_successfully_processed()
return invalid_replacement()


def save_to_db(
timed_values: Union[BeliefsDataFrame, List[Union[Power, Price, Weather]]],
forecasting_jobs: List[Job] = [],
Expand All @@ -349,14 +385,22 @@ def save_to_db(
Data can only be replaced on servers in play mode.
TODO: remove options for Power, Price and Weather if we only handle beliefs one day.
TODO: remove this legacy function in its entirety (announced v0.8.0)
:param timed_values: BeliefsDataFrame or a list of Power, Price or Weather values to be saved
:param forecasting_jobs: list of forecasting Jobs for redis queues.
:param save_changed_beliefs_only: if True, beliefs that are already stored in the database with an earlier belief time are dropped.
:returns: ResponseTuple
"""

import warnings

warnings.warn(
"The method api.common.utils.api_utils.save_to_db is deprecated. Check out the following replacements:"
"- [recommended option] to store BeliefsDataFrames only, switch to data.utils.save_to_db"
"- to store BeliefsDataFrames and enqueue jobs, switch to api.common.utils.api_utils.save_and_enqueue"
)

if isinstance(timed_values, BeliefsDataFrame):

if save_changed_beliefs_only:
Expand Down Expand Up @@ -450,3 +494,18 @@ def determine_belief_timing(
]
return event_starts, belief_horizons
raise ValueError("Missing horizon or prior.")


def catch_timed_belief_replacements(error: IntegrityError):
"""Catch IntegrityErrors due to a UniqueViolation on the TimedBelief primary key.
Return a more informative message.
"""
if isinstance(error.orig, UniqueViolation) and "timed_belief_pkey" in str(
error.orig
):
# Some beliefs represented replacements, which was forbidden
return invalid_replacement()

# Forward to our generic error handler
return error_handling_router(error)
2 changes: 1 addition & 1 deletion flexmeasures/api/common/utils/validators.py
Expand Up @@ -271,7 +271,7 @@ def get_meter_data(user_source_ids):
}
The source ids then include the user's own id,
and ids of other users that are registered as a Prosumer and/or Energy Service Company.
and ids of other users whose organisation account is registered as a Prosumer and/or Energy Service Company.
"""

def wrapper(fn):
Expand Down
6 changes: 3 additions & 3 deletions flexmeasures/api/dev/sensor_data.py
@@ -1,7 +1,7 @@
from webargs.flaskparser import use_args

from flexmeasures.api.common.schemas.sensor_data import SensorDataSchema
from flexmeasures.api.common.utils.api_utils import save_to_db
from flexmeasures.api.common.utils.api_utils import save_and_enqueue


@use_args(
Expand All @@ -15,13 +15,13 @@ def post_data(sensor_data):
to create and save the data structure.
"""
beliefs = SensorDataSchema.load_bdf(sensor_data)
response, code = save_to_db(beliefs)
response, code = save_and_enqueue(beliefs)
response.update(type="PostSensorDataResponse")
return response, code


def get_data():
""" GET from /sensorData"""
"""GET from /sensorData"""
# - use data.models.time_series.Sensor::search_beliefs() - might need to add a belief_horizon parameter
# - create the serialize method on the schema, to turn the resulting BeliefsDataFrame
# to the JSON the API should respond with.
Expand Down
17 changes: 16 additions & 1 deletion flexmeasures/api/dev/tests/test_sensor_data.py
Expand Up @@ -65,17 +65,32 @@ def test_post_invalid_sensor_data(
def test_post_sensor_data_twice(client, setup_api_test_data):
auth_token = get_auth_token(client, "test_prosumer_user@seita.nl", "testtest")
post_data = make_sensor_data_request_for_gas_sensor()

# Check that 1st time posting the data succeeds
response = client.post(
url_for("post_sensor_data"),
json=post_data,
headers={"Authorization": auth_token},
)
assert response.status_code == 200

# Check that 2nd time posting the same data succeeds informatively
response = client.post(
url_for("post_sensor_data"),
json=post_data,
headers={"Authorization": auth_token},
)
print(response.json)
assert response.status_code == 400
assert response.status_code == 200
assert "data has already been received" in response.json["message"]

# Check that replacing data fails informatively
post_data["values"][0] = 100
response = client.post(
url_for("post_sensor_data"),
json=post_data,
headers={"Authorization": auth_token},
)
print(response.json)
assert response.status_code == 403
assert "data represents a replacement" in response.json["message"]
39 changes: 22 additions & 17 deletions flexmeasures/api/v1/implementations.py
Expand Up @@ -11,9 +11,9 @@
parse_entity_address,
EntityAddressException,
)
from flexmeasures.data.models.assets import Power
from flexmeasures.data import db
from flexmeasures.data.models.data_sources import get_or_create_source
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.services.resources import get_sensors
from flexmeasures.data.services.forecasting import create_forecasting_jobs
from flexmeasures.api.common.responses import (
Expand All @@ -26,7 +26,7 @@
)
from flexmeasures.api.common.utils.api_utils import (
groups_to_dict,
save_to_db,
save_and_enqueue,
)
from flexmeasures.api.common.utils.validators import (
type_accepted,
Expand Down Expand Up @@ -199,8 +199,8 @@ def collect_connection_and_value_groups(

# Get the power values
# TODO: fill NaN for non-existing values
power_bdf_dict: Dict[str, tb.BeliefsDataFrame] = Power.search(
old_sensor_names=sensor_names,
power_bdf_dict: Dict[str, tb.BeliefsDataFrame] = TimedBelief.search(
sensor_names,
event_starts_after=start,
event_ends_before=end,
resolution=resolution,
Expand All @@ -210,6 +210,8 @@ def collect_connection_and_value_groups(
beliefs_before=belief_time_window[1],
user_source_ids=user_source_ids,
source_types=source_types,
most_recent_beliefs_only=True,
one_deterministic_belief_per_event=True,
sum_multiple=False,
)
# Todo: parse time window of power_bdf_dict, which will be different for requests that are not of the form:
Expand Down Expand Up @@ -251,7 +253,7 @@ def create_connection_and_value_groups( # noqa: C901
if not user_sensors:
current_app.logger.info("User doesn't seem to have any assets")
user_sensor_ids = [sensor.id for sensor in user_sensors]
power_measurements = []
power_df_per_connection = []
forecasting_jobs = []
for connection_group, value_group in zip(generic_asset_name_groups, value_groups):
for connection in connection_group:
Expand Down Expand Up @@ -291,7 +293,8 @@ def create_connection_and_value_groups( # noqa: C901
)
return power_value_too_big(extra_info)

# Create new Power objects
# Create a new BeliefsDataFrame
beliefs = []
for j, value in enumerate(value_group):
dt = start + j * duration / len(value_group)
if rolling:
Expand All @@ -300,29 +303,31 @@ def create_connection_and_value_groups( # noqa: C901
h = horizon - (
(start + duration) - (dt + duration / len(value_group))
)
p = Power(
datetime=dt,
value=value
p = TimedBelief(
event_start=dt,
event_value=value
* -1, # Reverse sign for FlexMeasures specs with positive production and negative consumption
horizon=h,
sensor_id=sensor_id,
data_source_id=data_source.id,
belief_horizon=h,
sensor=sensor,
source=data_source,
)
power_measurements.append(p)

assert p not in db.session
beliefs.append(p)
power_df_per_connection.append(tb.BeliefsDataFrame(beliefs))

# make forecasts, but only if the sent-in values are not forecasts themselves
if horizon <= timedelta(
hours=0
): # Todo: replace 0 hours with whatever the moment of switching from ex-ante to ex-post is for this sensor
forecasting_jobs.extend(
create_forecasting_jobs(
Power,
sensor_id,
start,
start + duration,
resolution=duration / len(value_group),
enqueue=False,
enqueue=False, # will enqueue later, after saving data
)
)

return save_to_db(power_measurements, forecasting_jobs)
return save_and_enqueue(power_df_per_connection, forecasting_jobs)

0 comments on commit d788d9d

Please sign in to comment.