Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new class called StudyJoblib #5301

Open
cheginit opened this issue Mar 10, 2024 · 1 comment
Open

Add a new class called StudyJoblib #5301

cheginit opened this issue Mar 10, 2024 · 1 comment
Labels
feature Change that does not break compatibility, but affects the public interfaces.

Comments

@cheginit
Copy link

cheginit commented Mar 10, 2024

Motivation

The current parallelization with Python's built-in ThreadPool is very limited. By adding a new class called StudyJoblib that uses Study as its base class, we can simply override .optimize and use joblib instead. We can then add a new argument to create_study (called, for example, use_joblib) that creates a StudyJoblib instead of a Study. My implementation does not add joblib as a hard dependency; it is a soft dependency: when use_joblib=True, the code checks whether joblib is installed and raises an exception if it is not.

I can submit a PR if interested.

Description

I have already implemented this and tested it for my use-case. It works nicely and runs much faster than the default method. This is what I have:

class StudyJoblib(Study):
    """A ``Study`` whose :meth:`optimize` dispatches trials through joblib.

    Construction is identical to the base class; only :meth:`optimize` is
    overridden. joblib is a soft dependency: the module-level name ``joblib``
    is expected to be ``None`` when the package is not installed.
    """

    def __init__(
        self,
        study_name: str,
        storage: str | storages.BaseStorage,
        sampler: BaseSampler | None = None,
        pruner: pruners.BasePruner | None = None,
    ) -> None:
        """Forward all arguments unchanged to the base-class constructor."""
        super().__init__(study_name, storage, sampler, pruner)

    def optimize(
        self,
        func: ObjectiveFuncType,
        n_trials: int | None = None,
        timeout: float | None = None,
        n_jobs: int = 1,
        catch: Iterable[type[Exception]] | type[Exception] = (),
        callbacks: list[Callable[["Study", FrozenTrial], None]] | None = None,
        gc_after_trial: bool = False,
        show_progress_bar: bool = False,
    ) -> None:
        """Optimize an objective function, scheduling one joblib task per trial.

        Args:
            func: Objective function to evaluate.
            n_trials: Number of trials to run. Unlike the annotation suggests,
                ``None`` is rejected because one task is created per trial.
            timeout: Forwarded as-is to every single-trial task.
            n_jobs: Forwarded to ``joblib.Parallel``.
            catch: A single exception class or an iterable of exception
                classes; it is normalised to a tuple before being forwarded.
            callbacks: Forwarded as-is to every single-trial task.
            gc_after_trial: Forwarded as-is to every single-trial task.
            show_progress_bar: Accepted for signature compatibility; this
                implementation does not use it.

        Raises:
            ImportError: If joblib is not installed.
            TypeError: If ``catch`` is neither an exception class nor iterable.
            ValueError: If ``n_trials`` is ``None`` or ``n_jobs`` is not an int.
            RuntimeError: If called while this thread is already inside an
                optimize loop.
        """
        if joblib is None:
            raise ImportError(
                "Please install joblib to use `Study.optimize` method with `use_joblib=True`. "
                "You can install joblib with `pip install joblib`."
            )

        # The signature allows a bare exception class or any iterable of them,
        # so normalise to a tuple instead of rejecting everything but a tuple.
        if isinstance(catch, type):
            catch = (catch,)
        else:
            try:
                catch = tuple(catch)
            except TypeError:
                raise TypeError(
                    "The catch argument is of type '{}' but must be a tuple.".format(
                        type(catch).__name__
                    )
                ) from None

        if n_trials is None:
            raise ValueError("The n_trials argument must be specified.")

        if not isinstance(n_jobs, int):
            raise ValueError("The n_jobs argument must be an integer.")

        if self._thread_local.in_optimize_loop:
            raise RuntimeError("Nested invocation of `Study.optimize` method isn't allowed.")

        # All arguments are validated; only now touch the study's state.
        self._stop_flag = False

        try:
            # require="sharedmem" keeps workers in this process so they can
            # all operate on the same study object.
            with joblib.Parallel(n_jobs=n_jobs, require="sharedmem") as parallel:
                parallel(
                    joblib.delayed(_optimize_sequential)(
                        self,
                        func,
                        1,  # each task runs exactly one trial
                        # NOTE(review): timeout is handed to every single-trial
                        # task; confirm whether that bounds the whole run.
                        timeout,
                        catch,
                        callbacks,
                        gc_after_trial,
                        True,
                        None,
                        None,
                    )
                    for _ in range(n_trials)
                )
        finally:
            # NOTE(review): presumably `_optimize_sequential` arms this
            # thread-local flag when it runs in the calling thread (e.g.
            # n_jobs=1); clear it so a later call is not mistaken for a
            # nested invocation. Confirm against the optuna internals.
            self._thread_local.in_optimize_loop = False

def create_study(
    *,
    storage: str | storages.BaseStorage | None = None,
    sampler: BaseSampler | None = None,
    pruner: pruners.BasePruner | None = None,
    study_name: str | None = None,
    direction: str | StudyDirection | None = None,
    load_if_exists: bool = False,
    directions: Sequence[str | StudyDirection] | None = None,
    use_joblib: bool = False,
) -> Study | StudyJoblib:
    """Create a new :class:`~optuna.study.Study` or :class:`~optuna.study.StudyJoblib`.

    Args:
        storage: Storage URL or object, resolved via ``storages.get_storage``.
        sampler: Sampler to use. When omitted and more than one direction is
            given, an ``NSGAIISampler`` is used.
        pruner: Pruner passed through to the study.
        study_name: Requested study name; the name actually used is read back
            from the storage after creation.
        direction: Single optimisation direction. Mutually exclusive with
            ``directions``; ``"minimize"`` is used when both are omitted.
        load_if_exists: When the name is already taken, reuse the existing
            study instead of propagating ``DuplicatedStudyError``.
        directions: One direction per objective.
        use_joblib: Return a ``StudyJoblib`` instead of a ``Study``.

    Raises:
        ImportError: If ``use_joblib`` is true and joblib is not installed.
        ValueError: If both ``direction`` and ``directions`` are given, if no
            direction is given in ``directions``, or if a direction is invalid.
        DuplicatedStudyError: If the study exists and ``load_if_exists`` is false.
    """
    if use_joblib:
        # Fail fast: check the soft dependency before anything is written to
        # the storage, so a missing package does not leave a study behind.
        import importlib.util

        if importlib.util.find_spec("joblib") is None:
            raise ImportError(
                "Please install joblib to use `use_joblib=True`. "
                "You can install joblib with `pip install joblib`."
            )

    if direction is not None and directions is not None:
        raise ValueError("Specify only one of `direction` and `directions`.")
    if direction is not None:
        directions = [direction]
    elif directions is None:
        directions = ["minimize"]
    else:
        directions = list(directions)

    if len(directions) < 1:
        raise ValueError("The number of objectives must be greater than 0.")
    elif any(
        d not in ["minimize", "maximize", StudyDirection.MINIMIZE, StudyDirection.MAXIMIZE]
        for d in directions
    ):
        raise ValueError(
            "Please set either 'minimize' or 'maximize' to direction. You can also set the "
            "corresponding `StudyDirection` member."
        )

    # Normalise string directions to their enum members.
    direction_objects = [
        d if isinstance(d, StudyDirection) else StudyDirection[d.upper()] for d in directions
    ]

    storage = storages.get_storage(storage)
    try:
        study_id = storage.create_new_study(direction_objects, study_name)
    except DuplicatedStudyError:
        if not load_if_exists:
            raise

        # A duplicate can only be reported for an explicitly named study.
        assert study_name is not None

        logger = optuna.logging.get_logger("optuna")
        logger.info(
            "Using an existing study with name '{}' instead of "
            "creating a new one.".format(study_name)
        )
        study_id = storage.get_study_id_from_name(study_name)

    if sampler is None and len(direction_objects) > 1:
        sampler = NSGAIISampler()

    # Read the name back from the storage rather than trusting the argument,
    # which may have been None.
    study_name = storage.get_study_name_from_id(study_id)
    study_cls = StudyJoblib if use_joblib else Study
    return study_cls(study_name=study_name, storage=storage, sampler=sampler, pruner=pruner)

Alternatives (optional)

No response

Additional context (optional)

No response

@cheginit cheginit added the feature Change that does not break compatibility, but affects the public interfaces. label Mar 10, 2024
@cheginit
Copy link
Author

cheginit commented Mar 10, 2024

Never mind, I came up with a more elegant solution for using joblib. I'm posting the code here for people who may want to use this approach. I can also add this as an example to the Optuna examples repo, if interested:

import contextlib
import pickle
import warnings
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal, cast

import joblib
import optuna
from optuna.exceptions import DuplicatedStudyError, ExperimentalWarning
from optuna.pruners import BasePruner, HyperbandPruner
from optuna.samplers import BaseSampler, TPESampler
from optuna.storages import JournalFileStorage, JournalStorage
from optuna.study import MaxTrialsCallback, Study
from optuna.trial import Trial, TrialState


@dataclass
class StudyConfig:
    """Bundle of everything a worker needs to join and run the shared study."""

    # Arguments forwarded to ``optuna.create_study`` (see ``study_args``).
    study_name: str
    sampler: BaseSampler
    pruner: BasePruner
    directions: list[Literal["minimize", "maximize"]]
    storage: JournalStorage
    # Run-level settings that are not passed to ``create_study``.
    n_trials: int
    n_cores: int = 1
    log_path: Path = Path("optuna_journal.log")
    study_path: Path = Path("optuna_study.pkl")

    @property
    def study_args(self) -> dict[str, Any]:
        """Return the study-related fields as a keyword-argument mapping."""
        forwarded = ("study_name", "sampler", "pruner", "directions", "storage")
        return {field_name: getattr(self, field_name) for field_name in forwarded}


def objective(trial: Trial) -> float:
    """Toy objective: a float ``x`` squared plus a categorical offset ``y``."""
    squared_term = trial.suggest_float("x", -100, 100) ** 2
    offset = trial.suggest_categorical("y", [-1, 0, 1])
    return squared_term + offset


def optimize(study_cfg: StudyConfig, worker_id: int) -> None:
    """Worker entry point: join the shared study and run this worker's trials.

    Every worker attaches to the same study (``load_if_exists=True``). A
    ``MaxTrialsCallback`` on COMPLETE trials is installed with the overall
    ``study_cfg.n_trials`` target. Worker 0 additionally pickles its study
    object to ``study_cfg.study_path`` once its own loop finishes.
    """
    shared_study = optuna.create_study(load_if_exists=True, **study_cfg.study_args)

    # Per-worker budget: the even share plus (n_cores - remainder) extra
    # trials, so the workers together offer at least n_trials.
    even_share, leftover = divmod(study_cfg.n_trials, study_cfg.n_cores)
    worker_budget = even_share + study_cfg.n_cores - leftover

    stop_at_target = MaxTrialsCallback(study_cfg.n_trials, states=(TrialState.COMPLETE,))
    shared_study.optimize(objective, n_trials=worker_budget, callbacks=[stop_at_target])

    if worker_id != 0:
        return
    with study_cfg.study_path.open("wb") as out_file:
        pickle.dump(shared_study, out_file)


# Run configuration: overall trial target and number of joblib workers.
n_trials = 6000
n_cores = 12

# Start from a clean slate: drop the journal, its lock file and any old pickle.
log_path = Path("optuna_journal.log")
log_path.unlink(missing_ok=True)
Path(f"{log_path}.lock").unlink(missing_ok=True)
study_path = Path("optuna_study.pkl")
study_path.unlink(missing_ok=True)

with warnings.catch_warnings():
    # ExperimentalWarning is silenced while the storage and study are set up.
    warnings.simplefilter("ignore", ExperimentalWarning)
    study_cfg = StudyConfig(
        "test",
        TPESampler(seed=42),
        HyperbandPruner(),
        ["minimize"],
        JournalStorage(JournalFileStorage(str(log_path))),
        n_trials,
        n_cores,
        log_path,
        study_path,
    )
    # Create the study up front so workers only ever load it.
    with contextlib.suppress(DuplicatedStudyError):
        _ = optuna.create_study(**study_cfg.study_args)

# Lowest trial target worth retrying with. It is fixed once from the initial
# target: recomputing min(100, n) from the shrinking value made the loop
# condition always true, so repeated failures could spin forever at 0.
min_trials = max(1, min(100, study_cfg.n_trials))
while study_cfg.n_trials >= min_trials:
    try:
        _ = joblib.Parallel(n_jobs=n_cores)(
            joblib.delayed(optimize)(study_cfg, i) for i in range(n_cores)
        )
    except Exception:
        # Deliberate best-effort retry: clear the lock file and try again
        # with half the trial target.
        Path(f"{log_path}.lock").unlink(missing_ok=True)
        study_cfg.n_trials //= 2
    else:
        break
else:
    # Every attempt failed; stop here instead of failing later on a missing
    # pickle file.
    raise RuntimeError(
        f"Optimization kept failing; gave up below {min_trials} trials."
    )

# The pickle was written by worker 0 of this very script; do not point this
# at files from untrusted sources.
with study_cfg.study_path.open("rb") as f:
    study = cast("Study", pickle.load(f))
best_params = study.best_trial.params
best_params

Note that, depending on the number of trials, this sometimes fails with "Error: did not possess lock"; that is why I added the while-loop.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature Change that does not break compatibility, but affects the public interfaces.
Projects
None yet
Development

No branches or pull requests

1 participant