/
test_api_v1_3.py
199 lines (181 loc) · 8.61 KB
/
test_api_v1_3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
from flask import url_for
import pytest
from datetime import timedelta
from isodate import parse_datetime
import pandas as pd
from rq.job import Job
from flexmeasures.api.common.responses import unrecognized_event
from flexmeasures.api.tests.utils import get_auth_token
from flexmeasures.api.v1_3.tests.utils import (
message_for_get_device_message,
message_for_post_udi_event,
)
from flexmeasures.data.models.data_sources import DataSource
from flexmeasures.data.models.time_series import Sensor, TimedBelief
from flexmeasures.data.tests.utils import work_on_rq
from flexmeasures.data.services.scheduling import handle_scheduling_exception
from flexmeasures.utils.calculations import integrate_time_series
@pytest.mark.parametrize("message", [message_for_get_device_message(wrong_id=True)])
def test_get_device_message_wrong_event_id(client, message):
    """A GetDeviceMessage request built with ``wrong_id=True`` must be refused.

    The parametrized message carries an event template into which the id of
    the "Test battery" sensor is filled. The endpoint is expected to answer
    with HTTP 400 and the same status that ``unrecognized_event`` reports.
    """
    battery = Sensor.query.filter(Sensor.name == "Test battery").one_or_none()
    # The event string is a %-template that still needs the sensor id.
    message["event"] = message["event"] % battery.id

    token = get_auth_token(client, "test_prosumer_user@seita.nl", "testtest")
    request_headers = {"content-type": "application/json", "Authorization": token}
    response = client.get(
        url_for("flexmeasures_api_v1_3.get_device_message"),
        query_string=message,
        headers=request_headers,
    )
    payload = response.json
    print("Server responded with:\n%s" % payload)

    assert response.status_code == 400
    assert payload["type"] == "GetDeviceMessageResponse"
    expected_status = unrecognized_event(9999, "soc")[0]["status"]
    assert payload["status"] == expected_status
@pytest.mark.parametrize(
    "message, asset_name",
    [
        (message_for_post_udi_event(), "Test battery"),
        (message_for_post_udi_event(targets=True), "Test charging station"),
    ],
)
def test_post_udi_event_and_get_device_message(
    app, add_charging_station_assets, message, asset_name
):
    """Walk a UDI event through the API, the scheduling queue and back out.

    Steps, in order:

    1. Post the UDI event and check the state-of-charge attributes stored on
       the sensor's generic asset.
    2. Check that exactly one scheduling job was queued, run it, and check the
       resulting beliefs in the database (including targets, if the message
       has any).
    3. Read the schedule back through the getDeviceMessage endpoint, also for a
       shorter and a much longer requested duration.
    4. Post again: the same event id is refused as outdated, while id 205 is
       accepted, although its scheduling job is expected to fail.
    """

    def fetch_sensor():
        """Look up the sensor under test by its name."""
        return Sensor.query.filter(Sensor.name == asset_name).one_or_none()

    def post_udi_event(http_client, token):
        """POST the current state of ``message`` and echo the server's reply."""
        reply = http_client.post(
            url_for("flexmeasures_api_v1_3.post_udi_event"),
            json=message,
            headers={"Authorization": token},
        )
        print("Server responded with:\n%s" % reply.json)
        return reply

    def request_device_message(http_client, token, query):
        """GET the device message for the given query parameters."""
        return http_client.get(
            url_for("flexmeasures_api_v1_3.get_device_message"),
            query_string=query,
            headers={"content-type": "application/json", "Authorization": token},
        )

    with app.test_client() as client:
        sensor = fetch_sensor()
        # The event string is a %-template that still needs the sensor id.
        message["event"] = message["event"] % sensor.id
        auth_token = get_auth_token(client, "test_prosumer_user@seita.nl", "testtest")
        first_post = post_udi_event(client, auth_token)
        assert first_post.status_code == 200
        assert first_post.json["type"] == "PostUdiEventResponse"

    # test database state
    msg_dt = message["datetime"]
    sensor = fetch_sensor()
    asset = sensor.generic_asset
    assert asset.get_attribute("soc_datetime") == msg_dt
    assert asset.get_attribute("soc_in_mwh") == message["value"] / 1000
    assert asset.get_attribute("soc_udi_event_id") == 204

    # look for scheduling jobs in queue
    scheduling_queue = app.queues["scheduling"]
    # only 1 schedule should be made for 1 asset
    assert len(scheduling_queue) == 1
    job = scheduling_queue.jobs[0]
    assert job.kwargs["sensor_id"] == sensor.id
    assert job.kwargs["start"] == parse_datetime(message["datetime"])
    assert job.id == message["event"]

    # process the scheduling queue
    work_on_rq(scheduling_queue, exc_handler=handle_scheduling_exception)
    processed_job = Job.fetch(message["event"], connection=scheduling_queue.connection)
    assert processed_job.is_finished is True

    # check results are in the database
    resolution = timedelta(minutes=15)
    scheduler_source = DataSource.query.filter_by(
        name="Seita", type="scheduling script"
    ).one_or_none()
    # Make sure the scheduler data source is now there
    assert scheduler_source is not None
    scheduled_beliefs = (
        TimedBelief.query.filter(TimedBelief.sensor_id == sensor.id)
        .filter(TimedBelief.source_id == scheduler_source.id)
        .all()
    )
    # For consumption schedules, positive values denote consumption.
    # For the db, consumption is negative, hence the sign flip.
    consumption_values = [-belief.event_value for belief in scheduled_beliefs]
    event_starts = pd.DatetimeIndex(
        [belief.event_start for belief in scheduled_beliefs], freq=resolution
    )
    consumption_schedule = pd.Series(consumption_values, index=event_starts)
    planning_horizon = app.config.get("FLEXMEASURES_PLANNING_HORIZON")
    assert len(consumption_schedule) == planning_horizon / resolution

    # check targets, if applicable
    if "targets" in message:
        start_soc = message["value"] / 1000  # in MWh
        # NOTE(review): the third argument (6) is presumably a rounding precision - confirm
        soc_schedule = integrate_time_series(consumption_schedule, start_soc, 6)
        print(consumption_schedule)
        print(soc_schedule)
        for soc_target in message["targets"]:
            assert soc_schedule[soc_target["datetime"]] == soc_target["value"] / 1000

    # try to retrieve the schedule through the getDeviceMessage api endpoint
    # NOTE(review): this reuses the client after its `with` block has exited;
    # kept as in the original test.
    get_device_message = message_for_get_device_message()
    get_device_message["event"] = get_device_message["event"] % sensor.id
    auth_token = get_auth_token(client, "test_prosumer_user@seita.nl", "testtest")
    full_response = request_device_message(client, auth_token, get_device_message)
    print("Server responded with:\n%s" % full_response.json)
    assert full_response.status_code == 200
    assert full_response.json["type"] == "GetDeviceMessageResponse"
    full_values = full_response.json["values"]
    assert len(full_values) == 192

    # Test that a shorter planning horizon yields the same result for the shorter planning horizon
    get_device_message["duration"] = "PT6H"
    short_response = request_device_message(client, auth_token, get_device_message)
    assert short_response.json["values"] == full_values[0:24]

    # Test that a much longer planning horizon yields the same result (when there are only 2 days of prices)
    get_device_message["duration"] = "PT1000H"
    long_response = request_device_message(client, auth_token, get_device_message)
    assert long_response.json["values"][0:192] == full_values

    # sending again results in an error, unless we increase the event ID
    with app.test_client() as client:
        next_msg_dt = parse_datetime(msg_dt) + timedelta(minutes=5)
        message["datetime"] = next_msg_dt.strftime("%Y-%m-%dT%H:%M:%S.%f%z")
        repeated_post = post_udi_event(client, auth_token)
        assert repeated_post.status_code == 400
        assert repeated_post.json["type"] == "PostUdiEventResponse"
        assert repeated_post.json["status"] == "OUTDATED_UDI_EVENT"

        message["event"] = message["event"].replace("204", "205")
        bumped_post = post_udi_event(client, auth_token)
        assert bumped_post.status_code == 200
        assert bumped_post.json["type"] == "PostUdiEventResponse"

    # test database state
    sensor = fetch_sensor()
    asset = sensor.generic_asset
    stored_soc_datetime = parse_datetime(asset.get_attribute("soc_datetime"))
    posted_soc_datetime = parse_datetime(message["datetime"])
    assert stored_soc_datetime == posted_soc_datetime
    assert asset.get_attribute("soc_in_mwh") == message["value"] / 1000
    assert asset.get_attribute("soc_udi_event_id") == 205

    # process the scheduling queue
    work_on_rq(scheduling_queue, exc_handler=handle_scheduling_exception)
    # the job still fails due to missing prices for the last time slot, but we did test
    # that the api and worker now processed the UDI event and attempted to create a schedule
    failed_job = Job.fetch(message["event"], connection=scheduling_queue.connection)
    assert failed_job.is_failed is True