Behavior of Process.map with chunksize > 1 and a timeout #132
Hello,

if one of the elements of the chunk causes the worker process to crash or to hang until the timeout occurs, the whole chunk is lost. The reason for this is simple: crashes and timeouts are handled in the main loop, which cannot tell what went wrong within the affected worker process. If you want your logic to pinpoint the offending item within your collection, you can rely on the fact that the returned results preserve the order of the input iterable.
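For instance, something along these lines (just a sketch; `work`, `items` and the `None` placeholder are illustrative names, not code from this issue) counts the results as they are consumed, so a `TimeoutError` can be traced back to the chunk starting at that position:

```python
from concurrent.futures import TimeoutError

from pebble import ProcessPool

def work(item):
    return item * 2  # stand-in workload

items = list(range(20))
chunksize = 4
results = []

with ProcessPool() as pool:
    future = pool.map(work, items, timeout=5, chunksize=chunksize)
    iterator = future.result()
    index = 0  # position in `items` of the next expected result
    while True:
        try:
            results.append(next(iterator))
            index += 1
        except StopIteration:
            break
        except TimeoutError:
            # the whole chunk containing items[index] was lost
            lost = items[index:index + chunksize]
            print("timed out on the chunk starting with item", lost[0])
            results.extend(None for _ in lost)
            index += len(lost)
```

As long as you skip forward by exactly the size of the lost chunk, the collected results stay aligned with the input.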
Thanks a lot for your reply!

Given that information, shouldn't something like this work to ensure the order and length of the output are kept the same?

```python
with ProcessPool(max_workers=num_workers) as pool:
    future = pool.map(
        computeMetric, inputList, timeout=5, chunksize=10
    )
    iterator = future.result()
    while True:
        try:
            result = next(iterator)
            outputList.append(result)
        except StopIteration:
            break
        except TimeoutError:
            timeoutCounter += 1
            outputList += [np.nan] * chunksize
        except Exception:
            errorCounter += 1
            outputList += [np.nan] * chunksize
```

When I use this in some of my tests, the output length changes. Shouldn't it stay the same if a chunk fails as a whole and I add `chunksize` `np.nan` placeholders for every failed chunk?
Hi,

here is a minimal working example. I just selected some numbers which time out or produce another error:

```python
from pebble import ProcessPool
from concurrent.futures import TimeoutError
import numpy as np
import time

num_workers = 1
timeout = 1
chunksize = 4

def processFunction(inputNumber):
    if inputNumber == 23:
        time.sleep(5)
    elif inputNumber == 42:
        raise ValueError
    elif inputNumber == 86:
        time.sleep(5)
    elif inputNumber == 98:
        raise ValueError
    return inputNumber

timeoutCounter = 0
errorCounter = 0

with ProcessPool(max_workers=num_workers) as pool:
    outputList = []
    future = pool.map(processFunction, range(100), timeout=timeout, chunksize=chunksize)
    iterator = future.result()
    while True:
        try:
            result = next(iterator)
            outputList.append(result)
        except StopIteration:
            break
        except TimeoutError:
            timeoutCounter += 1
            outputList += [np.nan] * chunksize
        except Exception:
            errorCounter += 1
            outputList += [np.nan] * chunksize

if timeoutCounter + errorCounter > 0:
    # Calculate the total number of np.nan placeholders
    failedCompoundsTotal = np.count_nonzero(np.isnan(outputList))
    print(f"{failedCompoundsTotal} compounds failed in total. {timeoutCounter} chunks (up to {timeoutCounter * chunksize} compounds) timed out and were skipped, {errorCounter} chunks raised an error")

print(len(outputList), outputList)
```

If you run this example, you'll find that too many `np.nan` values are added: a regular exception apparently only costs the single offending element rather than the whole chunk, so the exception handler should append just one placeholder:

```python
except Exception:
    errorCounter += 1
    outputList += [np.nan]
```

I made these changes in my more specific example, but it seems that some unexpected behaviour is still taking place (the final list has a different length than the input). I'm still investigating this in more detail; I will update if I find anything, otherwise I will just put a check on the chunksize in the final version of my method.
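For the length bookkeeping, something like the following rough sketch is what I have in mind (it reuses `iterator`, `chunksize`, `np`, the counters and `outputList` from the example above, and assumes `inputLength` is the length of the iterable passed to `pool.map`): track the position in the input and pad a timed-out chunk only by as many elements as are actually left, so a shorter final chunk does not over-pad the output.

```python
# Rough sketch of the adjusted consumption loop (names as in the example above).
inputLength = 100   # length of the iterable passed to pool.map
position = 0        # index of the next expected result

while True:
    try:
        result = next(iterator)
        outputList.append(result)
        position += 1
    except StopIteration:
        break
    except TimeoutError:
        timeoutCounter += 1
        lost = min(chunksize, inputLength - position)  # the last chunk may be shorter
        outputList += [np.nan] * lost
        position += lost
    except Exception:
        # a regular exception only loses the single offending element
        # (a worker crash would lose the whole chunk, see the reply below)
        errorCounter += 1
        outputList.append(np.nan)
        position += 1
```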
Normal errors are returned, as the worker can intercept them and pass them back. As I mentioned above, timeouts and crashes lead to the loss of the whole batch. By crash I do not mean a Python exception but an actual crash such as a segmentation fault or an OOM kill. The following example shows what I mean; we pass 10 elements with a chunksize of 2:
```python
import os
import time
from concurrent.futures import TimeoutError

from pebble import ProcessPool, ProcessExpired

def function(value):
    if value == 1:
        raise RuntimeError('BOOM!')
    if value == 3:
        time.sleep(5)
    if value == 7:
        os._exit(1)  # Simulate a crash such as a segfault
    return value

with ProcessPool(max_workers=1) as pool:
    processed = []
    future = pool.map(function, range(10), timeout=1, chunksize=2)
    iterator = future.result()
    while True:
        try:
            result = next(iterator)
            processed.append(result)
        except StopIteration:
            break
        except TimeoutError as error:
            processed.append(error)
        except ProcessExpired as error:
            processed.append(error)
        except Exception as error:
            processed.append(error)

print(processed)
```

Output:
Thanks a lot for the clarification, this makes more sense now!
I'm a bit confused about the interaction of the process.map function when setting a chunksize larger than one and a timeout.
Here is an example of my implementation:
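Roughly, with `computeMetric` and `inputList` standing in for my metric function and input data:

```python
with ProcessPool(max_workers=num_workers) as pool:
    future = pool.map(
        computeMetric, inputList, timeout=5, chunksize=10
    )
    iterator = future.result()
    # ... consume the iterator and collect the results into outputList
```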
I understand the timeout is set for the whole chunk, but let's say you have a chunksize of 10 and a timeout of 5 seconds. The process is running but gets stuck on the 7th element of the chunk. Are the first 6 elements that came before still processed "normally" (appended to the final list), or are they also included when catching the timeout error for the chunk? If the latter is the case, is there a way to know how many elements were processed before the timeout triggered, or how many elements still had to be processed in that chunk? This matters for error handling, since the output list has to match the order (and length) of the input list.