Skip to content

Commit

Permalink
Merge pull request #164 from ckan/use-api-tokens-for-jobs
Browse files Browse the repository at this point in the history
Use API tokens for xloader jobs
  • Loading branch information
amercader committed Jul 1, 2022
2 parents 67a5374 + b421384 commit f40d0d0
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 183 deletions.
28 changes: 17 additions & 11 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,15 @@ offers to contribute this are welcomed.
Requirements
------------

Works with CKAN 2.7.x and later.

Works with CKAN 2.3.x - 2.6.x if you install ckanext-rq.

Compatibility with core CKAN versions:

=============== =============
CKAN version Compatibility
=============== =============
2.3 yes, but no longer tested and you must install ckanext-rq
2.4 yes, but no longer tested and you must install ckanext-rq
2.5 yes, but no longer tested and you must install ckanext-rq
2.6 yes, but no longer tested and you must install ckanext-rq
2.3 no longer tested and you must install ckanext-rq
2.4 no longer tested and you must install ckanext-rq
2.5 no longer tested and you must install ckanext-rq
2.6 no longer tested and you must install ckanext-rq
2.7 yes
2.8 yes
2.9 yes (both Python2 and Python3)
Expand Down Expand Up @@ -174,7 +170,12 @@ To install XLoader:

Ensure ``datastore`` is also listed, to enable CKAN DataStore.

6. If it is a production server, you'll want to store jobs info in a more
6. Starting CKAN 2.10 you will need to set an API Token to be able to
exeute jobs against the server::

ckanext.xloader.api_token = <your-CKAN-generated-API-Token>

7. If it is a production server, you'll want to store jobs info in a more
robust database than the default sqlite file. It can happily use the main
CKAN postgres db by adding this line to the config, but with the same value
as you have for ``sqlalchemy.url``::
Expand All @@ -183,11 +184,11 @@ To install XLoader:

(This step can be skipped when just developing or testing.)

7. Restart CKAN. For example if you've deployed CKAN with Apache on Ubuntu::
8. Restart CKAN. For example if you've deployed CKAN with Apache on Ubuntu::

sudo service apache2 reload

8. Run the worker. First test it on the command-line::
9. Run the worker. First test it on the command-line::

paster --plugin=ckan jobs -c /etc/ckan/default/ckan.ini worker

Expand Down Expand Up @@ -265,6 +266,11 @@ Configuration:
# to True.
ckanext.xloader.ssl_verify = True

# Uses a specific API token for the xloader_submit action instead of the
# apikey of the site_user
ckanext.xloader.api_token = ckan-provided-api-token


------------------------
Developer installation
------------------------
Expand Down
84 changes: 39 additions & 45 deletions ckanext/xloader/action.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,31 @@
# encoding: utf-8

from __future__ import absolute_import
from six import text_type as str
import logging
import json
import datetime
import json
import logging

from dateutil.parser import parse as parse_date

import ckan.lib.jobs as rq_jobs
import ckan.lib.navl.dictization_functions
from ckan import logic
import ckan.plugins as p
from ckan.logic import side_effect_free
import ckan.lib.jobs as rq_jobs
import ckan.plugins as p
from dateutil.parser import parse as parse_date
from six import text_type as str

import ckanext.xloader.schema

from . import interfaces as xloader_interfaces
from . import jobs
from . import db
from . import utils

enqueue_job = p.toolkit.enqueue_job
get_queue = rq_jobs.get_queue

log = logging.getLogger(__name__)
config = p.toolkit.config

_get_or_bust = logic.get_or_bust
_get_or_bust = p.toolkit.get_or_bust
_validate = ckan.lib.navl.dictization_functions.validate


Expand Down Expand Up @@ -54,27 +54,16 @@ def xloader_submit(context, data_dict):
if errors:
raise p.toolkit.ValidationError(errors)

res_id = data_dict['resource_id']

p.toolkit.check_access('xloader_submit', context, data_dict)

res_id = data_dict['resource_id']
try:
resource_dict = p.toolkit.get_action('resource_show')(context, {
'id': res_id,
})
except logic.NotFound:
except p.toolkit.ObjectNotFound:
return False

site_url = config['ckan.site_url']
callback_url = p.toolkit.url_for(
"api.action",
ver=3,
logic_function="xloader_hook",
qualified=True
)

site_user = p.toolkit.get_action('get_site_user')({'ignore_auth': True}, {})

for plugin in p.PluginImplementations(xloader_interfaces.IXloader):
upload = plugin.can_upload(res_id)
if not upload:
Expand Down Expand Up @@ -138,25 +127,29 @@ def xloader_submit(context, data_dict):
return False

task['id'] = existing_task['id']
except logic.NotFound:
except p.toolkit.ObjectNotFound:
pass

model = context['model']

p.toolkit.get_action('task_status_update')({
'session': model.meta.create_local_session(),
'ignore_auth': True
},
p.toolkit.get_action('task_status_update')(
{'session': model.meta.create_local_session(), 'ignore_auth': True},
task
)
)

callback_url = p.toolkit.url_for(
"api.action",
ver=3,
logic_function="xloader_hook",
qualified=True
)
data = {
'api_key': site_user['apikey'],
'api_key': utils.get_xloader_user_apitoken(),
'job_type': 'xloader_to_datastore',
'result_url': callback_url,
'metadata': {
'ignore_hash': data_dict.get('ignore_hash', False),
'ckan_url': site_url,
'ckan_url': config['ckan.site_url'],
'resource_id': res_id,
'set_url_type': data_dict.get('set_url_type', False),
'task_created': task['last_updated'],
Expand All @@ -165,12 +158,14 @@ def xloader_submit(context, data_dict):
}
timeout = config.get('ckanext.xloader.job_timeout', '3600')
try:
try:
job = enqueue_job(jobs.xloader_data_into_datastore, [data],
timeout=timeout)
except TypeError:
# older ckans didn't allow the timeout keyword
job = _enqueue(jobs.xloader_data_into_datastore, [data], timeout=timeout)
job = enqueue_job(
jobs.xloader_data_into_datastore, [data], rq_kwargs=dict(timeout=timeout)
)
except TypeError:
# This except provides support for 2.7.
job = _enqueue(
jobs.xloader_data_into_datastore, [data], timeout=timeout
)
except Exception:
log.exception('Unable to enqueued xloader res_id=%s', res_id)
return False
Expand All @@ -182,20 +177,21 @@ def xloader_submit(context, data_dict):
task['state'] = 'pending'
task['last_updated'] = str(datetime.datetime.utcnow())

p.toolkit.get_action('task_status_update')({
'session': model.meta.create_local_session(),
'ignore_auth': True
},
p.toolkit.get_action('task_status_update')(
{'session': model.meta.create_local_session(), 'ignore_auth': True},
task
)
)

return True


def _enqueue(fn, args=None, kwargs=None, title=None, queue='default',
timeout=180):
'''Same as latest ckan.lib.jobs.enqueue - earlier CKAN versions dont have
the timeout param'''
the timeout param
This function can be removed when dropping support for 2.7
'''
if args is None:
args = []
if kwargs is None:
Expand Down Expand Up @@ -275,7 +271,7 @@ def xloader_hook(context, data_dict):
for plugin in p.PluginImplementations(xloader_interfaces.IXloader):
plugin.after_upload(context, resource_dict, dataset_dict)

logic.get_action('resource_create_default_resource_views')(
p.toolkit.get_action('resource_create_default_resource_views')(
context,
{
'resource': resource_dict,
Expand Down Expand Up @@ -325,8 +321,6 @@ def xloader_status(context, data_dict):

p.toolkit.check_access('xloader_status', context, data_dict)

if 'id' in data_dict:
data_dict['resource_id'] = data_dict['id']
res_id = _get_or_bust(data_dict, 'resource_id')

task = p.toolkit.get_action('task_status_show')(context, {
Expand Down
15 changes: 6 additions & 9 deletions ckanext/xloader/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,16 @@ def _setup_xloader_logger(self):
logger.propagate = False # in case the config

def _submit_all_existing(self):
import ckan.model as model
from ckanext.datastore.backend \
import get_all_resources_ids_in_datastore
resource_ids = get_all_resources_ids_in_datastore()
print('Processing %d resources' % len(resource_ids))
user = tk.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})
{'ignore_auth': True}, {})
for resource_id in resource_ids:
try:
resource_dict = tk.get_action('resource_show')(
{'model': model, 'ignore_auth': True}, {'id': resource_id})
{'ignore_auth': True}, {'id': resource_id})
except tk.ObjectNotFound:
print(' Skipping resource {} found in datastore but not in '
'metadata'.format(resource_id))
Expand All @@ -44,25 +43,23 @@ def _submit_all(self):
# submit every package
# for each package in the package list,
# submit each resource w/ _submit_package
import ckan.model as model
package_list = tk.get_action('package_search')(
{'model': model, 'ignore_auth': True}, {'include_private': True, 'rows': 1000})
{'ignore_auth': True}, {'include_private': True, 'rows': 1000})
package_list = [pkg['id'] for pkg in package_list['results']]
print('Processing %d datasets' % len(package_list))
user = tk.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})
{'ignore_auth': True}, {})
for p_id in package_list:
self._submit_package(p_id, user, indent=2)

def _submit_package(self, pkg_id, user=None, indent=0):
import ckan.model as model
if not user:
user = tk.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})
{'ignore_auth': True}, {})

try:
pkg = tk.get_action('package_show')(
{'model': model, 'ignore_auth': True},
{'ignore_auth': True},
{'id': pkg_id.strip()})
except Exception as e:
print(e)
Expand Down
41 changes: 32 additions & 9 deletions ckanext/xloader/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
import sqlalchemy as sa

import ckan.model as model
from ckan.plugins.toolkit import get_action, asbool, ObjectNotFound, config
from ckan.plugins.toolkit import get_action, asbool, ObjectNotFound, config, check_ckan_version
import ckan.lib.search as search

from . import loader
from . import db
from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError

try:
from ckan.lib.api_token import get_user_from_token
except ImportError:
get_user_from_token = None

SSL_VERIFY = asbool(config.get('ckanext.xloader.ssl_verify', True))
if not SSL_VERIFY:
requests.packages.urllib3.disable_warnings()
Expand Down Expand Up @@ -473,23 +478,41 @@ def update_resource(resource, patch_only=False):
or patch the given CKAN resource for file hash
"""
action = 'resource_update' if not patch_only else 'resource_patch'
from ckan import model
user = get_action('get_site_user')({'model': model, 'ignore_auth': True}, {})
context = {'model': model, 'session': model.Session, 'ignore_auth': True,
'user': user['name'], 'auth_user_obj': None}
user = get_action('get_site_user')({'ignore_auth': True}, {})
context = {
'ignore_auth': True,
'user': user['name'],
'auth_user_obj': None
}
get_action(action)(context, resource)


def _get_user_from_key(api_key_or_token):
""" Gets the user using the API Token or API Key.
This method provides backwards compatibility for CKAN 2.9 that
supported both methods and previous CKAN versions supporting
only API Keys.
"""
user = None
if get_user_from_token:
user = get_user_from_token(api_key_or_token)
if not user:
user = model.Session.query(model.User).filter_by(
apikey=api_key_or_token
).first()
return user


def get_resource_and_dataset(resource_id, api_key):
"""
Gets available information about the resource and its dataset from CKAN
"""
user = model.Session.query(model.User).filter_by(
apikey=api_key).first()
context = None
user = _get_user_from_key(api_key)
if user is not None:
context = {'user': user.name}
else:
context = None

res_dict = get_action('resource_show')(context, {'id': resource_id})
pkg_dict = get_action('package_show')(context, {'id': res_dict['package_id']})
return res_dict, pkg_dict
Expand Down

0 comments on commit f40d0d0

Please sign in to comment.