diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 28555a600..f599f29f9 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -8,7 +8,7 @@ v0.6.0 | July XX, 2021 New features ----------- * Analytics view offers grouping of all assets by location [see `PR #148 `_] - +* Add (experimental) endpoint to post sensor data for any sensor. Also supports our ongoing integration with data internally represented using the `timely beliefs `_ lib [see `PR #147 `_] Bugfixes ----------- diff --git a/flexmeasures/api/common/schemas/sensor_data.py b/flexmeasures/api/common/schemas/sensor_data.py new file mode 100644 index 000000000..5b8184aac --- /dev/null +++ b/flexmeasures/api/common/schemas/sensor_data.py @@ -0,0 +1,126 @@ +from datetime import timedelta + +from flask_login import current_user +from marshmallow import fields, post_load, validates_schema, ValidationError +from marshmallow.validate import Equal, OneOf +from timely_beliefs import BeliefsDataFrame +import pandas as pd + +from flexmeasures.data import ma +from flexmeasures.data.models.data_sources import DataSource +from flexmeasures.api.common.schemas.sensors import SensorField +from flexmeasures.api.common.utils.api_utils import upsample_values +from flexmeasures.data.schemas.times import AwareDateTimeField, DurationField + + +class SensorDataDescriptionSchema(ma.Schema): + """ + Describing sensor data (i.e. in a GET request). + + TODO: when we want to support other entity types with this + schema (assets/weather/markets or actuators), we'll need some re-design. + """ + + type = fields.Str(required=True, validate=Equal("GetSensorDataRequest")) + sensor = SensorField(required=True, entity_type="sensor", fm_scheme="fm1") + start = AwareDateTimeField(required=True, format="iso") + duration = DurationField(required=True) + horizon = DurationField( + required=False, missing=timedelta(hours=0), default=timedelta(hours=0) + ) + unit = fields.Str(required=True) + + @validates_schema + def check_user_rights_against_sensor(self, data, **kwargs): + """If the user is a Prosumer and the sensor belongs to an asset + over which the Prosumer has no ownership, raise a ValidationError. + """ + # todo: implement check once sensors can belong to an asset + # https://github.com/SeitaBV/flexmeasures/issues/155 + pass + + @validates_schema + def check_schema_unit_against_sensor_unit(self, data, **kwargs): + # TODO: technically, there are compatible units, like kWh and kW. + # They could be allowed here, and the SensorDataSchema could + # even convert values to the sensor's unit if possible. + if data["unit"] != data["sensor"].unit: + raise ValidationError( + f"Required unit for this sensor is {data['sensor'].unit}, got: {data['unit']}" + ) + + +class SensorDataSchema(SensorDataDescriptionSchema): + """ + This schema includes data, so it can be used for POST requests + or GET responses. + + TODO: For the GET use case, look at api/common/validators.py::get_data_downsampling_allowed + (sets a resolution parameter which we can pass to the data collection function). + """ + + type = fields.Str( + validate=OneOf(["PostSensorDataRequest", "GetSensorDataResponse"]) + ) + values = fields.List(fields.Float()) + + @validates_schema + def check_resolution_compatibility_of_values(self, data, **kwargs): + inferred_resolution = data["duration"] / len(data["values"]) + required_resolution = data["sensor"].event_resolution + # TODO: we don't yet have a good policy w.r.t. zero-resolution (direct measurement) + if required_resolution == timedelta(hours=0): + return + if inferred_resolution % required_resolution != timedelta(hours=0): + raise ValidationError( + f"Resolution of {inferred_resolution} is incompatible with the sensor's required resolution of {required_resolution}." + ) + + @post_load() + def possibly_upsample_values(self, data, **kwargs): + """ + Upsample the data if needed, to fit to the sensor's resolution. + Marshmallow runs this after validation. + """ + inferred_resolution = data["duration"] / len(data["values"]) + required_resolution = data["sensor"].event_resolution + + # TODO: we don't yet have a good policy w.r.t. zero-resolution (direct measurement) + if required_resolution == timedelta(hours=0): + return data + + # we already know resolutions are compatible (see validation) + if inferred_resolution != required_resolution: + data["values"] = upsample_values( + data["values"], + from_resolution=inferred_resolution, + to_resolution=required_resolution, + ) + return data + + def load_bdf(sensor_data) -> BeliefsDataFrame: + """ + Turn the de-serialized and validated data into a BeliefsDataFrame. + """ + source = DataSource.query.filter( + DataSource.user_id == current_user.id + ).one_or_none() + if not source: + raise ValidationError( + f"User {current_user.id} is not an accepted data source." + ) + + num_values = len(sensor_data["values"]) + event_resolution = sensor_data["duration"] / num_values + dt_index = pd.date_range( + sensor_data["start"], + periods=num_values, + freq=event_resolution, + ) + s = pd.Series(sensor_data["values"], index=dt_index) + return BeliefsDataFrame( + s, + source=source, + sensor=sensor_data["sensor"], + belief_horizon=sensor_data["horizon"], + ) diff --git a/flexmeasures/api/common/utils/api_utils.py b/flexmeasures/api/common/utils/api_utils.py index 1bebc73fe..c097fef84 100644 --- a/flexmeasures/api/common/utils/api_utils.py +++ b/flexmeasures/api/common/utils/api_utils.py @@ -1,3 +1,5 @@ +from timely_beliefs.beliefs.classes import BeliefsDataFrame +from flexmeasures.data.models.time_series import TimedBelief from typing import List, Sequence, Tuple, Union import copy from datetime import datetime, timedelta @@ -334,19 +336,27 @@ def get_weather_sensor_by( def save_to_db( - timed_values: List[Union[Power, Price, Weather]], forecasting_jobs: List[Job] + timed_values: Union[BeliefsDataFrame, List[Union[Power, Price, Weather]]], + forecasting_jobs: List[Job] = [], ) -> ResponseTuple: """Put the timed values into the database and enqueue forecasting jobs. Data can only be replaced on servers in play mode. - :param timed_values: list of Power, Price or Weather values to be saved + TODO: remove options for Power, Price and Weather if we only handle beliefs one day. + + :param timed_values: BeliefsDataFrame or a list of Power, Price or Weather values to be saved :param forecasting_jobs: list of forecasting Jobs for redis queues. :returns: ResponseTuple """ current_app.logger.info("SAVING TO DB AND QUEUEING...") try: - save_to_session(timed_values) + if isinstance(timed_values, BeliefsDataFrame): + TimedBelief.add_to_session( + session=db.session, beliefs_data_frame=timed_values + ) + else: + save_to_session(timed_values) db.session.flush() [current_app.queues["forecasting"].enqueue_job(job) for job in forecasting_jobs] db.session.commit() @@ -357,7 +367,14 @@ def save_to_db( # Allow data to be replaced only in play mode if current_app.config.get("FLEXMEASURES_MODE", "") == "play": - save_to_session(timed_values, overwrite=True) + if isinstance(timed_values, BeliefsDataFrame): + TimedBelief.add_to_session( + session=db.session, + beliefs_data_frame=timed_values, + allow_overwrite=True, + ) + else: + save_to_session(timed_values, overwrite=True) [ current_app.queues["forecasting"].enqueue_job(job) for job in forecasting_jobs diff --git a/flexmeasures/api/common/utils/validators.py b/flexmeasures/api/common/utils/validators.py index 027e4d079..774d02b7e 100644 --- a/flexmeasures/api/common/utils/validators.py +++ b/flexmeasures/api/common/utils/validators.py @@ -47,6 +47,14 @@ from flexmeasures.data.services.users import get_users from flexmeasures.utils.time_utils import server_now +""" +This module has validators used by API endpoints <= 2.0 to describe +acceptable parameters. +We aim to make this module obsolete by using Marshmallow. +Marshmallow is a better format to describe valid data. +There is some actual logic in here, which we still need. It can usually be ported to Marshmallow validators. +""" + p = inflect.engine() diff --git a/flexmeasures/api/dev/__init__.py b/flexmeasures/api/dev/__init__.py index c175be741..a302935a3 100644 --- a/flexmeasures/api/dev/__init__.py +++ b/flexmeasures/api/dev/__init__.py @@ -1,9 +1,41 @@ from flask import Flask +from flask_security import auth_token_required, roles_accepted def register_at(app: Flask): """This can be used to register FlaskViews.""" from flexmeasures.api.dev.sensors import SensorAPI + from flexmeasures.api.dev.sensor_data import post_data as post_sensor_data_impl SensorAPI.register(app, route_prefix="/api/dev") + + @app.route("/sensorData", methods=["POST"]) + @auth_token_required + @roles_accepted("admin", "MDC", "Prosumer") + def post_sensor_data(): + """ + Post sensor data to FlexMeasures. + + For example: + + { + "type": "PostSensorDataRequest", + "sensor": "ea1.2021-01.io.flexmeasures:fm1.1", + "values": [-11.28, -11.28, -11.28, -11.28], + "start": "2021-06-07T00:00:00+02:00", + "duration": "PT1H", + "unit": "m³/h", + } + + The above request posts four values for a duration of one hour, where the first + event start is at the given start time, and subsequent values start in 15 minute intervals throughout the one hour duration. + + The sensor is the one with ID=1. + The unit has to match the sensor's required unit. + The resolution of the data has to match the sensor's required resolution, but + FlexMeasures will attempt to upsample lower resolutions. + """ + return post_sensor_data_impl() + + # TODO: add GET /sensorData diff --git a/flexmeasures/api/dev/sensor_data.py b/flexmeasures/api/dev/sensor_data.py new file mode 100644 index 000000000..fbb70f44b --- /dev/null +++ b/flexmeasures/api/dev/sensor_data.py @@ -0,0 +1,28 @@ +from webargs.flaskparser import use_args + +from flexmeasures.api.common.schemas.sensor_data import SensorDataSchema +from flexmeasures.api.common.utils.api_utils import save_to_db + + +@use_args( + SensorDataSchema(), + location="json", +) +def post_data(sensor_data): + """POST to /sensorData + + Experimental dev feature which uses timely-beliefs + to create and save the data structure. + """ + beliefs = SensorDataSchema.load_bdf(sensor_data) + response, code = save_to_db(beliefs) + response.update(type="PostSensorDataResponse") + return response, code + + +def get_data(): + """ GET from /sensorData""" + # - use data.models.time_series.Sensor::search_beliefs() - might need to add a belief_horizon parameter + # - create the serialize method on the schema, to turn the resulting BeliefsDataFrame + # to the JSON the API should respond with. + pass diff --git a/flexmeasures/api/dev/tests/conftest.py b/flexmeasures/api/dev/tests/conftest.py new file mode 100644 index 000000000..1fe88e2b7 --- /dev/null +++ b/flexmeasures/api/dev/tests/conftest.py @@ -0,0 +1,48 @@ +from datetime import timedelta + +from flask_security import SQLAlchemySessionUserDatastore +import pytest + +from flexmeasures.data.models.time_series import Sensor + + +@pytest.fixture(scope="module", autouse=True) +def setup_api_test_data(db, setup_roles_users): + """ + Set up data for API dev tests. + """ + print("Setting up data for API v2.0 tests on %s" % db.engine) + add_gas_sensor(db, setup_roles_users["Test Supplier"]) + give_prosumer_the_MDC_role(db) + + +@pytest.fixture(scope="function") +def setup_api_fresh_test_data(fresh_db, setup_roles_users_fresh_db): + """ + Set up fresh data for API dev tests. + """ + print("Setting up fresh data for API dev tests on %s" % fresh_db.engine) + for sensor in Sensor.query.all(): + fresh_db.delete(sensor) + add_gas_sensor(fresh_db, setup_roles_users_fresh_db["Test Supplier"]) + give_prosumer_the_MDC_role(fresh_db) + + +def add_gas_sensor(the_db, test_supplier): + gas_sensor = Sensor( + name="some gas sensor", + unit="m³/h", + event_resolution=timedelta(minutes=10), + ) + the_db.session.add(gas_sensor) + gas_sensor.owner = test_supplier + + +def give_prosumer_the_MDC_role(db): + + from flexmeasures.data.models.user import User, Role + + user_datastore = SQLAlchemySessionUserDatastore(db.session, User, Role) + test_prosumer = user_datastore.find_user(email="test_prosumer@seita.nl") + mdc_role = user_datastore.create_role(name="MDC", description="Meter Data Company") + user_datastore.add_role_to_user(test_prosumer, mdc_role) diff --git a/flexmeasures/api/dev/tests/test_sensor_data.py b/flexmeasures/api/dev/tests/test_sensor_data.py new file mode 100644 index 000000000..69de69432 --- /dev/null +++ b/flexmeasures/api/dev/tests/test_sensor_data.py @@ -0,0 +1,81 @@ +from flask import url_for +import pytest + +from flexmeasures.api.tests.utils import get_auth_token +from flexmeasures.api.dev.tests.utils import make_sensor_data_request + + +@pytest.mark.parametrize("use_auth", [False, True]) +def test_post_sensor_data_bad_auth(client, use_auth): + """ + Attempt to post sensor data with insufficient or missing auth. + """ + # the case without auth: authentication will fail + headers = {"content-type": "application/json"} + if use_auth: + # in this case, we successfully authenticate, + # but fail authorization (no admin or MDC role) + headers["Authorization"] = get_auth_token( + client, "test_supplier@seita.nl", "testtest" + ) + + post_data_response = client.post( + url_for("post_sensor_data"), + headers=headers, + ) + print("Server responded with:\n%s" % post_data_response.data) + if use_auth: + assert post_data_response.status_code == 403 + else: + assert post_data_response.status_code == 401 + + +@pytest.mark.parametrize( + "request_field, new_value, error_field, error_text", + [ + ("start", "2021-06-07T00:00:00", "start", "Not a valid aware datetime"), + ( + "duration", + "PT30M", + "_schema", + "Resolution of 0:05:00 is incompatible", + ), # downsampling not supported + ("sensor", "ea1.2021-01.io.flexmeasures:fm1.666", "sensor", "doesn't exist"), + ("unit", "m", "_schema", "Required unit"), + ("type", "GetSensorDataRequest", "type", "Must be one of"), + ], +) +def test_post_invalid_sensor_data( + client, setup_api_test_data, request_field, new_value, error_field, error_text +): + post_data = make_sensor_data_request() + post_data[request_field] = new_value + # this guy is allowed to post sensorData + auth_token = get_auth_token(client, "test_prosumer@seita.nl", "testtest") + response = client.post( + url_for("post_sensor_data"), + json=post_data, + headers={"Authorization": auth_token}, + ) + print(response.json) + assert response.status_code == 422 + assert error_text in response.json["message"]["json"][error_field][0] + + +def test_post_sensor_data_twice(client, setup_api_test_data): + auth_token = get_auth_token(client, "test_prosumer@seita.nl", "testtest") + post_data = make_sensor_data_request() + response = client.post( + url_for("post_sensor_data"), + json=post_data, + headers={"Authorization": auth_token}, + ) + assert response.status_code == 200 + response = client.post( + url_for("post_sensor_data"), + json=post_data, + headers={"Authorization": auth_token}, + ) + print(response.json) + assert response.status_code == 400 + assert "data has already been received" in response.json["message"] diff --git a/flexmeasures/api/dev/tests/test_sensor_data_fresh_db.py b/flexmeasures/api/dev/tests/test_sensor_data_fresh_db.py new file mode 100644 index 000000000..55f7ac2fa --- /dev/null +++ b/flexmeasures/api/dev/tests/test_sensor_data_fresh_db.py @@ -0,0 +1,37 @@ +import pytest + +from flask import url_for + +from flexmeasures.api.tests.utils import get_auth_token +from flexmeasures.api.dev.tests.utils import make_sensor_data_request +from flexmeasures.data.models.time_series import TimedBelief, Sensor + + +@pytest.mark.parametrize( + "num_values, expected_num_values", + [ + (6, 6), + (3, 6), # upsample + ], +) +def test_post_sensor_data( + client, setup_api_fresh_test_data, num_values, expected_num_values +): + post_data = make_sensor_data_request(num_values=num_values) + sensor = Sensor.query.filter(Sensor.name == "some gas sensor").one_or_none() + beliefs_before = TimedBelief.query.filter(TimedBelief.sensor_id == sensor.id).all() + print(f"BELIEFS BEFORE: {beliefs_before}") + assert len(beliefs_before) == 0 + + auth_token = get_auth_token(client, "test_prosumer@seita.nl", "testtest") + response = client.post( + url_for("post_sensor_data"), + json=post_data, + headers={"Authorization": auth_token}, + ) + print(response.json) + assert response.status_code == 200 + beliefs = TimedBelief.query.filter(TimedBelief.sensor_id == sensor.id).all() + print(f"BELIEFS AFTER: {beliefs}") + assert len(beliefs) == expected_num_values + assert beliefs[0].event_value == -11.28 diff --git a/flexmeasures/api/dev/tests/utils.py b/flexmeasures/api/dev/tests/utils.py new file mode 100644 index 000000000..43ce6cd67 --- /dev/null +++ b/flexmeasures/api/dev/tests/utils.py @@ -0,0 +1,13 @@ +from flexmeasures.data.models.time_series import Sensor + + +def make_sensor_data_request(num_values: int = 6, duration: str = "PT1H") -> dict: + sensor = Sensor.query.filter(Sensor.name == "some gas sensor").one_or_none() + return { + "type": "PostSensorDataRequest", + "sensor": f"ea1.2021-01.io.flexmeasures:fm1.{sensor.id}", + "values": num_values * [-11.28], + "start": "2021-06-07T00:00:00+02:00", + "duration": duration, + "unit": "m³/h", + } diff --git a/flexmeasures/api/v2_0/tests/test_api_v2_0_users.py b/flexmeasures/api/v2_0/tests/test_api_v2_0_users.py index 771d8a8f7..7a921daf5 100644 --- a/flexmeasures/api/v2_0/tests/test_api_v2_0_users.py +++ b/flexmeasures/api/v2_0/tests/test_api_v2_0_users.py @@ -1,7 +1,6 @@ from flask import url_for import pytest -# from flexmeasures.data.models.user import User from flexmeasures.data.services.users import find_user_by_email from flexmeasures.api.tests.utils import get_auth_token, UserContext @@ -92,7 +91,7 @@ def test_edit_user(client): ) assert user_edit_response.status_code == 401 # admin can deactivate supplier, other changes will be ignored - # (id is in the Userschema of the API, but we ignore it) + # (id is in the User schema of the API, but we ignore it) headers = {"content-type": "application/json", "Authorization": prosumer_auth_token} user_edit_response = client.patch( url_for("flexmeasures_api_v2_0.patch_user", id=supplier_id), diff --git a/flexmeasures/data/models/time_series.py b/flexmeasures/data/models/time_series.py index 4e5284270..5f2a4d601 100644 --- a/flexmeasures/data/models/time_series.py +++ b/flexmeasures/data/models/time_series.py @@ -148,7 +148,7 @@ def timerange(self) -> Dict[str, datetime_type]: return dict(start=least_recent.event_start, end=most_recent.event_end) def __repr__(self) -> str: - return f"" + return f"" class TimedBelief(db.Model, tb.TimedBeliefDBMixin):