Elapsed step times for inference stage in Pipeline and FeatureUnion #19293

Closed
142 changes: 97 additions & 45 deletions sklearn/pipeline.py
@@ -73,9 +73,12 @@ class Pipeline(_BaseComposition):
inspect estimators within the pipeline. Caching the
transformers is advantageous when fitting is time consuming.

verbose : bool, default=False
If True, the time elapsed while fitting each step will be printed as it
is completed.
verbose : bool or int, default=False
If set to a truthy value below 10 (e.g. ``True``), the time elapsed
while fitting each step will be printed as it is completed.
If set to 10 or above, the time elapsed while performing each
step (fitting or inference) will be printed as it is completed.

Attributes
----------
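
As a usage sketch of the two verbosity levels documented above (assuming this patch is applied; the commented output is illustrative, not exact):

# Minimal sketch of the new verbose levels; printed output is illustrative.
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = make_classification(n_samples=100, random_state=0)
pipe = Pipeline([('scale', StandardScaler()),
                 ('clf', LogisticRegression())], verbose=True)

# verbose=True (truthy, below 10): timings are printed during fit only.
pipe.fit(X, y)     # [Pipeline] ... (step 1 of 2) Processing scale, total= 0.0s
pipe.predict(X)    # no output

# verbose=10 or above: timings are also printed for each inference step.
pipe.set_params(verbose=10)
pipe.predict(X)    # one timing line per step, including the final predict
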
@@ -237,8 +240,8 @@ def _final_estimator(self):
estimator = self.steps[-1][1]
return 'passthrough' if estimator is None else estimator

def _log_message(self, step_idx):
if not self.verbose:
def _log_message(self, step_idx, is_fitting=True):
if not self.verbose or (self.verbose < 10 and not is_fitting):
return None
name, _ = self.steps[step_idx]
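
The new gating check reads: log fitting steps whenever verbose is truthy, and log inference steps only when verbose is 10 or above. A standalone sketch, with a hypothetical should_log helper mirroring that condition:

# Hypothetical helper mirroring the gating check in _log_message above.
def should_log(verbose, is_fitting):
    return bool(verbose) and not (verbose < 10 and not is_fitting)

assert should_log(True, is_fitting=True)        # fit step: logged at verbose=True
assert not should_log(True, is_fitting=False)   # inference step: silent below 10
assert should_log(10, is_fitting=False)         # inference step: logged at >= 10
assert not should_log(False, is_fitting=True)   # verbose=False: always silent
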

@@ -278,7 +281,7 @@ def _fit(self, X, y=None, **fit_params_steps):
filter_passthrough=False):
if (transformer is None or transformer == 'passthrough'):
with _print_elapsed_time('Pipeline',
self._log_message(step_idx)):
self._log_message(step_idx, True)):
continue

if hasattr(memory, 'location'):
@@ -303,7 +306,7 @@ def _fit(self, X, y=None, **fit_params_steps):
X, fitted_transformer = fit_transform_one_cached(
cloned_transformer, X, y, None,
message_clsname='Pipeline',
message=self._log_message(step_idx),
message=self._log_message(step_idx, True),
**fit_params_steps[name])
# Replace the transformer of the step with the fitted
# transformer. This is necessary when loading the transformer
@@ -340,7 +343,7 @@ def fit(self, X, y=None, **fit_params):
fit_params_steps = self._check_fit_params(**fit_params)
Xt = self._fit(X, y, **fit_params_steps)
with _print_elapsed_time('Pipeline',
self._log_message(len(self.steps) - 1)):
self._log_message(len(self.steps) - 1, True)):
if self._final_estimator != 'passthrough':
fit_params_last_step = fit_params_steps[self.steps[-1][0]]
self._final_estimator.fit(Xt, y, **fit_params_last_step)
@@ -379,7 +382,7 @@ def fit_transform(self, X, y=None, **fit_params):

last_step = self._final_estimator
with _print_elapsed_time('Pipeline',
self._log_message(len(self.steps) - 1)):
self._log_message(len(self.steps) - 1, True)):
if last_step == 'passthrough':
return Xt
fit_params_last_step = fit_params_steps[self.steps[-1][0]]
@@ -414,9 +417,17 @@ def predict(self, X, **predict_params):
y_pred : array-like
"""
Xt = X
for _, name, transform in self._iter(with_final=False):
Xt = transform.transform(Xt)
return self.steps[-1][-1].predict(Xt, **predict_params)
for idx, name, transform in self._iter(with_final=False,
filter_passthrough=False):
with _print_elapsed_time('Pipeline',
self._log_message(idx, False)):
if (transform is None or transform == 'passthrough'):
continue
Xt = transform.transform(Xt)
with _print_elapsed_time(
'Pipeline', self._log_message(len(self.steps) - 1, False)):
y_pred = self.steps[-1][-1].predict(Xt, **predict_params)
return y_pred
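
_print_elapsed_time is scikit-learn's internal context manager (imported from sklearn.utils): it is a no-op when message is None, which is how the gating in _log_message silences it. A simplified stand-in of the pattern, for reference only:

# Simplified stand-in for sklearn.utils._print_elapsed_time (illustrative,
# not the exact implementation): silent when message is None, otherwise
# prints the message and the elapsed time when the block exits.
import time
from contextlib import contextmanager

@contextmanager
def print_elapsed_time(source, message=None):
    if message is None:
        yield
    else:
        start = time.time()
        yield
        print('[%s] %s, total=%5.1fs' % (source, message, time.time() - start))
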

@if_delegate_has_method(delegate='_final_estimator')
def fit_predict(self, X, y=None, **fit_params):
@@ -449,8 +460,8 @@ def fit_predict(self, X, y=None, **fit_params):
Xt = self._fit(X, y, **fit_params_steps)

fit_params_last_step = fit_params_steps[self.steps[-1][0]]
with _print_elapsed_time('Pipeline',
self._log_message(len(self.steps) - 1)):
with _print_elapsed_time(
'Pipeline', self._log_message(len(self.steps) - 1, True)):
y_pred = self.steps[-1][-1].fit_predict(Xt, y,
**fit_params_last_step)
return y_pred
@@ -470,9 +481,14 @@ def predict_proba(self, X):
y_proba : array-like of shape (n_samples, n_classes)
"""
Xt = X
for _, name, transform in self._iter(with_final=False):
Xt = transform.transform(Xt)
return self.steps[-1][-1].predict_proba(Xt)
for idx, name, transform in self._iter(with_final=False):
with _print_elapsed_time(
'Pipeline', self._log_message(idx, False)):
Xt = transform.transform(Xt)
with _print_elapsed_time(
'Pipeline', self._log_message(len(self.steps) - 1, False)):
y_proba = self.steps[-1][-1].predict_proba(Xt)
return y_proba

@if_delegate_has_method(delegate='_final_estimator')
def decision_function(self, X):
@@ -489,9 +505,14 @@ def decision_function(self, X):
y_score : array-like of shape (n_samples, n_classes)
"""
Xt = X
for _, name, transform in self._iter(with_final=False):
Xt = transform.transform(Xt)
return self.steps[-1][-1].decision_function(Xt)
for idx, name, transform in self._iter(with_final=False):
with _print_elapsed_time('Pipeline',
self._log_message(idx, False)):
Xt = transform.transform(Xt)
with _print_elapsed_time(
'Pipeline', self._log_message(len(self.steps) - 1, False)):
y_score = self.steps[-1][-1].decision_function(Xt)
return y_score

@if_delegate_has_method(delegate='_final_estimator')
def score_samples(self, X):
@@ -508,9 +529,14 @@ def score_samples(self, X):
y_score : ndarray of shape (n_samples,)
"""
Xt = X
for _, _, transformer in self._iter(with_final=False):
Xt = transformer.transform(Xt)
return self.steps[-1][-1].score_samples(Xt)
for idx, _, transformer in self._iter(with_final=False):
with _print_elapsed_time('Pipeline',
self._log_message(idx, False)):
Xt = transformer.transform(Xt)
with _print_elapsed_time(
'Pipeline', self._log_message(len(self.steps) - 1, False)):
y_score = self.steps[-1][-1].score_samples(Xt)
return y_score

@if_delegate_has_method(delegate='_final_estimator')
def predict_log_proba(self, X):
@@ -527,9 +553,14 @@ def predict_log_proba(self, X):
y_score : array-like of shape (n_samples, n_classes)
"""
Xt = X
for _, name, transform in self._iter(with_final=False):
Xt = transform.transform(Xt)
return self.steps[-1][-1].predict_log_proba(Xt)
for idx, name, transform in self._iter(with_final=False):
with _print_elapsed_time('Pipeline',
self._log_message(idx, False)):
Xt = transform.transform(Xt)
with _print_elapsed_time(
'Pipeline', self._log_message(len(self.steps) - 1, False)):
y_score = self.steps[-1][-1].predict_log_proba(Xt)
return y_score

@property
def transform(self):
@@ -556,8 +587,12 @@ def transform(self):

def _transform(self, X):
Xt = X
for _, _, transform in self._iter():
Xt = transform.transform(Xt)
for idx, _, transform in self._iter(filter_passthrough=False):
with _print_elapsed_time('Pipeline',
self._log_message(idx, False)):
if (transform is None or transform == 'passthrough'):
continue
Xt = transform.transform(Xt)
return Xt

@property
@@ -614,12 +649,18 @@ def score(self, X, y=None, sample_weight=None):
score : float
"""
Xt = X
for _, name, transform in self._iter(with_final=False):
Xt = transform.transform(Xt)
for idx, name, transform in self._iter(with_final=False):
with _print_elapsed_time('Pipeline',
self._log_message(idx, False)):
Xt = transform.transform(Xt)
score_params = {}
if sample_weight is not None:
score_params['sample_weight'] = sample_weight
return self.steps[-1][-1].score(Xt, y, **score_params)

with _print_elapsed_time(
'Pipeline', self._log_message(len(self.steps) - 1, False)):
score = self.steps[-1][-1].score(Xt, y, **score_params)
return score

@property
def classes_(self):
@@ -705,9 +746,12 @@ def make_pipeline(*steps, memory=None, verbose=False):
inspect estimators within the pipeline. Caching the
transformers is advantageous when fitting is time consuming.

verbose : bool, default=False
If True, the time elapsed while fitting each step will be printed as it
is completed.
verbose : bool or int, default=False
If set to a truthy value below 10 (e.g. ``True``), the time elapsed
while fitting each step will be printed as it is completed.
If set to 10 or above, the time elapsed while performing each
step (fitting or inference) will be printed as it is completed.

See Also
--------
@@ -729,8 +773,15 @@ def make_pipeline(*steps, memory=None, verbose=False):
return Pipeline(_name_estimators(steps), memory=memory, verbose=verbose)


def _transform_one(transformer, X, y, weight, **fit_params):
res = transformer.transform(X)
def _transform_one(transformer,
X,
y,
weight,
message_clsname='',
message=None,
**fit_params):
with _print_elapsed_time(message_clsname, message):
res = transformer.transform(X)
# if we have a weight for this transformer, multiply output
if weight is None:
return res
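
This signature change gives _transform_one the same message_clsname/message keywords as _fit_transform_one, so FeatureUnion._parallel_func below can dispatch either function uniformly. A hypothetical direct call mirroring that dispatch (names assumed, reusing X from the earlier sketch):

# Hypothetical direct call: passing a message prints one timing line;
# message=None (the gated-off case) keeps the transform silent.
from sklearn.pipeline import _transform_one
from sklearn.preprocessing import StandardScaler

res = _transform_one(StandardScaler().fit(X), X, None, None,
                     message_clsname='FeatureUnion',
                     message='(step 1 of 1) Processing scale')
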
@@ -951,7 +1002,7 @@ def fit(self, X, y=None, **fit_params):
self : FeatureUnion
This estimator
"""
transformers = self._parallel_func(X, y, fit_params, _fit_one)
transformers = self._parallel_func(X, y, fit_params, _fit_one, True)
if not transformers:
# All transformers are None
return self
@@ -977,7 +1028,8 @@ def fit_transform(self, X, y=None, **fit_params):
hstack of results of transformers. sum_n_components is the
sum of n_components (output dimension) over transformers.
"""
results = self._parallel_func(X, y, fit_params, _fit_transform_one)
results = self._parallel_func(X, y, fit_params,
_fit_transform_one, True)
if not results:
# All transformers are None
return np.zeros((X.shape[0], 0))
@@ -987,12 +1039,13 @@ def _log_message(self, name, idx, total):

return self._hstack(Xs)

def _log_message(self, name, idx, total):
if not self.verbose:
def _log_message(self, name, idx, total, is_fitting=True):
if not self.verbose or (self.verbose < 10 and not is_fitting):
return None

return '(step %d of %d) Processing %s' % (idx, total, name)

def _parallel_func(self, X, y, fit_params, func):
def _parallel_func(self, X, y, fit_params, func, is_fitting):
"""Runs func in parallel on X and y"""
self.transformer_list = list(self.transformer_list)
self._validate_transformers()
@@ -1002,7 +1055,8 @@ def _parallel_func(self, X, y, fit_params, func):
return Parallel(n_jobs=self.n_jobs)(delayed(func)(
transformer, X, y, weight,
message_clsname='FeatureUnion',
message=self._log_message(name, idx, len(transformers)),
message=self._log_message(name, idx,
len(transformers), is_fitting),
**fit_params) for idx, (name, transformer,
weight) in enumerate(transformers, 1))

@@ -1021,9 +1075,7 @@ def transform(self, X):
hstack of results of transformers. sum_n_components is the
sum of n_components (output dimension) over transformers.
"""
Xs = Parallel(n_jobs=self.n_jobs)(
delayed(_transform_one)(trans, X, None, weight)
for name, trans, weight in self._iter())
Xs = self._parallel_func(X, None, {}, _transform_one, False)
if not Xs:
# All transformers are None
return np.zeros((X.shape[0], 0))
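
With transform now routed through _parallel_func, FeatureUnion gains the same inference-stage timing as Pipeline. A sketch (reusing X, y from the earlier example; the commented output is illustrative):

# FeatureUnion timing now also appears at transform time when verbose >= 10.
from sklearn.decomposition import PCA
from sklearn.pipeline import FeatureUnion
from sklearn.preprocessing import StandardScaler

union = FeatureUnion([('pca', PCA(n_components=2)),
                      ('scale', StandardScaler())], verbose=10)
union.fit(X, y)      # [FeatureUnion] ... (step 1 of 2) Processing pca, total= 0.0s
union.transform(X)   # now also prints one timing line per transformer
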
21 changes: 18 additions & 3 deletions sklearn/tests/test_pipeline.py
@@ -1157,7 +1157,7 @@ def test_pipeline_param_error():
r'\[FeatureUnion\].*\(step 2 of 2\) Processing mult2.* total=.*\n$'),
(FeatureUnion([('mult1', 'drop'), ('mult2', Mult()), ('mult3', 'drop')]),
r'\[FeatureUnion\].*\(step 1 of 1\) Processing mult2.* total=.*\n$')
], ['fit', 'fit_transform', 'fit_predict'])
], ['fit', 'fit_transform', 'fit_predict', 'transform', 'predict'])
if hasattr(est, method) and not (
method == 'fit_transform' and hasattr(est, 'steps') and
isinstance(est.steps[-1][1], FitParamT))
@@ -1172,11 +1172,26 @@ def test_verbose(est, method, pattern, capsys):
y = [[7], [8]]

est.set_params(verbose=False)
func(X, y)
if method in ['transform', 'predict']:
est.fit(X, y)
func(X)
else:
func(X, y)
assert not capsys.readouterr().out, 'Got output for verbose=False'

est.set_params(verbose=True)
func(X, y)
if method in ['transform', 'predict']:
func(X)
assert not capsys.readouterr().out, 'Got output for verbose=True'
else:
func(X, y)
assert re.match(pattern, capsys.readouterr().out)

est.set_params(verbose=10)
if method in ['transform', 'predict']:
func(X)
else:
func(X, y)
assert re.match(pattern, capsys.readouterr().out)

