diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 2b934b5..67e2d22 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -29,7 +29,10 @@ jobs:
         NXF_VER=20.04.1 nextflow run run_corncob.nf -c nextflow.config -profile testing --input_hdf output/geneshot.results.hdf5 --input_folder output/ --output_folder output_label2 --output_prefix geneshot_label2 --formula "label2" -w work/ -with-docker ubuntu:latest
     - name: Validate results
       run: |
-        docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5
+        docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5 --check-corncob
+    - name: Validate results (python 3.7)
+      run: |
+        docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:py37 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5 --check-corncob
 
   no_preprocessing:
     runs-on: ubuntu-latest
@@ -53,7 +56,7 @@ jobs:
         NXF_VER=20.04.1 nextflow run main.nf -c nextflow.config -profile testing --manifest data/mock.manifest.csv --output output --hg_index data/hg_chr_21_bwa_index.tar.gz --formula "label1 + label2" --distance_threshold 0.5 -w work/ --noannot -with-docker ubuntu:latest --nopreprocess
     - name: Validate results
       run: |
-        docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5
+        docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5 --check-corncob
 
   add_formula:
     runs-on: ubuntu-latest
@@ -86,7 +89,7 @@ jobs:
         NXF_VER=20.04.1 nextflow run run_corncob.nf -c nextflow.config -profile testing --input_hdf output_with_annotations/geneshot.results.hdf5 --input_folder output/ --output_folder output_with_corncob --output_prefix geneshot --formula "label1,label2" -w work/ -with-docker ubuntu:latest
     - name: Validate results
       run: |
-        docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5
+        docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output_with_corncob/geneshot.label1_label2.corncob.hdf5 --details-hdf /share/output/geneshot.details.hdf5 --check-corncob
 
   no_assembly:
     runs-on: ubuntu-latest
@@ -110,7 +113,7 @@ jobs:
         NXF_VER=20.04.1 nextflow run main.nf -c nextflow.config -profile testing --manifest data/mock.manifest.csv --output output --hg_index data/hg_chr_21_bwa_index.tar.gz --formula "label1 + label2" --distance_threshold 0.5 -w work/ --noannot -with-docker ubuntu:latest --gene_fasta data/genes.fasta.2.gz
     - name: Validate results
       run: |
-        docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5 --skip-assembly
+        docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5 --skip-assembly --check-corncob
 
   eggnog:
     runs-on: ubuntu-latest
@@ -137,7 +140,7 @@ jobs:
         NXF_VER=20.04.1 nextflow run main.nf -c nextflow.config -profile testing --manifest data/mock.manifest.csv --output output --hg_index data/hg_chr_21_bwa_index.tar.gz --formula "label1 + label2" --distance_threshold 0.15 -w work/ -with-docker ubuntu:latest --gene_fasta data/genes.fasta.2.gz --eggnog_dmnd eggnog_proteins.dmnd --eggnog_db eggnog.db --taxonomic_dmnd false
     - name: Validate results
       run: |
-        docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5 --skip-assembly
+        docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5 --skip-assembly --check-corncob
 
   composition:
     runs-on: ubuntu-latest
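Note: every validation step above now passes --check-corncob, and a second pass repeats the validation under Python 3.7. As the changes to bin/validate_geneshot_output.py further down show, the new flag reduces to one extra assertion; a minimal sketch of that check (the file name here is illustrative):

import pandas as pd

# --check-corncob asserts that the corncob results table was
# actually written to the results HDF5 and is non-empty
with pd.HDFStore("geneshot.results.hdf5", "r") as store:
    df = pd.read_hdf(store, "/stats/cag/corncob")

assert df.shape[0] > 0, "/stats/cag/corncob has 0 rows"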
diff --git a/bin/add_corncob_results.py b/bin/add_corncob_results.py
index c55df25..f11ea94 100755
--- a/bin/add_corncob_results.py
+++ b/bin/add_corncob_results.py
@@ -5,6 +5,8 @@
 from scipy import stats
 from statsmodels.stats.multitest import multipletests
 import sys
+import pickle
+pickle.HIGHEST_PROTOCOL = 4
 
 # Set the path to the HDF
 hdf_fp = sys.argv[1]
@@ -42,6 +44,12 @@ def read_corncob_results(fdr_method=fdr_method, corncob_csv=corncob_csv):
         q_value = multipletests(corncob_wide.p_value.fillna(1), 0.2, fdr_method)[1]
     )
 
+    # Add the wald metric
+    corncob_wide = corncob_wide.assign(
+        wald=corncob_wide["estimate"] / corncob_wide["std_error"]
+    )
+
+
     return corncob_wide
 
@@ -156,6 +164,14 @@ def write_corncob_by_annot(corncob_wide, gene_annot, col_name_list, fp_out):
 
         batch = []
         for i in df:
+
+            # Skip any annotations which are shared by >= 50k CAGs
+            # This is done entirely for performance reasons
+            if i.shape[0] >= 50000:
+                print("Skipping the annotation %s -- too many (%d) CAGs found" % (i["label"].values[0], i.shape[0]))
+                continue
+
+            # Add this label to the batch and continue
             batch.append(i)
             batch_size += i.shape[0]
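The new Wald column added in read_corncob_results() is simply the coefficient estimate scaled by its standard error. A toy illustration (column names follow the corncob CSV handled above):

import pandas as pd

# Toy corncob-style output: one row per (CAG, parameter) estimate
corncob_wide = pd.DataFrame({
    "CAG": [0, 1],
    "estimate": [1.2, -0.4],
    "std_error": [0.3, 0.2],
})

# Wald statistic = estimate / standard error, as in the change above
corncob_wide = corncob_wide.assign(
    wald=corncob_wide["estimate"] / corncob_wide["std_error"]
)
# The wald column is now [4.0, -2.0]

For context, the q_value column computed just above the new code comes from statsmodels; a self-contained sketch of that call pattern, with made-up p-values:

import pandas as pd
from statsmodels.stats.multitest import multipletests

df = pd.DataFrame({"p_value": [0.001, 0.04, 0.2, None]})

# multipletests returns (reject, corrected_pvals, ...); element [1] holds
# the corrected p-values. Missing p-values are treated as 1, matching the
# fillna(1) above; 0.2 is the alpha and "fdr_bh" the correction method
df = df.assign(
    q_value=multipletests(df.p_value.fillna(1), 0.2, "fdr_bh")[1]
)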
diff --git a/bin/validate_geneshot_output.py b/bin/validate_geneshot_output.py
index 3339732..9b4cca5 100644
--- a/bin/validate_geneshot_output.py
+++ b/bin/validate_geneshot_output.py
@@ -17,7 +17,8 @@
 consoleHandler.setFormatter(logFormatter)
 rootLogger.addHandler(consoleHandler)
 
-def validate_results_hdf(results_hdf):
+
+def validate_results_hdf(results_hdf, check_corncob = False):
     """Validate that the results HDF has all expected data."""
 
     for key_name in [
@@ -41,6 +42,15 @@
         # Report errors with empty rows
         assert df.shape[0] > 0, "{} in {} has 0 rows".format(key_name, results_hdf)
 
+    if check_corncob:
+
+        # Read the results from running corncob
+        df = read_table_from_hdf(results_hdf, "/stats/cag/corncob")
+
+        # Report errors with empty rows
+        assert df.shape[0] > 0, "/stats/cag/corncob in {} has 0 rows".format(results_hdf)
+
+
     # Some tables should be indexed -- let's check for that
     for key_name, where_str in [
         ("/abund/cag/wide", "CAG == 0"),
@@ -86,14 +96,14 @@
         return pd.read_hdf(store, key_name, **kwargs)
 
 
-def validate_geneshot_output(results_hdf, details_hdf, skip_assembly = False):
+def validate_geneshot_output(results_hdf, details_hdf, skip_assembly = False, check_corncob = False):
     """Validate that the geneshot outputs have all expected data."""
 
     assert os.path.exists(results_hdf)
     assert os.path.exists(details_hdf)
 
     # Validate the results HDF
-    validate_results_hdf(results_hdf)
+    validate_results_hdf(results_hdf, check_corncob=check_corncob)
 
     # Read the manifest from the results HDF
     manifest_df = read_table_from_hdf(results_hdf, "/manifest")
@@ -119,11 +129,17 @@
         help = "If specified, skip the detailed assembly information",
         action = "store_true"
     )
+    parser.add_argument(
+        "--check-corncob",
+        help = "If specified, check for corncob output",
+        action = "store_true"
+    )
 
     args = parser.parse_args()
 
     validate_geneshot_output(
         args.results_hdf,
         args.details_hdf,
-        skip_assembly = args.skip_assembly
+        skip_assembly = args.skip_assembly,
+        check_corncob = args.check_corncob
     )
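The validator's indexed-table checks above (e.g. querying /abund/cag/wide with "CAG == 0") rely on those tables being stored in PyTables' queryable "table" format. A minimal sketch of that access pattern, with an illustrative file name:

import pandas as pd

# Indexed tables can be queried with a `where` clause, so a single CAG
# can be pulled out without reading the whole abundance matrix
with pd.HDFStore("geneshot.results.hdf5", "r") as store:
    cag0_abund = pd.read_hdf(store, "/abund/cag/wide", where="CAG == 0")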
diff --git a/main.nf b/main.nf
index 09bfa08..1356429 100755
--- a/main.nf
+++ b/main.nf
@@ -67,6 +67,7 @@ params.linkage_type = "average"
 // Statistical analysis options
 params.formula = false
 params.fdr_method = "fdr_bh"
+params.corncob_batches = 10
 
 // Compositional analysis options
 params.composition = false
@@ -132,6 +133,7 @@ def helpMessage() {
     For Statistical Analysis:
       --formula             Optional formula used to estimate associations with CAG relative abundance
       --fdr_method          FDR method used to calculate q-values for associations (default: 'fdr_bh')
+      --corncob_batches     Number of parallel processes to use when processing each formula
 
     For Compositional Analysis:
       --composition         When included, metaPhlAn2 will be run on all specimens
@@ -255,11 +257,13 @@ include alignment_wf from './modules/alignment' params(
 // Import the workflows used for statistical analysis
 include validation_wf from './modules/statistics' params(
     output_folder: output_folder,
-    formula: params.formula
+    formula: params.formula,
+    corncob_batches: params.corncob_batches
 )
 include corncob_wf from './modules/statistics' params(
     output_folder: output_folder,
-    formula: params.formula
+    formula: params.formula,
+    corncob_batches: params.corncob_batches
 )
 include runBetta from './modules/statistics'
 include addBetta from './modules/statistics' params(
@@ -517,7 +521,7 @@ workflow {
 
     addBetta(
         addCorncobResults.out[0],
-        runBetta.out
+        runBetta.out.toSortedList()
     )
 
     resultsHDF = addBetta.out[0]
diff --git a/modules/alignment.nf b/modules/alignment.nf
index 24c0208..5b27669 100644
--- a/modules/alignment.nf
+++ b/modules/alignment.nf
@@ -258,6 +258,8 @@
 import os
 import pandas as pd
 import gzip
 import json
+import pickle
+pickle.HIGHEST_PROTOCOL = 4
 
 # Set up logging
 logFormatter = logging.Formatter(
diff --git a/modules/general.nf b/modules/general.nf
index ef3640b..d88e0e4 100644
--- a/modules/general.nf
+++ b/modules/general.nf
@@ -228,6 +228,8 @@
 from sklearn.manifold import TSNE
 from scipy.spatial.distance import pdist, squareform
 from scipy.stats.mstats import gmean
 from scipy.stats import entropy
+import pickle
+pickle.HIGHEST_PROTOCOL = 4
 
 cag_csv = "${cag_csv}"
 gene_abund_feather = "${gene_abund_feather}"
@@ -589,6 +591,8 @@ process addGeneAssembly{
 import pandas as pd
 import os
+import pickle
+pickle.HIGHEST_PROTOCOL = 4
 
 # Count up the number of genes assembled per-specimen
 n_genes_assembled_per_specimen = dict()
@@ -706,6 +710,8 @@ process addMetaPhlAn2Results{
 #!/usr/bin/env python3
 
 import pandas as pd
+import pickle
+pickle.HIGHEST_PROTOCOL = 4
 
 # Open a connection to the HDF5
 with pd.HDFStore("${results_hdf}", "a") as store:
@@ -759,6 +765,8 @@ process addEggnogResults {
 import os
 import pandas as pd
+import pickle
+pickle.HIGHEST_PROTOCOL = 4
 
 # Get the list of files with eggNOG results
 eggnog_csv_list = [
@@ -1010,6 +1018,8 @@ process addTaxResults {
 import os
 import pandas as pd
+import pickle
+pickle.HIGHEST_PROTOCOL = 4
 
 diamond_tax_csv_list = [
     fp
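The repeated `pickle.HIGHEST_PROTOCOL = 4` pin (here and in every other script that writes HDF5) appears intended to keep the outputs readable from the new Python 3.7 validation job: Python 3.8+ ships pickle protocol 5, which 3.7 cannot read, and the HDF5 stack picks up pickle.HIGHEST_PROTOCOL when it serializes objects into a store. A minimal sketch of the pattern (DataFrame and path are illustrative); note the pin must run before anything is written:

import pickle
pickle.HIGHEST_PROTOCOL = 4  # cap at protocol 4 so Python 3.7 can read the file

import pandas as pd

df = pd.DataFrame({"CAG": [0, 1], "specimen": ["A", "B"]})

# Anything pickled into the store now uses protocol <= 4
with pd.HDFStore("example.hdf5", "w") as store:
    df.to_hdf(store, "/example")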
diff --git a/modules/statistics.nf b/modules/statistics.nf
index 6f54b8c..59bceb8 100644
--- a/modules/statistics.nf
+++ b/modules/statistics.nf
@@ -16,8 +16,12 @@ workflow validation_wf {
         manifest_csv
     )
 
+    splitCorncob(
+        mockData.out
+    )
+
     runCorncob(
-        mockData.out,
+        splitCorncob.out.flatten(),
         manifest_csv,
         formula_ch
     )
@@ -51,8 +55,12 @@ workflow corncob_wf {
         cag_csv
     )
 
+    splitCorncob(
+        extractCounts.out
+    )
+
     runCorncob(
-        extractCounts.out,
+        splitCorncob.out.flatten(),
         manifest_csv,
         formula_ch
     )
@@ -67,7 +75,7 @@
 process mockData {
     tag "Simulate dataset for validation"
-    container "quay.io/fhcrc-microbiome/python-pandas:v1.0.3"
+    container "${container__pandas}"
     label 'io_limited'
 
     input:
@@ -122,7 +130,7 @@
 process validateFormula {
     tag "Validate user-provided formula"
-    container "quay.io/fhcrc-microbiome/python-pandas:v1.0.3"
+    container "${container__pandas}"
     label 'io_limited'
 
     input:
@@ -165,7 +173,7 @@ for cag_id, cag_df in df.groupby("CAG"):
 // and so it needs to have access to those integer values
 process extractCounts {
     tag "Make CAG ~ sample read-count matrix"
-    container "quay.io/fhcrc-microbiome/python-pandas:v1.0.3"
+    container "${container__pandas}"
     label 'mem_veryhigh'
     publishDir "${params.output_folder}/abund/", mode: "copy"
@@ -280,17 +288,62 @@
 logging.info("Done")
 """
 }
 
+process splitCorncob {
+    container "${container__pandas}"
+    label "mem_veryhigh"
+    errorStrategy "retry"
+
+    input:
+    file readcounts_csv_gz
+
+    output:
+    file "readcounts.*.csv.gz"
+
+"""#!/usr/bin/env python3
+
+import pandas as pd
+
+# Read in the entire set of readcounts
+print("Reading in ${readcounts_csv_gz}")
+df = pd.read_csv("${readcounts_csv_gz}")
+print("Read in %d rows and %d columns" % (df.shape[0], df.shape[1]))
+
+# Write out shards of the data
+for shard_ix in range(${params.corncob_batches}):
+
+    # Get the list of CAGs to write out
+    cag_list = [
+        n
+        for ix, n in enumerate(df.columns.values)
+        if ix % ${params.corncob_batches} == shard_ix or n in ["specimen", "total"]
+    ]
+
+    # Skip if there are too few CAGs for this shard
+    if len(cag_list) <= 2:
+        print("Skipping shard %d, too few CAGs to populate" % shard_ix)
+        continue
+
+    # Write out the shard
+    print("Writing out %d columns to shard %d" % (len(cag_list), shard_ix))
+
+    df.reindex(columns=cag_list).to_csv("readcounts.%d.csv.gz" % shard_ix, index=None)
+    print("Done with shard %d" % shard_ix)
+
+print("Done with all shards")
+"""
+}
+
 // Extract the counts table from the results HDF5
 process runCorncob {
     tag "Perform statistical analysis"
     container "quay.io/fhcrc-microbiome/corncob"
-    label "mem_veryhigh"
+    label "mem_medium"
     errorStrategy "retry"
 
     input:
     file readcounts_csv_gz
     file metadata_csv
-    val formula
+    each formula
 
     output:
     file "corncob.results.csv"
@@ -346,9 +399,8 @@
 print("Merging total counts with metadata")
 total_and_meta <- metadata %>% left_join(total_counts, by = c("specimen" = "specimen"))
 
-
-#### Run the analysis for every individual CAG
-print(sprintf("Starting to process %s columns (CAGs)", dim(counts)[2]))
+#### Run the analysis for every individual CAG (in this shard)
+print(sprintf("Starting to process %s columns (CAGs)", length(c(2:(dim(counts)[2] - 1)))))
 
 corn_tib <- do.call(rbind, mclapply(
     c(2:(dim(counts)[2] - 1)),
     function(i){
@@ -679,6 +731,8 @@ process addBetta{
 import os
 import pandas as pd
 from statsmodels.stats.multitest import multipletests
+import pickle
+pickle.HIGHEST_PROTOCOL = 4
 
 betta_csv_list = "${betta_csv_list}".split(" ")
@@ -710,6 +764,11 @@ if df.shape[0] > 1:
         )[1]
     )
 
+    # Add the wald
+    df = df.assign(
+        wald = df["estimate"] / df["std_error"],
+    )
+
 # Open a connection to the HDF5
 with pd.HDFStore("${results_hdf}", "a") as store:
diff --git a/run_corncob.nf b/run_corncob.nf
index 8c90eb1..51e10c5 100644
--- a/run_corncob.nf
+++ b/run_corncob.nf
@@ -18,6 +18,7 @@ params.output_folder = false
 params.output_prefix = false
 params.formula = false
 params.fdr_method = "fdr_bh"
+params.corncob_batches = 10
 params.help = false
 
 // Docker containers
@@ -44,6 +45,7 @@ def helpMessage() {
     For Statistical Analysis:
       --formula             Optional formula used to estimate associations with CAG relative abundance
       --fdr_method          FDR method used to calculate q-values for associations (default: 'fdr_bh')
+      --corncob_batches     Number of parallel processes to use when processing each formula
 
     """.stripIndent()
 }
@@ -56,6 +58,11 @@ if (params.help || params.input_hdf == false || params.input_folder == false ||
     exit 0
 }
 
+// Import the process used to shard CAGs for corncob
+include splitCorncob from './modules/statistics' params(
+    corncob_batches: params.corncob_batches
+)
+
 // Import the process used for statistical analysis
 include runCorncob from './modules/statistics' params(
     formula: params.formula
 )
@@ -109,6 +116,8 @@ process updateFormula{
 import pandas as pd
 import shutil
+import pickle
+pickle.HIGHEST_PROTOCOL = 4
 
 # Come up with a new name for the output file
 formula_name = "${params.formula}".replace(
@@ -211,9 +220,13 @@ workflow {
         params.formula.split(",")
     )
 
+    splitCorncob(
+        input_csv
+    )
+
     // Now run corncob on the extracted manifest, as well as the gene counts table
     runCorncob(
-        input_csv,
+        splitCorncob.out.flatten(),
         updateFormula.out[1],
         formula_ch
     )
diff --git a/update_manifest.nf b/update_manifest.nf
index f50c986..7c28c1c 100644
--- a/update_manifest.nf
+++ b/update_manifest.nf
@@ -70,6 +70,8 @@ process replaceManifest {
 import pandas as pd
 import shutil
+import pickle
+pickle.HIGHEST_PROTOCOL = 4
 
 # Read in the new manifest
 new_manifest = pd.read_csv("${manifest_csv}", sep=",")
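The new splitCorncob process above partitions the CAG columns round-robin into params.corncob_batches shards, carrying the specimen and total columns into every shard so each piece is a self-contained readcounts table for runCorncob; shards that would hold no CAGs (<= 2 columns) are dropped, and flatten() then fans the remaining shard files out as separate tasks. A standalone illustration of the same partitioning rule on a toy table:

import pandas as pd

# Toy readcounts: one row per specimen, one column per CAG, plus totals
df = pd.DataFrame({
    "specimen": ["s1", "s2"],
    "total": [100, 200],
    "CAG-0": [10, 20],
    "CAG-1": [5, 2],
    "CAG-2": [1, 0],
})

n_shards = 2
for shard_ix in range(n_shards):
    # Round-robin by column position; bookkeeping columns go to every shard
    cols = [
        name
        for ix, name in enumerate(df.columns.values)
        if ix % n_shards == shard_ix or name in ["specimen", "total"]
    ]
    print(shard_ix, cols)

# Prints:
# 0 ['specimen', 'total', 'CAG-0', 'CAG-2']
# 1 ['specimen', 'total', 'CAG-1']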