/
test_queue.py
418 lines (333 loc) · 16 KB
/
test_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
import pytest
try:
from unittest.mock import patch
except ImportError:
from mock import patch
from ckanext.harvest.model import HarvestObject, HarvestObjectExtra
from ckanext.harvest.interfaces import IHarvester
import ckanext.harvest.queue as queue
from ckan.plugins.core import SingletonPlugin, implements
import json
from ckan.plugins import toolkit
from ckan import model
from ckan.lib.base import config
import uuid
class MockHarvester(SingletonPlugin):
    """Minimal IHarvester implementation used to exercise the harvest queue.

    For sources whose URL starts with ``basic_test`` the gather stage emits
    three harvest objects (``test1``, ``test2``, ``test_to_delete``); the
    import stage creates/updates the corresponding packages and, when the
    ``test_to_delete`` package already exists (i.e. on a second run), marks
    it as deleted instead.
    """
    implements(IHarvester)

    def info(self):
        # Harvester metadata as required by the IHarvester interface.
        return {'name': 'test', 'title': 'test', 'description': 'test'}

    def gather_stage(self, harvest_job):
        """Create three HarvestObjects for 'basic_test' sources.

        :param harvest_job: the HarvestJob being gathered for
        :returns: list of the new HarvestObject ids, or [] for other sources
        """
        # Guard clause: only the 'basic_test' source produces objects.
        if not harvest_job.source.url.startswith('basic_test'):
            return []
        obj = HarvestObject(guid='test1', job=harvest_job)
        obj.extras.append(HarvestObjectExtra(key='key', value='value'))
        obj2 = HarvestObject(guid='test2', job=harvest_job)
        obj3 = HarvestObject(guid='test_to_delete', job=harvest_job)
        obj.add()
        obj2.add()
        obj3.save()  # this will commit all three
        return [obj.id, obj2.id, obj3.id]

    def fetch_stage(self, harvest_object):
        """Store a trivial JSON payload as the object's content.

        Asserts the queue machinery set the expected state/timestamps
        before handing the object to the fetch stage.
        """
        assert harvest_object.state == "FETCH"
        assert harvest_object.fetch_started is not None
        harvest_object.content = json.dumps({'name': harvest_object.guid})
        harvest_object.save()
        return True

    def import_stage(self, harvest_object):
        """Create or update the package described by the object's content.

        Marks any previous HarvestObject for the same guid as not current,
        and deletes the 'test_to_delete' package when it already exists.
        """
        assert harvest_object.state == "IMPORT"
        assert harvest_object.fetch_finished is not None
        assert harvest_object.import_started is not None

        user = toolkit.get_action('get_site_user')(
            {'model': model, 'ignore_auth': True}, {}
        )['name']

        package = json.loads(harvest_object.content)
        name = package['name']

        # Update when the package already exists, create otherwise.
        package_object = model.Package.get(name)
        if package_object:
            logic_function = 'package_update'
        else:
            logic_function = 'package_create'

        # Fix: reuse the already-parsed content rather than calling
        # json.loads(harvest_object.content) a second time.
        package_dict = toolkit.get_action(logic_function)(
            {'model': model, 'session': model.Session,
             'user': user, 'api_version': 3, 'ignore_auth': True},
            package
        )

        # set previous objects to not current
        previous_object = model.Session.query(HarvestObject) \
            .filter(HarvestObject.guid == harvest_object.guid) \
            .filter(
                HarvestObject.current == True  # noqa: E712
            ).first()
        if previous_object:
            previous_object.current = False
            previous_object.save()

        # delete test_to_delete package on second run
        harvest_object.package_id = package_dict['id']
        harvest_object.current = True
        if package_dict['name'] == 'test_to_delete' and package_object:
            harvest_object.current = False
            package_object.state = 'deleted'
            package_object.save()
        harvest_object.save()
        return True
@pytest.mark.usefixtures('with_plugins', 'clean_db', 'clean_queues')
@pytest.mark.ckan_config('ckan.plugins', 'harvest test_harvester')
class TestHarvestQueue(object):
    """End-to-end tests of the harvest gather/fetch queue pipeline.

    Drives the queue callbacks by hand (no background workers): messages
    are popped with ``basic_get`` and fed to ``gather_callback`` /
    ``fetch_callback`` directly, then job/source stats are checked via the
    harvest logic actions.
    """

    def test_01_basic_harvester(self):
        # Two full harvest runs against the MockHarvester 'basic_test'
        # source: the first adds 3 datasets, the second updates 2 and
        # deletes 1 ('test_to_delete').
        if config.get('ckan.harvest.mq.type') == 'redis':
            # make sure that there are no old elements in the redis db
            redis = queue.get_connection()
            redis.flushdb()

        # make sure queues/exchanges are created first and are empty
        consumer = queue.get_gather_consumer()
        consumer_fetch = queue.get_fetch_consumer()
        consumer.queue_purge(queue=queue.get_gather_queue_name())
        consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name())

        user = toolkit.get_action('get_site_user')(
            {'model': model, 'ignore_auth': True}, {}
        )['name']

        context = {'model': model, 'session': model.Session,
                   'user': user, 'api_version': 3, 'ignore_auth': True}

        # Creates source + job and runs the gather stage (3 WAITING objects).
        harvest_source, job_id = self._create_harvest_job_and_finish_gather_stage(consumer, context)

        # do three times as three harvest objects
        reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
        queue.fetch_callback(consumer_fetch, *reply)
        reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
        queue.fetch_callback(consumer_fetch, *reply)
        reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
        queue.fetch_callback(consumer_fetch, *reply)

        count = model.Session.query(model.Package) \
            .filter(model.Package.type == 'dataset') \
            .count()
        assert count == 3

        # All three objects should have completed the import stage.
        all_objects = model.Session.query(HarvestObject).filter_by(current=True).all()
        assert len(all_objects) == 3
        assert all_objects[0].state == 'COMPLETE'
        assert all_objects[0].report_status == 'added'
        assert all_objects[1].state == 'COMPLETE'
        assert all_objects[1].report_status == 'added'
        assert all_objects[2].state == 'COMPLETE'
        assert all_objects[2].report_status == 'added'

        # fire run again to check if job is set to Finished
        toolkit.get_action('harvest_jobs_run')(
            context,
            {'source_id': harvest_source['id']}
        )

        harvest_job = toolkit.get_action('harvest_job_show')(
            context,
            {'id': job_id}
        )
        assert harvest_job['status'] == u'Finished'
        assert harvest_job['stats'] == {'added': 3, 'updated': 0, 'not modified': 0, 'errored': 0, 'deleted': 0}

        harvest_source_dict = toolkit.get_action('harvest_source_show')(
            context,
            {'id': harvest_source['id']}
        )
        assert harvest_source_dict['status']['last_job']['stats'] == {
            'added': 3, 'updated': 0, 'not modified': 0, 'errored': 0, 'deleted': 0}
        assert harvest_source_dict['status']['total_datasets'] == 3
        assert harvest_source_dict['status']['job_count'] == 1

        # Second run
        harvest_job = toolkit.get_action('harvest_job_create')(
            context,
            {'source_id': harvest_source['id'], 'run': True}
        )
        job_id = harvest_job['id']
        assert toolkit.get_action('harvest_job_show')(
            context,
            {'id': job_id}
        )['status'] == u'Running'

        # pop on item off the queue and run the callback
        reply = consumer.basic_get(queue='ckan.harvest.gather')
        queue.gather_callback(consumer, *reply)

        # 3 objects from the first run + 3 new ones from this gather.
        all_objects = model.Session.query(HarvestObject).all()
        assert len(all_objects) == 6

        reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
        queue.fetch_callback(consumer_fetch, *reply)
        reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
        queue.fetch_callback(consumer_fetch, *reply)
        reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
        queue.fetch_callback(consumer_fetch, *reply)

        count = model.Session.query(model.Package) \
            .filter(model.Package.type == 'dataset') \
            .count()
        assert count == 3

        # Report statuses across both runs: 3 added (run 1), 2 updated and
        # 1 deleted (run 2, MockHarvester deletes 'test_to_delete').
        all_objects = model.Session.query(HarvestObject).filter_by(report_status='added').all()
        assert len(all_objects) == 3
        all_objects = model.Session.query(HarvestObject).filter_by(report_status='updated').all()
        assert len(all_objects) == 2
        all_objects = model.Session.query(HarvestObject).filter_by(report_status='deleted').all()
        assert len(all_objects) == 1

        # run to make sure job is marked as finshed
        toolkit.get_action('harvest_jobs_run')(
            context,
            {'source_id': harvest_source['id']}
        )

        harvest_job = toolkit.get_action('harvest_job_show')(
            context,
            {'id': job_id}
        )
        assert harvest_job['stats'] == {'added': 0, 'updated': 2, 'not modified': 0, 'errored': 0, 'deleted': 1}

        harvest_source_dict = toolkit.get_action('harvest_source_show')(
            context,
            {'id': harvest_source['id']}
        )
        assert harvest_source_dict['status']['last_job']['stats'] == {
            'added': 0, 'updated': 2, 'not modified': 0, 'errored': 0, 'deleted': 1}
        assert harvest_source_dict['status']['total_datasets'] == 2
        assert harvest_source_dict['status']['job_count'] == 2

    def test_redis_queue_purging(self):
        '''
        Test that Redis queue purging doesn't purge the wrong keys.
        '''
        if config.get('ckan.harvest.mq.type') != 'redis':
            pytest.skip()
        redis = queue.get_connection()
        try:
            # Unrelated key that must survive purge_queues().
            redis.set('ckanext-harvest:some-random-key', 'foobar')

            # Create some fake jobs
            gather_publisher = queue.get_gather_publisher()
            gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
            gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
            fetch_publisher = queue.get_fetch_publisher()
            fetch_publisher.send({'harvest_object_id': str(uuid.uuid4())})
            fetch_publisher.send({'harvest_object_id': str(uuid.uuid4())})
            num_keys = redis.dbsize()

            # Create some fake objects
            # Consuming one message from each queue creates extra
            # per-message keys in Redis (hence dbsize() grows).
            gather_consumer = queue.get_gather_consumer()
            next(gather_consumer.consume(queue.get_gather_queue_name()))
            fetch_consumer = queue.get_fetch_consumer()
            next(fetch_consumer.consume(queue.get_fetch_queue_name()))
            assert redis.dbsize() > num_keys

            queue.purge_queues()
            # The unrelated key is untouched, the queue lists are empty.
            assert redis.get('ckanext-harvest:some-random-key') == 'foobar'
            assert redis.dbsize() == num_keys
            assert redis.llen(queue.get_gather_routing_key()) == 0
            assert redis.llen(queue.get_fetch_routing_key()) == 0
        finally:
            redis.delete('ckanext-harvest:some-random-key')

    def test_resubmit_objects(self):
        '''
        Test that only harvest objects re-submitted which were not be present in the redis fetch queue.
        '''
        if config.get('ckan.harvest.mq.type') != 'redis':
            pytest.skip()
        # make sure that there are no old elements in the redis db
        redis = queue.get_connection()
        fetch_routing_key = queue.get_fetch_routing_key()
        redis.flushdb()
        try:
            # make sure queues/exchanges are created first and are empty
            consumer = queue.get_gather_consumer()
            consumer_fetch = queue.get_fetch_consumer()
            consumer.queue_purge(queue=queue.get_gather_queue_name())
            consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name())

            user = toolkit.get_action('get_site_user')(
                {'model': model, 'ignore_auth': True}, {}
            )['name']

            context = {'model': model, 'session': model.Session,
                       'user': user, 'api_version': 3, 'ignore_auth': True}
            harvest_source, job_id = self._create_harvest_job_and_finish_gather_stage(consumer, context)

            # Gather produced three fetch messages.
            assert redis.llen(fetch_routing_key) == 3

            # do only one time for the first harvest object
            reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
            queue.fetch_callback(consumer_fetch, *reply)

            count = model.Session.query(model.Package) \
                .filter(model.Package.type == 'dataset') \
                .count()
            assert count == 1

            # One object COMPLETE, the other two still WAITING in the queue.
            all_objects = model.Session.query(HarvestObject).order_by(HarvestObject.state.asc()).all()
            assert len(all_objects) == 3
            assert all_objects[0].state == 'COMPLETE'
            assert all_objects[0].report_status == 'added'
            assert all_objects[0].current is True
            assert all_objects[1].state == 'WAITING'
            assert all_objects[1].current is False
            assert all_objects[2].state == 'WAITING'
            assert all_objects[2].current is False

            assert len(redis.keys(fetch_routing_key + ':*')) == 0
            assert redis.llen(fetch_routing_key) == 2

            # Remove one object from redis that should be re-sent to the fetch queue
            # basic_get pops a message without processing it, so its object
            # stays WAITING but is no longer in the queue list.
            reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
            fetch_queue_items = redis.lrange(fetch_routing_key, 0, 10)
            assert len(fetch_queue_items) == 1
            harvest_object_id = reply[2]
            assert fetch_queue_items[0] != harvest_object_id

            # resubmit_objects() must re-queue only the missing object.
            queue.resubmit_objects()

            assert redis.llen(fetch_routing_key) == 2
            fetch_queue_items = redis.lrange(fetch_routing_key, 0, 10)
            assert harvest_object_id in fetch_queue_items
            assert redis.dbsize() == 1
        finally:
            redis.flushdb()

    def _create_harvest_job_and_finish_gather_stage(self, consumer, context):
        """Create (or reuse) the test source, start a job, run gather.

        Pops the gather message off the queue and runs ``gather_callback``
        so the MockHarvester's gather stage creates its three WAITING
        objects.

        :param consumer: the gather queue consumer to pop the message from
        :param context: action context dict with the site user
        :returns: tuple of (harvest_source dict, job id)
        """
        source_dict = {'title': 'Test Source',
                       'name': 'test-source',
                       'url': 'basic_test',
                       'source_type': 'test'}
        try:
            harvest_source = toolkit.get_action('harvest_source_create')(
                context,
                source_dict)
        except toolkit.ValidationError:
            # Source already exists (e.g. from a previous test) — reuse it.
            harvest_source = toolkit.get_action('harvest_source_show')(
                context,
                {'id': source_dict['name']}
            )
            pass
        assert harvest_source['source_type'] == 'test', harvest_source
        assert harvest_source['url'] == 'basic_test', harvest_source

        harvest_job = toolkit.get_action('harvest_job_create')(
            context,
            {'source_id': harvest_source['id'], 'run': True})
        job_id = harvest_job['id']
        assert harvest_job['source_id'] == harvest_source['id'], harvest_job
        assert harvest_job['status'] == u'Running'

        assert toolkit.get_action('harvest_job_show')(
            context,
            {'id': job_id}
        )['status'] == u'Running'

        # pop on item off the queue and run the callback
        reply = consumer.basic_get(queue='ckan.harvest.gather')
        queue.gather_callback(consumer, *reply)

        all_objects = model.Session.query(HarvestObject).all()
        assert len(all_objects) == 3
        assert all_objects[0].state == 'WAITING'
        assert all_objects[1].state == 'WAITING'
        assert all_objects[2].state == 'WAITING'

        assert len(model.Session.query(HarvestObject).all()) == 3
        # Only 'test1' carries a HarvestObjectExtra (see MockHarvester).
        assert len(model.Session.query(HarvestObjectExtra).all()) == 1
        return harvest_source, job_id
class TestHarvestCorruptRedis(object):
    """Tests consumer resilience to malformed messages in Redis."""

    @patch('ckanext.harvest.queue.log.error')
    def test_redis_corrupt(self, mock_log_error):
        '''
        Test that corrupt Redis doesn't stop harvest process and still processes other jobs.
        '''
        if config.get('ckan.harvest.mq.type') != 'redis':
            pytest.skip()
        redis = queue.get_connection()
        try:
            redis.set('ckanext-harvest:some-random-key-2', 'foobar')

            # make sure queues/exchanges are created first and are empty
            gather_consumer = queue.get_gather_consumer()
            fetch_consumer = queue.get_fetch_consumer()
            gather_consumer.queue_purge(queue=queue.get_gather_queue_name())
            fetch_consumer.queue_purge(queue=queue.get_fetch_queue_name())

            # Create some fake jobs and objects with no harvest_job_id
            gather_publisher = queue.get_gather_publisher()
            gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
            fetch_publisher = queue.get_fetch_publisher()
            # First fetch message is corrupt (None id); the consumer should
            # log an error for it and continue with the next message.
            fetch_publisher.send({'harvest_object_id': None})
            h_obj_id = str(uuid.uuid4())
            fetch_publisher.send({'harvest_object_id': h_obj_id})

            # Create some fake objects
            next(gather_consumer.consume(queue.get_gather_queue_name()))
            _, _, body = next(fetch_consumer.consume(queue.get_fetch_queue_name()))

            # The valid message (not the corrupt one) reaches the consumer.
            json_obj = json.loads(body)
            assert json_obj['harvest_object_id'] == h_obj_id

            # Exactly one error was logged for the corrupt message;
            # presumably a TypeError mentioning "concatenate" from building
            # a key with a None id — verify against queue.py if it changes.
            assert mock_log_error.call_count == 1
            args, _ = mock_log_error.call_args_list[0]
            assert "concatenate" in str(args[1])
        finally:
            redis.delete('ckanext-harvest:some-random-key-2')