-
-
Notifications
You must be signed in to change notification settings - Fork 43
Conversation
squash
- there are some circular imports with dask_ml.model_selection.train_test_split - This implementaiton can be simplified if dask_searchcv supports customizing the methods.train function. This will enable me to use a GridSearchCV inside SuccessiveHalving. This will allow support for all the other keywords arguments in DaskBaseSearchCV
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exciting to see this come together!
I did a first pass through. Will have more thoughts later.
Do you have docs / a notebook started that uses this?
dask_searchcv/adaptive.py
Outdated
def _train(model, X, y, X_val, y_val, max_iter=1, dry_run=False, classes=None, | ||
s=1, i=1, k='_', verbose=False, **fit_kwargs): | ||
""" | ||
Train function. Returns validation loss. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of our projects will use (variations on) the numpydoc format: http://numpydoc.readthedocs.io/en/latest/format.html. So this would be formatted as
Helper to train the model on the provided data.
Parameters
-----------
model : Estimator
X : array-like, [n_samples, n_features]
y : array-like, [n_samples,]
...
Returns
--------
validation_score : float
The model's score on the validation set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll need to be careful with loss
vs. score
. In scikit-learn's terminology. For scores, higher values are better than lower values. For losses, lower values are better. http://scikit-learn.org/stable/modules/model_evaluation.html
dask_searchcv/adaptive.py
Outdated
if verbose: | ||
msg = ("Training model {k} in bracket s={s} iteration {i}. " | ||
"This model is {percent:.1f}% trained for this bracket") | ||
print(msg.format(iter=iter, k=k, s=s, i=i, percent=iter * 100.0 / max_iter)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's try to use the logging
module, as it's more useful in a distributed setting (print
statements may be hard / difficult to access if they're on a different machine).
At the top of the module, you can use logger = logging.getLogger(__name__)
to get or create a new logger with the name dask_search_cv.adaptive
. Then when you actually want to see the log, you can do an annoying
logger = logging.getLogger('dask_searchcv.adaptive')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
logger.addHandler(handler)
There's probably a better way to integrate with distributed
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm running into an issue, it's not logging on this line in particular (though it's logging on other lines). I'll try to formulate a bug report.
dask_searchcv/adaptive.py
Outdated
msg = ("Training model {k} in bracket s={s} iteration {i}. " | ||
"This model is {percent:.1f}% trained for this bracket") | ||
print(msg.format(iter=iter, k=k, s=s, i=i, percent=iter * 100.0 / max_iter)) | ||
if not dry_run: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to self: what is dry_run for? Seems to be just for testing. Let's see if we can refactor the implementation / tests to make dry_run unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dry_run
is for Hyperband.info
, which runs Hyperband without calling model.fit
. info
is a useful metric to see how many calls to partial_fit
will be made.
dask_searchcv/adaptive.py
Outdated
print(msg.format(iter=iter, k=k, s=s, i=i, percent=iter * 100.0 / max_iter)) | ||
if not dry_run: | ||
_ = model.partial_fit(X, y, classes, **fit_kwargs) | ||
fit_time = default_timer() - start_time |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would be a good thing to log: dask/dask-ml#193
dask_searchcv/adaptive.py
Outdated
start_time = default_timer() | ||
score = model.score(X_val, y_val) if not dry_run else np.random.rand() | ||
score_time = default_timer() - start_time | ||
return score, {'score_time': score_time, 'fit_time': fit_time} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update the docs to reflect this return value.
dask_searchcv/adaptive.py
Outdated
def _successive_halving(params, model, n=None, r=None, s=None, verbose=False, | ||
shared=None, eta=3, _prefix='', dry_run=False, **fit_kwargs): | ||
client = _get_client() | ||
data = client.get_dataset('_dask_data') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably best to pass the data / a future for the data to the function, rather than publishing / unpublishing.
dask_searchcv/adaptive.py
Outdated
How aggressive to be while search. The default is 3, and theory | ||
suggests that ``eta=e=2.718...``. Changing this is not recommended. | ||
|
||
run_in_parallel : bool, optional |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe call this n_jobs
and have the default be -1
. Then run_in_parallel = n_jobs == -1
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm seeing some support for n_jobs
in the distributed Client, but I'll leave that for a future PR. Right now n_jobs
has to be either 0 or -1.
dask_searchcv/adaptive.py
Outdated
optimal config from ``params`` given this computational effort | ||
``max_iter``. | ||
|
||
.. _Hyperband model selection algorithm: https://arxiv.org/abs/1603.06560 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there's an official "references" section / format. See https://github.com/dask/dask-ml/blob/7db9ab799d7a3ee3e8a87c48603768b0bc79256f/dask_ml/cluster/k_means.py#L87-L92
dask_searchcv/adaptive.py
Outdated
|
||
def fit(self, X, y, dry_run=False, verbose=False, **fit_kwargs): | ||
""" | ||
This function implicitly assumes that higher scores are better. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These docs would be better to have in the class docstring, since they're describing the parameters like model
, which were passed above.
Also, you can safely assume a few things, since they're part of the scikit-learn API
- higher scores are better
- the signature of
model.set_params
sklearn.base.clone
will work.
It's good to call out partial_fit
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added some words on why I included these lines in the docs. I see this as being useful for other classes that do not conform to sklearn.BaseEstimator
classes.
I've raised a ValueError if the estimator doesn't have partial_fit
, included a helpful error message and tested for it.
dask_searchcv/adaptive.py
Outdated
client = _get_client() | ||
|
||
variables = {'val_score': -np.inf, 'model': None, 'config': None, | ||
'classes': np.unique(y).tolist(), 'eta': self.eta, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
regarding classes, Hyperband will work with regression as well, right? In that case, we don't want to unique
.
Since unique is maybe expensive (even for categorical y
), we'll want to require the user provide it. With e.g. sklearn.linear_model.SGDClassifier
, you're required to pass classes
to the first partial_fit
call.
We have two options for where the user should provide the required fit kwargs
- As a
fit_kwargs
to argument toHyperband
itself - As keyword arguments in
.fit
that are passed through
I'll do some research on which is preferable. Perhaps @ogrisel has thoughts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've stripped unique
out of the implementation. Right now, I have Hyperband.fit(X, y, **fit_kwargs)
. This is what sklearn GridSearchCV.fit does, and passing fit_kwargs
to __init__
is deprecated.
Thanks for the review, @TomAugspurger. It was useful. I've updated most of the comments, but am still debugging. Here's a gist of my example notebook: https://gist.github.com/stsievert/38780f0c746b2f1c8eb32feea4dd14de, with a simple test function. Right now I'm facing some memory issues, and issues with |
👍 I'll take a look tonight or tomorrow. I've lost my day to debugging strange distributed errors that I think are my fault :) FYI, LMK if you need help getting a functional version of scikit-learn master + joblib master. It takes a bit of work. Or I can scale up the cluster again. |
Thanks @TomAugspurger! And no rush, I'm taking tomorrow and Thursday off, and have other things on my plate. |
This is ready for review no @TomAugspurger Some notes:
*: tested with |
Will take a quick pass now.
Did you write the function that's being submitted? If so, can you write the logging inside the function, so that it logs when running? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still going through it. Trying things out locally.
dask_searchcv/tests/test_adaptive.py
Outdated
from sklearn.linear_model import Lasso | ||
|
||
|
||
class TestFunction: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assuming this is just a helper, the test name shouldn't start with Test
as then it gets discovered by test runners (they typically pick up tests named Test*
and functions named test_*
.
dask_searchcv/tests/test_adaptive.py
Outdated
return self._fn() | ||
|
||
|
||
def test_hyperband_sklearn(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
http://distributed.readthedocs.io/en/latest/develop.html#writing-tests may be helpful here.
Async / distributed tests are tricky... I've found the second version, with cluster()
, to be the easiest to work with.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would remove your _get_client()
call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still running into testing issues, even after trying with cluster()
and gen_cluster(client=True)
. I run into issues even mirroring distributed/tests/test_client.py#L4717. I think I've resolved all the systems level issues, and will check next week.
Any tips? I've pushed some intermingled gen_cluster
and with cluster()
code in 22c86e4.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you try this out locally to? I've also had to resolve some systems level issues.
dask_searchcv/tests/test_adaptive.py
Outdated
def test_hyperband_sklearn(): | ||
client = _get_client() | ||
X, y = make_classification(n_samples=1000, chunks=500) | ||
classes = da.unique(y) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will always be [0, 1]
. It's controlled by the n_classes
argument to make_classification
, and currently only 2
is supported.
dask_searchcv/tests/test_adaptive.py
Outdated
client = _get_client() | ||
X, y = make_classification(n_samples=1000, chunks=500) | ||
classes = da.unique(y) | ||
model = PartialSGDClassifier(warm_start=True, classes=classes, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PartialSGDClassifier is being deprecated in favor of Incremental(SGDClassifier())
.
Hopefully this will work for you (if not, let me know). It'll require passing through fit_kwargs
like
alg.fit(X, y, classes=[0, 1]
And you'll need to ensure that classes
gets through to Incremental.fit
, which passes it through to SGDClassifier.partial_fit
.
dask_searchcv/tests/test_adaptive.py
Outdated
alg.fit(X, y, dry_run=True) | ||
assert len(alg.history) == 40 | ||
|
||
alg.fit(X, y, classes=classes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fit call calls compute
internally, which blocks. Ideally the following would work:
delayed_object = alg.fit(X, y, classes=classes, compute=False)
future = c.compute(delayed_object)
result = yield future
dask_searchcv/tests/test_adaptive.py
Outdated
|
||
# @_with_client | ||
@gen_cluster(client=True) | ||
def test_hyperband_sklearn(*args, **kwargs): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doepending on the options to gen_cluster
, the test is called with the following
- client
- scheduler
- list of workers,
so the typical function signature would be
@gen_cluster(client=True)
def test_hyperband_sklearn(c, s, a, b):
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That said, (I think) gen_cluster
tests require essentially everything to be async. After import sklearn.linear_model.SGDClassifier
, this fails with
../distributed/distributed/utils_test.py:808:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../Envs/dask-dev/lib/python3.6/site-packages/tornado/ioloop.py:582: in run_sync
return future_cell[0].result()
../../Envs/dask-dev/lib/python3.6/site-packages/tornado/gen.py:1113: in run
yielded = self.gen.send(value)
../distributed/distributed/utils_test.py:789: in coro
result = yield future
../../Envs/dask-dev/lib/python3.6/site-packages/tornado/gen.py:1099: in run
value = future.result()
../../Envs/dask-dev/lib/python3.6/site-packages/tornado/gen.py:296: in wrapper
result = func(*args, **kwargs)
/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/types.py:248: in wrapped
coro = func(*args, **kwargs)
dask_searchcv/tests/test_adaptive.py:50: in test_hyperband_sklearn
X, y = make_classification(n_samples=1000, chunks=500)
../dask-ml/dask_ml/datasets.py:332: in make_classification
informative_idx, beta = dask.compute(informative_idx, beta)
../dask/dask/base.py:401: in compute
return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.0 = <zip object at 0x116259a08>
> return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
E AssertionError: yield from wasn't used with future
The problem is the line informative_idx, beta = dask.compute(informative_idx, beta)
coming from dask_ml.datasets
.
So I'd recommend trying a with cluster
style test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So for this diff
diff --git a/dask_searchcv/tests/test_adaptive.py b/dask_searchcv/tests/test_adaptive.py
index 199c5a1..2fbe5e8 100644
--- a/dask_searchcv/tests/test_adaptive.py
+++ b/dask_searchcv/tests/test_adaptive.py
@@ -1,18 +1,15 @@
from dask_searchcv.adaptive import _top_k, Hyperband
-from dask_ml.linear_model import PartialSGDClassifier
+from sklearn.linear_model import SGDClassifier
from dask_ml.datasets import make_classification
import numpy as np
import dask.array as da
from dask.distributed import Client
import pandas as pd
import pytest
-import distributed
from pprint import pprint
import scipy.stats as stats
-import random
from sklearn.linear_model import Lasso
from distributed.utils_test import cluster, loop, gen_cluster
-import time
from dask_ml.wrappers import Incremental
@@ -45,23 +42,24 @@ def _with_client(fn):
# @_with_client
-@gen_cluster(client=True)
-def test_hyperband_sklearn(*args, **kwargs):
- X, y = make_classification(n_samples=1000, chunks=500)
- classes = da.unique(y)
- model = Incremental(SGDClassifier(warm_start=True, classes=classes,
- loss='hinge', penalty='elasticnet'))
-
- params = {'alpha': np.logspace(-3, 0, num=int(10e3)),
- 'l1_ratio': np.linspace(0, 1, num=int(10e3))}
- alg = Hyperband(model, params, max_iter=9, n_jobs=0)
-
- alg.fit(X, y, dry_run=True)
- assert len(alg.history) == 20
- alg.fit(X, y, dry_run=True)
- assert len(alg.history) == 40
-
- alg.fit(X, y, classes=classes)
+def test_hyperband_sklearn():
+ with cluster() as (s, (a, b)):
+ client = Client(s['address']) # noqa
+ X, y = make_classification(n_samples=1000, chunks=500)
+ classes = da.unique(y)
+ model = Incremental(SGDClassifier(warm_start=True,
+ loss='hinge', penalty='elasticnet'))
+
+ params = {'alpha': np.logspace(-3, 0, num=int(10e3)),
+ 'l1_ratio': np.linspace(0, 1, num=int(10e3))}
+ alg = Hyperband(model, params, max_iter=9, n_jobs=0)
+
+ alg.fit(X, y, dry_run=True, classes=[0, 1])
+ assert len(alg.history) == 20
+ alg.fit(X, y, dry_run=True, classes=[0, 1])
+ assert len(alg.history) == 40
+
+ alg.fit(X, y, classes=classes)
# @_with_client
That's running, but taking a while. Not sure if it's a long test, or if it's deadlocked somewhere
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I'm resolving some of these issues now.
It's an interesting error. I don't know yet what's going on.
…On Mon, Jun 18, 2018 at 7:41 PM, Scott Sievert ***@***.***> wrote:
***@***.**** commented on this pull request.
------------------------------
In dask_searchcv/adaptive.py
<#72 (comment)>:
> + iters = 0
+ times = []
+ for i in range(s + 1):
+ n_i = math.floor(n * eta**-i)
+ r_i = r * eta**i
+ iters += r_i
+ msg = ('Training %d models for %d iterations during iteration %d '
+ 'of bracket=%d')
+ logger.info(msg, n_i, r_i, i, s)
+
+ results = {k: dask.delayed(_train)(model, data, max_iter=r_i,
+ s=s, i=i, k=k, dry_run=dry_run,
+ scorer=scorer, **fit_kwargs)
+ for k, model in models.items()}
+
+ results = {k: v.compute() for k, v in results.items()}
This error happens inconsistently too. I've run Hyperband.fit 4 times
now, and it's passed once (with no changes to source or notebook in
between).
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#72 (comment)>, or mute
the thread
<https://github.com/notifications/unsubscribe-auth/AASszKw3ZtixhUIzfuI2JNAUwQfSr8Lfks5t-DrEgaJpZM4UYPL1>
.
|
Should I create a minimal working example? |
What's the error? I'm debugging an Incremental / distributed issue, and don't want to duplicate efforts. |
dask_searchcv/adaptive.py
Outdated
# results = dask.compute(delayed_results)[0] | ||
|
||
val_scores = {k: r[0] for k, r in results.items()} | ||
times += [{'id': k, **r[1]} for k, r in results.items()] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this is invalid syntax.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup. I picked it up at the same time, and it's fixed in the most recent commit.
There's a bigger issue for the failing tests – dask-searchcv doesn't depend on dask-ml. I think this issue should be resolved soon, for some other reasons.
The discussion around the Incremental+distributed issue is at #72 (comment). I only provide a traceback and haven't spent time debugging or crafting a minimal working example. |
Currently it looks like we're calling compute within To start off I think it might simplify things if we not special case any code for |
My apologies for not noticing this sooner. I only realized that this was happening while running the example notebook while trying to find out what has happening with the confusing error that @stsievert was getting. |
I also think that flattening things down so that we only call compute once in between epochs, rather than calling compute within compute, will resolve any such confusing errors. |
@stsievert let me know if the above makes sense or not. I'd be happy to jump on a call and go through things if that would be helpful. |
Thanks for noticing this @mrocklin. I think the administrative work you're mentioning is the part of successive halving that's adaptive. The models are sorted in each iteration to Here's a simplified version of what's happening: import numpy as np
import dask
def _successive_halving():
v1 = dask.delayed(np.random.rand)() # model 1's score
v2 = dask.delayed(np.random.rand)() # model 2's score
if v1.compute() > v2.compute():
return 1
return 0
if __name__ == "__main__":
# what we're doing now
delayed_results = [dask.delayed(_successive_halving() for _ in range(5)]
results = [r.compute() for f in delayed_results]
# what I think we should do (well, with joblib)
# results = [_successive_halving() for _ in range(5)] This is what what's happening now. I think we should uncomment the last lines and use joblib for the outer-loop. How does that sound? |
There are a variety of reasons why it's nicer to create a single large graph if possible, I'm happy to expand on these if desired. Calling Dask from within Dask is more complex for a variety of reasons, and is best avoided if possible, especially within library code, which has a higher standard of simplicity. Technology options in Dask are ordered something like the following:
Ideally most things we write will be towards the simpler end of this list if possible. Currently we're at item four, but would like to be at 1 or maybe 2. Generally the decision between 1 and 2 is "Do I absolutely need to control the computation between calls to |
Anyway, I suggest that we check in at some point today |
Yeah, we should check in. I'm free all day. Let me know when works. Thanks for the nice summary of the options to use with Dask. I suggested using joblib not to depend on dask.distributed, but when you put it that way I'm inclined towards option 2. This is less complicated than joblib + dask.delayed, plus it will still use all the workers. I think we should also discuss where to merge this in. I have some dependence on dask-ml's |
I'd be happy to join in if you end up doing it today.
…On Wed, Jun 20, 2018 at 12:59 PM, Scott Sievert ***@***.***> wrote:
Yeah, we should check in. I'm free all day. Let me know when works.
Thanks for the nice summary of the options to use with Dask. I suggested
using joblib not to depend on dask.distributed, but when you put it that
way I'm inclined towards option 2.
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#72 (comment)>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/ABQHIuXuPb4RSXogg6iJzRg1m6zlrMOpks5t-o2GgaJpZM4UYPL1>
.
|
Whoops! Too late
On Wed, Jun 20, 2018 at 2:10 PM, Tom Augspurger <notifications@github.com>
wrote:
… I'd be happy to join in if you end up doing it today.
On Wed, Jun 20, 2018 at 12:59 PM, Scott Sievert ***@***.***>
wrote:
> Yeah, we should check in. I'm free all day. Let me know when works.
>
> Thanks for the nice summary of the options to use with Dask. I suggested
> using joblib not to depend on dask.distributed, but when you put it that
> way I'm inclined towards option 2.
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <#72 (comment)>,
> or mute the thread
> <https://github.com/notifications/unsubscribe-auth/
ABQHIuXuPb4RSXogg6iJzRg1m6zlrMOpks5t-o2GgaJpZM4UYPL1>
> .
>
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#72 (comment)>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/AASszNILstnvJRGgLDukicZqJq4DkShzks5t-pACgaJpZM4UYPL1>
.
|
Here is some very loose pseudocode from our conversation def partial_fit_and_score(model, x_block, y_block, metadata):
model = model.clone()
model.partial_fit(x_block, y_block)
score = model.score(...)
metadata = metadata.copy()
metadata['model'] = model
metadata['score'] = score
metadata['i'] = metadata['i'] + 1
return metadata
def _hyperband(x, y, parameter_list, nr_list):
x_blocks = x.to_delayed()
y_blocks = y.to_delayed()
nr_buckets = {nr: {...} for nr in nr_list}
model_futures = [client.submit(partial_fit_and_score, model, x_block[0], y_block[0]) ....]
score_futures = [client.submit(get_metadata, model_future) for model_future in model_futures]
seq = as_completed(score_futures)
for future in seq:
metadata = future # {'nr': ..., 'score': ..., 'model-id': ...,}
nr = metadata['nr']
should_resubmit = update(nr_buckets[nr], metadata[...])
model_future = get_model_future_from_score_future(metadata)
i = get_xy_block_id(metadata)
if should_resubmit:
model_future = client.submit(partial_fit_and_score, model_future, x_block[i], y_block[i])
score_future = client.submit(get_metadata, model_future)
seq.add(score_future)
best = get_best_parameters(nr_buckets)
return best
class HyperBand(...):
def __init__(...):
...
def fit(self, x, y):
...
best = _hyperband(..., x, y, ...)
... |
Another pseudo-code that actually runs is at https://gist.github.com/stsievert/b2441010b855169f2a062cc1fa87eb42 |
Closing in favor of dask/dask-ml#221 |
This PR addresses dask/dask-ml#161 with the implementation of the adaptive model selection algorithm Hyperband. I describe the formulation of Hyperband in dask/dask-ml#161 (comment). Briefly, Hyperband treats hyperparameter optimization as a resource allocation problem: why spend time on poorly performing models if you want to find the best performing model? Part of Hyperband trains models for a few iterations then "kills" the worst performing models, and then loops again in this "train-kill" cycle until 1 model is left.
This algorithm is well-suited for Dask, as it is embarrassingly parallel. The main function behind Hyperband,
successive_halving
is run a couple times in a embarrassingly parallel for-loop, and each call tosuccessive_halving
callspartial_fit
on many models at once in the "train-kill" cycle.The main feature of Hyperband is that it only requires one input, the maximum number of times
partial_fit
is called on any one classifier. This value is (approximately) proportional to the time spent waiting for Hyperband to complete. With this computational budget that the user specifies, Hyperband has theory that says it will find the best model possible given this computational budget*.This simplifies the user-facing API. In Hyperband, the user can specify (approximately) the time required for the search. This is an advantage over grid or random search, where the user would have to find some balance between "evaluating many parameters for few calls" or "evaluating few parameters for many calls". This is mentioned in the scikit-learn RandomizedSearchCV documentation:
This tradeoff is not present in Hyperband.
This is important for models that require lots of computation. The implementation of this algorithm would have saved me a couple months of research effort, but I couldn't use sklearn. As such, I have tested it to make sure it works with a simple class that only requires 4 functions,
{get, set}_params
,partial_fit
andscore
.I opened this PR instead of continuing with #71 to give a better explanation of Hyperband, and because the user API is now usable, and tested.
TODO:
support pipelines.(future work: addpartial_fit
param to sklearn pipeline)support scoring keyword arg(relegated to [WIP] API: allow cross validation to work with partial_fit scikit-learn/scikit-learn#11266)support K-fold cross validation(relegated to [WIP] API: allow cross validation to work with partial_fit scikit-learn/scikit-learn#11266)All of the TODOs come for free if GridSearchCV is used which essentially all I'm doing (evaluating a specific parameter is a grid with each list having length one). However this will require another modification, which boils down to defining a
partial_fit
function inmethods.py
and making sure it's called from the right place.I do not think multimetric scoring is relevant here because this is an adaptive algorithm that has to decide quantitatively which model is better. We should leave it up to the user to define their
model.score
to determine which model is better.* More precisely, with high probability the expected loss achieved by the produced model will be "close" to the best loss that could have been found with this computational budget. "close" means "within log factors"