From d39abc76445d0916b569fcdb5444f9eee0ed1091 Mon Sep 17 00:00:00 2001 From: Felix Claessen <30658763+Flix6x@users.noreply.github.com> Date: Mon, 31 Jan 2022 13:20:04 +0100 Subject: [PATCH] Clean up references to old sensor ids (#342) * Clean up CLI function to create forecasts Signed-off-by: F.N. Claessen * Replace remaining references to old_sensor_id with sensor_id Signed-off-by: F.N. Claessen * Disallow forecasting all sensors at once, but allow forecasting multiple sensors at once by explicitly passing their ids Signed-off-by: F.N. Claessen * Update docstring Signed-off-by: F.N. Claessen --- .../api/v1/tests/test_api_v1_fresh_db.py | 2 +- flexmeasures/api/v1_1/tests/test_api_v1_1.py | 2 +- .../tests/test_api_v2_0_sensors_fresh_db.py | 2 +- flexmeasures/cli/data_add.py | 45 ++++++------- flexmeasures/cli/testing.py | 2 +- flexmeasures/data/scripts/data_gen.py | 64 +++++-------------- flexmeasures/data/services/forecasting.py | 14 ++-- .../data/tests/test_forecasting_jobs.py | 10 +-- .../tests/test_forecasting_jobs_fresh_db.py | 6 +- 9 files changed, 54 insertions(+), 93 deletions(-) diff --git a/flexmeasures/api/v1/tests/test_api_v1_fresh_db.py b/flexmeasures/api/v1/tests/test_api_v1_fresh_db.py index 127ca2c82..02059c747 100644 --- a/flexmeasures/api/v1/tests/test_api_v1_fresh_db.py +++ b/flexmeasures/api/v1/tests/test_api_v1_fresh_db.py @@ -73,7 +73,7 @@ def test_post_and_get_meter_data( for asset_name in ("CS 1", "CS 2", "CS 3"): if asset_name in str(post_message): sensor = Sensor.query.filter_by(name=asset_name).one_or_none() - assert sensor.id in [job.kwargs["old_sensor_id"] for job in jobs] + assert sensor.id in [job.kwargs["sensor_id"] for job in jobs] # get meter data get_meter_data_response = client.get( diff --git a/flexmeasures/api/v1_1/tests/test_api_v1_1.py b/flexmeasures/api/v1_1/tests/test_api_v1_1.py index 440b37abc..2ab523113 100644 --- a/flexmeasures/api/v1_1/tests/test_api_v1_1.py +++ b/flexmeasures/api/v1_1/tests/test_api_v1_1.py @@ -154,7 +154,7 @@ def test_post_price_data(setup_api_test_data, db, app, clean_redis, post_message for job, horizon in zip(jobs, horizons): assert job.kwargs["horizon"] == horizon assert job.kwargs["start"] == parse_date(post_message["start"]) + horizon - assert job.kwargs["old_sensor_id"] == market.id + assert job.kwargs["sensor_id"] == market.id @pytest.mark.parametrize( diff --git a/flexmeasures/api/v2_0/tests/test_api_v2_0_sensors_fresh_db.py b/flexmeasures/api/v2_0/tests/test_api_v2_0_sensors_fresh_db.py index 0436693c9..712bd3ece 100644 --- a/flexmeasures/api/v2_0/tests/test_api_v2_0_sensors_fresh_db.py +++ b/flexmeasures/api/v2_0/tests/test_api_v2_0_sensors_fresh_db.py @@ -59,4 +59,4 @@ def test_post_price_data_2_0( for job, horizon in zip(jobs, horizons): assert job.kwargs["horizon"] == horizon assert job.kwargs["start"] == parse_date(post_message["start"]) + horizon - assert job.kwargs["old_sensor_id"] == market.id + assert job.kwargs["sensor_id"] == market.id diff --git a/flexmeasures/cli/data_add.py b/flexmeasures/cli/data_add.py index 33471a2ac..eb8bdf0cd 100755 --- a/flexmeasures/cli/data_add.py +++ b/flexmeasures/cli/data_add.py @@ -483,15 +483,11 @@ def add_beliefs( @fm_add_data.command("forecasts") @with_appcontext @click.option( - "--asset-type", - type=click.Choice(["Asset", "Market", "WeatherSensor"]), - help="The generic asset type for which to generate forecasts." - " Follow up with Asset, Market or WeatherSensor.", -) -@click.option( - "--asset-id", - help="Populate (time series) data for a single asset only. Follow up with the asset's ID. " - "We still need --asset-type, as well, so we know where to look this ID up.", + "--sensor-id", + "sensor_ids", + multiple=True, + required=True, + help="Create forecasts for this sensor. Follow up with the sensor's ID. This argument can be given multiple times.", ) @click.option( "--from-date", @@ -508,7 +504,7 @@ def add_beliefs( @click.option( "--resolution", type=int, - help="Resolution of forecast in minutes. If not set, resolution is determined from the asset to be forecasted", + help="Resolution of forecast in minutes. If not set, resolution is determined from the sensor to be forecasted", ) @click.option( "--horizon", @@ -526,8 +522,7 @@ def add_beliefs( " config settings to that of the remote server. To process the job, run a worker to process the forecasting queue. Defaults to False.", ) def create_forecasts( - asset_type: str = None, - asset_id: int = None, + sensor_ids: List[int], from_date_str: str = "2015-02-08", to_date_str: str = "2015-12-31", horizons_as_hours: List[str] = ["1"], @@ -539,10 +534,10 @@ def create_forecasts( For example: - --from_date 2015-02-02 --to_date 2015-02-04 --horizon_hours 6 + --from_date 2015-02-02 --to_date 2015-02-04 --horizon_hours 6 --sensor-id 12 --sensor-id 14 This will create forecast values from 0am on May 2nd to 0am on May 5th, - based on a 6 hour horizon. + based on a 6-hour horizon, for sensors 12 and 14. """ # make horizons @@ -562,26 +557,26 @@ def create_forecasts( event_resolution = None if as_job: - for horizon in horizons: - # Note that this time period refers to the period of events we are forecasting, while in create_forecasting_jobs - # the time period refers to the period of belief_times, therefore we are subtracting the horizon. - create_forecasting_jobs( - old_sensor_id=asset_id, - horizons=[horizon], - start_of_roll=forecast_start - horizon, - end_of_roll=forecast_end - horizon, - ) + for sensor_id in sensor_ids: + for horizon in horizons: + # Note that this time period refers to the period of events we are forecasting, while in create_forecasting_jobs + # the time period refers to the period of belief_times, therefore we are subtracting the horizon. + create_forecasting_jobs( + sensor_id=sensor_id, + horizons=[horizon], + start_of_roll=forecast_start - horizon, + end_of_roll=forecast_end - horizon, + ) else: from flexmeasures.data.scripts.data_gen import populate_time_series_forecasts populate_time_series_forecasts( db=app.db, + sensor_ids=sensor_ids, horizons=horizons, forecast_start=forecast_start, forecast_end=forecast_end, event_resolution=event_resolution, - old_sensor_class_name=asset_type, - old_sensor_id=asset_id, ) diff --git a/flexmeasures/cli/testing.py b/flexmeasures/cli/testing.py index f723f6137..8bbe137e9 100644 --- a/flexmeasures/cli/testing.py +++ b/flexmeasures/cli/testing.py @@ -53,7 +53,7 @@ def test_making_forecasts(): click.echo("Forecasts found before : %d" % forecast_filter.count()) create_forecasting_jobs( - old_sensor_id=sensor_id, + sensor_id=sensor_id, horizons=[timedelta(hours=6)], start_of_roll=as_server_time(datetime(2015, 4, 1)), end_of_roll=as_server_time(datetime(2015, 4, 3)), diff --git a/flexmeasures/data/scripts/data_gen.py b/flexmeasures/data/scripts/data_gen.py index 280ce2bf1..2d4390368 100644 --- a/flexmeasures/data/scripts/data_gen.py +++ b/flexmeasures/data/scripts/data_gen.py @@ -202,12 +202,11 @@ def populate_structure(db: SQLAlchemy): @as_transaction # noqa: C901 def populate_time_series_forecasts( # noqa: C901 db: SQLAlchemy, + sensor_ids: List[int], horizons: List[timedelta], forecast_start: datetime, forecast_end: datetime, event_resolution: Optional[timedelta] = None, - old_sensor_class_name: Optional[str] = None, - old_sensor_id: Optional[int] = None, ): training_and_testing_period = timedelta(days=30) @@ -221,51 +220,20 @@ def populate_time_series_forecasts( # noqa: C901 name="Seita", type="demo script" ).one_or_none() - # List all old sensors for which to forecast. - # Look into their type if no name is given. If a name is given, - old_sensors = [] - if old_sensor_id is None: - if old_sensor_class_name is None or old_sensor_class_name == "WeatherSensor": - sensors = WeatherSensor.query.all() - old_sensors.extend(sensors) - if old_sensor_class_name is None or old_sensor_class_name == "Asset": - assets = Asset.query.all() - old_sensors.extend(assets) - if old_sensor_class_name is None or old_sensor_class_name == "Market": - markets = Market.query.all() - old_sensors.extend(markets) - else: - if old_sensor_class_name is None: - click.echo( - "If you specify --asset-name, please also specify --asset-type, so we can look it up." - ) - return - if old_sensor_class_name == "WeatherSensor": - sensors = WeatherSensor.query.filter( - WeatherSensor.id == old_sensor_id - ).one_or_none() - if sensors is not None: - old_sensors.append(sensors) - if old_sensor_class_name == "Asset": - assets = Asset.query.filter(Asset.id == old_sensor_id).one_or_none() - if assets is not None: - old_sensors.append(assets) - if old_sensor_class_name == "Market": - markets = Market.query.filter(Market.id == old_sensor_id).one_or_none() - if markets is not None: - old_sensors.append(markets) - if not old_sensors: - click.echo("No such assets in db, so I will not add any forecasts.") + # List all sensors for which to forecast. + sensors = [Sensor.query.filter(Sensor.id.in_(sensor_ids)).one_or_none()] + if not sensors: + click.echo("No such sensors in db, so I will not add any forecasts.") return - # Make a model for each old sensor and horizon, make rolling forecasts and save to database. + # Make a model for each sensor and horizon, make rolling forecasts and save to database. # We cannot use (faster) bulk save, as forecasts might become regressors in other forecasts. - for old_sensor in old_sensors: + for sensor in sensors: for horizon in horizons: try: default_model = lookup_model_specs_configurator() model_specs, model_identifier, model_fallback = default_model( - sensor=old_sensor.corresponding_sensor, + sensor=sensor, forecast_start=forecast_start, forecast_end=forecast_end, forecast_horizon=horizon, @@ -275,11 +243,11 @@ def populate_time_series_forecasts( # noqa: C901 ), ) click.echo( - "Computing forecasts of %s ahead for %s, " + "Computing forecasts of %s ahead for sensor %s, " "from %s to %s with a training and testing period of %s, using %s ..." % ( naturaldelta(horizon), - old_sensor.name, + sensor.id, forecast_start, forecast_end, naturaldelta(training_and_testing_period), @@ -291,16 +259,14 @@ def populate_time_series_forecasts( # noqa: C901 start=forecast_start, end=forecast_end, model_specs=model_specs ) # Upsample to sensor resolution if needed - if forecasts.index.freq > pd.Timedelta(old_sensor.event_resolution): + if forecasts.index.freq > pd.Timedelta(sensor.event_resolution): forecasts = model_specs.outcome_var.resample_data( forecasts, time_window=(forecasts.index.min(), forecasts.index.max()), - expected_frequency=old_sensor.event_resolution, + expected_frequency=sensor.event_resolution, ) except (NotEnoughDataException, MissingData, NaNData) as e: - click.echo( - "Skipping forecasts for old sensor %s: %s" % (old_sensor, str(e)) - ) + click.echo("Skipping forecasts for sensor %s: %s" % (sensor, str(e))) continue """ import matplotlib.pyplot as plt @@ -320,7 +286,7 @@ def populate_time_series_forecasts( # noqa: C901 event_start=ensure_local_timezone(dt, tz_name=LOCAL_TIME_ZONE), belief_horizon=horizon, event_value=value, - sensor=old_sensor.corresponding_sensor, + sensor=sensor, source=data_source, ) for dt, value in forecasts.items() @@ -328,7 +294,7 @@ def populate_time_series_forecasts( # noqa: C901 print( "Saving %s %s-forecasts for %s..." - % (len(beliefs), naturaldelta(horizon), old_sensor.name) + % (len(beliefs), naturaldelta(horizon), sensor.id) ) for belief in beliefs: db.session.add(belief) diff --git a/flexmeasures/data/services/forecasting.py b/flexmeasures/data/services/forecasting.py index eeb91ec40..dc07628a2 100644 --- a/flexmeasures/data/services/forecasting.py +++ b/flexmeasures/data/services/forecasting.py @@ -43,7 +43,7 @@ class MisconfiguredForecastingJobException(Exception): def create_forecasting_jobs( - old_sensor_id: int, + sensor_id: int, start_of_roll: datetime, end_of_roll: datetime, resolution: timedelta = None, @@ -96,7 +96,7 @@ def create_forecasting_jobs( job = Job.create( make_rolling_viewpoint_forecasts, kwargs=dict( - old_sensor_id=old_sensor_id, + sensor_id=sensor_id, horizon=horizon, start=start_of_roll + horizon, end=end_of_roll + horizon, @@ -118,7 +118,7 @@ def create_forecasting_jobs( def make_fixed_viewpoint_forecasts( - old_sensor_id: int, + sensor_id: int, horizon: timedelta, start: datetime, end: datetime, @@ -135,7 +135,7 @@ def make_fixed_viewpoint_forecasts( def make_rolling_viewpoint_forecasts( - old_sensor_id: int, + sensor_id: int, horizon: timedelta, start: datetime, end: datetime, @@ -150,8 +150,8 @@ def make_rolling_viewpoint_forecasts( Parameters ---------- - :param old_sensor_id: int - To identify which old sensor to forecast (note: old_sensor_id == sensor_id) + :param sensor_id: int + To identify which sensor to forecast :param horizon: timedelta duration between the end of each interval and the time at which the belief about that interval is formed :param start: datetime @@ -173,7 +173,7 @@ def make_rolling_viewpoint_forecasts( model_search_term = rq_job.meta.get("model_search_term", "linear-OLS") # find sensor - sensor = Sensor.query.filter_by(id=old_sensor_id).one_or_none() + sensor = Sensor.query.filter_by(id=sensor_id).one_or_none() click.echo( "Running Forecasting Job %s: %s for %s on model '%s', from %s to %s" diff --git a/flexmeasures/data/tests/test_forecasting_jobs.py b/flexmeasures/data/tests/test_forecasting_jobs.py index df839f913..a8815cd55 100644 --- a/flexmeasures/data/tests/test_forecasting_jobs.py +++ b/flexmeasures/data/tests/test_forecasting_jobs.py @@ -61,7 +61,7 @@ def test_forecasting_an_hour_of_wind(db, app, setup_test_data): start_of_roll=as_server_time(datetime(2015, 1, 1, 6)), end_of_roll=as_server_time(datetime(2015, 1, 1, 7)), horizons=[horizon], - old_sensor_id=wind_device_1.id, + sensor_id=wind_device_1.id, custom_model_params=custom_model_params(), ) @@ -109,7 +109,7 @@ def test_forecasting_two_hours_of_solar_at_edge_of_data_set(db, app, setup_test_ horizons=[ timedelta(hours=6) ], # so we want forecasts for 11.15pm (Jan 1st) to 0.15am (Jan 2nd) - old_sensor_id=solar_device1.id, + sensor_id=solar_device1.id, custom_model_params=custom_model_params(), ) print("Job: %s" % job[0].id) @@ -176,7 +176,7 @@ def test_failed_forecasting_insufficient_data(app, clean_redis, setup_test_data) start_of_roll=as_server_time(datetime(2016, 1, 1, 20)), end_of_roll=as_server_time(datetime(2016, 1, 1, 22)), horizons=[timedelta(hours=1)], - old_sensor_id=solar_device1.id, + sensor_id=solar_device1.id, custom_model_params=custom_model_params(), ) work_on_rq(app.queues["forecasting"], exc_handler=handle_forecasting_exception) @@ -190,7 +190,7 @@ def test_failed_forecasting_invalid_horizon(app, clean_redis, setup_test_data): start_of_roll=as_server_time(datetime(2015, 1, 1, 21)), end_of_roll=as_server_time(datetime(2015, 1, 1, 23)), horizons=[timedelta(hours=18)], - old_sensor_id=solar_device1.id, + sensor_id=solar_device1.id, custom_model_params=custom_model_params(), ) work_on_rq(app.queues["forecasting"], exc_handler=handle_forecasting_exception) @@ -209,7 +209,7 @@ def test_failed_unknown_model(app, clean_redis, setup_test_data): start_of_roll=as_server_time(datetime(2015, 1, 1, 12)), end_of_roll=as_server_time(datetime(2015, 1, 1, 14)), horizons=[horizon], - old_sensor_id=solar_device1.id, + sensor_id=solar_device1.id, model_search_term="no-one-knows-this", custom_model_params=cmp, ) diff --git a/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py b/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py index b21a8f959..b1660b334 100644 --- a/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py +++ b/flexmeasures/data/tests/test_forecasting_jobs_fresh_db.py @@ -27,7 +27,7 @@ def test_forecasting_three_hours_of_wind(app, setup_fresh_test_data, clean_redis start_of_roll=as_server_time(datetime(2015, 1, 1, 10)), end_of_roll=as_server_time(datetime(2015, 1, 1, 13)), horizons=[horizon], - old_sensor_id=wind_device2.id, + sensor_id=wind_device2.id, custom_model_params=custom_model_params(), ) print("Job: %s" % job[0].id) @@ -59,7 +59,7 @@ def test_forecasting_two_hours_of_solar(app, setup_fresh_test_data, clean_redis) start_of_roll=as_server_time(datetime(2015, 1, 1, 12)), end_of_roll=as_server_time(datetime(2015, 1, 1, 14)), horizons=[horizon], - old_sensor_id=solar_device1.id, + sensor_id=solar_device1.id, custom_model_params=custom_model_params(), ) print("Job: %s" % job[0].id) @@ -106,7 +106,7 @@ def test_failed_model_with_too_much_training_then_succeed_with_fallback( start_of_roll=as_server_time(datetime(2015, 1, 1, hour_start)), end_of_roll=as_server_time(datetime(2015, 1, 1, hour_start + 2)), horizons=[horizon], - old_sensor_id=solar_device1.id, + sensor_id=solar_device1.id, model_search_term=model_to_start_with, custom_model_params=cmp, )