Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timed beliefs #79

Merged
merged 12 commits into from Apr 2, 2021
25 changes: 24 additions & 1 deletion flexmeasures/conftest.py
Expand Up @@ -23,6 +23,7 @@
from flexmeasures.data.models.assets import AssetType, Asset, Power
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.markets import Market, Price
from flexmeasures.data.models.time_series import Sensor, TimedBelief


"""
Expand Down Expand Up @@ -175,9 +176,31 @@ def setup_assets(db, setup_roles_users, setup_markets):
db.session.add(p)


@pytest.fixture(scope="function")
def setup_beliefs(db: SQLAlchemy, setup_markets) -> int:
"""
:returns: the number of beliefs set up
"""
sensor = Sensor.query.filter(Sensor.name == "epex_da").one_or_none()
data_source = DataSource.query.filter_by(
name="Seita", type="demo script"
).one_or_none()
beliefs = [
TimedBelief(
sensor=sensor,
source_id=data_source.id,
event_value=21,
event_start="2021-03-28 16:00",
belief_horizon=timedelta(0),
)
]
db.session.add_all(beliefs)
return len(beliefs)


@pytest.fixture(scope="function", autouse=True)
def add_market_prices(db: SQLAlchemy, setup_assets, setup_markets):
"""Add one day of market prices for the EPEX day-ahead market."""
"""Add two days of market prices for the EPEX day-ahead market."""
epex_da = Market.query.filter(Market.name == "epex_da").one_or_none()
data_source = DataSource.query.filter_by(
name="Seita", type="demo script"
Expand Down
@@ -0,0 +1,53 @@
"""create table for timed beliefs

Revision ID: e62ac5f519d7
Revises: a528c3c81506
Create Date: 2021-03-28 16:26:45.025994

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "e62ac5f519d7"
down_revision = "a528c3c81506"
branch_labels = None
depends_on = None


def upgrade():
op.create_table(
"timed_belief",
sa.Column(
"event_start", sa.DateTime(timezone=True), nullable=False, index=True
),
sa.Column("belief_horizon", sa.Interval(), nullable=False),
sa.Column("cumulative_probability", sa.Float(), nullable=False, default=0.5),
sa.Column("event_value", sa.Float(), nullable=False),
sa.Column("sensor_id", sa.Integer(), nullable=False, index=True),
sa.Column("source_id", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["sensor_id"],
["sensor.id"],
name=op.f("timed_belief_sensor_id_sensor_fkey"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["source_id"],
["data_source.id"],
name=op.f("timed_belief_source_id_source_fkey"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint(
"event_start",
"belief_horizon",
"cumulative_probability",
"sensor_id",
name=op.f("timed_belief_pkey"),
),
)


def downgrade():
op.drop_table("timed_belief")
88 changes: 88 additions & 0 deletions flexmeasures/data/models/time_series.py
Expand Up @@ -6,6 +6,7 @@
import timely_beliefs as tb

from flexmeasures.data.config import db
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.queries.utils import (
add_belief_timing_filter,
add_user_source_filter,
Expand All @@ -19,6 +20,93 @@
class Sensor(db.Model, tb.SensorDBMixin):
"""A sensor measures events. """

def search_beliefs(
self,
event_time_window: Tuple[Optional[datetime_type], Optional[datetime_type]] = (
None,
None,
),
belief_time_window: Tuple[Optional[datetime_type], Optional[datetime_type]] = (
None,
None,
),
source: Optional[Union[int, List[int], str, List[str]]] = None,
):
"""Search all beliefs about events for this sensor.

:param event_time_window: search only events within this time window
:param belief_time_window: search only beliefs within this time window
:param source: search only beliefs by this source (pass its name or id) or list of sources"""
return TimedBelief.search_all(
sensor=self,
event_time_window=event_time_window,
belief_time_window=belief_time_window,
source=source,
)

def __repr__(self) -> str:
return f"<Sensor {self.id}: {self.name}>"


class TimedBelief(db.Model, tb.TimedBeliefDBMixin):
Flix6x marked this conversation as resolved.
Show resolved Hide resolved
"""A timed belief holds a precisely timed record of a belief about an event."""

@declared_attr
def source_id(cls):
return db.Column(db.Integer, db.ForeignKey("data_source.id"), primary_key=True)

sensor = db.relationship("Sensor", backref=db.backref("beliefs", lazy=True))
Flix6x marked this conversation as resolved.
Show resolved Hide resolved
source = db.relationship("DataSource", backref=db.backref("beliefs", lazy=True))

@classmethod
def search_all(
Flix6x marked this conversation as resolved.
Show resolved Hide resolved
cls,
sensor: Sensor,
event_time_window: Tuple[Optional[datetime_type], Optional[datetime_type]] = (
None,
None,
),
belief_time_window: Tuple[Optional[datetime_type], Optional[datetime_type]] = (
None,
None,
),
source: Optional[Union[int, List[int], str, List[str]]] = None,
) -> tb.BeliefsDataFrame:
"""Search all beliefs about events for a given sensor.

:param sensor: search only this sensor
:param event_time_window: search only events within this time window
:param belief_time_window: search only beliefs within this time window
:param source: search only beliefs by this source (pass its name or id) or list of sources
"""
return cls.query_all(
session=db.session,
sensor=sensor,
event_before=event_time_window[1],
event_not_before=event_time_window[0],
belief_before=belief_time_window[1],
belief_not_before=belief_time_window[0],
source=source,
source_cls=DataSource,
)

@classmethod
def persist_all(cls, bdf: tb.BeliefsDataFrame, commit_transaction: bool = True):
Flix6x marked this conversation as resolved.
Show resolved Hide resolved
"""Persist a BeliefsDataFrame as timed beliefs in the database.

:param bdf: the BeliefsDataFrame to be persisted
:param commit_transaction: set to False if you're interested in persisting other data as well within one atomic transaction
"""
return cls.add_all(
session=db.session,
beliefs_data_frame=bdf,
commit_transaction=commit_transaction,
)

def __repr__(self) -> str:
"""timely-beliefs representation of timed beliefs."""
return tb.TimedBelief.__repr__(self)


class TimedValue(object):
"""
Expand Down
42 changes: 42 additions & 0 deletions flexmeasures/data/tests/test_queries.py
Expand Up @@ -7,6 +7,8 @@
import timely_beliefs as tb

from flexmeasures.data.models.assets import Asset, Power
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.queries.utils import (
multiply_dataframe_with_deterministic_beliefs,
simplify_index,
Expand Down Expand Up @@ -215,3 +217,43 @@ def test_simplify_index():
)
df = simplify_index(bdf)
assert df.event_resolution == timedelta(minutes=15)


def test_query_beliefs(setup_beliefs):
"""Check various ways of querying for beliefs."""
sensor = Sensor.query.filter_by(name="epex_da").one_or_none()
source = DataSource.query.filter_by(name="Seita").one_or_none()
bdfs = [
TimedBelief.search_all(sensor, source=source.id),
sensor.search_beliefs(source=source.id),
tb.BeliefsDataFrame(sensor.beliefs), # doesn't allow filtering
]
for bdf in bdfs:
assert sensor.event_resolution == timedelta(
hours=0
) # todo change to 1 after migrating Markets to Sensors
assert bdf.event_resolution == timedelta(
hours=0
) # todo change to 1 after migrating Markets to Sensors
assert len(bdf) == setup_beliefs


def test_persist_beliefs(setup_beliefs):
"""Check whether persisting beliefs works.

We load the already set up beliefs, and form new beliefs an hour later.
"""
sensor = Sensor.query.filter_by(name="epex_da").one_or_none()
bdf: tb.BeliefsDataFrame = TimedBelief.search_all(sensor)

# Form new beliefs
df = bdf.reset_index()
df["belief_time"] = df["belief_time"] + timedelta(hours=1)
df["event_value"] = df["event_value"] * 10
bdf = df.set_index(
["event_start", "belief_time", "source", "cumulative_probability"]
)

TimedBelief.persist_all(bdf)
bdf: tb.BeliefsDataFrame = TimedBelief.search_all(sensor)
assert len(bdf) == setup_beliefs * 2
2 changes: 1 addition & 1 deletion requirements/app.in
Expand Up @@ -32,7 +32,7 @@ netCDF4
siphon
tables
timetomodel>=0.6.8
timely-beliefs>=1.2.1
timely-beliefs>=1.3.0.dev8856
python-dotenv
# a backport, not needed in Python3.8
importlib_metadata
Expand Down