Skip to content

Commit

Permalink
Allow reading beliefs from xlsx files, and from data files with multiple headers (#103)
Browse files Browse the repository at this point in the history

* Allow reading beliefs from xlsx files, and from data files with multiple headers

* Allow creating new sources and looking
up existing sources

* Refactor get_or_create_source

* Optional flush

* Take into account that current_user can be of LocalProxy type, and use get_current_object to get the User object

* Improve docstring

Co-authored-by: Nicolas Höning <iam@nicolashoening.de>

Co-authored-by: F.N. Claessen <felix@seita.nl>
Co-authored-by: Nicolas Höning <iam@nicolashoening.de>
  • Loading branch information
3 people committed Apr 29, 2021
1 parent 8ed869d commit 04e1076
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 47 deletions.
12 changes: 0 additions & 12 deletions flexmeasures/api/common/utils/api_utils.py
Expand Up @@ -13,9 +13,7 @@
from flexmeasures.data import db
from flexmeasures.data.models.assets import Asset, Power
from flexmeasures.data.models.markets import Market, Price
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.weather import WeatherSensor, Weather
from flexmeasures.data.models.user import User
from flexmeasures.data.utils import save_to_session
from flexmeasures.utils.entity_address_utils import parse_entity_address
from flexmeasures.api.common.responses import (
Expand Down Expand Up @@ -284,16 +282,6 @@ def asset_replace_name_with_id(connections_as_name: List[str]) -> List[str]:
return connections_as_ea


def get_or_create_user_data_source(user: User) -> DataSource:
    """Return the DataSource linked to this user, creating and flushing a new one if none exists."""
    existing = DataSource.query.filter(DataSource.user == user).one_or_none()
    if existing:
        return existing
    current_app.logger.info("SETTING UP USER AS NEW DATA SOURCE...")
    new_source = DataSource(user=user)
    db.session.add(new_source)
    # flush so that we can reference the new object in the current db session
    db.session.flush()
    return new_source


def get_weather_sensor_by(
weather_sensor_type_name: str, latitude: float = 0, longitude: float = 0
) -> WeatherSensor:
Expand Down
4 changes: 2 additions & 2 deletions flexmeasures/api/v1/implementations.py
Expand Up @@ -12,6 +12,7 @@
EntityAddressException,
)
from flexmeasures.data.models.assets import Asset, Power
from flexmeasures.data.models.data_sources import get_or_create_source
from flexmeasures.data.services.resources import get_assets
from flexmeasures.data.services.forecasting import create_forecasting_jobs
from flexmeasures.api.common.responses import (
Expand All @@ -24,7 +25,6 @@
)
from flexmeasures.api.common.utils.api_utils import (
groups_to_dict,
get_or_create_user_data_source,
save_to_db,
)
from flexmeasures.api.common.utils.validators import (
Expand Down Expand Up @@ -244,7 +244,7 @@ def create_connection_and_value_groups( # noqa: C901

current_app.logger.info("POSTING POWER DATA")

data_source = get_or_create_user_data_source(current_user)
data_source = get_or_create_source(current_user)
user_assets = get_assets()
if not user_assets:
current_app.logger.info("User doesn't seem to have any assets")
Expand Down
6 changes: 3 additions & 3 deletions flexmeasures/api/v1_1/implementations.py
Expand Up @@ -17,7 +17,6 @@
)
from flexmeasures.api.common.utils.api_utils import (
save_to_db,
get_or_create_user_data_source,
)
from flexmeasures.api.common.utils.validators import (
type_accepted,
Expand All @@ -38,6 +37,7 @@
create_connection_and_value_groups,
)
from flexmeasures.api.common.utils.api_utils import get_weather_sensor_by
from flexmeasures.data.models.data_sources import get_or_create_source
from flexmeasures.data.models.markets import Market, Price
from flexmeasures.data.models.weather import Weather
from flexmeasures.data.services.resources import get_assets
Expand Down Expand Up @@ -80,7 +80,7 @@ def post_price_data_response(

current_app.logger.info("POSTING PRICE DATA")

data_source = get_or_create_user_data_source(current_user)
data_source = get_or_create_source(current_user)
prices = []
forecasting_jobs = []
for market_group, value_group in zip(generic_asset_name_groups, value_groups):
Expand Down Expand Up @@ -155,7 +155,7 @@ def post_weather_data_response( # noqa: C901

current_app.logger.info("POSTING WEATHER DATA")

data_source = get_or_create_user_data_source(current_user)
data_source = get_or_create_source(current_user)
weather_measurements = []
forecasting_jobs = []
for sensor_group, value_group in zip(generic_asset_name_groups, value_groups):
Expand Down
8 changes: 4 additions & 4 deletions flexmeasures/api/v2_0/implementations/sensors.py
Expand Up @@ -15,7 +15,6 @@
ResponseTuple,
)
from flexmeasures.api.common.utils.api_utils import (
get_or_create_user_data_source,
get_weather_sensor_by,
save_to_db,
determine_belief_timing,
Expand All @@ -33,6 +32,7 @@
values_required,
)
from flexmeasures.data.models.assets import Asset, Power
from flexmeasures.data.models.data_sources import get_or_create_source
from flexmeasures.data.models.markets import Market, Price
from flexmeasures.data.models.weather import Weather
from flexmeasures.data.services.forecasting import create_forecasting_jobs
Expand Down Expand Up @@ -69,7 +69,7 @@ def post_price_data_response( # noqa C901

current_app.logger.info("POSTING PRICE DATA")

data_source = get_or_create_user_data_source(current_user)
data_source = get_or_create_source(current_user)
prices = []
forecasting_jobs = []
for market_group, event_values in zip(generic_asset_name_groups, value_groups):
Expand Down Expand Up @@ -152,7 +152,7 @@ def post_weather_data_response( # noqa: C901

current_app.logger.info("POSTING WEATHER DATA")

data_source = get_or_create_user_data_source(current_user)
data_source = get_or_create_source(current_user)
weather_measurements = []
forecasting_jobs = []
for sensor_group, event_values in zip(generic_asset_name_groups, value_groups):
Expand Down Expand Up @@ -301,7 +301,7 @@ def post_power_data(

current_app.logger.info("POSTING POWER DATA")

data_source = get_or_create_user_data_source(current_user)
data_source = get_or_create_source(current_user)
user_assets = get_assets()
if not user_assets:
current_app.logger.info("User doesn't seem to have any assets")
Expand Down
@@ -0,0 +1,39 @@
"""add source id as primary key for timed beliefs
Revision ID: 04f0e2d2924a
Revises: e62ac5f519d7
Create Date: 2021-04-10 13:53:22.561718
"""
from alembic import op


# revision identifiers, used by Alembic.
revision = "04f0e2d2924a"
down_revision = "e62ac5f519d7"
branch_labels = None
depends_on = None


def upgrade():
    """Rebuild the timed_belief primary key so that it also covers source_id."""
    pk_columns = [
        "event_start",
        "belief_horizon",
        "cumulative_probability",
        "sensor_id",
        "source_id",
    ]
    op.drop_constraint("timed_belief_pkey", "timed_belief")
    op.create_primary_key("timed_belief_pkey", "timed_belief", pk_columns)


def downgrade():
    """Restore the previous timed_belief primary key, without source_id."""
    pk_columns = ["event_start", "belief_horizon", "cumulative_probability", "sensor_id"]
    op.drop_constraint("timed_belief_pkey", "timed_belief")
    op.create_primary_key("timed_belief_pkey", "timed_belief", pk_columns)
35 changes: 33 additions & 2 deletions flexmeasures/data/models/data_sources.py
@@ -1,9 +1,10 @@
from typing import Optional
from typing import Optional, Union

import timely_beliefs as tb
from flask import current_app

from flexmeasures.data.config import db
from flexmeasures.data.models.user import User
from flexmeasures.data.models.user import User, is_user


class DataSource(db.Model, tb.BeliefSourceDBMixin):
Expand Down Expand Up @@ -53,3 +54,33 @@ def label(self):

def __repr__(self):
return "<Data source %r (%s)>" % (self.id, self.label)


def get_or_create_source(
    source: Union[User, str], source_type: str = "user", flush: bool = True
) -> DataSource:
    """Look up the DataSource for a user or a source name, creating it if absent.

    :param source: a User (or a proxy for one, per is_user) or a source name string
    :param source_type: the DataSource type to match/create (default "user")
    :param flush: if True, flush the session after adding a new source
    :raises TypeError: if source is neither a User nor a str
    :returns: the matching or newly created DataSource
    """
    base_query = DataSource.query.filter(DataSource.type == source_type)
    if is_user(source):
        found = base_query.filter(DataSource.user == source).one_or_none()
    elif isinstance(source, str):
        found = base_query.filter(DataSource.name == source).one_or_none()
    else:
        raise TypeError("source should be of type User or str")
    if found:
        return found
    current_app.logger.info(f"Setting up '{source}' as new data source...")
    if is_user(source):
        found = DataSource(user=source)
    else:
        found = DataSource(name=source, type=source_type)
    db.session.add(found)
    if flush:
        # assigns id so that we can reference the new object in the current db session
        db.session.flush()
    return found


def get_source_or_none(source: int, source_type: str) -> Optional[DataSource]:
    """Return the DataSource with this id and type, or None if no match exists."""
    return (
        DataSource.query.filter(DataSource.type == source_type)
        .filter(DataSource.id == int(source))
        .one_or_none()
    )
11 changes: 11 additions & 0 deletions flexmeasures/data/models/user.py
Expand Up @@ -98,3 +98,14 @@ def remember_login(the_app, user):
if user.login_count is None:
user.login_count = 0
user.login_count = user.login_count + 1


def is_user(o) -> bool:
    """True if object is or proxies a User, False otherwise.

    Takes into account that the object can be of LocalProxy type, in which
    case _get_current_object is used to reach the underlying (User) object.
    """
    if isinstance(o, User):
        return True
    unwrap = getattr(o, "_get_current_object", None)
    return unwrap is not None and isinstance(unwrap(), User)
98 changes: 76 additions & 22 deletions flexmeasures/data/scripts/cli_tasks/data_add.py
Expand Up @@ -20,7 +20,10 @@
from flexmeasures.data.models.assets import Asset, AssetSchema
from flexmeasures.data.models.markets import Market
from flexmeasures.data.models.weather import WeatherSensor, WeatherSensorSchema
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.data_sources import (
get_or_create_source,
get_source_or_none,
)
from flexmeasures.utils.time_utils import server_now


Expand Down Expand Up @@ -220,6 +223,12 @@ def add_initial_structure():
type=click.IntRange(min=1),
help="Sensor to which the beliefs pertain.",
)
@click.option(
"--source",
required=True,
type=str,
help="Source of the beliefs (an existing source id or name, or a new name).",
)
@click.option(
"--horizon",
required=False,
Expand All @@ -238,16 +247,55 @@ def add_initial_structure():
help="Allow overwriting possibly already existing data.\n"
"Not allowing overwriting can be much more efficient",
)
@click.option(
"--skiprows",
required=False,
default=1,
type=int,
help="Number of rows to skip from the top. Set to >1 to skip additional headers.",
)
@click.option(
"--nrows",
required=False,
type=int,
help="Number of rows to read (from the top, after possibly skipping rows). Leave out to read all rows.",
)
@click.option(
"--datecol",
required=False,
default=0,
type=int,
help="Column number with datetimes (0 is 1st column, the default)",
)
@click.option(
"--valuecol",
required=False,
default=1,
type=int,
help="Column number with values (1 is 2nd column, the default)",
)
@click.option(
"--sheet_number",
required=False,
type=int,
help="[For xls or xlsx files] Sheet number with the data (0 is 1st sheet)",
)
def add_beliefs(
file: str,
sensor_id: int,
source: str,
horizon: Optional[int] = None,
cp: Optional[float] = None,
allow_overwrite: bool = False,
skiprows: int = 1,
nrows: Optional[int] = None,
datecol: int = 0,
valuecol: int = 1,
sheet_number: Optional[int] = None,
):
"""Add sensor data from a csv file.
"""Add sensor data from a csv file (also accepts xls or xlsx).
Structure your csv file as follows:
To use default settings, structure your csv file as follows:
- One header line (will be ignored!)
- UTC datetimes in 1st column
Expand All @@ -267,30 +315,36 @@ def add_beliefs(
if sensor is None:
print(f"Failed to create beliefs: no sensor found with id {sensor_id}.")
return
source = (
DataSource.query.filter(DataSource.name == "Seita")
.filter(DataSource.type == "CLI script")
.one_or_none()
)
if not source:
print("SETTING UP CLI SCRIPT AS NEW DATA SOURCE...")
source = DataSource(name="Seita", type="CLI script")
db.session.add(source)
db.session.flush() # assigns id
if source.isdigit():
_source = get_source_or_none(int(source), source_type="CLI script")
if not _source:
print(f"Failed to find source {source}.")
return
else:
_source = get_or_create_source(source, source_type="CLI script")

# Set up optional parameters for read_csv
kwargs = dict()
if file.split(".")[-1].lower() == "csv":
kwargs["infer_datetime_format"] = True
if sheet_number is not None:
kwargs["sheet_name"] = sheet_number
if horizon is not None:
kwargs["belief_horizon"] = timedelta(minutes=horizon)
else:
kwargs["belief_time"] = server_now().astimezone(pytz.timezone(sensor.timezone))

bdf = tb.read_csv(
file,
sensor,
source=source,
source=_source,
cumulative_probability=cp,
header=None,
skiprows=skiprows,
nrows=nrows,
usecols=[datecol, valuecol],
parse_dates=True,
infer_datetime_format=True,
**(
dict(belief_horizon=timedelta(minutes=horizon))
if horizon is not None
else dict(
belief_time=server_now().astimezone(pytz.timezone(sensor.timezone))
)
),
**kwargs,
)
try:
TimedBelief.add(
Expand Down
2 changes: 1 addition & 1 deletion requirements/app.in
Expand Up @@ -32,7 +32,7 @@ netCDF4
siphon
tables
timetomodel>=0.6.8
timely-beliefs>=1.4.0
timely-beliefs>=1.4.3
python-dotenv
# a backport, not needed in Python3.8
importlib_metadata
Expand Down
2 changes: 1 addition & 1 deletion requirements/app.txt
Expand Up @@ -317,7 +317,7 @@ tables==3.6.1
# via -r requirements/app.in
threadpoolctl==2.1.0
# via scikit-learn
timely-beliefs==1.4.0
timely-beliefs==1.4.3
# via -r requirements/app.in
timetomodel==0.6.9
# via -r requirements/app.in
Expand Down

0 comments on commit 04e1076

Please sign in to comment.