/
storage.py
178 lines (159 loc) · 7.16 KB
/
storage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
from datetime import datetime, timedelta
from typing import Optional, List, Union
import pandas as pd
from flexmeasures import Sensor
from flexmeasures.data.models.planning import Scheduler
from flexmeasures.data.models.planning.linear_optimization import device_scheduler
from flexmeasures.data.models.planning.utils import (
get_prices,
add_tiny_price_slope,
initialize_series,
initialize_df,
get_power_values,
fallback_charging_policy,
)
class StorageScheduler(Scheduler):
__version__ = "1"
__author__ = "Seita"
def schedule(
self,
sensor: Sensor,
start: datetime,
end: datetime,
resolution: timedelta,
storage_specs: dict,
consumption_price_sensor: Optional[Sensor] = None,
production_price_sensor: Optional[Sensor] = None,
inflexible_device_sensors: Optional[List[Sensor]] = None,
belief_time: Optional[datetime] = None,
round_to_decimals: Optional[int] = 6,
) -> Union[pd.Series, None]:
"""Schedule a battery or Charge Point based directly on the latest beliefs regarding market prices within the specified time window.
For the resulting consumption schedule, consumption is defined as positive values.
"""
soc_at_start = storage_specs.get("soc_at_start")
soc_targets = storage_specs.get("soc_targets")
soc_min = storage_specs.get("soc_min")
soc_max = storage_specs.get("soc_max")
roundtrip_efficiency = storage_specs.get("roundtrip_efficiency")
prefer_charging_sooner = storage_specs.get("prefer_charging_sooner", True)
# Check for required Sensor attributes
sensor.check_required_attributes([("capacity_in_mw", (float, int))])
# Check for known prices or price forecasts, trimming planning window accordingly
up_deviation_prices, (start, end) = get_prices(
(start, end),
resolution,
beliefs_before=belief_time,
price_sensor=consumption_price_sensor,
sensor=sensor,
allow_trimmed_query_window=False,
)
down_deviation_prices, (start, end) = get_prices(
(start, end),
resolution,
beliefs_before=belief_time,
price_sensor=production_price_sensor,
sensor=sensor,
allow_trimmed_query_window=False,
)
start = pd.Timestamp(start).tz_convert("UTC")
end = pd.Timestamp(end).tz_convert("UTC")
# Add tiny price slope to prefer charging now rather than later, and discharging later rather than now.
# We penalise the future with at most 1 per thousand times the price spread.
if prefer_charging_sooner:
up_deviation_prices = add_tiny_price_slope(
up_deviation_prices, "event_value"
)
down_deviation_prices = add_tiny_price_slope(
down_deviation_prices, "event_value"
)
# Set up commitments to optimise for
commitment_quantities = [initialize_series(0, start, end, resolution)]
# Todo: convert to EUR/(deviation of commitment, which is in MW)
commitment_upwards_deviation_price = [
up_deviation_prices.loc[start : end - resolution]["event_value"]
]
commitment_downwards_deviation_price = [
down_deviation_prices.loc[start : end - resolution]["event_value"]
]
# Set up device constraints: only one scheduled flexible device for this EMS (at index 0), plus the forecasted inflexible devices (at indices 1 to n).
columns = [
"equals",
"max",
"min",
"derivative equals",
"derivative max",
"derivative min",
"derivative down efficiency",
"derivative up efficiency",
]
if inflexible_device_sensors is None:
inflexible_device_sensors = []
device_constraints = [
initialize_df(columns, start, end, resolution)
for i in range(1 + len(inflexible_device_sensors))
]
for i, inflexible_sensor in enumerate(inflexible_device_sensors):
device_constraints[i + 1]["derivative equals"] = get_power_values(
query_window=(start, end),
resolution=resolution,
beliefs_before=belief_time,
sensor=inflexible_sensor,
)
if soc_targets is not None and not soc_targets.empty:
soc_targets = soc_targets.tz_convert("UTC")
device_constraints[0]["equals"] = soc_targets.shift(
-1, freq=resolution
).values * (timedelta(hours=1) / resolution) - soc_at_start * (
timedelta(hours=1) / resolution
) # shift "equals" constraint for target SOC by one resolution (the target defines a state at a certain time,
# while the "equals" constraint defines what the total stock should be at the end of a time slot,
# where the time slot is indexed by its starting time)
device_constraints[0]["min"] = (soc_min - soc_at_start) * (
timedelta(hours=1) / resolution
)
device_constraints[0]["max"] = (soc_max - soc_at_start) * (
timedelta(hours=1) / resolution
)
if sensor.get_attribute("is_strictly_non_positive"):
device_constraints[0]["derivative min"] = 0
else:
device_constraints[0]["derivative min"] = (
sensor.get_attribute("capacity_in_mw") * -1
)
if sensor.get_attribute("is_strictly_non_negative"):
device_constraints[0]["derivative max"] = 0
else:
device_constraints[0]["derivative max"] = sensor.get_attribute(
"capacity_in_mw"
)
# Apply round-trip efficiency evenly to charging and discharging
device_constraints[0]["derivative down efficiency"] = (
roundtrip_efficiency**0.5
)
device_constraints[0]["derivative up efficiency"] = roundtrip_efficiency**0.5
# Set up EMS constraints
columns = ["derivative max", "derivative min"]
ems_constraints = initialize_df(columns, start, end, resolution)
ems_capacity = sensor.generic_asset.get_attribute("capacity_in_mw")
if ems_capacity is not None:
ems_constraints["derivative min"] = ems_capacity * -1
ems_constraints["derivative max"] = ems_capacity
ems_schedule, expected_costs, scheduler_results = device_scheduler(
device_constraints,
ems_constraints,
commitment_quantities,
commitment_downwards_deviation_price,
commitment_upwards_deviation_price,
)
if scheduler_results.solver.termination_condition == "infeasible":
# Fallback policy if the problem was unsolvable
battery_schedule = fallback_charging_policy(
sensor, device_constraints[0], start, end, resolution
)
else:
battery_schedule = ems_schedule[0]
# Round schedule
if round_to_decimals:
battery_schedule = battery_schedule.round(round_to_decimals)
return battery_schedule