Skip to content

Commit

Permalink
Clean up references to old sensor ids (#342)
Browse files Browse the repository at this point in the history
* Clean up CLI function to create forecasts

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Replace remaining references to old_sensor_id with sensor_id

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Disallow forecasting all sensors at once, but allow forecasting multiple sensors at once by explicitly passing their ids

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Update docstring

Signed-off-by: F.N. Claessen <felix@seita.nl>
  • Loading branch information
Flix6x committed Jan 31, 2022
1 parent 43a0173 commit d39abc7
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 93 deletions.
2 changes: 1 addition & 1 deletion flexmeasures/api/v1/tests/test_api_v1_fresh_db.py
Expand Up @@ -73,7 +73,7 @@ def test_post_and_get_meter_data(
for asset_name in ("CS 1", "CS 2", "CS 3"):
if asset_name in str(post_message):
sensor = Sensor.query.filter_by(name=asset_name).one_or_none()
assert sensor.id in [job.kwargs["old_sensor_id"] for job in jobs]
assert sensor.id in [job.kwargs["sensor_id"] for job in jobs]

# get meter data
get_meter_data_response = client.get(
Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/api/v1_1/tests/test_api_v1_1.py
Expand Up @@ -154,7 +154,7 @@ def test_post_price_data(setup_api_test_data, db, app, clean_redis, post_message
for job, horizon in zip(jobs, horizons):
assert job.kwargs["horizon"] == horizon
assert job.kwargs["start"] == parse_date(post_message["start"]) + horizon
assert job.kwargs["old_sensor_id"] == market.id
assert job.kwargs["sensor_id"] == market.id


@pytest.mark.parametrize(
Expand Down
Expand Up @@ -59,4 +59,4 @@ def test_post_price_data_2_0(
for job, horizon in zip(jobs, horizons):
assert job.kwargs["horizon"] == horizon
assert job.kwargs["start"] == parse_date(post_message["start"]) + horizon
assert job.kwargs["old_sensor_id"] == market.id
assert job.kwargs["sensor_id"] == market.id
45 changes: 20 additions & 25 deletions flexmeasures/cli/data_add.py
Expand Up @@ -483,15 +483,11 @@ def add_beliefs(
@fm_add_data.command("forecasts")
@with_appcontext
@click.option(
"--asset-type",
type=click.Choice(["Asset", "Market", "WeatherSensor"]),
help="The generic asset type for which to generate forecasts."
" Follow up with Asset, Market or WeatherSensor.",
)
@click.option(
"--asset-id",
help="Populate (time series) data for a single asset only. Follow up with the asset's ID. "
"We still need --asset-type, as well, so we know where to look this ID up.",
"--sensor-id",
"sensor_ids",
multiple=True,
required=True,
help="Create forecasts for this sensor. Follow up with the sensor's ID. This argument can be given multiple times.",
)
@click.option(
"--from-date",
Expand All @@ -508,7 +504,7 @@ def add_beliefs(
@click.option(
"--resolution",
type=int,
help="Resolution of forecast in minutes. If not set, resolution is determined from the asset to be forecasted",
help="Resolution of forecast in minutes. If not set, resolution is determined from the sensor to be forecasted",
)
@click.option(
"--horizon",
Expand All @@ -526,8 +522,7 @@ def add_beliefs(
" config settings to that of the remote server. To process the job, run a worker to process the forecasting queue. Defaults to False.",
)
def create_forecasts(
asset_type: str = None,
asset_id: int = None,
sensor_ids: List[int],
from_date_str: str = "2015-02-08",
to_date_str: str = "2015-12-31",
horizons_as_hours: List[str] = ["1"],
Expand All @@ -539,10 +534,10 @@ def create_forecasts(
For example:
--from_date 2015-02-02 --to_date 2015-02-04 --horizon_hours 6
--from_date 2015-02-02 --to_date 2015-02-04 --horizon_hours 6 --sensor-id 12 --sensor-id 14
This will create forecast values from 0am on May 2nd to 0am on May 5th,
based on a 6 hour horizon.
based on a 6-hour horizon, for sensors 12 and 14.
"""
# make horizons
Expand All @@ -562,26 +557,26 @@ def create_forecasts(
event_resolution = None

if as_job:
for horizon in horizons:
# Note that this time period refers to the period of events we are forecasting, while in create_forecasting_jobs
# the time period refers to the period of belief_times, therefore we are subtracting the horizon.
create_forecasting_jobs(
old_sensor_id=asset_id,
horizons=[horizon],
start_of_roll=forecast_start - horizon,
end_of_roll=forecast_end - horizon,
)
for sensor_id in sensor_ids:
for horizon in horizons:
# Note that this time period refers to the period of events we are forecasting, while in create_forecasting_jobs
# the time period refers to the period of belief_times, therefore we are subtracting the horizon.
create_forecasting_jobs(
sensor_id=sensor_id,
horizons=[horizon],
start_of_roll=forecast_start - horizon,
end_of_roll=forecast_end - horizon,
)
else:
from flexmeasures.data.scripts.data_gen import populate_time_series_forecasts

populate_time_series_forecasts(
db=app.db,
sensor_ids=sensor_ids,
horizons=horizons,
forecast_start=forecast_start,
forecast_end=forecast_end,
event_resolution=event_resolution,
old_sensor_class_name=asset_type,
old_sensor_id=asset_id,
)


Expand Down
2 changes: 1 addition & 1 deletion flexmeasures/cli/testing.py
Expand Up @@ -53,7 +53,7 @@ def test_making_forecasts():
click.echo("Forecasts found before : %d" % forecast_filter.count())

create_forecasting_jobs(
old_sensor_id=sensor_id,
sensor_id=sensor_id,
horizons=[timedelta(hours=6)],
start_of_roll=as_server_time(datetime(2015, 4, 1)),
end_of_roll=as_server_time(datetime(2015, 4, 3)),
Expand Down
64 changes: 15 additions & 49 deletions flexmeasures/data/scripts/data_gen.py
Expand Up @@ -202,12 +202,11 @@ def populate_structure(db: SQLAlchemy):
@as_transaction # noqa: C901
def populate_time_series_forecasts( # noqa: C901
db: SQLAlchemy,
sensor_ids: List[int],
horizons: List[timedelta],
forecast_start: datetime,
forecast_end: datetime,
event_resolution: Optional[timedelta] = None,
old_sensor_class_name: Optional[str] = None,
old_sensor_id: Optional[int] = None,
):
training_and_testing_period = timedelta(days=30)

Expand All @@ -221,51 +220,20 @@ def populate_time_series_forecasts( # noqa: C901
name="Seita", type="demo script"
).one_or_none()

# List all old sensors for which to forecast.
# Look into their type if no name is given. If a name is given,
old_sensors = []
if old_sensor_id is None:
if old_sensor_class_name is None or old_sensor_class_name == "WeatherSensor":
sensors = WeatherSensor.query.all()
old_sensors.extend(sensors)
if old_sensor_class_name is None or old_sensor_class_name == "Asset":
assets = Asset.query.all()
old_sensors.extend(assets)
if old_sensor_class_name is None or old_sensor_class_name == "Market":
markets = Market.query.all()
old_sensors.extend(markets)
else:
if old_sensor_class_name is None:
click.echo(
"If you specify --asset-name, please also specify --asset-type, so we can look it up."
)
return
if old_sensor_class_name == "WeatherSensor":
sensors = WeatherSensor.query.filter(
WeatherSensor.id == old_sensor_id
).one_or_none()
if sensors is not None:
old_sensors.append(sensors)
if old_sensor_class_name == "Asset":
assets = Asset.query.filter(Asset.id == old_sensor_id).one_or_none()
if assets is not None:
old_sensors.append(assets)
if old_sensor_class_name == "Market":
markets = Market.query.filter(Market.id == old_sensor_id).one_or_none()
if markets is not None:
old_sensors.append(markets)
if not old_sensors:
click.echo("No such assets in db, so I will not add any forecasts.")
# List all sensors for which to forecast.
sensors = [Sensor.query.filter(Sensor.id.in_(sensor_ids)).one_or_none()]
if not sensors:
click.echo("No such sensors in db, so I will not add any forecasts.")
return

# Make a model for each old sensor and horizon, make rolling forecasts and save to database.
# Make a model for each sensor and horizon, make rolling forecasts and save to database.
# We cannot use (faster) bulk save, as forecasts might become regressors in other forecasts.
for old_sensor in old_sensors:
for sensor in sensors:
for horizon in horizons:
try:
default_model = lookup_model_specs_configurator()
model_specs, model_identifier, model_fallback = default_model(
sensor=old_sensor.corresponding_sensor,
sensor=sensor,
forecast_start=forecast_start,
forecast_end=forecast_end,
forecast_horizon=horizon,
Expand All @@ -275,11 +243,11 @@ def populate_time_series_forecasts( # noqa: C901
),
)
click.echo(
"Computing forecasts of %s ahead for %s, "
"Computing forecasts of %s ahead for sensor %s, "
"from %s to %s with a training and testing period of %s, using %s ..."
% (
naturaldelta(horizon),
old_sensor.name,
sensor.id,
forecast_start,
forecast_end,
naturaldelta(training_and_testing_period),
Expand All @@ -291,16 +259,14 @@ def populate_time_series_forecasts( # noqa: C901
start=forecast_start, end=forecast_end, model_specs=model_specs
)
# Upsample to sensor resolution if needed
if forecasts.index.freq > pd.Timedelta(old_sensor.event_resolution):
if forecasts.index.freq > pd.Timedelta(sensor.event_resolution):
forecasts = model_specs.outcome_var.resample_data(
forecasts,
time_window=(forecasts.index.min(), forecasts.index.max()),
expected_frequency=old_sensor.event_resolution,
expected_frequency=sensor.event_resolution,
)
except (NotEnoughDataException, MissingData, NaNData) as e:
click.echo(
"Skipping forecasts for old sensor %s: %s" % (old_sensor, str(e))
)
click.echo("Skipping forecasts for sensor %s: %s" % (sensor, str(e)))
continue
"""
import matplotlib.pyplot as plt
Expand All @@ -320,15 +286,15 @@ def populate_time_series_forecasts( # noqa: C901
event_start=ensure_local_timezone(dt, tz_name=LOCAL_TIME_ZONE),
belief_horizon=horizon,
event_value=value,
sensor=old_sensor.corresponding_sensor,
sensor=sensor,
source=data_source,
)
for dt, value in forecasts.items()
]

print(
"Saving %s %s-forecasts for %s..."
% (len(beliefs), naturaldelta(horizon), old_sensor.name)
% (len(beliefs), naturaldelta(horizon), sensor.id)
)
for belief in beliefs:
db.session.add(belief)
Expand Down
14 changes: 7 additions & 7 deletions flexmeasures/data/services/forecasting.py
Expand Up @@ -43,7 +43,7 @@ class MisconfiguredForecastingJobException(Exception):


def create_forecasting_jobs(
old_sensor_id: int,
sensor_id: int,
start_of_roll: datetime,
end_of_roll: datetime,
resolution: timedelta = None,
Expand Down Expand Up @@ -96,7 +96,7 @@ def create_forecasting_jobs(
job = Job.create(
make_rolling_viewpoint_forecasts,
kwargs=dict(
old_sensor_id=old_sensor_id,
sensor_id=sensor_id,
horizon=horizon,
start=start_of_roll + horizon,
end=end_of_roll + horizon,
Expand All @@ -118,7 +118,7 @@ def create_forecasting_jobs(


def make_fixed_viewpoint_forecasts(
old_sensor_id: int,
sensor_id: int,
horizon: timedelta,
start: datetime,
end: datetime,
Expand All @@ -135,7 +135,7 @@ def make_fixed_viewpoint_forecasts(


def make_rolling_viewpoint_forecasts(
old_sensor_id: int,
sensor_id: int,
horizon: timedelta,
start: datetime,
end: datetime,
Expand All @@ -150,8 +150,8 @@ def make_rolling_viewpoint_forecasts(
Parameters
----------
:param old_sensor_id: int
To identify which old sensor to forecast (note: old_sensor_id == sensor_id)
:param sensor_id: int
To identify which sensor to forecast
:param horizon: timedelta
duration between the end of each interval and the time at which the belief about that interval is formed
:param start: datetime
Expand All @@ -173,7 +173,7 @@ def make_rolling_viewpoint_forecasts(
model_search_term = rq_job.meta.get("model_search_term", "linear-OLS")

# find sensor
sensor = Sensor.query.filter_by(id=old_sensor_id).one_or_none()
sensor = Sensor.query.filter_by(id=sensor_id).one_or_none()

click.echo(
"Running Forecasting Job %s: %s for %s on model '%s', from %s to %s"
Expand Down
10 changes: 5 additions & 5 deletions flexmeasures/data/tests/test_forecasting_jobs.py
Expand Up @@ -61,7 +61,7 @@ def test_forecasting_an_hour_of_wind(db, app, setup_test_data):
start_of_roll=as_server_time(datetime(2015, 1, 1, 6)),
end_of_roll=as_server_time(datetime(2015, 1, 1, 7)),
horizons=[horizon],
old_sensor_id=wind_device_1.id,
sensor_id=wind_device_1.id,
custom_model_params=custom_model_params(),
)

Expand Down Expand Up @@ -109,7 +109,7 @@ def test_forecasting_two_hours_of_solar_at_edge_of_data_set(db, app, setup_test_
horizons=[
timedelta(hours=6)
], # so we want forecasts for 11.15pm (Jan 1st) to 0.15am (Jan 2nd)
old_sensor_id=solar_device1.id,
sensor_id=solar_device1.id,
custom_model_params=custom_model_params(),
)
print("Job: %s" % job[0].id)
Expand Down Expand Up @@ -176,7 +176,7 @@ def test_failed_forecasting_insufficient_data(app, clean_redis, setup_test_data)
start_of_roll=as_server_time(datetime(2016, 1, 1, 20)),
end_of_roll=as_server_time(datetime(2016, 1, 1, 22)),
horizons=[timedelta(hours=1)],
old_sensor_id=solar_device1.id,
sensor_id=solar_device1.id,
custom_model_params=custom_model_params(),
)
work_on_rq(app.queues["forecasting"], exc_handler=handle_forecasting_exception)
Expand All @@ -190,7 +190,7 @@ def test_failed_forecasting_invalid_horizon(app, clean_redis, setup_test_data):
start_of_roll=as_server_time(datetime(2015, 1, 1, 21)),
end_of_roll=as_server_time(datetime(2015, 1, 1, 23)),
horizons=[timedelta(hours=18)],
old_sensor_id=solar_device1.id,
sensor_id=solar_device1.id,
custom_model_params=custom_model_params(),
)
work_on_rq(app.queues["forecasting"], exc_handler=handle_forecasting_exception)
Expand All @@ -209,7 +209,7 @@ def test_failed_unknown_model(app, clean_redis, setup_test_data):
start_of_roll=as_server_time(datetime(2015, 1, 1, 12)),
end_of_roll=as_server_time(datetime(2015, 1, 1, 14)),
horizons=[horizon],
old_sensor_id=solar_device1.id,
sensor_id=solar_device1.id,
model_search_term="no-one-knows-this",
custom_model_params=cmp,
)
Expand Down
6 changes: 3 additions & 3 deletions flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py
Expand Up @@ -27,7 +27,7 @@ def test_forecasting_three_hours_of_wind(app, setup_fresh_test_data, clean_redis
start_of_roll=as_server_time(datetime(2015, 1, 1, 10)),
end_of_roll=as_server_time(datetime(2015, 1, 1, 13)),
horizons=[horizon],
old_sensor_id=wind_device2.id,
sensor_id=wind_device2.id,
custom_model_params=custom_model_params(),
)
print("Job: %s" % job[0].id)
Expand Down Expand Up @@ -59,7 +59,7 @@ def test_forecasting_two_hours_of_solar(app, setup_fresh_test_data, clean_redis)
start_of_roll=as_server_time(datetime(2015, 1, 1, 12)),
end_of_roll=as_server_time(datetime(2015, 1, 1, 14)),
horizons=[horizon],
old_sensor_id=solar_device1.id,
sensor_id=solar_device1.id,
custom_model_params=custom_model_params(),
)
print("Job: %s" % job[0].id)
Expand Down Expand Up @@ -106,7 +106,7 @@ def test_failed_model_with_too_much_training_then_succeed_with_fallback(
start_of_roll=as_server_time(datetime(2015, 1, 1, hour_start)),
end_of_roll=as_server_time(datetime(2015, 1, 1, hour_start + 2)),
horizons=[horizon],
old_sensor_id=solar_device1.id,
sensor_id=solar_device1.id,
model_search_term=model_to_start_with,
custom_model_params=cmp,
)
Expand Down

0 comments on commit d39abc7

Please sign in to comment.