BrokenPipeError: [Errno 32] when using custom loss_fn #144

Open
christophelebrun opened this issue Nov 13, 2019 · 5 comments

christophelebrun commented Nov 13, 2019

def f1_loss(target, pred):
    return -f1_score(target, pred)

# Instantiate a HyperoptEstimator with the search space and number of evaluations
estim = HyperoptEstimator(classifier=clf,
                          preprocessing=any_preprocessing('my_pre'),
                          algo=tpe.suggest,
                          max_evals=10,
                          trial_timeout=120,
                          loss_fn=f1_loss,
                          )

# Search the hyperparameter space based on the data
estim.fit(X_train, y_train)

I get this error:

BrokenPipeError Traceback (most recent call last)
in
55
56 # Search the hyperparameter space based on the data
---> 57 estim.fit(X_train, y_train)
[…]
BrokenPipeError: [Errno 32] Broken pipe

@christophelebrun christophelebrun changed the title BrokenPipeError: [Errno 32] Broken pipe when using custom_loss BrokenPipeError: [Errno 32] when using custom loss_fn Nov 13, 2019
bjkomer (Member) commented Nov 13, 2019

I haven't been able to reproduce this error. Can you provide more info on the classifier you are using and the type of data? Is it correct to assume the f1_score you are using is the one built into sklearn? By default, that score assumes binary classification and will throw an error on multiclass data, though the error it raises is not a broken pipe but rather:

ValueError: Target is multiclass but average='binary'. Please choose another average setting.

This can be fixed by setting average="weighted" or some other value that supports multiclass. I'm guessing your issue is something else though, but may still be dependent on some incompatibility between the type of data being used, the classifier chosen, and the loss function.
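For illustration, here is what that looks like (a minimal sketch using sklearn's f1_score on a made-up multiclass example):

```python
from sklearn.metrics import f1_score

# a small made-up multiclass example
y_true = [0, 1, 2, 2, 1, 0]
y_pred = [0, 2, 2, 2, 1, 0]

# the default average='binary' rejects multiclass targets
try:
    f1_score(y_true, y_pred)
except ValueError as e:
    print(e)  # Target is multiclass but average='binary'. ...

# choosing an average that supports multiclass fixes it
print(f1_score(y_true, y_pred, average="weighted"))
```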

Here is a simple full script that I made to try to reproduce it. I get no error on my end when I run this, is that the same for you? What do you need to change to see the broken pipe?

from hpsklearn import HyperoptEstimator, any_preprocessing, any_classifier
from hyperopt import tpe
from sklearn.metrics import f1_score

# picking an easily available dataset for binary classification
from sklearn.datasets import load_breast_cancer
data = load_breast_cancer()

X_train = data.data
y_train = data.target

def f1_loss(target, pred):
    return -f1_score(target, pred)

clf = any_classifier('clf')

estim = HyperoptEstimator(classifier=clf,
    preprocessing=any_preprocessing('my_pre'),
    algo=tpe.suggest,
    max_evals=10,
    trial_timeout=120,
    loss_fn=f1_loss,
)

estim.fit(X_train, y_train)
print(estim.score(X_train, y_train))
print(estim.best_model())

christophelebrun (Author) commented

Hi! Thank you for your answer.
Running your code, I get the exact same error as before (BrokenPipeError).

My configuration: Windows 10, Anaconda, Python 3.7.5, JupyterLab 1.1.4, scikit-learn 0.21.3

Here is the full traceback:

---------------------------------------------------------------------------
BrokenPipeError                           Traceback (most recent call last)
<ipython-input-1-2f2c9e34ed50> in <module>
     23 )
     24 
---> 25 estim.fit(X_train, y_train)
     26 print(estim.score(X_train, y_train))
     27 print(estim.best_model())

d:\onedrive\openclassrooms\parcours data scientist\projet7\hyperopt-sklearn\hpsklearn\estimator.py in fit(self, X, y, EX_list, valid_size, n_folds, cv_shuffle, warm_start, random_state, weights)
    781                 increment = min(self.fit_increment,
    782                                 adjusted_max_evals - len(self.trials.trials))
--> 783                 fit_iter.send(increment)
    784                 if filename is not None:
    785                     with open(filename, 'wb') as dump_file:

d:\onedrive\openclassrooms\parcours data scientist\projet7\hyperopt-sklearn\hpsklearn\estimator.py in fit_iter(self, X, y, EX_list, valid_size, n_folds, cv_shuffle, warm_start, random_state, weights, increment)
    691                               #    so we notice them.
    692                               catch_eval_exceptions=False,
--> 693                               return_argmin=False, # -- in case no success so far
    694                              )
    695             else:

~\Anaconda3\envs\DS_projet7\lib\site-packages\hyperopt\fmin.py in fmin(fn, space, algo, max_evals, trials, rstate, allow_trials_fmin, pass_expr_memo_ctrl, catch_eval_exceptions, verbose, return_argmin, points_to_evaluate, max_queue_len, show_progressbar)
    401             catch_eval_exceptions=catch_eval_exceptions,
    402             return_argmin=return_argmin,
--> 403             show_progressbar=show_progressbar,
    404         )
    405 

~\Anaconda3\envs\DS_projet7\lib\site-packages\hyperopt\base.py in fmin(self, fn, space, algo, max_evals, max_queue_len, rstate, verbose, pass_expr_memo_ctrl, catch_eval_exceptions, return_argmin, show_progressbar)
    649             catch_eval_exceptions=catch_eval_exceptions,
    650             return_argmin=return_argmin,
--> 651             show_progressbar=show_progressbar)
    652 
    653 

~\Anaconda3\envs\DS_projet7\lib\site-packages\hyperopt\fmin.py in fmin(fn, space, algo, max_evals, trials, rstate, allow_trials_fmin, pass_expr_memo_ctrl, catch_eval_exceptions, verbose, return_argmin, points_to_evaluate, max_queue_len, show_progressbar)
    420                     show_progressbar=show_progressbar)
    421     rval.catch_eval_exceptions = catch_eval_exceptions
--> 422     rval.exhaust()
    423     if return_argmin:
    424         if len(trials.trials) == 0:

~\Anaconda3\envs\DS_projet7\lib\site-packages\hyperopt\fmin.py in exhaust(self)
    274     def exhaust(self):
    275         n_done = len(self.trials)
--> 276         self.run(self.max_evals - n_done, block_until_done=self.asynchronous)
    277         self.trials.refresh()
    278         return self

~\Anaconda3\envs\DS_projet7\lib\site-packages\hyperopt\fmin.py in run(self, N, block_until_done)
    239                     else:
    240                         # -- loop over trials and do the jobs directly
--> 241                         self.serial_evaluate()
    242 
    243                     try:

~\Anaconda3\envs\DS_projet7\lib\site-packages\hyperopt\fmin.py in serial_evaluate(self, N)
    139                 ctrl = base.Ctrl(self.trials, current_trial=trial)
    140                 try:
--> 141                     result = self.domain.evaluate(spec, ctrl)
    142                 except Exception as e:
    143                     logger.info('job exception: %s' % str(e))

~\Anaconda3\envs\DS_projet7\lib\site-packages\hyperopt\base.py in evaluate(self, config, ctrl, attach_attachments)
    854                 memo=memo,
    855                 print_node_on_error=self.rec_eval_print_node_on_error)
--> 856             rval = self.fn(pyll_rval)
    857 
    858         if isinstance(rval, (float, int, np.number)):

d:\onedrive\openclassrooms\parcours data scientist\projet7\hyperopt-sklearn\hpsklearn\estimator.py in fn_with_timeout(*args, **kwargs)
    639             th = Process(target=partial(fn, best_loss=self._best_loss),
    640                          args=args, kwargs=kwargs)
--> 641             th.start()
    642             if conn1.poll(self.trial_timeout):
    643                 fn_rval = conn1.recv()

~\Anaconda3\envs\DS_projet7\lib\multiprocessing\process.py in start(self)
    110                'daemonic processes are not allowed to have children'
    111         _cleanup()
--> 112         self._popen = self._Popen(self)
    113         self._sentinel = self._popen.sentinel
    114         # Avoid a refcycle if the target function holds an indirect

~\Anaconda3\envs\DS_projet7\lib\multiprocessing\context.py in _Popen(process_obj)
    221     @staticmethod
    222     def _Popen(process_obj):
--> 223         return _default_context.get_context().Process._Popen(process_obj)
    224 
    225 class DefaultContext(BaseContext):

~\Anaconda3\envs\DS_projet7\lib\multiprocessing\context.py in _Popen(process_obj)
    320         def _Popen(process_obj):
    321             from .popen_spawn_win32 import Popen
--> 322             return Popen(process_obj)
    323 
    324     class SpawnContext(BaseContext):

~\Anaconda3\envs\DS_projet7\lib\multiprocessing\popen_spawn_win32.py in __init__(self, process_obj)
     87             try:
     88                 reduction.dump(prep_data, to_child)
---> 89                 reduction.dump(process_obj, to_child)
     90             finally:
     91                 set_spawning_popen(None)

~\Anaconda3\envs\DS_projet7\lib\multiprocessing\reduction.py in dump(obj, file, protocol)
     58 def dump(obj, file, protocol=None):
     59     '''Replacement for pickle.dump() using ForkingPickler.'''
---> 60     ForkingPickler(file, protocol).dump(obj)
     61 
     62 #

BrokenPipeError: [Errno 32] Broken pipe

bjkomer (Member) commented Nov 13, 2019

This helps a ton! I've been testing on Linux. From searching around, it looks like this broken pipe error is a very common issue with multiprocessing in Python on Windows. One common solution is to wrap your code in an if __name__ == '__main__': block.

So something like this:

def main():
    # your code here

if __name__ == '__main__':
    main()

This doesn't always work, and some people have resorted to disabling multiprocessing on Windows. Unfortunately, there doesn't seem to be a flag for that in hyperopt-sklearn at the moment.

Some sources of similar issues here, here and here
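As an aside, on Windows the spawn start method must pickle everything it hands to the worker process, and a pickling failure during that handoff can surface as an opaque BrokenPipeError. Below is a stdlib-only sketch of a quick picklability check you could run on a loss function before calling fit — the f1_loss stand-in and the is_picklable helper are hypothetical, not part of hyperopt-sklearn:

```python
import pickle

def f1_loss(target, pred):
    # stand-in for the custom loss, defined at module top level so that
    # pickle can reference it by module + qualified name
    return 0.0

def is_picklable(obj):
    """Check whether obj survives pickling, which multiprocessing's
    spawn start method (the Windows default) requires of everything
    handed to the worker process."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False

if __name__ == '__main__':
    print(is_picklable(f1_loss))           # a top-level function
    print(is_picklable(lambda t, p: 0.0))  # a lambda is never picklable
```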

christophelebrun (Author) commented Nov 14, 2019

Thank you for your help.
The if __name__ == '__main__': block changed the error.
I now get an AttributeError: Can't pickle local object 'main.<locals>.f1_loss'.
Below is the full traceback.

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-1-c4d45cbab714> in <module>
     32 
     33 if __name__ == '__main__':
---> 34     main()

<ipython-input-1-c4d45cbab714> in main()
     27     )
     28 
---> 29     estim.fit(X_train, y_train)
     30     print(estim.score(X_train, y_train))
     31     print(estim.best_model())

d:\onedrive\openclassrooms\parcours data scientist\projet7\hyperopt-sklearn\hpsklearn\estimator.py in fit(self, X, y, EX_list, valid_size, n_folds, cv_shuffle, warm_start, random_state, weights)
    781                 increment = min(self.fit_increment,
    782                                 adjusted_max_evals - len(self.trials.trials))
--> 783                 fit_iter.send(increment)
    784                 if filename is not None:
    785                     with open(filename, 'wb') as dump_file:

d:\onedrive\openclassrooms\parcours data scientist\projet7\hyperopt-sklearn\hpsklearn\estimator.py in fit_iter(self, X, y, EX_list, valid_size, n_folds, cv_shuffle, warm_start, random_state, weights, increment)
    691                               #    so we notice them.
    692                               catch_eval_exceptions=False,
--> 693                               return_argmin=False, # -- in case no success so far
    694                              )
    695             else:

~\Anaconda3\envs\DS_projet7\lib\site-packages\hyperopt\fmin.py in fmin(fn, space, algo, max_evals, trials, rstate, allow_trials_fmin, pass_expr_memo_ctrl, catch_eval_exceptions, verbose, return_argmin, points_to_evaluate, max_queue_len, show_progressbar)
    401             catch_eval_exceptions=catch_eval_exceptions,
    402             return_argmin=return_argmin,
--> 403             show_progressbar=show_progressbar,
    404         )
    405 

~\Anaconda3\envs\DS_projet7\lib\site-packages\hyperopt\base.py in fmin(self, fn, space, algo, max_evals, max_queue_len, rstate, verbose, pass_expr_memo_ctrl, catch_eval_exceptions, return_argmin, show_progressbar)
    649             catch_eval_exceptions=catch_eval_exceptions,
    650             return_argmin=return_argmin,
--> 651             show_progressbar=show_progressbar)
    652 
    653 

~\Anaconda3\envs\DS_projet7\lib\site-packages\hyperopt\fmin.py in fmin(fn, space, algo, max_evals, trials, rstate, allow_trials_fmin, pass_expr_memo_ctrl, catch_eval_exceptions, verbose, return_argmin, points_to_evaluate, max_queue_len, show_progressbar)
    420                     show_progressbar=show_progressbar)
    421     rval.catch_eval_exceptions = catch_eval_exceptions
--> 422     rval.exhaust()
    423     if return_argmin:
    424         if len(trials.trials) == 0:

~\Anaconda3\envs\DS_projet7\lib\site-packages\hyperopt\fmin.py in exhaust(self)
    274     def exhaust(self):
    275         n_done = len(self.trials)
--> 276         self.run(self.max_evals - n_done, block_until_done=self.asynchronous)
    277         self.trials.refresh()
    278         return self

~\Anaconda3\envs\DS_projet7\lib\site-packages\hyperopt\fmin.py in run(self, N, block_until_done)
    239                     else:
    240                         # -- loop over trials and do the jobs directly
--> 241                         self.serial_evaluate()
    242 
    243                     try:

~\Anaconda3\envs\DS_projet7\lib\site-packages\hyperopt\fmin.py in serial_evaluate(self, N)
    139                 ctrl = base.Ctrl(self.trials, current_trial=trial)
    140                 try:
--> 141                     result = self.domain.evaluate(spec, ctrl)
    142                 except Exception as e:
    143                     logger.info('job exception: %s' % str(e))

~\Anaconda3\envs\DS_projet7\lib\site-packages\hyperopt\base.py in evaluate(self, config, ctrl, attach_attachments)
    854                 memo=memo,
    855                 print_node_on_error=self.rec_eval_print_node_on_error)
--> 856             rval = self.fn(pyll_rval)
    857 
    858         if isinstance(rval, (float, int, np.number)):

d:\onedrive\openclassrooms\parcours data scientist\projet7\hyperopt-sklearn\hpsklearn\estimator.py in fn_with_timeout(*args, **kwargs)
    639             th = Process(target=partial(fn, best_loss=self._best_loss),
    640                          args=args, kwargs=kwargs)
--> 641             th.start()
    642             if conn1.poll(self.trial_timeout):
    643                 fn_rval = conn1.recv()

~\Anaconda3\envs\DS_projet7\lib\multiprocessing\process.py in start(self)
    110                'daemonic processes are not allowed to have children'
    111         _cleanup()
--> 112         self._popen = self._Popen(self)
    113         self._sentinel = self._popen.sentinel
    114         # Avoid a refcycle if the target function holds an indirect

~\Anaconda3\envs\DS_projet7\lib\multiprocessing\context.py in _Popen(process_obj)
    221     @staticmethod
    222     def _Popen(process_obj):
--> 223         return _default_context.get_context().Process._Popen(process_obj)
    224 
    225 class DefaultContext(BaseContext):

~\Anaconda3\envs\DS_projet7\lib\multiprocessing\context.py in _Popen(process_obj)
    320         def _Popen(process_obj):
    321             from .popen_spawn_win32 import Popen
--> 322             return Popen(process_obj)
    323 
    324     class SpawnContext(BaseContext):

~\Anaconda3\envs\DS_projet7\lib\multiprocessing\popen_spawn_win32.py in __init__(self, process_obj)
     87             try:
     88                 reduction.dump(prep_data, to_child)
---> 89                 reduction.dump(process_obj, to_child)
     90             finally:
     91                 set_spawning_popen(None)

~\Anaconda3\envs\DS_projet7\lib\multiprocessing\reduction.py in dump(obj, file, protocol)
     58 def dump(obj, file, protocol=None):
     59     '''Replacement for pickle.dump() using ForkingPickler.'''
---> 60     ForkingPickler(file, protocol).dump(obj)
     61 
     62 #

AttributeError: Can't pickle local object 'main.<locals>.f1_loss'
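This AttributeError is likely the real failure that the BrokenPipeError was masking: with the main() wrapper, f1_loss became a function local to main, and local functions cannot be pickled by reference. A minimal stdlib sketch of the difference (local_loss and the dummy loss bodies are made up for illustration):

```python
import pickle

def f1_loss(target, pred):
    # defined at module top level: pickle stores just a reference
    # (module name + qualified name), so spawn can ship it to the child
    return 0.0

def main():
    def local_loss(target, pred):
        # defined inside main(): has no importable qualified name
        return 0.0
    try:
        pickle.dumps(local_loss)
        return None                # would mean pickling succeeded
    except Exception as e:
        # e.g. "Can't pickle local object 'main.<locals>.local_loss'"
        return str(e)

if __name__ == '__main__':
    print(main())
```

Keeping the loss function at module top level, outside main(), should avoid this.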

linminhtoo (Contributor) commented Oct 24, 2020

I'm getting a similar error using a custom F2 loss.
EDIT: moving the code from my Jupyter notebook to a Python script and wrapping it in if __name__ == '__main__': solved the issue. Thanks!

import sklearn.metrics

def ftwo_loss_fn(y_true, y_pred):
    return 1 - sklearn.metrics.fbeta_score(y_true, y_pred, beta=2)

The full traceback:

job exception: [Errno 32] Broken pipe

  0%|                                                                            | 0/1 [00:01<?, ?trial/s, best loss=?]
---------------------------------------------------------------------------
BrokenPipeError                           Traceback (most recent call last)
<ipython-input-30-8f54cad0f87f> in <module>
      4                          seed=0)
      5 
----> 6 estim.fit(X_train.values, y_train.values) # , n_folds=5, cv_shuffle=True, random_state=0
      7 
      8 print(estim.score(X_test.values, y_test.values))

C:\anaconda3\lib\site-packages\hpsklearn\estimator.py in fit(self, X, y, EX_list, valid_size, n_folds, cv_shuffle, warm_start, random_state, weights)
    744             increment = min(self.fit_increment,
    745                             adjusted_max_evals - len(self.trials.trials))
--> 746             fit_iter.send(increment)
    747             if filename is not None:
    748                 with open(filename, 'wb') as dump_file:

C:\anaconda3\lib\site-packages\hpsklearn\estimator.py in fit_iter(self, X, y, EX_list, valid_size, n_folds, cv_shuffle, warm_start, random_state, weights, increment)
    655                               #    so we notice them.
    656                               catch_eval_exceptions=False,
--> 657                               return_argmin=False, # -- in case no success so far
    658                              )
    659             else:

C:\anaconda3\lib\site-packages\hyperopt\fmin.py in fmin(fn, space, algo, max_evals, timeout, loss_threshold, trials, rstate, allow_trials_fmin, pass_expr_memo_ctrl, catch_eval_exceptions, verbose, return_argmin, points_to_evaluate, max_queue_len, show_progressbar, early_stop_fn, trials_save_file)
    520             show_progressbar=show_progressbar,
    521             early_stop_fn=early_stop_fn,
--> 522             trials_save_file=trials_save_file,
    523         )
    524 

C:\anaconda3\lib\site-packages\hyperopt\base.py in fmin(self, fn, space, algo, max_evals, timeout, loss_threshold, max_queue_len, rstate, verbose, pass_expr_memo_ctrl, catch_eval_exceptions, return_argmin, show_progressbar, early_stop_fn, trials_save_file)
    697             show_progressbar=show_progressbar,
    698             early_stop_fn=early_stop_fn,
--> 699             trials_save_file=trials_save_file,
    700         )
    701 

C:\anaconda3\lib\site-packages\hyperopt\fmin.py in fmin(fn, space, algo, max_evals, timeout, loss_threshold, trials, rstate, allow_trials_fmin, pass_expr_memo_ctrl, catch_eval_exceptions, verbose, return_argmin, points_to_evaluate, max_queue_len, show_progressbar, early_stop_fn, trials_save_file)
    551 
    552     # next line is where the fmin is actually executed
--> 553     rval.exhaust()
    554 
    555     if return_argmin:

C:\anaconda3\lib\site-packages\hyperopt\fmin.py in exhaust(self)
    354     def exhaust(self):
    355         n_done = len(self.trials)
--> 356         self.run(self.max_evals - n_done, block_until_done=self.asynchronous)
    357         self.trials.refresh()
    358         return self

C:\anaconda3\lib\site-packages\hyperopt\fmin.py in run(self, N, block_until_done)
    290                 else:
    291                     # -- loop over trials and do the jobs directly
--> 292                     self.serial_evaluate()
    293 
    294                 self.trials.refresh()

C:\anaconda3\lib\site-packages\hyperopt\fmin.py in serial_evaluate(self, N)
    168                 ctrl = base.Ctrl(self.trials, current_trial=trial)
    169                 try:
--> 170                     result = self.domain.evaluate(spec, ctrl)
    171                 except Exception as e:
    172                     logger.error("job exception: %s" % str(e))

C:\anaconda3\lib\site-packages\hyperopt\base.py in evaluate(self, config, ctrl, attach_attachments)
    905                 print_node_on_error=self.rec_eval_print_node_on_error,
    906             )
--> 907             rval = self.fn(pyll_rval)
    908 
    909         if isinstance(rval, (float, int, np.number)):

C:\anaconda3\lib\site-packages\hpsklearn\estimator.py in fn_with_timeout(*args, **kwargs)
    603             th = Process(target=partial(fn, best_loss=self._best_loss),
    604                          args=args, kwargs=kwargs)
--> 605             th.start()
    606             if conn1.poll(self.trial_timeout):
    607                 fn_rval = conn1.recv()

C:\anaconda3\lib\multiprocessing\process.py in start(self)
    110                'daemonic processes are not allowed to have children'
    111         _cleanup()
--> 112         self._popen = self._Popen(self)
    113         self._sentinel = self._popen.sentinel
    114         # Avoid a refcycle if the target function holds an indirect

C:\anaconda3\lib\multiprocessing\context.py in _Popen(process_obj)
    221     @staticmethod
    222     def _Popen(process_obj):
--> 223         return _default_context.get_context().Process._Popen(process_obj)
    224 
    225 class DefaultContext(BaseContext):

C:\anaconda3\lib\multiprocessing\context.py in _Popen(process_obj)
    320         def _Popen(process_obj):
    321             from .popen_spawn_win32 import Popen
--> 322             return Popen(process_obj)
    323 
    324     class SpawnContext(BaseContext):

C:\anaconda3\lib\multiprocessing\popen_spawn_win32.py in __init__(self, process_obj)
     87             try:
     88                 reduction.dump(prep_data, to_child)
---> 89                 reduction.dump(process_obj, to_child)
     90             finally:
     91                 set_spawning_popen(None)

C:\anaconda3\lib\multiprocessing\reduction.py in dump(obj, file, protocol)
     58 def dump(obj, file, protocol=None):
     59     '''Replacement for pickle.dump() using ForkingPickler.'''
---> 60     ForkingPickler(file, protocol).dump(obj)
     61 
     62 #

BrokenPipeError: [Errno 32] Broken pipe

3 participants