
Example of aggregating files with associated metadata #40

Open
nick-youngblut opened this issue Jan 16, 2023 · 2 comments

@nick-youngblut

Currently, there is no example of aggregating files AND associated metadata. For instance, in many/most nf-core pipelines the process outputs are something like:

output:
tuple val(meta), path("file.txt")

...but what if one wants to then aggregate all of the file.txt outputs into one table AND include the associated meta values in that table?

As far as I can tell from scouring the Nextflow Slack channel, one must "embed" the metadata in the file paths and then parse the file paths in the aggregation step. For example:

Per-file process:

output:
tuple val(meta), path("${meta}.txt")

Aggregation process:

input:
path("*")

script:
"""
[somehow parse {meta} from input file path] 
"""

Is there a better way, especially given the substantial limitations of trying to embed metadata into a file path (e.g., dealing with multiple values and special characters in the metadata values)?

I'm sure a lot of pipeline developers would like a best-practices example of how to deal with this situation (without having to decipher how meta is dealt with in aggregation steps of nf-core pipelines).
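
One way to avoid parsing metadata back out of file names is to inject the metadata into the file contents before aggregation, for example with the collectFile operator. A minimal sketch, assuming DSL2, that each file.txt is a headerless TSV, and that the sample name is stored in meta.id (ch_results and meta.id are hypothetical names):

workflow {
    // ch_results emits: tuple val(meta), path("file.txt") from the per-file process
    ch_results
        // prepend the sample id from meta as an extra column on every line
        .map { meta, table ->
            table.readLines().collect { line -> "${meta.id}\t${line}" }.join('\n') + '\n'
        }
        // concatenate the per-sample chunks into a single summary table
        .collectFile(name: 'summary.txt')
        .view()
}

collectFile also offers sort, keepHeader, and skip options for inputs that carry headers; the trade-off of this sketch is that the files are read in the workflow body rather than in a task, which may not suit every executor or filesystem.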

@nick-youngblut

As another example, what is the best practice for aggregating multiple sets of files? For instance:

Per-file process:

input:
tuple val(sample_id), path("file.txt")

output:
path "${sample_id}_output1.txt", emit: output1
path "${sample_id}_output2.txt", emit: output2

For one output channel, one could just do the following:

process SUMMARY {
  input:
  path("*")

  output:
  path "summary.txt"

  script:
  """
  [somehow get {sample_id} from the file path, and then aggregate the files, while including {sample_id}]
  """
}

...but for multiple output channels, the following doesn't work:

process SUMMARY {
  input:
  path("*")
  path("*")

  output:
  path "summary.txt"

  script:
  """
  # for output1:
  [somehow get {sample_id} from the file path, and then aggregate the files, while including {sample_id}]
  # for output2:
  [somehow get {sample_id} from the file path, and then aggregate the files, while including {sample_id}]
  """
}

Must one create a SUMMARY process for each set of files (each path channel) emitted from the per-file process (e.g., process SUMMARY_OUTPUT1 and process SUMMARY_OUTPUT2), or is there a better, more scalable & maintainable approach?
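
One pattern that keeps a single aggregation process while still covering every output channel is DSL2 process aliasing: put a generic SUMMARY process in a module and include it once per channel under a different name. A sketch, assuming DSL2, a module file ./summary.nf that defines SUMMARY, and a per-file process PER_FILE with output1/output2 emits (all names here are hypothetical):

include { SUMMARY as SUMMARY_OUTPUT1 } from './summary.nf'
include { SUMMARY as SUMMARY_OUTPUT2 } from './summary.nf'

workflow {
    PER_FILE( ch_samples )
    // gather each emitted set of files into one list and aggregate it
    SUMMARY_OUTPUT1( PER_FILE.out.output1.collect() )
    SUMMARY_OUTPUT2( PER_FILE.out.output2.collect() )
}

Each alias is a separate instantiation of the same process definition, so the aggregation logic only has to be written (and maintained) once.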

@nick-youngblut

nick-youngblut commented Jan 18, 2023

Assuming that one must provide metadata values via the file paths when aggregating multiple files in Nextflow, I created the following script. It aggregates files (assumed to be structured tables), extracts values from the input file names (e.g., the sample name from files named "{sample_name}.tsv"), and adds those extracted values as extra columns in the aggregated output table. The wildcard-matching code is the same Python code that Snakemake uses for this purpose.

concat_tables.py
#!/usr/bin/env python
from __future__ import print_function
import os
import re
import sys
import argparse
import logging
import collections
from itertools import chain
from glob import glob

# logging
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.DEBUG)

# argparse
class CustomFormatter(argparse.ArgumentDefaultsHelpFormatter,
                      argparse.RawDescriptionHelpFormatter):
    pass

desc = 'Concatenate tables with the same headers'
epi = """DESCRIPTION:

Aggregate >=1 structured table. 
The columns are assumed to be the same across all of the tables.

Values can be extracted from the input file names (e.g., sample_id from sample1.txt)
and added as extra columns in the aggregated output table.

Use a glob pattern to identify >=1 input file.
Use --pattern to pull out values from the file path.
  For example, use "{sample_id}.txt" to get the sample
  id from files named "sample1.txt", "sample2.txt", etc.

Output written to STDOUT.
"""
parser = argparse.ArgumentParser(description=desc, epilog=epi,
                                 formatter_class=CustomFormatter)
parser.add_argument('glob', type=str,
                    help='Glob pattern to find the input files. Example: "*.stats.tsv"')
parser.add_argument('-p', '--pattern', type=str, default=None,
                    help='Matching pattern in file naming scheme. Example: "{sample_id}_taxid-{taxid}"')
parser.add_argument('-f', '--fields', type=str, default=None, nargs='+',
                    help='Pattern matching fields to use in output. Example: "taxid"')
parser.add_argument('-d', '--delimiter', type=str, default='\t',
                    help='Field delimiter')
parser.add_argument('-H', '--no-header', action='store_true', default=False,
                    help='No headers?')
parser.add_argument('--version', action='version', version='0.0.1')

_wildcard_regex = re.compile(
    r"""
    \{
        (?=(   # This lookahead assertion emulates an 'atomic group'
               # which is required for performance
            \s*(?P<name>\w+)                    # wildcard name
            (\s*,\s*
                (?P<constraint>                 # an optional constraint
                    ([^{}]+ | \{\d+(,\d+)?\})*  # allow curly braces to nest one level
                )                               # ...  as in '{w,a{3,5}}'
            )?\s*
        ))\1
    \}
    """,
    re.VERBOSE,
)

def regex(filepattern):
    """
    # Function taken from snakemake repo
    """
    f = []
    last = 0
    wildcards = set()
    for match in _wildcard_regex.finditer(filepattern):
        f.append(re.escape(filepattern[last : match.start()]))
        wildcard = match.group("name")
        if wildcard in wildcards:
            if match.group("constraint"):
                raise ValueError(
                    "Constraint regex must be defined only in the first "
                    "occurence of the wildcard in a string."
                )
            f.append("(?P={})".format(wildcard))
        else:
            wildcards.add(wildcard)
            f.append(
                "(?P<{}>{})".format(
                    wildcard,
                    match.group("constraint") if match.group("constraint") else ".+",
                )
            )
        last = match.end()
    f.append(re.escape(filepattern[last:]))
    f.append("$")  # ensure that the match spans the whole file
    return "".join(f)

def glob_wildcards(pattern, files=None, followlinks=False):
    """
    # Function taken from snakemake repo
    Glob the values of the wildcards by matching the given pattern to the filesystem.
    Returns a named tuple with a list of values for each wildcard.
    """
    pattern = os.path.normpath(pattern)
    first_wildcard = re.search("{[^{]", pattern)
    dirname = (
        os.path.dirname(pattern[: first_wildcard.start()])
        if first_wildcard
        else os.path.dirname(pattern)
    )
    if not dirname:
        dirname = "."

    names = [match.group("name") for match in _wildcard_regex.finditer(pattern)]
    Wildcards = collections.namedtuple("Wildcards", names)
    wildcards = Wildcards(*[list() for name in names])

    pattern = re.compile(regex(pattern))

    if files is None:
        files = (
            os.path.normpath(os.path.join(dirpath, f))
            for dirpath, dirnames, filenames in os.walk(
                dirname, followlinks=followlinks
            )
            for f in chain(filenames, dirnames)
        )

    for f in files:
        match = re.match(pattern, f)
        if match:
            for name, value in match.groupdict().items():
                getattr(wildcards, name).append(value)
    return wildcards

def main(args):
    # listing files
    infiles = glob(args.glob)
    logging.info(f'No. of files found: {len(infiles)}')

    # reading
    header_written = False
    for F in infiles:
        # file path pattern matching
        if args.pattern is not None:
            wcs = glob_wildcards(args.pattern, files=[F])
        else:
            wcs = None
        ## select just certain fields (default: all pattern fields)
        if args.fields is not None:
            wcs_fields = args.fields
        elif wcs is not None:
            wcs_fields = list(wcs._fields)
        else:
            # no --pattern given, so there are no extra fields to add
            wcs_fields = []
        ## pattern didn't match?
        if (wcs is not None and
            any([len(getattr(wcs, x)) == 0 for x in wcs_fields])):
            logging.warning(f'>=1 pattern field is empty for file: {F}')
            wcs = None
        # reading file
        with open(F) as inF:
            logging.info(f'Reading file: {F}')
            skip_cnt = 0
            for i,line in enumerate(inF):
                # parsing
                line = line.rstrip().split(args.delimiter)
                # skipping lines
                if line[0] == '' or line[0].startswith('#'):
                    skip_cnt += 1
                    continue
                # header
                if not args.no_header and i - skip_cnt == 0:
                    if header_written:
                        continue
                    header_written = True
                    # adding extra fields
                    if wcs is not None:
                        line = wcs_fields + line
                else:
                    # adding extra fields
                    if wcs is not None:
                        line = [getattr(wcs, x)[0] for x in wcs_fields] + line
                # printing line to STDOUT
                print(args.delimiter.join(line))

## script main
if __name__ == '__main__':
    args = parser.parse_args()
    main(args)
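
For reference, here is how the script might be invoked from an aggregation process; a sketch, assuming the per-file outputs are named "{sample_id}.tsv" and that concat_tables.py is executable and sits in the pipeline's bin/ directory (which Nextflow adds to the task PATH):

process SUMMARY {
  input:
  path "*"

  output:
  path "summary.txt"

  script:
  """
  concat_tables.py --pattern '{sample_id}.tsv' '*.tsv' > summary.txt
  """
}

The glob and pattern are single-quoted so that Bash passes them through to the Python script unexpanded.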

Is this the "best" way of aggregating files and associated file metadata?
