Commit 948284c

- moved shared memory functions to shared_memory.py
- ordered_enqueuer_cf.py: improved exception handling (retry in main thread)
jeanollion committed Apr 26, 2024
1 parent 47c9149 commit 948284c
Showing 6 changed files with 158 additions and 114 deletions.
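
For orientation, a standalone sketch of the "retry in main thread" strategy described in the commit message: when a worker future fails, the exception is printed and the item is recomputed locally instead of stopping the whole pipeline. This is an illustration only, not the enqueuer code; compute() and consume() are hypothetical names.

import traceback
from concurrent.futures import ProcessPoolExecutor

def compute(i):
    # hypothetical work item; in the enqueuer this would be a batch of the wrapped sequence
    return i * i

def consume(indexed_futures):
    # mirror the commit's strategy: on failure, log the exception and recompute the
    # item in the calling process instead of aborting the consumer loop
    for i, future in indexed_futures:
        ex = future.exception()            # blocks until the future is done
        if ex is None:
            yield future.result()
        else:
            traceback.print_exception(ex)  # single-argument form needs Python >= 3.10
            yield compute(i)               # retry in the main thread

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [(i, executor.submit(compute, i)) for i in range(4)]
        print(list(consume(futures)))
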
9 changes: 8 additions & 1 deletion dataset_iterator/concat_iterator.py
@@ -101,16 +101,23 @@ def set_allowed_indexes(self, indexes):
raise NotImplementedError("Not supported yet")

def close(self):
self._close_datasetIO()
for it in self.iterators:
it.close()

def _close_datasetIO(self):
for it in self.iterators:
it._close_datasetIO()


def _open_datasetIO(self):
for it in self.iterators:
it._open_datasetIO()


def open(self):
for it in self.iterators:
it.open()

def disable_random_transforms(self, data_augmentation:bool=True, channels_postprocessing:bool=False):
return [it.disable_random_transforms(data_augmentation, channels_postprocessing) for it in self.iterators]

4 changes: 4 additions & 0 deletions dataset_iterator/hard_sample_mining.py
@@ -45,7 +45,11 @@ def on_epoch_begin(self, epoch, logs=None):
def on_epoch_end(self, epoch, logs=None):
if self.period==1 or (epoch + 1 + self.start_epoch) % self.period == 0:
if (epoch > 0 or not self.skip_first) and epoch + self.start_epoch >= self.start_from_epoch:
self.target_iterator.close()
self.iterator.open()
metrics = self.compute_metrics()
self.iterator.close()
self.target_iterator.open()
first = self.proba_per_metric is None
self.proba_per_metric = get_index_probability(metrics, enrich_factor=self.enrich_factor, quantile_max=self.quantile_max, quantile_min=self.quantile_min, verbose=self.verbose)
self.n_metrics = self.proba_per_metric.shape[0] if len(self.proba_per_metric.shape) == 2 else 1
9 changes: 9 additions & 0 deletions dataset_iterator/index_array_iterator.py
@@ -45,6 +45,15 @@ def get_batch_size(self):
def set_index_probability(self, value):
self.index_probability = value


def open(self):
pass


def close(self):
pass


def __len__(self):
if self.step_number > 0:
return self.step_number
14 changes: 9 additions & 5 deletions dataset_iterator/multichannel_iterator.py
@@ -228,7 +228,7 @@ def __init__(self,
self.return_image_index=return_image_index
self._open_datasetIO()
# check that all ds have compatible length between input and output
indexes = np.array([ds.shape[0] for ds in self.ds_array[0]])
indexes = np.array([len(ds) for ds in self.ds_array[0]])
if len(channel_keywords)>1:
for c, ds_l in enumerate(self.ds_array):
if self.channel_keywords[c] is not None:
@@ -237,17 +237,17 @@ def __init__(self,
raise ValueError('Channels {}({}) has #{} datasets whereas first channel has #{} datasets'.format(c, channel_keywords[c], len(ds_l), len(self.ds_array[0])))
for ds_idx, ds in enumerate(ds_l):
if singleton:
if ds.shape[0]!=1:
if len(ds)!=1:
raise ValueError("Channel {} is set as singleton but one dataset has more that one image".format(c))
elif indexes[ds_idx] != ds.shape[0]:
elif indexes[ds_idx] != len(ds):
raise ValueError('Channel {}({}) has at least one dataset with number of elements that differ from Channel 0'.format(c, channel_keywords[c]))
if len(array_keywords)>1: # check that all array ds have compatible length
for c, ds_l in enumerate(self.ads_array):
if self.array_keywords[c] is not None:
if len(self.ds_array[0])!=len(ds_l):
raise ValueError('Array {}({}) has #{} datasets whereas first channel has #{} datasets'.format(c, channel_keywords[c], len(ds_l), len(self.ds_array[0])))
for ds_idx, ds in enumerate(ds_l):
if indexes[ds_idx] != ds.shape[0]:
if indexes[ds_idx] != len(ds):
raise ValueError('Array {}({}) has at least one dataset with number of elements that differ from Channel 0'.format(c, channel_keywords[c]))
# get offset for each dataset
for i in range(1, len(indexes)):
@@ -281,7 +281,7 @@ def __init__(self,
self.labels[i] = np.char.asarray(ds[()].astype('unicode')) # todo: check if necessary to convert to char array ? unicode is necessary
if len(self.labels)!=len(self.ds_array[0]):
raise ValueError('Invalid input file: number of label array differ from dataset number')
if any(len(self.labels[i].shape)==0 or self.labels[i].shape[0]!=self.ds_array[0][i].shape[0] for i in range(len(self.labels))):
if any(len(self.labels[i].shape)==0 or len(self.labels[i]) != len(self.ds_array[0][i]) for i in range(len(self.labels))):
raise ValueError('Invalid input file: at least one dataset has element numbers that differ from corresponding label array')
except:
self.labels = None
@@ -321,6 +321,10 @@ def _open_datasetIO(self):
self.ds_scaling_center = [[getAttribute(self.datasetIO.get_attribute(self._get_dataset_path(c, ds_idx), "scaling_center"), 0) for ds_idx in range(len(self.paths))] if self.channel_keywords[c] is not None else None for c in range(len(self.channel_keywords))]
self.ds_scaling_factor = [[getAttribute(self.datasetIO.get_attribute(self._get_dataset_path(c, ds_idx), "scaling_factor"), 1) for ds_idx in range(len(self.paths))] if self.channel_keywords[c] is not None else None for c in range(len(self.channel_keywords))]

def open(self):
self._open_datasetIO()


def _close_datasetIO(self):
if self.datasetIO is not None:
self.datasetIO.close()
127 changes: 19 additions & 108 deletions dataset_iterator/ordered_enqueuer_cf.py
@@ -1,13 +1,13 @@
import os
import traceback
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import queue
import random
import numpy as np
import threading
import time
from multiprocessing import managers, shared_memory
from multiprocessing import managers
from threading import BoundedSemaphore
from .shared_memory import to_shm, from_shm

# adapted from https://github.com/keras-team/keras/blob/v2.13.1/keras/utils/data_utils.py#L651-L776
# uses concurrent.futures; solves a memory leak when hard sample mining runs as a callback with the regular OrderedEnqueuer. Option to pass tensors through shared memory
@@ -97,18 +97,20 @@ def _run(self):
random.shuffle(sequence)
task = get_item_shm if self.use_shm else get_item
executor = ProcessPoolExecutor(max_workers=self.workers, initializer=init_pool_generator, initargs=(self.sequence, self.uid, self.shm_manager))
#print(f"executor started", flush=True)
for idx, i in enumerate(sequence):
if self.stop_signal.is_set():
return
self.semaphore.acquire()
future = executor.submit(task, self.uid, i)
self.queue.append((future, i))
#print(f"sumit task: {i} {idx+1}/{len(sequence)}")
# Done with the current epoch, waiting for the final batches
self._wait_queue(True) # safer to wait before calling shutdown than calling directly shutdown with wait=True
print("exiting from ProcessPoolExecutor...", flush=True)
#print("exiting from ProcessPoolExecutor...", flush=True)
time.sleep(0.1)
executor.shutdown(wait=False, cancel_futures=True)
print("exiting from ProcessPoolExecutor done", flush=True)
#print("exiting from ProcessPoolExecutor done", flush=True)
if self.stop_signal.is_set() or self.single_epoch:
# We're done
return
@@ -140,19 +142,20 @@ def get(self):
self._wait_queue(False)
if len(self.queue) > 0:
future, i = self.queue[0]
try:
#print(f"processing task: {i}")
ex = future.exception()
if ex is None:
inputs = future.result()
self.queue.pop(0) # only remove after result() is called to avoid terminating pool while a process is still running
if self.use_shm:
inputs = from_shm(*inputs)
self.semaphore.release() # release is done here and not as a future callback to limit effective number of samples in memory
except Exception as e:
self.stop()
print(f"Exception raised while getting future result from task: {i}", flush=True)
raise e
finally:
future.cancel()
del future
else:
traceback.print_exception(ex)
print(f"Exception raised while getting future result from task: {i}. Task will be re-computed.", flush=True)
inputs = get_item(self.uid, i)
self.queue.pop(0) # only remove after result() is called to avoid terminating pool while a process is still running
self.semaphore.release() # release is done here and not as a future callback to limit effective number of samples in memory
future.cancel()
del future
yield inputs

@@ -165,7 +168,7 @@ def stop(self, timeout=None):
"""
self.stop_signal.set()
self.run_thread.join(timeout)
if self.use_shm is not None:
if self.shm_manager is not None:
self.shm_manager.shutdown()
self.shm_manager.join()
self.queue = None
Expand All @@ -191,95 +194,3 @@ def get_item_shm(uid, i):
def get_item(uid, i):
return _SHARED_SEQUENCES[uid][i]


def to_shm(shm_manager, tensors):
flatten_tensor_list, nested_structure = get_flatten_list(tensors)
size = np.sum([a.nbytes for a in flatten_tensor_list])
shm = shm_manager.SharedMemory(size=size)
shapes = []
dtypes = []
offset = 0
for a in flatten_tensor_list:
shm_a = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf, offset=offset)
shm_a[:] = a[:]
shapes.append(a.shape)
dtypes.append(a.dtype)
offset += a.nbytes
tensor_ref = (shapes, dtypes, shm.name, nested_structure)
shm.close()
del shm
return tensor_ref


def from_shm(shapes, dtypes, shm_name, nested_structure):
existing_shm = ErasingSharedMemory(shm_name)
offset = 0
tensor_list = []
for shape, dtype in zip(shapes, dtypes):
a = ShmArray(shape, dtype=dtype, buffer=existing_shm.buf, offset=offset, shm=existing_shm)
tensor_list.append(a)
offset += a.nbytes
return get_nested_structure(tensor_list, nested_structure)


def multiple(item):
return isinstance(item, (list, tuple))


def get_flatten_list(item):
flatten_list = []
nested_structure = []
_flatten(item, 0, flatten_list, nested_structure)
return flatten_list, nested_structure[0]


def _flatten(item, offset, flatten_list, nested_structure):
if multiple(item):
nested_structure.append([])
for sub_item in item:
offset = _flatten(sub_item, offset, flatten_list, nested_structure[-1])
return offset
else:
nested_structure.append(offset)
flatten_list.append(item)
return offset + 1


def get_nested_structure(flatten_list, nested_structure):
if multiple(nested_structure):
result = []
_get_nested(flatten_list, nested_structure, 0, result)
return result[0]
else:
return flatten_list[0]


def _get_nested(flatten_list, nested_structure, offset, result):
if multiple(nested_structure):
result.append([])
for sub_nested in nested_structure:
offset = _get_nested(flatten_list, sub_nested, offset, result[-1])
return offset
else:
result.append(flatten_list[offset])
return offset + 1


# code from: https://muditb.medium.com/speed-up-your-keras-sequence-pipeline-f5d158359f46
class ShmArray(np.ndarray):
def __new__(cls, shape, dtype=float, buffer=None, offset=0, strides=None, order=None, shm=None):
obj = super(ShmArray, cls).__new__(cls, shape, dtype, buffer, offset, strides, order)
obj.shm = shm
return obj

def __array_finalize__(self, obj):
if obj is None: return
self.shm = getattr(obj, 'shm', None)

class ErasingSharedMemory(shared_memory.SharedMemory):
def __del__(self):
super(ErasingSharedMemory, self).__del__()
try:
self.unlink() # manager can delete the file before array is finalized
except FileNotFoundError:
pass
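
For context, a minimal sketch (not part of the commit) of the shared-memory round trip the enqueuer performs when use_shm is enabled: the worker packs its tensors with to_shm() and returns only a lightweight reference, and the main process rebuilds the arrays with from_shm(). Passing shm_manager=None relies on the fallback in the new module that creates the segment directly; the enqueuer normally passes its shared-memory manager here.

import numpy as np
from dataset_iterator.shared_memory import to_shm, from_shm

# a nested (inputs, outputs) batch, as a worker process might produce it
batch = ([np.random.rand(8, 32, 32, 1).astype(np.float32)], np.ones((8, 10), dtype=np.float32))

ref = to_shm(None, batch)   # -> (shapes, dtypes, shm_name, nested_structure): cheap to send between processes
restored = from_shm(*ref)   # ShmArray views on the segment; it is unlinked once they are garbage collected
assert np.array_equal(restored[1], batch[1])
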
109 changes: 109 additions & 0 deletions dataset_iterator/shared_memory.py
@@ -0,0 +1,109 @@
import numpy as np
from multiprocessing import shared_memory


def to_shm(shm_manager, tensors):
flatten_tensor_list, nested_structure = get_flatten_list(tensors)
size = np.sum([a.nbytes for a in flatten_tensor_list])
shm = shm_manager.SharedMemory(size=size) if shm_manager is not None else shared_memory.SharedMemory(create=True, size=size)
shapes = []
dtypes = []
offset = 0
for a in flatten_tensor_list:
shm_a = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf, offset=offset)
shm_a[:] = a[:]
shapes.append(a.shape)
dtypes.append(a.dtype)
offset += a.nbytes
tensor_ref = (shapes, dtypes, shm.name, nested_structure)
shm.close()
del shm
return tensor_ref


def from_shm(shapes, dtypes, shm_name, nested_structure):
existing_shm = ErasingSharedMemory(shm_name)
offset = 0
tensor_list = []
for shape, dtype in zip(shapes, dtypes):
a = ShmArray(shape, dtype=dtype, buffer=existing_shm.buf, offset=offset, shm=existing_shm)
tensor_list.append(a)
offset += a.nbytes
return get_nested_structure(tensor_list, nested_structure)


def get_idx_from_shm(idx, shapes, dtypes, shm_name, array_idx=0):
existing_shm = shared_memory.SharedMemory(shm_name)
offset = 0
for i in range(0, array_idx):
offset += np.prod(shapes[i]) * dtypes[i].itemsize
shape = shapes[array_idx][1:] if len(shapes[array_idx]) > 1 else (1,)
offset += idx * np.prod(shape) * dtypes[array_idx].itemsize
array = np.copy(np.ndarray(shape, dtype=dtypes[array_idx], buffer=existing_shm.buf, offset=offset))
existing_shm.close()
del existing_shm
return array


def multiple(item):
return isinstance(item, (list, tuple))


def get_flatten_list(item):
flatten_list = []
nested_structure = []
_flatten(item, 0, flatten_list, nested_structure)
return flatten_list, nested_structure[0]


def _flatten(item, offset, flatten_list, nested_structure):
if multiple(item):
nested_structure.append([])
for sub_item in item:
offset = _flatten(sub_item, offset, flatten_list, nested_structure[-1])
return offset
else:
nested_structure.append(offset)
flatten_list.append(item)
return offset + 1


def get_nested_structure(flatten_list, nested_structure):
if multiple(nested_structure):
result = []
_get_nested(flatten_list, nested_structure, 0, result)
return result[0]
else:
return flatten_list[0]


def _get_nested(flatten_list, nested_structure, offset, result):
if multiple(nested_structure):
result.append([])
for sub_nested in nested_structure:
offset = _get_nested(flatten_list, sub_nested, offset, result[-1])
return offset
else:
result.append(flatten_list[offset])
return offset + 1


# code from: https://muditb.medium.com/speed-up-your-keras-sequence-pipeline-f5d158359f46
class ShmArray(np.ndarray):
def __new__(cls, shape, dtype=float, buffer=None, offset=0, strides=None, order=None, shm=None):
obj = super(ShmArray, cls).__new__(cls, shape, dtype, buffer, offset, strides, order)
obj.shm = shm
return obj

def __array_finalize__(self, obj):
if obj is None:
return
self.shm = getattr(obj, 'shm', None)

class ErasingSharedMemory(shared_memory.SharedMemory):
def __del__(self):
super(ErasingSharedMemory, self).__del__()
try:
self.unlink()
except FileNotFoundError: # manager can delete the file before array is finalized
pass
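
get_idx_from_shm() is not exercised in the hunks shown above; the sketch below (not part of the commit, variable names illustrative) shows the access pattern its signature implies: reading a single first-axis element of one flattened array as a copy, without mapping the whole batch back. The manual unlink stands in for the cleanup that from_shm()/ErasingSharedMemory would otherwise perform.

import numpy as np
from multiprocessing import shared_memory
from dataset_iterator.shared_memory import to_shm, get_idx_from_shm

x = np.arange(6 * 4, dtype=np.float32).reshape(6, 4)        # 6 samples of shape (4,)
shapes, dtypes, shm_name, _ = to_shm(None, [x])              # pack a single flattened array

sample = get_idx_from_shm(2, shapes, dtypes, shm_name, array_idx=0)  # copy of x[2]
assert np.array_equal(sample, x[2])

seg = shared_memory.SharedMemory(shm_name)                   # manual cleanup of the segment
seg.unlink()
seg.close()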
