/
test_solver.py
221 lines (199 loc) · 8.82 KB
/
test_solver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
from datetime import datetime, timedelta
import pytest
import numpy as np
import pandas as pd
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.models.planning.battery import schedule_battery
from flexmeasures.data.models.planning.charging_station import schedule_charging_station
from flexmeasures.utils.calculations import integrate_time_series
from flexmeasures.utils.time_utils import as_server_time
TOLERANCE = 0.00001
def test_battery_solver_day_1(add_battery_assets):
epex_da = Sensor.query.filter(Sensor.name == "epex_da").one_or_none()
battery = Sensor.query.filter(Sensor.name == "Test battery").one_or_none()
assert Sensor.query.get(battery.get_attribute("market_id")) == epex_da
start = as_server_time(datetime(2015, 1, 1))
end = as_server_time(datetime(2015, 1, 2))
resolution = timedelta(minutes=15)
soc_at_start = battery.get_attribute("soc_in_mwh")
schedule = schedule_battery(battery, start, end, resolution, soc_at_start)
soc_schedule = integrate_time_series(schedule, soc_at_start, decimal_precision=6)
with pd.option_context("display.max_rows", None, "display.max_columns", 3):
print(soc_schedule)
# Check if constraints were met
assert (
min(schedule.values) >= battery.get_attribute("capacity_in_mw") * -1 - TOLERANCE
)
assert max(schedule.values) <= battery.get_attribute("capacity_in_mw")
for soc in soc_schedule.values:
assert soc >= battery.get_attribute("min_soc_in_mwh")
assert soc <= battery.get_attribute("max_soc_in_mwh")
@pytest.mark.parametrize(
"roundtrip_efficiency",
[
1,
0.99,
0.01,
],
)
def test_battery_solver_day_2(add_battery_assets, roundtrip_efficiency: float):
"""Check battery scheduling results for day 2, which is set up with
8 expensive, then 8 cheap, then again 8 expensive hours.
If efficiency losses aren't too bad, we expect the scheduler to:
- completely discharge within the first 8 hours
- completely charge within the next 8 hours
- completely discharge within the last 8 hours
If efficiency losses are bad, the price difference is not worth cycling the battery,
and so we expect the scheduler to only:
- completely discharge within the last 8 hours
"""
epex_da = Sensor.query.filter(Sensor.name == "epex_da").one_or_none()
battery = Sensor.query.filter(Sensor.name == "Test battery").one_or_none()
assert Sensor.query.get(battery.get_attribute("market_id")) == epex_da
start = as_server_time(datetime(2015, 1, 2))
end = as_server_time(datetime(2015, 1, 3))
resolution = timedelta(minutes=15)
soc_at_start = battery.get_attribute("soc_in_mwh")
soc_min = 0.5
soc_max = 4.5
schedule = schedule_battery(
battery,
start,
end,
resolution,
soc_at_start,
soc_min=soc_min,
soc_max=soc_max,
roundtrip_efficiency=roundtrip_efficiency,
)
soc_schedule = integrate_time_series(schedule, soc_at_start, decimal_precision=6)
with pd.option_context("display.max_rows", None, "display.max_columns", 3):
print(soc_schedule)
# Check if constraints were met
assert min(schedule.values) >= battery.get_attribute("capacity_in_mw") * -1
assert max(schedule.values) <= battery.get_attribute("capacity_in_mw") + TOLERANCE
for soc in soc_schedule.values:
assert soc >= max(soc_min, battery.get_attribute("min_soc_in_mwh"))
assert soc <= battery.get_attribute("max_soc_in_mwh")
# Check whether the resulting soc schedule follows our expectations for 8 expensive, 8 cheap and 8 expensive hours
assert soc_schedule.iloc[-1] == max(
soc_min, battery.get_attribute("min_soc_in_mwh")
) # Battery sold out at the end of its planning horizon
# As long as the roundtrip efficiency isn't too bad (I haven't computed the actual switch point)
if roundtrip_efficiency > 0.9:
assert soc_schedule.loc[start + timedelta(hours=8)] == max(
soc_min, battery.get_attribute("min_soc_in_mwh")
) # Sell what you begin with
assert soc_schedule.loc[start + timedelta(hours=16)] == min(
soc_max, battery.get_attribute("max_soc_in_mwh")
) # Buy what you can to sell later
else:
# If the roundtrip efficiency is poor, best to stand idle
assert soc_schedule.loc[start + timedelta(hours=8)] == battery.get_attribute(
"soc_in_mwh"
)
assert soc_schedule.loc[start + timedelta(hours=16)] == battery.get_attribute(
"soc_in_mwh"
)
@pytest.mark.parametrize(
"target_soc, charging_station_name",
[
(1, "Test charging station"),
(5, "Test charging station"),
(0, "Test charging station (bidirectional)"),
(5, "Test charging station (bidirectional)"),
],
)
def test_charging_station_solver_day_2(target_soc, charging_station_name):
"""Starting with a state of charge 1 kWh, within 2 hours we should be able to reach
any state of charge in the range [1, 5] kWh for a unidirectional station,
or [0, 5] for a bidirectional station, given a charging capacity of 2 kW.
"""
soc_at_start = 1
duration_until_target = timedelta(hours=2)
epex_da = Sensor.query.filter(Sensor.name == "epex_da").one_or_none()
charging_station = Sensor.query.filter(
Sensor.name == charging_station_name
).one_or_none()
assert charging_station.get_attribute("capacity_in_mw") == 2
assert Sensor.query.get(charging_station.get_attribute("market_id")) == epex_da
start = as_server_time(datetime(2015, 1, 2))
end = as_server_time(datetime(2015, 1, 3))
resolution = timedelta(minutes=15)
target_soc_datetime = start + duration_until_target
soc_targets = pd.Series(
np.nan, index=pd.date_range(start, end, freq=resolution, closed="right")
)
soc_targets.loc[target_soc_datetime] = target_soc
consumption_schedule = schedule_charging_station(
charging_station, start, end, resolution, soc_at_start, soc_targets
)
soc_schedule = integrate_time_series(
consumption_schedule, soc_at_start, decimal_precision=6
)
# Check if constraints were met
assert (
min(consumption_schedule.values)
>= charging_station.get_attribute("capacity_in_mw") * -1
)
assert (
max(consumption_schedule.values)
<= charging_station.get_attribute("capacity_in_mw") + TOLERANCE
)
print(consumption_schedule.head(12))
print(soc_schedule.head(12))
assert abs(soc_schedule.loc[target_soc_datetime] - target_soc) < TOLERANCE
@pytest.mark.parametrize(
"target_soc, charging_station_name",
[
(9, "Test charging station"),
(15, "Test charging station"),
(5, "Test charging station (bidirectional)"),
(15, "Test charging station (bidirectional)"),
],
)
def test_fallback_to_unsolvable_problem(target_soc, charging_station_name):
"""Starting with a state of charge 10 kWh, within 2 hours we should be able to reach
any state of charge in the range [10, 14] kWh for a unidirectional station,
or [6, 14] for a bidirectional station, given a charging capacity of 2 kW.
Here we test target states of charge outside that range, ones that we should be able
to get as close to as 1 kWh difference.
We want our scheduler to handle unsolvable problems like these with a sensible fallback policy.
"""
soc_at_start = 10
duration_until_target = timedelta(hours=2)
expected_gap = 1
epex_da = Sensor.query.filter(Sensor.name == "epex_da").one_or_none()
charging_station = Sensor.query.filter(
Sensor.name == charging_station_name
).one_or_none()
assert charging_station.get_attribute("capacity_in_mw") == 2
assert Sensor.query.get(charging_station.get_attribute("market_id")) == epex_da
start = as_server_time(datetime(2015, 1, 2))
end = as_server_time(datetime(2015, 1, 3))
resolution = timedelta(minutes=15)
target_soc_datetime = start + duration_until_target
soc_targets = pd.Series(
np.nan, index=pd.date_range(start, end, freq=resolution, closed="right")
)
soc_targets.loc[target_soc_datetime] = target_soc
consumption_schedule = schedule_charging_station(
charging_station, start, end, resolution, soc_at_start, soc_targets
)
soc_schedule = integrate_time_series(
consumption_schedule, soc_at_start, decimal_precision=6
)
# Check if constraints were met
assert (
min(consumption_schedule.values)
>= charging_station.get_attribute("capacity_in_mw") * -1
)
assert max(consumption_schedule.values) <= charging_station.get_attribute(
"capacity_in_mw"
)
print(consumption_schedule.head(12))
print(soc_schedule.head(12))
assert (
abs(abs(soc_schedule.loc[target_soc_datetime] - target_soc) - expected_gap)
< TOLERANCE
)