
Refactor CAG construction using Zarr (#43)
The overall goal of this refactor is to make the CAG construction process much more efficient. The changes made toward that goal are:

- Saving all gene abundances in Zarr format to speed up reading the gene subsets needed by each shard (see the sketch after this list)
- Grouping genes into CAGs before constructing each DataFrame to reduce the total memory burden
- Increasing the number of shards used to initiate the CAG construction by 10X
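To make the first bullet concrete, here is a minimal sketch (not code from this commit) of the access pattern a chunked Zarr store enables: each shard reads only the rows for its own genes instead of parsing the full gene ~ sample table. The shapes, chunk size, and shard slice below are invented for illustration.

# Minimal sketch, assuming a genes x samples float32 array laid out
# like the gene_abundance.zarr written by assembleAbundances below;
# all sizes here are invented for illustration.
import numpy as np
import zarr

n_genes, n_samples = 50000, 8

# Stand-in for gene_abundance.zarr: chunked along the gene axis
z = zarr.open(
    "example_abundance.zarr",
    mode="w",
    shape=(n_genes, n_samples),
    chunks=(5000, n_samples),
    dtype="f4",
)
z[:] = np.random.rand(n_genes, n_samples).astype("f4")

# A CAG shard slices out just its own block of gene rows; only the
# chunks covering those rows are read from disk, not the whole matrix.
shard = zarr.open("example_abundance.zarr", mode="r")[0:5000, :]
print(shard.shape)  # (5000, 8)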
sminot committed Aug 5, 2020
1 parent 1954b12 commit f5368eb
Showing 5 changed files with 282 additions and 192 deletions.
223 changes: 146 additions & 77 deletions modules/alignment.nf
@@ -1,6 +1,6 @@
// Processes used for alignment of reads against gene databases

-params.cag_batchsize = 100000
+params.cag_batchsize = 10000

// Default options
params.distance_threshold = 0.5
@@ -77,41 +77,41 @@ workflow alignment_wf {

// Group shards of genes into Co-Abundant Gene Groups (CAGs)
makeInitialCAGs(
-assembleAbundances.out[2],
+assembleAbundances.out[5],
assembleAbundances.out[0].flatten()
)

// Perform multiple rounds of combining shards to make ever-larger CAGs
refineCAGs_round1(
-assembleAbundances.out[2],
-makeInitialCAGs.out.toSortedList().flatten().collate(2)
+assembleAbundances.out[5],
+makeInitialCAGs.out.toSortedList().flatten().collate(4)
)
refineCAGs_round2(
-assembleAbundances.out[2],
-refineCAGs_round1.out.toSortedList().flatten().collate(2)
+assembleAbundances.out[5],
+refineCAGs_round1.out.toSortedList().flatten().collate(4)
)
refineCAGs_round3(
-assembleAbundances.out[2],
-refineCAGs_round2.out.toSortedList().flatten().collate(2)
+assembleAbundances.out[5],
+refineCAGs_round2.out.toSortedList().flatten().collate(4)
)
refineCAGs_round4(
-assembleAbundances.out[2],
-refineCAGs_round3.out.toSortedList().flatten().collate(2)
+assembleAbundances.out[5],
+refineCAGs_round3.out.toSortedList().flatten().collate(4)
)
refineCAGs_round5(
-assembleAbundances.out[2],
-refineCAGs_round4.out.toSortedList().flatten().collate(2)
+assembleAbundances.out[5],
+refineCAGs_round4.out.toSortedList().flatten().collate(4)
)

// Combine the shards and make a new set of CAGs
refineCAGs_final(
-assembleAbundances.out[2],
+assembleAbundances.out[5],
refineCAGs_round5.out.collect()
)

// Calculate the relative abundance of each CAG in these samples
calcCAGabund(
-assembleAbundances.out[2],
+assembleAbundances.out[5],
refineCAGs_final.out
)

@@ -128,7 +128,7 @@ workflow alignment_wf {
// Align each sample against the reference database of genes using DIAMOND
process diamondDB {
tag "Make a DIAMOND database"
container "quay.io/fhcrc-microbiome/famli@sha256:25c34c73964f06653234dd7804c3cf5d9cf520bc063723e856dae8b16ba74b0c"
container "quay.io/fhcrc-microbiome/famli:v1.5"
label 'mem_veryhigh'
errorStrategy 'retry'
publishDir "${params.output_folder}/ref/", mode: "copy"
@@ -154,7 +154,7 @@ process diamondDB {
// Align each sample against the reference database of genes using DIAMOND
process diamond {
tag "Align to the gene catalog"
container "quay.io/fhcrc-microbiome/famli@sha256:25c34c73964f06653234dd7804c3cf5d9cf520bc063723e856dae8b16ba74b0c"
container "quay.io/fhcrc-microbiome/famli:v1.5"
label 'mem_veryhigh'
errorStrategy 'retry'

@@ -230,7 +230,7 @@ process famli {
// Make a single feather file with the abundance of every gene across every sample
process assembleAbundances {
tag "Make gene ~ sample abundance matrix"
container "quay.io/fhcrc-microbiome/experiment-collection@sha256:fae756a380a3d3335241b68251942a8ed0bf1ae31a33a882a430085b492e44fe"
container "quay.io/fhcrc-microbiome/experiment-collection:v0.2"
label "mem_veryhigh"
errorStrategy 'retry'

@@ -245,6 +245,7 @@ process assembleAbundances {
file "${output_prefix}.details.hdf5"
path "gene_length.csv.gz"
path "specimen_reads_aligned.csv.gz"
path "gene_abundance.zarr.tar"


"""
@@ -254,9 +255,11 @@ import logging
import numpy as np
import os
import pandas as pd
+import tarfile
import gzip
import json
import pickle
+import zarr
pickle.HIGHEST_PROTOCOL = 4
# Set up logging
@@ -390,6 +393,63 @@ for ix, gene_list in enumerate([
with gzip.open("gene_list.%d.csv.gz" % ix, "wt") as handle:
handle.write("\\n".join(gene_list))
+# Write out the sequencing depth of each gene in each specimen in zarr format
+z = zarr.open(
+    "gene_abundance.zarr",
+    mode="w",
+    shape=(len(all_gene_names), len(sample_names)),
+    chunks=True,
+    dtype='f4'
+)
+# Iterate over the list of files
+for fp in sample_jsons:
+    # Get the sample name from the file name
+    assert fp.endswith(".json.gz")
+    sample_name = fp[:-len(".json.gz")]
+    logging.info("Reading in %s from %s" % (sample_name, fp))
+    df = read_json(fp)
+    # Calculate the proportional depth of sequencing per gene
+    gene_depth = df.set_index(
+        "id"
+    ).reindex(
+        index=all_gene_names
+    )[
+        "depth"
+    ].fillna(
+        0
+    ) / df[
+        "depth"
+    ].sum()
+    logging.info("Saving %s to gene_abundance.zarr" % sample_name)
+    # Save the sequencing depth to zarr
+    z[
+        :,
+        sample_names.index(sample_name)
+    ] = gene_depth.values
+# Write out the sample names and gene names
+logging.info("Writing out sample_names.json.gz")
+with gzip.open("sample_names.json.gz", "wt") as fo:
+    json.dump(sample_names, fo)
+logging.info("Writing out gene_names.json.gz")
+with gzip.open("gene_names.json.gz", "wt") as fo:
+    json.dump(all_gene_names, fo)
+logging.info("Creating gene_abundance.zarr.tar")
+with tarfile.open("gene_abundance.zarr.tar", "w") as tar:
+    for name in [
+        "gene_names.json.gz",
+        "sample_names.json.gz",
+        "gene_abundance.zarr",
+    ]:
+        logging.info("Adding %s to gene_abundance.zarr.tar" % name)
+        tar.add(name)
+logging.info("Done")
"""
@@ -400,13 +460,13 @@ logging.info("Done")
// Summarize the abundance of every CAG across each sample
process calcCAGabund {
tag "Make CAG ~ sample abundance matrix"
container "quay.io/fhcrc-microbiome/experiment-collection@sha256:fae756a380a3d3335241b68251942a8ed0bf1ae31a33a882a430085b492e44fe"
container "quay.io/fhcrc-microbiome/experiment-collection:v0.2"
label "mem_veryhigh"
errorStrategy 'retry'
publishDir "${params.output_folder}/abund/", mode: "copy"

input:
-path details_hdf
+path gene_abundances_zarr_tar
path cag_csv_gz

output:
@@ -416,9 +476,14 @@ process calcCAGabund {
#!/usr/bin/env python3
from collections import defaultdict
import gzip
+import json
+import numpy as np
import os
import pandas as pd
import logging
+import tarfile
+import zarr
# Set up logging
logFormatter = logging.Formatter(
@@ -433,67 +498,71 @@ consoleHandler.setFormatter(logFormatter)
rootLogger.addHandler(consoleHandler)
# Read in the dictionary linking genes and CAGs
-cags_dict = pd.read_csv(
-    "${cag_csv_gz}",
-    compression="gzip"
-).set_index(
-    "gene"
-)[
-    "CAG"
-]
-# Set the file path to the HDF5 with gene abundances
-details_hdf = "${details_hdf}"
-# Make sure the files exist
-assert os.path.exists(details_hdf), details_hdf
-# Read in the table of gene abundances
-abund_df = defaultdict(lambda: defaultdict(float))
-# Open the HDF5 store
-with pd.HDFStore(details_hdf, "r") as store:
-    # Iterate over keys in the store
-    for k in store:
-        # Sample abundances have a certain prefix
-        if k.startswith("/abund/gene/long/"):
-            # Parse the sample name
-            sample_name = k.replace("/abund/gene/long/", "")
-            logging.info("Reading gene abundances for {}".format(sample_name))
-            # Read the depth of sequencing for each gene
-            sample_depth = pd.read_hdf(
-                store,
-                k
-            ).set_index(
-                "id"
-            )[
-                "depth"
-            ]
-            # Normalize to sum to 1
-            sample_depth = sample_depth / sample_depth.sum()
-            # Add the abundance of each gene to its linked CAG
-            for gene_name, gene_prop in sample_depth.items():
-                abund_df[
-                    sample_name
-                ][
-                    cags_dict[
-                        gene_name
-                    ]
-                ] += gene_prop
+cags = {
+    cag_id: cag_df["gene"].tolist()
+    for cag_id, cag_df in pd.read_csv(
+        "${cag_csv_gz}",
+        compression="gzip"
+    ).groupby(
+        "CAG"
+    )
+}
+# Extract everything from the gene_abundance.zarr.tar
+logging.info("Extracting ${gene_abundances_zarr_tar}")
+with tarfile.open("${gene_abundances_zarr_tar}") as tar:
+    tar.extractall()
+# Make sure that the expected contents are present
+for fp in [
+    "gene_names.json.gz",
+    "sample_names.json.gz",
+    "gene_abundance.zarr",
+]:
+    logging.info("Checking that %s is present" % fp)
+    assert os.path.exists(fp)
+# Read in the gene names and sample names as indexed in the zarr
+logging.info("Reading in gene_names.json.gz")
+with gzip.open("gene_names.json.gz", "rt") as handle:
+    gene_names = json.load(handle)
+logging.info("Reading in sample_names.json.gz")
+with gzip.open("sample_names.json.gz", "rt") as handle:
+    sample_names = json.load(handle)
+# Open the zarr store
+logging.info("Reading in gene abundances from gene_abundance.zarr")
+z = zarr.open("gene_abundance.zarr", mode="r")
+# Set up an array for CAG abundances
+df = pd.DataFrame(
+    data = np.zeros(
+        (len(cags), len(sample_names)),
+        dtype = np.float32,
+    ),
+    dtype = np.float32,
+    columns = sample_names,
+    index = list(range(len(cags))),
+)
+# Iterate over each sample
+for sample_ix, sample_name in enumerate(sample_names):
+    # Read the depth of sequencing for each gene
+    logging.info("Reading gene abundances for %s" % sample_name)
+    sample_gene_depth = pd.Series(z[:, sample_ix], index=gene_names)
+    # Sum up the depth by CAG
+    df[sample_name] = pd.Series({
+        cag_ix: sample_gene_depth.reindex(index=cag_gene_list).sum()
+        for cag_ix, cag_gene_list in cags.items()
+    })
# Now write out to a feather file
logging.info("Building a single DataFrame of CAG abundances")
-pd.DataFrame(
-    abund_df
-).fillna(
-    0
-).reset_index(
+df.reset_index(
).to_feather(
"CAG.abund.feather"
)
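As a design note, the per-sample dictionary comprehension above (one reindex per CAG per sample) can equivalently be written as a single groupby once each gene row is mapped to its CAG id. Below is a toy sketch of that formulation, not part of this commit, with all names and values invented for illustration.

# Vectorized sketch of the same gene -> CAG summation, on toy data.
# Assumes the same layout as the script above: a genes x samples
# abundance matrix plus a gene -> CAG assignment table.
import numpy as np
import pandas as pd

gene_names = ["g0", "g1", "g2", "g3"]
sample_names = ["s0", "s1"]
abund = np.array([
    [0.4, 0.1],
    [0.1, 0.4],
    [0.3, 0.3],
    [0.2, 0.2],
], dtype="f4")
cags_df = pd.DataFrame({
    "gene": ["g0", "g1", "g2", "g3"],
    "CAG": [0, 0, 1, 1],
})

# Map each gene row to its CAG id, then sum every sample in one pass
gene_to_cag = cags_df.set_index("gene")["CAG"].reindex(gene_names)
cag_abund = (
    pd.DataFrame(abund, index=gene_names, columns=sample_names)
    .groupby(gene_to_cag.values)
    .sum()
)
print(cag_abund)  # one row per CAG, one column per sample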
4 changes: 2 additions & 2 deletions modules/assembly.nf
@@ -476,7 +476,7 @@ gzip genes.shard.*.fasta

process diamond_tax {
tag "Annotate genes by taxonomy"
container "quay.io/fhcrc-microbiome/famli@sha256:25c34c73964f06653234dd7804c3cf5d9cf520bc063723e856dae8b16ba74b0c"
container "quay.io/fhcrc-microbiome/famli:v1.5"
label 'mem_veryhigh'

input:
@@ -615,7 +615,7 @@ with gzip.open("genes.fasta.gz", "wt") as fo:
// Use alignment to figure out which assembled allele was grouped into which gene
process alignAlleles {
tag "Match alleles to gene centroids"
container "quay.io/fhcrc-microbiome/famli@sha256:25c34c73964f06653234dd7804c3cf5d9cf520bc063723e856dae8b16ba74b0c"
container "quay.io/fhcrc-microbiome/famli:v1.5"
label 'mem_medium'
errorStrategy 'retry'

4 changes: 1 addition & 3 deletions modules/general.nf
@@ -2,7 +2,7 @@
// Container versions
container__fastatools = "quay.io/fhcrc-microbiome/fastatools:0.7.1__bcw.0.3.2"
container__ubuntu = "ubuntu:18.04"
-container__experiment_collection = "quay.io/fhcrc-microbiome/experiment-collection@sha256:fae756a380a3d3335241b68251942a8ed0bf1ae31a33a882a430085b492e44fe"
+container__experiment_collection = "quay.io/fhcrc-microbiome/experiment-collection:v0.2"
container__pandas = "quay.io/fhcrc-microbiome/python-pandas:v1.0.3"

// Default parameters
@@ -849,7 +849,6 @@ with pd.HDFStore("${results_hdf}", "a") as store:
store,
"/annot/gene/eggnog",
format = "fixed",
-dtype=str,
)
# Write summary annotation table to HDF5
@@ -858,7 +857,6 @@ with pd.HDFStore("${results_hdf}", "a") as store:
"/annot/gene/all",
format = "table",
data_columns = ["CAG"],
-dtype=str,
)
"""
