Reduce memory consumption of very high-resolution merges #408

Merged · 7 commits · Mar 23, 2024
Changes from all commits
42 changes: 27 additions & 15 deletions src/cooler/cli/cload.py
@@ -372,12 +372,6 @@ def pairix(
@click.option(
"--pos2", "-p2", help="pos2 field number (one-based)", type=int, required=True
)
@click.option(
"--chunksize",
help="Number of input lines to load at a time",
type=int,
default=int(15e6),
)
@click.option(
"--zero-based",
"-0",
@@ -433,6 +427,28 @@ def pairix(
# "specify at least one input field for aggregation as an alternative.",
# is_flag=True,
# default=False)
@click.option(
"--chunksize",
"-c",
help="Size in number of lines/records of data chunks to read and process "
"from the input stream at a time. These chunks will be saved as temporary "
"partial coolers and then merged.",
type=int,
default=15_000_000,
)
@click.option(
"--mergebuf",
help="Total number of pixel records to buffer per epoch of merging data. "
"Defaults to the same value as `chunksize`.",
type=int,
)
@click.option(
"--max-merge",
help="Maximum number of chunks to merge in a single pass.",
type=int,
default=200,
show_default=True,
)
@click.option(
"--temp-dir",
help="Create temporary files in a specified directory. Pass ``-`` to use "
@@ -445,13 +461,6 @@ def pairix(
is_flag=True,
default=False,
)
@click.option(
"--max-merge",
help="Maximum number of chunks to merge before invoking recursive merging",
type=int,
default=200,
show_default=True,
)
@click.option(
"--storage-options",
help="Options to modify the data filter pipeline. Provide as a "
@@ -478,12 +487,13 @@ def pairs(
cool_path,
metadata,
assembly,
chunksize,
zero_based,
comment_char,
input_copy_status,
no_symmetric_upper,
field,
chunksize,
mergebuf,
temp_dir,
no_delete_temp,
max_merge,
@@ -501,6 +511,8 @@ def pairs(

"""
chromsizes, bins = parse_bins(bins)
if mergebuf is None:
mergebuf = chunksize

symmetric_upper = not no_symmetric_upper
tril_action = None
@@ -639,7 +651,7 @@ def pairs(
dtypes=output_field_dtypes,
metadata=metadata,
assembly=assembly,
mergebuf=chunksize,
mergebuf=mergebuf,
max_merge=max_merge,
temp_dir=temp_dir,
delete_temp=not no_delete_temp,
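For orientation, a hypothetical invocation of the updated command might look like the sketch below. The flag names come from this diff, while the positional argument order and file names are placeholders for illustration: pairs are aggregated in 15M-line chunks, but each merge epoch buffers at most 5M pixel records to bound peak memory.

```python
import subprocess

# Hypothetical usage sketch: flag names come from this diff; the positional
# argument order and file names are placeholders for illustration.
subprocess.run(
    [
        "cooler", "cload", "pairs",
        "-c1", "2", "-p1", "3", "-c2", "4", "-p2", "5",
        "--chunksize", "15000000",
        "--mergebuf", "5000000",
        "--max-merge", "200",
        "--temp-dir", ".",
        "hg38.chromsizes:1000", "example.pairs.gz", "example.1000.cool",
    ],
    check=True,
)
```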
43 changes: 30 additions & 13 deletions src/cooler/cli/load.py
@@ -46,16 +46,6 @@
type=str,
multiple=True,
)
@click.option(
"--chunksize",
"-c",
help="Size (in number of lines/records) of data chunks to read and process "
"from the input file at a time. These chunks will be saved as "
"temporary partial Coolers and merged at the end. Also specifies the "
"size of the buffer during the merge step.",
type=int,
default=int(20e6),
)
@click.option(
"--count-as-float",
is_flag=True,
@@ -99,12 +89,34 @@
"distinct, use the ``--no-symmetric-upper`` option instead. ",
show_default=True,
)
@click.option(
"--chunksize",
"-c",
help="Size in number of lines/records of data chunks to read and process "
"from the input stream at a time. These chunks will be saved as temporary "
"partial coolers and then merged.",
type=int,
default=20_000_000,
)
@click.option(
"--mergebuf",
help="Total number of records to buffer per epoch of merging data. Defaults "
"to the same value as `chunksize`.",
type=int,
)
@click.option(
"--temp-dir",
help="Create temporary files in a specified directory. Pass ``-`` to use "
"the platform default temp dir.",
type=click.Path(exists=True, file_okay=False, dir_okay=True, allow_dash=True),
)
@click.option(
"--max-merge",
help="Maximum number of chunks to merge in a single pass.",
type=int,
default=200,
show_default=True,
)
@click.option(
"--no-delete-temp",
help="Do not delete temporary files when finished.",
@@ -133,13 +145,15 @@ def load(
format,
metadata,
assembly,
chunksize,
field,
count_as_float,
one_based,
comment_char,
input_copy_status,
no_symmetric_upper,
chunksize,
mergebuf,
max_merge,
temp_dir,
no_delete_temp,
storage_options,
@@ -178,6 +192,8 @@ def load(
"""
logger = get_logger(__name__)
chromsizes, bins = parse_bins(bins_path)
if mergebuf is None:
mergebuf = chunksize

symmetric_upper = not no_symmetric_upper
tril_action = None
@@ -334,10 +350,11 @@ def load(
dtypes=output_field_dtypes,
metadata=metadata,
assembly=assembly,
mergebuf=chunksize,
ensure_sorted=False,
mergebuf=mergebuf,
max_merge=max_merge,
temp_dir=temp_dir,
delete_temp=not no_delete_temp,
ensure_sorted=False,
# boundscheck=True,
# dupcheck=True,
triucheck=True if symmetric_upper else False,
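The same knobs exist on the Python side. Below is a minimal sketch assuming the public `cooler.create_cooler` entry point continues to expose `mergebuf` and `max_merge`, mirroring the internal `create()` call in this diff; file names and column layout are placeholders.

```python
import cooler
import pandas as pd

# Minimal sketch of the analogous Python-API path. File names are placeholders
# and the pixel chunks must already carry bin1_id/bin2_id/count columns.
bins = pd.read_csv("bins.10kb.bed", sep="\t", names=["chrom", "start", "end"])
pixel_chunks = pd.read_csv(
    "pixels.unsorted.tsv.gz",
    sep="\t",
    names=["bin1_id", "bin2_id", "count"],
    chunksize=20_000_000,   # analogous to --chunksize
)
cooler.create_cooler(
    "out.10kb.cool",
    bins=bins,
    pixels=pixel_chunks,    # unordered chunks -> temporary coolers, then merged
    ordered=False,
    mergebuf=5_000_000,     # analogous to --mergebuf
    max_merge=200,          # analogous to --max-merge
)
```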
131 changes: 80 additions & 51 deletions src/cooler/reduce.py
@@ -44,68 +44,91 @@


def merge_breakpoints(
indexes: list[np.ndarray],
maxbuf: int
indexes: list[np.ndarray | h5py.Dataset],
bufsize: int
) -> tuple[np.ndarray, np.ndarray]:
"""
Partition k offset arrays for performing a k-way external merge, such that
no single merge pass loads more than ``maxbuf`` records into memory, with
one exception (see Notes).
Given ``k`` bin1_offset indexes, determine how to partition the data from
``k`` corresponding pixel tables for a k-way merge.

The partition is a subsequence of bin1 IDs, defining the bounds of chunks
of data that will be loaded into memory from each table in a single "epoch"
of merging data. The bounds are calculated such that no single epoch will
load more than ``bufsize`` records into memory.

However, the ``bufsize`` condition is not guaranteed and a warning will be
raised if it cannot be satisfied for one or more epochs (see Notes).

Parameters
----------
indexes : sequence of 1D arrays of equal length
These offset-array indexes map non-negative integers to their offset
locations in a corresponding data table
maxbuf : int
Maximum cumulative number of records loaded into memory for a single
merge pass
indexes : sequence of 1D array-likes of equal length
Offset arrays that map bin1 IDs to their offset locations in a
corresponding pixel table.
bufsize : int
Maximum number of pixel records loaded into memory in a single merge
epoch.

Returns
-------
breakpoints : 1D array
breakpoint locations to segment all the offset arrays
cum_offset : 1D array
cumulative number of records that will be processed at each breakpoint
bin1_partition : 1D array
Bin1 IDs defining where to partition all the tables for merging.
cum_nrecords : 1D array
Cumulative number of records (from all pixel tables combined) that will
be processed at each epoch.

Notes
-----
The one exception to the post-condition is if any single increment of the
indexes maps to more than ``maxbuf`` records; these will produce
oversized chunks.

The one exception to the post-condition is when a single bin1 increment in
a table contains more than ``bufsize`` records.
"""
# k = len(indexes)

# the virtual cumulative index if no pixels were merged
cumindex = np.vstack(indexes).sum(axis=0)
cum_start = 0
cum_nnz = cumindex[-1]
# n = len(cumindex)

breakpoints = [0]
cum_offsets = [0]
# This is a "virtual" cumulative index if all the tables were concatenated
# and sorted but no pixel records were aggregated. It helps us track how
# many records would be processed at each merge epoch.
# NOTE: We sum these incrementally in case the indexes are lazy to avoid
# loading all indexes into memory at once.
combined_index = np.zeros(indexes[0].shape)
for i in range(len(indexes)):
combined_index += indexes[i]
combined_start = 0
combined_nnz = combined_index[-1]

bin1_partition = [0]
cum_nrecords = [0]
lo = 0
while True:
# find the next mark
hi = bisect_right(cumindex, min(cum_start + maxbuf, cum_nnz), lo=lo) - 1
# Find the next bin1 ID from the combined index
hi = bisect_right(
combined_index,
min(combined_start + bufsize, combined_nnz),
lo=lo
) - 1

if hi == lo:
# number of records to nearest mark exceeds `maxbuf`
# check for oversized chunks afterwards
# This means number of records to nearest mark exceeds `bufsize`.
# Check for oversized chunks afterwards.
hi += 1

breakpoints.append(hi)
cum_offsets.append(cumindex[hi])
bin1_partition.append(hi)
cum_nrecords.append(combined_index[hi])

if cumindex[hi] == cum_nnz:
if combined_index[hi] == combined_nnz:
break

lo = hi
cum_start = cumindex[hi]
combined_start = combined_index[hi]

breakpoints = np.array(breakpoints)
cum_offsets = np.array(cum_offsets)
return breakpoints, cum_offsets
bin1_partition = np.array(bin1_partition)
cum_nrecords = np.array(cum_nrecords)

nrecords_per_epoch = np.diff(cum_nrecords)
n_over = (nrecords_per_epoch > bufsize).sum()
if n_over > 0:
warnings.warn(
f"{n_over} merge epochs will require buffering more than {bufsize} "
f"pixel records, with as many as {nrecords_per_epoch.max():g}."
)

return bin1_partition, cum_nrecords
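
To make the partitioning concrete, here is a small worked sketch, assuming `merge_breakpoints` remains importable from `cooler.reduce`. Two toy offset indexes holding 8 and 7 records are partitioned so that no merge epoch buffers more than 8 records.

```python
import numpy as np

from cooler.reduce import merge_breakpoints

# Two bin1_offset indexes (cumulative record counts per bin1 ID) for two
# pixel tables with 8 and 7 records, respectively.
idx1 = np.array([0, 2, 4, 6, 8])
idx2 = np.array([0, 1, 3, 5, 7])

bin1_partition, cum_nrecords = merge_breakpoints([idx1, idx2], bufsize=8)

# The combined index is [0, 3, 7, 11, 15], so the first epoch stops at
# bin1 ID 2 (7 records buffered) and the second consumes the remaining 8.
print(bin1_partition)  # [0 2 4]
print(cum_nrecords)    # [ 0.  7. 15.]
```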


class CoolerMerger(ContactBinner):
@@ -117,12 +140,12 @@ class CoolerMerger(ContactBinner):
def __init__(
self,
coolers: list[Cooler],
maxbuf: int,
mergebuf: int,
columns: list[str] | None = None,
agg: dict[str, Any] | None = None
):
self.coolers = list(coolers)
self.maxbuf = maxbuf
self.mergebuf = mergebuf
self.columns = ["count"] if columns is None else columns
self.agg = {col: "sum" for col in self.columns}
if agg is not None:
@@ -145,18 +168,23 @@ def __init__(
raise ValueError("Coolers must have same bin structure")

def __iter__(self) -> Iterator[dict[str, np.ndarray]]:
indexes = [c._load_dset("indexes/bin1_offset") for c in self.coolers]
breakpoints, cum_offsets = merge_breakpoints(indexes, self.maxbuf)
chunksizes = np.diff(cum_offsets)
if chunksizes.max() > self.maxbuf:
warnings.warn(f"Some merge passes will use more than {self.maxbuf} pixels")
# Load bin1_offset indexes lazily.
indexes = [c.open("r")["indexes/bin1_offset"] for c in self.coolers]

# Calculate the common partition of bin1 IDs that defines the epochs of
# merging data.
bin1_partition, cum_nrecords = merge_breakpoints(indexes, self.mergebuf)
nrecords_per_epoch = np.diff(cum_nrecords)
logger.info(f"n_merge_epochs: {len(nrecords_per_epoch)}")
logger.debug(f"bin1_partition: {bin1_partition}")
logger.debug(f"nrecords_per_merge_epoch: {nrecords_per_epoch}")

nnzs = [len(c.pixels()) for c in self.coolers]
logger.info(f"nnzs: {nnzs}")

starts = [0] * len(self.coolers)
for bp in breakpoints[1:]:
stops = [index[bp] for index in indexes]
logger.info(f"current: {stops}")
for bin1_id in bin1_partition[1:]:
stops = [index[bin1_id] for index in indexes]

# extract, concat
combined = pd.concat(
@@ -178,6 +206,7 @@ def __iter__(self) -> Iterator[dict[str, np.ndarray]]:

yield {k: v.values for k, v in df.items()}

logger.info(f"records consumed: {stops}")
starts = stops
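
Taken together, the merger acts as a `ContactBinner` that yields one aggregated chunk of pixel records per merge epoch. A rough usage sketch with placeholder cooler URIs, assuming the class remains importable from `cooler.reduce`:

```python
import cooler

from cooler.reduce import CoolerMerger

# Placeholder inputs: two single-resolution coolers with identical bin tables.
clrs = [cooler.Cooler("rep1.1000.cool"), cooler.Cooler("rep2.1000.cool")]

merger = CoolerMerger(clrs, mergebuf=1_000_000)
for chunk in merger:
    # Each epoch yields a dict of column arrays (bin1_id, bin2_id, count, ...)
    # for the aggregated pixels covering a slice of bin1 IDs.
    print(len(chunk["count"]))
```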


@@ -265,7 +294,7 @@ def merge_coolers(

bins = clrs[0].bins()[["chrom", "start", "end"]][:]
assembly = clrs[0].info.get("genome-assembly", None)
iterator = CoolerMerger(clrs, maxbuf=mergebuf, columns=columns, agg=agg)
iterator = CoolerMerger(clrs, mergebuf=mergebuf, columns=columns, agg=agg)

create(
output_uri,
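At the top level, `mergebuf` remains the single user-facing knob that bounds merge memory. A minimal usage sketch of the public API touched by this hunk (the cooler URIs are placeholders):

```python
import cooler

# mergebuf caps how many pixel records each merge epoch may buffer in memory,
# as partitioned by merge_breakpoints above.
cooler.merge_coolers(
    "merged.1000.cool",
    ["rep1.1000.cool", "rep2.1000.cool"],
    mergebuf=20_000_000,
)
```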