diff --git a/README.rst b/README.rst
index e8d5893ac..ad30f13cb 100644
--- a/README.rst
+++ b/README.rst
@@ -94,7 +94,7 @@ Configuration
 Run the following command to create the necessary tables in the database
 (ensuring the pyenv is activated):
 
-    (pyenv) $ ckan --config=/etc/ckan/default/ckan.ini harvester initdb
+    (pyenv) $ ckan --config=/etc/ckan/default/ckan.ini db upgrade -p harvest
 
 Finally, restart CKAN to have the changes take effect::
 
@@ -229,9 +229,6 @@ Command line interface
 The following operations can be run from the command line as described
 underneath::
 
-      harvester initdb
-        - Creates the necessary tables in the database
-
       harvester source {name} {url} {type} [{title}] [{active}] [{owner_org}] [{frequency}] [{config}]
         - create new harvest source
 
diff --git a/ckanext/harvest/cli.py b/ckanext/harvest/cli.py
index 20a750447..d5602beb9 100644
--- a/ckanext/harvest/cli.py
+++ b/ckanext/harvest/cli.py
@@ -20,14 +20,6 @@ def harvester():
     pass
 
 
-@harvester.command()
-def initdb():
-    """Creates the necessary tables in the database.
-    """
-    utils.initdb()
-    click.secho(u"DB tables created", fg=u"green")
-
-
 @harvester.group()
 def source():
     """Manage harvest sources
@@ -337,7 +329,6 @@ def import_stage(
     import. e.g. 15af will run segments 1,5,a,f
 
     """
-    ctx.invoke(initdb)
    flask_app = ctx.meta["flask_app"]
    with flask_app.test_request_context():
        try:
diff --git a/ckanext/harvest/commands/__init__.py b/ckanext/harvest/commands/__init__.py
deleted file mode 100644
index 2f4b4738a..000000000
--- a/ckanext/harvest/commands/__init__.py
+++ /dev/null
@@ -1,8 +0,0 @@
-try:
-    import pkg_resources
-
-    pkg_resources.declare_namespace(__name__)
-except ImportError:
-    import pkgutil
-
-    __path__ = pkgutil.extend_path(__path__, __name__)
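Note: with ``harvester initdb`` gone, fresh installs and upgrades alike go through
CKAN's Alembic-based migration runner. A minimal sketch of the new workflow,
assuming the usual config location (the command itself comes from the README
change above)::

    # Apply all pending harvest migrations. Safe to re-run: revisions that
    # are already applied are skipped.
    (pyenv) $ ckan --config=/etc/ckan/default/ckan.ini db upgrade -p harvest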
diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py
deleted file mode 100644
index daa482aea..000000000
--- a/ckanext/harvest/commands/harvester.py
+++ /dev/null
@@ -1,447 +0,0 @@
-from __future__ import print_function
-import sys
-
-from ckan import model
-from ckan.logic import get_action, ValidationError
-
-from ckantoolkit import CkanCommand
-
-import ckanext.harvest.utils as utils
-from ckanext.harvest.logic.schema import unicode_safe
-
-
-class Harvester(CkanCommand):
-    """Harvests remotely mastered metadata
-
-    Usage:
-
-      harvester initdb
-        - Creates the necessary tables in the database
-
-      harvester source {name} {url} {type} [{title}] [{active}] [{owner_org}] [{frequency}] [{config}]
-        - create new harvest source
-
-      harvester source {source-id/name}
-        - shows a harvest source
-
-      harvester rmsource {source-id/name}
-        - remove (deactivate) a harvester source, whilst leaving any related
-          datasets, jobs and objects
-
-      harvester clearsource {source-id/name}
-        - clears all datasets, jobs and objects related to a harvest source,
-          but keeps the source itself
-
-      harvester clearsource_history [{source-id}] [-k]
-        - If no source id is given the history for all harvest sources
-          (maximum is 1000) will be cleared.
-          Clears all jobs and objects related to a harvest source,
-          but keeps the source itself.
-          The datasets imported from the harvest source will NOT be deleted!!!
-          If a source id is given, it only clears the history of the harvest
-          source with the given source id.
-
-          To keep the currently active jobs use the -k option.
-
-      harvester sources [all]
-        - lists harvest sources
-          If 'all' is defined, it also shows the Inactive sources
-
-      harvester job {source-id/name}
-        - create new harvest job and runs it (puts it on the gather queue)
-
-      harvester jobs
-        - lists harvest jobs
-
-      harvester job_abort {source-id/source-name/obj-id}
-        - marks a job as "Aborted" so that the source can be restarted afresh.
-          It ensures that the job's harvest objects status are also marked
-          finished. You should ensure that neither the job nor its objects are
-          currently in the gather/fetch queues.
-
-      harvester run
-        - starts any harvest jobs that have been created by putting them onto
-          the gather queue. Also checks running jobs - if finished it
-          changes their status to Finished.
-
-      harvester run_test {source-id/name}
-        - runs a harvest - for testing only.
-          This does all the stages of the harvest (creates job, gather, fetch,
-          import) without involving the web UI or the queue backends. This is
-          useful for testing a harvester without having to fire up
-          gather/fetch_consumer processes, as is done in production.
-
-      harvester gather_consumer
-        - starts the consumer for the gathering queue
-
-      harvester fetch_consumer
-        - starts the consumer for the fetching queue
-
-      harvester purge_queues
-        - removes all jobs from fetch and gather queue
-
-      harvester abort_failed_jobs {job_life_span} [--include={source_id}] [--exclude={source_id}]
-        - abort all jobs which are in a "limbo state" where the job has
-          run with errors but the harvester run command will not mark it
-          as finished, and therefore you cannot run another job.
-
-          job_life_span determines from what moment
-          the job must be considered as failed
-
-      harvester clean_harvest_log
-        - Clean-up mechanism for the harvest log table.
-          You can configure the time frame through the configuration
-          parameter `ckan.harvest.log_timeframe`. The default time frame is 30 days
-
-      harvester [-j] [-o|-g|-p {id/guid}] [--segments={segments}] import [{source-id}]
-        - perform the import stage with the last fetched objects, for a certain
-          source or a single harvest object. Please note that no objects will
-          be fetched from the remote server. It will only affect the objects
-          already present in the database.
-
-          To import a particular harvest source, specify its id as an argument.
-          To import a particular harvest object use the -o option.
-          To import a particular guid use the -g option.
-          To import a particular package use the -p option.
-
-          You will need to specify the -j flag in cases where the datasets are
-          not yet created (e.g. first harvest, or all previous harvests have
-          failed)
-
-          The --segments flag allows to define a string containing hex digits
-          that represent which of the 16 harvest object segments to import.
-          e.g. 15af will run segments 1,5,a,f
-
-      harvester job-all
-        - create new harvest jobs for all active sources.
-
-      harvester reindex
-        - reindexes the harvest source datasets
-
-    The commands should be run from the ckanext-harvest directory and expect
-    a development.ini file to be present. Most of the time you will
-    specify the config explicitly though::
-
-        paster harvester sources --config=../ckan/development.ini
-
-    """
-
-    summary = __doc__.split("\n")[0]
-    usage = __doc__
-    max_args = 9
-    min_args = 0
-
-    def __init__(self, name):
-
-        super(Harvester, self).__init__(name)
-
-        self.parser.add_option(
-            "-j",
-            "--no-join-datasets",
-            dest="no_join_datasets",
-            action="store_true",
-            default=False,
-            help="Do not join harvest objects to existing datasets",
-        )
-
-        self.parser.add_option(
-            "-o",
-            "--harvest-object-id",
-            dest="harvest_object_id",
-            default=False,
-            help="Id of the harvest object to which perform the import stage",
-        )
-
-        self.parser.add_option(
-            "-p",
-            "--package-id",
-            dest="package_id",
-            default=False,
-            help="Id of the package whose harvest object to perform the import stage for",
-        )
-
-        self.parser.add_option(
-            "-g",
-            "--guid",
-            dest="guid",
-            default=False,
-            help="Guid of the harvest object to which perform the import stage for",
-        )
-
-        self.parser.add_option(
-            "--segments",
-            dest="segments",
-            default=False,
-            help="""A string containing hex digits that represent which of
-                 the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f""",
-        )
-
-        self.parser.add_option(
-            "-i",
-            "--include",
-            dest="include_sources",
-            default=False,
-            help="""If source_id provided as included, then only it's failed jobs will be aborted.
-                 You can use comma as a separator to provide multiple source_id's""",
-        )
-
-        self.parser.add_option(
-            "-e",
-            "--exclude",
-            dest="exclude_sources",
-            default=False,
-            help="""If source_id provided as excluded, all sources failed jobs, except for that
-                 will be aborted. You can use comma as a separator to provide multiple source_id's""",
-        )
-
-        self.parser.add_option(
-            "-k",
-            "--keep-current",
-            dest="keep_current",
-            default=False,
-            help="Do not delete relevant harvest objects",
-        )
-
-    def command(self):
-        self._load_config()
-
-        # We'll need a sysadmin user to perform most of the actions
-        # We will use the sysadmin site user (named as the site_id)
-        context = {
-            "model": model,
-            "session": model.Session,
-            "ignore_auth": True,
-        }
-        self.admin_user = get_action("get_site_user")(context, {})
-
-        print("")
-
-        if len(self.args) == 0:
-            self.parser.print_usage()
-            sys.exit(1)
-        cmd = self.args[0]
-        if cmd == "source":
-            if len(self.args) > 2:
-                self.create_harvest_source()
-            else:
-                self.show_harvest_source()
-        elif cmd == "rmsource":
-            self.remove_harvest_source()
-        elif cmd == "clearsource":
-            self.clear_harvest_source()
-        elif cmd == "clearsource_history":
-            self.clear_harvest_source_history()
-        elif cmd == "sources":
-            self.list_harvest_sources()
-        elif cmd == "job":
-            self.create_harvest_job()
-        elif cmd == "jobs":
-            self.list_harvest_jobs()
-        elif cmd == "job_abort":
-            self.job_abort()
-        elif cmd == "run":
-            self.run_harvester()
-        elif cmd == "run_test":
-            self.run_test_harvest()
-        elif cmd == "gather_consumer":
-            utils.gather_consumer()
-        elif cmd == "fetch_consumer":
-            utils.fetch_consumer()
-        elif cmd == "purge_queues":
-            self.purge_queues()
-        elif cmd == "abort_failed_jobs":
-            self.abort_failed_jobs()
-        elif cmd == "initdb":
-            self.initdb()
-        elif cmd == "import":
-            self.initdb()
-            self.import_stage()
-        elif cmd == "job-all":
-            self.create_harvest_job_all()
-        elif cmd == "harvesters-info":
-            print(utils.harvesters_info())
-        elif cmd == "reindex":
-            self.reindex()
-        elif cmd == "clean_harvest_log":
-            self.clean_harvest_log()
-        else:
-            print("Command {0} not recognized".format(cmd))
-
-    def _load_config(self):
-        super(Harvester, self)._load_config()
-
-    def initdb(self):
-        utils.initdb()
-        print("DB tables created")
-
-    def create_harvest_source(self):
-
-        if len(self.args) >= 2:
-            name = unicode_safe(self.args[1])
-        else:
-            print("Please provide a source name")
-            sys.exit(1)
-        if len(self.args) >= 3:
-            url = unicode_safe(self.args[2])
-        else:
-            print("Please provide a source URL")
-            sys.exit(1)
-        if len(self.args) >= 4:
-            type = unicode_safe(self.args[3])
-        else:
-            print("Please provide a source type")
-            sys.exit(1)
-
-        if len(self.args) >= 5:
-            title = unicode_safe(self.args[4])
-        else:
-            title = None
-        if len(self.args) >= 6:
-            active = not (
-                self.args[5].lower() == "false" or self.args[5] == "0"
-            )
-        else:
-            active = True
-        if len(self.args) >= 7:
-            owner_org = unicode_safe(self.args[6])
-        else:
-            owner_org = None
-        if len(self.args) >= 8:
-            frequency = unicode_safe(self.args[7])
-            if not frequency:
-                frequency = "MANUAL"
-        else:
-            frequency = "MANUAL"
-        if len(self.args) >= 9:
-            source_config = unicode_safe(self.args[8])
-        else:
-            source_config = None
-        try:
-            result = utils.create_harvest_source(
-                name, url, type, title, active, owner_org, frequency, source_config
-            )
-        except ValidationError as e:
-            print("An error occurred:")
-            print(str(e.error_dict))
-            raise e
-
-        print(result)
-
-    def clear_harvest_source_history(self):
-        keep_current = bool(self.options.keep_current)
-        source_id = None
-        if len(self.args) >= 2:
-            source_id = unicode_safe(self.args[1])
-
-        print(utils.clear_harvest_source_history(source_id, keep_current))
-
-    def show_harvest_source(self):
-
-        if len(self.args) >= 2:
-            source_id_or_name = unicode_safe(self.args[1])
-        else:
-            print("Please provide a source name")
-            sys.exit(1)
-        print(utils.show_harvest_source(source_id_or_name))
-
-    def remove_harvest_source(self):
-        if len(self.args) >= 2:
-            source_id_or_name = unicode_safe(self.args[1])
-        else:
-            print("Please provide a source id")
-            sys.exit(1)
-        utils.remove_harvest_source(source_id_or_name)
-
-    def clear_harvest_source(self):
-        if len(self.args) >= 2:
-            source_id_or_name = unicode_safe(self.args[1])
-        else:
-            print("Please provide a source id")
-            sys.exit(1)
-        utils.clear_harvest_source(source_id_or_name)
-
-    def list_harvest_sources(self):
-        if len(self.args) >= 2 and self.args[1] == "all":
-            all = True
-        else:
-            all = False
-
-        print(utils.list_sources(all))
-
-    def create_harvest_job(self):
-        if len(self.args) >= 2:
-            source_id_or_name = unicode_safe(self.args[1])
-        else:
-            print("Please provide a source id")
-            sys.exit(1)
-        print(utils.create_job(source_id_or_name))
-
-    def list_harvest_jobs(self):
-        print(utils.list_jobs())
-
-    def job_abort(self):
-        if len(self.args) >= 2:
-            job_or_source_id_or_name = unicode_safe(self.args[1])
-        else:
-            print("Please provide a job id or source name/id")
-            sys.exit(1)
-        print(utils.abort_job(job_or_source_id_or_name))
-
-    def run_harvester(self):
-        utils.run_harvester()
-
-    def run_test_harvest(self):
-        # Determine the source
-        force_import = None
-        if len(self.args) >= 2:
-            if len(self.args) >= 3 and self.args[2].startswith('force-import='):
-                force_import = self.args[2].split('=')[-1]
-            source_id_or_name = unicode_safe(self.args[1])
-        else:
-            print("Please provide a source id")
-            sys.exit(1)
-
-        utils.run_test_harvester(source_id_or_name, force_import)
-
-    def import_stage(self):
-
-        if len(self.args) >= 2:
-            source_id_or_name = unicode_safe(self.args[1])
-            context = {
-                "model": model,
-                "session": model.Session,
-                "user": self.admin_user["name"],
-            }
-            source = get_action("harvest_source_show")(
-                context, {"id": source_id_or_name}
-            )
-            source_id = source["id"]
-        else:
-            source_id = None
-        utils.import_stage(
-            source_id,
-            self.options.no_join_datasets,
-            self.options.harvest_object_id,
-            self.options.guid,
-            self.options.package_id,
-            self.options.segments,
-        )
-
-    def create_harvest_job_all(self):
-        print(utils.job_all())
-
-    def reindex(self):
-        utils.reindex()
-
-    def purge_queues(self):
-        utils.purge_queues()
-
-    def clean_harvest_log(self):
-        utils.clean_harvest_log()
-
-    def abort_failed_jobs(self):
-        job_life_span = False
-        if len(self.args) >= 2:
-            job_life_span = unicode_safe(self.args[1])
-
-        utils.abort_failed_jobs(
-            job_life_span,
-            include=self.options.include_sources,
-            exclude=self.options.exclude_sources
-        )
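Note: the deleted paster command class above was the Python 2 era CLI; its
functionality already lives in the click-based ``ckanext/harvest/cli.py``. An
illustrative mapping of the usage line from the old docstring (sub-command
names may differ slightly; ``ckan harvester --help`` is authoritative)::

    # old (paster, Python 2)
    paster --plugin=ckanext-harvest harvester sources --config=../ckan/development.ini

    # new (click, CKAN >= 2.9)
    (pyenv) $ ckan --config=/etc/ckan/default/ckan.ini harvester sources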
diff --git a/ckanext/harvest/migration/harvest/README b/ckanext/harvest/migration/harvest/README
new file mode 100644
index 000000000..98e4f9c44
--- /dev/null
+++ b/ckanext/harvest/migration/harvest/README
@@ -0,0 +1 @@
+Generic single-database configuration.
\ No newline at end of file
diff --git a/ckanext/harvest/migration/harvest/alembic.ini b/ckanext/harvest/migration/harvest/alembic.ini
new file mode 100644
index 000000000..006fee785
--- /dev/null
+++ b/ckanext/harvest/migration/harvest/alembic.ini
@@ -0,0 +1,74 @@
+# A generic, single database configuration.
+
+[alembic]
+# path to migration scripts
+script_location = %(here)s
+
+# template used to generate migration files
+# file_template = %%(rev)s_%%(slug)s
+
+# timezone to use when rendering the date
+# within the migration file as well as the filename.
+# string value is passed to dateutil.tz.gettz()
+# leave blank for localtime
+# timezone =
+
+# max length of characters to apply to the
+# "slug" field
+#truncate_slug_length = 40
+
+# set to 'true' to run the environment during
+# the 'revision' command, regardless of autogenerate
+# revision_environment = false
+
+# set to 'true' to allow .pyc and .pyo files without
+# a source .py file to be detected as revisions in the
+# versions/ directory
+# sourceless = false
+
+# version location specification; this defaults
+# to /home/sergey/projects/core/ckanext-harvest/ckanext/harvest/migration/harvest/versions.  When using multiple version
+# directories, initial revisions must be specified with --version-path
+# version_locations = %(here)s/bar %(here)s/bat /home/sergey/projects/core/ckanext-harvest/ckanext/harvest/migration/harvest/versions
+
+# the output encoding used when revision files
+# are written from script.py.mako
+# output_encoding = utf-8
+
+sqlalchemy.url = driver://user:pass@localhost/dbname
+
+
+# Logging configuration
+[loggers]
+keys = root,sqlalchemy,alembic
+
+[handlers]
+keys = console
+
+[formatters]
+keys = generic
+
+[logger_root]
+level = WARN
+handlers = console
+qualname =
+
+[logger_sqlalchemy]
+level = WARN
+handlers =
+qualname = sqlalchemy.engine
+
+[logger_alembic]
+level = INFO
+handlers =
+qualname = alembic
+
+[handler_console]
+class = StreamHandler
+args = (sys.stderr,)
+level = NOTSET
+formatter = generic
+
+[formatter_generic]
+format = %(levelname)-5.5s [%(name)s] %(message)s
+datefmt = %H:%M:%S
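Note: the ``sqlalchemy.url`` value above is the stock Alembic placeholder. When
migrations run through ``ckan db upgrade -p harvest``, the connection URL is
expected to be supplied from the site's ``ckan.ini``, so the placeholder
normally never needs editing. Assuming you instead want to run Alembic directly
against a development database, you would edit that line and invoke Alembic
with this config::

    alembic -c ckanext/harvest/migration/harvest/alembic.ini upgrade head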
diff --git a/ckanext/harvest/migration/harvest/env.py b/ckanext/harvest/migration/harvest/env.py
new file mode 100644
index 000000000..009368218
--- /dev/null
+++ b/ckanext/harvest/migration/harvest/env.py
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+
+from __future__ import with_statement
+from alembic import context
+from sqlalchemy import engine_from_config, pool
+from logging.config import fileConfig
+
+import os
+
+# this is the Alembic Config object, which provides
+# access to the values within the .ini file in use.
+config = context.config
+
+# Interpret the config file for Python logging.
+# This line sets up loggers basically.
+fileConfig(config.config_file_name)
+
+# add your model's MetaData object here
+# for 'autogenerate' support
+# from myapp import mymodel
+# target_metadata = mymodel.Base.metadata
+target_metadata = None
+
+# other values from the config, defined by the needs of env.py,
+# can be acquired:
+# my_important_option = config.get_main_option("my_important_option")
+# ... etc.
+
+name = os.path.basename(os.path.dirname(__file__))
+
+
+def run_migrations_offline():
+    """Run migrations in 'offline' mode.
+
+    This configures the context with just a URL
+    and not an Engine, though an Engine is acceptable
+    here as well.  By skipping the Engine creation
+    we don't even need a DBAPI to be available.
+
+    Calls to context.execute() here emit the given string to the
+    script output.
+
+    """
+
+    url = config.get_main_option(u"sqlalchemy.url")
+    context.configure(
+        url=url, target_metadata=target_metadata, literal_binds=True,
+        version_table=u'{}_alembic_version'.format(name)
+    )
+
+    with context.begin_transaction():
+        context.run_migrations()
+
+
+def run_migrations_online():
+    """Run migrations in 'online' mode.
+
+    In this scenario we need to create an Engine
+    and associate a connection with the context.
+
+    """
+    connectable = engine_from_config(
+        config.get_section(config.config_ini_section),
+        prefix=u'sqlalchemy.',
+        poolclass=pool.NullPool)
+
+    with connectable.connect() as connection:
+        context.configure(
+            connection=connection,
+            target_metadata=target_metadata,
+            version_table=u'{}_alembic_version'.format(name)
+        )
+
+        with context.begin_transaction():
+            context.run_migrations()
+
+
+if context.is_offline_mode():
+    run_migrations_offline()
+else:
+    run_migrations_online()
diff --git a/ckanext/harvest/migration/harvest/script.py.mako b/ckanext/harvest/migration/harvest/script.py.mako
new file mode 100644
index 000000000..2c0156303
--- /dev/null
+++ b/ckanext/harvest/migration/harvest/script.py.mako
@@ -0,0 +1,24 @@
+"""${message}
+
+Revision ID: ${up_revision}
+Revises: ${down_revision | comma,n}
+Create Date: ${create_date}
+
+"""
+from alembic import op
+import sqlalchemy as sa
+${imports if imports else ""}
+
+# revision identifiers, used by Alembic.
+revision = ${repr(up_revision)}
+down_revision = ${repr(down_revision)}
+branch_labels = ${repr(branch_labels)}
+depends_on = ${repr(depends_on)}
+
+
+def upgrade():
+    ${upgrades if upgrades else "pass"}
+
+
+def downgrade():
+    ${downgrades if downgrades else "pass"}
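Note: ``env.py`` derives ``name`` from its directory ("harvest") and passes
``version_table='harvest_alembic_version'`` to ``context.configure()``, so the
plugin's migration state is tracked separately from CKAN core's
``alembic_version`` table. A quick way to verify a completed upgrade
(PostgreSQL, database name illustrative)::

    psql ckan_default -c "SELECT version_num FROM harvest_alembic_version;"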
diff --git a/ckanext/harvest/migration/harvest/versions/3b4894672727_create_harvest_tables.py b/ckanext/harvest/migration/harvest/versions/3b4894672727_create_harvest_tables.py
new file mode 100644
index 000000000..689e48f37
--- /dev/null
+++ b/ckanext/harvest/migration/harvest/versions/3b4894672727_create_harvest_tables.py
@@ -0,0 +1,187 @@
+"""create harvest tables
+
+Revision ID: 3b4894672727
+Revises:
+Create Date: 2023-11-02 15:53:02.262586
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = "3b4894672727"
+down_revision = None
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    engine = op.get_bind()
+    inspector = sa.inspect(engine)
+
+    if not inspector.has_table("harvest_source"):
+        op.create_table(
+            "harvest_source",
+            sa.Column("id", sa.UnicodeText, primary_key=True),
+            sa.Column("url", sa.UnicodeText, nullable=False),
+            sa.Column("title", sa.UnicodeText),
+            sa.Column("description", sa.UnicodeText),
+            sa.Column("config", sa.UnicodeText),
+            sa.Column("created", sa.DateTime),
+            sa.Column("type", sa.UnicodeText, nullable=False),
+            sa.Column("active", sa.Boolean),
+            sa.Column("user_id", sa.UnicodeText),
+            sa.Column("publisher_id", sa.UnicodeText),
+            sa.Column("frequency", sa.UnicodeText),
+            sa.Column("next_run", sa.DateTime),
+        )
+
+    if not inspector.has_table("harvest_job"):
+        op.create_table(
+            "harvest_job",
+            sa.Column("id", sa.UnicodeText, primary_key=True),
+            sa.Column("created", sa.DateTime),
+            sa.Column("gather_started", sa.DateTime),
+            sa.Column("gather_finished", sa.DateTime),
+            sa.Column("finished", sa.DateTime),
+            sa.Column(
+                "source_id",
+                sa.UnicodeText,
+                sa.ForeignKey("harvest_source.id"),
+            ),
+            sa.Column("status", sa.UnicodeText, nullable=False),
+        )
+
+    if not inspector.has_table("harvest_object"):
+        op.create_table(
+            "harvest_object",
+            sa.Column("id", sa.UnicodeText, primary_key=True),
+            sa.Column("guid", sa.UnicodeText),
+            sa.Column("current", sa.Boolean),
+            sa.Column("gathered", sa.DateTime),
+            sa.Column("fetch_started", sa.DateTime),
+            sa.Column("content", sa.UnicodeText, nullable=True),
+            sa.Column("fetch_finished", sa.DateTime),
+            sa.Column("import_started", sa.DateTime),
+            sa.Column("import_finished", sa.DateTime),
+            sa.Column("state", sa.UnicodeText),
+            sa.Column("metadata_modified_date", sa.DateTime),
+            sa.Column("retry_times", sa.Integer),
+            sa.Column(
+                "harvest_job_id",
+                sa.UnicodeText,
+                sa.ForeignKey("harvest_job.id"),
+            ),
+            sa.Column(
+                "harvest_source_id",
+                sa.UnicodeText,
+                sa.ForeignKey("harvest_source.id"),
+            ),
+            sa.Column(
+                "package_id",
+                sa.UnicodeText,
+                sa.ForeignKey("package.id", deferrable=True),
+                nullable=True,
+            ),
+            sa.Column("report_status", sa.UnicodeText, nullable=True),
+        )
+
+    index_names = [index["name"] for index in inspector.get_indexes("harvest_object")]
+    if "harvest_job_id_idx" not in index_names:
+        op.create_index("harvest_job_id_idx", "harvest_object", ["harvest_job_id"])
+
+    if "harvest_source_id_idx" not in index_names:
+        op.create_index(
+            "harvest_source_id_idx", "harvest_object", ["harvest_source_id"]
+        )
+
+    if "package_id_idx" not in index_names:
+        op.create_index("package_id_idx", "harvest_object", ["package_id"])
+
+    if "guid_idx" not in index_names:
+        op.create_index("guid_idx", "harvest_object", ["guid"])
+
+    if not inspector.has_table("harvest_object_extra"):
+        op.create_table(
+            "harvest_object_extra",
+            sa.Column("id", sa.UnicodeText, primary_key=True),
+            sa.Column(
+                "harvest_object_id",
+                sa.UnicodeText,
+                sa.ForeignKey("harvest_object.id"),
+            ),
+            sa.Column("key", sa.UnicodeText),
+            sa.Column("value", sa.UnicodeText),
+        )
+
+    index_names = [
+        index["name"] for index in inspector.get_indexes("harvest_object_extra")
+    ]
+    if "harvest_object_id_idx" not in index_names:
+        op.create_index(
+            "harvest_object_id_idx", "harvest_object_extra", ["harvest_object_id"]
+        )
+
+    if not inspector.has_table("harvest_gather_error"):
+        op.create_table(
+            "harvest_gather_error",
+            sa.Column("id", sa.UnicodeText, primary_key=True),
+            sa.Column(
+                "harvest_job_id",
+                sa.UnicodeText,
+                sa.ForeignKey("harvest_job.id"),
+            ),
+            sa.Column("message", sa.UnicodeText),
+            sa.Column("created", sa.DateTime),
+        )
+
+    if not inspector.has_table("harvest_object_error"):
+        op.create_table(
+            "harvest_object_error",
+            sa.Column("id", sa.UnicodeText, primary_key=True),
+            sa.Column(
+                "harvest_object_id",
+                sa.UnicodeText,
+                sa.ForeignKey("harvest_object.id"),
+            ),
+            sa.Column("message", sa.UnicodeText),
+            sa.Column("stage", sa.UnicodeText),
+            sa.Column("line", sa.Integer),
+            sa.Column("created", sa.DateTime),
+        )
+
+    index_names = [
+        index["name"] for index in inspector.get_indexes("harvest_object_error")
+    ]
+    if "harvest_error_harvest_object_id_idx" not in index_names:
+        op.create_index(
+            "harvest_error_harvest_object_id_idx",
+            "harvest_object_error",
+            ["harvest_object_id"],
+        )
+
+    if not inspector.has_table("harvest_log"):
+        op.create_table(
+            "harvest_log",
+            sa.Column("id", sa.UnicodeText, primary_key=True),
+            sa.Column("content", sa.UnicodeText, nullable=False),
+            sa.Column(
+                "level",
+                sa.Enum(
+                    "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", name="log_level"
+                ),
+            ),
+            sa.Column("created", sa.DateTime),
+        )
+
+
+def downgrade():
+    op.drop_table("harvest_log")
+    sa.Enum(name="log_level").drop(op.get_bind())
+    op.drop_table("harvest_object_error")
+    op.drop_table("harvest_gather_error")
+    op.drop_table("harvest_object_extra")
+    op.drop_table("harvest_object")
+    op.drop_table("harvest_job")
+    op.drop_table("harvest_source")
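Note: the ``inspector.has_table`` / ``get_indexes`` guards make this initial
revision safe on databases where the old ``harvester initdb`` (or the
import-time ``setup()``) already created the schema: existing tables and
indexes are left alone, only missing pieces are created, and Alembic then
records the revision. A sketch of the guard pattern in isolation, assuming a
plain SQLAlchemy (1.4+) engine and an illustrative database URL::

    import sqlalchemy as sa

    engine = sa.create_engine("postgresql:///ckan_default")  # illustrative
    inspector = sa.inspect(engine)
    if not inspector.has_table("harvest_source"):
        print("fresh database: the migration will create all harvest tables")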
diff --git a/ckanext/harvest/migration/harvest/versions/75d650dfd519_add_cascade_to_harvest_tables.py b/ckanext/harvest/migration/harvest/versions/75d650dfd519_add_cascade_to_harvest_tables.py
new file mode 100644
index 000000000..8a5553209
--- /dev/null
+++ b/ckanext/harvest/migration/harvest/versions/75d650dfd519_add_cascade_to_harvest_tables.py
@@ -0,0 +1,102 @@
+"""add cascade to harvest tables
+
+Revision ID: 75d650dfd519
+Revises: 3b4894672727
+Create Date: 2023-11-02 17:13:39.995339
+
+"""
+from alembic import op
+
+
+# revision identifiers, used by Alembic.
+revision = "75d650dfd519"
+down_revision = "3b4894672727"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    _recreate_fk("CASCADE")
+
+
+def downgrade():
+    _recreate_fk(None)
+
+
+def _recreate_fk(ondelete):
+    op.drop_constraint("harvest_job_source_id_fkey", "harvest_job")
+    op.create_foreign_key(
+        "harvest_job_source_id_fkey",
+        "harvest_job",
+        "harvest_source",
+        ["source_id"],
+        ["id"],
+        ondelete=ondelete,
+    )
+
+    op.drop_constraint("harvest_object_harvest_job_id_fkey", "harvest_object")
+    op.create_foreign_key(
+        "harvest_object_harvest_job_id_fkey",
+        "harvest_object",
+        "harvest_job",
+        ["harvest_job_id"],
+        ["id"],
+        ondelete=ondelete,
+    )
+
+    op.drop_constraint("harvest_object_harvest_source_id_fkey", "harvest_object")
+    op.create_foreign_key(
+        "harvest_object_harvest_source_id_fkey",
+        "harvest_object",
+        "harvest_source",
+        ["harvest_source_id"],
+        ["id"],
+        ondelete=ondelete,
+    )
+
+    op.drop_constraint("harvest_object_package_id_fkey", "harvest_object")
+    op.create_foreign_key(
+        "harvest_object_package_id_fkey",
+        "harvest_object",
+        "package",
+        ["package_id"],
+        ["id"],
+        ondelete=ondelete,
+        deferrable=True,
+    )
+
+    op.drop_constraint(
+        "harvest_object_extra_harvest_object_id_fkey", "harvest_object_extra"
+    )
+    op.create_foreign_key(
+        "harvest_object_extra_harvest_object_id_fkey",
+        "harvest_object_extra",
+        "harvest_object",
+        ["harvest_object_id"],
+        ["id"],
+        ondelete=ondelete,
+    )
+
+    op.drop_constraint(
+        "harvest_gather_error_harvest_job_id_fkey", "harvest_gather_error"
+    )
+    op.create_foreign_key(
+        "harvest_gather_error_harvest_job_id_fkey",
+        "harvest_gather_error",
+        "harvest_job",
+        ["harvest_job_id"],
+        ["id"],
+        ondelete=ondelete,
+    )
+
+    op.drop_constraint(
+        "harvest_object_error_harvest_object_id_fkey", "harvest_object_error"
+    )
+    op.create_foreign_key(
+        "harvest_object_error_harvest_object_id_fkey",
+        "harvest_object_error",
+        "harvest_object",
+        ["harvest_object_id"],
+        ["id"],
+        ondelete=ondelete,
+    )
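Note: ``_recreate_fk`` drops and recreates every foreign key so that ``ON
DELETE CASCADE`` applies on upgrade and is stripped again on downgrade
(``ondelete=None``). The net effect on one constraint, expressed as the SQL
Alembic roughly emits (PostgreSQL, illustrative)::

    ALTER TABLE harvest_job DROP CONSTRAINT harvest_job_source_id_fkey;
    ALTER TABLE harvest_job
        ADD CONSTRAINT harvest_job_source_id_fkey
        FOREIGN KEY (source_id) REFERENCES harvest_source (id)
        ON DELETE CASCADE;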
diff --git a/ckanext/harvest/model/__init__.py b/ckanext/harvest/model/__init__.py
index 288817921..c0211ddf5 100644
--- a/ckanext/harvest/model/__init__.py
+++ b/ckanext/harvest/model/__init__.py
@@ -7,100 +7,140 @@
 from sqlalchemy import ForeignKey
 from sqlalchemy import types
 from sqlalchemy import Index
-from sqlalchemy.engine.reflection import Inspector
 from sqlalchemy.orm import backref, relation
 from sqlalchemy.exc import InvalidRequestError
 
-from ckan import model
 from ckan.model.meta import metadata, mapper, Session
 from ckan.model.types import make_uuid
 from ckan.model.domain_object import DomainObject
 from ckan.model.package import Package
 
-UPDATE_FREQUENCIES = ['MANUAL', 'MONTHLY', 'WEEKLY', 'BIWEEKLY', 'DAILY', 'ALWAYS']
+UPDATE_FREQUENCIES = ["MANUAL", "MONTHLY", "WEEKLY", "BIWEEKLY", "DAILY", "ALWAYS"]
 
 log = logging.getLogger(__name__)
 
 __all__ = [
-    'HarvestSource', 'harvest_source_table',
-    'HarvestJob', 'harvest_job_table',
-    'HarvestObject', 'harvest_object_table',
-    'HarvestGatherError', 'harvest_gather_error_table',
-    'HarvestObjectError', 'harvest_object_error_table',
-    'HarvestLog', 'harvest_log_table'
+    "HarvestSource",
+    "harvest_source_table",
+    "HarvestJob",
+    "harvest_job_table",
+    "HarvestObject",
+    "harvest_object_table",
+    "HarvestGatherError",
+    "harvest_gather_error_table",
+    "HarvestObjectError",
+    "harvest_object_error_table",
+    "HarvestLog",
+    "harvest_log_table",
 ]
 
-harvest_source_table = None
-harvest_job_table = None
-harvest_object_table = None
-harvest_gather_error_table = None
-harvest_object_error_table = None
-harvest_object_extra_table = None
-harvest_log_table = None
-
-
-def setup():
-
-    if harvest_source_table is None:
-        define_harvester_tables()
-        log.debug('Harvest tables defined in memory')
-
-    if not model.package_table.exists():
-        log.debug('Harvest table creation deferred')
-        return
-
-    if not harvest_source_table.exists():
-
-        # Create each table individually rather than
-        # using metadata.create_all()
-        harvest_source_table.create()
-        harvest_job_table.create()
-        harvest_object_table.create()
-        harvest_gather_error_table.create()
-        harvest_object_error_table.create()
-        harvest_object_extra_table.create()
-        harvest_log_table.create()
-
-        log.debug('Harvest tables created')
-    else:
-        from ckan.model.meta import engine
-        log.debug('Harvest tables already exist')
-        # Check if existing tables need to be updated
-        inspector = Inspector.from_engine(engine)
-
-        # Check if harvest_log table exist - needed for existing users
-        if 'harvest_log' not in inspector.get_table_names():
-            harvest_log_table.create()
-
-        # Check if harvest_object has a index
-        index_names = [index['name'] for index in inspector.get_indexes("harvest_object")]
-        if "harvest_job_id_idx" not in index_names:
-            log.debug('Creating index for harvest_object')
-            Index("harvest_job_id_idx", harvest_object_table.c.harvest_job_id).create()
-
-        if "harvest_source_id_idx" not in index_names:
-            log.debug('Creating index for harvest source')
-            Index("harvest_source_id_idx", harvest_object_table.c.harvest_source_id).create()
-
-        if "package_id_idx" not in index_names:
-            log.debug('Creating index for package')
-            Index("package_id_idx", harvest_object_table.c.package_id).create()
-
-        if "guid_idx" not in index_names:
-            log.debug('Creating index for guid')
-            Index("guid_idx", harvest_object_table.c.guid).create()
-
-        index_names = [index['name'] for index in inspector.get_indexes("harvest_object_extra")]
-        if "harvest_object_id_idx" not in index_names:
-            log.debug('Creating index for harvest_object_extra')
-            Index("harvest_object_id_idx", harvest_object_extra_table.c.harvest_object_id).create()
-
-        index_names = [index['name'] for index in inspector.get_indexes("harvest_object_error")]
-        if "harvest_error_harvest_object_id_idx" not in index_names:
-            log.debug('Creating index for harvest_object_error')
-            Index("harvest_error_harvest_object_id_idx", harvest_object_error_table.c.harvest_object_id).create()
+harvest_source_table = Table(
+    "harvest_source",
+    metadata,
+    Column("id", types.UnicodeText, primary_key=True, default=make_uuid),
+    Column("url", types.UnicodeText, nullable=False),
+    Column("title", types.UnicodeText, default=""),
+    Column("description", types.UnicodeText, default=""),
+    Column("config", types.UnicodeText, default=""),
+    Column("created", types.DateTime, default=datetime.datetime.utcnow),
+    Column("type", types.UnicodeText, nullable=False),
+    Column("active", types.Boolean, default=True),
+    Column("user_id", types.UnicodeText, default=""),
+    Column("publisher_id", types.UnicodeText, default=""),
+    Column("frequency", types.UnicodeText, default="MANUAL"),
+    Column("next_run", types.DateTime),
+)
+harvest_job_table = Table(
+    "harvest_job",
+    metadata,
+    Column("id", types.UnicodeText, primary_key=True, default=make_uuid),
+    Column("created", types.DateTime, default=datetime.datetime.utcnow),
+    Column("gather_started", types.DateTime),
+    Column("gather_finished", types.DateTime),
+    Column("finished", types.DateTime),
+    Column("source_id", types.UnicodeText, ForeignKey("harvest_source.id")),
+    # status: New, Running, Finished
+    Column("status", types.UnicodeText, default="New", nullable=False),
+)
+harvest_object_table = Table(
+    "harvest_object",
+    metadata,
+    Column("id", types.UnicodeText, primary_key=True, default=make_uuid),
+    # The guid is the 'identity' of the dataset, according to the source.
+    # So if you reharvest it, then the harvester knows which dataset to
+    # update because of this identity. The identity needs to be unique
+    # within this CKAN.
+    Column("guid", types.UnicodeText, default=""),
+    # When you harvest a dataset multiple times, only the latest
+    # successfully imported harvest_object should be flagged 'current'.
+    # The import_stage usually reads and writes it.
+    Column("current", types.Boolean, default=False),
+    Column("gathered", types.DateTime, default=datetime.datetime.utcnow),
+    Column("fetch_started", types.DateTime),
+    Column("content", types.UnicodeText, nullable=True),
+    Column("fetch_finished", types.DateTime),
+    Column("import_started", types.DateTime),
+    Column("import_finished", types.DateTime),
+    # state: WAITING, FETCH, IMPORT, COMPLETE, ERROR
+    Column("state", types.UnicodeText, default="WAITING"),
+    Column("metadata_modified_date", types.DateTime),
+    Column("retry_times", types.Integer, default=0),
+    Column("harvest_job_id", types.UnicodeText, ForeignKey("harvest_job.id")),
+    Column("harvest_source_id", types.UnicodeText, ForeignKey("harvest_source.id")),
+    Column(
+        "package_id",
+        types.UnicodeText,
+        ForeignKey("package.id", deferrable=True),
+        nullable=True,
+    ),
+    # report_status: 'added', 'updated', 'not modified', 'deleted', 'errored'
+    Column("report_status", types.UnicodeText, nullable=True),
+    Index("harvest_job_id_idx", "harvest_job_id"),
+    Index("harvest_source_id_idx", "harvest_source_id"),
+    Index("package_id_idx", "package_id"),
+    Index("guid_idx", "guid"),
+)
+harvest_gather_error_table = Table(
+    "harvest_gather_error",
+    metadata,
+    Column("id", types.UnicodeText, primary_key=True, default=make_uuid),
+    Column("harvest_job_id", types.UnicodeText, ForeignKey("harvest_job.id")),
+    Column("message", types.UnicodeText),
+    Column("created", types.DateTime, default=datetime.datetime.utcnow),
+)
+harvest_object_error_table = Table(
+    "harvest_object_error",
+    metadata,
+    Column("id", types.UnicodeText, primary_key=True, default=make_uuid),
+    Column("harvest_object_id", types.UnicodeText, ForeignKey("harvest_object.id")),
+    Column("message", types.UnicodeText),
+    Column("stage", types.UnicodeText),
+    Column("line", types.Integer),
+    Column("created", types.DateTime, default=datetime.datetime.utcnow),
+    Index("harvest_error_harvest_object_id_idx", "harvest_object_id"),
+)
+harvest_object_extra_table = Table(
+    "harvest_object_extra",
+    metadata,
+    Column("id", types.UnicodeText, primary_key=True, default=make_uuid),
+    Column("harvest_object_id", types.UnicodeText, ForeignKey("harvest_object.id")),
+    Column("key", types.UnicodeText),
+    Column("value", types.UnicodeText),
+    Index("harvest_object_id_idx", "harvest_object_id"),
+)
+harvest_log_table = Table(
+    "harvest_log",
+    metadata,
+    Column("id", types.UnicodeText, primary_key=True, default=make_uuid),
+    Column("content", types.UnicodeText, nullable=False),
+    Column(
+        "level",
+        types.Enum("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", name="log_level"),
+    ),
+    Column("created", types.DateTime, default=datetime.datetime.utcnow),
+)
 
 
 class HarvestError(Exception):
@@ -108,13 +148,13 @@ class HarvestDomainObject(DomainObject):
-    '''Convenience methods for searching objects
-    '''
-    key_attr = 'id'
+    """Convenience methods for searching objects"""
+
+    key_attr = "id"
 
     @classmethod
     def get(cls, key, default=None, attr=None):
-        '''Finds a single entity in the register.'''
+        """Finds a single entity in the register."""
         if attr is None:
             attr = cls.key_attr
         kwds = {attr: key}
@@ -131,20 +171,25 @@ def filter(cls, **kwds):
 
 
 class HarvestSource(HarvestDomainObject):
-    '''A Harvest Source is essentially a URL plus some other metadata.
-       It must have a type (e.g. CSW) and can have a status of "active"
-       or "inactive". The harvesting processes are not fired on inactive
-       sources.
-    '''
+    """A Harvest Source is essentially a URL plus some other metadata.
+    It must have a type (e.g. CSW) and can have a status of "active"
+    or "inactive". The harvesting processes are not fired on inactive
+    sources.
+    """
+
     def __repr__(self):
-        return '<HarvestSource id=%s title=%s url=%s active=%r>' % \
-               (self.id, self.title, self.url, self.active)
+        return "<HarvestSource id=%s title=%s url=%s active=%r>" % (
+            self.id,
+            self.title,
+            self.url,
+            self.active,
+        )
 
     def __str__(self):
-        return self.__repr__().encode('ascii', 'ignore')
+        return self.__repr__().encode("ascii", "ignore")
 
     def get_jobs(self, status=None):
-        """ get the running jobs for this source """
+        """get the running jobs for this source"""
 
         query = Session.query(HarvestJob).filter(HarvestJob.source_id == self.id)
 
@@ -155,42 +200,46 @@ class HarvestJob(HarvestDomainObject):
-    '''A Harvesting Job is performed in two phases. In first place, the
-       **gather** stage collects all the Ids and URLs that need to be fetched
-       from the harvest source. Errors occurring in this phase
-       (``HarvestGatherError``) are stored in the ``harvest_gather_error``
-       table. During the next phase, the **fetch** stage retrieves the
-       ``HarvestedObjects`` and, if necessary, the **import** stage stores
-       them on the database. Errors occurring in this second stage
-       (``HarvestObjectError``) are stored in the ``harvest_object_error``
-       table.
-    '''
+    """A Harvesting Job is performed in two phases. In first place, the
+    **gather** stage collects all the Ids and URLs that need to be fetched
+    from the harvest source. Errors occurring in this phase
+    (``HarvestGatherError``) are stored in the ``harvest_gather_error``
+    table. During the next phase, the **fetch** stage retrieves the
+    ``HarvestedObjects`` and, if necessary, the **import** stage stores
+    them on the database. Errors occurring in this second stage
+    (``HarvestObjectError``) are stored in the ``harvest_object_error``
+    table.
+    """
 
     def get_last_finished_object(self):
-        ''' Determine the last finished object in this job
-            Helpful to know if a job is running or not and
-            to avoid timeouts when the source is running
-        '''
-
-        query = Session.query(HarvestObject)\
-            .filter(HarvestObject.harvest_job_id == self.id)\
-            .filter(HarvestObject.state == "COMPLETE")\
-            .filter(HarvestObject.import_finished.isnot(None))\
-            .order_by(HarvestObject.import_finished.desc())\
-            .first()
+        """Determine the last finished object in this job
+        Helpful to know if a job is running or not and
+        to avoid timeouts when the source is running
+        """
+
+        query = (
+            Session.query(HarvestObject)
+            .filter(HarvestObject.harvest_job_id == self.id)
+            .filter(HarvestObject.state == "COMPLETE")
+            .filter(HarvestObject.import_finished.isnot(None))
+            .order_by(HarvestObject.import_finished.desc())
+            .first()
+        )
 
         return query
 
     def get_last_gathered_object(self):
-        ''' Determine the last gathered object in this job
-            Helpful to know if a job is running or not and
-            to avoid timeouts when the source is running
-        '''
-
-        query = Session.query(HarvestObject)\
-            .filter(HarvestObject.harvest_job_id == self.id)\
-            .order_by(HarvestObject.gathered.desc())\
-            .first()
+        """Determine the last gathered object in this job
+        Helpful to know if a job is running or not and
+        to avoid timeouts when the source is running
+        """
+
+        query = (
+            Session.query(HarvestObject)
+            .filter(HarvestObject.harvest_job_id == self.id)
+            .order_by(HarvestObject.gathered.desc())
+            .first()
+        )
 
         return query
 
@@ -209,34 +258,37 @@ def get_last_action_time(self):
         return self.created
 
     def get_gather_errors(self):
-        query = Session.query(HarvestGatherError)\
-            .filter(HarvestGatherError.harvest_job_id == self.id)\
-            .order_by(HarvestGatherError.created.desc())
+        query = (
+            Session.query(HarvestGatherError)
+            .filter(HarvestGatherError.harvest_job_id == self.id)
+            .order_by(HarvestGatherError.created.desc())
+        )
         return query.all()
 
 
 class HarvestObject(HarvestDomainObject):
-    '''A Harvest Object is created every time an element is fetched from a
-       harvest source. Its contents can be processed and imported to ckan
-       packages, RDF graphs, etc.
+    """A Harvest Object is created every time an element is fetched from a
+    harvest source. Its contents can be processed and imported to ckan
+    packages, RDF graphs, etc.
 
-    '''
+    """
 
 
 class HarvestObjectExtra(HarvestDomainObject):
-    '''Extra key value data for Harvest objects'''
+    """Extra key value data for Harvest objects"""
 
 
 class HarvestGatherError(HarvestDomainObject):
-    '''Gather errors are raised during the **gather** stage of a harvesting
-       job.
-    '''
+    """Gather errors are raised during the **gather** stage of a harvesting
+    job.
+    """
+
     @classmethod
     def create(cls, message, job):
-        '''
+        """
         Helper function to create an error object and save it.
-        '''
+        """
         err = cls(message=message, job=job)
         try:
             err.save()
@@ -249,16 +301,16 @@ class HarvestObjectError(HarvestDomainObject):
-    '''Object errors are raised during the **fetch** or **import** stage of a
-       harvesting job, and are referenced to a specific harvest object.
-    '''
+    """Object errors are raised during the **fetch** or **import** stage of a
+    harvesting job, and are referenced to a specific harvest object.
+    """
+
     @classmethod
-    def create(cls, message, object, stage=u'Fetch', line=None):
-        '''
+    def create(cls, message, object, stage="Fetch", line=None):
+        """
         Helper function to create an error object and save it.
-        '''
-        err = cls(message=message, object=object,
-                  stage=stage, line=line)
+        """
+        err = cls(message=message, object=object, stage=stage, line=line)
         try:
             err.save()
         except InvalidRequestError:
@@ -273,245 +325,123 @@ def create(cls, message, object, stage=u'Fetch', line=None):
                 pass
             err.save()
         finally:
-            log_message = '{0}, line {1}'.format(message, line) \
-                if line else message
+            log_message = "{0}, line {1}".format(message, line) if line else message
             log.debug(log_message)
 
 
 class HarvestLog(HarvestDomainObject):
-    '''HarvestLog objects are created each time something is logged
-       using python's standard logging module
-    '''
+    """HarvestLog objects are created each time something is logged
+    using python's standard logging module
+    """
+
     pass
 
 
 def harvest_object_before_insert_listener(mapper, connection, target):
-    '''
-        For compatibility with old harvesters, check if the source id has
-        been set, and set it automatically from the job if not.
-    '''
+    """
+    For compatibility with old harvesters, check if the source id has
+    been set, and set it automatically from the job if not.
+    """
     if not target.harvest_source_id or not target.source:
         if not target.job:
-            raise Exception('You must define a Harvest Job for each Harvest Object')
+            raise Exception("You must define a Harvest Job for each Harvest Object")
         target.source = target.job.source
         target.harvest_source_id = target.job.source.id
 
 
-def define_harvester_tables():
-
-    global harvest_source_table
-    global harvest_job_table
-    global harvest_object_table
-    global harvest_object_extra_table
-    global harvest_gather_error_table
-    global harvest_object_error_table
-    global harvest_log_table
-
-    harvest_source_table = Table(
-        'harvest_source',
-        metadata,
-        Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
-        Column('url', types.UnicodeText, nullable=False),
-        Column('title', types.UnicodeText, default=u''),
-        Column('description', types.UnicodeText, default=u''),
-        Column('config', types.UnicodeText, default=u''),
-        Column('created', types.DateTime, default=datetime.datetime.utcnow),
-        Column('type', types.UnicodeText, nullable=False),
-        Column('active', types.Boolean, default=True),
-        Column('user_id', types.UnicodeText, default=u''),
-        Column('publisher_id', types.UnicodeText, default=u''),
-        Column('frequency', types.UnicodeText, default=u'MANUAL'),
-        Column('next_run', types.DateTime),
-    )
-    # Was harvesting_job
-    harvest_job_table = Table(
-        'harvest_job',
-        metadata,
-        Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
-        Column('created', types.DateTime, default=datetime.datetime.utcnow),
-        Column('gather_started', types.DateTime),
-        Column('gather_finished', types.DateTime),
-        Column('finished', types.DateTime),
-        Column('source_id', types.UnicodeText, ForeignKey('harvest_source.id')),
-        # status: New, Running, Finished
-        Column('status', types.UnicodeText, default=u'New', nullable=False),
-    )
-    # A harvest_object contains a representation of one dataset during a
-    # particular harvest
-    harvest_object_table = Table(
-        'harvest_object',
-        metadata,
-        Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
-        # The guid is the 'identity' of the dataset, according to the source.
-        # So if you reharvest it, then the harvester knows which dataset to
-        # update because of this identity. The identity needs to be unique
-        # within this CKAN.
-        Column('guid', types.UnicodeText, default=u''),
-        # When you harvest a dataset multiple times, only the latest
-        # successfully imported harvest_object should be flagged 'current'.
-        # The import_stage usually reads and writes it.
-        Column('current', types.Boolean, default=False),
-        Column('gathered', types.DateTime, default=datetime.datetime.utcnow),
-        Column('fetch_started', types.DateTime),
-        Column('content', types.UnicodeText, nullable=True),
-        Column('fetch_finished', types.DateTime),
-        Column('import_started', types.DateTime),
-        Column('import_finished', types.DateTime),
-        # state: WAITING, FETCH, IMPORT, COMPLETE, ERROR
-        Column('state', types.UnicodeText, default=u'WAITING'),
-        Column('metadata_modified_date', types.DateTime),
-        Column('retry_times', types.Integer, default=0),
-        Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')),
-        Column('harvest_source_id', types.UnicodeText, ForeignKey('harvest_source.id')),
-        Column('package_id', types.UnicodeText, ForeignKey('package.id', deferrable=True),
-               nullable=True),
-        # report_status: 'added', 'updated', 'not modified', 'deleted', 'errored'
-        Column('report_status', types.UnicodeText, nullable=True),
-        Index('harvest_job_id_idx', 'harvest_job_id'),
-        Index('harvest_source_id_idx', 'harvest_source_id'),
-        Index('package_id_idx', 'package_id'),
-        Index('guid_idx', 'guid'),
-    )
-
-    # New table
-    harvest_object_extra_table = Table(
-        'harvest_object_extra',
-        metadata,
-        Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
-        Column('harvest_object_id', types.UnicodeText, ForeignKey('harvest_object.id')),
-        Column('key', types.UnicodeText),
-        Column('value', types.UnicodeText),
-        Index('harvest_object_id_idx', 'harvest_object_id'),
-    )
-
-    # New table
-    harvest_gather_error_table = Table(
-        'harvest_gather_error',
-        metadata,
-        Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
-        Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')),
-        Column('message', types.UnicodeText),
-        Column('created', types.DateTime, default=datetime.datetime.utcnow),
-    )
-    # New table
-    harvest_object_error_table = Table(
-        'harvest_object_error',
-        metadata,
-        Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
-        Column('harvest_object_id', types.UnicodeText, ForeignKey('harvest_object.id')),
-        Column('message', types.UnicodeText),
-        Column('stage', types.UnicodeText),
-        Column('line', types.Integer),
-        Column('created', types.DateTime, default=datetime.datetime.utcnow),
-        Index('harvest_error_harvest_object_id_idx', 'harvest_object_id'),
-    )
-    # Harvest Log table
-    harvest_log_table = Table(
-        'harvest_log',
-        metadata,
-        Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
-        Column('content', types.UnicodeText, nullable=False),
-        Column('level', types.Enum('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', name='log_level')),
-        Column('created', types.DateTime, default=datetime.datetime.utcnow),
-    )
-
-    mapper(
-        HarvestSource,
-        harvest_source_table,
-        properties={
-            'jobs': relation(
-                HarvestJob,
-                lazy=True,
-                backref=u'source',
-                order_by=harvest_job_table.c.created,
-            ),
-        },
-    )
-
-    mapper(
-        HarvestJob,
-        harvest_job_table,
-    )
-
-    mapper(
-        HarvestObject,
-        harvest_object_table,
-        properties={
-            'package': relation(
-                Package,
-                lazy=True,
-                backref='harvest_objects',
-            ),
-            'job': relation(
-                HarvestJob,
-                lazy=True,
-                backref=u'objects',
-            ),
-            'source': relation(
-                HarvestSource,
-                lazy=True,
-                backref=u'objects',
-            ),
-
-        },
-    )
-
-    mapper(
-        HarvestGatherError,
-        harvest_gather_error_table,
-        properties={
-            'job': relation(
-                HarvestJob,
-                backref='gather_errors'
-            ),
-        },
-    )
-
-    mapper(
-        HarvestObjectError,
-        harvest_object_error_table,
-        properties={
-            'object': relation(
-                HarvestObject,
-                backref=backref('errors', cascade='all,delete-orphan')
-            ),
-        },
-    )
-
-    mapper(
-        HarvestObjectExtra,
-        harvest_object_extra_table,
-        properties={
-            'object': relation(
-                HarvestObject,
-                backref=backref('extras', cascade='all,delete-orphan')
-            ),
-        },
-    )
-
-    mapper(
-        HarvestLog,
-        harvest_log_table,
-    )
-
-    event.listen(HarvestObject, 'before_insert', harvest_object_before_insert_listener)
-
-
 class PackageIdHarvestSourceIdMismatch(Exception):
     """
     The package created for the harvest source must match the id of the
     harvest source
     """
+
     pass
 
 
 def clean_harvest_log(condition):
-    Session.query(HarvestLog).filter(HarvestLog.created <= condition) \
-        .delete(synchronize_session=False)
+    Session.query(HarvestLog).filter(HarvestLog.created <= condition).delete(
+        synchronize_session=False
+    )
 
     try:
         Session.commit()
     except InvalidRequestError:
         Session.rollback()
-        log.error('An error occurred while trying to clean-up the harvest log table')
-
-    log.info('Harvest log table clean-up finished successfully')
+        log.error("An error occurred while trying to clean-up the harvest log table")
+
+    log.info("Harvest log table clean-up finished successfully")
+
+
+mapper(
+    HarvestSource,
+    harvest_source_table,
+    properties={
+        "jobs": relation(
+            HarvestJob,
+            lazy=True,
+            backref="source",
+            order_by=harvest_job_table.c.created,
+        ),
+    },
+)
+
+mapper(
+    HarvestJob,
+    harvest_job_table,
+)
+
+mapper(
+    HarvestObject,
+    harvest_object_table,
+    properties={
+        "package": relation(
+            Package,
+            lazy=True,
+            backref="harvest_objects",
+        ),
+        "job": relation(
+            HarvestJob,
+            lazy=True,
+            backref="objects",
+        ),
+        "source": relation(
+            HarvestSource,
+            lazy=True,
+            backref="objects",
+        ),
+    },
+)
+
+mapper(
+    HarvestGatherError,
+    harvest_gather_error_table,
+    properties={
+        "job": relation(HarvestJob, backref="gather_errors"),
+    },
+)
+
+mapper(
+    HarvestObjectError,
+    harvest_object_error_table,
+    properties={
+        "object": relation(
+            HarvestObject, backref=backref("errors", cascade="all,delete-orphan")
+        ),
+    },
+)
+
+mapper(
+    HarvestObjectExtra,
+    harvest_object_extra_table,
+    properties={
+        "object": relation(
+            HarvestObject, backref=backref("extras", cascade="all,delete-orphan")
+        ),
+    },
+)
+
+mapper(
+    HarvestLog,
+    harvest_log_table,
+)
+
+event.listen(HarvestObject, "before_insert", harvest_object_before_insert_listener)
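Note: the tables and mappers are now defined at import time against CKAN's
shared ``metadata``, so no runtime ``setup()`` call is needed, and future
schema changes go through new Alembic revisions rather than ad-hoc inspector
checks. A hypothetical follow-up revision, shown only to illustrate the new
workflow (the revision id and column are placeholders; ``down_revision`` is
the real head introduced by this PR)::

    """add example column to harvest_source"""
    from alembic import op
    import sqlalchemy as sa

    revision = "000000000000"       # placeholder
    down_revision = "75d650dfd519"  # current head from this PR
    branch_labels = None
    depends_on = None


    def upgrade():
        op.add_column("harvest_source", sa.Column("example", sa.UnicodeText))


    def downgrade():
        op.drop_column("harvest_source", "example")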
diff --git a/ckanext/harvest/plugin.py b/ckanext/harvest/plugin.py
index 02ab6ae4b..72d145264 100644
--- a/ckanext/harvest/plugin.py
+++ b/ckanext/harvest/plugin.py
@@ -15,7 +15,6 @@
 import ckanext.harvest
 from ckanext.harvest import cli, views
-from ckanext.harvest.model import setup as model_setup
 from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
 from ckanext.harvest.log import DBLogHandler
 
 
@@ -275,9 +274,6 @@ def configure(self, config):
 
         self.startup = True
 
-        # Setup harvest model
-        model_setup()
-
         # Configure database logger
         _configure_db_logger(config)
 
diff --git a/ckanext/harvest/utils.py b/ckanext/harvest/utils.py
index a58c7f598..99a1e124c 100644
--- a/ckanext/harvest/utils.py
+++ b/ckanext/harvest/utils.py
@@ -101,12 +101,6 @@ def _there_are(what, sequence, condition=""):
     )
 
 
-def initdb():
-    from ckanext.harvest.model import setup as db_setup
-
-    db_setup()
-
-
 def create_harvest_source(
     name,
     url,
diff --git a/setup.py b/setup.py
index f1efc2088..8a9fe2adc 100644
--- a/setup.py
+++ b/setup.py
@@ -39,8 +39,6 @@
     test_harvester2=ckanext.harvest.tests.test_queue2:MockHarvester
     test_action_harvester=ckanext.harvest.tests.test_action:MockHarvesterForActionTests
 
-    [paste.paster_command]
-    harvester = ckanext.harvest.commands.harvester:Harvester
 
     [babel.extractors]
     ckan = ckan.lib.extract:extract_ckan
     """,
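Note: because the ``[paste.paster_command]`` entry point is removed from
``setup.py``, existing development installs should be re-installed so the
stale entry point disappears, and the database upgraded once through the new
migration path (a sketch, assuming an editable install and the usual config
location)::

    (pyenv) $ pip install -e .
    (pyenv) $ ckan --config=/etc/ckan/default/ckan.ini db upgrade -p harvest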