Always extract the readcounts per CAG
Not just if the user provides a formula. Addresses #56
Sam Minot committed Jan 26, 2021
1 parent b966b47 · commit f55edaa
Showing 4 changed files with 133 additions and 130 deletions.
main.nf (10 changes: 8 additions & 2 deletions)
@@ -185,6 +185,7 @@ include {
     writeManifest;
     combineReads;
     readTaxonomy;
+    extractCounts;
     addEggnogResults;
     addCorncobResults;
     addTaxResults;
@@ -406,6 +407,12 @@ workflow {
 
     }
 
+    // Extract the readcounts for all CAGs
+    extractCounts(
+        alignment_wf.out.famli_json_list,
+        makeCAGs.out[0],
+    )
+
     // ########################
     // # STATISTICAL ANALYSIS #
     // ########################
@@ -421,8 +428,7 @@ workflow {
     // Calculate the association of individual CAGs with user-provided features
     if ( params.formula ) {
         corncob_wf(
-            alignment_wf.out.famli_json_list,
-            alignment_wf.out.cag_csv,
+            extractCounts.out,
             file(params.manifest),
             formula_ch
         )
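Taken together, the hunks above change main.nf so that the read-count table is built on every run, while the corncob statistics remain gated on the formula parameter. A condensed sketch of the resulting control flow (not verbatim pipeline code):

    // Always runs: publish CAG.readcounts.csv.gz on every invocation
    extractCounts(
        alignment_wf.out.famli_json_list,
        makeCAGs.out[0],
    )

    // Runs only when the user supplies a formula
    if ( params.formula ) {
        corncob_wf(
            extractCounts.out,
            file(params.manifest),
            formula_ch
        )
    }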
modules/general.nf (122 changes: 122 additions & 0 deletions)
@@ -560,6 +560,128 @@ with pd.HDFStore("${params.output_prefix}.results.hdf5", "w") as store:
 
 }
 
+
+// Extract a CAG-level counts table from the FAMLI JSON outputs
+// Corncob takes the absolute number of reads from each sample into account
+// and so it needs to have access to those integer values
+process extractCounts {
+    tag "Make CAG ~ sample read-count matrix"
+    container "${container__pandas}"
+    label 'mem_veryhigh'
+    publishDir "${params.output_folder}/abund/", mode: "copy"
+
+    input:
+    file famli_json_list
+    file cag_csv
+
+    output:
+    file "CAG.readcounts.csv.gz"
+
+
+"""
+#!/usr/bin/env python3
+
+from collections import defaultdict
+import gzip
+import json
+import logging
+import os
+import pandas as pd
+
+# Set up logging
+logFormatter = logging.Formatter(
+    '%(asctime)s %(levelname)-8s [extractCounts] %(message)s'
+)
+rootLogger = logging.getLogger()
+rootLogger.setLevel(logging.INFO)
+
+# Write logs to STDOUT
+consoleHandler = logging.StreamHandler()
+consoleHandler.setFormatter(logFormatter)
+rootLogger.addHandler(consoleHandler)
+
+# Read in the CAG assignment for each gene
+logging.info("Reading in CAG assignments")
+cags = pd.read_csv(
+    "${cag_csv}"
+).set_index(
+    "gene"
+)["CAG"]
+
+# Make an object to hold the number of reads per CAG, per sample
+cag_counts = dict()
+
+# Read in each of the FAMLI output objects
+for fp in "${famli_json_list}".split(" "):
+    logging.info("Processing %s" % fp)
+
+    # Get the sample name
+    assert fp.endswith(".json.gz"), fp
+    sample_name = fp[:-len(".json.gz")]
+
+    # Make sure the file was staged correctly
+    assert os.path.exists(fp), "%s not found" % fp
+
+    # Add the counts
+    cag_counts[sample_name] = defaultdict(int)
+    for i in json.load(
+        gzip.open(
+            fp,
+            "rt"
+        )
+    ):
+
+        # Add the value in 'nreads' to the CAG assigned to this gene
+        cag_counts[
+            sample_name
+        ][
+            cags[
+                i[
+                    "id"
+                ]
+            ]
+        ] += i[
+            "nreads"
+        ]
+
+# Format as a DataFrame
+logging.info("Making a DataFrame")
+cag_counts = pd.DataFrame(
+    cag_counts
+).fillna(
+    0
+).applymap(
+    int
+)
+
+# Transform so that samples are rows
+cag_counts = cag_counts.T
+
+# Add a "total" column
+cag_counts["total"] = cag_counts.sum(axis=1)
+
+# Reset the index and add a name indicating that the rows
+# correspond to specimens from the manifest
+cag_counts = cag_counts.reset_index(
+).rename(
+    columns=dict([("index", "specimen")])
+)
+
+# Save to a file
+logging.info("Writing to disk")
+cag_counts.to_csv(
+    "CAG.readcounts.csv.gz",
+    compression="gzip",
+    index=None
+)
+
+logging.info("Done")
+"""
+}
 
 
 process addCorncobResults{
     tag "Add statistical analysis to HDF"
     container "${container__pandas}"
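Note for downstream consumers: the script above assumes each FAMLI JSON file is a list of per-gene records carrying at least an "id" (gene name) and an integer "nreads" field, and it publishes a wide specimen-by-CAG table under abund/. A minimal sketch of reading that table back with pandas (the column layout follows from the script; the specimen names and counts shown are hypothetical):

    import pandas as pd

    # Load the CAG ~ sample read-count matrix published by extractCounts
    counts = pd.read_csv("CAG.readcounts.csv.gz")

    # Expected layout: one row per specimen, one integer column per CAG,
    # plus a "total" column holding the per-specimen sum, e.g.
    #
    #   specimen     0     1     2   total
    #   sampleA    120    45     0     165
    #   sampleB      3   210    17     230

    # Per-CAG proportions can be recovered by dividing each row by "total"
    tbl = counts.set_index("specimen")
    props = tbl.drop(columns="total").div(tbl["total"], axis=0)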
modules/statistics.nf (129 changes: 2 additions & 127 deletions)
@@ -43,20 +43,14 @@ workflow validation_wf {
 // Workflow to run corncob on the actual data
 workflow corncob_wf {
     take:
-    famli_json_list
-    cag_csv
+    countsCSV
     manifest_csv
     formula_ch
 
     main:
 
-    extractCounts(
-        famli_json_list,
-        cag_csv
-    )
-
     splitCorncob(
-        extractCounts.out
+        countsCSV
     )
 
     runCorncob(
@@ -168,125 +162,6 @@ for cag_id, cag_df in df.groupby("CAG"):
 
 }
 
-// Extract a CAG-level counts table from the FAMLI JSON outputs
-// Corncob takes the absolute number of reads from each sample into account
-// and so it needs to have access to those integer values
-process extractCounts {
-    tag "Make CAG ~ sample read-count matrix"
-    container "${container__pandas}"
-    label 'mem_veryhigh'
-    publishDir "${params.output_folder}/abund/", mode: "copy"
-
-    input:
-    file famli_json_list
-    file cag_csv
-
-    output:
-    file "CAG.readcounts.csv.gz"
-
-
-"""
-#!/usr/bin/env python3
-
-from collections import defaultdict
-import gzip
-import json
-import logging
-import os
-import pandas as pd
-
-# Set up logging
-logFormatter = logging.Formatter(
-    '%(asctime)s %(levelname)-8s [extractCounts] %(message)s'
-)
-rootLogger = logging.getLogger()
-rootLogger.setLevel(logging.INFO)
-
-# Write logs to STDOUT
-consoleHandler = logging.StreamHandler()
-consoleHandler.setFormatter(logFormatter)
-rootLogger.addHandler(consoleHandler)
-
-# Read in the CAG assignment for each gene
-logging.info("Reading in CAG assignments")
-cags = pd.read_csv(
-    "${cag_csv}"
-).set_index(
-    "gene"
-)["CAG"]
-
-# Make an object to hold the number of reads per CAG, per sample
-cag_counts = dict()
-
-# Read in each of the FAMLI output objects
-for fp in "${famli_json_list}".split(" "):
-    logging.info("Processing %s" % fp)
-
-    # Get the sample name
-    assert fp.endswith(".json.gz"), fp
-    sample_name = fp[:-len(".json.gz")]
-
-    # Make sure the file was staged correctly
-    assert os.path.exists(fp), "%s not found" % fp
-
-    # Add the counts
-    cag_counts[sample_name] = defaultdict(int)
-    for i in json.load(
-        gzip.open(
-            fp,
-            "rt"
-        )
-    ):
-
-        # Add the value in 'nreads' to the CAG assigned to this gene
-        cag_counts[
-            sample_name
-        ][
-            cags[
-                i[
-                    "id"
-                ]
-            ]
-        ] += i[
-            "nreads"
-        ]
-
-# Format as a DataFrame
-logging.info("Making a DataFrame")
-cag_counts = pd.DataFrame(
-    cag_counts
-).fillna(
-    0
-).applymap(
-    int
-)
-
-# Transform so that samples are rows
-cag_counts = cag_counts.T
-
-# Add a "total" column
-cag_counts["total"] = cag_counts.sum(axis=1)
-
-# Reset the index and add a name indicating that the rows
-# correspond to specimens from the manifest
-cag_counts = cag_counts.reset_index(
-).rename(
-    columns=dict([("index", "specimen")])
-)
-
-# Save to a file
-logging.info("Writing to disk")
-cag_counts.to_csv(
-    "CAG.readcounts.csv.gz",
-    compression="gzip",
-    index=None
-)
-
-logging.info("Done")
-"""
-}
 
 process splitCorncob {
     container "${container__pandas}"
run_corncob.nf (2 changes: 1 addition & 1 deletion)
@@ -63,7 +63,6 @@ include {
     splitCorncob;
     runCorncob;
     joinCorncob;
-    extractCounts;
     runBetta;
     addBetta;
 } from './modules/statistics' params(
@@ -76,6 +75,7 @@ include {
 
 // Import the process used to add corncob results to the output
 include {
+    extractCounts;
     repackHDF;
     addCorncobResults;
 } from './modules/general' params(