Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow reading beliefs from xlsx files, and from data files with multiple headers #103

Merged
merged 6 commits into from Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 0 additions & 12 deletions flexmeasures/api/common/utils/api_utils.py
Expand Up @@ -13,9 +13,7 @@
from flexmeasures.data import db
from flexmeasures.data.models.assets import Asset, Power
from flexmeasures.data.models.markets import Market, Price
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.weather import WeatherSensor, Weather
from flexmeasures.data.models.user import User
from flexmeasures.data.utils import save_to_session
from flexmeasures.utils.entity_address_utils import parse_entity_address
from flexmeasures.api.common.responses import (
Expand Down Expand Up @@ -284,16 +282,6 @@ def asset_replace_name_with_id(connections_as_name: List[str]) -> List[str]:
return connections_as_ea


def get_or_create_user_data_source(user: User) -> DataSource:
    """Return the DataSource belonging to this user, creating one if none exists yet.

    :param user: the user to look up (or register) as a data source
    :returns: the existing or freshly created DataSource
    """
    existing = DataSource.query.filter(DataSource.user == user).one_or_none()
    if existing:
        return existing
    current_app.logger.info("SETTING UP USER AS NEW DATA SOURCE...")
    new_source = DataSource(user=user)
    db.session.add(new_source)
    # flush so that we can reference the new object in the current db session
    db.session.flush()
    return new_source


def get_weather_sensor_by(
weather_sensor_type_name: str, latitude: float = 0, longitude: float = 0
) -> WeatherSensor:
Expand Down
4 changes: 2 additions & 2 deletions flexmeasures/api/v1/implementations.py
Expand Up @@ -12,6 +12,7 @@
EntityAddressException,
)
from flexmeasures.data.models.assets import Asset, Power
from flexmeasures.data.models.data_sources import get_or_create_source
from flexmeasures.data.services.resources import get_assets
from flexmeasures.data.services.forecasting import create_forecasting_jobs
from flexmeasures.api.common.responses import (
Expand All @@ -24,7 +25,6 @@
)
from flexmeasures.api.common.utils.api_utils import (
groups_to_dict,
get_or_create_user_data_source,
save_to_db,
)
from flexmeasures.api.common.utils.validators import (
Expand Down Expand Up @@ -242,7 +242,7 @@ def create_connection_and_value_groups( # noqa: C901

current_app.logger.info("POSTING POWER DATA")

data_source = get_or_create_user_data_source(current_user)
data_source = get_or_create_source(current_user)
user_assets = get_assets()
if not user_assets:
current_app.logger.info("User doesn't seem to have any assets")
Expand Down
6 changes: 3 additions & 3 deletions flexmeasures/api/v1_1/implementations.py
Expand Up @@ -16,7 +16,6 @@
)
from flexmeasures.api.common.utils.api_utils import (
save_to_db,
get_or_create_user_data_source,
)
from flexmeasures.api.common.utils.validators import (
type_accepted,
Expand All @@ -37,6 +36,7 @@
create_connection_and_value_groups,
)
from flexmeasures.api.common.utils.api_utils import get_weather_sensor_by
from flexmeasures.data.models.data_sources import get_or_create_source
from flexmeasures.data.models.markets import Market, Price
from flexmeasures.data.models.weather import Weather
from flexmeasures.data.services.resources import get_assets
Expand Down Expand Up @@ -79,7 +79,7 @@ def post_price_data_response(

current_app.logger.info("POSTING PRICE DATA")

data_source = get_or_create_user_data_source(current_user)
data_source = get_or_create_source(current_user)
prices = []
forecasting_jobs = []
for market_group, value_group in zip(generic_asset_name_groups, value_groups):
Expand Down Expand Up @@ -154,7 +154,7 @@ def post_weather_data_response( # noqa: C901

current_app.logger.info("POSTING WEATHER DATA")

data_source = get_or_create_user_data_source(current_user)
data_source = get_or_create_source(current_user)
weather_measurements = []
forecasting_jobs = []
for sensor_group, value_group in zip(generic_asset_name_groups, value_groups):
Expand Down
8 changes: 4 additions & 4 deletions flexmeasures/api/v2_0/implementations/sensors.py
Expand Up @@ -15,7 +15,6 @@
ResponseTuple,
)
from flexmeasures.api.common.utils.api_utils import (
get_or_create_user_data_source,
get_weather_sensor_by,
save_to_db,
determine_belief_timing,
Expand All @@ -33,6 +32,7 @@
values_required,
)
from flexmeasures.data.models.assets import Asset, Power
from flexmeasures.data.models.data_sources import get_or_create_source
from flexmeasures.data.models.markets import Market, Price
from flexmeasures.data.models.weather import Weather
from flexmeasures.data.services.forecasting import create_forecasting_jobs
Expand Down Expand Up @@ -69,7 +69,7 @@ def post_price_data_response( # noqa C901

current_app.logger.info("POSTING PRICE DATA")

data_source = get_or_create_user_data_source(current_user)
data_source = get_or_create_source(current_user)
prices = []
forecasting_jobs = []
for market_group, event_values in zip(generic_asset_name_groups, value_groups):
Expand Down Expand Up @@ -152,7 +152,7 @@ def post_weather_data_response( # noqa: C901

current_app.logger.info("POSTING WEATHER DATA")

data_source = get_or_create_user_data_source(current_user)
data_source = get_or_create_source(current_user)
weather_measurements = []
forecasting_jobs = []
for sensor_group, event_values in zip(generic_asset_name_groups, value_groups):
Expand Down Expand Up @@ -301,7 +301,7 @@ def post_power_data(

current_app.logger.info("POSTING POWER DATA")

data_source = get_or_create_user_data_source(current_user)
data_source = get_or_create_source(current_user)
user_assets = get_assets()
if not user_assets:
current_app.logger.info("User doesn't seem to have any assets")
Expand Down
@@ -0,0 +1,39 @@
"""add source id as primary key for timed beliefs

Revision ID: 04f0e2d2924a
Revises: e62ac5f519d7
Create Date: 2021-04-10 13:53:22.561718

"""
from alembic import op


# revision identifiers, used by Alembic.
revision = "04f0e2d2924a"
down_revision = "e62ac5f519d7"
branch_labels = None
depends_on = None


def upgrade():
    """Rebuild the timed_belief primary key so that it also covers source_id."""
    new_pk_columns = [
        "event_start",
        "belief_horizon",
        "cumulative_probability",
        "sensor_id",
        "source_id",
    ]
    op.drop_constraint("timed_belief_pkey", "timed_belief")
    op.create_primary_key("timed_belief_pkey", "timed_belief", new_pk_columns)


def downgrade():
    """Restore the previous timed_belief primary key, without source_id."""
    old_pk_columns = [
        "event_start",
        "belief_horizon",
        "cumulative_probability",
        "sensor_id",
    ]
    op.drop_constraint("timed_belief_pkey", "timed_belief")
    op.create_primary_key("timed_belief_pkey", "timed_belief", old_pk_columns)
35 changes: 33 additions & 2 deletions flexmeasures/data/models/data_sources.py
@@ -1,9 +1,10 @@
from typing import Optional
from typing import Optional, Union

import timely_beliefs as tb
from flask import current_app

from flexmeasures.data.config import db
from flexmeasures.data.models.user import User
from flexmeasures.data.models.user import User, is_user


class DataSource(db.Model, tb.BeliefSourceDBMixin):
Expand Down Expand Up @@ -53,3 +54,33 @@ def label(self):

def __repr__(self):
return "<Data source %r (%s)>" % (self.id, self.label)


def get_or_create_source(
    source: Union[User, str], source_type: str = "user", flush: bool = True
) -> DataSource:
    """Find the DataSource of the given type matching this user or name,
    creating and registering a new one when no match exists.

    :param source: a User, or the name of the source as a string
    :param source_type: the DataSource.type to match/create (defaults to "user")
    :param flush: if True, flush the session after adding a new source,
                  so that its id is assigned
    :raises TypeError: if source is neither a User nor a string
    :returns: the existing or freshly created DataSource
    """
    source_is_user = is_user(source)
    if not source_is_user and not isinstance(source, str):
        raise TypeError("source should be of type User or str")
    base_query = DataSource.query.filter(DataSource.type == source_type)
    if source_is_user:
        match = base_query.filter(DataSource.user == source).one_or_none()
    else:
        match = base_query.filter(DataSource.name == source).one_or_none()
    if match:
        return match
    current_app.logger.info(f"Setting up '{source}' as new data source...")
    if source_is_user:
        match = DataSource(user=source)
    else:
        match = DataSource(name=source, type=source_type)
    db.session.add(match)
    if flush:
        # assigns id so that we can reference the new object in the current db session
        db.session.flush()
    return match


def get_source_or_none(
    source: Union[int, str], source_type: str
) -> Optional[DataSource]:
    """Return the DataSource with this id and type, or None if not found.

    :param source: the source's id (an int, or a string parseable as one —
        the body converts with int(), so digit strings are accepted)
    :param source_type: only consider sources of this type
    :returns: the matching DataSource, or None when no source matches
    """
    query = DataSource.query.filter(DataSource.type == source_type)
    query = query.filter(DataSource.id == int(source))
    return query.one_or_none()
11 changes: 11 additions & 0 deletions flexmeasures/data/models/user.py
Expand Up @@ -98,3 +98,14 @@ def remember_login(the_app, user):
if user.login_count is None:
user.login_count = 0
user.login_count = user.login_count + 1


def is_user(o) -> bool:
    """True if object is or proxies a User, False otherwise.

    Takes into account that object can be of LocalProxy type, and
    uses get_current_object to get the underlying (User) object.
    """
    if isinstance(o, User):
        return True
    unwrap = getattr(o, "_get_current_object", None)
    return unwrap is not None and isinstance(unwrap(), User)
98 changes: 76 additions & 22 deletions flexmeasures/data/scripts/cli_tasks/data_add.py
Expand Up @@ -20,7 +20,10 @@
from flexmeasures.data.models.assets import Asset, AssetSchema
from flexmeasures.data.models.markets import Market
from flexmeasures.data.models.weather import WeatherSensor, WeatherSensorSchema
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.data_sources import (
get_or_create_source,
get_source_or_none,
)
from flexmeasures.utils.time_utils import server_now


Expand Down Expand Up @@ -220,6 +223,12 @@ def add_initial_structure():
type=click.IntRange(min=1),
help="Sensor to which the beliefs pertain.",
)
@click.option(
"--source",
required=True,
type=str,
help="Source of the beliefs (an existing source id or name, or a new name).",
)
@click.option(
"--horizon",
required=False,
Expand All @@ -238,16 +247,55 @@ def add_initial_structure():
help="Allow overwriting possibly already existing data.\n"
"Not allowing overwriting can be much more efficient",
)
@click.option(
"--skiprows",
required=False,
default=1,
type=int,
help="Number of rows to skip from the top. Set to >1 to skip additional headers.",
)
@click.option(
"--nrows",
required=False,
type=int,
help="Number of rows to read (from the top, after possibly skipping rows). Leave out to read all rows.",
)
@click.option(
"--datecol",
required=False,
default=0,
type=int,
help="Column number with datetimes (0 is 1st column, the default)",
)
@click.option(
"--valuecol",
required=False,
default=1,
type=int,
help="Column number with values (1 is 2nd column, the default)",
)
@click.option(
"--sheet_number",
required=False,
type=int,
help="[For xls or xlsx files] Sheet number with the data (0 is 1st sheet)",
)
def add_beliefs(
file: str,
sensor_id: int,
source: str,
horizon: Optional[int] = None,
cp: Optional[float] = None,
allow_overwrite: bool = False,
skiprows: int = 1,
nrows: Optional[int] = None,
datecol: int = 0,
valuecol: int = 1,
sheet_number: Optional[int] = None,
):
"""Add sensor data from a csv file.
"""Add sensor data from a csv file (also accepts xls or xlsx).

Structure your csv file as follows:
To use default settings, structure your csv file as follows:

- One header line (will be ignored!)
- UTC datetimes in 1st column
Expand All @@ -267,30 +315,36 @@ def add_beliefs(
if sensor is None:
print(f"Failed to create beliefs: no sensor found with id {sensor_id}.")
return
source = (
DataSource.query.filter(DataSource.name == "Seita")
.filter(DataSource.type == "CLI script")
.one_or_none()
)
if not source:
print("SETTING UP CLI SCRIPT AS NEW DATA SOURCE...")
source = DataSource(name="Seita", type="CLI script")
db.session.add(source)
db.session.flush() # assigns id
if source.isdigit():
_source = get_source_or_none(int(source), source_type="CLI script")
if not _source:
print(f"Failed to find source {source}.")
return
else:
_source = get_or_create_source(source, source_type="CLI script")

# Set up optional parameters for read_csv
kwargs = dict()
if file.split(".")[-1].lower() == "csv":
kwargs["infer_datetime_format"] = True
if sheet_number is not None:
kwargs["sheet_name"] = sheet_number
if horizon is not None:
kwargs["belief_horizon"] = timedelta(minutes=horizon)
else:
kwargs["belief_time"] = server_now().astimezone(pytz.timezone(sensor.timezone))

bdf = tb.read_csv(
file,
sensor,
source=source,
source=_source,
cumulative_probability=cp,
header=None,
skiprows=skiprows,
nrows=nrows,
usecols=[datecol, valuecol],
parse_dates=True,
infer_datetime_format=True,
**(
dict(belief_horizon=timedelta(minutes=horizon))
if horizon is not None
else dict(
belief_time=server_now().astimezone(pytz.timezone(sensor.timezone))
)
),
**kwargs,
)
try:
TimedBelief.add(
Expand Down
2 changes: 1 addition & 1 deletion requirements/app.in
Expand Up @@ -32,7 +32,7 @@ netCDF4
siphon
tables
timetomodel>=0.6.8
timely-beliefs>=1.4.0
timely-beliefs>=1.4.3
python-dotenv
# a backport, not needed in Python3.8
importlib_metadata
Expand Down
2 changes: 1 addition & 1 deletion requirements/app.txt
Expand Up @@ -317,7 +317,7 @@ tables==3.6.1
# via -r requirements/app.in
threadpoolctl==2.1.0
# via scikit-learn
timely-beliefs==1.4.0
timely-beliefs==1.4.3
# via -r requirements/app.in
timetomodel==0.6.9
# via -r requirements/app.in
Expand Down