Use timely-beliefs tooling to create forecast model specs #154

Merged: 20 commits, Nov 1, 2021
1 change: 1 addition & 0 deletions documentation/changelog.rst
@@ -19,6 +19,7 @@ Infrastructure / Support
* Add possibility to send errors to Sentry [see `PR #143 <http://www.github.com/SeitaBV/flexmeasures/pull/143>`_]
* Add CLI task to monitor if tasks ran successfully and recently enough [see `PR #146 <http://www.github.com/SeitaBV/flexmeasures/pull/146>`_]
* Document how to use a custom favicon in plugins [see `PR #152 <http://www.github.com/SeitaBV/flexmeasures/pull/152>`_]
* Improve data specification for forecasting models using timely-beliefs data [see `PR #154 <http://www.github.com/SeitaBV/flexmeasures/pull/154>`_]


v0.5.0 | June 7, 2021
4 changes: 2 additions & 2 deletions documentation/tut/forecasting_scheduling.rst
@@ -65,9 +65,9 @@ If you host FlexMeasures yourself, we provide a CLI task for adding forecasts fo

.. code-block:: bash

flexmeasures add forecasts --from_date 2020-01-02 --to_date 2020-6-30 --horizon_hours 6 --asset-id 2
flexmeasures add forecasts --from_date 2020-01-02 --to_date 2020-6-30 --horizon_hours 6 --resolution 60 --asset-id 2

Here, forecasts are being computed for asset 2, with one horizon (6 hours).
Here, forecasts are being computed for asset 2, with one horizon (6 hours) and a resolution of 60 minutes.
This is half a year of data, so it will take a while.
You can also queue this work to workers (see above) with the additional ``--as-job`` parameter (though in general we'd advise dispatching this work in smaller chunks).

132 changes: 113 additions & 19 deletions flexmeasures/data/models/forecasting/model_spec_factory.py
@@ -1,9 +1,23 @@
from typing import Union, Optional, List
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Union
from datetime import datetime, timedelta, tzinfo
from pprint import pformat
import logging
import pytz

from flask import current_app
from timetomodel import DBSeriesSpecs, ModelSpecs
from timetomodel.transforming import BoxCoxTransformation, Transformation
from flexmeasures.data.queries.utils import (
simplify_index,
)
from timely_beliefs import BeliefsDataFrame
from timetomodel import ModelSpecs
from timetomodel.exceptions import MissingData, NaNData
from timetomodel.speccing import SeriesSpecs
from timetomodel.transforming import (
BoxCoxTransformation,
ReversibleTransformation,
Transformation,
)
import pandas as pd

from flexmeasures.data.models.assets import AssetType, Asset
from flexmeasures.data.models.markets import MarketType, Market
@@ -18,7 +32,6 @@
get_query_window,
)
from flexmeasures.data.services.resources import find_closest_weather_sensor
from flexmeasures.data.config import db

"""
Here we generate an initial version of timetomodel specs, given what asset and what timing
@@ -27,6 +40,82 @@
"""


logger = logging.getLogger(__name__)


class TBSeriesSpecs(SeriesSpecs):
"""Compatibility for using timetomodel.SeriesSpecs with timely_beliefs.BeliefsDataFrames.

This implements _load_series such that TimedValue.collect is called on the generic asset class,
with the parameters in collect_params.
The collect function is expected to return a BeliefsDataFrame.
"""

generic_asset_value_class: Any # with collect method
collect_params: dict

def __init__(
self,
generic_asset_value_class,
collect_params: dict,
name: str,
original_tz: Optional[tzinfo] = pytz.utc, # postgres stores naive datetimes
feature_transformation: Optional[ReversibleTransformation] = None,
post_load_processing: Optional[Transformation] = None,
resampling_config: Dict[str, Any] = None,
interpolation_config: Dict[str, Any] = None,
):
super().__init__(
name,
original_tz,
feature_transformation,
post_load_processing,
resampling_config,
interpolation_config,
)
self.generic_asset_value_class = generic_asset_value_class
self.collect_params = collect_params

def _load_series(self) -> pd.Series:
logger.info(
"Reading %s data from database" % self.generic_asset_value_class.__name__
)

bdf: BeliefsDataFrame = self.generic_asset_value_class.collect(
**self.collect_params
)
assert isinstance(bdf, BeliefsDataFrame)
df = simplify_index(bdf)
self.check_data(df)

if self.post_load_processing is not None:
df = self.post_load_processing.transform_dataframe(df)

return df["event_value"]

def check_data(self, df: pd.DataFrame):
"""Raise error if data is empty or contains nan values.
Here, other than in load_series, we can show the query, which is quite helpful."""
if df.empty:
raise MissingData(
"No values found in database for the requested %s data. It's no use to continue I'm afraid."
" Here's a print-out of what I tried to collect:\n\n%s\n\n"
% (
self.generic_asset_value_class.__name__,
pformat(self.collect_params, sort_dicts=False),
)
)
if df.isnull().values.any():
raise NaNData(
"Nan values found in database for the requested %s data. It's no use to continue I'm afraid."
" Here's a print-out of what I tried to collect:\n\n%s\n\n"
% (
self.generic_asset_value_class.__name__,
pformat(self.collect_params, sort_dicts=False),
)
)
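
A sketch of how TBSeriesSpecs is meant to be used, for orientation only: the class name `Power` and all parameter values below are assumptions, chosen to mirror the `collect_params` built by `create_initial_model_specs` further down, and running it requires an application context and data in the database.

```python
from datetime import datetime, timedelta

import pytz

from flexmeasures.data.models.assets import Power  # assumed: a TimedValue class with a collect method
from flexmeasures.data.models.forecasting.model_spec_factory import TBSeriesSpecs

spec = TBSeriesSpecs(
    generic_asset_value_class=Power,
    collect_params=dict(
        generic_asset_names=["my-wind-turbine"],  # hypothetical asset name
        query_window=(
            datetime(2021, 1, 1, tzinfo=pytz.utc),
            datetime(2021, 1, 8, tzinfo=pytz.utc),
        ),
        belief_horizon_window=(None, timedelta(hours=0)),
    ),
    name="power",
    interpolation_config={"method": "time"},
)

# _load_series calls Power.collect(**collect_params), checks the result for gaps and NaNs,
# and returns the event_value column as a pd.Series
series = spec._load_series()
```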


def create_initial_model_specs( # noqa: C901
generic_asset: Union[Asset, Market, WeatherSensor],
forecast_start: datetime, # Start of forecast period
@@ -90,29 +179,35 @@ def create_initial_model_specs(  # noqa: C901
if ex_post_horizon is None:
ex_post_horizon = timedelta(hours=0)

outcome_var_spec = DBSeriesSpecs(
outcome_var_spec = TBSeriesSpecs(
name=generic_asset_type.name,
db_engine=db.engine,
query=generic_asset_value_class.make_query(
asset_names=[generic_asset.name],
generic_asset_value_class=generic_asset_value_class,
collect_params=dict(
generic_asset_names=[generic_asset.name],
query_window=query_window,
belief_horizon_window=(None, ex_post_horizon),
session=db.session,
),
feature_transformation=params.get("outcome_var_transformation", None),
interpolation_config={"method": "time"},
)
# Set defaults if needed
if params.get("event_resolution", None) is None:
params["event_resolution"] = generic_asset.event_resolution
if params.get("remodel_frequency", None) is None:
params["remodel_frequency"] = timedelta(days=7)
specs = ModelSpecs(
outcome_var=outcome_var_spec,
model=None,  # at least this will still need to be configured to make these specs usable!
frequency=generic_asset.event_resolution,
frequency=params[
"event_resolution"
], # todo: timetomodel doesn't distinguish frequency and resolution yet
horizon=forecast_horizon,
lags=[int(lag / generic_asset.event_resolution) for lag in lags],
lags=[int(lag / params["event_resolution"]) for lag in lags],
regressors=regressor_specs,
start_of_training=training_start,
end_of_testing=testing_end,
ratio_training_testing_data=params["ratio_training_testing_data"],
remodel_frequency=timedelta(days=7),
remodel_frequency=params["remodel_frequency"],
)

return specs
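
The resolution handling above can be made concrete with a small worked example (illustrative numbers only): an asset with a 15-minute event resolution and lags of one and two days yields a 15-minute model frequency and integer lags of 96 and 192 events.

```python
from datetime import timedelta

event_resolution = timedelta(minutes=15)  # e.g. generic_asset.event_resolution, or the custom parameter
lags = [timedelta(days=1), timedelta(days=2)]

integer_lags = [int(lag / event_resolution) for lag in lags]
assert integer_lags == [96, 192]  # how many events each lag looks back
```
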
@@ -175,7 +270,7 @@ def configure_regressors_for_nearest_weather_sensor(
horizon,
regressor_transformation, # the regressor transformation can be passed in
transform_to_normal,  # if not, a normalization can be applied
) -> List[DBSeriesSpecs]:
) -> List[TBSeriesSpecs]:
"""For Assets, we use weather data as regressors. Here, we configure them."""
regressor_specs = []
if isinstance(generic_asset, Asset):
@@ -208,14 +303,13 @@ def configure_regressors_for_nearest_weather_sensor(
)
)
regressor_specs.append(
DBSeriesSpecs(
TBSeriesSpecs(
name=regressor_specs_name,
db_engine=db.engine,
query=Weather.make_query(
asset_names=[closest_sensor.name],
generic_asset_value_class=Weather,
collect_params=dict(
generic_asset_names=[closest_sensor.name],
query_window=query_window,
belief_horizon_window=(horizon, None),
session=db.session,
),
feature_transformation=regressor_transformation,
interpolation_config={"method": "time"},
37 changes: 28 additions & 9 deletions flexmeasures/data/scripts/cli_tasks/data_add.py
@@ -424,6 +424,11 @@ def add_beliefs(
default="2015-12-31",
help="Forecast to date (inclusive). Follow up with a date in the form yyyy-mm-dd.",
)
@click.option(
"--resolution",
type=int,
help="Resolution of forecast in minutes. If not set, resolution is determined from the asset to be forecasted",
)
@click.option(
"--horizon",
"horizons",
@@ -444,6 +449,7 @@ def create_forecasts(
asset_id: int = None,
from_date: str = "2015-02-08",
to_date: str = "2015-12-31",
resolution: Optional[int] = None,
horizons: List[str] = ["1"],
as_job: bool = False,
):
@@ -454,25 +460,32 @@

--from_date 2015-02-02 --to_date 2015-02-04 --horizon_hours 6

This will create forecast values from 0am on February 2nd to 0am on February 4th,
This will create forecast values from 0am on February 2nd to 0am on February 5th,
based on a 6 hour horizon.

"""
# make horizons
horizons = [timedelta(hours=int(h)) for h in horizons]

# apply timezone:
# apply timezone and set forecast_end to be an inclusive version of to_date
timezone = app.config.get("FLEXMEASURES_TIMEZONE")
from_date = pd.Timestamp(from_date).tz_localize(timezone)
to_date = pd.Timestamp(to_date).tz_localize(timezone)
forecast_start = pd.Timestamp(from_date).tz_localize(timezone)
forecast_end = (pd.Timestamp(to_date) + pd.Timedelta("1D")).tz_localize(timezone)

if resolution is not None:
event_resolution = timedelta(minutes=resolution)
else:
event_resolution = None

if as_job:
if asset_type == "Asset":
value_type = "Power"
if asset_type == "Market":
elif asset_type == "Market":
value_type = "Price"
if asset_type == "WeatherSensor":
elif asset_type == "WeatherSensor":
value_type = "Weather"
else:
raise TypeError(f"Unknown asset_type {asset_type}")

for horizon in horizons:
# Note that this time period refers to the period of events we are forecasting, while in create_forecasting_jobs
@@ -481,14 +494,20 @@
asset_id=asset_id,
timed_value_type=value_type,
horizons=[horizon],
start_of_roll=from_date - horizon,
end_of_roll=to_date - horizon,
start_of_roll=forecast_start - horizon,
end_of_roll=forecast_end - horizon,
)
else:
from flexmeasures.data.scripts.data_gen import populate_time_series_forecasts

populate_time_series_forecasts(
app.db, horizons, from_date, to_date, asset_type, asset_id
db=app.db,
horizons=horizons,
forecast_start=forecast_start,
forecast_end=forecast_end,
event_resolution=event_resolution,
generic_asset_type=asset_type,
generic_asset_id=asset_id,
)
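
As a quick check on the date handling above (a sketch; the timezone is only an example value for FLEXMEASURES_TIMEZONE): the to_date is made inclusive by shifting the end of the forecast window one day forward before localizing.

```python
import pandas as pd

timezone = "Europe/Amsterdam"  # example setting
from_date, to_date = "2015-02-02", "2015-02-04"

forecast_start = pd.Timestamp(from_date).tz_localize(timezone)
forecast_end = (pd.Timestamp(to_date) + pd.Timedelta("1D")).tz_localize(timezone)

# forecasts cover events from February 2nd 0am up to (but not including) February 5th 0am
assert forecast_end - forecast_start == pd.Timedelta(days=3)
```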


28 changes: 19 additions & 9 deletions flexmeasures/data/scripts/data_gen.py
@@ -6,6 +6,7 @@
from shutil import rmtree
from datetime import datetime, timedelta

import pandas as pd
from flask import current_app as app
from flask_sqlalchemy import SQLAlchemy
import click
@@ -200,8 +201,9 @@ def populate_structure(db: SQLAlchemy):
def populate_time_series_forecasts( # noqa: C901
db: SQLAlchemy,
horizons: List[timedelta],
start: datetime,
end: datetime,
forecast_start: datetime,
forecast_end: datetime,
event_resolution: Optional[timedelta] = None,
generic_asset_type: Optional[str] = None,
generic_asset_id: Optional[int] = None,
):
@@ -262,11 +264,12 @@ def populate_time_series_forecasts( # noqa: C901
default_model = lookup_model_specs_configurator()
model_specs, model_identifier, model_fallback = default_model(
generic_asset=generic_asset,
forecast_start=start,
forecast_end=end,
forecast_start=forecast_start,
forecast_end=forecast_end,
forecast_horizon=horizon,
custom_model_params=dict(
training_and_testing_period=training_and_testing_period
training_and_testing_period=training_and_testing_period,
event_resolution=event_resolution,
),
)
click.echo(
@@ -275,16 +278,23 @@
% (
naturaldelta(horizon),
generic_asset.name,
start,
end,
forecast_start,
forecast_end,
naturaldelta(training_and_testing_period),
model_identifier,
)
)
model_specs.creation_time = start
model_specs.creation_time = forecast_start
forecasts, model_state = make_rolling_forecasts(
start=start, end=end, model_specs=model_specs
start=forecast_start, end=forecast_end, model_specs=model_specs
)
# Upsample to sensor resolution if needed
if forecasts.index.freq > pd.Timedelta(generic_asset.event_resolution):
forecasts = model_specs.outcome_var.resample_data(
forecasts,
time_window=(forecasts.index.min(), forecasts.index.max()),
expected_frequency=generic_asset.event_resolution,
)
except (NotEnoughDataException, MissingData, NaNData) as e:
click.echo(
"Skipping forecasts for asset %s: %s" % (generic_asset, str(e))
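
To picture the upsampling guard in populate_time_series_forecasts: if the model produced, say, hourly forecasts for a sensor with a 15-minute event resolution, the forecasts are brought back to the sensor resolution. A plain-pandas sketch of the idea (forward-filling is just one possible fill method; the actual code goes through the outcome variable's resample_data):

```python
import pandas as pd

# made-up hourly forecasts for a sensor that records 15-minute events
forecasts = pd.Series(
    [1.0, 2.0, 3.0],
    index=pd.date_range("2015-02-02", periods=3, freq="1H", tz="Europe/Amsterdam"),
)

assert forecasts.index.freq > pd.Timedelta(minutes=15)  # coarser than the sensor resolution

upsampled = forecasts.resample("15T").ffill()
assert len(upsampled) == 9  # 15-minute steps from 0:00 through 2:00
```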