Read csv with naive data, column filters and datetime/timedelta units (#521)

Improved import of time series data from CSV file: localize timezone-naive data, support reading in datetime and timedelta values, remove rows with NaN values, and filter by values in specific columns.
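
Conceptually, the new CSV-import behaviour combines three data-cleaning steps. A minimal standalone sketch with pandas (the column names and values here are hypothetical; the actual implementation lives in timely-beliefs' `read_csv`):

```python
import pandas as pd
from io import StringIO

# Hypothetical CSV: naive timestamps, a NaN value, and a labelled 'sensor' column
csv = StringIO(
    "event_start,event_value,sensor\n"
    "2022-11-01 00:00,1.0,PV\n"
    "2022-11-01 01:00,,PV\n"
    "2022-11-01 02:00,3.0,wind\n"
)
df = pd.read_csv(csv, parse_dates=["event_start"])

# 1) Localize timezone-naive datetimes to an explicitly given timezone
df["event_start"] = df["event_start"].dt.tz_localize("Europe/Amsterdam")

# 2) Remove rows with NaN values
df = df.dropna()

# 3) Filter by values in a specific column
df = df[df["sensor"] == "PV"]

print(len(df))  # only the first row survives the NaN and column filters
```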


* Support unit conversion for python datetime and timedelta objects

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Allow to filter by column when reading in beliefs from CSV

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Allow to set a timezone for reading in timezone naive data

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Allow throwing out NaN values when reading in beliefs

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Support datetime unit conversion for aware datetimes with mixed offset

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Raise instead of assume UTC when reading in timezone naive data without a timezone set explicitly

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Bump timely-beliefs dependency for read_csv

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Refactor: flake8

Signed-off-by: F.N. Claessen <felix@seita.nl>

* CLI changelog entry

Signed-off-by: F.N. Claessen <felix@seita.nl>

* changelog entry

Signed-off-by: F.N. Claessen <felix@seita.nl>

* mypy

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Use sensor id field validation

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Querying for a source with a given id no longer requires knowing the source type

Signed-off-by: F.N. Claessen <felix@seita.nl>

* make freeze-deps

Signed-off-by: F.N. Claessen <felix@seita.nl>

* add optional dependency: timely-beliefs[forecast]

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Mention data conversion from 'datetime' or 'timedelta' units

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Allow converting 'datetime' values to a duration other than seconds (since UNIX epoch)

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Refactor and make convert_time_units a private function

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Refactor and add inline comment explaining why we check to_unit for a digit

Signed-off-by: F.N. Claessen <felix@seita.nl>

* mypy: PEP 484 prohibits implicit Optional

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Attempt to revert bugs introduced in merge with main

Signed-off-by: F.N. Claessen <felix@seita.nl>

* black and flake8

Signed-off-by: F.N. Claessen <felix@seita.nl>

* A few more reverts

Signed-off-by: F.N. Claessen <felix@seita.nl>

* Fix typos

Signed-off-by: F.N. Claessen <felix@seita.nl>

Signed-off-by: F.N. Claessen <felix@seita.nl>
Flix6x committed Nov 10, 2022
1 parent 7b974ff commit b1bd21d
Showing 11 changed files with 130 additions and 74 deletions.
2 changes: 1 addition & 1 deletion documentation/changelog.rst
@@ -12,8 +12,8 @@ New features
* Ability to provide your own custom scheduling function [see `PR #505 <http://www.github.com/FlexMeasures/flexmeasures/pull/505>`_]
* Visually distinguish forecasts/schedules (dashed lines) from measurements (solid lines), and expand the tooltip with timing info regarding the forecast/schedule horizon or measurement lag [see `PR #503 <http://www.github.com/FlexMeasures/flexmeasures/pull/503>`_]
* The asset page also allows to show sensor data from other assets that belong to the same account [see `PR #500 <http://www.github.com/FlexMeasures/flexmeasures/pull/500>`_]
* Improved import of time series data from CSV file: 1) drop duplicate records with warning, and 2) allow configuring which column contains explicit recording times for each data point (use case: import forecasts) [see `PR #501 <http://www.github.com/FlexMeasures/flexmeasures/pull/501>`_]
* The CLI command ``flexmeasures show beliefs`` supports showing beliefs data in a custom resolution and/or timezone, and also saving the shown beliefs data to a CSV file [see `PR #519 <http://www.github.com/FlexMeasures/flexmeasures/pull/519>`_]
* Improved import of time series data from CSV file: 1) drop duplicate records with warning, 2) allow configuring which column contains explicit recording times for each data point (use case: import forecasts) [see `PR #501 <http://www.github.com/FlexMeasures/flexmeasures/pull/501>`_], 3) localize timezone naive data, 4) support reading in datetime and timedelta values, 5) remove rows with NaN values, and 6) filter by values in specific columns [see `PR #521 <http://www.github.com/FlexMeasures/flexmeasures/pull/521>`_]

Bugfixes
-----------
1 change: 1 addition & 0 deletions documentation/cli/change_log.rst
@@ -8,6 +8,7 @@ since v0.12.0 | November XX, 2022
=================================

* Add ``--resolution``, ``--timezone`` and ``--to-file`` options to ``flexmeasures show beliefs``, to show beliefs data in a custom resolution and/or timezone, and also to save shown beliefs data to a CSV file.
* Add options to ``flexmeasures add beliefs`` to 1) read CSV data with timezone naive datetimes (use ``--timezone`` to localize the data), 2) read CSV data with datetime/timedelta units (use ``--unit datetime`` or ``--unit timedelta``), 3) remove rows with NaN values, and 4) filter read-in data by matching values in specific columns (use ``--filter-column`` and ``--filter-value`` together).
* Fix ``flexmeasures db-ops dump`` and ``flexmeasures db-ops restore`` incorrectly reporting a success when `pg_dump` and `pg_restore` are not installed.
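
The paired ``--filter-column``/``--filter-value`` options are combined one-to-one into a single column-to-value mapping, with mismatched counts rejected. A minimal sketch of that pairing logic, mirroring the validation added in ``data_add.py`` (the helper name is ours, not part of the codebase):

```python
from typing import Optional

def build_column_filter(
    filter_columns: tuple, filter_values: tuple
) -> Optional[dict]:
    """Pair repeated --filter-column/--filter-value options into one mapping."""
    if len(filter_columns) != len(filter_values):
        raise ValueError(
            "The number of filter columns and filter values should be the same."
        )
    # Each repeated CLI option arrives as a tuple (click's multiple=True behaviour)
    return dict(zip(filter_columns, filter_values)) if filter_columns else None

print(build_column_filter((3, 4), ("PV", "forecast")))  # {3: 'PV', 4: 'forecast'}
print(build_column_filter((), ()))  # None
```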

since v0.11.0 | August 28, 2022
4 changes: 3 additions & 1 deletion flexmeasures/api/v1_3/tests/test_api_v1_3.py
@@ -113,7 +113,9 @@ def test_post_udi_event_and_get_device_message(
if "targets" in message:
start_soc = message["value"] / 1000 # in MWh
soc_schedule = integrate_time_series(
consumption_schedule, start_soc, decimal_precision=6
consumption_schedule,
start_soc,
decimal_precision=6,
)
print(consumption_schedule)
print(soc_schedule)
4 changes: 3 additions & 1 deletion flexmeasures/api/v3_0/tests/test_sensor_schedules.py
@@ -91,7 +91,9 @@ def test_trigger_and_get_schedule(
if "targets" in message:
start_soc = message["soc-at-start"] / 1000 # in MWh
soc_schedule = integrate_time_series(
consumption_schedule, start_soc, decimal_precision=6
consumption_schedule,
start_soc,
decimal_precision=6,
)
print(consumption_schedule)
print(soc_schedule)
69 changes: 56 additions & 13 deletions flexmeasures/cli/data_add.py
@@ -286,8 +286,9 @@ def add_initial_structure():
@click.argument("file", type=click.Path(exists=True))
@click.option(
"--sensor-id",
"sensor",
required=True,
type=click.IntRange(min=1),
type=SensorIdField(),
help="Sensor to which the beliefs pertain.",
)
@click.option(
@@ -301,6 +302,8 @@ def add_initial_structure():
required=False,
type=str,
help="Unit of the data, for conversion to the sensor unit, if possible (a string unit such as 'kW' or 'm³/h').\n"
"Measurements of time itself that are formatted as a 'datetime' or 'timedelta' can be converted to a sensor unit representing time (such as 's' or 'h'),\n"
"where datetimes are represented as a duration with respect to the UNIX epoch."
"Hint: to switch the sign of the data, prepend a minus sign.\n"
"For example, when assigning kW consumption data to a kW production sensor, use '-kW'.",
)
Expand Down Expand Up @@ -341,6 +344,12 @@ def add_initial_structure():
multiple=True,
help="Additional strings to recognize as NaN values. This argument can be given multiple times.",
)
@click.option(
"--keep-default-na",
default=False,
type=bool,
help="Whether or not to keep NaN values in the data.",
)
@click.option(
"--nrows",
required=False,
@@ -367,6 +376,24 @@
type=int,
help="Column number with datetimes",
)
@click.option(
"--timezone",
required=False,
default=None,
help="timezone as string, e.g. 'UTC' or 'Europe/Amsterdam'",
)
@click.option(
"--filter-column",
"filter_columns",
multiple=True,
help="Set a column number to filter data. Use together with --filter-value.",
)
@click.option(
"--filter-value",
"filter_values",
multiple=True,
help="Set a column value to filter data. Only rows with this value will be added. Use together with --filter-column.",
)
@click.option(
"--delimiter",
required=True,
@@ -396,19 +423,23 @@ def add_initial_structure():
)
def add_beliefs(
file: str,
sensor_id: int,
sensor: Sensor,
source: str,
filter_columns: List[int],
filter_values: List[int],
unit: Optional[str] = None,
horizon: Optional[int] = None,
cp: Optional[float] = None,
resample: bool = True,
allow_overwrite: bool = False,
skiprows: int = 1,
na_values: list[str] | None = None,
keep_default_na: bool = False,
nrows: Optional[int] = None,
datecol: int = 0,
valuecol: int = 1,
beliefcol: Optional[int] = None,
timezone: Optional[str] = None,
delimiter: str = ",",
decimal: str = ".",
thousands: Optional[str] = None,
@@ -433,17 +464,7 @@ add_beliefs(
In case no --horizon is specified and no beliefcol is specified,
the moment of executing this CLI command is taken as the time at which the beliefs were recorded.
"""
sensor = Sensor.query.filter(Sensor.id == sensor_id).one_or_none()
if sensor is None:
print(f"Failed to create beliefs: no sensor found with ID {sensor_id}.")
return
if source.isdigit():
_source = get_source_or_none(int(source), source_type="CLI script")
if not _source:
print(f"Failed to find source {source}.")
return
else:
_source = get_or_create_source(source, source_type="CLI script")
_source = parse_source(source)

# Set up optional parameters for read_csv
if file.split(".")[-1].lower() == "csv":
@@ -458,6 +479,14 @@
elif beliefcol is None:
kwargs["belief_time"] = server_now().astimezone(pytz.timezone(sensor.timezone))

# Set up optional filters:
if len(filter_columns) != len(filter_values):
raise ValueError(
"The number of filter columns and filter values should be the same."
)
filter_by_column = (
dict(zip(filter_columns, filter_values)) if filter_columns else None
)
bdf = tb.read_csv(
file,
sensor,
Expand All @@ -472,6 +501,9 @@ def add_beliefs(
else [datecol, beliefcol, valuecol],
parse_dates=True,
na_values=na_values,
keep_default_na=keep_default_na,
timezone=timezone,
filter_by_column=filter_by_column,
**kwargs,
)
duplicate_rows = bdf.index.duplicated(keep="first")
@@ -1099,3 +1131,14 @@ def check_errors(errors: Dict[str, List[str]]):
f"Please correct the following errors:\n{errors}.\n Use the --help flag to learn more."
)
raise click.Abort


def parse_source(source):
if source.isdigit():
_source = get_source_or_none(int(source))
if not _source:
print(f"Failed to find source {source}.")
return
else:
_source = get_or_create_source(source, source_type="CLI script")
return _source
12 changes: 10 additions & 2 deletions flexmeasures/data/queries/data_sources.py
@@ -42,7 +42,15 @@ def get_or_create_source(
return _source


def get_source_or_none(source: int, source_type: str) -> Optional[DataSource]:
query = DataSource.query.filter(DataSource.type == source_type)
def get_source_or_none(
source: int | str, source_type: str | None = None
) -> DataSource | None:
"""
:param source: source id
:param source_type: optionally, filter by source type
"""
query = DataSource.query
if source_type is not None:
query = query.filter(DataSource.type == source_type)
query = query.filter(DataSource.id == int(source))
return query.one_or_none()
23 changes: 23 additions & 0 deletions flexmeasures/utils/unit_utils.py
@@ -8,6 +8,7 @@
Time series with fixed resolution can be converted from units of flow to units of stock (such as 'kW' to 'kWh'), and vice versa.
Percentages can be converted to units of some physical capacity if a capacity is known (such as '%' to 'kWh').
"""
from __future__ import annotations

from datetime import timedelta
from typing import List, Optional, Union
@@ -207,6 +208,26 @@ def is_energy_price_unit(unit: str) -> bool:
return False


def _convert_time_units(
data: Union[tb.BeliefsSeries, pd.Series, List[Union[int, float]], int, float],
from_unit: str,
to_unit: str,
):
"""Convert data with datetime or timedelta dtypes to float values.
Use Unix epoch or the requested time unit, respectively.
"""
if not to_unit[0].isdigit():
# unit abbreviations passed to pd.Timedelta need a number (so, for example, h becomes 1h)
to_unit = f"1{to_unit}"
if from_unit == "datetime":
return (
pd.to_datetime(data, utc=True) - pd.Timestamp("1970-01-01", tz="utc")
) // pd.Timedelta(to_unit)
else:
return data / pd.Timedelta(to_unit)

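For instance, converting ``datetime`` values to a sensor unit of hours divides the time elapsed since the UNIX epoch by a one-hour timedelta. A minimal standalone sketch of the same arithmetic as ``_convert_time_units`` (assuming pandas; the sample timestamps are ours):

```python
import pandas as pd

data = pd.Series(["1970-01-02 00:00", "1970-01-03 12:00"])

to_unit = "h"
if not to_unit[0].isdigit():
    to_unit = f"1{to_unit}"  # pd.Timedelta needs a number, e.g. 'h' becomes '1h'

# Datetimes become durations since the UNIX epoch, expressed in the target unit
hours = (
    pd.to_datetime(data, utc=True) - pd.Timestamp("1970-01-01", tz="utc")
) // pd.Timedelta(to_unit)
print(hours.tolist())  # [24, 60]
```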

def convert_units(
data: Union[tb.BeliefsSeries, pd.Series, List[Union[int, float]], int, float],
from_unit: str,
@@ -215,6 +236,8 @@ def convert_units(
capacity: Optional[str] = None,
) -> Union[pd.Series, List[Union[int, float]], int, float]:
"""Updates data values to reflect the given unit conversion."""
if from_unit in ("datetime", "timedelta"):
return _convert_time_units(data, from_unit, to_unit)

if from_unit != to_unit:
from_magnitudes = (
2 changes: 1 addition & 1 deletion requirements/app.in
@@ -28,7 +28,7 @@ tldextract
pyomo>=5.6
tabulate
timetomodel>=0.7.1
timely-beliefs>=1.12
timely-beliefs[forecast]>=1.13
python-dotenv
# a backport, not needed in Python3.8
importlib_metadata
