Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

post sensor data endpoint (modern) #147

Merged
merged 19 commits into from Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion documentation/changelog.rst
Expand Up @@ -8,7 +8,7 @@ v0.6.0 | July XX, 2021
New features
-----------
* Analytics view offers grouping of all assets by location [see `PR #148 <http://www.github.com/SeitaBV/flexmeasures/pull/148>`_]

* Add (experimental) endpoint to post sensor data for any sensor. Also supports our ongoing integration with data internally represented using the `timely beliefs <https://github.com/SeitaBV/timely-beliefs>`_ lib [see `PR #147 <http://www.github.com/SeitaBV/flexmeasures/pull/147>`_]

Bugfixes
-----------
Expand Down
110 changes: 110 additions & 0 deletions flexmeasures/api/common/schemas/sensor_data.py
@@ -0,0 +1,110 @@
from datetime import timedelta

from flask_login import current_user
from marshmallow import fields, post_load, validates_schema, ValidationError
from timely_beliefs import BeliefsDataFrame
import pandas as pd

from flexmeasures.data import ma
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.api.common.schemas.sensors import SensorField
from flexmeasures.api.common.utils.api_utils import upsample_values
from flexmeasures.data.schemas.times import AwareDateTimeField, DurationField
from flexmeasures.utils.time_utils import timedelta_to_pandas_freq_str


class SensorDataDescriptionSchema(ma.Schema):
"""
Describing sensor data (i.e. in a GET request).

TODO: when we want to support other entity types with this
schema (assets/weather/markets or actuators), we'll need some re-design.
"""

nhoening marked this conversation as resolved.
Show resolved Hide resolved
type = fields.Str() # type of request or response
sensor = SensorField(entity_type="sensor", fm_scheme="fm1")
start = AwareDateTimeField(format="iso")
duration = DurationField()
unit = fields.Str()

@validates_schema
def check_schema_unit_against_sensor_unit(self, data, **kwargs):
# TODO: technically, there are compatible units, like kWh and kW.
# They could be allowed here, and the SensorDataSchema could
# even convert values to the sensor's unit if possible.
if data["unit"] != data["sensor"].unit:
raise ValidationError(
f"Required unit for this sensor is {data['sensor'].unit}, got: {data['unit']}"
)


class SensorDataSchema(SensorDataDescriptionSchema):
"""
This schema includes data, so it can be used for POST requests
or GET responses.

TODO: For the GET use case, look at api/common/validators.py::get_data_downsampling_allowed
(sets a resolution parameter which we can pass to the data collection function).
"""

values = fields.List(fields.Float())

@validates_schema
def check_resolution_compatibility_of_values(self, data, **kwargs):
inferred_resolution = data["duration"] / len(data["values"])
required_resolution = data["sensor"].event_resolution
# TODO: we don't yet have a good policy w.r.t. zero-resolution (direct measurement)
if required_resolution == timedelta(hours=0):
return
if inferred_resolution % required_resolution != timedelta(hours=0):
raise ValidationError(
f"Resolution of {inferred_resolution} is incompatible with the sensor's required resolution of {required_resolution}."
)

@post_load()
def possibly_upsample_values(self, data, **kwargs):
"""
Upsample the data if needed, to fit to the sensor's resolution.
Marshmallow runs this after validation.
"""
inferred_resolution = data["duration"] / len(data["values"])
required_resolution = data["sensor"].event_resolution

# TODO: we don't yet have a good policy w.r.t. zero-resolution (direct measurement)
if required_resolution == timedelta(hours=0):
return data

# we already know resolutions are compatible (see validation)
if inferred_resolution != required_resolution:
data["values"] = upsample_values(
data["values"],
from_resolution=inferred_resolution,
to_resolution=required_resolution,
)
return data # TODO: what should we return here?
nhoening marked this conversation as resolved.
Show resolved Hide resolved

def load_bdf(sensor_data) -> BeliefsDataFrame:
"""
Turn the de-serialized and validated data into a BeliefsDataFrame.
"""
source = DataSource.query.get(current_user.id)
nhoening marked this conversation as resolved.
Show resolved Hide resolved
if not source:
raise ValidationError(
f"User {current_user.id} is not an accepted data source."
)

num_values = len(sensor_data["values"])
step_duration = sensor_data["duration"] / num_values
dt_index = pd.date_range(
sensor_data["start"],
periods=num_values,
freq=timedelta_to_pandas_freq_str(step_duration),
nhoening marked this conversation as resolved.
Show resolved Hide resolved
tz=sensor_data["start"].tzinfo,
nhoening marked this conversation as resolved.
Show resolved Hide resolved
)
s = pd.Series(sensor_data["values"], index=dt_index)
return BeliefsDataFrame(
s,
source=source,
sensor=sensor_data["sensor"],
belief_horizon=timedelta(hours=0),
nhoening marked this conversation as resolved.
Show resolved Hide resolved
)
8 changes: 8 additions & 0 deletions flexmeasures/api/common/utils/validators.py
Expand Up @@ -47,6 +47,14 @@
from flexmeasures.data.services.users import get_users
from flexmeasures.utils.time_utils import server_now

"""
This module has validators used by API endpoints <= 2.0 to describe
acceptable parameters.
We aim to make this module obsolete by using Marshmallow.
Marshmallow is a better format to describe valid data.
There is some actual logic in here, which we still need. It can usually be ported to Marshmallow validators.
"""


p = inflect.engine()

Expand Down
10 changes: 10 additions & 0 deletions flexmeasures/api/dev/__init__.py
@@ -1,9 +1,19 @@
from flask import Flask
from flask_security import auth_token_required, roles_accepted


def register_at(app: Flask):
"""This can be used to register FlaskViews."""

from flexmeasures.api.dev.sensors import SensorAPI
from flexmeasures.api.dev.sensor_data import post_data as send_sensor_data

SensorAPI.register(app, route_prefix="/api/dev")

@app.route("/sensorData", methods=["POST"])
@auth_token_required
@roles_accepted("admin", "MDC")
nhoening marked this conversation as resolved.
Show resolved Hide resolved
def post_sensor_data():
return send_sensor_data()
nhoening marked this conversation as resolved.
Show resolved Hide resolved

# TODO: add GET /sensorData
26 changes: 26 additions & 0 deletions flexmeasures/api/dev/sensor_data.py
@@ -0,0 +1,26 @@
from webargs.flaskparser import use_args

from flexmeasures.data.config import db
from flexmeasures.data.models.time_series import TimedBelief
from flexmeasures.api.common.schemas.sensor_data import SensorDataSchema


@use_args(
SensorDataSchema(),
location="json",
)
def post_data(sensor_data):
"""POST to /sensorData
nhoening marked this conversation as resolved.
Show resolved Hide resolved

Experimental dev feature which uses timely-beliefs
to create and save the data structure.
"""
beliefs = SensorDataSchema.load_bdf(sensor_data)
nhoening marked this conversation as resolved.
Show resolved Hide resolved
TimedBelief.add_to_session(session=db.session, beliefs_data_frame=beliefs)
nhoening marked this conversation as resolved.
Show resolved Hide resolved
db.session.commit()
return dict(status="ok")


def get_data():
# use data.models.time_series.Sensor::search_beliefs()
pass
nhoening marked this conversation as resolved.
Show resolved Hide resolved
43 changes: 43 additions & 0 deletions flexmeasures/api/dev/tests/conftest.py
@@ -0,0 +1,43 @@
from datetime import timedelta

from flask_security import SQLAlchemySessionUserDatastore
import pytest

from flexmeasures.data.models.time_series import Sensor


@pytest.fixture(scope="module", autouse=True)
def setup_api_test_data(db, setup_roles_users):
"""
Set up data for API dev tests.
"""
print("Setting up data for API v2.0 tests on %s" % db.engine)
give_prosumer_the_MDC_role(db)
Flix6x marked this conversation as resolved.
Show resolved Hide resolved


@pytest.fixture(scope="function")
def setup_api_fresh_test_data(fresh_db, setup_roles_users_fresh_db):
"""
Set up fresh data for API dev tests.
"""
print("Setting up data for API dev tests on %s" % fresh_db.engine)

gas_sensor = Sensor(
name="some gas sensor",
unit="m³/h",
event_resolution=timedelta(minutes=10),
)
fresh_db.session.add(gas_sensor)
gas_sensor.owner = setup_roles_users_fresh_db["Test Supplier"]

give_prosumer_the_MDC_role(fresh_db)


def give_prosumer_the_MDC_role(db):

from flexmeasures.data.models.user import User, Role

user_datastore = SQLAlchemySessionUserDatastore(db.session, User, Role)
test_prosumer = user_datastore.find_user(email="test_prosumer@seita.nl")
mdc_role = user_datastore.create_role(name="MDC", description="Meter Data Company")
user_datastore.add_role_to_user(test_prosumer, mdc_role)
55 changes: 55 additions & 0 deletions flexmeasures/api/dev/tests/test_sensor_data.py
@@ -0,0 +1,55 @@
from flask import url_for
import pytest

from flexmeasures.api.tests.utils import get_auth_token
from flexmeasures.api.dev.tests.utils import make_sensor_data_request


@pytest.mark.parametrize("use_auth", [False, True])
def test_post_sensor_data_bad_auth(client, use_auth):
"""
Attempt to post sensor data with insufficient or missing auth.
"""
# the case without auth: authentication will fail
headers = {"content-type": "application/json"}
if use_auth:
# in this case, we successfully authenticate,
# but fail authorization (no admin or MDC role)
headers["Authorization"] = get_auth_token(
client, "test_supplier@seita.nl", "testtest"
)

post_data_response = client.post(
url_for("post_sensor_data"),
headers=headers,
)
print("Server responded with:\n%s" % post_data_response.data)
if use_auth:
assert post_data_response.status_code == 403
else:
assert post_data_response.status_code == 401


@pytest.mark.parametrize(
"request_field, new_value, error_field, error_text",
[
("duration", "PT30M", "_schema", "Resolution of 0:05:00 is incompatible"),
nhoening marked this conversation as resolved.
Show resolved Hide resolved
("sensor", "ea1.2021-01.io.flexmeasures:fm1.666", "sensor", "doesn't exist"),
("unit", "m", "_schema", "Required unit"),
],
)
nhoening marked this conversation as resolved.
Show resolved Hide resolved
def test_post_invalid_sensor_data(
client, setup_api_fresh_test_data, request_field, new_value, error_field, error_text
):
post_data = make_sensor_data_request()
post_data[request_field] = new_value
# this guy is allowed to post sensorData
auth_token = get_auth_token(client, "test_prosumer@seita.nl", "testtest")
response = client.post(
url_for("post_sensor_data"),
json=post_data,
headers={"Authorization": auth_token},
)
print(response.json)
assert response.status_code == 422
assert error_text in response.json["message"]["json"][error_field][0]
34 changes: 34 additions & 0 deletions flexmeasures/api/dev/tests/test_sensor_data_fresh_db.py
@@ -0,0 +1,34 @@
import pytest

from flask import url_for

from flexmeasures.api.tests.utils import get_auth_token
from flexmeasures.api.dev.tests.utils import make_sensor_data_request
from flexmeasures.data.models.time_series import TimedBelief


@pytest.mark.parametrize(
"post_data, expected_num_values",
[
(make_sensor_data_request(), 6),
(make_sensor_data_request(num_values=3), 6), # upsample
],
)
def test_post_sensor_data(
client, setup_api_fresh_test_data, post_data, expected_num_values
):
beliefs_before = TimedBelief.query.filter(TimedBelief.sensor_id == 1).all()
print(f"BELIEFS BEFORE: {beliefs_before}")
assert len(beliefs_before) == 0
auth_token = get_auth_token(client, "test_prosumer@seita.nl", "testtest")
response = client.post(
url_for("post_sensor_data"),
json=post_data,
headers={"Authorization": auth_token},
)
print(response.json)
assert response.status_code == 200
beliefs = TimedBelief.query.filter(TimedBelief.sensor_id == 1).all()
print(f"BELIEFS AFTER: {beliefs}")
assert len(beliefs) == expected_num_values
assert beliefs[0].event_value == -11.28
9 changes: 9 additions & 0 deletions flexmeasures/api/dev/tests/utils.py
@@ -0,0 +1,9 @@
def make_sensor_data_request(num_values: int = 6, duration: str = "PT1H") -> dict:
return {
"type": "PostSensorDataRequest",
"sensor": "ea1.2021-01.io.flexmeasures:fm1.1",
"values": num_values * [-11.28],
"start": "2021-06-07T00:00:00+02:00",
"duration": duration,
"unit": "m³/h",
}
3 changes: 1 addition & 2 deletions flexmeasures/api/v2_0/tests/test_api_v2_0_users.py
@@ -1,7 +1,6 @@
from flask import url_for
import pytest

# from flexmeasures.data.models.user import User
from flexmeasures.data.services.users import find_user_by_email
from flexmeasures.api.tests.utils import get_auth_token, UserContext

Expand Down Expand Up @@ -92,7 +91,7 @@ def test_edit_user(client):
)
assert user_edit_response.status_code == 401
# admin can deactivate supplier, other changes will be ignored
# (id is in the Userschema of the API, but we ignore it)
# (id is in the User schema of the API, but we ignore it)
headers = {"content-type": "application/json", "Authorization": prosumer_auth_token}
user_edit_response = client.patch(
url_for("flexmeasures_api_v2_0.patch_user", id=supplier_id),
Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/data/models/time_series.py
Expand Up @@ -148,7 +148,7 @@ def timerange(self) -> Dict[str, datetime_type]:
return dict(start=least_recent.event_start, end=most_recent.event_end)

def __repr__(self) -> str:
return f"<Sensor {self.id}: {self.name}>"
return f"<Sensor {self.id}: {self.name}, unit: {self.unit} res.: {self.event_resolution}>"


class TimedBelief(db.Model, tb.TimedBeliefDBMixin):
Expand Down