Skip to content

Commit

Permalink
post sensor data endpoint (modern) (#147)
Browse files Browse the repository at this point in the history
Add (experimental) endpoint to post sensor data for any sensor. Also supports our ongoing integration with data internally represented using the timely beliefs library. More of the data validation is being done using Marshmallow.


* Create draft PR for #145

* endpoint receives data and makes a BeliefsDataFrame

* save beliefs, first test running

* validation and upsampling in schema

* changelog entry

* test with upsampling, fix test data freshness

* more tests

* get test with non-existant sensor to work

* test sending in the wrong unit

* move unit check to SensorDataDescriptionSchema and comment on future work

* move BeliefsDataFrame creation to Schema

* clarity about type field, simpler date_range creation

* some endpoint documentation

* add some tests as Felix requested

* save beliefs with api_utils.save_to_db, test sending data multiple times

* add optional horizon field

* Reverse if-else statement

* Add placeholder for ownership check

Co-authored-by: nhoening <nhoening@users.noreply.github.com>
Co-authored-by: Nicolas Höning <nicolas@seita.nl>
Co-authored-by: Nicolas Höning <iam@nicolashoening.de>
Co-authored-by: F.N. Claessen <felix@seita.nl>
  • Loading branch information
5 people committed Jul 20, 2021
1 parent 5a9a649 commit dc6b768
Show file tree
Hide file tree
Showing 12 changed files with 397 additions and 8 deletions.
2 changes: 1 addition & 1 deletion documentation/changelog.rst
Expand Up @@ -8,7 +8,7 @@ v0.6.0 | July XX, 2021
New features
-----------
* Analytics view offers grouping of all assets by location [see `PR #148 <http://www.github.com/SeitaBV/flexmeasures/pull/148>`_]

* Add (experimental) endpoint to post sensor data for any sensor. Also supports our ongoing integration with data internally represented using the `timely beliefs <https://github.com/SeitaBV/timely-beliefs>`_ lib [see `PR #147 <http://www.github.com/SeitaBV/flexmeasures/pull/147>`_]

Bugfixes
-----------
Expand Down
126 changes: 126 additions & 0 deletions flexmeasures/api/common/schemas/sensor_data.py
@@ -0,0 +1,126 @@
from datetime import timedelta

from flask_login import current_user
from marshmallow import fields, post_load, validates_schema, ValidationError
from marshmallow.validate import Equal, OneOf
from timely_beliefs import BeliefsDataFrame
import pandas as pd

from flexmeasures.data import ma
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.api.common.schemas.sensors import SensorField
from flexmeasures.api.common.utils.api_utils import upsample_values
from flexmeasures.data.schemas.times import AwareDateTimeField, DurationField


class SensorDataDescriptionSchema(ma.Schema):
"""
Describing sensor data (i.e. in a GET request).
TODO: when we want to support other entity types with this
schema (assets/weather/markets or actuators), we'll need some re-design.
"""

type = fields.Str(required=True, validate=Equal("GetSensorDataRequest"))
sensor = SensorField(required=True, entity_type="sensor", fm_scheme="fm1")
start = AwareDateTimeField(required=True, format="iso")
duration = DurationField(required=True)
horizon = DurationField(
required=False, missing=timedelta(hours=0), default=timedelta(hours=0)
)
unit = fields.Str(required=True)

@validates_schema
def check_user_rights_against_sensor(self, data, **kwargs):
"""If the user is a Prosumer and the sensor belongs to an asset
over which the Prosumer has no ownership, raise a ValidationError.
"""
# todo: implement check once sensors can belong to an asset
# https://github.com/SeitaBV/flexmeasures/issues/155
pass

@validates_schema
def check_schema_unit_against_sensor_unit(self, data, **kwargs):
# TODO: technically, there are compatible units, like kWh and kW.
# They could be allowed here, and the SensorDataSchema could
# even convert values to the sensor's unit if possible.
if data["unit"] != data["sensor"].unit:
raise ValidationError(
f"Required unit for this sensor is {data['sensor'].unit}, got: {data['unit']}"
)


class SensorDataSchema(SensorDataDescriptionSchema):
"""
This schema includes data, so it can be used for POST requests
or GET responses.
TODO: For the GET use case, look at api/common/validators.py::get_data_downsampling_allowed
(sets a resolution parameter which we can pass to the data collection function).
"""

type = fields.Str(
validate=OneOf(["PostSensorDataRequest", "GetSensorDataResponse"])
)
values = fields.List(fields.Float())

@validates_schema
def check_resolution_compatibility_of_values(self, data, **kwargs):
inferred_resolution = data["duration"] / len(data["values"])
required_resolution = data["sensor"].event_resolution
# TODO: we don't yet have a good policy w.r.t. zero-resolution (direct measurement)
if required_resolution == timedelta(hours=0):
return
if inferred_resolution % required_resolution != timedelta(hours=0):
raise ValidationError(
f"Resolution of {inferred_resolution} is incompatible with the sensor's required resolution of {required_resolution}."
)

@post_load()
def possibly_upsample_values(self, data, **kwargs):
"""
Upsample the data if needed, to fit to the sensor's resolution.
Marshmallow runs this after validation.
"""
inferred_resolution = data["duration"] / len(data["values"])
required_resolution = data["sensor"].event_resolution

# TODO: we don't yet have a good policy w.r.t. zero-resolution (direct measurement)
if required_resolution == timedelta(hours=0):
return data

# we already know resolutions are compatible (see validation)
if inferred_resolution != required_resolution:
data["values"] = upsample_values(
data["values"],
from_resolution=inferred_resolution,
to_resolution=required_resolution,
)
return data

def load_bdf(sensor_data) -> BeliefsDataFrame:
"""
Turn the de-serialized and validated data into a BeliefsDataFrame.
"""
source = DataSource.query.filter(
DataSource.user_id == current_user.id
).one_or_none()
if not source:
raise ValidationError(
f"User {current_user.id} is not an accepted data source."
)

num_values = len(sensor_data["values"])
event_resolution = sensor_data["duration"] / num_values
dt_index = pd.date_range(
sensor_data["start"],
periods=num_values,
freq=event_resolution,
)
s = pd.Series(sensor_data["values"], index=dt_index)
return BeliefsDataFrame(
s,
source=source,
sensor=sensor_data["sensor"],
belief_horizon=sensor_data["horizon"],
)
25 changes: 21 additions & 4 deletions flexmeasures/api/common/utils/api_utils.py
@@ -1,3 +1,5 @@
from timely_beliefs.beliefs.classes import BeliefsDataFrame
from flexmeasures.data.models.time_series import TimedBelief
from typing import List, Sequence, Tuple, Union
import copy
from datetime import datetime, timedelta
Expand Down Expand Up @@ -334,19 +336,27 @@ def get_weather_sensor_by(


def save_to_db(
timed_values: List[Union[Power, Price, Weather]], forecasting_jobs: List[Job]
timed_values: Union[BeliefsDataFrame, List[Union[Power, Price, Weather]]],
forecasting_jobs: List[Job] = [],
) -> ResponseTuple:
"""Put the timed values into the database and enqueue forecasting jobs.
Data can only be replaced on servers in play mode.
:param timed_values: list of Power, Price or Weather values to be saved
TODO: remove options for Power, Price and Weather if we only handle beliefs one day.
:param timed_values: BeliefsDataFrame or a list of Power, Price or Weather values to be saved
:param forecasting_jobs: list of forecasting Jobs for redis queues.
:returns: ResponseTuple
"""
current_app.logger.info("SAVING TO DB AND QUEUEING...")
try:
save_to_session(timed_values)
if isinstance(timed_values, BeliefsDataFrame):
TimedBelief.add_to_session(
session=db.session, beliefs_data_frame=timed_values
)
else:
save_to_session(timed_values)
db.session.flush()
[current_app.queues["forecasting"].enqueue_job(job) for job in forecasting_jobs]
db.session.commit()
Expand All @@ -357,7 +367,14 @@ def save_to_db(

# Allow data to be replaced only in play mode
if current_app.config.get("FLEXMEASURES_MODE", "") == "play":
save_to_session(timed_values, overwrite=True)
if isinstance(timed_values, BeliefsDataFrame):
TimedBelief.add_to_session(
session=db.session,
beliefs_data_frame=timed_values,
allow_overwrite=True,
)
else:
save_to_session(timed_values, overwrite=True)
[
current_app.queues["forecasting"].enqueue_job(job)
for job in forecasting_jobs
Expand Down
8 changes: 8 additions & 0 deletions flexmeasures/api/common/utils/validators.py
Expand Up @@ -47,6 +47,14 @@
from flexmeasures.data.services.users import get_users
from flexmeasures.utils.time_utils import server_now

"""
This module has validators used by API endpoints <= 2.0 to describe
acceptable parameters.
We aim to make this module obsolete by using Marshmallow.
Marshmallow is a better format to describe valid data.
There is some actual logic in here, which we still need. It can usually be ported to Marshmallow validators.
"""


p = inflect.engine()

Expand Down
32 changes: 32 additions & 0 deletions flexmeasures/api/dev/__init__.py
@@ -1,9 +1,41 @@
from flask import Flask
from flask_security import auth_token_required, roles_accepted


def register_at(app: Flask):
"""This can be used to register FlaskViews."""

from flexmeasures.api.dev.sensors import SensorAPI
from flexmeasures.api.dev.sensor_data import post_data as post_sensor_data_impl

SensorAPI.register(app, route_prefix="/api/dev")

@app.route("/sensorData", methods=["POST"])
@auth_token_required
@roles_accepted("admin", "MDC", "Prosumer")
def post_sensor_data():
"""
Post sensor data to FlexMeasures.
For example:
{
"type": "PostSensorDataRequest",
"sensor": "ea1.2021-01.io.flexmeasures:fm1.1",
"values": [-11.28, -11.28, -11.28, -11.28],
"start": "2021-06-07T00:00:00+02:00",
"duration": "PT1H",
"unit": "m³/h",
}
The above request posts four values for a duration of one hour, where the first
event start is at the given start time, and subsequent values start in 15 minute intervals throughout the one hour duration.
The sensor is the one with ID=1.
The unit has to match the sensor's required unit.
The resolution of the data has to match the sensor's required resolution, but
FlexMeasures will attempt to upsample lower resolutions.
"""
return post_sensor_data_impl()

# TODO: add GET /sensorData
28 changes: 28 additions & 0 deletions flexmeasures/api/dev/sensor_data.py
@@ -0,0 +1,28 @@
from webargs.flaskparser import use_args

from flexmeasures.api.common.schemas.sensor_data import SensorDataSchema
from flexmeasures.api.common.utils.api_utils import save_to_db


@use_args(
SensorDataSchema(),
location="json",
)
def post_data(sensor_data):
"""POST to /sensorData
Experimental dev feature which uses timely-beliefs
to create and save the data structure.
"""
beliefs = SensorDataSchema.load_bdf(sensor_data)
response, code = save_to_db(beliefs)
response.update(type="PostSensorDataResponse")
return response, code


def get_data():
""" GET from /sensorData"""
# - use data.models.time_series.Sensor::search_beliefs() - might need to add a belief_horizon parameter
# - create the serialize method on the schema, to turn the resulting BeliefsDataFrame
# to the JSON the API should respond with.
pass
48 changes: 48 additions & 0 deletions flexmeasures/api/dev/tests/conftest.py
@@ -0,0 +1,48 @@
from datetime import timedelta

from flask_security import SQLAlchemySessionUserDatastore
import pytest

from flexmeasures.data.models.time_series import Sensor


@pytest.fixture(scope="module", autouse=True)
def setup_api_test_data(db, setup_roles_users):
"""
Set up data for API dev tests.
"""
print("Setting up data for API v2.0 tests on %s" % db.engine)
add_gas_sensor(db, setup_roles_users["Test Supplier"])
give_prosumer_the_MDC_role(db)


@pytest.fixture(scope="function")
def setup_api_fresh_test_data(fresh_db, setup_roles_users_fresh_db):
"""
Set up fresh data for API dev tests.
"""
print("Setting up fresh data for API dev tests on %s" % fresh_db.engine)
for sensor in Sensor.query.all():
fresh_db.delete(sensor)
add_gas_sensor(fresh_db, setup_roles_users_fresh_db["Test Supplier"])
give_prosumer_the_MDC_role(fresh_db)


def add_gas_sensor(the_db, test_supplier):
gas_sensor = Sensor(
name="some gas sensor",
unit="m³/h",
event_resolution=timedelta(minutes=10),
)
the_db.session.add(gas_sensor)
gas_sensor.owner = test_supplier


def give_prosumer_the_MDC_role(db):

from flexmeasures.data.models.user import User, Role

user_datastore = SQLAlchemySessionUserDatastore(db.session, User, Role)
test_prosumer = user_datastore.find_user(email="test_prosumer@seita.nl")
mdc_role = user_datastore.create_role(name="MDC", description="Meter Data Company")
user_datastore.add_role_to_user(test_prosumer, mdc_role)

0 comments on commit dc6b768

Please sign in to comment.