/
battery.py
112 lines (99 loc) · 4.18 KB
/
battery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from typing import Optional, Union
from datetime import datetime, timedelta
import pandas as pd
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.models.planning.solver import device_scheduler
from flexmeasures.data.models.planning.utils import (
initialize_df,
initialize_series,
add_tiny_price_slope,
get_prices,
fallback_charging_policy,
)
def schedule_battery(
    sensor: Sensor,
    start: datetime,
    end: datetime,
    resolution: timedelta,
    soc_at_start: float,
    soc_targets: Optional[pd.Series] = None,
    prefer_charging_sooner: bool = True,
) -> Union[pd.Series, None]:
    """Plan a consumption schedule for a battery from the latest price beliefs.

    The planning window is trimmed to the period for which prices (or price
    forecasts) are available. In the resulting consumption schedule,
    consumption is defined as positive values.

    :param sensor: sensor of the battery; must carry the attributes
        ``capacity_in_mw``, ``max_soc_in_mwh`` and ``min_soc_in_mwh``.
    :param start: requested start of the planning window.
    :param end: requested end of the planning window.
    :param resolution: duration of one time slot of the schedule.
    :param soc_at_start: state of charge at ``start`` (presumably in MWh, in
        line with the ``*_soc_in_mwh`` attributes -- confirm with callers).
    :param soc_targets: optional series of state-of-charge targets, each
        defined at the end of a time slot.
    :param prefer_charging_sooner: if True, a tiny slope is added to the
        prices so that charging sooner and discharging later is preferred.
    :returns: the schedule of the (single) device as computed by
        ``device_scheduler``, or the result of ``fallback_charging_policy``
        when the solver reports the problem as infeasible.
    """
    # The sensor must describe the battery's power capacity and its SOC bounds
    required_attributes = [
        ("capacity_in_mw", (float, int)),
        ("max_soc_in_mwh", (float, int)),
        ("min_soc_in_mwh", (float, int)),
    ]
    sensor.check_required_attributes(required_attributes)

    # Look up known prices or price forecasts; this may shorten the planning window
    prices, (start, end) = get_prices(
        sensor, (start, end), resolution, allow_trimmed_query_window=True
    )

    # Targets apply at the end of a time slot, whereas prices are indexed by
    # the start of a time slot, hence the window for targets starts one
    # resolution later.
    if soc_targets is not None:
        soc_targets = soc_targets[start + resolution : end]

    # A tiny price slope makes charging now preferable over charging later,
    # and discharging later preferable over discharging now. The future is
    # penalised with at most 1 per thousand times the price spread.
    if prefer_charging_sooner:
        prices = add_tiny_price_slope(prices, "event_value")

    # Commitments to optimise for: a single zero commitment, with deviations
    # in either direction valued at the price.
    commitment_quantities = [initialize_series(0, start, end, resolution)]
    last_slot_start = end - resolution
    # Todo: convert to EUR/(deviation of commitment, which is in MW)
    upwards_price = prices.loc[start:last_slot_start]["event_value"]
    downwards_price = prices.loc[start:last_slot_start]["event_value"]
    commitment_upwards_deviation_price = [upwards_price]
    commitment_downwards_deviation_price = [downwards_price]

    # Device constraints (this EMS has only one device: the battery)
    storage_constraints = initialize_df(
        [
            "equals",
            "max",
            "min",
            "derivative equals",
            "derivative max",
            "derivative min",
        ],
        start,
        end,
        resolution,
    )
    device_constraints = [storage_constraints]

    # Number of time slots per hour. Stock values below are taken relative to
    # soc_at_start and multiplied by this factor.
    # NOTE(review): presumably this matches the units device_scheduler expects
    # for stock constraints -- confirm against the solver.
    slots_per_hour = timedelta(hours=1) / resolution

    if soc_targets is not None:
        # A target defines a state at a certain time, while the "equals"
        # constraint defines what the total stock should be at the end of a
        # time slot, where the slot is indexed by its starting time.
        # Therefore the targets are shifted back by one resolution.
        shifted_targets = soc_targets.shift(-1, freq=resolution)
        storage_constraints["equals"] = (
            shifted_targets.values * slots_per_hour - soc_at_start * slots_per_hour
        )

    # Bounds on the stock, relative to the state of charge at the start
    storage_constraints["min"] = (
        sensor.get_attribute("min_soc_in_mwh") - soc_at_start
    ) * slots_per_hour
    storage_constraints["max"] = (
        sensor.get_attribute("max_soc_in_mwh") - soc_at_start
    ) * slots_per_hour

    # Bounds on the flow: the capacity limits both directions symmetrically
    storage_constraints["derivative min"] = (
        sensor.get_attribute("capacity_in_mw") * -1
    )
    storage_constraints["derivative max"] = sensor.get_attribute("capacity_in_mw")

    # EMS constraints (no additional constraints are filled in)
    ems_constraints = initialize_df(
        ["derivative max", "derivative min"], start, end, resolution
    )

    ems_schedule, _expected_costs, scheduler_results = device_scheduler(
        device_constraints,
        ems_constraints,
        commitment_quantities,
        commitment_downwards_deviation_price,
        commitment_upwards_deviation_price,
    )

    # Resort to the fallback policy if the problem turned out to be unsolvable
    if scheduler_results.solver.termination_condition == "infeasible":
        return fallback_charging_policy(
            sensor, storage_constraints, start, end, resolution
        )

    # Only one device was scheduled, so its schedule comes first
    return ems_schedule[0]