Use timely-beliefs tooling to create forecast model specs (#154)
Introduce TBSeriesSpecs (subclassed from SeriesSpecs) that makes use of the collect functionality introduced earlier in FlexMeasures. We need such timely-beliefs tooling to ensure our beliefs data is properly mapped to a time series data structure that ML models can work with (i.e. feature frames). Currently, the collect functionality makes sure that the regressor data is deterministic and single-sourced.

Also introduce the possibility to set the desired resolution of forecasts, plus some minor refactoring.
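The single-sourcing idea can be illustrated with plain pandas (a hypothetical illustration, not the actual timely-beliefs API; the source names and values below are made up):

```python
# Hypothetical illustration: why multi-sourced belief data must be reduced to
# one value per event before it can serve as a feature column for an ML model.
import pandas as pd

raw = pd.DataFrame(
    {
        "event_start": pd.to_datetime(
            ["2021-01-01 00:00", "2021-01-01 00:00", "2021-01-01 00:15"]
        ),
        "source": ["forecaster A", "forecaster B", "forecaster A"],
        "event_value": [1.0, 1.2, 1.1],
    }
)

# Keep a single, deterministic source so each event maps to exactly one value,
# yielding a plain time series suitable as a feature frame column.
single_sourced = (
    raw[raw["source"] == "forecaster A"]
    .set_index("event_start")["event_value"]
)
```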


* Rename make_forecasts

* Fix and rename forecast period in CLI

* Be explicit about kwargs and rename forecast period in internal functions

* Make model parameters out of forecast resolution and remodel frequency

* Use tb tooling to collect time series to resolve cases like multi-sourced beliefs

* Upsample forecasts to sensor resolution

* Improve error message

* Fix case

* Fix asset_type cases

* flake8

* PR fixes

* Remove obsolete parameter

* Changelog entry

* Use timetomodel resampling functionality

* Update tutorial

* Update requirement

* Update compiled requirement

* Update requirement for handling instantaneous sensors
Flix6x committed Nov 1, 2021
1 parent 05b0b6c commit c4d456a
Showing 9 changed files with 202 additions and 50 deletions.
2 changes: 1 addition & 1 deletion documentation/changelog.rst
@@ -14,6 +14,7 @@ Bugfixes
Infrastructure / Support
------------------------
* Account-based authorization, incl. new decorators for endpoints [see `PR #210 <http://www.github.com/SeitaBV/flexmeasures/pull/210>`_]
* Improve data specification for forecasting models using timely-beliefs data [see `PR #154 <http://www.github.com/SeitaBV/flexmeasures/pull/154>`_]


v0.7.0 | October 26, 2021
@@ -95,7 +96,6 @@ Infrastructure / Support
* The experimental parts of the data model can now be visualised, as well, via `make show-data-model` (add the --dev option in Makefile) [also in `PR #157 <https://github.com/SeitaBV/flexmeasures/pull/157>`_]



v0.5.0 | June 7, 2021
===========================

4 changes: 2 additions & 2 deletions documentation/tut/forecasting_scheduling.rst
@@ -65,9 +65,9 @@ If you host FlexMeasures yourself, we provide a CLI task for adding forecasts fo

.. code-block:: bash
-flexmeasures add forecasts --from_date 2020-01-02 --to_date 2020-6-30 --horizon_hours 6 --asset-id 2
+flexmeasures add forecasts --from_date 2020-01-02 --to_date 2020-6-30 --horizon_hours 6 --resolution 60 --asset-id 2
-Here, forecasts are being computed for asset 2, with one horizon (6 hours).
+Here, forecasts are being computed for asset 2, with one horizon (6 hours) and a resolution of 60 minutes.
This is half a year of data, so it will take a while.
You can also queue this work to workers (see above) with the additional ``--as-job`` parameter (though in general we'd advise to dispatch this work in smaller chunks).
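A quick back-of-the-envelope check (plain Python arithmetic, not a FlexMeasures API) of how much data the command above covers:

```python
# The tutorial command spans an inclusive half-year range at 60-minute resolution.
from datetime import date, timedelta

start, end = date(2020, 1, 2), date(2020, 6, 30)
resolution = timedelta(minutes=60)

n_days = (end - start).days + 1  # --to_date is inclusive
values_per_day = timedelta(days=1) // resolution
n_values = n_days * values_per_day  # forecast values per horizon
```

With 2020 being a leap year, that works out to 181 days, i.e. 4344 hourly forecast values per horizon, which is why dispatching this work in smaller chunks (or as jobs) is advised.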

40 changes: 31 additions & 9 deletions flexmeasures/cli/data_add.py
@@ -549,6 +549,11 @@ def add_beliefs(
default="2015-12-31",
help="Forecast to date (inclusive). Follow up with a date in the form yyyy-mm-dd.",
)
@click.option(
"--resolution",
type=int,
help="Resolution of forecast in minutes. If not set, resolution is determined from the asset to be forecasted",
)
@click.option(
"--horizon",
"horizons_as_hours",
@@ -570,6 +575,7 @@ def create_forecasts(
from_date_str: str = "2015-02-08",
to_date_str: str = "2015-12-31",
horizons_as_hours: List[str] = ["1"],
resolution: Optional[int] = None,
as_job: bool = False,
):
"""
@@ -579,25 +585,35 @@
--from_date 2015-02-02 --to_date 2015-02-04 --horizon_hours 6
-This will create forecast values from 0am on May 2nd to 0am on May 4th,
+This will create forecast values from 0am on May 2nd to 0am on May 5th,
based on a 6 hour horizon.
"""
# make horizons
horizons = [timedelta(hours=int(h)) for h in horizons_as_hours]

-# apply timezone:
+# apply timezone and set forecast_end to be an inclusive version of to_date
timezone = app.config.get("FLEXMEASURES_TIMEZONE")
-from_date = pd.Timestamp(from_date_str).tz_localize(timezone)
-to_date = pd.Timestamp(to_date_str).tz_localize(timezone)
+forecast_start = pd.Timestamp(from_date_str).tz_localize(timezone)
+forecast_end = (pd.Timestamp(to_date_str) + pd.Timedelta("1D")).tz_localize(
+timezone
+)

event_resolution: Optional[timedelta]
if resolution is not None:
event_resolution = timedelta(minutes=resolution)
else:
event_resolution = None

if as_job:
if asset_type == "Asset":
value_type = "Power"
-if asset_type == "Market":
+elif asset_type == "Market":
value_type = "Price"
-if asset_type == "WeatherSensor":
+elif asset_type == "WeatherSensor":
value_type = "Weather"
else:
raise TypeError(f"Unknown asset_type {asset_type}")

for horizon in horizons:
# Note that this time period refers to the period of events we are forecasting, while in create_forecasting_jobs
@@ -606,14 +622,20 @@
asset_id=asset_id,
timed_value_type=value_type,
horizons=[horizon],
-start_of_roll=from_date - horizon,
-end_of_roll=to_date - horizon,
+start_of_roll=forecast_start - horizon,
+end_of_roll=forecast_end - horizon,
)
else:
from flexmeasures.data.scripts.data_gen import populate_time_series_forecasts

populate_time_series_forecasts(
-db, horizons, from_date, to_date, asset_type, asset_id
+db=app.db,
+horizons=horizons,
+forecast_start=forecast_start,
+forecast_end=forecast_end,
+event_resolution=event_resolution,
+generic_asset_type=asset_type,
+generic_asset_id=asset_id,
)
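A minimal standalone sketch of the CLI's date handling above (using pandas directly, with a stand-in timezone instead of the FLEXMEASURES_TIMEZONE config value): the --to_date is made inclusive by adding a day before localizing, and the rolling window is shifted back by the horizon, since jobs roll over belief times rather than event times.

```python
import pandas as pd

timezone = "Europe/Amsterdam"  # assumption: stand-in for FLEXMEASURES_TIMEZONE
horizon = pd.Timedelta(hours=6)

forecast_start = pd.Timestamp("2015-02-08").tz_localize(timezone)
# to_date is inclusive, so add one day before localizing
forecast_end = (pd.Timestamp("2015-12-31") + pd.Timedelta("1D")).tz_localize(timezone)

# beliefs are formed `horizon` ahead of the events they forecast
start_of_roll = forecast_start - horizon
end_of_roll = forecast_end - horizon
```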


132 changes: 113 additions & 19 deletions flexmeasures/data/models/forecasting/model_spec_factory.py
@@ -1,9 +1,23 @@
-from typing import Union, Optional, List
-from datetime import datetime, timedelta
+from typing import Any, Dict, List, Optional, Union
+from datetime import datetime, timedelta, tzinfo
from pprint import pformat
import logging
import pytz

from flask import current_app
-from timetomodel import DBSeriesSpecs, ModelSpecs
-from timetomodel.transforming import BoxCoxTransformation, Transformation
+from flexmeasures.data.queries.utils import (
+simplify_index,
+)
+from timely_beliefs import BeliefsDataFrame
+from timetomodel import ModelSpecs
+from timetomodel.exceptions import MissingData, NaNData
+from timetomodel.speccing import SeriesSpecs
+from timetomodel.transforming import (
+BoxCoxTransformation,
+ReversibleTransformation,
+Transformation,
+)
+import pandas as pd

from flexmeasures.data.models.assets import AssetType, Asset
from flexmeasures.data.models.markets import MarketType, Market
Expand All @@ -18,7 +32,6 @@
get_query_window,
)
from flexmeasures.data.services.resources import find_closest_weather_sensor
-from flexmeasures.data.config import db

"""
Here we generate an initial version of timetomodel specs, given what asset and what timing
@@ -27,6 +40,82 @@
"""


logger = logging.getLogger(__name__)


class TBSeriesSpecs(SeriesSpecs):
"""Compatibility for using timetomodel.SeriesSpecs with timely_beliefs.BeliefsDataFrames.
This implements _load_series such that TimedValue.collect is called on the generic asset class,
with the parameters in collect_params.
The collect function is expected to return a BeliefsDataFrame.
"""

generic_asset_value_class: Any # with collect method
collect_params: dict

def __init__(
self,
generic_asset_value_class,
collect_params: dict,
name: str,
original_tz: Optional[tzinfo] = pytz.utc, # postgres stores naive datetimes
feature_transformation: Optional[ReversibleTransformation] = None,
post_load_processing: Optional[Transformation] = None,
resampling_config: Dict[str, Any] = None,
interpolation_config: Dict[str, Any] = None,
):
super().__init__(
name,
original_tz,
feature_transformation,
post_load_processing,
resampling_config,
interpolation_config,
)
self.generic_asset_value_class = generic_asset_value_class
self.collect_params = collect_params

def _load_series(self) -> pd.Series:
logger.info(
"Reading %s data from database" % self.generic_asset_value_class.__name__
)

bdf: BeliefsDataFrame = self.generic_asset_value_class.collect(
**self.collect_params
)
assert isinstance(bdf, BeliefsDataFrame)
df = simplify_index(bdf)
self.check_data(df)

if self.post_load_processing is not None:
df = self.post_load_processing.transform_dataframe(df)

return df["event_value"]

def check_data(self, df: pd.DataFrame):
"""Raise error if data is empty or contains nan values.
Here, other than in load_series, we can show the query, which is quite helpful."""
if df.empty:
raise MissingData(
"No values found in database for the requested %s data. It's no use to continue I'm afraid."
" Here's a print-out of what I tried to collect:\n\n%s\n\n"
% (
self.generic_asset_value_class.__name__,
pformat(self.collect_params, sort_dicts=False),
)
)
if df.isnull().values.any():
raise NaNData(
"Nan values found in database for the requested %s data. It's no use to continue I'm afraid."
" Here's a print-out of what I tried to collect:\n\n%s\n\n"
% (
self.generic_asset_value_class.__name__,
pformat(self.collect_params, sort_dicts=False),
)
)
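The pattern TBSeriesSpecs follows can be shown self-contained (with a simplified stand-in for timetomodel's SeriesSpecs, so the class and method names below are assumptions, not the real library surface): the base class owns the loading workflow, and subclasses plug in their data access by overriding _load_series.

```python
import pandas as pd

class SeriesSpecsBase:
    """Simplified stand-in for timetomodel.speccing.SeriesSpecs."""

    def load_series(self) -> pd.Series:
        # The real base class also applies transformations and resampling here.
        return self._load_series()

    def _load_series(self) -> pd.Series:
        raise NotImplementedError

class CollectBackedSpecs(SeriesSpecsBase):
    """Loads a series via a collect-style callable, as TBSeriesSpecs
    does with a TimedValue class's collect method."""

    def __init__(self, collect_fn, collect_params: dict):
        self.collect_fn = collect_fn
        self.collect_params = collect_params

    def _load_series(self) -> pd.Series:
        df = self.collect_fn(**self.collect_params)
        # Fail early with the collect parameters, as TBSeriesSpecs.check_data does.
        if df.empty:
            raise ValueError("No values found for params: %s" % self.collect_params)
        if df["event_value"].isnull().values.any():
            raise ValueError("NaN values found for params: %s" % self.collect_params)
        return df["event_value"]
```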


def create_initial_model_specs( # noqa: C901
generic_asset: Union[Asset, Market, WeatherSensor],
forecast_start: datetime, # Start of forecast period
@@ -90,29 +179,35 @@ def create_initial_model_specs(  # noqa: C901
if ex_post_horizon is None:
ex_post_horizon = timedelta(hours=0)

-outcome_var_spec = DBSeriesSpecs(
+outcome_var_spec = TBSeriesSpecs(
name=generic_asset_type.name,
-db_engine=db.engine,
-query=generic_asset_value_class.make_query(
-asset_names=[generic_asset.name],
+generic_asset_value_class=generic_asset_value_class,
+collect_params=dict(
+generic_asset_names=[generic_asset.name],
query_window=query_window,
belief_horizon_window=(None, ex_post_horizon),
-session=db.session,
),
feature_transformation=params.get("outcome_var_transformation", None),
interpolation_config={"method": "time"},
)
# Set defaults if needed
if params.get("event_resolution", None) is None:
params["event_resolution"] = generic_asset.event_resolution
if params.get("remodel_frequency", None) is None:
params["remodel_frequency"] = timedelta(days=7)
specs = ModelSpecs(
outcome_var=outcome_var_spec,
model=None, # at least this will need to be configured still to make these specs usable!
-frequency=generic_asset.event_resolution,
+frequency=params[
+"event_resolution"
+], # todo: timetomodel doesn't distinguish frequency and resolution yet
horizon=forecast_horizon,
-lags=[int(lag / generic_asset.event_resolution) for lag in lags],
+lags=[int(lag / params["event_resolution"]) for lag in lags],
regressors=regressor_specs,
start_of_training=training_start,
end_of_testing=testing_end,
ratio_training_testing_data=params["ratio_training_testing_data"],
-remodel_frequency=timedelta(days=7),
+remodel_frequency=params["remodel_frequency"],
)

return specs
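A worked example of the lag computation in the specs above (plain arithmetic, using illustrative lag choices): with a 15-minute event resolution, day and week lags translate into these numbers of periods.

```python
from datetime import timedelta

event_resolution = timedelta(minutes=15)
lags = [timedelta(days=1), timedelta(days=7)]  # illustrative lags

# Same expression as in the specs: each lag becomes a count of resolution steps.
lag_periods = [int(lag / event_resolution) for lag in lags]
# lag_periods == [96, 672]
```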
@@ -175,7 +270,7 @@ def configure_regressors_for_nearest_weather_sensor(
horizon,
regressor_transformation, # the regressor transformation can be passed in
transform_to_normal, # if not, it a normalization can be applied
-) -> List[DBSeriesSpecs]:
+) -> List[TBSeriesSpecs]:
"""For Assets, we use weather data as regressors. Here, we configure them."""
regressor_specs = []
if isinstance(generic_asset, Asset):
@@ -208,14 +303,13 @@ def configure_regressors_for_nearest_weather_sensor(
)
)
regressor_specs.append(
-DBSeriesSpecs(
+TBSeriesSpecs(
name=regressor_specs_name,
-db_engine=db.engine,
-query=Weather.make_query(
-asset_names=[closest_sensor.name],
+generic_asset_value_class=Weather,
+collect_params=dict(
+generic_asset_names=[closest_sensor.name],
query_window=query_window,
belief_horizon_window=(horizon, None),
-session=db.session,
),
feature_transformation=regressor_transformation,
interpolation_config={"method": "time"},
28 changes: 19 additions & 9 deletions flexmeasures/data/scripts/data_gen.py
@@ -6,6 +6,7 @@
from shutil import rmtree
from datetime import datetime, timedelta

import pandas as pd
from flask import current_app as app
from flask_sqlalchemy import SQLAlchemy
import click
@@ -200,8 +201,9 @@ def populate_structure(db: SQLAlchemy):
def populate_time_series_forecasts( # noqa: C901
db: SQLAlchemy,
horizons: List[timedelta],
-start: datetime,
-end: datetime,
+forecast_start: datetime,
+forecast_end: datetime,
+event_resolution: Optional[timedelta] = None,
generic_asset_type: Optional[str] = None,
generic_asset_id: Optional[int] = None,
):
@@ -262,11 +264,12 @@ def populate_time_series_forecasts(  # noqa: C901
default_model = lookup_model_specs_configurator()
model_specs, model_identifier, model_fallback = default_model(
generic_asset=generic_asset,
-forecast_start=start,
-forecast_end=end,
+forecast_start=forecast_start,
+forecast_end=forecast_end,
forecast_horizon=horizon,
custom_model_params=dict(
-training_and_testing_period=training_and_testing_period
+training_and_testing_period=training_and_testing_period,
+event_resolution=event_resolution,
),
)
click.echo(
@@ -275,16 +278,23 @@
% (
naturaldelta(horizon),
generic_asset.name,
-start,
-end,
+forecast_start,
+forecast_end,
naturaldelta(training_and_testing_period),
model_identifier,
)
)
-model_specs.creation_time = start
+model_specs.creation_time = forecast_start
forecasts, model_state = make_rolling_forecasts(
-start=start, end=end, model_specs=model_specs
+start=forecast_start, end=forecast_end, model_specs=model_specs
)
# Upsample to sensor resolution if needed
if forecasts.index.freq > pd.Timedelta(generic_asset.event_resolution):
forecasts = model_specs.outcome_var.resample_data(
forecasts,
time_window=(forecasts.index.min(), forecasts.index.max()),
expected_frequency=generic_asset.event_resolution,
)
except (NotEnoughDataException, MissingData, NaNData) as e:
click.echo(
"Skipping forecasts for asset %s: %s" % (generic_asset, str(e))
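A minimal pandas sketch of the upsampling step above (the actual code delegates to timetomodel's resample_data, whose behavior follows the specs' resampling config; forward-fill is used here purely for illustration): hourly forecasts padded to a 15-minute sensor resolution.

```python
import pandas as pd

# Two hourly forecast values, as if produced by make_rolling_forecasts
hourly = pd.Series(
    [1.0, 2.0],
    index=pd.date_range("2021-01-01", periods=2, freq="60min"),
)

# Upsample to the sensor's finer resolution by padding each value forward
quarter_hourly = hourly.resample("15min").ffill()
```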
