Skip to content

Commit

Permalink
Reduce memory consumption of very high-resolution merges (#408)
Browse files Browse the repository at this point in the history
* Expose mergebuf parameter to load and cload

* Build cumulative index incrementally from ondisk arrays

* Restore mergebuf to default to chunksize

* Rename maxbuf to bufsize

* Move logging statement

* Add max-merge option to cooler load

* Format string
  • Loading branch information
nvictus committed Mar 23, 2024
1 parent 5d012e2 commit a1b6cb0
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 85 deletions.
42 changes: 27 additions & 15 deletions src/cooler/cli/cload.py
Expand Up @@ -372,12 +372,6 @@ def pairix(
@click.option(
"--pos2", "-p2", help="pos2 field number (one-based)", type=int, required=True
)
@click.option(
"--chunksize",
help="Number of input lines to load at a time",
type=int,
default=int(15e6),
)
@click.option(
"--zero-based",
"-0",
Expand Down Expand Up @@ -433,6 +427,28 @@ def pairix(
# "specify at least one input field for aggregation as an alternative.",
# is_flag=True,
# default=False)
@click.option(
"--chunksize",
"-c",
help="Size in number of lines/records of data chunks to read and process "
"from the input stream at a time. These chunks will be saved as temporary "
"partial coolers and then merged.",
type=int,
default=15_000_000,
)
@click.option(
"--mergebuf",
help="Total number of pixel records to buffer per epoch of merging data. "
"Defaults to the same value as `chunksize`.",
type=int,
)
@click.option(
"--max-merge",
help="Maximum number of chunks to merge in a single pass.",
type=int,
default=200,
show_default=True,
)
@click.option(
"--temp-dir",
help="Create temporary files in a specified directory. Pass ``-`` to use "
Expand All @@ -445,13 +461,6 @@ def pairix(
is_flag=True,
default=False,
)
@click.option(
"--max-merge",
help="Maximum number of chunks to merge before invoking recursive merging",
type=int,
default=200,
show_default=True,
)
@click.option(
"--storage-options",
help="Options to modify the data filter pipeline. Provide as a "
Expand All @@ -478,12 +487,13 @@ def pairs(
cool_path,
metadata,
assembly,
chunksize,
zero_based,
comment_char,
input_copy_status,
no_symmetric_upper,
field,
chunksize,
mergebuf,
temp_dir,
no_delete_temp,
max_merge,
Expand All @@ -501,6 +511,8 @@ def pairs(
"""
chromsizes, bins = parse_bins(bins)
if mergebuf is None:
mergebuf = chunksize

symmetric_upper = not no_symmetric_upper
tril_action = None
Expand Down Expand Up @@ -639,7 +651,7 @@ def pairs(
dtypes=output_field_dtypes,
metadata=metadata,
assembly=assembly,
mergebuf=chunksize,
mergebuf=mergebuf,
max_merge=max_merge,
temp_dir=temp_dir,
delete_temp=not no_delete_temp,
Expand Down
43 changes: 30 additions & 13 deletions src/cooler/cli/load.py
Expand Up @@ -46,16 +46,6 @@
type=str,
multiple=True,
)
@click.option(
"--chunksize",
"-c",
help="Size (in number of lines/records) of data chunks to read and process "
"from the input file at a time. These chunks will be saved as "
"temporary partial Coolers and merged at the end. Also specifies the "
"size of the buffer during the merge step.",
type=int,
default=int(20e6),
)
@click.option(
"--count-as-float",
is_flag=True,
Expand Down Expand Up @@ -99,12 +89,34 @@
"distinct, use the ``--no-symmetric-upper`` option instead. ",
show_default=True,
)
@click.option(
"--chunksize",
"-c",
help="Size in number of lines/records of data chunks to read and process "
"from the input stream at a time. These chunks will be saved as temporary "
"partial coolers and then merged.",
type=int,
default=20_000_000,
)
@click.option(
"--mergebuf",
help="Total number of records to buffer per epoch of merging data. Defaults "
"to the same value as `chunksize`.",
type=int,
)
@click.option(
"--temp-dir",
help="Create temporary files in a specified directory. Pass ``-`` to use "
"the platform default temp dir.",
type=click.Path(exists=True, file_okay=False, dir_okay=True, allow_dash=True),
)
@click.option(
"--max-merge",
help="Maximum number of chunks to merge in a single pass.",
type=int,
default=200,
show_default=True,
)
@click.option(
"--no-delete-temp",
help="Do not delete temporary files when finished.",
Expand Down Expand Up @@ -133,13 +145,15 @@ def load(
format,
metadata,
assembly,
chunksize,
field,
count_as_float,
one_based,
comment_char,
input_copy_status,
no_symmetric_upper,
chunksize,
mergebuf,
max_merge,
temp_dir,
no_delete_temp,
storage_options,
Expand Down Expand Up @@ -178,6 +192,8 @@ def load(
"""
logger = get_logger(__name__)
chromsizes, bins = parse_bins(bins_path)
if mergebuf is None:
mergebuf = chunksize

symmetric_upper = not no_symmetric_upper
tril_action = None
Expand Down Expand Up @@ -334,10 +350,11 @@ def load(
dtypes=output_field_dtypes,
metadata=metadata,
assembly=assembly,
mergebuf=chunksize,
ensure_sorted=False,
mergebuf=mergebuf,
max_merge=max_merge,
temp_dir=temp_dir,
delete_temp=not no_delete_temp,
ensure_sorted=False,
# boundscheck=True,
# dupcheck=True,
triucheck=True if symmetric_upper else False,
Expand Down
131 changes: 80 additions & 51 deletions src/cooler/reduce.py
Expand Up @@ -44,68 +44,91 @@


def merge_breakpoints(
indexes: list[np.ndarray],
maxbuf: int
indexes: list[np.ndarray | h5py.Dataset],
bufsize: int
) -> tuple[np.ndarray, np.ndarray]:
"""
Partition k offset arrays for performing a k-way external merge, such that
no single merge pass loads more than ``maxbuf`` records into memory, with
one exception (see Notes).
Given ``k`` bin1_offset indexes, determine how to partition the data from
``k`` corresponding pixel tables for a k-way merge.
The paritition is a subsequence of bin1 IDs, defining the bounds of chunks
of data that will be loaded into memory from each table in a single "epoch"
of merging data. The bounds are calculated such that no single epoch will
load more than ``bufsize`` records into memory.
However, the ``bufsize`` condition is not guaranteed and a warning will be
raised if it cannot be satisfied for one or more epochs (see Notes).
Parameters
----------
indexes : sequence of 1D arrays of equal length
These offset-array indexes map non-negative integers to their offset
locations in a corresponding data table
maxbuf : int
Maximum cumulative number of records loaded into memory for a single
merge pass
indexes : sequence of 1D array-like of equal length
Offset arrays that map bin1 IDs to their offset locations in a
corresponding pixel table.
bufsize : int
Maximum number of pixel records loaded into memory in a single merge
epoch.
Returns
-------
breakpoints : 1D array
breakpoint locations to segment all the offset arrays
cum_offset : 1D array
cumulative number of records that will be processed at each breakpoint
bin1_partition : 1D array
Bin1 IDs defining where to partition all the tables for merging.
cum_nrecords : 1D array
Cumulative number of records (from all pixel tables combined) that will
be processed at each epoch.
Notes
-----
The one exception to the post-condition is if any single increment of the
indexes maps to more than ``maxbuf`` records, these will produce
oversized chunks.
The one exception to the post-condition is when a single bin1 increment in
a table contains more than ``bufsize`` records.
"""
# k = len(indexes)

# the virtual cumulative index if no pixels were merged
cumindex = np.vstack(indexes).sum(axis=0)
cum_start = 0
cum_nnz = cumindex[-1]
# n = len(cumindex)

breakpoints = [0]
cum_offsets = [0]
# This is a "virtual" cumulative index if all the tables were concatenated
# and sorted but no pixel records were aggregated. It helps us track how
# many records would be processed at each merge epoch.
# NOTE: We sum these incrementally in case the indexes are lazy to avoid
# loading all indexes into memory at once.
combined_index = np.zeros(indexes[0].shape)
for i in range(len(indexes)):
combined_index += indexes[i]
combined_start = 0
combined_nnz = combined_index[-1]

bin1_partition = [0]
cum_nrecords = [0]
lo = 0
while True:
# find the next mark
hi = bisect_right(cumindex, min(cum_start + maxbuf, cum_nnz), lo=lo) - 1
# Find the next bin1 ID from the combined index
hi = bisect_right(
combined_index,
min(combined_start + bufsize, combined_nnz),
lo=lo
) - 1

if hi == lo:
# number of records to nearest mark exceeds `maxbuf`
# check for oversized chunks afterwards
# This means number of records to nearest mark exceeds `bufsize`.
# Check for oversized chunks afterwards.
hi += 1

breakpoints.append(hi)
cum_offsets.append(cumindex[hi])
bin1_partition.append(hi)
cum_nrecords.append(combined_index[hi])

if cumindex[hi] == cum_nnz:
if combined_index[hi] == combined_nnz:
break

lo = hi
cum_start = cumindex[hi]
combined_start = combined_index[hi]

breakpoints = np.array(breakpoints)
cum_offsets = np.array(cum_offsets)
return breakpoints, cum_offsets
bin1_partition = np.array(bin1_partition)
cum_nrecords = np.array(cum_nrecords)

nrecords_per_epoch = np.diff(cum_nrecords)
n_over = (nrecords_per_epoch > bufsize).sum()
if n_over > 0:
warnings.warn(
f"{n_over} merge epochs will require buffering more than {bufsize} "
f"pixel records, with as many as {nrecords_per_epoch.max():g}."
)

return bin1_partition, cum_nrecords


class CoolerMerger(ContactBinner):
Expand All @@ -117,12 +140,12 @@ class CoolerMerger(ContactBinner):
def __init__(
self,
coolers: list[Cooler],
maxbuf: int,
mergebuf: int,
columns: list[str] | None = None,
agg: dict[str, Any] | None = None
):
self.coolers = list(coolers)
self.maxbuf = maxbuf
self.mergebuf = mergebuf
self.columns = ["count"] if columns is None else columns
self.agg = {col: "sum" for col in self.columns}
if agg is not None:
Expand All @@ -145,18 +168,23 @@ def __init__(
raise ValueError("Coolers must have same bin structure")

def __iter__(self) -> Iterator[dict[str, np.ndarray]]:
indexes = [c._load_dset("indexes/bin1_offset") for c in self.coolers]
breakpoints, cum_offsets = merge_breakpoints(indexes, self.maxbuf)
chunksizes = np.diff(cum_offsets)
if chunksizes.max() > self.maxbuf:
warnings.warn(f"Some merge passes will use more than {self.maxbuf} pixels")
# Load bin1_offset indexes lazily.
indexes = [c.open("r")["indexes/bin1_offset"] for c in self.coolers]

# Calculate the common partition of bin1 offsets that define the epochs
# of merging data.
bin1_partition, cum_nrecords = merge_breakpoints(indexes, self.mergebuf)
nrecords_per_epoch = np.diff(cum_nrecords)
logger.info(f"n_merge_epochs: {len(nrecords_per_epoch)}")
logger.debug(f"bin1_partition: {bin1_partition}")
logger.debug(f"nrecords_per_merge_epoch: {nrecords_per_epoch}")

nnzs = [len(c.pixels()) for c in self.coolers]
logger.info(f"nnzs: {nnzs}")

starts = [0] * len(self.coolers)
for bp in breakpoints[1:]:
stops = [index[bp] for index in indexes]
logger.info(f"current: {stops}")
for bin1_id in bin1_partition[1:]:
stops = [index[bin1_id] for index in indexes]

# extract, concat
combined = pd.concat(
Expand All @@ -178,6 +206,7 @@ def __iter__(self) -> Iterator[dict[str, np.ndarray]]:

yield {k: v.values for k, v in df.items()}

logger.info(f"records consumed: {stops}")
starts = stops


Expand Down Expand Up @@ -265,7 +294,7 @@ def merge_coolers(

bins = clrs[0].bins()[["chrom", "start", "end"]][:]
assembly = clrs[0].info.get("genome-assembly", None)
iterator = CoolerMerger(clrs, maxbuf=mergebuf, columns=columns, agg=agg)
iterator = CoolerMerger(clrs, mergebuf=mergebuf, columns=columns, agg=agg)

create(
output_uri,
Expand Down

0 comments on commit a1b6cb0

Please sign in to comment.