Allow full clear of completed jobs #503

Open
wants to merge 18 commits into base: master
3 changes: 2 additions & 1 deletion .gitignore
@@ -13,4 +13,5 @@ development.ini
node_modules
*.project
.eggs
.vscode/
.vscode/
.idea/
46 changes: 10 additions & 36 deletions ckanext/harvest/logic/action/update.py
@@ -313,16 +313,14 @@ def harvest_abort_failed_jobs(context, data_dict):

def harvest_sources_job_history_clear(context, data_dict):
'''
Clears the history for all active harvest sources. All jobs and objects related to a harvest source will
be cleared, but keeps the source itself.
Clears the history for all active harvest sources. All non-running jobs and non-current harvest objects will
be cleared; each source itself and its most recent (current) harvest objects are kept.
This is useful to clean history of long running harvest sources to start again fresh.
The datasets imported from the harvest source will NOT be deleted!!!

'''
check_access('harvest_sources_clear', context, data_dict)

keep_current = data_dict.get('keep_current', False)

job_history_clear_results = []
# We assume that the maximum of 1000 (hard limit) rows should be enough
result = logic.get_action('package_search')(context, {'fq': '+dataset_type:harvest', 'rows': 1000})
@@ -331,7 +329,7 @@ def harvest_sources_job_history_clear(context, data_dict):
for data_dict in harvest_packages:
try:
clear_result = get_action('harvest_source_job_history_clear')(
context, {'id': data_dict['id'], 'keep_current': keep_current})
context, {'id': data_dict['id']})
job_history_clear_results.append(clear_result)
except NotFound:
# Ignoring non-existent harvest sources because of a possibly corrupt search index
@@ -343,7 +341,7 @@ def harvest_sources_job_history_clear(context, data_dict):

def harvest_source_job_history_clear(context, data_dict):
'''
Clears all jobs and objects related to a harvest source, but keeps the source itself.
Clears all non-running jobs and all out-of-date (non-current) harvest objects from a harvest source, but keeps the source itself.
This is useful to clean history of long running harvest sources to start again fresh.
The datasets imported from the harvest source will NOT be deleted!!!

@@ -354,7 +352,6 @@ def harvest_source_job_history_clear(context, data_dict):
check_access('harvest_source_clear', context, data_dict)

harvest_source_id = data_dict.get('id', None)
keep_current = data_dict.get('keep_current', False)

source = HarvestSource.get(harvest_source_id)
if not source:
@@ -365,49 +362,26 @@ def harvest_source_job_history_clear(context, data_dict):

model = context['model']

if keep_current:
sql = '''BEGIN;
sql = '''BEGIN;
DELETE FROM harvest_object_error WHERE harvest_object_id
IN (SELECT id FROM harvest_object AS obj WHERE harvest_source_id = '{harvest_source_id}'
AND current != true
AND (NOT EXISTS (SELECT id FROM harvest_job WHERE id = obj.harvest_job_id
AND status = 'Running'))
AND (NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = obj.harvest_job_id
AND current = true))
);
AND status = 'Running')));
DELETE FROM harvest_object_extra WHERE harvest_object_id
IN (SELECT id FROM harvest_object AS obj WHERE harvest_source_id = '{harvest_source_id}'
AND current != true
AND (NOT EXISTS (SELECT id FROM harvest_job WHERE id = obj.harvest_job_id
AND status = 'Running'))
AND (NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = obj.harvest_job_id
AND current = true))
);
AND status = 'Running')));
DELETE FROM harvest_object AS obj WHERE harvest_source_id = '{harvest_source_id}'
AND current != true
AND (NOT EXISTS (SELECT id FROM harvest_job WHERE id = obj.harvest_job_id
AND status = 'Running'))
AND (NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = obj.harvest_job_id
AND current = true));
AND status = 'Running'));
DELETE FROM harvest_gather_error WHERE harvest_job_id
IN (SELECT id FROM harvest_job AS job WHERE source_id = '{harvest_source_id}'
AND job.status != 'Running'
AND NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = job.id));
AND job.status != 'Running');
DELETE FROM harvest_job AS job WHERE source_id = '{harvest_source_id}'
AND job.status != 'Running'
AND NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = job.id);
COMMIT;
'''.format(harvest_source_id=harvest_source_id)
else:
sql = '''BEGIN;
DELETE FROM harvest_object_error WHERE harvest_object_id
IN (SELECT id FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}');
DELETE FROM harvest_object_extra WHERE harvest_object_id
IN (SELECT id FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}');
DELETE FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}';
DELETE FROM harvest_gather_error WHERE harvest_job_id
IN (SELECT id FROM harvest_job WHERE source_id = '{harvest_source_id}');
DELETE FROM harvest_job WHERE source_id = '{harvest_source_id}';
AND job.status != 'Running';
COMMIT;
'''.format(harvest_source_id=harvest_source_id)

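For orientation, a minimal usage sketch (not part of the diff) of the reworked actions: with this change 'keep_current' is no longer accepted, non-running jobs are always removed, and the current harvest objects are always kept. The context mirrors the one used by the tests further down; the source id is a placeholder.

from ckan import model
from ckan.plugins import toolkit as tk

# Context as used in this PR's tests; ignore_auth bypasses the sysadmin check.
context = {'model': model, 'session': model.Session, 'ignore_auth': True, 'user': ''}

# Clear the job history of a single harvest source (placeholder id).
tk.get_action('harvest_source_job_history_clear')(context, {'id': 'my-harvest-source-id'})

# Clear the job history of every active harvest source.
tk.get_action('harvest_sources_job_history_clear')(context, {})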
3 changes: 2 additions & 1 deletion ckanext/harvest/model/__init__.py
@@ -28,6 +28,7 @@
'HarvestObject', 'harvest_object_table',
'HarvestGatherError', 'harvest_gather_error_table',
'HarvestObjectError', 'harvest_object_error_table',
'HarvestObjectExtra', 'harvest_object_extra_table',
'HarvestLog', 'harvest_log_table'
]

@@ -356,7 +357,7 @@ def define_harvester_tables():
Column('state', types.UnicodeText, default=u'WAITING'),
Column('metadata_modified_date', types.DateTime),
Column('retry_times', types.Integer, default=0),
Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')),
Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id', ondelete='SET NULL'), nullable=True),
Member

Changing the table structure would require a migration script; otherwise the change would not be applied to any existing instance.

Contributor Author

I agree that a schema change creates some challenges for existing databases. However, the ckanapi package provides very good support for exporting and re-importing users, packages, and organizations, so standing up a fresh database instance with the imported data is not very time-consuming. I could write up a set of instructions for the Wiki if this is useful.
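On the migration point raised above: a hedged sketch (not part of this PR) of what such a script could look like on PostgreSQL, following the extension's existing pattern of running raw SQL through the model session. The constraint name is PostgreSQL's default for this foreign key and may differ on an existing instance.

from ckan import model

# Relax harvest_object.harvest_job_id so that deleting a harvest_job leaves its
# objects in place with a NULL job id, matching ForeignKey(..., ondelete='SET NULL').
sql = '''BEGIN;
ALTER TABLE harvest_object ALTER COLUMN harvest_job_id DROP NOT NULL;
ALTER TABLE harvest_object DROP CONSTRAINT IF EXISTS harvest_object_harvest_job_id_fkey;
ALTER TABLE harvest_object ADD CONSTRAINT harvest_object_harvest_job_id_fkey
    FOREIGN KEY (harvest_job_id) REFERENCES harvest_job (id) ON DELETE SET NULL;
COMMIT;
'''
model.Session.execute(sql)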

Column('harvest_source_id', types.UnicodeText, ForeignKey('harvest_source.id')),
Column('package_id', types.UnicodeText, ForeignKey('package.id', deferrable=True),
nullable=True),
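Picking up the author's ckanapi suggestion above, a rough, hedged sketch of the export side of that workflow with the ckanapi Python client (URL, API key and file name are placeholders); the re-import side would feed the exported dicts, after cleanup, to the matching *_create actions.

import json

from ckanapi import RemoteCKAN

old_site = RemoteCKAN('https://old-ckan.example.org', apikey='PLACEHOLDER-API-KEY')

# Pull full records for organizations and datasets; user_list already returns dicts.
organizations = [old_site.action.organization_show(id=name)
                 for name in old_site.action.organization_list()]
users = old_site.action.user_list()
datasets = [old_site.action.package_show(id=name)
            for name in old_site.action.package_list()]

with open('ckan_export.json', 'w') as f:
    json.dump({'organizations': organizations, 'users': users, 'datasets': datasets}, f)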
29 changes: 16 additions & 13 deletions ckanext/harvest/tests/test_action.py
@@ -353,15 +353,15 @@ def test_harvest_sources_job_history_clear_keep_current(self):
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = get_action('harvest_sources_job_history_clear')(
context, {'keep_current': True})
context, {})

# verify
assert sorted(result, key=lambda item: item['id']) == sorted(
[{'id': source_1.id}, {'id': source_2.id}], key=lambda item: item['id'])

# dataset, related source, object and job still persist!
# dataset, related source and object still persist; the job is deleted!
assert harvest_model.HarvestSource.get(source_1.id)
assert harvest_model.HarvestJob.get(job_1.id)
assert not harvest_model.HarvestJob.get(job_1.id)
assert harvest_model.HarvestObject.get(object_1_.id)
dataset_from_db_1 = model.Package.get(dataset_1['id'])
assert dataset_from_db_1
@@ -397,12 +397,12 @@ def test_harvest_source_job_history_clear_keep_current(self):
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = get_action('harvest_source_job_history_clear')(
context, {'id': source.id, 'keep_current': True})
context, {'id': source.id})

# verify
assert result == {'id': source.id}
assert harvest_model.HarvestSource.get(source.id)
assert harvest_model.HarvestJob.get(job.id)
assert not harvest_model.HarvestJob.get(job.id)
assert harvest_model.HarvestObject.get(object_.id)
dataset_from_db = model.Package.get(dataset['id'])
assert dataset_from_db
@@ -415,7 +415,7 @@ def test_harvest_source_job_history_clear_keep_current(self):
assert dataset_from_db_2
assert dataset_from_db_2.id == dataset2['id']

def test_harvest_source_job_history_clear_keep_current_finished_jobs(self):
def test_harvest_source_job_history_clear_deletes_current_finished_jobs(self):
# prepare
source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
job = factories.HarvestJobObj(source=source)
@@ -439,7 +439,7 @@ def test_harvest_source_job_history_clear_keep_current_finished_jobs(self):
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = get_action('harvest_source_job_history_clear')(
context, {'id': source.id, 'keep_current': True})
context, {'id': source.id})

# verify
assert result == {'id': source.id}
@@ -449,8 +449,9 @@ def test_harvest_source_job_history_clear_keep_current_finished_jobs(self):
dataset_from_db = model.Package.get(dataset['id'])
assert dataset_from_db
assert dataset_from_db.id == dataset['id']
# job2 and related objects are untouched
assert harvest_model.HarvestJob.get(job2.id)

# job2 is deleted, but harvest objects are kept
assert not harvest_model.HarvestJob.get(job2.id)
assert harvest_model.HarvestObject.get(object_2_.id)
dataset_from_db_2 = model.Package.get(dataset2['id'])
assert dataset_from_db_2
@@ -488,17 +489,19 @@ def test_harvest_source_job_history_clear_keep_current_running_job(self):
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = get_action('harvest_source_job_history_clear')(
context, {'id': source.id, 'keep_current': True})
context, {'id': source.id})

# verify that both jobs still exists
# verify first job and non-current objects are deleted, but any current objects are kept
assert result == {'id': source.id}
assert harvest_model.HarvestSource.get(source.id)
assert harvest_model.HarvestJob.get(job1.id)
assert harvest_model.HarvestObject.get(object_1_.id)
assert not harvest_model.HarvestJob.get(job1.id)
assert not harvest_model.HarvestObject.get(object_1_.id)
assert harvest_model.HarvestObject.get(object_2_.id)
dataset_from_db = model.Package.get(dataset1['id'])
assert dataset_from_db
assert dataset_from_db.id == dataset1['id']

# verify that second job still exists and all harvest objects are kept
assert harvest_model.HarvestJob.get(job2.id)
assert harvest_model.HarvestObject.get(object_3_.id)
assert harvest_model.HarvestObject.get(object_4_.id)
7 changes: 2 additions & 5 deletions ckanext/harvest/utils.py
@@ -216,18 +216,15 @@ def clear_harvest_source_history(source_id, keep_current):
if source_id is not None:
tk.get_action("harvest_source_job_history_clear")(context, {
"id": source_id,
"keep_current": keep_current
})
})
return "Cleared job history of harvest source: {0}".format(source_id)
Contributor Author

@bonnland bonnland May 18, 2022

Note the language used in the return statement. This command is most useful if it clears the "job history", not the entire source history. Perhaps a change in the command name to "clear-job-history" would be better, as it more clearly states the eventual outcome of the command.

else:
# Purge queues, because we clean all harvest jobs and
# objects in the database.
if not keep_current:
purge_queues()
cleared_sources_dicts = tk.get_action(
"harvest_sources_job_history_clear")(context, {
"keep_current": keep_current
})
"harvest_sources_job_history_clear")(context)
return "Cleared job history for all harvest sources: {0} source(s)".format(
len(cleared_sources_dicts))

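Finally, a hedged sketch (not in the diff) of how the trimmed-down helper might be driven, e.g. from the extension's CLI layer. After this change 'keep_current' is no longer forwarded to the actions; on the all-sources path it only decides whether the queues are purged first. The source id is a placeholder.

from ckanext.harvest import utils

# Clear the job history of a single harvest source.
print(utils.clear_harvest_source_history('my-harvest-source-id', keep_current=False))

# Clear the job history of every source; queues are purged first because all
# non-running jobs are removed.
print(utils.clear_harvest_source_history(None, keep_current=False))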