Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Behavior of Process.map with chunksize > 1 and a timeout #132

Closed
Jnelen opened this issue Apr 11, 2024 · 5 comments
Closed

Behavior of Process.map with chunksize > 1 and a timeout #132

Jnelen opened this issue Apr 11, 2024 · 5 comments

Comments

@Jnelen
Copy link

Jnelen commented Apr 11, 2024

I'm a bit confused about the interaction of the process.map function when setting a chunksize larger than one and a timeout.
Here is an example of how my implementation:

with ProcessPool(max_workers=num_workers) as pool:

   future = pool.map(
       computeMetric, inputList, timeout=5, chunksize=10
   )
   iterator = future.result()

   while True:
       try:
           result = next(iterator)
           outputList.append(result)
       except StopIteration:
           break
       except TimeoutError:
           timeoutCounter += 1
           outputList += [np.nan] * chunksize
       except Exception:
           errorCounter += 1
           outputList.append(np.nan)

I understand the timeout is set for the whole chunk, but let's say you have a chunksize of 10 and a timeout of 5 seconds. The process is running, but gets stuck on the 7th element of the chunk. Are the first 6 elements that came before still processed "normally" (appended to the final list), or are they also included when catching the timeout error for the chunk? If this is the case, is there a way to know how many elements were processed until the timeout triggered? Or how many elements still had to be processed in that chunk? This is important for error handling, since it's important that the results of the output list matches the order (and length) of the input list.

@noxdafox
Copy link
Owner

Hello,

the chunksize parameter is intended to reduce the IPC overhead when dealing with large datasets. This is particularly useful if the dataset is consisting of lots of small elements (ex: array of float). Sending them one-by-one would become a major bottleneck.

When chunksize is greater than 1, the worker process will process the allotted chunk altogether. In practice, this means it will receive the chunk, process it all and return it back to the main process. Thus minimizing the overhead caused by IPC.

If one of the elements of the chunk causes the worker process to crash or to hang until the timeout occurs, the whole chunk is lost. The reason for this is simple: crashes and timeouts are handled on the main loop which cannot understand what went wrong within the affected worker process.

If you want your logic to pinpoint the offending item within your collection, you can rely on the fact that the returned ProcessMapFuture yields the chunks in the order of submission. Hence, you can re-sumbit the offending chunk(s) with a chunksize of 1 and identify the problematic item in your collection.

@Jnelen
Copy link
Author

Jnelen commented Apr 18, 2024

Thanks a lot for your reply! Given that information, shouldn't something like this work to ensure the order and length of the output is kept the same?

with ProcessPool(max_workers=num_workers) as pool:

   future = pool.map(
       computeMetric, inputList, timeout=5, chunksize=10
   )
   iterator = future.result()

   while True:
       try:
           result = next(iterator)
           outputList.append(result)
       except StopIteration:
           break
       except TimeoutError:
           timeoutCounter += 1
           outputList += [np.nan] * chunksize
       except Exception:
           errorCounter += 1
           outputList += [np.nan] * chunksize

When I use this in some of my tests, the output length changes, but shouldn't it be the same if a chunk fails as a whole, and I add [np.nan]*chunksize to my final output list? Maybe something in my implementation is wrong?
In my specific case, my computeMetric function is from the spyrmsd package. However I will try to make an easier workable example.

@Jnelen
Copy link
Author

Jnelen commented Apr 21, 2024

Hi,

Here I have a minimal working example. I just selected random numbers which would time out or produce another error:

from pebble import ProcessPool
from concurrent.futures import TimeoutError
import numpy as np
import time

num_workers=1
timeout=1
chunksize=4

def processFunction(inputNumber):
	if inputNumber == 23:
		time.sleep(5)
	elif inputNumber == 42:
		raise ValueError
	elif inputNumber == 86:
		time.sleep(5)
	elif inputNumber == 98:
		raise ValueError
	return inputNumber

timeoutCounter = 0
errorCounter = 0

with ProcessPool(max_workers=num_workers) as pool:
	outputList = []

	future = pool.map(processFunction, range(100), timeout=timeout, chunksize=chunksize)

	iterator = future.result()

	while True:
		try:
			result = next(iterator)
			outputList.append(result)
		except StopIteration:
			break
		except TimeoutError:
			timeoutCounter += 1
			outputList += [np.nan]*chunksize
		except Exception:
			errorCounter += 1
			outputList += [np.nan]*chunksize
	if timeoutCounter + errorCounter > 0:
		# Calculate total number of np.nan
		failedCompoundsTotal = np.count_nonzero(np.isnan(outputList))	
		print(f"{failedCompoundsTotal} compounds failed in total. {timeoutCounter} chunks (up to {timeoutCounter * chunksize} compounds) timed out and were skipped, {errorCounter} chunks raised an error")

print(len(outputList),outputList)

If you run this code example, you'll find that too many np.nan are inserted when a non-timeout error happens. So it does seem to me that if a single (non-timeout) error happens in a chunk, the rest can somehow still be processed correctly? A timeout error however does make the whole chunk fail, no matter what did or didn't process successfully. It seems like when a regular error occurs, it's better to only 1 np.nan instead:

		except Exception:
			errorCounter += 1
			outputList += [np.nan]

I made these changes in my more specific example, but it seems that somehow still unexpected behaviour is taking place (where the final length has a different length than the input). I'm still investigating this in more detail however. I will try to update if I find anything, otherwise I will just put a check to on the chunksize in the final version of my method.

@noxdafox
Copy link
Owner

noxdafox commented May 10, 2024

Normal errors are returned as the worker can intercept them and pass them back. As I mentioned above, timeout and crashes will lead to the loss of the whole batch. With crash I do not mean a Python exception, I mean an actual crash such as a segmentation fault or a OOM.

The following example shows what I've meant.

We pass 10 elements with a chunksize of 2:

  • If element is equal to 1, we raise an exception. In this case the whole batch is processed successfully and element 1 will be replaced with the raised exception.
  • If element is equal to 3, we simulate a timeout. In this case, the whole batch is lost and replaced by a TimeoutError.
  • If element is equal to 7, we simulate a crash similar to a segmentation fault. These errors are rare and typically show up if using faulty C libraries or if we run out of memory. The whole batch is lost again and a ProcessExpired error is raised.
import os
import time
from concurrent.futures import TimeoutError

from pebble import ProcessPool, ProcessExpired


def function(value):
    if value == 1:
        raise RuntimeError('BOOM!')
    if value == 3:
        time.sleep(5)
    if value == 7:
        os._exit(1)  # Simulate a crash such as a segfault

    return value


with ProcessPool(max_workers=1) as pool:
    processed = []

    future = pool.map(function, range(10), timeout=1, chunksize=2)

    iterator = future.result()

    while True:
        try:
            result = next(iterator)
            processed.append(result)
        except StopIteration:
            break
        except TimeoutError as error:
            processed.append(error)
        except ProcessExpired as error:
            processed.append(error)
        except Exception as error:
            processed.append(error)

    print(processed)

Output:

[0, RuntimeError('BOOM!'), TimeoutError(), 4, 5, ProcessExpired('Abnormal termination'), 8, 9]

noxdafox added a commit that referenced this issue May 12, 2024
Signed-off-by: Matteo Cafasso <noxdafox@gmail.com>
@Jnelen
Copy link
Author

Jnelen commented May 13, 2024

Thanks a lot for the clarification, this makes more sense now!

@Jnelen Jnelen closed this as completed May 13, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants