Version 2.2.2 (#390)
* Added an option to use /dev/shm for `mikado serialise` and `mikado compare`.

* Bumped version

* Use TMPDIR by default when creating or reading SQLite databases.
lucventurini committed Mar 19, 2021
1 parent 9a5a5dc commit 40f887b
Showing 18 changed files with 257 additions and 81 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 2.2.1
current_version = 2.2.2
commit = False
tag = False

6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,9 @@
# Version 2.2.2

Added SHM (/dev/shm) capabilities to `mikado serialise` and `mikado compare`.
Now `mikado serialise`, `mikado pick` and `mikado compare` will copy the database to TMPDIR before running, to ensure atomicity.
This also prevents problems when the data resides on an NFS drive, as is the case on most computing clusters.

# Version 2.2.1
Pinning sqlalchemy to <1.4.0 until sqlalchemy_utils is updated.
Moreover, solved a small bug in `prepare`: setting `prepare.exclude_redundant` to `True` in the configuration file had
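The TMPDIR behaviour described in the changelog entry above relies on Python's standard `tempfile` resolution order; a minimal illustration (the scratch path is hypothetical):

```python
import os
import tempfile

# tempfile honours the TMPDIR environment variable when choosing its
# default directory, which is what lets these tools stage the database
# on node-local storage instead of an NFS mount.
os.environ["TMPDIR"] = "/scratch/local"  # hypothetical node-local path
tempfile.tempdir = None                  # discard the cached default
print(tempfile.gettempdir())             # -> /scratch/local
```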
4 changes: 4 additions & 0 deletions Mikado/configuration/serialise_config.py
@@ -125,3 +125,7 @@ class SerialiseConfiguration:
"metadata": {"description": "Whether to drop and reload everything into the DB"},
})
single_thread: bool = field(default=False, metadata={})
shm: bool = field(default=False, metadata={
"metadata": {"description": "Whether to build the database in /dev/shm before copying it to its final "
"location, or not."}
})
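For reference, a sketch of how the new flag could be enabled programmatically. `MikadoConfiguration` is imported elsewhere in this commit; the `config.serialise.shm` attribute path is an assumption that mirrors the dataclass field above:

```python
from Mikado.configuration import MikadoConfiguration

# Hypothetical usage sketch: ask `mikado serialise` to build its SQLite
# database in /dev/shm before copying it to the final location.
config = MikadoConfiguration()
config.serialise.shm = True
```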
42 changes: 18 additions & 24 deletions Mikado/picking/picker.py
@@ -17,12 +17,14 @@
from logging import handlers as logging_handlers
import functools
import multiprocessing

from Mikado import version
from sqlalchemy.engine import create_engine # SQLAlchemy/DB imports
from sqlalchemy.orm.session import sessionmaker
from sqlalchemy.pool import QueuePool as SqlPool
import sqlalchemy
import sqlalchemy.exc
from ..utilities import path_join
from ..utilities import path_join, create_shm_handle
from ..utilities.log_utils import formatter
from ..parsers.GTF import GTF, GtfLine
from ..parsers.GFF import GFF3
@@ -221,28 +223,19 @@ def setup_shm_db(self):
This method will copy the SQLite input DB into memory.
"""

if self.configuration.pick.run_options.shm is True:
shm_available = os.path.exists("/dev/shm") and os.access("/dev/shm", os.W_OK)
if shm_available is False:
self.main_logger.info("Mikado was asked to copy the database into /dev/shm, but it is either \
not available or not writable for this user. Leaving the DB where it is.")
return
self.main_logger.info("Copying Mikado database into a SHM db")
assert self.configuration.db_settings.dbtype == "sqlite"
# Create temporary file
self.shm_db = tempfile.NamedTemporaryFile(suffix=".db", prefix="/dev/shm/")
self.main_logger.debug("Copying {0} into {1}".format(
self.configuration.db_settings.db,
self.shm_db.name))
try:
shutil.copy2(self.configuration.db_settings.db, self.shm_db.name)
self.configuration.db_settings.db = self.shm_db.name
except PermissionError:
self.main_logger.warning(
"""Permission to write on /dev/shm denied.
Back to using the DB on disk.""")
self.configuration.pick.run_options.shm = False
self.main_logger.info("DB copied into memory")
if self.configuration.db_settings.dbtype == "sqlite" and os.path.exists(self.configuration.db_settings.db):
if self.configuration.pick.run_options.shm is True:
handle = create_shm_handle()
if handle is None:
self.main_logger.info("Mikado was asked to copy the database into /dev/shm, but it is either "
"not available or not writable for this user.")
handle = tempfile.NamedTemporaryFile(suffix=".db")
else:
handle = tempfile.NamedTemporaryFile(suffix=".db")
self.shm_db = handle
shutil.copy2(self.configuration.db_settings.db, self.shm_db.name)
self.configuration.db_settings.db = self.shm_db.name
self.main_logger.info(f"DB copied into {self.shm_db.name}")

def setup_logger(self):

@@ -299,13 +292,14 @@ def setup_logger(self):
self.main_logger.setLevel(logging.INFO)
self.main_logger.addHandler(self.log_handler)

self.main_logger.info("Begun analysis of {0}".format(self.input_file))
self.main_logger.info(f"Mikado version: {version.__version__}")
if self.commandline != '':
self.main_logger.info("Command line: {0}".format(self.commandline))
else:
self.main_logger.info(
"Analysis launched directly, without using the launch script.")

self.main_logger.info("Begun analysis of {0}".format(self.input_file))
# Create the shared DB if necessary
self.log_writer = logging_handlers.QueueListener(self.logging_queue, self.logger)
self.log_writer.start()
4 changes: 3 additions & 1 deletion Mikado/scales/accountant.py
@@ -10,6 +10,7 @@
import logging
import operator
from logging import handlers as log_handlers
from .. import version
from ..transcripts import Transcript
from ..utilities.namespace import Namespace
from ..utilities import calc_f1, Interval
@@ -1027,7 +1028,8 @@ def print_stats(self):
with open("{0}.stats".format(self.args.out), 'wt') as out:

# noinspection PyUnresolvedReferences
print("Command line:\n{0:>10}".format(self.args.commandline), file=out)
print("Mikado version {1}; Command line:\n{0:>10}".format(self.args.commandline,
version.__version__), file=out)
print(gene_transcript_results["ref"]["total"], "reference RNAs in",
len(self.ref_genes), "genes", file=out)
print(gene_transcript_results["pred"]["total"], "predicted RNAs in ",
27 changes: 20 additions & 7 deletions Mikado/scales/compare.py
@@ -10,6 +10,9 @@
import os
import sys
from logging import handlers as log_handlers
import tempfile
from .. import version
from ..exceptions import CorruptIndex
from ..utilities.log_utils import create_default_logger, formatter
import multiprocessing as mp

@@ -69,18 +72,22 @@ def setup_logger(args):


def _shutdown(args, index_name, logger, handler, queue_logger: logging.Logger, log_queue_listener):
args.reference.close()
if hasattr(args.prediction, "close"):
args.prediction.close()
if args.no_save_index is True or args.shm is True or os.path.dirname(index_name) == tempfile.tempdir:
queue_logger.debug(f"Removing the index in {index_name}")
os.remove(index_name)
queue_logger.debug(f"Removed the index in {index_name}")
else:
queue_logger.debug("Index left in place")
queue_logger.info("Finished")
log_queue_listener.enqueue_sentinel()
log_queue_listener.stop()
args.queue_handler.close()
[_.close() for _ in logger.handlers]
handler.flush()
handler.close()
args.reference.close()
if hasattr(args.prediction, "close"):
args.prediction.close()
if args.no_save_index is True:
os.remove(index_name)
[_.close() for _ in queue_logger.handlers]
return

@@ -100,12 +107,18 @@ def compare(args):
args, handler, logger, log_queue_listener, queue_logger = setup_logger(args)
queue_logger.info("Start")
args.commandline = " ".join(sys.argv)
queue_logger.info(f"Mikado version: {version.__version__}")
queue_logger.info("Command line: %s", args.commandline)
logger.handlers[0].flush()

from .reference_preparation import prepare_index
index_name = prepare_index(args, queue_logger)
assert os.path.exists(index_name), "Something went wrong in creating or loading the index {}".format(index_name)
try:
index_name = prepare_index(args, queue_logger)
except (CorruptIndex, PermissionError, ValueError, IndexError, KeyError):
queue_logger.critical("Something went wrong in creating or loading the index.")
raise

assert os.path.exists(index_name)

if args.index is True:
_shutdown(args, index_name, logger, handler, queue_logger, log_queue_listener)
29 changes: 25 additions & 4 deletions Mikado/scales/reference_preparation/__init__.py
@@ -1,16 +1,26 @@
import shutil

from ...parsers.GFF import GFF3
from .indexing import create_index, check_index
from ...exceptions import CorruptIndex
import os
import tempfile

from ...utilities import create_shm_handle


def prepare_index(args, queue_logger):
index_name = os.path.abspath("{0}.midx".format(args.reference.name))
ref_gff = isinstance(args.reference, GFF3)

if args.shm is True:
shm_handle = create_shm_handle()
else:
shm_handle = tempfile.NamedTemporaryFile(suffix=".midx")

if args.index is True:
create_index(args.reference, queue_logger=queue_logger, index_name=index_name,
ref_gff=ref_gff,
ref_gff=ref_gff, use_shm=args.shm,
exclude_utr=False, protein_coding=False)
assert os.path.exists(index_name), \
"Index {} should have been created but is now absent! File system problems?".format(index_name)
@@ -27,7 +37,7 @@ def prepare_index(args, queue_logger):
index_name = __index.name

ref_gff = isinstance(args.reference, GFF3)
create_index(args.reference, queue_logger, index_name, ref_gff=ref_gff,
create_index(args.reference, queue_logger, index_name, ref_gff=ref_gff, use_shm=args.shm,
protein_coding=args.protein_coding, exclude_utr=args.exclude_utr)
elif os.path.exists(index_name):
# queue_logger.info("Starting loading the indexed reference")
@@ -45,16 +55,27 @@
"I cannot delete the old index due to permission errors. I will create a temporary one instead.")
__index = tempfile.NamedTemporaryFile(suffix=".midx")
index_name = __index.name
create_index(args.reference, queue_logger, index_name, ref_gff=ref_gff,
create_index(args.reference, queue_logger, index_name, ref_gff=ref_gff, use_shm=args.shm,
protein_coding=args.protein_coding, exclude_utr=args.exclude_utr)
else:
if args.no_save_index is True:
__index = tempfile.NamedTemporaryFile(suffix=".midx", delete=False)
index_name = __index.name
create_index(args.reference, queue_logger, index_name, ref_gff=ref_gff,
create_index(args.reference, queue_logger, index_name, ref_gff=ref_gff, use_shm=args.shm,
protein_coding=args.protein_coding, exclude_utr=args.exclude_utr)
assert os.path.exists(index_name), "Index file {} should have been created but is now absent!".format(
index_name)

try:
if shm_handle is not None:
shm_handle.close()
queue_logger.info(f"Copying the database into the temporary index {shm_handle.name}")
shutil.copy(index_name, shm_handle.name)
index_name = shm_handle.name
queue_logger.info(f"Copied the database into the temporary index {shm_handle.name}")
except (PermissionError, FileNotFoundError, OSError, TypeError, ValueError) as exc:
queue_logger.error("Failed to copy the database to SHM. Using the in-disk copy.")
queue_logger.error(f"Specific exception: {exc}")

assert os.path.exists(index_name), (args.reference, index_name)
return index_name
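The close-then-copy sequence in the hunk above works because `NamedTemporaryFile` deletes its placeholder file on `close()` (with the default `delete=True`), freeing a unique name the index can then be copied onto; `_shutdown` removes that copy explicitly later. A standalone sketch of the pattern (file names are hypothetical):

```python
import os
import shutil
import tempfile

# Reserve a unique name in /dev/shm, release the placeholder, then
# copy the index onto that name.
handle = tempfile.NamedTemporaryFile(suffix=".midx", dir="/dev/shm")
name = handle.name
handle.close()                            # placeholder deleted, name freed
shutil.copy("reference.gff3.midx", name)  # hypothetical on-disk index
# ... use the SHM copy of the index ...
os.remove(name)                           # explicit cleanup, as in _shutdown
```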
20 changes: 13 additions & 7 deletions Mikado/scales/reference_preparation/indexing.py
@@ -1,4 +1,6 @@
import sqlite3

from ...utilities import create_shm_handle
from ...utilities.file_type import filetype
from ...exceptions import CorruptIndex
import collections
@@ -79,7 +81,7 @@ def load_index(args, queue_logger):

def check_index(reference, queue_logger):

if reference.endswith("midx"):
if reference.endswith(("midx", "db")):
reference = reference
else:
reference = "{}.midx".format(reference)
@@ -89,15 +91,15 @@ def check_index(reference, queue_logger):
cursor = conn.cursor()
tables = cursor.execute("SELECT name FROM sqlite_master WHERE type='table';").fetchall()
if sorted(tables) != sorted([("positions",), ("genes",)]):
raise CorruptIndex("Invalid database file")
raise CorruptIndex(f"Invalid database file: {reference}")
gid, obj = cursor.execute("SELECT * from genes").fetchone()
try:
obj = msgpack.loads(obj, raw=False)
except TypeError:
try:
obj = json.loads(obj)
except (ValueError, TypeError, json.decoder.JSONDecodeError):
raise CorruptIndex("Corrupt index")
raise CorruptIndex(f"Corrupt index: {reference}")
raise CorruptIndex("Old index, deleting and rebuilding")

gene = Gene(None)
Expand All @@ -107,16 +109,20 @@ def check_index(reference, queue_logger):
raise CorruptIndex("Invalid value for genes, indicating a corrupt index. Deleting and rebuilding.")

except sqlite3.DatabaseError:
raise CorruptIndex("Invalid database file")
raise CorruptIndex(f"Invalid database file: {reference}")


def create_index(reference, queue_logger, index_name, ref_gff=False,
exclude_utr=False, protein_coding=False):
exclude_utr=False, protein_coding=False, use_shm=True):

"""Method to create the simple indexed database for features."""

with tempfile.NamedTemporaryFile(suffix=".db") as temp_db:
queue_logger.info("Starting to create an index for %s", reference.name)
use_shm = use_shm and os.path.exists("/dev/shm") and os.access("/dev/shm", os.W_OK)
queue_logger.info(f"Using /dev/shm: {use_shm}")
with tempfile.NamedTemporaryFile(suffix=".db",
dir=tempfile.tempdir if use_shm is False else "/dev/shm") as temp_db:
queue_logger.info(f"Starting to create an index for {reference.name} using "
f"{temp_db.name} as temporary database.")
if os.path.exists("{0}.midx".format(reference.name)):
queue_logger.warning("Removing the old index")
try:
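The `dir=` expression in the hunk above leans on `tempfile.tempdir` usually being `None`, in which case `NamedTemporaryFile` falls back to `TMPDIR` or the platform default; a minimal check of that behaviour:

```python
import os
import tempfile

# dir=None (the usual value of tempfile.tempdir) means "use TMPDIR or the
# platform default", so one call covers both the SHM and non-SHM cases.
use_shm = os.path.exists("/dev/shm") and os.access("/dev/shm", os.W_OK)
with tempfile.NamedTemporaryFile(suffix=".db",
                                 dir="/dev/shm" if use_shm else None) as tmp:
    print(tmp.name)  # /dev/shm/... or $TMPDIR/...
```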
8 changes: 4 additions & 4 deletions Mikado/serializers/orf.py
@@ -414,12 +414,12 @@ def __serialize_multiple_threads(self):
self.query_cache[current_query.query_name] = current_query.query_id
current_query = current_query.query_id
else:
self.logger.critical(
"The provided ORFs do not match the transcripts provided and already present in the database.\
exc = "The provided ORFs do not match the transcripts provided and already present in the database.\
This could be due to having called the ORFs on a FASTA file different from `mikado_prepared.fasta`, the output of \
mikado prepare. If this is the case, please use mikado_prepared.fasta to call the ORFs and then restart \
`mikado serialise` using them as input.")
raise InvalidSerialization
`mikado serialise` using them as input."
self.logger.critical(exc)
raise InvalidSerialization(exc)

loaded_obj["query_id"] = current_query
objects.append(loaded_obj)
5 changes: 5 additions & 0 deletions Mikado/subprograms/compare.py
@@ -57,6 +57,11 @@ def get_procs(arg):
action="store_true",
help="""Flag. If set, compare will stop after
having generated the GFF index for the reference.""")
shm = parser.add_mutually_exclusive_group()
shm.add_argument("--no-shm", action="store_false", default=False, dest="shm",
help="Switch off /dev/shm usage.")
shm.add_argument("--shm", action="store_true", default=False, dest="shm",
help="Switch on /dev/shm usage.")
parser.add_argument('--distance', type=int, default=2000,
help='''Maximum distance for a transcript to be considered
a polymerase run-on. Default: %(default)s''')
10 changes: 7 additions & 3 deletions Mikado/subprograms/pick.py
@@ -71,7 +71,7 @@ def _set_pick_run_options(conf: Union[DaijinConfiguration, MikadoConfiguration],
conf.pick.run_options.exclude_cds = True if args.no_cds is True else conf.pick.run_options.exclude_cds
conf.pick.run_options.intron_range = tuple(sorted(args.intron_range)) if args.intron_range is not None \
else conf.pick.run_options.intron_range
conf.pick.run_options.shm = True if args.shm is not None else conf.pick.run_options.shm
conf.pick.run_options.shm = args.shm if args.shm is not None else conf.pick.run_options.shm
if args.only_reference_update is True:
conf.pick.run_options.only_reference_update = True
conf.pick.run_options.reference_update = True
@@ -276,9 +276,13 @@ def pick_parser():
parser.add_argument("--start-method", dest="start_method",
choices=["fork", "spawn", "forkserver"],
default=None, help="Multiprocessing start method.")
parser.add_argument("--shm", default=False, action="store_true",
help="Flag. If switched, Mikado pick will copy the database to RAM (ie SHM) for faster access \
shm = parser.add_mutually_exclusive_group()
shm.add_argument("--shm", default=None, action="store_true", dest="shm",
help="Flag. If switched, Mikado pick will copy the database to RAM (ie SHM) for faster access \
during the run.")
shm.add_argument("--no-shm", default=None, action="store_false", dest="shm",
help="Flag. If switched, Mikado will force using the database on location instead of "
"copying it to /dev/shm for faster access.")
parser.add_argument("-p", "--procs", type=int, default=None,
help="""Number of processors to use. \
Default: look in the configuration file (1 if undefined)""")
9 changes: 7 additions & 2 deletions Mikado/subprograms/prepare.py
@@ -10,6 +10,9 @@
import argparse
import logging
import logging.handlers

from Mikado import version

from ._utils import check_log_settings_and_create_logger
from ..configuration import MikadoConfiguration, DaijinConfiguration, parse_list_file
from ..configuration.configurator import load_and_validate_config
@@ -170,8 +173,10 @@ def setup(args, logger=None) -> (argparse.Namespace, Union[MikadoConfiguration,

mikado_config, logger = check_log_settings_and_create_logger(mikado_config, args.log, args.log_level,
section="prepare")
logger.info("Command line: %s", " ".join(sys.argv))
logger.info("Random seed: %s", mikado_config.seed)

logger.info(f"Mikado version: {version.__version__}")
logger.info(f"Command line: {' '.join(sys.argv)}")
logger.info(f"Random seed: {mikado_config.seed}")

mikado_config.prepare.files.out = os.path.basename(mikado_config.prepare.files.out)
if getattr(args, "out") not in (None, False):