Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use timely-beliefs tooling to create forecast model specs #154

Merged
merged 20 commits into from Nov 1, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
128 changes: 112 additions & 16 deletions flexmeasures/data/models/forecasting/model_spec_factory.py
@@ -1,9 +1,23 @@
from typing import Union, Optional, List
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Union
from datetime import datetime, timedelta, tzinfo
from pprint import pformat
import logging
import pytz

from flask import current_app
from timetomodel import DBSeriesSpecs, ModelSpecs
from timetomodel.transforming import BoxCoxTransformation, Transformation
from sqlalchemy.engine import Engine
from flexmeasures.data.queries.utils import (
simplify_index,
)
from timetomodel import ModelSpecs
from timetomodel.exceptions import MissingData, NaNData
from timetomodel.speccing import SeriesSpecs
from timetomodel.transforming import (
BoxCoxTransformation,
ReversibleTransformation,
Transformation,
)
import pandas as pd

from flexmeasures.data.models.assets import AssetType, Asset
from flexmeasures.data.models.markets import MarketType, Market
Expand All @@ -27,6 +41,81 @@
"""


logger = logging.getLogger(__name__)


class TBSeriesSpecs(SeriesSpecs):
    """Define how to collect timely beliefs.

    This sets up a class on which to call `collect`, together with parameters to pass.
    The collect function of `generic_asset_value_class` should return a BeliefsDataFrame.
    """

    # NOTE(review): renamed annotation from `db` to `db_engine` so the declared
    # attribute matches what __init__ actually assigns.
    db_engine: Engine
    generic_asset_value_class: Any  # class with a collect method
    collect_params: dict

    def __init__(
        self,
        db_engine: Engine,
        generic_asset_value_class,
        collect_params: dict,
        name: str,
        original_tz: Optional[tzinfo] = pytz.utc,  # postgres stores naive datetimes
        feature_transformation: Optional[ReversibleTransformation] = None,
        post_load_processing: Optional[Transformation] = None,
        resampling_config: Optional[Dict[str, Any]] = None,
        interpolation_config: Optional[Dict[str, Any]] = None,
    ):
        super().__init__(
            name,
            original_tz,
            feature_transformation,
            post_load_processing,
            resampling_config,
            interpolation_config,
        )
        self.db_engine = db_engine
        self.generic_asset_value_class = generic_asset_value_class
        self.collect_params = collect_params

    def _load_series(self) -> pd.Series:
        """Collect a BeliefsDataFrame, check it for missing/NaN data,
        and return its event_value column as a Series (index simplified to event_start)."""
        logger.info(
            "Reading %s data from database" % self.generic_asset_value_class.__name__
        )

        bdf = self.generic_asset_value_class.collect(**self.collect_params)
        self.check_data(bdf)
        df = simplify_index(bdf)

        if self.post_load_processing is not None:
            df = self.post_load_processing.transform_dataframe(df)

        return df["event_value"]

    def check_data(self, df: pd.DataFrame):
        """Raise error if data is empty or contains nan values.

        Here, other than in load_series, we can show the collect parameters,
        which is quite helpful for debugging.

        :raises MissingData: if no values were found at all
        :raises NaNData: if any NaN values were found
        """
        if df.empty:
            raise MissingData(
                "No values found in database for the requested %s data. It's no use to continue I'm afraid."
                " Here's a print-out of what I tried to collect:\n\n%s\n\n"
                % (
                    self.generic_asset_value_class.__name__,
                    pformat(self.collect_params, sort_dicts=False),
                )
            )
        if df.isnull().values.any():
            raise NaNData(
                "Nan values found in database for the requested %s data. It's no use to continue I'm afraid."
                " Here's a print-out of what I tried to collect:\n\n%s\n\n"
                % (
                    self.generic_asset_value_class.__name__,
                    pformat(self.collect_params, sort_dicts=False),
                )
            )


def create_initial_model_specs( # noqa: C901
generic_asset: Union[Asset, Market, WeatherSensor],
forecast_start: datetime, # Start of forecast period
Expand Down Expand Up @@ -90,29 +179,36 @@ def create_initial_model_specs( # noqa: C901
if ex_post_horizon is None:
ex_post_horizon = timedelta(hours=0)

outcome_var_spec = DBSeriesSpecs(
outcome_var_spec = TBSeriesSpecs(
name=generic_asset_type.name,
db_engine=db.engine,
query=generic_asset_value_class.make_query(
asset_names=[generic_asset.name],
generic_asset_value_class=generic_asset_value_class,
collect_params=dict(
generic_asset_names=[generic_asset.name],
query_window=query_window,
belief_horizon_window=(None, ex_post_horizon),
session=db.session,
),
feature_transformation=params.get("outcome_var_transformation", None),
interpolation_config={"method": "time"},
)
# Set defaults if needed
if params.get("event_resolution", None) is None:
params["event_resolution"] = generic_asset.event_resolution
if params.get("remodel_frequency", None) is None:
params["remodel_frequency"] = timedelta(days=7)
specs = ModelSpecs(
outcome_var=outcome_var_spec,
model=None, # at least this will need to be configured still to make these specs usable!
frequency=generic_asset.event_resolution,
frequency=params[
"event_resolution"
], # todo: timetomodel doesn't distinguish frequency and resolution yet
horizon=forecast_horizon,
lags=[int(lag / generic_asset.event_resolution) for lag in lags],
lags=[int(lag / params["event_resolution"]) for lag in lags],
regressors=regressor_specs,
start_of_training=training_start,
end_of_testing=testing_end,
ratio_training_testing_data=params["ratio_training_testing_data"],
remodel_frequency=timedelta(days=7),
remodel_frequency=params["remodel_frequency"],
)

return specs
Expand Down Expand Up @@ -175,7 +271,7 @@ def configure_regressors_for_nearest_weather_sensor(
horizon,
regressor_transformation, # the regressor transformation can be passed in
transform_to_normal, # if not, it a normalization can be applied
) -> List[DBSeriesSpecs]:
) -> List[TBSeriesSpecs]:
"""For Assets, we use weather data as regressors. Here, we configure them."""
regressor_specs = []
if isinstance(generic_asset, Asset):
Expand Down Expand Up @@ -208,14 +304,14 @@ def configure_regressors_for_nearest_weather_sensor(
)
)
regressor_specs.append(
DBSeriesSpecs(
TBSeriesSpecs(
name=regressor_specs_name,
db_engine=db.engine,
query=Weather.make_query(
asset_names=[closest_sensor.name],
generic_asset_value_class=Weather,
collect_params=dict(
generic_asset_names=[closest_sensor.name],
query_window=query_window,
belief_horizon_window=(horizon, None),
session=db.session,
),
feature_transformation=regressor_transformation,
interpolation_config={"method": "time"},
Expand Down
37 changes: 28 additions & 9 deletions flexmeasures/data/scripts/cli_tasks/data_add.py
Expand Up @@ -424,6 +424,11 @@ def add_beliefs(
default="2015-12-31",
help="Forecast to date (inclusive). Follow up with a date in the form yyyy-mm-dd.",
)
@click.option(
"--resolution",
type=int,
help="Resolution of forecast in minutes. If not set, resolution is determined from the asset to be forecasted",
)
@click.option(
"--horizon",
"horizons",
Expand All @@ -444,6 +449,7 @@ def create_forecasts(
asset_id: int = None,
from_date: str = "2015-02-08",
to_date: str = "2015-12-31",
resolution: Optional[int] = None,
horizons: List[str] = ["1"],
as_job: bool = False,
):
Expand All @@ -454,25 +460,32 @@ def create_forecasts(

--from_date 2015-02-02 --to_date 2015-02-04 --horizon_hours 6

This will create forecast values from 0am on May 2nd to 0am on May 4th,
    This will create forecast values from 0am on February 2nd to 0am on February 5th,
based on a 6 hour horizon.

"""
# make horizons
horizons = [timedelta(hours=int(h)) for h in horizons]

# apply timezone:
# apply timezone and set forecast_end to make to_date inclusive
Flix6x marked this conversation as resolved.
Show resolved Hide resolved
timezone = app.config.get("FLEXMEASURES_TIMEZONE")
from_date = pd.Timestamp(from_date).tz_localize(timezone)
to_date = pd.Timestamp(to_date).tz_localize(timezone)
forecast_start = pd.Timestamp(from_date).tz_localize(timezone)
forecast_end = (pd.Timestamp(to_date) + pd.Timedelta("1D")).tz_localize(timezone)

if resolution is not None:
event_resolution = timedelta(minutes=resolution)
else:
event_resolution = None

if as_job:
if asset_type == "Asset":
value_type = "Power"
if asset_type == "Market":
elif asset_type == "Market":
value_type = "Price"
if asset_type == "WeatherSensor":
elif asset_type == "WeatherSensor":
value_type = "Weather"
else:
raise TypeError(f"Unknown asset_type {asset_type}")

for horizon in horizons:
# Note that this time period refers to the period of events we are forecasting, while in create_forecasting_jobs
Expand All @@ -481,14 +494,20 @@ def create_forecasts(
asset_id=asset_id,
timed_value_type=value_type,
horizons=[horizon],
start_of_roll=from_date - horizon,
end_of_roll=to_date - horizon,
start_of_roll=forecast_start - horizon,
end_of_roll=forecast_end - horizon,
)
else:
from flexmeasures.data.scripts.data_gen import populate_time_series_forecasts

populate_time_series_forecasts(
app.db, horizons, from_date, to_date, asset_type, asset_id
db=app.db,
horizons=horizons,
forecast_start=forecast_start,
forecast_end=forecast_end,
event_resolution=event_resolution,
generic_asset_type=asset_type,
generic_asset_id=asset_id,
)


Expand Down
30 changes: 21 additions & 9 deletions flexmeasures/data/scripts/data_gen.py
Expand Up @@ -6,6 +6,7 @@
from shutil import rmtree
from datetime import datetime, timedelta

import pandas as pd
from flask import current_app as app
from flask_sqlalchemy import SQLAlchemy
import click
Expand Down Expand Up @@ -200,8 +201,9 @@ def populate_structure(db: SQLAlchemy):
def populate_time_series_forecasts( # noqa: C901
db: SQLAlchemy,
horizons: List[timedelta],
start: datetime,
end: datetime,
forecast_start: datetime,
forecast_end: datetime,
event_resolution: Optional[timedelta] = None,
generic_asset_type: Optional[str] = None,
generic_asset_id: Optional[int] = None,
):
Expand Down Expand Up @@ -262,11 +264,12 @@ def populate_time_series_forecasts( # noqa: C901
default_model = lookup_model_specs_configurator()
model_specs, model_identifier, model_fallback = default_model(
generic_asset=generic_asset,
forecast_start=start,
forecast_end=end,
forecast_start=forecast_start,
forecast_end=forecast_end,
forecast_horizon=horizon,
custom_model_params=dict(
training_and_testing_period=training_and_testing_period
training_and_testing_period=training_and_testing_period,
event_resolution=event_resolution,
),
)
click.echo(
Expand All @@ -275,16 +278,25 @@ def populate_time_series_forecasts( # noqa: C901
% (
naturaldelta(horizon),
generic_asset.name,
start,
end,
forecast_start,
forecast_end,
naturaldelta(training_and_testing_period),
model_identifier,
)
)
model_specs.creation_time = start
model_specs.creation_time = forecast_start
forecasts, model_state = make_rolling_forecasts(
start=start, end=end, model_specs=model_specs
start=forecast_start, end=forecast_end, model_specs=model_specs
)
# Upsample to sensor resolution if needed
Flix6x marked this conversation as resolved.
Show resolved Hide resolved
if forecasts.index.freq > pd.Timedelta(generic_asset.event_resolution):
index = pd.date_range(
forecast_start,
forecast_end,
freq=generic_asset.event_resolution,
closed="left",
)
forecasts = forecasts.reindex(index).fillna(method="pad")
except (NotEnoughDataException, MissingData, NaNData) as e:
click.echo(
"Skipping forecasts for asset %s: %s" % (generic_asset, str(e))
Expand Down
8 changes: 4 additions & 4 deletions flexmeasures/data/services/forecasting.py
Expand Up @@ -30,7 +30,7 @@
"""
The life cycle of a forecasting job:
1. A forecasting job is born in create_forecasting_jobs.
2. It is run in make_forecasts which writes results to the db.
2. It is run in make_rolling_viewpoint_forecasts or make_fixed_viewpoint_forecasts, which write results to the db.
Flix6x marked this conversation as resolved.
Show resolved Hide resolved
This is also where model specs are configured and a possible fallback model is stored for step 3.
3. If an error occurs (and the worker is configured accordingly), handle_forecasting_exception comes in.
This might re-enqueue the job or try a different model (which creates a new job).
Expand Down Expand Up @@ -98,7 +98,7 @@ def create_forecasting_jobs(
jobs: List[Job] = []
for horizon in horizons:
job = Job.create(
make_forecasts,
make_rolling_viewpoint_forecasts,
kwargs=dict(
asset_id=asset_id,
timed_value_type=timed_value_type,
Expand All @@ -117,7 +117,7 @@ def create_forecasting_jobs(
return jobs


def make_forecasts(
def make_rolling_viewpoint_forecasts(
Flix6x marked this conversation as resolved.
Show resolved Hide resolved
asset_id: int,
timed_value_type: str,
horizon: timedelta,
Expand Down Expand Up @@ -262,7 +262,7 @@ def handle_forecasting_exception(job, exc_type, exc_value, traceback):
if "fallback_model_search_term" in job.meta:
if job.meta["fallback_model_search_term"] is not None:
new_job = Job.create(
make_forecasts,
make_rolling_viewpoint_forecasts,
args=job.args,
kwargs=job.kwargs,
connection=current_app.queues["forecasting"].connection,
Expand Down