Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 282 use pint to check and convert units #283

Merged
merged 15 commits into from Dec 27, 2021
Merged
39 changes: 34 additions & 5 deletions flexmeasures/api/common/schemas/sensor_data.py
Expand Up @@ -13,6 +13,10 @@
from flexmeasures.api.common.schemas.sensors import SensorField
from flexmeasures.api.common.utils.api_utils import upsample_values
from flexmeasures.data.schemas.times import AwareDateTimeField, DurationField
from flexmeasures.utils.unit_utils import (
determine_unit_conversion_multiplier,
units_are_convertible,
)


class SingleValueField(fields.Float):
Expand Down Expand Up @@ -81,12 +85,21 @@ def check_user_rights_against_sensor(self, data, **kwargs):

@validates_schema
def check_schema_unit_against_sensor_unit(self, data, **kwargs):
# TODO: technically, there are compatible units, like kWh and kW.
# They could be allowed here, and the SensorDataSchema could
# even convert values to the sensor's unit if possible.
if data["unit"] != data["sensor"].unit:
"""Allows units compatible with that of the sensor.
For example, a sensor with W units allows data to be posted with units:
- W, kW, MW, etc. (i.e. units with different prefixes)
- J/s, Nm/s, etc. (i.e. units that can be converted using some multiplier)
- Wh, kWh, etc. (i.e. units that represent a stock delta, which knowing the duration can be converted to a flow)
For compatible units, the SensorDataSchema converts values to the sensor's unit.
"""
posted_unit = data["unit"]
required_unit = data["sensor"].unit

if posted_unit != required_unit and not units_are_convertible(
posted_unit, required_unit
):
raise ValidationError(
f"Required unit for this sensor is {data['sensor'].unit}, got: {data['unit']}"
f"Required unit for this sensor is {data['sensor'].unit}, got incompatible unit: {data['unit']}"
)


Expand Down Expand Up @@ -120,6 +133,22 @@ def check_resolution_compatibility_of_values(self, data, **kwargs):
f"Resolution of {inferred_resolution} is incompatible with the sensor's required resolution of {required_resolution}."
)

@post_load()
def possibly_convert_units(self, data, **kwargs):
"""
Convert values if needed, to fit the sensor's unit.
Marshmallow runs this after validation.
"""
posted_unit = data["unit"]
required_unit = data["sensor"].unit

if posted_unit != required_unit:
multiplier = determine_unit_conversion_multiplier(
posted_unit, required_unit, data["sensor"].event_resolution
)
data["values"] = [multiplier * value for value in data["values"]]
return data

@post_load()
def possibly_upsample_values(self, data, **kwargs):
"""
Expand Down
27 changes: 20 additions & 7 deletions flexmeasures/api/dev/tests/test_sensor_data_fresh_db.py
Expand Up @@ -8,17 +8,29 @@


@pytest.mark.parametrize(
"num_values, expected_num_values",
"num_values, expected_num_values, unit, expected_value",
[
(6, 6),
(3, 6), # upsample
(1, 6), # upsample single value sent as float rather than list of floats
(6, 6, "m³/h", -11.28),
(6, 6, "m³", 6 * -11.28), # 6 * 10-min intervals per hour
(6, 6, "l/h", -11.28 / 1000), # 1 m³ = 1000 l
(3, 6, "m³/h", -11.28), # upsample to 20-min intervals
(
1,
6,
"m³/h",
-11.28,
), # upsample to single value for 1-hour interval, sent as float rather than list of floats
],
)
def test_post_sensor_data(
client, setup_api_fresh_test_data, num_values, expected_num_values
client,
setup_api_fresh_test_data,
num_values,
expected_num_values,
unit,
expected_value,
):
post_data = make_sensor_data_request(num_values=num_values)
post_data = make_sensor_data_request(num_values=num_values, unit=unit)
sensor = Sensor.query.filter(Sensor.name == "some gas sensor").one_or_none()
beliefs_before = TimedBelief.query.filter(TimedBelief.sensor_id == sensor.id).all()
print(f"BELIEFS BEFORE: {beliefs_before}")
Expand All @@ -35,4 +47,5 @@ def test_post_sensor_data(
beliefs = TimedBelief.query.filter(TimedBelief.sensor_id == sensor.id).all()
print(f"BELIEFS AFTER: {beliefs}")
assert len(beliefs) == expected_num_values
assert beliefs[0].event_value == -11.28
# check that values are scaled to the sensor unit correctly
assert pytest.approx(beliefs[0].event_value - expected_value) == 0
6 changes: 4 additions & 2 deletions flexmeasures/api/dev/tests/utils.py
@@ -1,15 +1,17 @@
from flexmeasures.data.models.time_series import Sensor


def make_sensor_data_request(num_values: int = 6, duration: str = "PT1H") -> dict:
def make_sensor_data_request(
Flix6x marked this conversation as resolved.
Show resolved Hide resolved
num_values: int = 6, duration: str = "PT1H", unit: str = "m³"
) -> dict:
sensor = Sensor.query.filter(Sensor.name == "some gas sensor").one_or_none()
message: dict = {
"type": "PostSensorDataRequest",
"sensor": f"ea1.2021-01.io.flexmeasures:fm1.{sensor.id}",
"values": num_values * [-11.28],
"start": "2021-06-07T00:00:00+02:00",
"duration": duration,
"unit": "m³/h",
"unit": unit,
}
if num_values == 1:
# flatten [<float>] to <float>
Expand Down
112 changes: 112 additions & 0 deletions flexmeasures/utils/tests/test_unit_utils.py
@@ -0,0 +1,112 @@
from datetime import timedelta
import pytest

from flexmeasures.utils.unit_utils import (
determine_flow_unit,
determine_stock_unit,
determine_unit_conversion_multiplier,
units_are_convertible,
is_energy_unit,
is_power_unit,
u,
)


@pytest.mark.parametrize(
"unit, time_unit, expected_unit",
[
("m³", None, "m³/h"),
("kWh", None, "kW"),
("km", "h", "km/h"),
("m", "s", "km/h"),
],
)
def test_determine_flow_unit(
unit,
time_unit,
expected_unit,
):
if time_unit is None:
assert determine_flow_unit(unit) == expected_unit
else:
assert determine_flow_unit(unit, time_unit) == expected_unit


@pytest.mark.parametrize(
"unit, time_unit, expected_unit",
[
("m³/h", None, "m³"),
("kW", None, "kWh"),
("m/s", "s", "m"),
("m/s", "h", "km"),
],
)
def test_determine_stock_unit(
unit,
time_unit,
expected_unit,
):
if time_unit is None:
assert determine_stock_unit(unit) == expected_unit
else:
assert determine_stock_unit(unit, time_unit) == expected_unit


def test_determine_unit_conversion_multiplier():
assert determine_unit_conversion_multiplier("kW", "W") == 1000
assert determine_unit_conversion_multiplier("J/s", "W") == 1
assert determine_unit_conversion_multiplier("Wh", "W", timedelta(minutes=10)) == 6
assert determine_unit_conversion_multiplier("kWh", "MJ") == 3.6
assert determine_unit_conversion_multiplier("°C", "K") == 274.15


def test_h_denotes_hour_and_not_planck_constant():
assert u.Quantity("h").dimensionality == u.Quantity("hour").dimensionality
assert (
u.Quantity("hbar").dimensionality
== u.Quantity("planck_constant").dimensionality
)


def test_units_are_convertible():
assert units_are_convertible("kW", "W") # units just have different prefixes
assert units_are_convertible(
"J/s", "W"
) # units can be converted using some multiplier
assert units_are_convertible(
"Wh", "W"
) # units that represent a stock delta can, knowing the duration, be converted to a flow
assert units_are_convertible("toe", "W") # tonne of oil equivalent
assert units_are_convertible("°C", "K") # offset unit to absolute unit
assert not units_are_convertible("°C", "W")
assert not units_are_convertible("EUR/MWh", "W")


@pytest.mark.parametrize(
"unit, power_unit",
[
("EUR/MWh", False),
("KRW/kWh", False),
("kWh", False),
("kW", True),
("watt", True),
("°C", False),
],
)
def test_is_power_unit(unit: str, power_unit: bool):
assert is_power_unit(unit) is power_unit


@pytest.mark.parametrize(
"unit, energy_unit",
[
("EUR/MWh", False),
("KRW/kWh", False),
("kWh", True),
("kW", False),
("watthour", True),
("°C", False),
],
)
def test_is_energy_unit(unit: str, energy_unit: bool):
assert is_energy_unit(unit) is energy_unit
126 changes: 107 additions & 19 deletions flexmeasures/utils/unit_utils.py
@@ -1,42 +1,130 @@
from datetime import timedelta
from typing import Optional

from moneyed import list_all_currencies
import importlib.resources as pkg_resources
import pint

# Edit constants template to stop using h to represent planck_constant
constants_template = (
pkg_resources.read_text(pint, "constants_en.txt")
.replace("= h ", " ")
.replace(" h ", " planck_constant ")
)

# Edit units template to use h to represent hour instead of planck_constant
units_template = (
pkg_resources.read_text(pint, "default_en.txt")
.replace("@import constants_en.txt", "")
.replace(" h ", " planck_constant ")
.replace("hour = 60 * minute = hr", "hour = 60 * minute = h = hr")
)

# Create custom template
custom_template = [f"{c} = [currency_{c}]" for c in list_all_currencies()]

# Join templates as iterable object
full_template = (
constants_template.split("\n") + units_template.split("\n") + custom_template
)

# Set up UnitRegistry with abbreviated scientific format
u = pint.UnitRegistry(full_template)
u.default_format = "~P"
Flix6x marked this conversation as resolved.
Show resolved Hide resolved


PREFERRED_UNITS = [
"m",
"h",
"kg",
"m/h",
"W",
"N",
"Wh",
"m**2",
"m**3",
"V",
"A",
"dimensionless",
] # todo: move to config setting, with these as a default (NB prefixes do not matter here, this is about SI base units, so km/h is equivalent to m/h)
PREFERRED_UNITS_DICT = dict([(u[x].dimensionality, x) for x in PREFERRED_UNITS])


def to_preferred(x):
Flix6x marked this conversation as resolved.
Show resolved Hide resolved
"""From https://github.com/hgrecco/pint/issues/676#issuecomment-689157693"""
dim = x.dimensionality
if dim in PREFERRED_UNITS_DICT:
return x.to(PREFERRED_UNITS_DICT[dim]).to_compact()
return x


def determine_unit_conversion_multiplier(
from_unit: str, to_unit: str, duration: Optional[timedelta] = None
):
"""Determine the value multiplier for a given unit conversion.
If needed, requires a duration to convert from units of stock change to units of flow.
"""
scalar = u.Quantity(from_unit).to_base_units() / u.Quantity(to_unit).to_base_units()
if scalar.dimensionality == u.Quantity("h").dimensionality:
if duration is None:
raise ValueError(
f"Cannot convert units from {from_unit} to {to_unit} without known duration."
)
return scalar.to_timedelta() / duration
return scalar.to_reduced_units().magnitude


def determine_flow_unit(stock_unit: str, time_unit: str = "h"):
"""For example:
>>> determine_flow_unit("m3") # m3/h
>>> determine_flow_unit("") # /h
>>> determine_flow_unit("kWh") # kW
"""
return (
stock_unit.rpartition(time_unit)[0]
if stock_unit.endswith(time_unit)
else f"{stock_unit}/{time_unit}"
)
flow = to_preferred(u.Quantity(stock_unit) / u.Quantity(time_unit))
return "{:~P}".format(flow.units)


def determine_stock_unit(flow_unit: str, time_unit: str = "h"):
"""For example:
>>> determine_stock_unit("m3/h") # m3
>>> determine_stock_unit("/h") #
>>> determine_stock_unit("kW") # kWh
"""
return (
flow_unit.rpartition(f"/{time_unit}")[0]
if flow_unit.endswith(f"/{time_unit}")
else f"{flow_unit}{time_unit}"
)
stock = to_preferred(u.Quantity(flow_unit) * u.Quantity(time_unit))
return "{:~P}".format(stock.units)


def units_are_convertible(
from_unit: str, to_unit: str, duration_known: bool = True
) -> bool:
"""For example, a sensor with W units allows data to be posted with units:
>>> units_are_convertible("kW", "W") # True (units just have different prefixes)
>>> units_are_convertible("J/s", "W") # True (units can be converted using some multiplier)
>>> units_are_convertible("Wh", "W") # True (units that represent a stock delta can, knowing the duration, be converted to a flow)
>>> units_are_convertible("°C", "W") # False
"""
scalar = u.Quantity(from_unit).to_base_units() / u.Quantity(to_unit).to_base_units()
if duration_known:
return scalar.dimensionality in (
u.Quantity("h").dimensionality,
u.Quantity("dimensionless").dimensionality,
)
return scalar.dimensionality == u.Quantity("dimensionless").dimensionality


def is_power_unit(unit: str) -> bool:
"""For example:
>>> is_power_unit("kW") # True
>>> is_power_unit("°C") # False
>>> is_power_unit("kWh") # False
>>> is_power_unit("EUR/kWh") # False
>>> is_power_unit("EUR/MWh") # False
"""
return unit in ("W", "kW", "MW")
return u.Quantity(unit).dimensionality == u.Quantity("W").dimensionality


def is_energy_unit(unit: str) -> bool:
"""For example:
>>> is_power_unit("kW") # False
>>> is_power_unit("°C") # False
>>> is_power_unit("kWh") # True
>>> is_power_unit("EUR/kWh") # False
>>> is_energy_unit("kW") # False
>>> is_energy_unit("°C") # False
>>> is_energy_unit("kWh") # True
>>> is_energy_unit("EUR/MWh") # False
"""
return unit in ("Wh", "kWh", "MWh")
return u.Quantity(unit).dimensionality == u.Quantity("Wh").dimensionality
2 changes: 2 additions & 0 deletions requirements/app.in
Expand Up @@ -6,6 +6,8 @@ pscript
pandas
# pandas-bokeh 0.5 requires bokeh>=2.0, but bokeh still doesn't support sharing a legend across plots
pandas-bokeh==0.4.3
pint
py-moneyed
iso8601
xlrd
inflection
Expand Down