njit(parallel = True) gives wrong output #9379

Closed
netomenoci opened this issue Dec 26, 2023 · 7 comments · May be fixed by #9397
Labels
needtriage, ParallelAccelerator, stale

Comments

@netomenoci

netomenoci commented Dec 26, 2023

njit with parallel = True gives wrong output.

The function _groupby_apply_nb_parallel_dynamic_size partially replicates dataframe.groupby().apply(func) using numba for speedup. A full minimal example is included below.

The function returns wrong output when parallel is set to True: in the minimal example below, its output contains only os.cpu_count() unique values when it should contain 1001. It works correctly with parallel = False. I have also managed to get the correct output with some modifications, such as:

out_cols_size = 1 # Setting this to 1 statically also seems to solve the issue

even though I don't know why this should fix anything.

I can't see any data race in my code, which is why I think this is a Numba bug.

OS: Ubuntu 22.04
Python: 3.11.7
Numba: 0.58.1 (installed via pip)
NumPy: 1.26.2
Pandas: 2.1.4 (only used to generate the expected array; the version doesn't matter)
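
For reference, the versions above can be checked with:

import numba
import numpy
import pandas

print(numba.__version__, numpy.__version__, pandas.__version__)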

The code below mimics the behavior of pandas.DataFrame.groupby:

import os
import numpy as np
import numba
from numba import njit, prange
from numba.typed import List as NumbaList
import pandas as pd


PARALLEL = True # Set it to False and the issue goes away

@njit(cache=False)
def get_max_first_array_of_list_of_arrays(y_x: NumbaList, return_dtype=np.float64):
    """
    This function only uses the first element of y_x as a minimal example
    """
    y = y_x[0]
    ans = np.array([y.max()], dtype=return_dtype)
    return ans


@njit(parallel=PARALLEL, cache=False)
def _groupby_apply_nb_parallel_dynamic_size(
    start_indices: np.array,
    end_indices: np.array,
    # dataargs is a list of numpy 2d arrays
    dataargs: numba.typed.List,
    func: callable,
    funcargs,
    return_dtype=np.float64,
):
    """
    start_indices : 1d np.array with the start indices of each group
    end_indices : 1d np.array with the end indices of each group
    dataargs: List of np 2d arrays. These are arguments of func
    func: jitted function
    funcargs: other arguments to func.
    return_dtype : output array dtype
    
    """

    ngroups = len(start_indices)
    if ngroups == 0:
        raise Exception("start_indices is empty (no groups)")

    # First call to determine the size of the output. Assumes the first group
    # will have the same output size as all other groups.
    res = func(
        [data[start_indices[0] : end_indices[0], ...] for data in dataargs], *funcargs
    )
    out_cols_size = res.shape[0] # Setting this to 1 statically also seems to solve the issue
    result = np.empty(shape=(ngroups, out_cols_size), dtype=return_dtype)

    for k in prange(ngroups):
        start_idx = start_indices[k]
        end_idx = end_indices[k]
        res = func([data[start_idx:end_idx, ...] for data in dataargs], *funcargs)
        result[k, :] = res

    return result


if __name__ == "__main__":

    np.random.seed(42)
    size = 100_000
    qty_intervals = 1_000
    save_dir = os.path.join("random_experiments", "parallel_py")

    samples = np.sort(np.random.choice(range(size), qty_intervals, replace=False))
    start_indices = np.append(0, samples)
    end_indices = np.append(samples, size)

    x = np.zeros(shape=(size,)) # This could be any np.array
    y = np.exp(np.cumsum(np.random.normal(0, 1e-2, size)))

    os.makedirs(save_dir, exist_ok=True)
    for i in range(10):
        max_vals = _groupby_apply_nb_parallel_dynamic_size(
            start_indices = start_indices,
            end_indices = end_indices,
            dataargs  = NumbaList([y, x]),
            func = get_max_first_array_of_list_of_arrays,
            funcargs=(),
            return_dtype=np.float64,
        )
        save_path = os.path.join(save_dir, f'output_{i}_.npy')
        np.save(save_path, max_vals)

    groups = np.zeros(size, dtype=np.int64)
    groups[start_indices] = 1
    groups = np.cumsum(groups)
    max_expected = pd.DataFrame(y).groupby(by = groups).max().values

    for i in range(10):
        save_path = os.path.join(save_dir, f'output_{i}_.npy')
        max_vals = np.load(save_path)
        equal =  np.isclose(max_expected, max_vals, equal_nan = True, rtol = 1e-6).all()
        if not equal:
            print(f"ERROR - parallel output {i} is wrong")

    print(f"Number of unique values : {len(np.unique(max_vals))}")
    print(f"Number of unique expected : {len(np.unique(max_expected))}")
@gmarkall
Member

Many thanks for the bug report and reproducer - I can reproduce the issue.

I can't see any obvious issue in your code, but I'm not an expert in the parallel functionality, so I can't tell whether anything here is not expected to work with parallel=True. I'll bring this up in the next triage meeting, but note that it's not until Tuesday, January 16th.

@netomenoci
Author

netomenoci commented Dec 27, 2023

Thank you so much @gmarkall

I think I have found the cause and managed to make the function work, though there seems to be a serious problem in the hoisting step that needs to be fixed.

By calling _groupby_apply_nb_parallel_dynamic_size.parallel_diagnostics(level=4), I was able to check that everything seemed fine with the serialization step, but something seems wrong with the hoisting step.
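
(Note for anyone reproducing this: the diagnostics are only available after the function has been compiled, so it must be run at least once first.)

_groupby_apply_nb_parallel_dynamic_size(...) # any call that triggers compilation
_groupby_apply_nb_parallel_dynamic_size.parallel_diagnostics(level=4)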

This is what it looks like (copying the relevant part of the output below):


                                                                                                         | 
    for k in prange(ngroups):----------------------------------------------------------------------------| #1
        start_idx = start_indices[k]                                                                     | 
        end_idx = end_indices[k]                                                                         | 
        res = func([data[start_idx:end_idx, ...] for data in dataargs], *funcargs)                       | 
        result[k, :] = res-------------------------------------------------------------------------------| #0
                                                                                                         | 
    return result       

 ---------------------------Loop invariant code motion---------------------------
Allocation hoisting:
No allocation hoisting found

Instruction hoisting:
loop #1:
  Has the following hoisted:
    closure__locals___listcomp__v6__v20build_slice_6 = global(slice: <class 'slice'>)
    closure__locals___listcomp__v6__vconst22_8 = const(ellipsis, Ellipsis)
    $_list_extend_gv_tuple.1 = global(tuple: <class 'tuple'>)
    $const318.27 = const(NoneType, None)
    $const320.28 = const(NoneType, None)
    $322build_slice.29 = global(slice: <class 'slice'>)
    $322build_slice.30 = call $322build_slice.29($const318.27, $const320.28, func=$322build_slice.29, args=(Var($const318.27, numba_issue.py:57), Var($const320.28, numba_issue.py:57)), kws=(), vararg=None, varkwarg=None, target=None)
    msg.25 = const(str, Sizes of result, res do not match on /home/numba_issue.py (57))
    assert.26 = global(assert_equiv: <intrinsic assert_equiv>)
    closure__locals___listcomp__v6__v6build_list_0 = build_list(items=[])
  Failed to hoist the following:
....
....

It seems that a lot of things are being hoisted, in particular:

closure__locals___listcomp__v6__v6build_list_0 = build_list(items=[])

which looks like the list comprehension I have in [data[start_idx:end_idx, ...] for data in dataargs].

If it indeed is, that should be the issue: this obviously cannot be hoisted outside of the loop, since the slice indices are defined by the prange variable.
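
To make that concrete, here is a rough Python-level sketch of the effect (my own illustration, not Numba's actual generated code; chunk_start and chunk_end stand for one thread's share of the prange):

hoisted_list = [] # the hoisted build_list(items=[]), created once per thread
for k in range(chunk_start, chunk_end):
    for data in dataargs:
        # appends accumulate across iterations instead of starting fresh
        hoisted_list.append(data[start_indices[k]:end_indices[k], ...])
    # func only reads y_x[0], which stays the first slice this thread appended
    result[k, :] = func(hoisted_list, *funcargs)

If that is what happens, it would also explain the os.cpu_count() unique values: every row a given thread writes reduces to the max over the first group that thread processed.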

I went further and tried to make the dependence on the loop index even more explicit, to help the compiler. I replaced

    for k in prange(ngroups):
        start_idx = start_indices[k]
        end_idx = end_indices[k]
        res = func([data[start_idx:end_idx, ...] for data in dataargs], *funcargs)
        result[k, :] = res
        

with the one-liner below:

    for k in prange(ngroups):
        result[k, :] = func([data[start_indices[k]:end_indices[k], ...] for data in dataargs], *funcargs)

This is what the hoisting diagnostics look like after this change:


---------------------------Loop invariant code motion---------------------------
Allocation hoisting:
No allocation hoisting found

Instruction hoisting:
loop #1:
  Has the following hoisted:
    closure__locals___listcomp__v6__v44build_slice_10 = global(slice: <class 'slice'>)
    closure__locals___listcomp__v6__vconst46_12 = const(ellipsis, Ellipsis)
    $_list_extend_gv_tuple.1 = global(tuple: <class 'tuple'>)
    $const282.21 = const(NoneType, None)
    $const284.22 = const(NoneType, None)
    $286build_slice.23 = global(slice: <class 'slice'>)
    $286build_slice.24 = call $286build_slice.23($const282.21, $const284.22, func=$286build_slice.23, args=(Var($const282.21, numba_issue.py:54), Var($const284.22, numba_issue.py:54)), kws=(), vararg=None, varkwarg=None, target=None)
    msg.25 = const(str, Sizes of result, $276call_function_ex.18 do not match on /home/numba_issue.py (54))
    assert.26 = global(assert_equiv: <intrinsic assert_equiv>)
  Failed to hoist the following:
  ....
  ....

I.e., there is a single difference: the line below disappeared from the hoisted set.

closure__locals___listcomp__v6__v6build_list_0 = build_list(items=[])

After that change, the code works with both PARALLEL = True and PARALLEL = False.
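
Until the hoisting itself is fixed, another defensive rewrite that avoids the list comprehension inside prange entirely might be (an untested sketch; it assumes func accepts a typed list built element by element):

    for k in prange(ngroups):
        sliced = NumbaList()
        for data in dataargs:
            sliced.append(data[start_indices[k]:end_indices[k], ...])
        result[k, :] = func(sliced, *funcargs)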

I am now wary of using the parallel optimization in Numba, as it seems it cannot always be trusted to perform these transformations correctly. I would be happy to hear back from you on what the cause might be, so that I can keep using it with confidence once I understand in detail what is going wrong. I would also be happy to take part in the PR to solve the issue.

Best regards,
Neto

@netomenoci
Author

I have found another, similar issue, so I will add it here rather than opening a separate one.

Replacing

    for k in prange(ngroups):
        result[k, :] = func([data[start_indices[k]:end_indices[k], ...] for data in dataargs], *funcargs)

with

    for k in prange(ngroups):
        result[k, ...] = func([data[start_indices[k]:end_indices[k], ...] for data in dataargs], *funcargs)

i.e., using Ellipsis instead of ":" results in the same issue of parallel giving wrong results (and, again, a number of unique values equal to the number of CPUs on the machine).

It is not reproducible with the example above (sorry): I was unable to create a minimal example for the Ellipsis issue, as it only happens when I run multiple tests at the same time (using pytest), not when running the function in isolation.

I did capture the diagnostics for the function, though, and as you can see, it also seems that the result[k, ...] store is being hoisted out of the prange loop, which is wrong.


Parallel loop listing for  Function _groupby_apply_nb_parallel_dynamic_size
----------------------------------------------------------------------------------------------------|loop #ID
@njit(parallel=PARALLEL_FAST_GROUPBY, cache=False)                                                  | 
def _groupby_apply_nb_parallel_dynamic_size(                                                                   | 
    start_indices: np.array,                                                                        | 
    end_indices: np.array,                                                                          | 
    dataargs: numba.typed.List,                                                                     | 
    func: callable,                                                                                 | 
    funcargs,                                                                                       | 
    return_dtype=np.float64,                                                                        | 
):                                                                                                  | 
    """                                                                                             | 
    start_indices : 1d np.array with the start indices of each group                                | 
    end_indices : 1d np.array with the end indices of each group                                    | 
    dataargs: List of np 2d arrays. These are arguments of func                                     | 
    func: jitted function                                                                           | 
    funcargs: other arguments to func.                                                              | 
    return_dtype : output array dtype                                                               | 
    """                                                                                             | 
                                                                                                    | 
    ngroups = len(start_indices)                                                                    | 
    if ngroups == 0:                                                                                | 
        raise Exception("empty number of grouped, (start_indices)")                                 | 
                                                                                                    | 
    # First call to determinize the size of the output.                                             | 
    # Assumes the first group will have output size equal to the output size of all other groups    | 
    out_cols_size = (                                                                               | 
        func(                                                                                       | 
            [data[start_indices[0] : end_indices[0], ...] for data in dataargs],                    | 
            *funcargs,                                                                              | 
        )                                                                                           | 
    ).shape[0]                                                                                      | 
    result = np.empty(shape=(ngroups, out_cols_size), dtype=return_dtype)                           | 
                                                                                                    | 
    for k in prange(ngroups):-----------------------------------------------------------------------| #10
        result[k, ...] = func(                                                                      | 
            [data[start_indices[k] : end_indices[k], ...] for data in dataargs],                    | 
            *funcargs,                                                                              | 
        )                                                                                           | 
                                                                                                    | 
    return result                                                                                   | 
--------------------------------- Fusing loops ---------------------------------
Attempting fusion of parallel loops (combines loops with similar properties)...
----------------------------- Before Optimisation ------------------------------
--------------------------------------------------------------------------------
------------------------------ After Optimisation ------------------------------
Parallel structure is already optimal.
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
 
---------------------------Loop invariant code motion---------------------------
Allocation hoisting:
No allocation hoisting found

Instruction hoisting:
loop #10:
  Has the following hoisted:
    closure__locals___listcomp__v85__v44build_slice_10 = global(slice: <class 'slice'>)
    closure__locals___listcomp__v85__vconst46_12 = const(ellipsis, Ellipsis)
    $_list_extend_gv_tuple.1 = global(tuple: <class 'tuple'>)
    $const278.21 = const(ellipsis, Ellipsis)
  Failed to hoist the following:
    not pure: $closure__locals___listcomp__v85_implicit0.497 = getiter(value=dataargs)
    dependency: closure__locals___listcomp__v85__v20binary_subscr_6 = static_getitem(value=start__indices, index=None, index_var=$parfor__index_493.512, fn=<built-in function getitem>)
    dependency: closure__locals___listcomp__v85__v34binary_subscr_9 = static_getitem(value=end__indices, index=None, index_var=$parfor__index_493.512, fn=<built-in function getitem>)
    dependency: closure__locals___listcomp__v85__v44build_slice_11 = call $push_global_to_block.510(closure__locals___listcomp__v85__v20binary_subscr_6, closure__locals___listcomp__v85__v34binary_subscr_9, func=$push_global_to_block.510, args=(Var(closure__locals___listcomp__v85__v20binary_subscr_6, fastapply.py:239), Var(closure__locals___listcomp__v85__v34binary_subscr_9, fastapply.py:239)), kws=(), vararg=None, varkwarg=None, target=None)
    dependency: closure__locals___listcomp__v85__v48build_tuple_13 = build_tuple(items=[Var(closure__locals___listcomp__v85__v44build_slice_11, fastapply.py:239), Var(closure__locals___listcomp__v85__vconst46_12, fastapply.py:239)])
    dependency: closure__locals___listcomp__v85__v60list_append_15 = getattr(value=closure__locals___listcomp__v85__v6build_list_0, attr=append)
    dependency: closure__locals___listcomp__v85__v60list_append_16 = call closure__locals___listcomp__v85__v60list_append_15(closure__locals___listcomp__v85__v50binary_subscr_14, func=closure__locals___listcomp__v85__v60list_append_15, args=(Var(closure__locals___listcomp__v85__v50binary_subscr_14, fastapply.py:239),), kws=(), vararg=None, varkwarg=None, target=None)
    dependency: $264build_list.13 = build_tuple(items=[Var(closure__locals___listcomp__v85__v6build_list_0, fastapply.py:239)])
    dependency: $268list_extend.15_var_funcargs = call $push_global_to_block.511(funcargs, func=$push_global_to_block.511, args=(Var(funcargs, fastapply.py:205),), kws=(), vararg=None, varkwarg=None, target=None)
    dependency: $268list_extend.16 = $264build_list.13 + $268list_extend.15_var_funcargs
    stored array: $272call_function_ex.18 = call func(*$268list_extend.16, func=func, args=[], kws=[], vararg=$268list_extend.16, varkwarg=None, target=None)
    dependency: $280build_tuple.22 = build_tuple(items=[Var($parfor__index_493.512, <string>:3), Var($const278.21, fastapply.py:238)])
    dependency: closure__locals___listcomp__v85__v10for_iter_2 = iternext(value=$closure__locals___listcomp__v85_implicit0.497)
    dependency: closure__locals___listcomp__v85__v10for_iter_3 = pair_first(value=closure__locals___listcomp__v85__v10for_iter_2)
    dependency: closure__locals___listcomp__v85__v10for_iter_4 = pair_second(value=closure__locals___listcomp__v85__v10for_iter_2)
--------------------------------------------------------------------------------

@gmarkall
Member

@DrTodd13 Do you have any thoughts on this issue?

@github-actions

This issue is marked as stale as it has had no activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with any updates and confirm that this issue still needs to be addressed.

@github-actions bot added the stale label Feb 16, 2024
@netomenoci
Author

This issue shouldn't be closed; PR #9397 is still waiting for review.

@github-actions bot removed the stale label Feb 20, 2024
@github-actions

This issue is marked as stale as it has had no activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with any updates and confirm that this issue still needs to be addressed.

@github-actions bot added the stale label Mar 21, 2024
@github-actions bot closed this as not planned (stale) Mar 29, 2024