This repository has been archived by the owner on Oct 14, 2018. It is now read-only.

ENH: Hyperband implementation #72

Closed
stsievert wants to merge 43 commits

stsievert
Member

@stsievert stsievert commented Jun 3, 2018

This PR addresses dask/dask-ml#161 with the implementation of the adaptive model selection algorithm Hyperband. I describe the formulation of Hyperband in dask/dask-ml#161 (comment). Briefly, Hyperband treats hyperparameter optimization as a resource allocation problem: why spend time on poorly performing models if you want to find the best performing model? Part of Hyperband trains models for a few iterations then "kills" the worst performing models, and then loops again in this "train-kill" cycle until 1 model is left.

This algorithm is well-suited for Dask, as it is embarrassingly parallel. The main function behind Hyperband, successive_halving, is run a few times in an embarrassingly parallel for-loop, and each call to successive_halving calls partial_fit on many models at once in the "train-kill" cycle.
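To make the "train-kill" cycle concrete, here is a minimal sketch of successive halving. It is not the code in this PR: the `train` and `score` callables, the toy dict-based "models", and the keep-the-top-1/eta rule are assumptions chosen only for illustration.

```python
def successive_halving(models, train, score, eta=3):
    """Train every surviving model, keep the top 1/eta, repeat until one is left.

    `models` must be non-empty. `train(model)` performs one training step
    in place and `score(model)` returns a number where higher is better.
    """
    survivors = list(models)
    while len(survivors) > 1:
        for model in survivors:
            train(model)  # the "train" half of the cycle
        survivors.sort(key=score, reverse=True)
        keep = max(1, len(survivors) // eta)
        survivors = survivors[:keep]  # the "kill" half of the cycle
    return survivors[0]


# Hypothetical toy models: a fixed "quality" plus a count of training steps.
def train_one_step(model):
    model["steps"] += 1


def toy_score(model):
    # Improves with training, but is capped by the model's quality.
    return model["quality"] * model["steps"] / (model["steps"] + 1)
```

With nine toy models and eta=3, the first round trains all nine and keeps three, and the second round trains those three and keeps one, so poorly performing models receive only a single training step.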

The main feature of Hyperband is that it only requires one input: the maximum number of times partial_fit is called on any one classifier. This value is (approximately) proportional to the time spent waiting for Hyperband to complete. Given this user-specified computational budget, Hyperband comes with a theoretical guarantee that it will find the best model possible within that budget*.
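As a rough illustration of how a single max_iter value determines the whole search, the sketch below computes the brackets using the formula as written in Algorithm 1 of the Hyperband paper (https://arxiv.org/abs/1603.06560). The function name and the returned dict keys are hypothetical, and rounding conventions differ between implementations, so the exact counts need not match this PR.

```python
def hyperband_brackets(max_iter, eta=3):
    """List the brackets Hyperband would run for a given budget.

    Each bracket starts `n_models` models with `initial_iters` training
    iterations each, then applies successive halving.
    """
    # s_max is the largest s with eta ** s <= max_iter (integer-safe log).
    s_max = 0
    while eta ** (s_max + 1) <= max_iter:
        s_max += 1

    brackets = []
    for s in range(s_max, -1, -1):
        total = (s_max + 1) * eta ** s
        n_models = -(-total // (s + 1))      # ceil(total / (s + 1))
        initial_iters = max_iter // eta ** s  # floor division is an assumption
        brackets.append({"s": s, "n_models": n_models,
                         "initial_iters": initial_iters})
    return brackets
```

For max_iter=9 and eta=3 this gives three brackets: the most aggressive one starts 9 models with 1 iteration each, while the most conservative one starts 3 models with the full 9 iterations.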

This simplifies the user-facing API. In Hyperband, the user can specify (approximately) the time required for the search. This is an advantage over grid or random search, where the user would have to find some balance between "evaluating many parameters for few calls" or "evaluating few parameters for many calls". This is mentioned in the scikit-learn RandomizedSearchCV documentation:

n_iter : int, default=10. Number of parameter settings that are sampled. n_iter trades off runtime vs quality of the solution.

This tradeoff is not present in Hyperband.

This is important for models that require lots of computation. The implementation of this algorithm would have saved me a couple months of research effort, but I couldn't use sklearn. As such, I have tested it to make sure it works with a simple class that only requires 4 functions, {get, set}_params, partial_fit and score.
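For illustration, a class exposing only those four methods might look like the sketch below. This is not the test class from this PR: the class name, its single hyperparameter, and the running-mean "training" rule are all made up.

```python
class RunningMeanModel:
    """Toy model: predicts one constant, nudged toward the mean of each batch."""

    def __init__(self, learning_rate=0.5):
        self.learning_rate = learning_rate
        self.value_ = 0.0

    def get_params(self, deep=True):
        return {"learning_rate": self.learning_rate}

    def set_params(self, **params):
        for name, value in params.items():
            setattr(self, name, value)
        return self

    def partial_fit(self, X, y, **fit_kwargs):
        # One incremental update; X is ignored by this toy model.
        batch_mean = sum(y) / len(y)
        self.value_ += self.learning_rate * (batch_mean - self.value_)
        return self

    def score(self, X, y):
        # Negative mean squared error, so that higher scores are better.
        return -sum((yi - self.value_) ** 2 for yi in y) / len(y)
```

Returning a negated loss from score keeps the "higher is better" convention that the search relies on when ranking models.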

I opened this PR instead of continuing with #71 to give a better explanation of Hyperband, and because the user API is now usable, and tested.

TODO:

All of the TODOs come for free if GridSearchCV is used, which is essentially all I'm doing (evaluating a specific parameter is a grid with each list having length one). However, this will require another modification, which boils down to defining a partial_fit function in methods.py and making sure it's called from the right place.

I do not think multimetric scoring is relevant here because this is an adaptive algorithm that has to decide quantitatively which model is better. We should leave it up to the user to define their model.score to determine which model is better.

* More precisely, with high probability the expected loss achieved by the produced model will be "close" to the best loss that could have been found with this computational budget. "Close" means "within log factors".

- there are some circular imports with dask_ml.model_selection.train_test_split
- This implementation can be simplified if dask_searchcv supports customizing
  the methods.train function. This will enable me to use a GridSearchCV
  inside SuccessiveHalving. This will allow support for all the other
  keyword arguments in DaskBaseSearchCV
Member

@TomAugspurger TomAugspurger left a comment

Exciting to see this come together!

I did a first pass through. Will have more thoughts later.

Do you have docs / a notebook started that uses this?

def _train(model, X, y, X_val, y_val, max_iter=1, dry_run=False, classes=None,
           s=1, i=1, k='_', verbose=False, **fit_kwargs):
    """
    Train function. Returns validation loss.
Member

Most of our projects will use (variations on) the numpydoc format: http://numpydoc.readthedocs.io/en/latest/format.html. So this would be formatted as

Helper to train the model on the provided data.

Parameters
----------
model : Estimator
X : array-like, [n_samples, n_features]
y : array-like, [n_samples,]
...

Returns
-------
validation_score : float
    The model's score on the validation set.

Member

We'll need to be careful with loss vs. score. In scikit-learn's terminology, higher values are better for scores and lower values are better for losses: http://scikit-learn.org/stable/modules/model_evaluation.html

if verbose:
    msg = ("Training model {k} in bracket s={s} iteration {i}. "
           "This model is {percent:.1f}% trained for this bracket")
    print(msg.format(iter=iter, k=k, s=s, i=i, percent=iter * 100.0 / max_iter))
Member

Let's try to use the logging module, as it's more useful in a distributed setting (print statements may be difficult to access if they're on a different machine).

At the top of the module, you can use logger = logging.getLogger(__name__) to get or create a logger with the name dask_searchcv.adaptive. Then when you actually want to see the log, you can do the somewhat annoying

logger = logging.getLogger('dask_searchcv.adaptive')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
logger.addHandler(handler)

There's probably a better way to integrate with distributed here.

Member Author

I'm running into an issue: it's not logging on this line in particular (though it's logging on other lines). I'll try to formulate a bug report.

msg = ("Training model {k} in bracket s={s} iteration {i}. "
"This model is {percent:.1f}% trained for this bracket")
print(msg.format(iter=iter, k=k, s=s, i=i, percent=iter * 100.0 / max_iter))
if not dry_run:
Member

Note to self: what is dry_run for? Seems to be just for testing. Let's see if we can refactor the implementation / tests to make dry_run unnecessary.

Member Author

dry_run is for Hyperband.info, which runs Hyperband without calling model.fit. info is a useful metric to see how many calls to partial_fit will be made.

print(msg.format(iter=iter, k=k, s=s, i=i, percent=iter * 100.0 / max_iter))
if not dry_run:
_ = model.partial_fit(X, y, classes, **fit_kwargs)
fit_time = default_timer() - start_time
Member

This would be a good thing to log: dask/dask-ml#193

start_time = default_timer()
score = model.score(X_val, y_val) if not dry_run else np.random.rand()
score_time = default_timer() - start_time
return score, {'score_time': score_time, 'fit_time': fit_time}
Member

Update the docs to reflect this return value.

def _successive_halving(params, model, n=None, r=None, s=None, verbose=False,
                        shared=None, eta=3, _prefix='', dry_run=False, **fit_kwargs):
    client = _get_client()
    data = client.get_dataset('_dask_data')
Member

Probably best to pass the data / a future for the data to the function, rather than publishing / unpublishing.

How aggressive to be while search. The default is 3, and theory
suggests that ``eta=e=2.718...``. Changing this is not recommended.

run_in_parallel : bool, optional
Member

Maybe call this n_jobs and have the default be -1. Then run_in_parallel = n_jobs == -1.

Member Author

I'm seeing some support for n_jobs in the distributed Client, but I'll leave that for a future PR. Right now n_jobs has to be either 0 or -1.

optimal config from ``params`` given this computational effort
``max_iter``.

.. _Hyperband model selection algorithm: https://arxiv.org/abs/1603.06560
Member


def fit(self, X, y, dry_run=False, verbose=False, **fit_kwargs):
    """
    This function implicitly assumes that higher scores are better.
Member

These docs would be better to have in the class docstring, since they're describing the parameters like model, which were passed above.

Also, you can safely assume a few things, since they're part of the scikit-learn API

  • higher scores are better
  • the signature of model.set_params
  • sklearn.base.clone will work.

It's good to call out partial_fit.

Member Author

I added some words on why I included these lines in the docs. I see this as being useful for other classes that do not conform to sklearn.BaseEstimator classes.

I've raised a ValueError if the estimator doesn't have partial_fit, included a helpful error message and tested for it.
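A check along these lines could look like the following sketch. It is only an illustration of the idea: the helper name and the error message are hypothetical, not the ones used in this PR.

```python
def check_has_partial_fit(estimator):
    """Return the estimator unchanged, or raise if it cannot train incrementally."""
    if not callable(getattr(estimator, "partial_fit", None)):
        raise ValueError(
            "Hyperband trains models incrementally and requires a "
            "partial_fit method, but {} does not implement one."
            .format(type(estimator).__name__)
        )
    return estimator
```

Checking with getattr and callable, rather than isinstance, keeps the check usable for classes that do not inherit from scikit-learn's BaseEstimator.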

client = _get_client()

variables = {'val_score': -np.inf, 'model': None, 'config': None,
'classes': np.unique(y).tolist(), 'eta': self.eta,
Member

Regarding classes, Hyperband will work with regression as well, right? In that case, we don't want to call unique.

Since unique is maybe expensive (even for categorical y), we'll want to require the user provide it. With e.g. sklearn.linear_model.SGDClassifier, you're required to pass classes to the first partial_fit call.

We have two options for where the user should provide the required fit kwargs

  1. As a fit_kwargs argument to Hyperband itself
  2. As keyword arguments in .fit that are passed through

I'll do some research on which is preferable. Perhaps @ogrisel has thoughts.

Member Author

I've stripped unique out of the implementation. Right now, I have Hyperband.fit(X, y, **fit_kwargs). This is what sklearn GridSearchCV.fit does, and passing fit_kwargs to __init__ is deprecated.

@stsievert
Member Author

Thanks for the review, @TomAugspurger. It was useful. I've updated most of the comments, but am still debugging.

Here's a gist of my example notebook: https://gist.github.com/stsievert/38780f0c746b2f1c8eb32feea4dd14de, with a simple test function.

Right now I'm facing some memory issues, and issues with n_jobs == -1. I need to resolve these. I'll probably do this with joblib and some nested parallelism, which will require dask/distributed#1705.

@TomAugspurger
Member

👍 I'll take a look tonight or tomorrow. I've lost my day to debugging strange distributed errors that I think are my fault :)

FYI, LMK if you need help getting a functional version of scikit-learn master + joblib master. It takes a bit of work. Or I can scale up the cluster again.

@stsievert
Member Author

Thanks @TomAugspurger! And no rush, I'm taking tomorrow and Thursday off, and have other things on my plate.

@stsievert
Member Author

This is ready for review now, @TomAugspurger. Some notes:

  • I would like to log inside a function called with client.submit at adaptive.py#L56. Have you handled this situation before?
  • I'm running into some testing difficulty. All tests pass individually* with n_jobs=0, and the example notebook runs fine even with n_jobs=-1. However, they fail with py.test test_adaptive.py. I'd like to improve this, and think _get_client is the issue. What tools do you have for this? I've tried with _get_client() but that doesn't help.

*: tested with pytest -k "{partial-function-name}"

@TomAugspurger
Member

Will take a quick pass now.

I would like to log inside a function called with client.submit

Did you write the function that's being submitted? If so, can you write the logging inside the function, so that it logs when running?
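A minimal sketch of that suggestion, using the standard library's ThreadPoolExecutor as a stand-in for client.submit (an assumption made so the example runs without a cluster). The function name and its arguments are hypothetical; the logger name follows the module name mentioned earlier in this review.

```python
import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger("dask_searchcv.adaptive")


def train_step(model_id, n_iter, max_iter):
    """Hypothetical submitted function that logs from inside its own body."""
    percent = 100.0 * n_iter / max_iter
    logger.info("Training model %s: %.1f%% trained for this bracket",
                model_id, percent)
    return model_id


def run_example():
    # Attach a handler so INFO records become visible, then submit the task.
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.StreamHandler())
    with ThreadPoolExecutor(max_workers=2) as pool:
        return pool.submit(train_step, "m0", 3, 9).result()
```

With a real distributed cluster the record is emitted in the worker process, so the handler has to be configured on the workers (or the message read from the worker logs) rather than in the client session.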

Member

@TomAugspurger TomAugspurger left a comment

Still going through it. Trying things out locally.

from sklearn.linear_model import Lasso


class TestFunction:
Member

Assuming this is just a helper, the class name shouldn't start with Test, as then it gets discovered by test runners (they typically pick up classes named Test* and functions named test_*).

return self._fn()


def test_hyperband_sklearn():
Member

http://distributed.readthedocs.io/en/latest/develop.html#writing-tests may be helpful here.

Async / distributed tests are tricky... I've found the second version, with cluster(), to be the easiest to work with.

Member

That would remove your _get_client() call.

Member Author

I'm still running into testing issues, even after trying with cluster() and gen_cluster(client=True). I run into issues even mirroring distributed/tests/test_client.py#L4717. I think I've resolved all the systems level issues, and will check next week.

Any tips? I've pushed some intermingled gen_cluster and with cluster() code in 22c86e4.

Member Author

Could you try this out locally too? I've also had to resolve some systems level issues.

def test_hyperband_sklearn():
    client = _get_client()
    X, y = make_classification(n_samples=1000, chunks=500)
    classes = da.unique(y)
Member

This will always be [0, 1]. It's controlled by the n_classes argument to make_classification, and currently only 2 is supported.

client = _get_client()
X, y = make_classification(n_samples=1000, chunks=500)
classes = da.unique(y)
model = PartialSGDClassifier(warm_start=True, classes=classes,
Member

PartialSGDClassifier is being deprecated in favor of Incremental(SGDClassifier()).

Hopefully this will work for you (if not, let me know). It'll require passing through fit_kwargs like

alg.fit(X, y, classes=[0, 1])

And you'll need to ensure that classes gets through to Incremental.fit, which passes it through to SGDClassifier.partial_fit.

alg.fit(X, y, dry_run=True)
assert len(alg.history) == 40

alg.fit(X, y, classes=classes)
Member

The fit call calls compute internally, which blocks. Ideally the following would work:

delayed_object = alg.fit(X, y, classes=classes, compute=False)
future = c.compute(delayed_object)
result = yield future


# @_with_client
@gen_cluster(client=True)
def test_hyperband_sklearn(*args, **kwargs):
Member

Depending on the options to gen_cluster, the test is called with the following

  • client
  • scheduler
  • list of workers,

so the typical function signature would be


@gen_cluster(client=True)
def test_hyperband_sklearn(c, s, a, b): 
    ...

Member

That said, (I think) gen_cluster tests require essentially everything to be async. After importing sklearn.linear_model.SGDClassifier, this fails with

../distributed/distributed/utils_test.py:808:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../Envs/dask-dev/lib/python3.6/site-packages/tornado/ioloop.py:582: in run_sync
    return future_cell[0].result()
../../Envs/dask-dev/lib/python3.6/site-packages/tornado/gen.py:1113: in run
    yielded = self.gen.send(value)
../distributed/distributed/utils_test.py:789: in coro
    result = yield future
../../Envs/dask-dev/lib/python3.6/site-packages/tornado/gen.py:1099: in run
    value = future.result()
../../Envs/dask-dev/lib/python3.6/site-packages/tornado/gen.py:296: in wrapper
    result = func(*args, **kwargs)
/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/types.py:248: in wrapped
    coro = func(*args, **kwargs)
dask_searchcv/tests/test_adaptive.py:50: in test_hyperband_sklearn
    X, y = make_classification(n_samples=1000, chunks=500)
../dask-ml/dask_ml/datasets.py:332: in make_classification
    informative_idx, beta = dask.compute(informative_idx, beta)
../dask/dask/base.py:401: in compute
    return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

.0 = <zip object at 0x116259a08>

>   return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
E   AssertionError: yield from wasn't used with future

The problem is the line informative_idx, beta = dask.compute(informative_idx, beta) coming from dask_ml.datasets.

So I'd recommend trying a with cluster style test.

Member

So for this diff

diff --git a/dask_searchcv/tests/test_adaptive.py b/dask_searchcv/tests/test_adaptive.py
index 199c5a1..2fbe5e8 100644
--- a/dask_searchcv/tests/test_adaptive.py
+++ b/dask_searchcv/tests/test_adaptive.py
@@ -1,18 +1,15 @@
 from dask_searchcv.adaptive import _top_k, Hyperband
-from dask_ml.linear_model import PartialSGDClassifier
+from sklearn.linear_model import SGDClassifier
 from dask_ml.datasets import make_classification
 import numpy as np
 import dask.array as da
 from dask.distributed import Client
 import pandas as pd
 import pytest
-import distributed
 from pprint import pprint
 import scipy.stats as stats
-import random
 from sklearn.linear_model import Lasso
 from distributed.utils_test import cluster, loop, gen_cluster
-import time
 from dask_ml.wrappers import Incremental
 
 
@@ -45,23 +42,24 @@ def _with_client(fn):
 
 
 #  @_with_client
-@gen_cluster(client=True)
-def test_hyperband_sklearn(*args, **kwargs):
-    X, y = make_classification(n_samples=1000, chunks=500)
-    classes = da.unique(y)
-    model = Incremental(SGDClassifier(warm_start=True, classes=classes,
-                                      loss='hinge', penalty='elasticnet'))
-
-    params = {'alpha': np.logspace(-3, 0, num=int(10e3)),
-              'l1_ratio': np.linspace(0, 1, num=int(10e3))}
-    alg = Hyperband(model, params, max_iter=9, n_jobs=0)
-
-    alg.fit(X, y, dry_run=True)
-    assert len(alg.history) == 20
-    alg.fit(X, y, dry_run=True)
-    assert len(alg.history) == 40
-
-    alg.fit(X, y, classes=classes)
+def test_hyperband_sklearn():
+    with cluster() as (s, (a, b)):
+        client = Client(s['address'])  # noqa
+        X, y = make_classification(n_samples=1000, chunks=500)
+        classes = da.unique(y)
+        model = Incremental(SGDClassifier(warm_start=True,
+                                          loss='hinge', penalty='elasticnet'))
+
+        params = {'alpha': np.logspace(-3, 0, num=int(10e3)),
+                  'l1_ratio': np.linspace(0, 1, num=int(10e3))}
+        alg = Hyperband(model, params, max_iter=9, n_jobs=0)
+
+        alg.fit(X, y, dry_run=True, classes=[0, 1])
+        assert len(alg.history) == 20
+        alg.fit(X, y, dry_run=True, classes=[0, 1])
+        assert len(alg.history) == 40
+
+        alg.fit(X, y, classes=classes)
 
 
 #  @_with_client

That's running, but taking a while. Not sure if it's a long test, or if it's deadlocked somewhere.

Member Author

Yeah, I'm resolving some of these issues now.

@mrocklin
Member

mrocklin commented Jun 19, 2018 via email

@stsievert
Member Author

Should I create a minimal working example?

@TomAugspurger
Member

What's the error? I'm debugging an Incremental / distributed issue, and don't want to duplicate efforts.

# results = dask.compute(delayed_results)[0]

val_scores = {k: r[0] for k, r in results.items()}
times += [{'id': k, **r[1]} for k, r in results.items()]
Member

Looks like this is invalid syntax.

Member Author

@stsievert stsievert Jun 20, 2018

Yup. I picked it up at the same time, and it's fixed in the most recent commit.

There's a bigger issue for the failing tests – dask-searchcv doesn't depend on dask-ml. I think this issue should be resolved soon, for some other reasons.

@stsievert
Member Author

stsievert commented Jun 20, 2018

What's the error? I'm debugging an Incremental / distributed issue, and don't want to duplicate efforts.

The discussion around the Incremental+distributed issue is at #72 (comment). I only provide a traceback and haven't spent time debugging or crafting a minimal working example.

@mrocklin
Member

Currently it looks like we're calling compute within _successive_halving which is itself delayed and called within compute. This is suboptimal. Instead, it would generally be cleaner if we constructed as much of the computation as we can with dask.delayed and then called compute once, probably after the many calls to _successive_halving. How feasible is this? Currently it looks like _successive_halving does a bit of computation, and then a bit of administrative work. Should that administrative work be delayed? Or should we pull that administrative work outside of the _successive_halving function and do it after the final compute call?

To start off, I think it might simplify things if we do not special-case any code for n_jobs==1 and instead use dask.delayed everywhere. Hopefully we can just set scheduler='single-threaded' in our final compute call when we're ready to achieve n_jobs==1, if that's important.

@mrocklin
Member

My apologies for not noticing this sooner. I only realized that this was happening while running the example notebook and trying to find out what was happening with the confusing error that @stsievert was getting.

@mrocklin
Member

I also think that flattening things down so that we only call compute once in between epochs, rather than calling compute within compute, will resolve any such confusing errors.

@mrocklin
Member

@stsievert let me know if the above makes sense or not. I'd be happy to jump on a call and go through things if that would be helpful.

@stsievert
Member Author

Thanks for noticing this @mrocklin. I think the administrative work you're mentioning is the part of successive halving that's adaptive. The models are sorted in each iteration to _successive_halving (in the call to _top_k), and the best performing models are selected. This is the reason I call compute within _successive_halving.

Here's a simplified version of what's happening:

import numpy as np
import dask

def _successive_halving():
    v1 = dask.delayed(np.random.rand)()  # model 1's score
    v2 = dask.delayed(np.random.rand)()  # model 2's score
    
    if v1.compute() > v2.compute():
        return 1
    return 0

if __name__ == "__main__":
    # what we're doing now
    delayed_results = [dask.delayed(_successive_halving)() for _ in range(5)]
    results = [r.compute() for r in delayed_results]
    
    # what I think we should do (well, with joblib)
    # results = [_successive_halving() for _ in range(5)]

This is what's happening now. I think we should uncomment the last lines and use joblib for the outer loop. How does that sound?

@mrocklin
Member

There are a variety of reasons why it's nicer to create a single large graph if possible. I'm happy to expand on these if desired. Calling Dask from within Dask is more complex for a variety of reasons, and is best avoided if possible, especially within library code, which has a higher standard of simplicity.

Technology options in Dask are ordered something like the following:

  1. dask.delayed or other collections alone that construct and then run a single task graph. Runs everywhere, is the easiest for users to deal with, but doesn't allow for any alteration of computation between compute calls
  2. Normal concurrent.futures. Requires the distributed scheduler, but allows for submission of tasks during computation, typically using as_completed to control all of the in-flight futures.
  3. Something like Joblib, which hands control off to some other library
  4. Submitting tasks from tasks, where workers can create clients that themselves create tasks. This code is easy to write, but very difficult to debug and follow. It is best avoided if possible.

Ideally most things we write will be towards the simpler end of this list if possible. Currently we're at item four, but would like to be at 1 or maybe 2. Generally the decision between 1 and 2 is "Do I absolutely need to control the computation between calls to compute?" Now I understand that your algorithm is adaptive, but you also call compute often.
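To illustrate what option 2 looks like in practice, here is a small sketch using the standard library's concurrent.futures as a stand-in for the distributed client (an assumption made so the example runs without a cluster). The function names, the state dictionary, and the threshold-based stopping rule are hypothetical simplifications; Hyperband's real rule compares models against each other.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def partial_fit_and_score(state):
    """Hypothetical stand-in for one partial_fit call followed by a score call."""
    new_state = dict(state)
    new_state["n_calls"] += 1
    n = new_state["n_calls"]
    new_state["score"] = new_state["quality"] * n / (n + 1)
    return new_state


def adaptive_search(qualities, max_calls=3, min_score=0.25):
    """Resubmit each model while its latest score justifies more training."""
    finished = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        pending = {
            pool.submit(partial_fit_and_score,
                        {"id": i, "quality": q, "n_calls": 0, "score": 0.0})
            for i, q in enumerate(qualities)
        }
        while pending:
            # React to results as they arrive instead of waiting for all of them.
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                state = future.result()
                if state["n_calls"] < max_calls and state["score"] >= min_score:
                    pending.add(pool.submit(partial_fit_and_score, state))
                else:
                    finished.append(state)
    best = max(finished, key=lambda s: s["score"])
    return best, sorted(finished, key=lambda s: s["id"])
```

The decision about whether to keep training a model is made in the controlling process between task submissions, which is what a single pre-built task graph (option 1) cannot express.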

@mrocklin
Member

Anyway, I suggest that we check in at some point today.

@stsievert
Member Author

stsievert commented Jun 20, 2018

Yeah, we should check in. I'm free all day. Let me know what time works.

Thanks for the nice summary of the options to use with Dask. I suggested using joblib not to depend on dask.distributed, but when you put it that way I'm inclined towards option 2. This is less complicated than joblib + dask.delayed, plus it will still use all the workers.

I think we should also discuss where to merge this in. I have some dependence on dask-ml's train_test_split and dask_ml.metrics.

@TomAugspurger
Member

TomAugspurger commented Jun 20, 2018 via email

@mrocklin
Member

mrocklin commented Jun 20, 2018 via email

@mrocklin
Member

Here is some very loose pseudocode from our conversation

def partial_fit_and_score(model, x_block, y_block, metadata):
    model = model.clone()
    model.partial_fit(x_block, y_block)
    score = model.score(...)
    metadata = metadata.copy()
    metadata['model'] = model
    metadata['score'] = score
    metadata['i'] = metadata['i'] + 1
    return metadata

def _hyperband(x, y, parameter_list, nr_list):
    x_blocks = x.to_delayed()
    y_blocks = y.to_delayed()

    nr_buckets = {nr: {...} for nr in nr_list}

    model_futures = [client.submit(partial_fit_and_score, model, x_block[0], y_block[0]) ....]
    score_futures = [client.submit(get_metadata, model_future) for model_future in model_futures]

    seq = as_completed(score_futures)

    for future in seq:
        metadata = future.result()  # {'nr': ..., 'score': ..., 'model-id': ...,}

        nr = metadata['nr']
        should_resubmit = update(nr_buckets[nr], metadata[...])

        model_future = get_model_future_from_score_future(metadata)
        i = get_xy_block_id(metadata)

        if should_resubmit:
            model_future = client.submit(partial_fit_and_score, model_future, x_block[i], y_block[i])
            score_future = client.submit(get_metadata, model_future)
            seq.add(score_future)

    best = get_best_parameters(nr_buckets)
    return best


class HyperBand(...):
    def __init__(...):
        ...

    def fit(self, x, y):
        ...
        best = _hyperband(..., x, y, ...)
        ...

@stsievert
Member Author

stsievert commented Jun 21, 2018

Another pseudo-code that actually runs is at https://gist.github.com/stsievert/b2441010b855169f2a062cc1fa87eb42

@stsievert
Member Author

Closing in favor of dask/dask-ml#221

@stsievert stsievert closed this Jun 21, 2018