Commit

Improve efficiency of corncob analysis (#40)
* Corncob is now run across multiple shards
* Added the Wald statistic to the corncob and betta outputs
* Set the maximum pickle protocol to 4 for backwards compatibility with Python < 3.8 (see the sketch below)
* Added validation of corncob output, including runs under pickle protocol 4 and Python 3.7
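
A note on the pickle-protocol cap: pickle protocol 5 was introduced in Python 3.8, so metadata pickled under the 3.8 default cannot be read by Python 3.7. Capping pickle.HIGHEST_PROTOCOL before anything serializes avoids this, assuming the writing library (here PyTables, underneath pandas' HDF5 support) consults pickle.HIGHEST_PROTOCOL at call time. A minimal sketch, with an illustrative file name and table:

import pickle
pickle.HIGHEST_PROTOCOL = 4  # must run before the first pickle call; protocol 4 is readable by Python 3.4+

import pandas as pd

# Any object metadata PyTables pickles into the HDF5 file should now
# use protocol 4 and stay readable on Python 3.7.
df = pd.DataFrame({"CAG": [0, 1], "estimate": [0.5, -0.2]})
df.to_hdf("example.hdf5", "/stats/example")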
sminot committed Jun 25, 2020
1 parent 7a3ea19 commit 7a6d929
Showing 9 changed files with 148 additions and 23 deletions.
13 changes: 8 additions & 5 deletions .github/workflows/test.yaml
@@ -29,7 +29,10 @@ jobs:
NXF_VER=20.04.1 nextflow run run_corncob.nf -c nextflow.config -profile testing --input_hdf output/geneshot.results.hdf5 --input_folder output/ --output_folder output_label2 --output_prefix geneshot_label2 --formula "label2" -w work/ -with-docker ubuntu:latest
- name: Validate results
run: |
docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5
docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5 --check-corncob
- name: Validate results (python 3.7)
run: |
docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:py37 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5 --check-corncob
no_preprocessing:
runs-on: ubuntu-latest
@@ -53,7 +56,7 @@ jobs:
NXF_VER=20.04.1 nextflow run main.nf -c nextflow.config -profile testing --manifest data/mock.manifest.csv --output output --hg_index data/hg_chr_21_bwa_index.tar.gz --formula "label1 + label2" --distance_threshold 0.5 -w work/ --noannot -with-docker ubuntu:latest --nopreprocess
- name: Validate results
run: |
docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5
docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5 --check-corncob
add_formula:
runs-on: ubuntu-latest
@@ -86,7 +89,7 @@ jobs:
NXF_VER=20.04.1 nextflow run run_corncob.nf -c nextflow.config -profile testing --input_hdf output_with_annotations/geneshot.results.hdf5 --input_folder output/ --output_folder output_with_corncob --output_prefix geneshot --formula "label1,label2" -w work/ -with-docker ubuntu:latest
- name: Validate results
run: |
docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5
docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output_with_corncob/geneshot.label1_label2.corncob.hdf5 --details-hdf /share/output/geneshot.details.hdf5 --check-corncob
no_assembly:
runs-on: ubuntu-latest
@@ -110,7 +113,7 @@ jobs:
NXF_VER=20.04.1 nextflow run main.nf -c nextflow.config -profile testing --manifest data/mock.manifest.csv --output output --hg_index data/hg_chr_21_bwa_index.tar.gz --formula "label1 + label2" --distance_threshold 0.5 -w work/ --noannot -with-docker ubuntu:latest --gene_fasta data/genes.fasta.2.gz
- name: Validate results
run: |
docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5 --skip-assembly
docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5 --skip-assembly --check-corncob
eggnog:
runs-on: ubuntu-latest
@@ -137,7 +140,7 @@ jobs:
NXF_VER=20.04.1 nextflow run main.nf -c nextflow.config -profile testing --manifest data/mock.manifest.csv --output output --hg_index data/hg_chr_21_bwa_index.tar.gz --formula "label1 + label2" --distance_threshold 0.15 -w work/ -with-docker ubuntu:latest --gene_fasta data/genes.fasta.2.gz --eggnog_dmnd eggnog_proteins.dmnd --eggnog_db eggnog.db --taxonomic_dmnd false
- name: Validate results
run: |
docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5 --skip-assembly
docker run --rm -v $PWD:/share quay.io/fhcrc-microbiome/python-pandas:v1.0.3 python3 /share/bin/validate_geneshot_output.py --results-hdf /share/output/geneshot.results.hdf5 --details-hdf /share/output/geneshot.details.hdf5 --skip-assembly --check-corncob
composition:
runs-on: ubuntu-latest
16 changes: 16 additions & 0 deletions bin/add_corncob_results.py
@@ -5,6 +5,8 @@
from scipy import stats
from statsmodels.stats.multitest import multipletests
import sys
import pickle
pickle.HIGHEST_PROTOCOL = 4

# Set the path to the HDF
hdf_fp = sys.argv[1]
@@ -42,6 +44,12 @@ def read_corncob_results(fdr_method=fdr_method, corncob_csv=corncob_csv):
q_value = multipletests(corncob_wide.p_value.fillna(1), 0.2, fdr_method)[1]
)

# Add the wald metric
corncob_wide = corncob_wide.assign(
wald=corncob_wide["estimate"] / corncob_wide["std_error"]
)


return corncob_wide


@@ -156,6 +164,14 @@ def write_corncob_by_annot(corncob_wide, gene_annot, col_name_list, fp_out):
batch = []

for i in df:

# Skip any annotations which are shared by >= 50k CAGs
# This is done entirely for performance reasons
if i.shape[0] >= 50000:
print("Skipping the annotation %s -- too many (%d) CAGs found" % (i["label"].values[0], i.shape[0]))
continue

# Add this label to the batch and continue
batch.append(i)
batch_size += i.shape[0]

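
The Wald statistic added to this file is simply the coefficient estimate divided by its standard error. A self-contained sketch with toy values (the p_wald column is hypothetical and not part of this commit):

import pandas as pd
from scipy import stats

corncob_wide = pd.DataFrame({
    "estimate": [0.8, -0.3],
    "std_error": [0.2, 0.3],
})

# The Wald statistic: estimate / standard error, a z-score under H0.
corncob_wide = corncob_wide.assign(
    wald=corncob_wide["estimate"] / corncob_wide["std_error"]
)

# Illustration only: a two-sided p-value from the normal approximation.
corncob_wide["p_wald"] = 2 * stats.norm.sf(corncob_wide["wald"].abs())
print(corncob_wide)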
24 changes: 20 additions & 4 deletions bin/validate_geneshot_output.py
@@ -17,7 +17,8 @@
consoleHandler.setFormatter(logFormatter)
rootLogger.addHandler(consoleHandler)

def validate_results_hdf(results_hdf):

def validate_results_hdf(results_hdf, check_corncob = False):
"""Validate that the results HDF has all expected data."""

for key_name in [
@@ -41,6 +42,15 @@ def validate_results_hdf(results_hdf):
# Report errors with empty rows
assert df.shape[0] > 0, "{} in {} has 0 rows".format(key_name, results_hdf)

if check_corncob:

# Read the results from running corncob
df = read_table_from_hdf(results_hdf, "/stats/cag/corncob")

# Report errors with empty rows
assert df.shape[0] > 0, "{} in {} has 0 rows".format("/stats/cag/corncob", results_hdf)


# Some tables should be indexed -- let's check for that
for key_name, where_str in [
("/abund/cag/wide", "CAG == 0"),
@@ -86,14 +96,14 @@ def read_table_from_hdf(hdf_fp, key_name, **kwargs):
return pd.read_hdf(store, key_name, **kwargs)


def validate_geneshot_output(results_hdf, details_hdf, skip_assembly = False):
def validate_geneshot_output(results_hdf, details_hdf, skip_assembly = False, check_corncob = False):
"""Validate that the geneshot outputs have all expected data."""
assert os.path.exists(results_hdf)
assert os.path.exists(details_hdf)


# Validate the results HDF
validate_results_hdf(results_hdf)
validate_results_hdf(results_hdf, check_corncob=check_corncob)

# Read the manifest from the results HDF
manifest_df = read_table_from_hdf(results_hdf, "/manifest")
@@ -119,11 +129,17 @@ def validate_geneshot_output(results_hdf, details_hdf, skip_assembly = False):
help = "If specified, skip the detailed assembly information",
action = "store_true"
)
parser.add_argument(
"--check-corncob",
help = "If specified, check for corncob output",
action = "store_true"
)

args = parser.parse_args()

validate_geneshot_output(
args.results_hdf,
args.details_hdf,
skip_assembly = args.skip_assembly
skip_assembly = args.skip_assembly,
check_corncob = args.check_corncob
)
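
Reduced to its essentials, the new --check-corncob flag verifies that the corncob results table exists in the results HDF5 and is non-empty. A minimal sketch (file name illustrative; the key is the one checked above):

import pandas as pd

# Open read-only and confirm the corncob results are present.
with pd.HDFStore("geneshot.results.hdf5", "r") as store:
    df = pd.read_hdf(store, "/stats/cag/corncob")

assert df.shape[0] > 0, "/stats/cag/corncob has 0 rows"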
10 changes: 7 additions & 3 deletions main.nf
@@ -67,6 +67,7 @@ params.linkage_type = "average"
// Statistical analysis options
params.formula = false
params.fdr_method = "fdr_bh"
params.corncob_batches = 10

// Compositional analysis options
params.composition = false
@@ -132,6 +133,7 @@ def helpMessage() {
For Statistical Analysis:
--formula Optional formula used to estimate associations with CAG relative abundance
--fdr_method FDR method used to calculate q-values for associations (default: 'fdr_bh')
--corncob_batches Number of parallel processes to use when processing each formula
For Compositional Analysis:
--composition When included, metaPhlAn2 will be run on all specimens
@@ -255,11 +257,13 @@ include alignment_wf from './modules/alignment' params(
// Import the workflows used for statistical analysis
include validation_wf from './modules/statistics' params(
output_folder: output_folder,
formula: params.formula
formula: params.formula,
corncob_batches: params.corncob_batches
)
include corncob_wf from './modules/statistics' params(
output_folder: output_folder,
formula: params.formula
formula: params.formula,
corncob_batches: params.corncob_batches
)
include runBetta from './modules/statistics'
include addBetta from './modules/statistics' params(
@@ -517,7 +521,7 @@ workflow {

addBetta(
addCorncobResults.out[0],
runBetta.out
runBetta.out.toSortedList()
)

resultsHDF = addBetta.out[0]
2 changes: 2 additions & 0 deletions modules/alignment.nf
@@ -258,6 +258,8 @@ import os
import pandas as pd
import gzip
import json
import pickle
pickle.HIGHEST_PROTOCOL = 4
# Set up logging
logFormatter = logging.Formatter(
10 changes: 10 additions & 0 deletions modules/general.nf
@@ -228,6 +228,8 @@ from sklearn.manifold import TSNE
from scipy.spatial.distance import pdist, squareform
from scipy.stats.mstats import gmean
from scipy.stats import entropy
import pickle
pickle.HIGHEST_PROTOCOL = 4
cag_csv = "${cag_csv}"
gene_abund_feather = "${gene_abund_feather}"
@@ -589,6 +591,8 @@ process addGeneAssembly{
import pandas as pd
import os
import pickle
pickle.HIGHEST_PROTOCOL = 4
# Count up the number of genes assembled per-specimen
n_genes_assembled_per_specimen = dict()
@@ -706,6 +710,8 @@ process addMetaPhlAn2Results{
#!/usr/bin/env python3
import pandas as pd
import pickle
pickle.HIGHEST_PROTOCOL = 4
# Open a connection to the HDF5
with pd.HDFStore("${results_hdf}", "a") as store:
@@ -759,6 +765,8 @@ process addEggnogResults {
import os
import pandas as pd
import pickle
pickle.HIGHEST_PROTOCOL = 4
# Get the list of files with eggNOG results
eggnog_csv_list = [
@@ -1010,6 +1018,8 @@ process addTaxResults {
import os
import pandas as pd
import pickle
pickle.HIGHEST_PROTOCOL = 4
diamond_tax_csv_list = [
fp
79 changes: 69 additions & 10 deletions modules/statistics.nf
@@ -16,8 +16,12 @@ workflow validation_wf {
manifest_csv
)

splitCorncob(
mockData.out
)

runCorncob(
mockData.out,
splitCorncob.out.flatten(),
manifest_csv,
formula_ch
)
@@ -51,8 +55,12 @@ workflow corncob_wf {
cag_csv
)

splitCorncob(
extractCounts.out
)

runCorncob(
extractCounts.out,
splitCorncob.out.flatten(),
manifest_csv,
formula_ch
)
@@ -67,7 +75,7 @@

process mockData {
tag "Simulate dataset for validation"
container "quay.io/fhcrc-microbiome/python-pandas:v1.0.3"
container "${container__pandas}"
label 'io_limited'

input:
@@ -122,7 +130,7 @@ df.reset_index(

process validateFormula {
tag "Validate user-provided formula"
container "quay.io/fhcrc-microbiome/python-pandas:v1.0.3"
container "${container__pandas}"
label 'io_limited'

input:
@@ -165,7 +173,7 @@ for cag_id, cag_df in df.groupby("CAG"):
// and so it needs to have access to those integer values
process extractCounts {
tag "Make CAG ~ sample read-count matrix"
container "quay.io/fhcrc-microbiome/python-pandas:v1.0.3"
container "${container__pandas}"
label 'mem_veryhigh'
publishDir "${params.output_folder}/abund/", mode: "copy"

@@ -280,17 +288,62 @@ logging.info("Done")
"""
}

process splitCorncob {
container "${container__pandas}"
label "mem_veryhigh"
errorStrategy "retry"

input:
file readcounts_csv_gz

output:
file "readcounts.*.csv.gz"

"""#!/usr/bin/env python3
import pandas as pd
# Read in the entire set of readcounts
print("Reading in ${readcounts_csv_gz}")
df = pd.read_csv("${readcounts_csv_gz}")
print("Read in %d rows and %d columns" % (df.shape[0], df.shape[1]))
# Write out shards of the data
for shard_ix in range(${params.corncob_batches}):
# Get the list of CAGs to write out
cag_list = [
n
for ix, n in enumerate(df.columns.values)
if ix % ${params.corncob_batches} == shard_ix or n in ["specimen", "total"]
]
# Skip if there are too few CAGs for this shard
if len(cag_list) <= 2:
print("Skipping shard %d, too few CAGs to populate" % shard_ix)
continue
# Write out the shard
print("Writing out %d columns to shard %d" % (len(cag_list), shard_ix))
df.reindex(columns=cag_list).to_csv("readcounts.%d.csv.gz" % shard_ix, index=None)
print("Done with shard %d" % shard_ix)
print("Done with all shards")
"""
}
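
To make the round-robin sharding concrete, here is the same column-assignment rule applied to a toy header with two shards (column names hypothetical). Every shard keeps the specimen and total columns, which corncob needs alongside each CAG's counts:

columns = ["specimen", "CAG-0", "CAG-1", "CAG-2", "CAG-3", "total"]
n_shards = 2
for shard_ix in range(n_shards):
    cag_list = [
        n
        for ix, n in enumerate(columns)
        if ix % n_shards == shard_ix or n in ["specimen", "total"]
    ]
    print(shard_ix, cag_list)
# 0 ['specimen', 'CAG-1', 'CAG-3', 'total']
# 1 ['specimen', 'CAG-0', 'CAG-2', 'total']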

// Extract the counts table from the results HDF5
process runCorncob {
tag "Perform statistical analysis"
container "quay.io/fhcrc-microbiome/corncob"
label "mem_veryhigh"
label "mem_medium"
errorStrategy "retry"

input:
file readcounts_csv_gz
file metadata_csv
val formula
each formula

output:
file "corncob.results.csv"
@@ -346,9 +399,8 @@ print("Merging total counts with metadata")
total_and_meta <- metadata %>%
left_join(total_counts, by = c("specimen" = "specimen"))
#### Run the analysis for every individual CAG
print(sprintf("Starting to process %s columns (CAGs)", dim(counts)[2]))
#### Run the analysis for every individual CAG (in this shard)
print(sprintf("Starting to process %s columns (CAGs)", length(c(2:(dim(counts)[2] - 1)))))
corn_tib <- do.call(rbind, mclapply(
c(2:(dim(counts)[2] - 1)),
function(i){
@@ -679,6 +731,8 @@ process addBetta{
import os
import pandas as pd
from statsmodels.stats.multitest import multipletests
import pickle
pickle.HIGHEST_PROTOCOL = 4
betta_csv_list = "${betta_csv_list}".split(" ")
@@ -710,6 +764,11 @@ if df.shape[0] > 1:
)[1]
)
# Add the wald
df = df.assign(
wald = df["estimate"] / df["std_error"],
)
# Open a connection to the HDF5
with pd.HDFStore("${results_hdf}", "a") as store:
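
Both the corncob and betta tables get q-values the same way: multipletests with the p-values (missing values treated as 1), an FDR alpha of 0.2, and the configured method ('fdr_bh' by default). A minimal sketch with toy p-values:

import pandas as pd
from statsmodels.stats.multitest import multipletests

df = pd.DataFrame({"p_value": [0.001, 0.04, None, 0.2]})
df = df.assign(
    q_value=multipletests(df["p_value"].fillna(1), 0.2, "fdr_bh")[1]
)
print(df)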
