Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

post sensor data endpoint (modern) #147

Merged
merged 19 commits into from Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion documentation/changelog.rst
Expand Up @@ -8,7 +8,7 @@ v0.6.0 | July XX, 2021
New features
-----------
* Analytics view offers grouping of all assets by location [see `PR #148 <http://www.github.com/SeitaBV/flexmeasures/pull/148>`_]

* Add (experimental) endpoint to post sensor data for any sensor. Also supports our ongoing integration with data internally represented using the `timely beliefs <https://github.com/SeitaBV/timely-beliefs>`_ lib [see `PR #147 <http://www.github.com/SeitaBV/flexmeasures/pull/147>`_]

Bugfixes
-----------
Expand Down
115 changes: 115 additions & 0 deletions flexmeasures/api/common/schemas/sensor_data.py
@@ -0,0 +1,115 @@
from datetime import timedelta

from flask_login import current_user
from marshmallow import fields, post_load, validates_schema, ValidationError
from marshmallow.validate import Equal, OneOf
from timely_beliefs import BeliefsDataFrame
import pandas as pd

from flexmeasures.data import ma
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.api.common.schemas.sensors import SensorField
from flexmeasures.api.common.utils.api_utils import upsample_values
from flexmeasures.data.schemas.times import AwareDateTimeField, DurationField


class SensorDataDescriptionSchema(ma.Schema):
"""
Describing sensor data (i.e. in a GET request).

TODO: Add an optional horizon field.
TODO: when we want to support other entity types with this
schema (assets/weather/markets or actuators), we'll need some re-design.
"""

nhoening marked this conversation as resolved.
Show resolved Hide resolved
type = fields.Str(validate=Equal("GetSensorDataRequest"))
sensor = SensorField(entity_type="sensor", fm_scheme="fm1")
start = AwareDateTimeField(format="iso")
duration = DurationField()
unit = fields.Str()

@validates_schema
def check_schema_unit_against_sensor_unit(self, data, **kwargs):
# TODO: technically, there are compatible units, like kWh and kW.
# They could be allowed here, and the SensorDataSchema could
# even convert values to the sensor's unit if possible.
if data["unit"] != data["sensor"].unit:
raise ValidationError(
f"Required unit for this sensor is {data['sensor'].unit}, got: {data['unit']}"
)


class SensorDataSchema(SensorDataDescriptionSchema):
"""
This schema includes data, so it can be used for POST requests
or GET responses.

TODO: For the GET use case, look at api/common/validators.py::get_data_downsampling_allowed
(sets a resolution parameter which we can pass to the data collection function).
"""

type = fields.Str(
validate=OneOf(["PostSensorDataRequest", "GetSensorDataResponse"])
)
values = fields.List(fields.Float())

@validates_schema
def check_resolution_compatibility_of_values(self, data, **kwargs):
inferred_resolution = data["duration"] / len(data["values"])
required_resolution = data["sensor"].event_resolution
# TODO: we don't yet have a good policy w.r.t. zero-resolution (direct measurement)
if required_resolution == timedelta(hours=0):
return
if inferred_resolution % required_resolution != timedelta(hours=0):
raise ValidationError(
f"Resolution of {inferred_resolution} is incompatible with the sensor's required resolution of {required_resolution}."
)

@post_load()
def possibly_upsample_values(self, data, **kwargs):
"""
Upsample the data if needed, to fit to the sensor's resolution.
Marshmallow runs this after validation.
"""
inferred_resolution = data["duration"] / len(data["values"])
required_resolution = data["sensor"].event_resolution

# TODO: we don't yet have a good policy w.r.t. zero-resolution (direct measurement)
if required_resolution == timedelta(hours=0):
return data

# we already know resolutions are compatible (see validation)
if inferred_resolution != required_resolution:
data["values"] = upsample_values(
data["values"],
from_resolution=inferred_resolution,
to_resolution=required_resolution,
)
return data

def load_bdf(sensor_data) -> BeliefsDataFrame:
"""
Turn the de-serialized and validated data into a BeliefsDataFrame.
"""
source = DataSource.query.filter(
DataSource.user_id == current_user.id
).one_or_none()
if not source:
raise ValidationError(
f"User {current_user.id} is not an accepted data source."
)

num_values = len(sensor_data["values"])
event_resolution = sensor_data["duration"] / num_values
dt_index = pd.date_range(
sensor_data["start"],
periods=num_values,
freq=event_resolution,
)
s = pd.Series(sensor_data["values"], index=dt_index)
return BeliefsDataFrame(
s,
source=source,
sensor=sensor_data["sensor"],
belief_horizon=timedelta(hours=0),
nhoening marked this conversation as resolved.
Show resolved Hide resolved
)
8 changes: 8 additions & 0 deletions flexmeasures/api/common/utils/validators.py
Expand Up @@ -47,6 +47,14 @@
from flexmeasures.data.services.users import get_users
from flexmeasures.utils.time_utils import server_now

"""
This module has validators used by API endpoints <= 2.0 to describe
acceptable parameters.
We aim to make this module obsolete by using Marshmallow.
Marshmallow is a better format to describe valid data.
There is some actual logic in here, which we still need. It can usually be ported to Marshmallow validators.
"""


p = inflect.engine()

Expand Down
32 changes: 32 additions & 0 deletions flexmeasures/api/dev/__init__.py
@@ -1,9 +1,41 @@
from flask import Flask
from flask_security import auth_token_required, roles_accepted


def register_at(app: Flask):
"""This can be used to register FlaskViews."""

from flexmeasures.api.dev.sensors import SensorAPI
from flexmeasures.api.dev.sensor_data import post_data as post_sensor_data_impl

SensorAPI.register(app, route_prefix="/api/dev")

@app.route("/sensorData", methods=["POST"])
@auth_token_required
@roles_accepted("admin", "MDC", "Prosumer")
def post_sensor_data():
"""
Post sensor data to FlexMeasures.

For example:

{
"type": "PostSensorDataRequest",
"sensor": "ea1.2021-01.io.flexmeasures:fm1.1",
"values": [-11.28, -11.28, -11.28, -11.28],
"start": "2021-06-07T00:00:00+02:00",
"duration": "PT1H",
"unit": "m³/h",
}

The above request posts four values for a duration of one hour, where the first
event start is at the given start time, and subsequent values start in 15 minute intervals throughout the one hour duration.

The sensor is the one with ID=1.
The unit has to match the sensor's required unit.
The resolution of the data has to match the sensor's required resolution, but
FlexMeasures will attempt to upsample lower resolutions.
"""
return post_sensor_data_impl()

# TODO: add GET /sensorData
29 changes: 29 additions & 0 deletions flexmeasures/api/dev/sensor_data.py
@@ -0,0 +1,29 @@
from webargs.flaskparser import use_args

from flexmeasures.data.config import db
from flexmeasures.data.models.time_series import TimedBelief
from flexmeasures.api.common.schemas.sensor_data import SensorDataSchema


@use_args(
SensorDataSchema(),
location="json",
)
def post_data(sensor_data):
"""POST to /sensorData
nhoening marked this conversation as resolved.
Show resolved Hide resolved

Experimental dev feature which uses timely-beliefs
to create and save the data structure.
"""
beliefs = SensorDataSchema.load_bdf(sensor_data)
nhoening marked this conversation as resolved.
Show resolved Hide resolved
TimedBelief.add_to_session(session=db.session, beliefs_data_frame=beliefs)
nhoening marked this conversation as resolved.
Show resolved Hide resolved
db.session.commit()
return dict(type="PostSensorDataResponse", status="ok")


def get_data():
""" GET from /sensorData"""
# - use data.models.time_series.Sensor::search_beliefs()
# - create the serialize method on the schema, to turn the resulting BeliefsDataFrame
# to the JSON the API should respond with.
pass
nhoening marked this conversation as resolved.
Show resolved Hide resolved
43 changes: 43 additions & 0 deletions flexmeasures/api/dev/tests/conftest.py
@@ -0,0 +1,43 @@
from datetime import timedelta

from flask_security import SQLAlchemySessionUserDatastore
import pytest

from flexmeasures.data.models.time_series import Sensor


@pytest.fixture(scope="module", autouse=True)
def setup_api_test_data(db, setup_roles_users):
"""
Set up data for API dev tests.
"""
print("Setting up data for API v2.0 tests on %s" % db.engine)
give_prosumer_the_MDC_role(db)
Flix6x marked this conversation as resolved.
Show resolved Hide resolved


@pytest.fixture(scope="function")
def setup_api_fresh_test_data(fresh_db, setup_roles_users_fresh_db):
"""
Set up fresh data for API dev tests.
"""
print("Setting up data for API dev tests on %s" % fresh_db.engine)

gas_sensor = Sensor(
name="some gas sensor",
unit="m³/h",
event_resolution=timedelta(minutes=10),
)
fresh_db.session.add(gas_sensor)
gas_sensor.owner = setup_roles_users_fresh_db["Test Supplier"]

give_prosumer_the_MDC_role(fresh_db)


def give_prosumer_the_MDC_role(db):

from flexmeasures.data.models.user import User, Role

user_datastore = SQLAlchemySessionUserDatastore(db.session, User, Role)
test_prosumer = user_datastore.find_user(email="test_prosumer@seita.nl")
mdc_role = user_datastore.create_role(name="MDC", description="Meter Data Company")
user_datastore.add_role_to_user(test_prosumer, mdc_role)
55 changes: 55 additions & 0 deletions flexmeasures/api/dev/tests/test_sensor_data.py
@@ -0,0 +1,55 @@
from flask import url_for
import pytest

from flexmeasures.api.tests.utils import get_auth_token
from flexmeasures.api.dev.tests.utils import make_sensor_data_request


@pytest.mark.parametrize("use_auth", [False, True])
def test_post_sensor_data_bad_auth(client, use_auth):
"""
Attempt to post sensor data with insufficient or missing auth.
"""
# the case without auth: authentication will fail
headers = {"content-type": "application/json"}
if use_auth:
# in this case, we successfully authenticate,
# but fail authorization (no admin or MDC role)
headers["Authorization"] = get_auth_token(
client, "test_supplier@seita.nl", "testtest"
)

post_data_response = client.post(
url_for("post_sensor_data"),
headers=headers,
)
print("Server responded with:\n%s" % post_data_response.data)
if use_auth:
assert post_data_response.status_code == 403
else:
assert post_data_response.status_code == 401


@pytest.mark.parametrize(
"request_field, new_value, error_field, error_text",
[
("duration", "PT30M", "_schema", "Resolution of 0:05:00 is incompatible"),
nhoening marked this conversation as resolved.
Show resolved Hide resolved
("sensor", "ea1.2021-01.io.flexmeasures:fm1.666", "sensor", "doesn't exist"),
("unit", "m", "_schema", "Required unit"),
],
)
nhoening marked this conversation as resolved.
Show resolved Hide resolved
def test_post_invalid_sensor_data(
client, setup_api_fresh_test_data, request_field, new_value, error_field, error_text
):
post_data = make_sensor_data_request()
post_data[request_field] = new_value
# this guy is allowed to post sensorData
auth_token = get_auth_token(client, "test_prosumer@seita.nl", "testtest")
response = client.post(
url_for("post_sensor_data"),
json=post_data,
headers={"Authorization": auth_token},
)
print(response.json)
assert response.status_code == 422
assert error_text in response.json["message"]["json"][error_field][0]
34 changes: 34 additions & 0 deletions flexmeasures/api/dev/tests/test_sensor_data_fresh_db.py
@@ -0,0 +1,34 @@
import pytest

from flask import url_for

from flexmeasures.api.tests.utils import get_auth_token
from flexmeasures.api.dev.tests.utils import make_sensor_data_request
from flexmeasures.data.models.time_series import TimedBelief


@pytest.mark.parametrize(
"post_data, expected_num_values",
[
(make_sensor_data_request(), 6),
(make_sensor_data_request(num_values=3), 6), # upsample
],
)
def test_post_sensor_data(
client, setup_api_fresh_test_data, post_data, expected_num_values
):
beliefs_before = TimedBelief.query.filter(TimedBelief.sensor_id == 1).all()
print(f"BELIEFS BEFORE: {beliefs_before}")
assert len(beliefs_before) == 0
auth_token = get_auth_token(client, "test_prosumer@seita.nl", "testtest")
response = client.post(
url_for("post_sensor_data"),
json=post_data,
headers={"Authorization": auth_token},
)
print(response.json)
assert response.status_code == 200
beliefs = TimedBelief.query.filter(TimedBelief.sensor_id == 1).all()
print(f"BELIEFS AFTER: {beliefs}")
assert len(beliefs) == expected_num_values
assert beliefs[0].event_value == -11.28
9 changes: 9 additions & 0 deletions flexmeasures/api/dev/tests/utils.py
@@ -0,0 +1,9 @@
def make_sensor_data_request(num_values: int = 6, duration: str = "PT1H") -> dict:
return {
"type": "PostSensorDataRequest",
"sensor": "ea1.2021-01.io.flexmeasures:fm1.1",
"values": num_values * [-11.28],
"start": "2021-06-07T00:00:00+02:00",
"duration": duration,
"unit": "m³/h",
}
3 changes: 1 addition & 2 deletions flexmeasures/api/v2_0/tests/test_api_v2_0_users.py
@@ -1,7 +1,6 @@
from flask import url_for
import pytest

# from flexmeasures.data.models.user import User
from flexmeasures.data.services.users import find_user_by_email
from flexmeasures.api.tests.utils import get_auth_token, UserContext

Expand Down Expand Up @@ -92,7 +91,7 @@ def test_edit_user(client):
)
assert user_edit_response.status_code == 401
# admin can deactivate supplier, other changes will be ignored
# (id is in the Userschema of the API, but we ignore it)
# (id is in the User schema of the API, but we ignore it)
headers = {"content-type": "application/json", "Authorization": prosumer_auth_token}
user_edit_response = client.patch(
url_for("flexmeasures_api_v2_0.patch_user", id=supplier_id),
Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/data/models/time_series.py
Expand Up @@ -148,7 +148,7 @@ def timerange(self) -> Dict[str, datetime_type]:
return dict(start=least_recent.event_start, end=most_recent.event_end)

def __repr__(self) -> str:
return f"<Sensor {self.id}: {self.name}>"
return f"<Sensor {self.id}: {self.name}, unit: {self.unit} res.: {self.event_resolution}>"


class TimedBelief(db.Model, tb.TimedBeliefDBMixin):
Expand Down