Skip to content

Commit

Permalink
CKAN Harvester: add delete_missing option ckan#542
Browse files Browse the repository at this point in the history
  • Loading branch information
danielcoelhocgu committed May 8, 2024
1 parent 8ea4b1b commit 5b8da48
Showing 1 changed file with 56 additions and 3 deletions.
59 changes: 56 additions & 3 deletions ckanext/harvest/harvesters/ckanharvester.py
Expand Up @@ -10,7 +10,7 @@
from ckan.lib.helpers import json
from ckan.plugins import toolkit

from ckanext.harvest.model import HarvestObject
from ckanext.harvest.model import HarvestObject, HarvestObjectExtra
from .base import HarvesterBase

import logging
Expand Down Expand Up @@ -159,7 +159,7 @@ def validate_config(self, config):
except NotFound:
raise ValueError('User not found')

for key in ('read_only', 'force_all'):
for key in ('read_only', 'force_all', 'delete_missing'):
if key in config_obj:
if not isinstance(config_obj[key], bool):
raise ValueError('%s must be boolean' % key)
Expand All @@ -184,6 +184,18 @@ def gather_stage(self, harvest_job):

self._set_config(harvest_job.source.config)

# If using delete_missing, get the previous package ids for this source
delete_missing = self.config.get('delete_missing', False)
if delete_missing:
query = model.Session.query(HarvestObject.package_id) \
.filter(HarvestObject.package_id != None) \
.filter(HarvestObject.current == True) \
.filter(HarvestObject.harvest_source_id == harvest_job.source.id)

package_ids_in_db = set()
for package_id, in query:
package_ids_in_db.add(package_id)

# Get source URL
remote_ckan_base_url = harvest_job.source.url.rstrip('/')

Expand All @@ -209,10 +221,12 @@ def gather_stage(self, harvest_job):

# Ideally we can request from the remote CKAN only those datasets
# modified since the last completely successful harvest.
# If using delete_missing option we have to get all datasets
last_error_free_job = self.last_error_free_job(harvest_job)
log.debug('Last error-free job: %r', last_error_free_job)
if (last_error_free_job and
not self.config.get('force_all', False)):
not self.config.get('force_all', False) and
not delete_missing):
get_all_packages = False

# Request only the datasets modified since
Expand Down Expand Up @@ -283,6 +297,25 @@ def gather_stage(self, harvest_job):
obj.save()
object_ids.append(obj.id)

# If using delete_missing option, check for datasets that no longer
# exist
if delete_missing:
ids_to_delete = package_ids_in_db - package_ids
for pkg_id in ids_to_delete:
log.debug('Creating HarvestObject to delete dataset %s',
pkg_id)
obj = HarvestObject(
guid=pkg_id,
package_id=pkg_id,
job=harvest_job,
content='',
extras=[
HarvestObjectExtra(key='missing', value='true')
]
)
obj.save()
object_ids.append(obj.id)

return object_ids
except Exception as e:
self._save_gather_error('%r' % e.message, harvest_job)
Expand Down Expand Up @@ -385,6 +418,26 @@ def import_stage(self, harvest_object):
self._set_config(harvest_object.job.source.config)

try:
# If using delete_missing option, check if this object tells to
# delete the dataset
delete_missing = self.config.get('delete_missing', False)
if (delete_missing and
'missing' in [item.key for item in harvest_object.extras]):
# Delete package
toolkit.get_action('package_delete')(
base_context.copy(), {'id': harvest_object.package_id})
log.info('Deleted package {0}'.format(harvest_object.package_id))

# Update previous harvest object
model.Session.query(HarvestObject).\
filter(HarvestObject.guid == harvest_object.guid).\
filter(HarvestObject.harvest_source_id ==
harvest_object.source.id).\
filter(HarvestObject.current == True).\
update({'current': False})

return True

package_dict = json.loads(harvest_object.content)

if package_dict.get('type') == 'harvest':
Expand Down

0 comments on commit 5b8da48

Please sign in to comment.