diff --git a/documentation/changelog.rst b/documentation/changelog.rst index d558d9bb4..dffe0a07a 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -13,6 +13,7 @@ New features Infrastructure / Support ---------------------- * Updated dependencies, including Flask-Security-Too [see `PR #82 `_] +* Integration with `timely beliefs `_ lib: Sensor data as TimedBeliefs [see `PR #79 `_] diff --git a/flexmeasures/conftest.py b/flexmeasures/conftest.py index aee4f38c3..ebde5931c 100644 --- a/flexmeasures/conftest.py +++ b/flexmeasures/conftest.py @@ -23,6 +23,7 @@ from flexmeasures.data.models.assets import AssetType, Asset, Power from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.markets import Market, Price +from flexmeasures.data.models.time_series import Sensor, TimedBelief """ @@ -175,9 +176,31 @@ def setup_assets(db, setup_roles_users, setup_markets): db.session.add(p) +@pytest.fixture(scope="function") +def setup_beliefs(db: SQLAlchemy, setup_markets) -> int: + """ + :returns: the number of beliefs set up + """ + sensor = Sensor.query.filter(Sensor.name == "epex_da").one_or_none() + data_source = DataSource.query.filter_by( + name="Seita", type="demo script" + ).one_or_none() + beliefs = [ + TimedBelief( + sensor=sensor, + source=data_source, + event_value=21, + event_start="2021-03-28 16:00+01", + belief_horizon=timedelta(0), + ) + ] + db.session.add_all(beliefs) + return len(beliefs) + + @pytest.fixture(scope="function", autouse=True) def add_market_prices(db: SQLAlchemy, setup_assets, setup_markets): - """Add one day of market prices for the EPEX day-ahead market.""" + """Add two days of market prices for the EPEX day-ahead market.""" epex_da = Market.query.filter(Market.name == "epex_da").one_or_none() data_source = DataSource.query.filter_by( name="Seita", type="demo script" diff --git a/flexmeasures/data/migrations/versions/e62ac5f519d7_create_table_for_timed_beliefs.py b/flexmeasures/data/migrations/versions/e62ac5f519d7_create_table_for_timed_beliefs.py new file mode 100644 index 000000000..95fb2ddb3 --- /dev/null +++ b/flexmeasures/data/migrations/versions/e62ac5f519d7_create_table_for_timed_beliefs.py @@ -0,0 +1,53 @@ +"""create table for timed beliefs + +Revision ID: e62ac5f519d7 +Revises: a528c3c81506 +Create Date: 2021-03-28 16:26:45.025994 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "e62ac5f519d7" +down_revision = "a528c3c81506" +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table( + "timed_belief", + sa.Column( + "event_start", sa.DateTime(timezone=True), nullable=False, index=True + ), + sa.Column("belief_horizon", sa.Interval(), nullable=False), + sa.Column("cumulative_probability", sa.Float(), nullable=False, default=0.5), + sa.Column("event_value", sa.Float(), nullable=False), + sa.Column("sensor_id", sa.Integer(), nullable=False, index=True), + sa.Column("source_id", sa.Integer(), nullable=False), + sa.ForeignKeyConstraint( + ["sensor_id"], + ["sensor.id"], + name=op.f("timed_belief_sensor_id_sensor_fkey"), + ondelete="CASCADE", + ), + sa.ForeignKeyConstraint( + ["source_id"], + ["data_source.id"], + name=op.f("timed_belief_source_id_source_fkey"), + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint( + "event_start", + "belief_horizon", + "cumulative_probability", + "sensor_id", + name=op.f("timed_belief_pkey"), + ), + ) + + +def downgrade(): + op.drop_table("timed_belief") diff --git a/flexmeasures/data/models/time_series.py b/flexmeasures/data/models/time_series.py index c1f940ebf..f77d3285f 100644 --- a/flexmeasures/data/models/time_series.py +++ b/flexmeasures/data/models/time_series.py @@ -4,6 +4,7 @@ from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.orm import Query, Session import timely_beliefs as tb +import timely_beliefs.utils as tb_utils from marshmallow import Schema, fields from flexmeasures.data.config import db @@ -21,6 +22,112 @@ class Sensor(db.Model, tb.SensorDBMixin): """A sensor measures events. """ + def __init__(self, name: str, **kwargs): + tb.SensorDBMixin.__init__(self, name, **kwargs) + tb_utils.remove_class_init_kwargs(tb.SensorDBMixin, kwargs) + db.Model.__init__(self, **kwargs) + + def search_beliefs( + self, + event_time_window: Tuple[Optional[datetime_type], Optional[datetime_type]] = ( + None, + None, + ), + belief_time_window: Tuple[Optional[datetime_type], Optional[datetime_type]] = ( + None, + None, + ), + source: Optional[Union[int, List[int], str, List[str]]] = None, + ): + """Search all beliefs about events for this sensor. + + :param event_time_window: search only events within this time window + :param belief_time_window: search only beliefs within this time window + :param source: search only beliefs by this source (pass its name or id) or list of sources""" + return TimedBelief.search( + sensor=self, + event_time_window=event_time_window, + belief_time_window=belief_time_window, + source=source, + ) + + def __repr__(self) -> str: + return f"" + + +class TimedBelief(db.Model, tb.TimedBeliefDBMixin): + """A timed belief holds a precisely timed record of a belief about an event. + + It also records the source of the belief, and the sensor that the event pertains to. + """ + + @declared_attr + def source_id(cls): + return db.Column(db.Integer, db.ForeignKey("data_source.id"), primary_key=True) + + sensor = db.relationship("Sensor", backref=db.backref("beliefs", lazy=True)) + source = db.relationship("DataSource", backref=db.backref("beliefs", lazy=True)) + + def __init__( + self, + sensor: tb.DBSensor, + source: tb.DBBeliefSource, + **kwargs, + ): + tb.TimedBeliefDBMixin.__init__(self, sensor, source, **kwargs) + tb_utils.remove_class_init_kwargs(tb.TimedBeliefDBMixin, kwargs) + db.Model.__init__(self, **kwargs) + + @classmethod + def search( + cls, + sensor: Sensor, + event_time_window: Tuple[Optional[datetime_type], Optional[datetime_type]] = ( + None, + None, + ), + belief_time_window: Tuple[Optional[datetime_type], Optional[datetime_type]] = ( + None, + None, + ), + source: Optional[Union[int, List[int], str, List[str]]] = None, + ) -> tb.BeliefsDataFrame: + """Search all beliefs about events for a given sensor. + + :param sensor: search only this sensor + :param event_time_window: search only events within this time window + :param belief_time_window: search only beliefs within this time window + :param source: search only beliefs by this source (pass its name or id) or list of sources + """ + return cls.search_session( + session=db.session, + sensor=sensor, + event_before=event_time_window[1], + event_not_before=event_time_window[0], + belief_before=belief_time_window[1], + belief_not_before=belief_time_window[0], + source=source, + ) + + @classmethod + def add(cls, bdf: tb.BeliefsDataFrame, commit_transaction: bool = True): + """Add a BeliefsDataFrame as timed beliefs in the database. + + :param bdf: the BeliefsDataFrame to be persisted + :param commit_transaction: if True, the session is committed + if False, you can still add other data to the session + and commit it all within an atomic transaction + """ + return cls.add_to_session( + session=db.session, + beliefs_data_frame=bdf, + commit_transaction=commit_transaction, + ) + + def __repr__(self) -> str: + """timely-beliefs representation of timed beliefs.""" + return tb.TimedBelief.__repr__(self) + class SensorSchemaMixin(Schema): """ diff --git a/flexmeasures/data/scripts/data_gen.py b/flexmeasures/data/scripts/data_gen.py index 9848bdc42..bbb108e37 100644 --- a/flexmeasures/data/scripts/data_gen.py +++ b/flexmeasures/data/scripts/data_gen.py @@ -18,7 +18,7 @@ from humanize import naturaldelta import inflect -from flexmeasures.data.models.time_series import Sensor +from flexmeasures.data.models.time_series import Sensor, TimedBelief from flexmeasures.data.models.markets import MarketType, Market, Price from flexmeasures.data.models.assets import AssetType, Asset, Power from flexmeasures.data.models.data_sources import DataSource @@ -692,5 +692,5 @@ def get_affected_classes(structure: bool = True, data: bool = False) -> List: DataSource, ] if data: - affected_classes += [Power, Price, Weather] + affected_classes += [TimedBelief, Power, Price, Weather] return affected_classes diff --git a/flexmeasures/data/tests/test_queries.py b/flexmeasures/data/tests/test_queries.py index e432cd2d2..b510809ad 100644 --- a/flexmeasures/data/tests/test_queries.py +++ b/flexmeasures/data/tests/test_queries.py @@ -7,6 +7,8 @@ import timely_beliefs as tb from flexmeasures.data.models.assets import Asset, Power +from flexmeasures.data.models.data_sources import DataSource +from flexmeasures.data.models.time_series import Sensor, TimedBelief from flexmeasures.data.queries.utils import ( multiply_dataframe_with_deterministic_beliefs, simplify_index, @@ -215,3 +217,43 @@ def test_simplify_index(): ) df = simplify_index(bdf) assert df.event_resolution == timedelta(minutes=15) + + +def test_query_beliefs(setup_beliefs): + """Check various ways of querying for beliefs.""" + sensor = Sensor.query.filter_by(name="epex_da").one_or_none() + source = DataSource.query.filter_by(name="Seita").one_or_none() + bdfs = [ + TimedBelief.search(sensor, source=source), + sensor.search_beliefs(source=source), + tb.BeliefsDataFrame(sensor.beliefs), # doesn't allow filtering + ] + for bdf in bdfs: + assert sensor.event_resolution == timedelta( + hours=0 + ) # todo change to 1 after migrating Markets to Sensors + assert bdf.event_resolution == timedelta( + hours=0 + ) # todo change to 1 after migrating Markets to Sensors + assert len(bdf) == setup_beliefs + + +def test_persist_beliefs(setup_beliefs): + """Check whether persisting beliefs works. + + We load the already set up beliefs, and form new beliefs an hour later. + """ + sensor = Sensor.query.filter_by(name="epex_da").one_or_none() + bdf: tb.BeliefsDataFrame = TimedBelief.search(sensor) + + # Form new beliefs + df = bdf.reset_index() + df["belief_time"] = df["belief_time"] + timedelta(hours=1) + df["event_value"] = df["event_value"] * 10 + bdf = df.set_index( + ["event_start", "belief_time", "source", "cumulative_probability"] + ) + + TimedBelief.add(bdf) + bdf: tb.BeliefsDataFrame = TimedBelief.search(sensor) + assert len(bdf) == setup_beliefs * 2 diff --git a/requirements/app.in b/requirements/app.in index 60a3f64a5..09d7da32f 100644 --- a/requirements/app.in +++ b/requirements/app.in @@ -32,7 +32,7 @@ netCDF4 siphon tables timetomodel>=0.6.8 -timely-beliefs>=1.2.1 +timely-beliefs>=1.3.0 python-dotenv # a backport, not needed in Python3.8 importlib_metadata