From 1954b1229cb9a83c9d4ae880df053e86e1bcddb4 Mon Sep 17 00:00:00 2001
From: Sam Minot
Date: Sun, 19 Jul 2020 08:23:57 -0700
Subject: [PATCH] Refactor assemble abundances

After attempting to run the previous version of the pipeline on much
larger datasets, it is clear that assembling a single table with all
gene-level abundances is not computationally tractable. The only impact
of these changes on the output of the pipeline is that the
/abund/gene/wide table is no longer present (and neither is the matching
feather file). In practice, the execution of the rest of the pipeline
is unchanged.
---
 bin/validate_geneshot_output.py |   1 -
 local_tests/test.sh             |  23 ++--
 main.nf                         |   5 -
 modules/alignment.nf            | 187 ++++++++++++++++----------
 modules/general.nf              |  29 +----
 modules/make_cags.nf            | 164 +++++++++++++++++++++-------
 6 files changed, 235 insertions(+), 174 deletions(-)

diff --git a/bin/validate_geneshot_output.py b/bin/validate_geneshot_output.py
index 9b4cca5..c2bd213 100644
--- a/bin/validate_geneshot_output.py
+++ b/bin/validate_geneshot_output.py
@@ -27,7 +27,6 @@ def validate_results_hdf(results_hdf, check_corncob = False):
         "/annot/gene/cag",
         "/annot/gene/all",
         "/annot/cag/all",
-        "/abund/gene/wide",
         "/abund/cag/wide",
         "/ordination/pca",
         "/ordination/tsne",
diff --git a/local_tests/test.sh b/local_tests/test.sh
index c0a9de2..b577fae 100755
--- a/local_tests/test.sh
+++ b/local_tests/test.sh
@@ -12,21 +12,22 @@ set -e
 # 3. Run this script

 # Test with preprocessing and a formula
-NXF_VER=19.10.0 nextflow run main.nf \
+NXF_VER=20.04.1 nextflow run main.nf \
     -c nextflow.config \
     -profile testing \
-    --manifest data/mock.manifest.2.csv \
+    --manifest data/mock.manifest.csv \
     --nopreprocess \
-    --output output0 \
+    --output output \
     --hg_index data/hg_chr_21_bwa_index.tar.gz \
     --formula "label1 + label2" \
     --distance_threshold 0.5 \
     -w work/ \
     --noannot \
-    -resume
+    -resume \
+    -with-docker ubuntu:20.04

 # # Test with preprocessing and a formula
-# NXF_VER=19.10.0 nextflow run main.nf \
+# NXF_VER=20.04.1 nextflow run main.nf \
 #     -c nextflow.config \
 #     -profile testing \
 #     --manifest data/mock.manifest.csv \
@@ -41,7 +42,7 @@ NXF_VER=19.10.0 nextflow run main.nf \
 #     -resume

 # # Test with preprocessing and no formula
-# NXF_VER=19.10.0 nextflow run main.nf \
+# NXF_VER=20.04.1 nextflow run main.nf \
 #     -c nextflow.config \
 #     -profile testing \
 #     --manifest data/mock.manifest.csv \
@@ -54,7 +55,7 @@ NXF_VER=19.10.0 nextflow run main.nf \
 #     -resume

 # # Test with formula and no preprocessing
-# NXF_VER=19.10.0 nextflow run main.nf \
+# NXF_VER=20.04.1 nextflow run main.nf \
 #     -c nextflow.config \
 #     -profile testing \
 #     --nopreprocess \
@@ -68,7 +69,7 @@ NXF_VER=19.10.0 nextflow run main.nf \
 #     -resume

 # # Test with no formula and no preprocessing
-# NXF_VER=19.10.0 nextflow run main.nf \
+# NXF_VER=20.04.1 nextflow run main.nf \
 #     -c nextflow.config \
 #     -profile testing \
 #     --nopreprocess \
@@ -80,7 +81,7 @@ NXF_VER=19.10.0 nextflow run main.nf \
 #     -resume

 # # Test with the gene catalog made in a previous round
-# NXF_VER=19.10.0 nextflow run main.nf \
+# NXF_VER=20.04.1 nextflow run main.nf \
 #     -c nextflow.config \
 #     -profile testing \
 #     --gene_fasta output1/ref/genes.fasta.gz \
@@ -94,7 +95,7 @@ NXF_VER=19.10.0 nextflow run main.nf \


 # # Test with the gene catalog made in a previous round and whole genome alignment
-# NXF_VER=19.10.0 nextflow run main.nf \
+# NXF_VER=20.04.1 nextflow run main.nf \
 #     -c nextflow.config \
 #     -profile testing \
 #     --gene_fasta output1/ref/genes.fasta.gz \
@@ -111,7 +112,7 @@ NXF_VER=19.10.0 nextflow run main.nf \
 #     -resume

 # # Test with de novo assembly and whole genome alignment
-# NXF_VER=19.10.0 nextflow run main.nf \
+# NXF_VER=20.04.1 nextflow run main.nf \
 #     -c nextflow.config \
 #     -profile testing \
 #     --nopreprocess \
diff --git a/main.nf b/main.nf
index 1356429..a196655 100755
--- a/main.nf
+++ b/main.nf
@@ -399,10 +399,6 @@ workflow {
         combineReads.out,
         params.output_prefix
     )

-    // Publish the gene abundance feather file
-    publishGeneAbundances(
-        alignment_wf.out.gene_abund_feather
-    )
     // ########################
     // # STATISTICAL ANALYSIS #
@@ -439,7 +435,6 @@ workflow {

     collectAbundances(
         alignment_wf.out.cag_csv,
-        alignment_wf.out.gene_abund_feather,
         alignment_wf.out.cag_abund_feather,
         countReadsSummary.out,
         manifest_file,
diff --git a/modules/alignment.nf b/modules/alignment.nf
index 5b27669..5d4dbe9 100644
--- a/modules/alignment.nf
+++ b/modules/alignment.nf
@@ -77,53 +77,52 @@ workflow alignment_wf {

     // Group shards of genes into Co-Abundant Gene Groups (CAGs)
     makeInitialCAGs(
-        assembleAbundances.out[0],
-        assembleAbundances.out[1].flatten()
+        assembleAbundances.out[2],
+        assembleAbundances.out[0].flatten()
     )

     // Perform multiple rounds of combining shards to make ever-larger CAGs
     refineCAGs_round1(
-        assembleAbundances.out[0],
+        assembleAbundances.out[2],
         makeInitialCAGs.out.toSortedList().flatten().collate(2)
     )
     refineCAGs_round2(
-        assembleAbundances.out[0],
+        assembleAbundances.out[2],
         refineCAGs_round1.out.toSortedList().flatten().collate(2)
     )
     refineCAGs_round3(
-        assembleAbundances.out[0],
+        assembleAbundances.out[2],
         refineCAGs_round2.out.toSortedList().flatten().collate(2)
     )
     refineCAGs_round4(
-        assembleAbundances.out[0],
+        assembleAbundances.out[2],
         refineCAGs_round3.out.toSortedList().flatten().collate(2)
     )
     refineCAGs_round5(
-        assembleAbundances.out[0],
+        assembleAbundances.out[2],
         refineCAGs_round4.out.toSortedList().flatten().collate(2)
     )

     // Combine the shards and make a new set of CAGs
     refineCAGs_final(
-        assembleAbundances.out[0],
+        assembleAbundances.out[2],
         refineCAGs_round5.out.collect()
     )

     // Calculate the relative abundance of each CAG in these samples
     calcCAGabund(
-        assembleAbundances.out[0],
+        assembleAbundances.out[2],
         refineCAGs_final.out
     )

     emit:
         cag_csv = refineCAGs_final.out
-        gene_abund_feather = assembleAbundances.out[0]
         cag_abund_feather = calcCAGabund.out
         famli_json_list = famli.out.toSortedList()
-        specimen_gene_count_csv = assembleAbundances.out[2]
-        specimen_reads_aligned_csv = assembleAbundances.out[5]
-        detailed_hdf = assembleAbundances.out[3]
-        gene_length_csv = assembleAbundances.out[4]
+        specimen_gene_count_csv = assembleAbundances.out[1]
+        specimen_reads_aligned_csv = assembleAbundances.out[4]
+        detailed_hdf = assembleAbundances.out[2]
+        gene_length_csv = assembleAbundances.out[3]
 }

 // Align each sample against the reference database of genes using DIAMOND
@@ -241,7 +240,6 @@ process assembleAbundances {
         val output_prefix

     output:
-        file "gene.abund.feather"
         file "gene_list.*.csv.gz"
         file "specimen_gene_count.csv.gz"
         file "${output_prefix}.details.hdf5"
@@ -282,10 +280,11 @@ def read_json(fp):

 # Parse the file list
 sample_jsons = "${sample_jsons}".split(" ")
-logging.info("Getting ready to read in data for %d sample" % len(sample_jsons))
+logging.info("Getting ready to read in data for %d samples" % len(sample_jsons))
+
+# Keep track of the complete set of sample names observed in these files
+all_sample_names = set([])

-# All of the abundances will go into a single dict
-all_abund = dict()

 # Also keep track of the complete set of gene names observed in these samples
 all_gene_names = set([])
@@ -298,6 +297,9 @@ gene_length_dict = dict()
 # Keep track of the number of reads aligned per sample
 specimen_reads_aligned = dict()

+# Keep track of the number of genes detected per sample
+specimen_genes_detected = dict()
+
 # Iterate over the list of files
 for fp in sample_jsons:
     # Get the sample name from the file name
@@ -311,11 +313,11 @@ for fp in sample_jsons:
     logging.info("Saving to HDF5")
     df.to_hdf(store, "/abund/gene/long/%s" % sample_name)

-    logging.info("Saving depth in memory")
-    all_abund[sample_name] = df.set_index("id")["depth"].to_dict()
+    # Add the sample name to the total list
+    all_sample_names.add(sample_name)

     # Add the gene names to the total list
-    for gene_name in all_abund[sample_name]:
+    for gene_name in df["id"].values:
         all_gene_names.add(gene_name)

     # Add the gene lengths
@@ -325,56 +327,24 @@ for fp in sample_jsons:
     # Add in the number of reads aligned
     specimen_reads_aligned[sample_name] = df["nreads"].sum()

+    # Add in the number of genes detected
+    specimen_genes_detected[sample_name] = df.shape[0]
+
 store.close()

 # Serialize sample and gene names
-sample_names = list(all_abund.keys())
+sample_names = list(all_sample_names)
 all_gene_names = list(all_gene_names)

-# Now make a DataFrame for all of this data
-df = pd.DataFrame(
-    np.zeros((len(all_gene_names), len(sample_names)), dtype=np.float32),
-    columns=sample_names,
-    index=all_gene_names
-)
-
-# Add all of the data to the DataFrame
-for sample_name, sample_abund in all_abund.items():
-    # Format as a Pandas Series
-    sample_abund = pd.Series(
-        sample_abund
-    )
-
-    # Divide by the sum to get the proportional abundance
-    sample_abund = sample_abund / sample_abund.sum()
-
-    # Add the values to the table
-    df.values[
-        :, sample_names.index(sample_name)
-    ] = sample_abund.reindex(
-        index=all_gene_names
-    ).fillna(
-        0
-    ).apply(
-        np.float32
-    ).values
-
 # Write out the number of genes detected per sample
 pd.DataFrame(
     dict(
-        [
-            (
-                "n_genes_aligned",
-                (df > 0).sum()
-            )
-        ]
+        n_genes_aligned = specimen_genes_detected
     )
 ).reset_index(
 ).rename(
     columns = dict(
-        [
-            ("index", "specimen")
-        ]
+        index = "specimen"
     )
 ).to_csv(
     "specimen_gene_count.csv.gz",
@@ -385,19 +355,12 @@ pd.DataFrame(
 # Write out the number of reads aligned per sample
 pd.DataFrame(
     dict(
-        [
-            (
-                "n_reads_aligned",
-                specimen_reads_aligned
-            )
-        ]
+        n_reads_aligned = specimen_reads_aligned
     )
 ).reset_index(
 ).rename(
     columns = dict(
-        [
-            ("index", "specimen")
-        ]
+        index = "specimen"
     )
 ).to_csv(
     "specimen_reads_aligned.csv.gz",
@@ -407,7 +370,10 @@ pd.DataFrame(

 # Write out the CSV table with the length of each gene
 pd.DataFrame([
-    dict([("gene", gene), ("length", length)])
+    dict(
+        gene = gene,
+        length = length
+    )
     for gene, length in gene_length_dict.items()
 ]).to_csv(
     "gene_length.csv.gz",
@@ -415,11 +381,6 @@ pd.DataFrame([
     compression = "gzip"
 )

-# Write out the abundances to a feather file
-logging.info("Writing to disk")
-df.reset_index(inplace=True)
-df.to_feather("gene.abund.feather")
-
 # Write out the gene names in batches of ${params.cag_batchsize}
 for ix, gene_list in enumerate([
     all_gene_names[ix: (ix + ${cag_batchsize})]
@@ -445,7 +406,7 @@ process calcCAGabund {
     publishDir "${params.output_folder}/abund/", mode: "copy"

     input:
-        path gene_feather
+        path details_hdf
         path cag_csv_gz

     output:
@@ -454,7 +415,8 @@ process calcCAGabund {
     """
 #!/usr/bin/env python3

-import feather
+from collections import defaultdict
+import os
 import pandas as pd
 import logging

@@ -470,30 +432,67 @@ consoleHandler = logging.StreamHandler()
 consoleHandler.setFormatter(logFormatter)
 rootLogger.addHandler(consoleHandler)

-# Read in the table of CAGs
-cags_df = pd.read_csv(
+# Read in the dictionary linking genes and CAGs
+cags_dict = pd.read_csv(
     "${cag_csv_gz}",
     compression="gzip"
-)
-
-# Read in the table of gene abundances
-abund_df = pd.read_feather(
-    "${gene_feather}"
 ).set_index(
-    "index"
-)
+    "gene"
+)[
+    "CAG"
+]

-# Annotate each gene with the CAG it was assigned to
-abund_df["CAG"] = cags_df.set_index("gene")["CAG"]
+# Set the file path to the HDF5 with gene abundances
+details_hdf = "${details_hdf}"

-# Make sure that every gene was assigned to a CAG
-assert abund_df["CAG"].isnull().sum() == 0
+# Make sure the file exists
+assert os.path.exists(details_hdf), details_hdf

-# Now sum up the gene relative abundance by CAGs
-# and write out to a feather file
-abund_df.groupby(
-    "CAG"
-).sum(
+# Read in the table of gene abundances
+abund_df = defaultdict(lambda: defaultdict(float))
+
+# Open the HDF5 store
+with pd.HDFStore(details_hdf, "r") as store:
+
+    # Iterate over keys in the store
+    for k in store:
+
+        # Sample abundances have a certain prefix
+        if k.startswith("/abund/gene/long/"):
+
+            # Parse the sample name
+            sample_name = k.replace("/abund/gene/long/", "")
+            logging.info("Reading gene abundances for {}".format(sample_name))
+
+            # Read the depth of sequencing for each gene
+            sample_depth = pd.read_hdf(
+                store,
+                k
+            ).set_index(
+                "id"
+            )[
+                "depth"
+            ]
+
+            # Normalize to sum to 1
+            sample_depth = sample_depth / sample_depth.sum()
+
+            # Add the abundance of each gene to its linked CAG
+            for gene_name, gene_prop in sample_depth.items():
+                abund_df[
+                    sample_name
+                ][
+                    cags_dict[
+                        gene_name
+                    ]
+                ] += gene_prop
+
+# Now write out to a feather file
+logging.info("Building a single DataFrame of CAG abundances")
+pd.DataFrame(
+    abund_df
+).fillna(
+    0
 ).reset_index(
 ).to_feather(
     "CAG.abund.feather"
diff --git a/modules/general.nf b/modules/general.nf
index d88e0e4..75e367b 100644
--- a/modules/general.nf
+++ b/modules/general.nf
@@ -204,7 +204,6 @@ process collectAbundances{

     input:
         path cag_csv
-        path gene_abund_feather
         path cag_abund_feather
         path readcount_csv
         path manifest_csv
@@ -232,7 +231,6 @@ import pickle
 pickle.HIGHEST_PROTOCOL = 4

 cag_csv = "${cag_csv}"
-gene_abund_feather = "${gene_abund_feather}"
 cag_abund_feather = "${cag_abund_feather}"
 readcount_csv = "${readcount_csv}"
 specimen_gene_count_csv = "${specimen_gene_count_csv}"
@@ -426,6 +424,8 @@ with pd.HDFStore("${params.output_prefix}.results.hdf5", "w") as store:
     # Read in the table with the CAG-level abundances across all samples
     cag_abund_df = pd.read_feather(
         cag_abund_feather
+    ).rename(
+        columns={"index": "CAG"}
     )
     print(
         "Read in abundances for %d CAGs across %d samples" %
@@ -469,21 +469,6 @@ with pd.HDFStore("${params.output_prefix}.results.hdf5", "w") as store:
             "/distances/%s" % metric
         )

-    # Read in the table with the gene-level abundances across all samples
-    gene_abund_df = pd.read_feather(
-        gene_abund_feather
-    )
-    print(
-        "Read in abundances for %d genes across %d samples" %
-        (gene_abund_df.shape[0], gene_abund_df.shape[1] - 1)
-    )
-    # Add some descriptive statistics
-    summary_dict["num_genes"] = gene_abund_df.shape[0]
-
-
-    # Write to HDF5
-    gene_abund_df.to_hdf(store, "/abund/gene/wide")
-
     # Read in the table describing which genes are grouped into which CAGs
     # This is being called 'cag_df', but it's really a table of CAG annotations per-gene,
     # so there is one row per gene.
@@ -494,6 +479,9 @@ with pd.HDFStore("${params.output_prefix}.results.hdf5", "w") as store:
         (cag_df.shape[0], cag_df["CAG"].unique().shape[0])
     )

+    # Save the total number of genes
+    summary_dict["num_genes"] = cag_df.shape[0]
+
     # Write to HDF5
     cag_df.to_hdf(
         store,
@@ -502,15 +490,8 @@ with pd.HDFStore("${params.output_prefix}.results.hdf5", "w") as store:
         data_columns = ["gene", "CAG"]
     )

-    # Calculate prevalence and abundance information for each gene
-    gene_abund_df.set_index("index", inplace=True)
-    gene_abundance = gene_abund_df.mean(axis=1)
-    gene_prevalence = (gene_abund_df > 0).mean(axis=1)
-
     # Add that information on the gene abundance and prevalence to this gene summary table
     cag_df = cag_df.assign(
-        prevalence = cag_df["gene"].apply(gene_prevalence.get),
-        abundance = cag_df["gene"].apply(gene_abundance.get),
         length = cag_df["gene"].apply(gene_length_df.get)
     )
diff --git a/modules/make_cags.nf b/modules/make_cags.nf
index 2abe4ea..513d4ad 100644
--- a/modules/make_cags.nf
+++ b/modules/make_cags.nf
@@ -1,4 +1,4 @@
-container__find_cags = "quay.io/fhcrc-microbiome/find-cags:v0.12.1"
+container__find_cags = "quay.io/fhcrc-microbiome/find-cags:v0.12.2"

 // Default options
 params.distance_threshold = 0.5
@@ -13,7 +13,7 @@ process makeInitialCAGs {
     errorStrategy 'retry'

     input:
-    path gene_feather
+    path details_hdf
     path gene_list_csv

     output:
@@ -22,7 +22,6 @@ process makeInitialCAGs {
     """
 #!/usr/bin/env python3

-import feather
 import gzip
 import json
 import logging
@@ -51,37 +50,73 @@ rootLogger.addHandler(consoleHandler)
 threads = int("${task.cpus}")
 pool = Pool(threads)

-# Set the file path to the feather with gene abundances
-gene_feather = "${gene_feather}"
+# Set the file path to the HDF5 with gene abundances
+details_hdf = "${details_hdf}"

 # Set the file path to the genes for this subset
 gene_list_csv = "${gene_list_csv}"

 # Make sure the files exist
-assert os.path.exists(gene_feather), gene_feather
+assert os.path.exists(details_hdf), details_hdf
 assert os.path.exists(gene_list_csv), gene_list_csv

-logging.info("Reading in gene abundances from %s" % gene_feather)
-df = feather.read_dataframe(
-    gene_feather
-).set_index("index").applymap(
-    np.float16
-)
-
 logging.info("Reading in the list of genes for this shard from %s" % (gene_list_csv))
-gene_list = [
+gene_list = set([
     line.rstrip("\\n")
     for line in gzip.open(gene_list_csv, "rt")
-]
+])
 logging.info("This shard contains %d genes" % (len(gene_list)))

-# Subset the gene abundances
-logging.info("Making sure that the gene names match between the feather and CSV")
-n_mismatch = len(set(gene_list) - set(df.index.values))
-msg = "Gene names do not match between the feather and CSV (n= %d / %d)" % (n_mismatch, len(gene_list))
-assert n_mismatch == 0, msg
-df = df.reindex(index=gene_list)
-logging.info("Subset down to %d genes for this shard" % (len(gene_list)))
+logging.info("Reading in gene abundances from %s" % details_hdf)
+df = {}
+
+# Open the HDF5 store
+with pd.HDFStore(details_hdf, "r") as store:
+
+    # Iterate over keys in the store
+    for k in store:
+
+        # Sample abundances have a certain prefix
+        if k.startswith("/abund/gene/long/"):
+
+            # Parse the sample name
+            sample_name = k.replace("/abund/gene/long/", "")
+            logging.info("Reading gene abundances for {}".format(sample_name))
+
+            # Read the depth of sequencing for each gene
+            sample_depth = pd.read_hdf(
+                store,
+                k
+            ).set_index(
+                "id"
+            )[
+                "depth"
+            ]
+
+            # Normalize to sum to 1
+            sample_depth = sample_depth / sample_depth.sum()
+
+            # Subset down to the genes in this list
+            genes_in_sample = list(
+                set(sample_depth.index.values) & gene_list
+            )
+
+            # Add genes from this list to the table
+            if len(genes_in_sample) > 0:
+                df[
+                    sample_name
+                ] = sample_depth.reindex(
+                    index = genes_in_sample
+                )
+
+# Make the DataFrame
+logging.info("Formatting DataFrame")
+df = pd.DataFrame(
+    df
+).fillna(
+    0
+).applymap(
+    np.float16
+)

 max_dist = float("${params.distance_threshold}")
 logging.info("Maximum cosine distance: %s" % max_dist)
@@ -162,7 +197,7 @@ process refineCAGs {
     errorStrategy 'retry'

     input:
-    path gene_feather
+    path details_hdf
     path "shard.CAG.*.csv.gz"

     output:
@@ -171,7 +206,6 @@ process refineCAGs {
     """
 #!/usr/bin/env python3

-import feather
 import gzip
 import json
 import logging
@@ -200,11 +234,11 @@ rootLogger.addHandler(consoleHandler)
 threads = int("${task.cpus}")
 pool = Pool(threads)

-# Set the file path to the feather with gene abundances
-gene_feather = "${gene_feather}"
+# Set the file path to the HDF5 with gene abundances
+details_hdf = "${details_hdf}"

 # Make sure the files exist
-assert os.path.exists(gene_feather), gene_feather
+assert os.path.exists(details_hdf), details_hdf

 # Set the file path to the CAGs made for each subset
 cag_csv_list = [
@@ -219,24 +253,76 @@ for fp in cag_csv_list:

 assert len(cag_csv_list) > 0, "Didn't find CAGs from any previous shard"

-logging.info("Reading in gene abundances from %s" % gene_feather)
-df = feather.read_dataframe(
-    gene_feather
-).set_index("index").applymap(
-    np.float16
-)
-
 logging.info("Reading in CAGs from previous shard")
 cags = dict()
 ix = 0
-n = 0
+gene_list = set()
 for fp in cag_csv_list:
     shard_cags = pd.read_csv(fp, compression="gzip", sep=",")
-    for _, gene_list in shard_cags.groupby("CAG"):
-        cags[ix] = gene_list['gene'].tolist()
+    for _, shard_df in shard_cags.groupby("CAG"):
+        cags[ix] = shard_df['gene'].tolist()
+        gene_list.update(set(cags[ix]))
         ix += 1
-        n += gene_list.shape[0]
-logging.info("Read in %d CAGs from %d shards covering %d genes" % (len(cags), len(cag_csv_list), n))
+
+logging.info(
+    "Read in %d CAGs from %d shards covering %d genes" % (
+        len(cags),
+        len(cag_csv_list),
+        len(gene_list)
+    )
+)
+
+logging.info("Reading in gene abundances from %s" % details_hdf)
+df = {}
+
+# Open the HDF5 store
+with pd.HDFStore(details_hdf, "r") as store:
+
+    # Iterate over keys in the store
+    for k in store:
+
+        # Sample abundances have a certain prefix
+        if k.startswith("/abund/gene/long/"):
+
+            # Parse the sample name
+            sample_name = k.replace("/abund/gene/long/", "")
+            logging.info("Reading gene abundances for {}".format(sample_name))
+
+            # Read the depth of sequencing for each gene
+            sample_depth = pd.read_hdf(
+                store,
+                k
+            ).set_index(
+                "id"
+            )[
+                "depth"
+            ]
+
+            # Normalize to sum to 1
+            sample_depth = sample_depth / sample_depth.sum()
+
+            # Subset down to the genes in this list
+            genes_in_sample = list(
+                set(sample_depth.index.values) & gene_list
+            )
+
+            # Add genes from this list to the table
+            if len(genes_in_sample) > 0:
+                df[
+                    sample_name
+                ] = sample_depth.reindex(
+                    index = genes_in_sample
+                )
+
+# Make the DataFrame
+logging.info("Formatting DataFrame")
+df = pd.DataFrame(
+    df
+).fillna(
+    0
+).applymap(
+    np.float16
+)

 max_dist = float("${params.distance_threshold}")
 logging.info("Maximum cosine distance: %s" % max_dist)
@@ -280,4 +366,4 @@ cags_df.to_csv(fp_out, compression="gzip", index=None)

 logging.info("Done")
 """
-}
\ No newline at end of file
+}
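--
Why the wide table had to go: /abund/gene/wide grows with the full
genes-by-samples cross-product, and a back-of-the-envelope calculation
shows how quickly that stops fitting in memory. The gene and sample
counts below are illustrative assumptions, not measurements from any
particular run:

    # Sketch: in-memory footprint of the old wide gene abundance table
    n_genes = 10_000_000   # assumed size of a large gene catalog
    n_samples = 1_000      # assumed cohort size
    bytes_per_value = 4    # np.float32

    wide_gb = n_genes * n_samples * bytes_per_value / 1e9
    print("wide table: %.0f GB" % wide_gb)  # -> 40 GB

The long-format tables written to the details HDF5 only store the genes
actually detected in each sample, so their cost scales with the number
of observed (gene, sample) pairs rather than the full cross-product.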
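The storage pattern that replaces the wide table can be sketched on its
own: write one long-format table per sample under /abund/gene/long/<name>
(as assembleAbundances does), then stream those tables back one at a time
to accumulate CAG-level sums (as calcCAGabund does). The following is a
minimal, self-contained sketch with made-up sample data and a toy
gene-to-CAG assignment; only the HDF5 key layout is taken from the patch:

    from collections import defaultdict

    import pandas as pd

    # Write: one long-format table per sample (made-up data)
    with pd.HDFStore("details.hdf5", "w") as store:
        for sample_name, rows in {
            "sampleA": [("gene1", 10.0, 5), ("gene2", 30.0, 15)],
            "sampleB": [("gene2", 8.0, 4)],
        }.items():
            df = pd.DataFrame(rows, columns=["id", "depth", "nreads"])
            df.to_hdf(store, "/abund/gene/long/%s" % sample_name)

    # Toy gene -> CAG assignment (the pipeline reads this from the CAG
    # membership CSV)
    cags_dict = pd.Series({"gene1": 0, "gene2": 1})

    # Read: stream one sample at a time and accumulate CAG sums, so the
    # full genes-by-samples matrix is never held in memory
    abund = defaultdict(lambda: defaultdict(float))
    with pd.HDFStore("details.hdf5", "r") as store:
        for k in store:
            if k.startswith("/abund/gene/long/"):
                sample_name = k.replace("/abund/gene/long/", "")
                depth = pd.read_hdf(store, k).set_index("id")["depth"]
                depth = depth / depth.sum()  # proportional abundance
                for gene_name, prop in depth.items():
                    abund[sample_name][cags_dict[gene_name]] += prop

    print(pd.DataFrame(abund).fillna(0))  # CAGs x samples, small

Peak memory for the read side is one sample's gene vector plus the
CAGs-by-samples result, which is what makes the larger datasets tractable.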
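On the CAG-building side the flow is unchanged: makeInitialCAGs clusters
each shard of genes, five rounds of refineCAGs merge the resulting shards
pairwise (the collate(2) calls in modules/alignment.nf), and
refineCAGs_final combines whatever is left. Because each round pairs
shards off, five rounds shrink the shard count by up to 2^5 = 32x before
the final merge. A toy illustration of the reduction, with an assumed
initial shard count and merge() standing in for one refineCAGs task:

    # Toy sketch of pairwise shard merging; not the pipeline's clustering
    def merge(pair):
        combined = []
        for shard in pair:
            combined.extend(shard)
        return combined

    shards = [["CAG_%d" % i] for i in range(50)]  # assume 50 initial shards
    for round_ix in range(5):  # refineCAGs_round1 .. _round5
        shards = [
            merge(shards[i:i + 2])
            for i in range(0, len(shards), 2)
        ]
        print("after round %d: %d shards" % (round_ix + 1, len(shards)))
    # 50 -> 25 -> 13 -> 7 -> 4 -> 2; refineCAGs_final merges the remainder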