FEAT allow metadata to be transformed in a Pipeline #28901

Open · wants to merge 13 commits into `main`
7 changes: 7 additions & 0 deletions doc/whats_new/v1.6.rst
@@ -92,3 +92,10 @@ Thanks to everyone who has contributed to the maintenance and improvement of
the project since version 1.5, including:

TODO: update at the time of the release.

:mod:`pipeline`
---------------

- |Feature| :class:`pipeline.Pipeline` can now transform metadata up to the step
  requiring it; the metadata to be transformed is specified via the
  `transform_input` parameter.
  :pr:`28901` by `Adrin Jalali`_.
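
To illustrate the feature, here is a minimal sketch (not part of this diff; `ValidatingClassifier` and the `X_val`/`y_val` metadata names are hypothetical stand-ins for an estimator that consumes a validation set in `fit`):

import numpy as np
import sklearn
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

sklearn.set_config(enable_metadata_routing=True)

class ValidatingClassifier(ClassifierMixin, BaseEstimator):
    """Toy estimator that consumes a validation set in ``fit``."""

    def fit(self, X, y, X_val=None, y_val=None):
        # X_val arrives here already transformed by all preceding steps,
        # just like X, because it is listed in `transform_input` below.
        self.classes_ = np.unique(y)
        return self

    def predict(self, X):
        return np.full(len(X), self.classes_[0])

pipe = Pipeline(
    [
        ("scaler", StandardScaler()),
        ("clf", ValidatingClassifier().set_fit_request(X_val=True, y_val=True)),
    ],
    transform_input=["X_val"],  # scale X_val with the fitted scaler first
)

X, y = np.random.rand(20, 3), np.random.randint(0, 2, size=20)
X_val, y_val = np.random.rand(5, 3), np.random.randint(0, 2, size=5)
pipe.fit(X, y, X_val=X_val, y_val=y_val)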
8 changes: 6 additions & 2 deletions sklearn/compose/tests/test_column_transformer.py
@@ -2631,15 +2631,19 @@ def test_metadata_routing_for_column_transformer(method):
)

if method == "transform":
- trs.fit(X, y)
+ trs.fit(X, y, sample_weight=sample_weight, metadata=metadata)
trs.transform(X, sample_weight=sample_weight, metadata=metadata)
else:
getattr(trs, method)(X, y, sample_weight=sample_weight, metadata=metadata)

assert len(registry)
for _trs in registry:
check_recorded_metadata(
- obj=_trs, method=method, sample_weight=sample_weight, metadata=metadata
+ obj=_trs,
+ method=method,
+ parent=method,
+ sample_weight=sample_weight,
+ metadata=metadata,
)


12 changes: 10 additions & 2 deletions sklearn/ensemble/tests/test_stacking.py
@@ -973,13 +973,21 @@ def test_metadata_routing_for_stacking_estimators(Estimator, Child, prop, prop_v
assert len(registry)
for sub_est in registry:
check_recorded_metadata(
- obj=sub_est, method="fit", split_params=(prop), **{prop: prop_value}
+ obj=sub_est,
+ method="fit",
+ parent="fit",
+ split_params=(prop),
+ **{prop: prop_value},
)
# access final_estimator:
registry = est.final_estimator_.registry
assert len(registry)
check_recorded_metadata(
- obj=registry[-1], method="predict", split_params=(prop), **{prop: prop_value}
+ obj=registry[-1],
+ method="predict",
+ parent="predict",
+ split_params=(prop),
+ **{prop: prop_value},
)


2 changes: 1 addition & 1 deletion sklearn/ensemble/tests/test_voting.py
@@ -759,7 +759,7 @@ def test_metadata_routing_for_voting_estimators(Estimator, Child, prop):
registry = estimator[1].registry
assert len(registry)
for sub_est in registry:
- check_recorded_metadata(obj=sub_est, method="fit", **kwargs)
+ check_recorded_metadata(obj=sub_est, method="fit", parent="fit", **kwargs)


@pytest.mark.usefixtures("enable_slep006")
1 change: 1 addition & 0 deletions sklearn/model_selection/tests/test_search.py
@@ -2612,6 +2612,7 @@ def test_multi_metric_search_forwards_metadata(SearchCV, param_search):
check_recorded_metadata(
obj=_scorer,
method="score",
parent="_score",
split_params=("sample_weight", "metadata"),
sample_weight=score_weights,
metadata=score_metadata,
4 changes: 4 additions & 0 deletions sklearn/model_selection/tests/test_validation.py
@@ -2600,6 +2600,7 @@ def test_validation_functions_routing(func):
check_recorded_metadata(
obj=_scorer,
method="score",
parent=func.__name__,
split_params=("sample_weight", "metadata"),
sample_weight=score_weights,
metadata=score_metadata,
@@ -2610,6 +2611,7 @@ def test_validation_functions_routing(func):
check_recorded_metadata(
obj=_splitter,
method="split",
parent=func.__name__,
groups=split_groups,
metadata=split_metadata,
)
@@ -2619,6 +2621,7 @@ def test_validation_functions_routing(func):
check_recorded_metadata(
obj=_estimator,
method="fit",
parent=func.__name__,
split_params=("sample_weight", "metadata"),
sample_weight=fit_sample_weight,
metadata=fit_metadata,
@@ -2656,6 +2659,7 @@ def test_learning_curve_exploit_incremental_learning_routing():
check_recorded_metadata(
obj=_estimator,
method="partial_fit",
parent="learning_curve",
split_params=("sample_weight", "metadata"),
sample_weight=fit_sample_weight,
metadata=fit_metadata,
126 changes: 117 additions & 9 deletions sklearn/pipeline.py
@@ -32,6 +32,7 @@
MethodMapping,
_raise_for_params,
_routing_enabled,
get_routing_for_object,
process_routing,
)
from .utils.metaestimators import _BaseComposition, available_if
@@ -93,6 +94,17 @@ class Pipeline(_BaseComposition):
must define `fit`. All non-last steps must also define `transform`. See
:ref:`Combining Estimators <combining_estimators>` for more details.

transform_input : list of str, default=None
Names of the input arguments to ``fit`` (other than ``X``) that should be
transformed by the steps of the pipeline up to the step which requires
them. Whether a step requires a piece of metadata is defined via
:ref:`metadata routing <metadata_routing>`. This can be used, for
instance, to pass a validation set through the pipeline.

See the example TBD for more details.

You can only set this if metadata routing is enabled, which you
can enable using ``sklearn.set_config(enable_metadata_routing=True)``.

memory : str or object with the joblib.Memory interface, default=None
Used to cache the fitted transformers of the pipeline. The last step
will never be cached, even if it is a transformer. By default, no
@@ -160,12 +172,14 @@ class Pipeline(_BaseComposition):

_parameter_constraints: dict = {
"steps": [list, Hidden(tuple)],
"transform_input": [list, None],
"memory": [None, str, HasMethods(["cache"])],
"verbose": ["boolean"],
}

- def __init__(self, steps, *, memory=None, verbose=False):
+ def __init__(self, steps, *, transform_input=None, memory=None, verbose=False):
self.steps = steps
self.transform_input = transform_input
self.memory = memory
self.verbose = verbose

Expand Down Expand Up @@ -378,9 +392,66 @@ def _check_method_params(self, method, props, **kwargs):
fit_params_steps[step]["fit_predict"][param] = pval
return fit_params_steps

def _get_step_params(self, *, step_idx, step_params, all_params):
"""Get params (metadata) for step `name`.

This transforms the metadata up to this step if required, which is
indicated by the `transform_input` parameter.

If a param in `step_params` is included in the `transform_input` list, it
will be transformed.

`all_params` are the metadata passed by the user. Used to call `transform`
on the pipeline itself.
"""
if (
self.transform_input is None
or not all_params
or not step_params
or step_idx == 0
):
# we only need to process step_params if transform_input is set
# and metadata is given by the user.
return step_params

sub_pipeline = self[:step_idx]
sub_metadata_routing = get_routing_for_object(sub_pipeline)
# here we get the metadata required by sub_pipeline.transform
transform_params = {
key: value
for key, value in all_params.items()
if key
in sub_metadata_routing.consumes(
method="transform", params=all_params.keys()
)
}
transformed_params = dict()
transformed_cache = dict() # used to transform each param once
for method, method_params in step_params.items():
transformed_params[method] = Bunch()
for param_name, param_value in method_params.items():
if param_name in self.transform_input:
# transform the parameter
if param_name not in transformed_cache:
transformed_cache[param_name] = sub_pipeline.transform(
param_value, **transform_params
)
transformed_params[method][param_name] = transformed_cache[
param_name
]
else:
transformed_params[method][param_name] = param_value
return transformed_params
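
# NOTE (illustration, not part of the diff): conceptually, for a pipeline
# `pipe` with `transform_input=["X_val"]`, the metadata reaching step `i` is
#
#     sub_pipeline = pipe[:i]                  # the earlier, fitted steps
#     X_val_for_step_i = sub_pipeline.transform(X_val)
#
# and `transformed_cache` above ensures each listed param is transformed
# only once per call, even if several methods consume it.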

# Estimator interface

- def _fit(self, X, y=None, routed_params=None):
+ def _fit(self, X, y=None, routed_params=None, raw_params=None):
"""Fit the pipeline except the last step.

`routed_params` is the output of `process_routing`.
`raw_params` are the parameters passed by the user, used when
`transform_input` is set, to transform metadata with a sub-pipeline.
"""
# shallow copy of steps - this should really be steps_
self.steps = list(self.steps)
self._validate_steps()
@@ -403,14 +474,20 @@ def _fit(self, X, y=None, routed_params=None):
else:
cloned_transformer = clone(transformer)
# Fit or load from cache the current transformer
step_params = self._get_step_params(
step_idx=step_idx,
step_params=routed_params[name],
all_params=raw_params,
)

X, fitted_transformer = fit_transform_one_cached(
cloned_transformer,
X,
y,
- None,
+ weight=None,
message_clsname="Pipeline",
message=self._log_message(step_idx),
- params=routed_params[name],
+ params=step_params,
)
# Replace the transformer of the step with the fitted
# transformer. This is necessary when loading the transformer
@@ -465,11 +542,22 @@ def fit(self, X, y=None, **params):
self : object
Pipeline with fitted steps.
"""
if not _routing_enabled() and self.transform_input is not None:
raise ValueError(
"The `transform_input` parameter can only be set if metadata "
"routing is enabled. You can enable metadata routing using "
"`sklearn.set_config(enable_metadata_routing=True)`."
)

routed_params = self._check_method_params(method="fit", props=params)
- Xt = self._fit(X, y, routed_params)
+ Xt = self._fit(X, y, routed_params, raw_params=params)
with _print_elapsed_time("Pipeline", self._log_message(len(self.steps) - 1)):
if self._final_estimator != "passthrough":
- last_step_params = routed_params[self.steps[-1][0]]
+ last_step_params = self._get_step_params(
+     step_idx=len(self) - 1,
+     step_params=routed_params[self.steps[-1][0]],
+     all_params=params,
+ )
self._final_estimator.fit(Xt, y, **last_step_params["fit"])

return self
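
A quick illustration of the guard above (a sketch, not part of the diff; assumes the `make_pipeline` change further down this page):

import numpy as np
import sklearn
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

sklearn.set_config(enable_metadata_routing=False)
pipe = make_pipeline(
    StandardScaler(), LogisticRegression(), transform_input=["X_val"]
)
X, y = np.random.rand(10, 2), np.random.randint(0, 2, size=10)
pipe.fit(X, y)  # raises ValueError: requires metadata routing to be enabled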
@@ -536,7 +624,11 @@ def fit_transform(self, X, y=None, **params):
with _print_elapsed_time("Pipeline", self._log_message(len(self.steps) - 1)):
if last_step == "passthrough":
return Xt
- last_step_params = routed_params[self.steps[-1][0]]
+ last_step_params = self._get_step_params(
+     step_idx=len(self) - 1,
+     step_params=routed_params[self.steps[-1][0]],
+     all_params=params,
+ )
if hasattr(last_step, "fit_transform"):
return last_step.fit_transform(
Xt, y, **last_step_params["fit_transform"]
@@ -1217,7 +1309,7 @@ def _name_estimators(estimators):
return list(zip(names, estimators))


- def make_pipeline(*steps, memory=None, verbose=False):
+ def make_pipeline(*steps, memory=None, transform_input=None, verbose=False):
"""Construct a :class:`Pipeline` from the given estimators.

This is a shorthand for the :class:`Pipeline` constructor; it does not
@@ -1239,6 +1331,17 @@ def make_pipeline(*steps, memory=None, verbose=False):
or ``steps`` to inspect estimators within the pipeline. Caching the
transformers is advantageous when fitting is time consuming.

transform_input : list of str, default=None
Names of the input arguments to ``fit`` (other than ``X``) that should be
transformed by the steps of the pipeline up to the step which requires
them. Whether a step requires a piece of metadata is defined via
:ref:`metadata routing <metadata_routing>`. This can be used, for
instance, to pass a validation set through the pipeline.

See the example TBD for more details.

You can only set this if metadata routing is enabled, which you
can enable using ``sklearn.set_config(enable_metadata_routing=True)``.

verbose : bool, default=False
If True, the time elapsed while fitting each step will be printed as it
is completed.
@@ -1262,7 +1365,12 @@ def make_pipeline(*steps, memory=None, verbose=False):
Pipeline(steps=[('standardscaler', StandardScaler()),
('gaussiannb', GaussianNB())])
"""
- return Pipeline(_name_estimators(steps), memory=memory, verbose=verbose)
+ return Pipeline(
+     _name_estimators(steps),
+     transform_input=transform_input,
+     memory=memory,
+     verbose=verbose,
+ )
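
Used together with the new parameter, this could look like (illustrative; `ValidatingClassifier` is the hypothetical estimator from the sketch near the top of this page):

pipe = make_pipeline(
    StandardScaler(),
    ValidatingClassifier().set_fit_request(X_val=True, y_val=True),
    transform_input=["X_val"],
)
pipe.fit(X, y, X_val=X_val, y_val=y_val)  # X_val is scaled before the final step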


def _transform_one(transformer, X, y, weight, columns=None, params=None):