Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timed beliefs #79

Merged
merged 12 commits into from Apr 2, 2021
25 changes: 24 additions & 1 deletion flexmeasures/conftest.py
Expand Up @@ -23,6 +23,7 @@
from flexmeasures.data.models.assets import AssetType, Asset, Power
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.markets import Market, Price
from flexmeasures.data.models.time_series import Sensor, TimedBelief


"""
Expand Down Expand Up @@ -175,9 +176,31 @@ def setup_assets(db, setup_roles_users, setup_markets):
db.session.add(p)


@pytest.fixture(scope="function")
def setup_beliefs(db: SQLAlchemy, setup_markets) -> int:
"""
:returns: the number of beliefs set up
"""
sensor = Sensor.query.filter(Sensor.name == "epex_da").one_or_none()
data_source = DataSource.query.filter_by(
name="Seita", type="demo script"
).one_or_none()
beliefs = [
TimedBelief(
sensor=sensor,
source=data_source,
event_value=21,
event_start="2021-03-28 16:00+01",
belief_horizon=timedelta(0),
)
]
db.session.add_all(beliefs)
return len(beliefs)


@pytest.fixture(scope="function", autouse=True)
def add_market_prices(db: SQLAlchemy, setup_assets, setup_markets):
"""Add one day of market prices for the EPEX day-ahead market."""
"""Add two days of market prices for the EPEX day-ahead market."""
epex_da = Market.query.filter(Market.name == "epex_da").one_or_none()
data_source = DataSource.query.filter_by(
name="Seita", type="demo script"
Expand Down
@@ -0,0 +1,53 @@
"""create table for timed beliefs

Revision ID: e62ac5f519d7
Revises: a528c3c81506
Create Date: 2021-03-28 16:26:45.025994

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "e62ac5f519d7"
down_revision = "a528c3c81506"
branch_labels = None
depends_on = None


def upgrade():
op.create_table(
"timed_belief",
sa.Column(
"event_start", sa.DateTime(timezone=True), nullable=False, index=True
),
sa.Column("belief_horizon", sa.Interval(), nullable=False),
sa.Column("cumulative_probability", sa.Float(), nullable=False, default=0.5),
sa.Column("event_value", sa.Float(), nullable=False),
sa.Column("sensor_id", sa.Integer(), nullable=False, index=True),
sa.Column("source_id", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["sensor_id"],
["sensor.id"],
name=op.f("timed_belief_sensor_id_sensor_fkey"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["source_id"],
["data_source.id"],
name=op.f("timed_belief_source_id_source_fkey"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint(
"event_start",
"belief_horizon",
"cumulative_probability",
"sensor_id",
name=op.f("timed_belief_pkey"),
),
)


def downgrade():
op.drop_table("timed_belief")
107 changes: 107 additions & 0 deletions flexmeasures/data/models/time_series.py
Expand Up @@ -4,6 +4,7 @@
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import Query, Session
import timely_beliefs as tb
import timely_beliefs.utils as tb_utils

from flexmeasures.data.config import db
from flexmeasures.data.queries.utils import (
Expand All @@ -19,6 +20,112 @@
class Sensor(db.Model, tb.SensorDBMixin):
"""A sensor measures events. """

def __init__(self, name: str, **kwargs):
tb.SensorDBMixin.__init__(self, name, **kwargs)
tb_utils.remove_class_init_kwargs(tb.SensorDBMixin, kwargs)
db.Model.__init__(self, **kwargs)

def search_beliefs(
self,
event_time_window: Tuple[Optional[datetime_type], Optional[datetime_type]] = (
None,
None,
),
belief_time_window: Tuple[Optional[datetime_type], Optional[datetime_type]] = (
None,
None,
),
source: Optional[Union[int, List[int], str, List[str]]] = None,
):
"""Search all beliefs about events for this sensor.

:param event_time_window: search only events within this time window
:param belief_time_window: search only beliefs within this time window
:param source: search only beliefs by this source (pass its name or id) or list of sources"""
return TimedBelief.search(
sensor=self,
event_time_window=event_time_window,
belief_time_window=belief_time_window,
source=source,
)

def __repr__(self) -> str:
return f"<Sensor {self.id}: {self.name}>"


class TimedBelief(db.Model, tb.TimedBeliefDBMixin):
Flix6x marked this conversation as resolved.
Show resolved Hide resolved
"""A timed belief holds a precisely timed record of a belief about an event.

It also records the source of the belief, and the sensor that the event pertains to.
"""

@declared_attr
def source_id(cls):
return db.Column(db.Integer, db.ForeignKey("data_source.id"), primary_key=True)

sensor = db.relationship("Sensor", backref=db.backref("beliefs", lazy=True))
Flix6x marked this conversation as resolved.
Show resolved Hide resolved
source = db.relationship("DataSource", backref=db.backref("beliefs", lazy=True))

def __init__(
self,
sensor: tb.DBSensor,
source: tb.DBBeliefSource,
**kwargs,
):
tb.TimedBeliefDBMixin.__init__(self, sensor, source, **kwargs)
tb_utils.remove_class_init_kwargs(tb.TimedBeliefDBMixin, kwargs)
db.Model.__init__(self, **kwargs)

@classmethod
def search(
cls,
sensor: Sensor,
event_time_window: Tuple[Optional[datetime_type], Optional[datetime_type]] = (
None,
None,
),
belief_time_window: Tuple[Optional[datetime_type], Optional[datetime_type]] = (
None,
None,
),
source: Optional[Union[int, List[int], str, List[str]]] = None,
) -> tb.BeliefsDataFrame:
"""Search all beliefs about events for a given sensor.

:param sensor: search only this sensor
:param event_time_window: search only events within this time window
:param belief_time_window: search only beliefs within this time window
:param source: search only beliefs by this source (pass its name or id) or list of sources
"""
return cls.search_session(
session=db.session,
sensor=sensor,
event_before=event_time_window[1],
event_not_before=event_time_window[0],
belief_before=belief_time_window[1],
belief_not_before=belief_time_window[0],
source=source,
)

@classmethod
def add(cls, bdf: tb.BeliefsDataFrame, commit_transaction: bool = True):
"""Add a BeliefsDataFrame as timed beliefs in the database.

:param bdf: the BeliefsDataFrame to be persisted
:param commit_transaction: if True, the session is committed
if False, you can still add other data to the session
and commit it all within an atomic transaction
"""
return cls.add_to_session(
session=db.session,
beliefs_data_frame=bdf,
commit_transaction=commit_transaction,
)

def __repr__(self) -> str:
"""timely-beliefs representation of timed beliefs."""
return tb.TimedBelief.__repr__(self)


class TimedValue(object):
"""
Expand Down
3 changes: 2 additions & 1 deletion flexmeasures/data/scripts/data_gen.py
Expand Up @@ -17,6 +17,7 @@
import inflect

from flexmeasures.data.models.markets import MarketType, Market, Price
from flexmeasures.data.models.time_series import TimedBelief
from flexmeasures.data.models.assets import AssetType, Asset, Power
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.weather import WeatherSensorType, WeatherSensor, Weather
Expand Down Expand Up @@ -625,5 +626,5 @@ def get_affected_classes(structure: bool = True, data: bool = False):
DataSource,
]
if data:
affected_classes += [Power, Price, Weather]
affected_classes += [TimedBelief, Power, Price, Weather]
return affected_classes
42 changes: 42 additions & 0 deletions flexmeasures/data/tests/test_queries.py
Expand Up @@ -7,6 +7,8 @@
import timely_beliefs as tb

from flexmeasures.data.models.assets import Asset, Power
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.queries.utils import (
multiply_dataframe_with_deterministic_beliefs,
simplify_index,
Expand Down Expand Up @@ -215,3 +217,43 @@ def test_simplify_index():
)
df = simplify_index(bdf)
assert df.event_resolution == timedelta(minutes=15)


def test_query_beliefs(setup_beliefs):
"""Check various ways of querying for beliefs."""
sensor = Sensor.query.filter_by(name="epex_da").one_or_none()
source = DataSource.query.filter_by(name="Seita").one_or_none()
bdfs = [
TimedBelief.search(sensor, source=source),
sensor.search_beliefs(source=source),
tb.BeliefsDataFrame(sensor.beliefs), # doesn't allow filtering
]
for bdf in bdfs:
assert sensor.event_resolution == timedelta(
hours=0
) # todo change to 1 after migrating Markets to Sensors
assert bdf.event_resolution == timedelta(
hours=0
) # todo change to 1 after migrating Markets to Sensors
assert len(bdf) == setup_beliefs


def test_persist_beliefs(setup_beliefs):
"""Check whether persisting beliefs works.

We load the already set up beliefs, and form new beliefs an hour later.
"""
sensor = Sensor.query.filter_by(name="epex_da").one_or_none()
bdf: tb.BeliefsDataFrame = TimedBelief.search(sensor)

# Form new beliefs
df = bdf.reset_index()
df["belief_time"] = df["belief_time"] + timedelta(hours=1)
df["event_value"] = df["event_value"] * 10
bdf = df.set_index(
["event_start", "belief_time", "source", "cumulative_probability"]
)

TimedBelief.add(bdf)
bdf: tb.BeliefsDataFrame = TimedBelief.search(sensor)
assert len(bdf) == setup_beliefs * 2
2 changes: 1 addition & 1 deletion requirements/app.in
Expand Up @@ -32,7 +32,7 @@ netCDF4
siphon
tables
timetomodel>=0.6.8
timely-beliefs>=1.2.1
timely-beliefs==1.3.0.dev2664
python-dotenv
# a backport, not needed in Python3.8
importlib_metadata
Expand Down