
new File breaks cloud execution in bcl_demultiplex #5612

Open
2 tasks done

k1sauce opened this issue May 15, 2024 · 6 comments · May be fixed by #5720 or #5781
Labels
bug Something isn't working

Comments

k1sauce (Contributor) commented May 15, 2024

Have you checked the docs?

Description of the bug

This bug relates to the bcl_demultiplex subworkflow.

new File() will not work with cloud storage such as S3 (see nf-core/tools#354 for reference); file() needs to be used instead.
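For illustration, a generic sketch of the gotcha (not the exact line from the subworkflow; params.outdir here stands in for whatever path is being written):

// java.io.File only understands local filesystem paths, so it breaks when
// outdir is an s3:// (or gs://, az://) URI.
def bad  = new File("${params.outdir}/skipped_fastqs.log")

// Nextflow's file() returns a java.nio.file.Path backed by the matching
// filesystem provider, so the same code works locally and on object storage.
def good = file("${params.outdir}/skipped_fastqs.log")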

Command used and terminal output

No response

Relevant files

No response

System information

No response

glichtenstein (Contributor) commented May 16, 2024

@k1sauce @Aratz @edmundmiller Hey, how are you doing?

An idea to solve this issue is to have a parameter called --log_skipped_fastqs so that the user can decide whether the file gets created or not.

nextflow run glichtenstein/demultiplex -r 3806b6a \
  --input nf-core-samplesheet.csv \
  --outdir /scratch/output \
  -profile docker \
  -work-dir /scratch/workdir \
  --log_skipped_fastqs=false

If set to false, the file named skipped_fastqs.log won't be created; if set to true (the default), the file is created in the outdir path.
The invalid FASTQs will be skipped regardless, so that the workflow can continue with Falco.

What do you think? I tested it and opened two pull requests, one for demultiplex.nf and one for the module counterpart:

nf-core/modules/pull/5638

nf-core/demultiplex/pull/190
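
For illustration only, here is one way such a flag could gate the log file (a hedged sketch, not the code in those PRs; the input glob and params.outdir are placeholders):

// Sketch: a pipeline parameter gates whether the skipped-fastqs log is written.
params.log_skipped_fastqs = true

workflow {
    // placeholder input: any channel of demultiplexed FASTQ files
    ch_fastq = Channel.fromPath('demux_output/**/*.fastq.gz')

    // empty FASTQs are always dropped so downstream tools (e.g. Falco) can run
    ch_valid   = ch_fastq.filter { it.size() >  0 }
    ch_skipped = ch_fastq.filter { it.size() == 0 }

    // the log is only produced when the flag is on (the proposed default)
    if (params.log_skipped_fastqs) {
        ch_skipped
            .map { it.name }
            .collectFile(name: 'skipped_fastqs.log', newLine: true, storeDir: params.outdir)
    }
}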

glichtenstein (Contributor) commented May 24, 2024

@k1sauce how are you doing? Would it be possible to ask you to run a test with the branch I am working on, to check whether it fixes the S3 issue? The idea behind it is that one can set a flag, --log_skipped_fastqs=false, so that the log file is not created. The empty FASTQs will still be skipped so the workflow can complete, but the log file containing the list of invalid FASTQs won't be created in the --outdir path. Currently, I cannot test in a cloud AWS S3 environment. I've left you an example in the PR nf-core/demultiplex#190; you may run it using the latest commit revision id (fffb21d). I.e.,

Testing

With --log_empty_fastqs=true:
nextflow run glichtenstein/demultiplex -r 7d9538e --input nf-core-samplesheet.csv --outdir /data/scratch/iseq-DI/output -profile docker -work-dir /data/scratch/iseq-DI/workdir -resume --skip_tools fastp --log_empty_fastqs=true

With --log_empty_fastqs=false:
nextflow run glichtenstein/demultiplex -r 7d9538e --input nf-core-samplesheet.csv --outdir /data/scratch/iseq-DI/output -profile docker -work-dir /data/scratch/iseq-DI/workdir -resume --skip_tools fastp --log_empty_fastqs=false

k1sauce (Contributor, Author) commented May 24, 2024

@glichtenstein Thanks for working on this. I spent some time thinking more about this too, and I would like to tackle it a different way. Here are my thoughts:

  1. I do not think we should be writing a log file in the subworkflow; it does not fit the idea of a subworkflow. As a user, you expect everything a subworkflow writes to come from a process publishDir, and anything it generates to be available via the workflow emit, so that the user can decide what to do with those emits.
  2. I think the bool in the meta variable to signal to downstream applications that the file size is 0 is not necessary either (sorry for suggesting it). We should probably just pass along all files and let the downstream consumers of the channel implement their own filters (see the sketch at the end of this comment). This also makes sense when you mix/merge the downstream FASTQ channels and want to filter on file size. The alternative of calculating the file size in the subworkflow would lead to code duplication across the demux subworkflows.
  3. Perhaps this is a different issue, but I also do not approve of the way the read group parsing is done, as it will stage all the files on the head node. If read group parsing is done at all, it should be done in a process. Running it on the head node means that all the FASTQ files need to be staged there, which becomes a bottleneck with thousands of FASTQ files all staged to the same node. (This code is also duplicated across the demux subworkflows.)

That being said, I also have a branch on which I am testing some changes that would address this. If you are in agreement, I can open a PR after a bit more development and we can review that one instead?
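
To illustrate point 2, a downstream filter might look like this (a hypothetical sketch, not code from either PR; it assumes BCL_DEMULTIPLEX and the nf-core FALCO module are already included in the calling workflow):

// Consumers of the emitted fastq channel drop empty files themselves before
// running QC, instead of the subworkflow writing a log or flagging them.
BCL_DEMULTIPLEX.out.fastq
    .filter { meta, reads -> reads.every { it.size() > 0 } }
    .set { ch_fastq_nonempty }

FALCO ( ch_fastq_nonempty )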

k1sauce (Contributor, Author) commented May 24, 2024

@glichtenstein I don't want to get ahead of myself, but I am thinking of something simple like this:


//
// Demultiplex Illumina BCL data using bcl-convert or bcl2fastq
//

include { BCLCONVERT } from "../../../modules/nf-core/bclconvert/main"
include { BCL2FASTQ  } from "../../../modules/nf-core/bcl2fastq/main"

workflow BCL_DEMULTIPLEX {
    take:
        ch_flowcell     // [[id:"", lane:""],samplesheet.csv, path/to/bcl/files]
        demultiplexer   // bclconvert or bcl2fastq

    main:
        ch_versions      = Channel.empty()
        ch_fastq         = Channel.empty()
        ch_reports       = Channel.empty()
        ch_stats         = Channel.empty()
        ch_interop       = Channel.empty()

        // Split flowcells into separate channels containing run as tar and run as path
        // https://nextflow.slack.com/archives/C02T98A23U7/p1650963988498929
        ch_flowcell
            .branch { meta, samplesheet, run ->
                tar: run.toString().endsWith(".tar.gz")
                dir: true
            }.set { ch_flowcells }

        ch_flowcells.tar
            .multiMap { meta, samplesheet, run ->
                samplesheets: [ meta, samplesheet ]
                run_dirs: [ meta, run ]
            }.set { ch_flowcells_tar }

        // Runs when run_dir is a tar archive
        // Re-join the metadata and the untarred run directory with the samplesheet
        ch_flowcells_tar_merged = ch_flowcells_tar
                                    .samplesheets
                                    .join( ch_flowcells_tar.run_dirs )

        // Merge the two channels back together
        ch_flowcells = ch_flowcells.dir.mix(ch_flowcells_tar_merged)

        // MODULE: bclconvert
        // Demultiplex the bcl files
        if (demultiplexer == "bclconvert") {
            BCLCONVERT( ch_flowcells )
            ch_fastq    = ch_fastq.mix(BCLCONVERT.out.fastq)
            ch_interop  = ch_interop.mix(BCLCONVERT.out.interop)
            ch_reports  = ch_reports.mix(BCLCONVERT.out.reports)
            ch_versions = ch_versions.mix(BCLCONVERT.out.versions)
        }

        // MODULE: bcl2fastq
        // Demultiplex the bcl files
        if (demultiplexer == "bcl2fastq") {
            BCL2FASTQ( ch_flowcells )
            ch_fastq    = ch_fastq.mix(BCL2FASTQ.out.fastq)
            ch_interop  = ch_interop.mix(BCL2FASTQ.out.interop)
            ch_reports  = ch_reports.mix(BCL2FASTQ.out.reports)
            ch_stats    = ch_stats.mix(BCL2FASTQ.out.stats)
            ch_versions = ch_versions.mix(BCL2FASTQ.out.versions)
        }

        // Generate meta for each fastq
        ch_fastq_with_meta = ch_fastq
            .map{ fc_meta, fastq -> 
                def meta = [
                    "id": fastq.getSimpleName().toString() - ~/_R[0-9]_001.*$/,
                    "samplename": fastq.getSimpleName().toString() - ~/_S[0-9]+.*$/,
                    "fcid": fc_meta.id,
                    "lane": fc_meta.lane,
                    "empty": (fastq.size() == 0)
                ]
                [meta, fastq] 
            }
            .groupTuple(by: [0])
            .map { meta, fastq -> // Add meta.single_end
                meta.single_end = fastq.size() == 1
                return [meta, fastq.flatten()]
            }

    emit:
        fastq    = ch_fastq_with_meta
        reports  = ch_reports
        stats    = ch_stats
        interop  = ch_interop
        versions = ch_versions
}
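
For context, a minimal caller sketch (hypothetical; it assumes the standard nf-core subworkflow layout and uses placeholder inputs, and is not code from the PR):

// Hypothetical caller, assuming the standard nf-core subworkflow layout.
include { BCL_DEMULTIPLEX } from '../subworkflows/nf-core/bcl_demultiplex/main'

workflow {
    // [[id, lane], samplesheet, run directory or tarball] -- placeholder paths
    ch_flowcell = Channel.of(
        [ [id: 'HFLGGBGXX', lane: 1], file('SampleSheet.csv'), file('run_dir.tar.gz') ]
    )

    BCL_DEMULTIPLEX ( ch_flowcell, 'bclconvert' )

    // For a file named Sample1_S1_L001_R1_001.fastq.gz, each emitted item
    // would look roughly like:
    //   [ [id:'Sample1_S1_L001', samplename:'Sample1', fcid:'HFLGGBGXX',
    //       lane:1, empty:false, single_end:false], [R1.fastq.gz, R2.fastq.gz] ]
    BCL_DEMULTIPLEX.out.fastq.view()
    BCL_DEMULTIPLEX.out.versions.view()
}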

k1sauce linked a pull request May 29, 2024 that will close this issue
k1sauce (Contributor, Author) commented May 29, 2024

@glichtenstein OK, how does this look? #5720

If it's OK with you, I think this may be the right approach. Then we can add a filter on file size before Falco in the demultiplex workflow.

glichtenstein (Contributor) commented Jun 9, 2024

@k1sauce, how are you? I've worked on a proposal to fix the new File() issue in PR #5781, using file() as suggested. If possible, it needs to be tested on S3, as I do not have access to that environment. If this fixes it, we can look at refactoring demultiplex.nf and addressing the file staging issues in another branch, if you would like.
