Skip to content

Commit

Permalink
Timed beliefs (#79)
Browse files Browse the repository at this point in the history
New TimedBelief table and functionality to turn its entries into a BeliefsDataFrame and vice versa.


* Fix docstring

* Added TimedBelief table with i/o functionality

* Move some functionality to timely-beliefs

* Rename class methods

* Source db class now derived from instance

* Fix mixin usage by initializing superclasses in a specific order with specific args and kwargs

* Set dependency dev version

* Register TimedBelief as a data table for db-obs

* Set minimum tb dependency to released version

* Changelog entry

Co-authored-by: F.N. Claessen <felix@seita.nl>
  • Loading branch information
Flix6x and Flix6x committed Apr 2, 2021
1 parent 3f9abbf commit a60ed29
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 4 deletions.
1 change: 1 addition & 0 deletions documentation/changelog.rst
Expand Up @@ -13,6 +13,7 @@ New features
Infrastructure / Support
----------------------
* Updated dependencies, including Flask-Security-Too [see `PR #82 <http://www.github.com/SeitaBV/flexmeasures/pull/82>`_]
* Integration with `timely beliefs <https://github.com/SeitaBV/timely-beliefs>`_ lib: Sensor data as TimedBeliefs [see `PR #79 <http://www.github.com/SeitaBV/flexmeasures/pull/79>`_]



Expand Down
25 changes: 24 additions & 1 deletion flexmeasures/conftest.py
Expand Up @@ -23,6 +23,7 @@
from flexmeasures.data.models.assets import AssetType, Asset, Power
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.markets import Market, Price
from flexmeasures.data.models.time_series import Sensor, TimedBelief


"""
Expand Down Expand Up @@ -175,9 +176,31 @@ def setup_assets(db, setup_roles_users, setup_markets):
db.session.add(p)


@pytest.fixture(scope="function")
def setup_beliefs(db: SQLAlchemy, setup_markets) -> int:
"""
:returns: the number of beliefs set up
"""
sensor = Sensor.query.filter(Sensor.name == "epex_da").one_or_none()
data_source = DataSource.query.filter_by(
name="Seita", type="demo script"
).one_or_none()
beliefs = [
TimedBelief(
sensor=sensor,
source=data_source,
event_value=21,
event_start="2021-03-28 16:00+01",
belief_horizon=timedelta(0),
)
]
db.session.add_all(beliefs)
return len(beliefs)


@pytest.fixture(scope="function", autouse=True)
def add_market_prices(db: SQLAlchemy, setup_assets, setup_markets):
"""Add one day of market prices for the EPEX day-ahead market."""
"""Add two days of market prices for the EPEX day-ahead market."""
epex_da = Market.query.filter(Market.name == "epex_da").one_or_none()
data_source = DataSource.query.filter_by(
name="Seita", type="demo script"
Expand Down
@@ -0,0 +1,53 @@
"""create table for timed beliefs
Revision ID: e62ac5f519d7
Revises: a528c3c81506
Create Date: 2021-03-28 16:26:45.025994
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "e62ac5f519d7"
down_revision = "a528c3c81506"
branch_labels = None
depends_on = None


def upgrade():
op.create_table(
"timed_belief",
sa.Column(
"event_start", sa.DateTime(timezone=True), nullable=False, index=True
),
sa.Column("belief_horizon", sa.Interval(), nullable=False),
sa.Column("cumulative_probability", sa.Float(), nullable=False, default=0.5),
sa.Column("event_value", sa.Float(), nullable=False),
sa.Column("sensor_id", sa.Integer(), nullable=False, index=True),
sa.Column("source_id", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["sensor_id"],
["sensor.id"],
name=op.f("timed_belief_sensor_id_sensor_fkey"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["source_id"],
["data_source.id"],
name=op.f("timed_belief_source_id_source_fkey"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint(
"event_start",
"belief_horizon",
"cumulative_probability",
"sensor_id",
name=op.f("timed_belief_pkey"),
),
)


def downgrade():
op.drop_table("timed_belief")
107 changes: 107 additions & 0 deletions flexmeasures/data/models/time_series.py
Expand Up @@ -4,6 +4,7 @@
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import Query, Session
import timely_beliefs as tb
import timely_beliefs.utils as tb_utils
from marshmallow import Schema, fields

from flexmeasures.data.config import db
Expand All @@ -21,6 +22,112 @@
class Sensor(db.Model, tb.SensorDBMixin):
"""A sensor measures events. """

def __init__(self, name: str, **kwargs):
tb.SensorDBMixin.__init__(self, name, **kwargs)
tb_utils.remove_class_init_kwargs(tb.SensorDBMixin, kwargs)
db.Model.__init__(self, **kwargs)

def search_beliefs(
self,
event_time_window: Tuple[Optional[datetime_type], Optional[datetime_type]] = (
None,
None,
),
belief_time_window: Tuple[Optional[datetime_type], Optional[datetime_type]] = (
None,
None,
),
source: Optional[Union[int, List[int], str, List[str]]] = None,
):
"""Search all beliefs about events for this sensor.
:param event_time_window: search only events within this time window
:param belief_time_window: search only beliefs within this time window
:param source: search only beliefs by this source (pass its name or id) or list of sources"""
return TimedBelief.search(
sensor=self,
event_time_window=event_time_window,
belief_time_window=belief_time_window,
source=source,
)

def __repr__(self) -> str:
return f"<Sensor {self.id}: {self.name}>"


class TimedBelief(db.Model, tb.TimedBeliefDBMixin):
"""A timed belief holds a precisely timed record of a belief about an event.
It also records the source of the belief, and the sensor that the event pertains to.
"""

@declared_attr
def source_id(cls):
return db.Column(db.Integer, db.ForeignKey("data_source.id"), primary_key=True)

sensor = db.relationship("Sensor", backref=db.backref("beliefs", lazy=True))
source = db.relationship("DataSource", backref=db.backref("beliefs", lazy=True))

def __init__(
self,
sensor: tb.DBSensor,
source: tb.DBBeliefSource,
**kwargs,
):
tb.TimedBeliefDBMixin.__init__(self, sensor, source, **kwargs)
tb_utils.remove_class_init_kwargs(tb.TimedBeliefDBMixin, kwargs)
db.Model.__init__(self, **kwargs)

@classmethod
def search(
cls,
sensor: Sensor,
event_time_window: Tuple[Optional[datetime_type], Optional[datetime_type]] = (
None,
None,
),
belief_time_window: Tuple[Optional[datetime_type], Optional[datetime_type]] = (
None,
None,
),
source: Optional[Union[int, List[int], str, List[str]]] = None,
) -> tb.BeliefsDataFrame:
"""Search all beliefs about events for a given sensor.
:param sensor: search only this sensor
:param event_time_window: search only events within this time window
:param belief_time_window: search only beliefs within this time window
:param source: search only beliefs by this source (pass its name or id) or list of sources
"""
return cls.search_session(
session=db.session,
sensor=sensor,
event_before=event_time_window[1],
event_not_before=event_time_window[0],
belief_before=belief_time_window[1],
belief_not_before=belief_time_window[0],
source=source,
)

@classmethod
def add(cls, bdf: tb.BeliefsDataFrame, commit_transaction: bool = True):
"""Add a BeliefsDataFrame as timed beliefs in the database.
:param bdf: the BeliefsDataFrame to be persisted
:param commit_transaction: if True, the session is committed
if False, you can still add other data to the session
and commit it all within an atomic transaction
"""
return cls.add_to_session(
session=db.session,
beliefs_data_frame=bdf,
commit_transaction=commit_transaction,
)

def __repr__(self) -> str:
"""timely-beliefs representation of timed beliefs."""
return tb.TimedBelief.__repr__(self)


class SensorSchemaMixin(Schema):
"""
Expand Down
4 changes: 2 additions & 2 deletions flexmeasures/data/scripts/data_gen.py
Expand Up @@ -18,7 +18,7 @@
from humanize import naturaldelta
import inflect

from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.models.markets import MarketType, Market, Price
from flexmeasures.data.models.assets import AssetType, Asset, Power
from flexmeasures.data.models.data_sources import DataSource
Expand Down Expand Up @@ -692,5 +692,5 @@ def get_affected_classes(structure: bool = True, data: bool = False) -> List:
DataSource,
]
if data:
affected_classes += [Power, Price, Weather]
affected_classes += [TimedBelief, Power, Price, Weather]
return affected_classes
42 changes: 42 additions & 0 deletions flexmeasures/data/tests/test_queries.py
Expand Up @@ -7,6 +7,8 @@
import timely_beliefs as tb

from flexmeasures.data.models.assets import Asset, Power
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.queries.utils import (
multiply_dataframe_with_deterministic_beliefs,
simplify_index,
Expand Down Expand Up @@ -215,3 +217,43 @@ def test_simplify_index():
)
df = simplify_index(bdf)
assert df.event_resolution == timedelta(minutes=15)


def test_query_beliefs(setup_beliefs):
"""Check various ways of querying for beliefs."""
sensor = Sensor.query.filter_by(name="epex_da").one_or_none()
source = DataSource.query.filter_by(name="Seita").one_or_none()
bdfs = [
TimedBelief.search(sensor, source=source),
sensor.search_beliefs(source=source),
tb.BeliefsDataFrame(sensor.beliefs), # doesn't allow filtering
]
for bdf in bdfs:
assert sensor.event_resolution == timedelta(
hours=0
) # todo change to 1 after migrating Markets to Sensors
assert bdf.event_resolution == timedelta(
hours=0
) # todo change to 1 after migrating Markets to Sensors
assert len(bdf) == setup_beliefs


def test_persist_beliefs(setup_beliefs):
"""Check whether persisting beliefs works.
We load the already set up beliefs, and form new beliefs an hour later.
"""
sensor = Sensor.query.filter_by(name="epex_da").one_or_none()
bdf: tb.BeliefsDataFrame = TimedBelief.search(sensor)

# Form new beliefs
df = bdf.reset_index()
df["belief_time"] = df["belief_time"] + timedelta(hours=1)
df["event_value"] = df["event_value"] * 10
bdf = df.set_index(
["event_start", "belief_time", "source", "cumulative_probability"]
)

TimedBelief.add(bdf)
bdf: tb.BeliefsDataFrame = TimedBelief.search(sensor)
assert len(bdf) == setup_beliefs * 2
2 changes: 1 addition & 1 deletion requirements/app.in
Expand Up @@ -32,7 +32,7 @@ netCDF4
siphon
tables
timetomodel>=0.6.8
timely-beliefs>=1.2.1
timely-beliefs>=1.3.0
python-dotenv
# a backport, not needed in Python3.8
importlib_metadata
Expand Down

0 comments on commit a60ed29

Please sign in to comment.