/
charging_station.py
117 lines (103 loc) · 4.63 KB
/
charging_station.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from typing import Union
from datetime import datetime, timedelta
from pandas import Series, Timestamp
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.models.planning.solver import device_scheduler
from flexmeasures.data.models.planning.utils import (
initialize_df,
initialize_series,
add_tiny_price_slope,
get_prices,
fallback_charging_policy,
)
def schedule_charging_station(
    sensor: Sensor,
    start: datetime,
    end: datetime,
    resolution: timedelta,
    soc_at_start: float,
    soc_targets: Series,
    prefer_charging_sooner: bool = True,
) -> Union[Series, None]:
    """Schedule a charging station asset based directly on the latest beliefs regarding market prices within the
    specified time window.

    For the resulting consumption schedule, consumption is defined as positive values.

    The planning window may be shortened to the period for which prices are available
    (``get_prices`` is called with ``allow_trimmed_query_window=True``).

    :param sensor:                  Sensor to schedule; must carry a numeric "capacity_in_mw" attribute.
                                    The optional attributes "is_strictly_non_positive" and
                                    "is_strictly_non_negative" pin the lower, respectively upper,
                                    power bound to 0.
    :param start:                   Start of the requested planning window.
    :param end:                     End of the requested planning window.
    :param resolution:              Duration of one time slot of the schedule.
    :param soc_at_start:            State of charge at the start of the window
                                    (presumably in MWh, given the MW capacity - TODO confirm).
    :param soc_targets:             Target states of charge, indexed by the time at which each target
                                    must be reached (i.e. the end of a time slot). The index must be
                                    timezone-aware, since it is converted to UTC. Presumably NaN where
                                    no target is set - TODO confirm against callers.
    :param prefer_charging_sooner:  If True, add a tiny slope to the prices so that, at otherwise equal
                                    cost, charging happens sooner and discharging later.
    :returns:                       The schedule computed by ``device_scheduler`` for the single device,
                                    or the result of ``fallback_charging_policy`` if the solver reports
                                    the problem as infeasible.

    Todo: handle uni-directional charging by setting the "min" or "derivative min" constraint to 0
    """
    # Check for required Sensor attributes
    sensor.check_required_attributes([("capacity_in_mw", (float, int))])

    # Check for known prices or price forecasts, trimming planning window accordingly
    prices, (start, end) = get_prices(
        sensor, (start, end), resolution, allow_trimmed_query_window=True
    )

    # soc targets are at the end of each time slot, while prices are indexed by the start of each time slot
    soc_targets = soc_targets.tz_convert("UTC")
    start = Timestamp(start).tz_convert("UTC")
    end = Timestamp(end).tz_convert("UTC")
    soc_targets = soc_targets[start + resolution : end]

    # Add tiny price slope to prefer charging now rather than later, and discharging later rather than now.
    # We penalise the future with at most 1 per thousand times the price spread.
    if prefer_charging_sooner:
        prices = add_tiny_price_slope(prices, "event_value")

    # Set up commitments to optimise for
    commitment_quantities = [initialize_series(0, start, end, resolution)]

    # Todo: convert to EUR/(deviation of commitment, which is in MW)
    commitment_upwards_deviation_price = [
        prices.loc[start : end - resolution]["event_value"]
    ]
    commitment_downwards_deviation_price = [
        prices.loc[start : end - resolution]["event_value"]
    ]

    # Number of time slots per hour; used to scale the state of charge values into the
    # stock units in which the device constraints are expressed.
    steps_per_hour = timedelta(hours=1) / resolution

    # Set up device constraints (only one device for this EMS)
    columns = [
        "equals",
        "max",
        "min",
        "derivative equals",
        "derivative max",
        "derivative min",
    ]
    device_constraints = [initialize_df(columns, start, end, resolution)]

    # Shift "equals" constraint for target SOC by one resolution (the target defines a state at a certain time,
    # while the "equals" constraint defines what the total stock should be at the end of a time slot,
    # where the time slot is indexed by its starting time)
    device_constraints[0]["equals"] = (
        soc_targets.shift(-1, freq=resolution).values * steps_per_hour
        - soc_at_start * steps_per_hour
    )

    # Can't drain the EV battery by more than it contains
    device_constraints[0]["min"] = -soc_at_start * steps_per_hour

    # Lacking information about the battery's nominal capacity, we use the highest target value as the maximum
    # state of charge. Series.max() skips NaN entries, whereas the builtin max() over the raw values is
    # order-dependent in the presence of NaN (it returns NaN whenever the first slot has no target).
    # If no target is set at all, this still evaluates to NaN.
    highest_soc_target = soc_targets.max()
    device_constraints[0]["max"] = (
        highest_soc_target * steps_per_hour - soc_at_start * steps_per_hour
    )

    # Power bounds: limited by the station's capacity, unless the sensor is restricted to one direction
    if sensor.get_attribute("is_strictly_non_positive"):
        device_constraints[0]["derivative min"] = 0
    else:
        device_constraints[0]["derivative min"] = (
            sensor.get_attribute("capacity_in_mw") * -1
        )
    if sensor.get_attribute("is_strictly_non_negative"):
        device_constraints[0]["derivative max"] = 0
    else:
        device_constraints[0]["derivative max"] = sensor.get_attribute("capacity_in_mw")

    # Set up EMS constraints (no additional constraints)
    columns = ["derivative max", "derivative min"]
    ems_constraints = initialize_df(columns, start, end, resolution)

    ems_schedule, expected_costs, scheduler_results = device_scheduler(
        device_constraints,
        ems_constraints,
        commitment_quantities,
        commitment_downwards_deviation_price,
        commitment_upwards_deviation_price,
    )

    if scheduler_results.solver.termination_condition == "infeasible":
        # Fallback policy if the problem was unsolvable
        charging_station_schedule = fallback_charging_policy(
            sensor, device_constraints[0], start, end, resolution
        )
    else:
        charging_station_schedule = ems_schedule[0]

    return charging_station_schedule