From 97187b8ef31f1fc97577515da5bf6f5b1cef2597 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 21 Apr 2021 19:59:27 +0200 Subject: [PATCH 1/6] Allow reading beliefs from xlsx files, and from data files with multiple headers --- .../data/scripts/cli_tasks/data_add.py | 67 ++++++++++++++++--- requirements/app.in | 2 +- requirements/app.txt | 2 +- 3 files changed, 59 insertions(+), 12 deletions(-) diff --git a/flexmeasures/data/scripts/cli_tasks/data_add.py b/flexmeasures/data/scripts/cli_tasks/data_add.py index ab8f0a9f7..fe0cf882e 100644 --- a/flexmeasures/data/scripts/cli_tasks/data_add.py +++ b/flexmeasures/data/scripts/cli_tasks/data_add.py @@ -238,16 +238,54 @@ def add_initial_structure(): help="Allow overwriting possibly already existing data.\n" "Not allowing overwriting can be much more efficient", ) +@click.option( + "--skiprows", + required=False, + default=1, + type=int, + help="Number of rows to skip from the top. Set to >1 to skip additional headers.", +) +@click.option( + "--nrows", + required=False, + type=int, + help="Number of rows to read (from the top, after possibly skipping rows). Leave out to read all rows.", +) +@click.option( + "--datecol", + required=False, + default=0, + type=int, + help="Column number with datetimes (0 is 1st column, the default)", +) +@click.option( + "--valuecol", + required=False, + default=1, + type=int, + help="Column number with values (1 is 2nd column, the default)", +) +@click.option( + "--sheet_number", + required=False, + type=int, + help="[For xls or xlsx files] Sheet number with the data (0 is 1st sheet)", +) def add_beliefs( file: str, sensor_id: int, horizon: Optional[int] = None, cp: Optional[float] = None, allow_overwrite: bool = False, + skiprows: int = 1, + nrows: Optional[int] = None, + datecol: int = 0, + valuecol: int = 1, + sheet_number: Optional[int] = None, ): - """Add sensor data from a csv file. + """Add sensor data from a csv file (also accepts xls or xlsx). - Structure your csv file as follows: + To use default settings, structure your csv file as follows: - One header line (will be ignored!) - UTC datetimes in 1st column @@ -277,20 +315,29 @@ def add_beliefs( source = DataSource(name="Seita", type="CLI script") db.session.add(source) db.session.flush() # assigns id + + # Set up optional parameters for read_csv + kwargs = dict() + if file.split(".")[-1].lower() == "csv": + kwargs["infer_datetime_format"] = True + if sheet_number is not None: + kwargs["sheet_name"] = sheet_number + if horizon is not None: + kwargs["belief_horizon"] = timedelta(minutes=horizon) + else: + kwargs["belief_time"] = server_now().astimezone(pytz.timezone(sensor.timezone)) + bdf = tb.read_csv( file, sensor, source=source, cumulative_probability=cp, + header=None, + skiprows=skiprows, + nrows=nrows, + usecols=[datecol, valuecol], parse_dates=True, - infer_datetime_format=True, - **( - dict(belief_horizon=timedelta(minutes=horizon)) - if horizon is not None - else dict( - belief_time=server_now().astimezone(pytz.timezone(sensor.timezone)) - ) - ), + **kwargs, ) try: TimedBelief.add( diff --git a/requirements/app.in b/requirements/app.in index 37e6ee956..bd0450c63 100644 --- a/requirements/app.in +++ b/requirements/app.in @@ -32,7 +32,7 @@ netCDF4 siphon tables timetomodel>=0.6.8 -timely-beliefs>=1.4.0 +timely-beliefs>=1.4.3 python-dotenv # a backport, not needed in Python3.8 importlib_metadata diff --git a/requirements/app.txt b/requirements/app.txt index 8827feb91..a941f07ef 100644 --- a/requirements/app.txt +++ b/requirements/app.txt @@ -317,7 +317,7 @@ tables==3.6.1 # via -r requirements/app.in threadpoolctl==2.1.0 # via scikit-learn -timely-beliefs==1.4.0 +timely-beliefs==1.4.3 # via -r requirements/app.in timetomodel==0.6.9 # via -r requirements/app.in From b9933a38362f4995651500685d1147be41b3a61d Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Thu, 22 Apr 2021 18:13:59 +0200 Subject: [PATCH 2/6] Allow creating new sources and looking up existing sources --- ...rce_id_as_primary_key_for_timed_beliefs.py | 39 +++++++++++++++++++ .../data/scripts/cli_tasks/data_add.py | 34 ++++++++++------ 2 files changed, 62 insertions(+), 11 deletions(-) create mode 100644 flexmeasures/data/migrations/versions/04f0e2d2924a_add_source_id_as_primary_key_for_timed_beliefs.py diff --git a/flexmeasures/data/migrations/versions/04f0e2d2924a_add_source_id_as_primary_key_for_timed_beliefs.py b/flexmeasures/data/migrations/versions/04f0e2d2924a_add_source_id_as_primary_key_for_timed_beliefs.py new file mode 100644 index 000000000..fe2d80d88 --- /dev/null +++ b/flexmeasures/data/migrations/versions/04f0e2d2924a_add_source_id_as_primary_key_for_timed_beliefs.py @@ -0,0 +1,39 @@ +"""add source id as primary key for timed beliefs + +Revision ID: 04f0e2d2924a +Revises: e62ac5f519d7 +Create Date: 2021-04-10 13:53:22.561718 + +""" +from alembic import op + + +# revision identifiers, used by Alembic. +revision = "04f0e2d2924a" +down_revision = "e62ac5f519d7" +branch_labels = None +depends_on = None + + +def upgrade(): + op.drop_constraint("timed_belief_pkey", "timed_belief") + op.create_primary_key( + "timed_belief_pkey", + "timed_belief", + [ + "event_start", + "belief_horizon", + "cumulative_probability", + "sensor_id", + "source_id", + ], + ) + + +def downgrade(): + op.drop_constraint("timed_belief_pkey", "timed_belief") + op.create_primary_key( + "timed_belief_pkey", + "timed_belief", + ["event_start", "belief_horizon", "cumulative_probability", "sensor_id"], + ) diff --git a/flexmeasures/data/scripts/cli_tasks/data_add.py b/flexmeasures/data/scripts/cli_tasks/data_add.py index fe0cf882e..3514a584d 100644 --- a/flexmeasures/data/scripts/cli_tasks/data_add.py +++ b/flexmeasures/data/scripts/cli_tasks/data_add.py @@ -220,6 +220,12 @@ def add_initial_structure(): type=click.IntRange(min=1), help="Sensor to which the beliefs pertain.", ) +@click.option( + "--source", + required=True, + type=str, + help="Source of the beliefs (an existing source id or name, or a new name).", +) @click.option( "--horizon", required=False, @@ -274,6 +280,7 @@ def add_initial_structure(): def add_beliefs( file: str, sensor_id: int, + source: str, horizon: Optional[int] = None, cp: Optional[float] = None, allow_overwrite: bool = False, @@ -305,16 +312,21 @@ def add_beliefs( if sensor is None: print(f"Failed to create beliefs: no sensor found with id {sensor_id}.") return - source = ( - DataSource.query.filter(DataSource.name == "Seita") - .filter(DataSource.type == "CLI script") - .one_or_none() - ) - if not source: - print("SETTING UP CLI SCRIPT AS NEW DATA SOURCE...") - source = DataSource(name="Seita", type="CLI script") - db.session.add(source) - db.session.flush() # assigns id + query = DataSource.query.filter(DataSource.type == "CLI script") + if source.isdigit(): + query = query.filter(DataSource.id == int(source)) + _source = query.one_or_none() + if not _source: + print(f"Failed to find source {source}.") + return + else: + query = query.filter(DataSource.name == source) + _source = query.one_or_none() + if not _source: + print(f"Setting up '{source}' as new data source...") + _source = DataSource(name=source, type="CLI script") + db.session.add(_source) + db.session.flush() # assigns id # Set up optional parameters for read_csv kwargs = dict() @@ -330,7 +342,7 @@ def add_beliefs( bdf = tb.read_csv( file, sensor, - source=source, + source=_source, cumulative_probability=cp, header=None, skiprows=skiprows, From a6f344cf443804426f83cc0e1df6f6f47b4129c3 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Thu, 29 Apr 2021 10:12:31 +0200 Subject: [PATCH 3/6] Refactor get_or_create_source --- flexmeasures/api/common/utils/api_utils.py | 12 ------- flexmeasures/api/v1/implementations.py | 4 +-- flexmeasures/api/v1_1/implementations.py | 6 ++-- .../api/v2_0/implementations/sensors.py | 8 ++--- flexmeasures/data/models/data_sources.py | 31 ++++++++++++++++++- .../data/scripts/cli_tasks/data_add.py | 17 ++++------ 6 files changed, 45 insertions(+), 33 deletions(-) diff --git a/flexmeasures/api/common/utils/api_utils.py b/flexmeasures/api/common/utils/api_utils.py index e56c15952..dbbcca910 100644 --- a/flexmeasures/api/common/utils/api_utils.py +++ b/flexmeasures/api/common/utils/api_utils.py @@ -13,9 +13,7 @@ from flexmeasures.data import db from flexmeasures.data.models.assets import Asset, Power from flexmeasures.data.models.markets import Market, Price -from flexmeasures.data.models.data_sources import DataSource from flexmeasures.data.models.weather import WeatherSensor, Weather -from flexmeasures.data.models.user import User from flexmeasures.data.utils import save_to_session from flexmeasures.utils.entity_address_utils import parse_entity_address from flexmeasures.api.common.responses import ( @@ -284,16 +282,6 @@ def asset_replace_name_with_id(connections_as_name: List[str]) -> List[str]: return connections_as_ea -def get_or_create_user_data_source(user: User) -> DataSource: - data_source = DataSource.query.filter(DataSource.user == user).one_or_none() - if not data_source: - current_app.logger.info("SETTING UP USER AS NEW DATA SOURCE...") - data_source = DataSource(user=user) - db.session.add(data_source) - db.session.flush() # flush so that we can reference the new object in the current db session - return data_source - - def get_weather_sensor_by( weather_sensor_type_name: str, latitude: float = 0, longitude: float = 0 ) -> WeatherSensor: diff --git a/flexmeasures/api/v1/implementations.py b/flexmeasures/api/v1/implementations.py index 373677ddb..0860a6e5f 100644 --- a/flexmeasures/api/v1/implementations.py +++ b/flexmeasures/api/v1/implementations.py @@ -12,6 +12,7 @@ EntityAddressException, ) from flexmeasures.data.models.assets import Asset, Power +from flexmeasures.data.models.data_sources import get_or_create_source from flexmeasures.data.services.resources import get_assets from flexmeasures.data.services.forecasting import create_forecasting_jobs from flexmeasures.api.common.responses import ( @@ -24,7 +25,6 @@ ) from flexmeasures.api.common.utils.api_utils import ( groups_to_dict, - get_or_create_user_data_source, save_to_db, ) from flexmeasures.api.common.utils.validators import ( @@ -242,7 +242,7 @@ def create_connection_and_value_groups( # noqa: C901 current_app.logger.info("POSTING POWER DATA") - data_source = get_or_create_user_data_source(current_user) + data_source = get_or_create_source(current_user) user_assets = get_assets() if not user_assets: current_app.logger.info("User doesn't seem to have any assets") diff --git a/flexmeasures/api/v1_1/implementations.py b/flexmeasures/api/v1_1/implementations.py index 3efdf2ffe..89600dbbd 100644 --- a/flexmeasures/api/v1_1/implementations.py +++ b/flexmeasures/api/v1_1/implementations.py @@ -16,7 +16,6 @@ ) from flexmeasures.api.common.utils.api_utils import ( save_to_db, - get_or_create_user_data_source, ) from flexmeasures.api.common.utils.validators import ( type_accepted, @@ -37,6 +36,7 @@ create_connection_and_value_groups, ) from flexmeasures.api.common.utils.api_utils import get_weather_sensor_by +from flexmeasures.data.models.data_sources import get_or_create_source from flexmeasures.data.models.markets import Market, Price from flexmeasures.data.models.weather import Weather from flexmeasures.data.services.resources import get_assets @@ -79,7 +79,7 @@ def post_price_data_response( current_app.logger.info("POSTING PRICE DATA") - data_source = get_or_create_user_data_source(current_user) + data_source = get_or_create_source(current_user) prices = [] forecasting_jobs = [] for market_group, value_group in zip(generic_asset_name_groups, value_groups): @@ -154,7 +154,7 @@ def post_weather_data_response( # noqa: C901 current_app.logger.info("POSTING WEATHER DATA") - data_source = get_or_create_user_data_source(current_user) + data_source = get_or_create_source(current_user) weather_measurements = [] forecasting_jobs = [] for sensor_group, value_group in zip(generic_asset_name_groups, value_groups): diff --git a/flexmeasures/api/v2_0/implementations/sensors.py b/flexmeasures/api/v2_0/implementations/sensors.py index a4c8d6382..0fccd50fd 100644 --- a/flexmeasures/api/v2_0/implementations/sensors.py +++ b/flexmeasures/api/v2_0/implementations/sensors.py @@ -15,7 +15,6 @@ ResponseTuple, ) from flexmeasures.api.common.utils.api_utils import ( - get_or_create_user_data_source, get_weather_sensor_by, save_to_db, determine_belief_timing, @@ -33,6 +32,7 @@ values_required, ) from flexmeasures.data.models.assets import Asset, Power +from flexmeasures.data.models.data_sources import get_or_create_source from flexmeasures.data.models.markets import Market, Price from flexmeasures.data.models.weather import Weather from flexmeasures.data.services.forecasting import create_forecasting_jobs @@ -69,7 +69,7 @@ def post_price_data_response( # noqa C901 current_app.logger.info("POSTING PRICE DATA") - data_source = get_or_create_user_data_source(current_user) + data_source = get_or_create_source(current_user) prices = [] forecasting_jobs = [] for market_group, event_values in zip(generic_asset_name_groups, value_groups): @@ -152,7 +152,7 @@ def post_weather_data_response( # noqa: C901 current_app.logger.info("POSTING WEATHER DATA") - data_source = get_or_create_user_data_source(current_user) + data_source = get_or_create_source(current_user) weather_measurements = [] forecasting_jobs = [] for sensor_group, event_values in zip(generic_asset_name_groups, value_groups): @@ -301,7 +301,7 @@ def post_power_data( current_app.logger.info("POSTING POWER DATA") - data_source = get_or_create_user_data_source(current_user) + data_source = get_or_create_source(current_user) user_assets = get_assets() if not user_assets: current_app.logger.info("User doesn't seem to have any assets") diff --git a/flexmeasures/data/models/data_sources.py b/flexmeasures/data/models/data_sources.py index 0ebed3eb5..a29953678 100644 --- a/flexmeasures/data/models/data_sources.py +++ b/flexmeasures/data/models/data_sources.py @@ -1,6 +1,7 @@ -from typing import Optional +from typing import Optional, Union import timely_beliefs as tb +from flask import current_app from flexmeasures.data.config import db from flexmeasures.data.models.user import User @@ -53,3 +54,31 @@ def label(self): def __repr__(self): return "" % (self.id, self.label) + + +def get_or_create_source( + source: Union[User, str], source_type: str = "user" +) -> DataSource: + query = DataSource.query.filter(DataSource.type == source_type) + if isinstance(source, User): + query = query.filter(DataSource.user == source) + elif isinstance(source, str): + query = query.filter(DataSource.name == source) + else: + raise TypeError("source should be of type User or str") + _source = query.one_or_none() + if not _source: + current_app.logger.info(f"Setting up '{source}' as new data source...") + if isinstance(source, User): + _source = DataSource(user=source) + else: + _source = DataSource(name=source, type=source_type) + db.session.add(_source) + db.session.flush() # assigns id so that we can reference the new object in the current db session + return _source + + +def get_source_or_none(source: int, source_type: str) -> Optional[DataSource]: + query = DataSource.query.filter(DataSource.type == source_type) + query = query.filter(DataSource.id == int(source)) + return query.one_or_none() diff --git a/flexmeasures/data/scripts/cli_tasks/data_add.py b/flexmeasures/data/scripts/cli_tasks/data_add.py index 3514a584d..4af282e33 100644 --- a/flexmeasures/data/scripts/cli_tasks/data_add.py +++ b/flexmeasures/data/scripts/cli_tasks/data_add.py @@ -20,7 +20,10 @@ from flexmeasures.data.models.assets import Asset, AssetSchema from flexmeasures.data.models.markets import Market from flexmeasures.data.models.weather import WeatherSensor, WeatherSensorSchema -from flexmeasures.data.models.data_sources import DataSource +from flexmeasures.data.models.data_sources import ( + get_or_create_source, + get_source_or_none, +) from flexmeasures.utils.time_utils import server_now @@ -312,21 +315,13 @@ def add_beliefs( if sensor is None: print(f"Failed to create beliefs: no sensor found with id {sensor_id}.") return - query = DataSource.query.filter(DataSource.type == "CLI script") if source.isdigit(): - query = query.filter(DataSource.id == int(source)) - _source = query.one_or_none() + _source = get_source_or_none(int(source), source_type="CLI script") if not _source: print(f"Failed to find source {source}.") return else: - query = query.filter(DataSource.name == source) - _source = query.one_or_none() - if not _source: - print(f"Setting up '{source}' as new data source...") - _source = DataSource(name=source, type="CLI script") - db.session.add(_source) - db.session.flush() # assigns id + _source = get_or_create_source(source, source_type="CLI script") # Set up optional parameters for read_csv kwargs = dict() From 19f746170aefe6b12588aaf44977623120795029 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Thu, 29 Apr 2021 10:14:03 +0200 Subject: [PATCH 4/6] Optional flush --- flexmeasures/data/models/data_sources.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flexmeasures/data/models/data_sources.py b/flexmeasures/data/models/data_sources.py index a29953678..487dfff72 100644 --- a/flexmeasures/data/models/data_sources.py +++ b/flexmeasures/data/models/data_sources.py @@ -57,7 +57,7 @@ def __repr__(self): def get_or_create_source( - source: Union[User, str], source_type: str = "user" + source: Union[User, str], source_type: str = "user", flush: bool = True ) -> DataSource: query = DataSource.query.filter(DataSource.type == source_type) if isinstance(source, User): @@ -74,7 +74,9 @@ def get_or_create_source( else: _source = DataSource(name=source, type=source_type) db.session.add(_source) - db.session.flush() # assigns id so that we can reference the new object in the current db session + if flush: + # assigns id so that we can reference the new object in the current db session + db.session.flush() return _source From 99161e754291d8e7827457b65905531a18645fb5 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Thu, 29 Apr 2021 10:43:54 +0200 Subject: [PATCH 5/6] Take into account that current_user can be of LocalProxy type, and use get_current_object to get the User object --- flexmeasures/data/models/data_sources.py | 6 +++--- flexmeasures/data/models/user.py | 11 +++++++++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/flexmeasures/data/models/data_sources.py b/flexmeasures/data/models/data_sources.py index 487dfff72..3c502745b 100644 --- a/flexmeasures/data/models/data_sources.py +++ b/flexmeasures/data/models/data_sources.py @@ -4,7 +4,7 @@ from flask import current_app from flexmeasures.data.config import db -from flexmeasures.data.models.user import User +from flexmeasures.data.models.user import User, is_user class DataSource(db.Model, tb.BeliefSourceDBMixin): @@ -60,7 +60,7 @@ def get_or_create_source( source: Union[User, str], source_type: str = "user", flush: bool = True ) -> DataSource: query = DataSource.query.filter(DataSource.type == source_type) - if isinstance(source, User): + if is_user(source): query = query.filter(DataSource.user == source) elif isinstance(source, str): query = query.filter(DataSource.name == source) @@ -69,7 +69,7 @@ def get_or_create_source( _source = query.one_or_none() if not _source: current_app.logger.info(f"Setting up '{source}' as new data source...") - if isinstance(source, User): + if is_user(source): _source = DataSource(user=source) else: _source = DataSource(name=source, type=source_type) diff --git a/flexmeasures/data/models/user.py b/flexmeasures/data/models/user.py index c2fa4ea9f..59260b1ec 100644 --- a/flexmeasures/data/models/user.py +++ b/flexmeasures/data/models/user.py @@ -98,3 +98,14 @@ def remember_login(the_app, user): if user.login_count is None: user.login_count = 0 user.login_count = user.login_count + 1 + + +def is_user(o) -> bool: + """True of object is user, False otherwise. + + Takes into account that object can be of LocalProxy type, and + uses get_current_object to get the underlying (User) object. + """ + return isinstance(o, User) or ( + hasattr(o, "_get_current_object") and isinstance(o._get_current_object(), User) + ) From fe85c1913992b7d78f0e84d98cf56c58fa1dda39 Mon Sep 17 00:00:00 2001 From: Felix Claessen <30658763+Flix6x@users.noreply.github.com> Date: Thu, 29 Apr 2021 11:48:37 +0200 Subject: [PATCH 6/6] Improve docstring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Nicolas Höning --- flexmeasures/data/models/user.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flexmeasures/data/models/user.py b/flexmeasures/data/models/user.py index 59260b1ec..fca257a48 100644 --- a/flexmeasures/data/models/user.py +++ b/flexmeasures/data/models/user.py @@ -101,7 +101,7 @@ def remember_login(the_app, user): def is_user(o) -> bool: - """True of object is user, False otherwise. + """True if object is or proxies a User, False otherwise. Takes into account that object can be of LocalProxy type, and uses get_current_object to get the underlying (User) object.