-
Notifications
You must be signed in to change notification settings - Fork 30
/
scheduling.py
311 lines (273 loc) · 11.4 KB
/
scheduling.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
from datetime import datetime, timedelta
from typing import List, Tuple, Optional
import os
import sys
import importlib.util
from importlib.abc import Loader
from rq.job import Job
from flask import current_app
import click
from rq import get_current_job
import timely_beliefs as tb
from flexmeasures.data import db
from flexmeasures.data.models.planning.storage import StorageScheduler
from flexmeasures.data.models.planning.utils import ensure_storage_specs
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.planning import Scheduler
from flexmeasures.data.utils import get_data_source, save_to_db
"""
The life cycle of a scheduling job:
1. A scheduling job is born in create_scheduling_job.
2. It is run in make_schedule which writes results to the db.
3. If an error occurs (and the worker is configured accordingly), handle_scheduling_exception comes in.
This might re-enqueue the job or try a different model (which creates a new job).
"""
def create_scheduling_job(
    sensor: Sensor,
    start_of_schedule: datetime,
    end_of_schedule: datetime,
    belief_time: datetime,
    resolution: timedelta,
    consumption_price_sensor: Optional[Sensor] = None,
    production_price_sensor: Optional[Sensor] = None,
    inflexible_device_sensors: Optional[List[Sensor]] = None,
    job_id: Optional[str] = None,
    enqueue: bool = True,
    storage_specs: Optional[dict] = None,
) -> Job:
    """
    Create a new scheduling Job, to be executed later by a worker.

    Before creating the job, the storage specs are checked and completed
    (via ensure_storage_specs), so that errors on sensor type and specs bubble up early.
    To support quick retrieval of the scheduling job, the job id is the unique entity
    address of the UDI event. That means one event leads to one job (i.e. actions are
    event driven).

    Target SOC values should be indexed by their due date. For example, for
    quarter-hourly targets between 5 and 6 AM:

    >>> df = pd.Series(data=[1, 2, 2.5, 3], index=pd.date_range(datetime(2010,1,1,5), datetime(2010,1,1,6), freq=timedelta(minutes=15), inclusive="right"))
    >>> print(df)
    2010-01-01 05:15:00    1.0
    2010-01-01 05:30:00    2.0
    2010-01-01 05:45:00    2.5
    2010-01-01 06:00:00    3.0
    Freq: 15T, dtype: float64

    :param enqueue: if True (the default), the job is enqueued right away;
                    otherwise it is only created, and can be enqueued later.
    :returns: the created (and possibly enqueued) rq Job
    """
    storage_specs = ensure_storage_specs(
        storage_specs, sensor, start_of_schedule, end_of_schedule, resolution
    )
    scheduling_queue = current_app.queues["scheduling"]
    # NB job.cleanup docs says a negative number of seconds means persisting forever
    job_ttl = int(
        current_app.config.get("FLEXMEASURES_JOB_TTL", timedelta(-1)).total_seconds()
    )
    result_ttl = int(
        current_app.config.get(
            "FLEXMEASURES_PLANNING_TTL", timedelta(-1)
        ).total_seconds()
    )
    job_kwargs = dict(
        sensor_id=sensor.id,
        start=start_of_schedule,
        end=end_of_schedule,
        belief_time=belief_time,
        resolution=resolution,
        storage_specs=storage_specs,
        consumption_price_sensor=consumption_price_sensor,
        production_price_sensor=production_price_sensor,
        inflexible_device_sensors=inflexible_device_sensors,
    )  # TODO: maybe also pass these sensors as IDs, to avoid potential db sessions confusion
    job = Job.create(
        make_schedule,
        kwargs=job_kwargs,
        id=job_id,
        connection=scheduling_queue.connection,
        ttl=job_ttl,
        result_ttl=result_ttl,
    )
    if enqueue:
        scheduling_queue.enqueue_job(job)
    return job
def make_schedule(
    sensor_id: int,
    start: datetime,
    end: datetime,
    belief_time: datetime,
    resolution: timedelta,
    storage_specs: Optional[dict],
    consumption_price_sensor: Optional[Sensor] = None,
    production_price_sensor: Optional[Sensor] = None,
    inflexible_device_sensors: Optional[List[Sensor]] = None,
) -> bool:
    """
    Compute a schedule for a sensor and save the resulting beliefs to the db.

    This function is meant to be queued as a job.
    It thus potentially runs on a different FlexMeasures node than where the job is created.
    (That is why the sensor is passed by id, not as an object.)
    - Choose which scheduling function can be used
    - Compute schedule
    - Turn scheduled values into beliefs and save them to db

    :param sensor_id:       id of the (power) sensor to schedule for
    :param start:           start of the schedule
    :param end:             end of the schedule
    :param belief_time:     time at which the scheduled values are recorded as beliefs
    :param resolution:      resolution of the schedule
    :param storage_specs:   storage specs; checked/completed by ensure_storage_specs
    :returns:               True if the schedule was computed and saved
    :raises ValueError:     if the sensor's asset type has no supported scheduler
    """
    # Dispose connections inherited from the parent process (rq workers fork), see:
    # https://docs.sqlalchemy.org/en/13/faq/connections.html#how-do-i-use-engines-connections-sessions-with-python-multiprocessing-or-os-fork
    db.engine.dispose()
    sensor = Sensor.query.filter_by(id=sensor_id).one_or_none()
    rq_job = get_current_job()  # None when called outside of an rq worker
    if rq_job:
        click.echo(
            "Running Scheduling Job %s: %s, from %s to %s"
            % (rq_job.id, sensor, start, end)
        )
    data_source_info = {}
    # Choose which algorithm to use TODO: unify loading this into a func store concept
    if "custom-scheduler" in sensor.attributes:
        # A custom scheduler class is configured on the sensor; load it dynamically.
        scheduler_specs = sensor.attributes.get("custom-scheduler")
        scheduler, data_source_info = load_custom_scheduler(scheduler_specs)
    elif sensor.generic_asset.generic_asset_type.name in (
        "battery",
        "one-way_evse",
        "two-way_evse",
    ):
        scheduler = StorageScheduler
        data_source_info["model"] = scheduler.__name__
        data_source_info["name"] = scheduler.__author__
        data_source_info["version"] = scheduler.__version__
    else:
        raise ValueError(
            "Scheduling is not (yet) supported for asset type %s."
            % sensor.generic_asset.generic_asset_type
        )
    # Re-check specs here, as the job may have been created elsewhere (or long ago).
    storage_specs = ensure_storage_specs(storage_specs, sensor, start, end, resolution)
    consumption_schedule = scheduler().schedule(
        sensor,
        start,
        end,
        resolution,
        storage_specs=storage_specs,
        consumption_price_sensor=consumption_price_sensor,
        production_price_sensor=production_price_sensor,
        inflexible_device_sensors=inflexible_device_sensors,
        belief_time=belief_time,
    )
    if rq_job:
        click.echo("Job %s made schedule." % rq_job.id)
    data_source = get_data_source(
        data_source_name=data_source_info["name"],
        data_source_model=data_source_info["model"],
        data_source_version=data_source_info["version"],
        data_source_type="scheduling script",
    )
    # saving info on the job, so the API for a job can look the data up
    data_source_info["id"] = data_source.id
    if rq_job:
        rq_job.meta["data_source_info"] = data_source_info
        rq_job.save_meta()
    ts_value_schedule = [
        TimedBelief(
            event_start=dt,
            belief_time=belief_time,
            event_value=-value,
            sensor=sensor,
            source=data_source,
        )
        for dt, value in consumption_schedule.items()
    ]  # For consumption schedules, positive values denote consumption. For the db, consumption is negative
    bdf = tb.BeliefsDataFrame(ts_value_schedule)
    save_to_db(bdf)
    db.session.commit()
    return True
def load_custom_scheduler(scheduler_specs: dict) -> Tuple[Scheduler, dict]:
    """
    Read in custom scheduling spec.
    Attempt to load the Callable, also derive data source info.

    The scheduler class should be derived from flexmeasures.data.models.planning.Scheduler.
    The Callable is assumed to be named "schedule".

    Example specs:

    {
        "module": "/path/to/module.py",  # or sthg importable, e.g. "package.module"
        "class": "NameOfSchedulerClass",
    }

    :param scheduler_specs: dict with "module" (file path or importable module name)
                            and "class" (name of the scheduler class in that module)
    :returns: the scheduler class and a dict with data source info
              (model, version, name — with defaults if the class lacks
              __version__/__author__ attributes)
    :raises AssertionError: if the specs are malformed or module/class cannot be found
    :raises NotImplementedError: if the class has no "schedule" function
    """
    assert isinstance(
        scheduler_specs, dict
    ), f"Scheduler specs is {type(scheduler_specs)}, should be a dict"
    assert "module" in scheduler_specs, "scheduler specs have no 'module'."
    assert "class" in scheduler_specs, "scheduler specs have no 'class'"
    scheduler_name = scheduler_specs["class"]
    source_info = dict(
        model=scheduler_name, version="1", name="Unknown author"
    )  # default
    # import module
    module_descr = scheduler_specs["module"]
    if os.path.exists(module_descr):
        # Load directly from a file path
        spec = importlib.util.spec_from_file_location(scheduler_name, module_descr)
        assert spec, f"Could not load specs for scheduling module at {module_descr}."
        module = importlib.util.module_from_spec(spec)
        sys.modules[scheduler_name] = module
        assert isinstance(spec.loader, Loader)
        spec.loader.exec_module(module)
    else:  # assume importable module
        try:
            module = importlib.import_module(module_descr)
        except TypeError as te:
            # (was current_app.log, which does not exist on a Flask app)
            current_app.logger.error(f"Cannot load {module_descr}: {te}.")
            raise
        except ModuleNotFoundError:
            current_app.logger.error(
                f"Attempted to import module {module_descr} (as it is not a valid file path), but it is not installed."
            )
            raise
    assert module, f"Module {module_descr} could not be loaded."
    # get scheduling function
    assert hasattr(
        module, scheduler_specs["class"]
    ), f"Module at {module_descr} has no class {scheduler_specs['class']}"
    scheduler_class = getattr(module, scheduler_specs["class"])
    if hasattr(scheduler_class, "__version__"):
        source_info["version"] = str(scheduler_class.__version__)
    else:
        current_app.logger.warning(
            f"Scheduler {scheduler_class.__name__} loaded, but has no __version__ attribute."
        )
    if hasattr(scheduler_class, "__author__"):
        source_info["name"] = str(scheduler_class.__author__)
    else:
        current_app.logger.warning(
            f"Scheduler {scheduler_class.__name__} loaded, but has no __author__ attribute."
        )
    schedule_function_name = "schedule"
    if not hasattr(scheduler_class, schedule_function_name):
        raise NotImplementedError(
            f"No function {schedule_function_name} in {scheduler_class}. Cannot load custom scheduler."
        )
    return scheduler_class, source_info
def handle_scheduling_exception(job, exc_type, exc_value, traceback):
    """
    RQ exception handler: record the exception on the job's meta data,
    so it can be looked up later.
    """
    click.echo("HANDLING RQ WORKER EXCEPTION: %s:%s\n" % (exc_type, exc_value))
    job.meta.update(exception=exc_value)
    job.save_meta()
def get_data_source_for_job(
    job: Optional[Job], sensor: Optional[Sensor] = None
) -> Optional[DataSource]:
    """
    Try to find the data source linked by this scheduling job.

    We expect that enough info on the source was placed in the meta dict.
    For a transition period, we might have to guess a bit (hence the sensor parameter).
    TODO: Afterwards, this can be lighter. We should also expect a job and no sensor is needed,
          once API v1.3 is deprecated.

    :param job:    the scheduling job whose data source we look for
    :param sensor: fallback for older jobs without full data source info in their meta
    :returns: the matching DataSource, or None if none was found
    """
    source_info = job.meta.get("data_source_info") if job else None
    if source_info and "id" in source_info:
        # The job's meta points directly at the data source.
        return DataSource.query.get(source_info["id"])
    if source_info is None and sensor:
        source_info = dict(name="Seita", model="StorageScheduler")
        # TODO: change to raise later (v0.13) - all scheduling jobs now get full info
        current_app.logger.warning(
            "Looking up scheduling data without knowing full data_source_info (version). This is deprecated soon. Please specify a job id as event or switch to API v3."
        )
    # NOTE(review): if both job and sensor are missing, source_info stays None here
    # and the filter below fails — presumably callers always pass at least one; confirm.
    candidate_sources = (
        DataSource.query.filter_by(
            type="scheduling script",
            **source_info,
        )
        .order_by(DataSource.version.desc())
        .all()
    )  # Might still be more than one, e.g. per user
    return candidate_sources[0] if candidate_sources else None