Deadlock when running model.learn on a SubprocVecEnv #1814

Open · 5 tasks done
1-Bart-1 opened this issue Jan 24, 2024 · 4 comments
Labels: check the checklist · custom gym env

Comments

@1-Bart-1

🐛 Bug

When running model.learn on a SubprocVecEnv as follows:

env = make_vec_env(ENV_ID, n_envs=cpus, vec_env_cls=SubprocVecEnv, vec_env_kwargs=dict(start_method="spawn"))
model = SAC(**kwargs, env=env)
model.learn(N_TIMESTEPS, callback=eval_callback)

the program ends up in a deadlock. This is likely because I am using a custom environment that runs Julia code via juliacall. The solution is to replace CloudpickleWrapper with a DillWrapper, as follows:

In stable_baselines3/common/vec_env/base_vec_env.py:

import dill

class DillWrapper:
    """Serialize the wrapped object with dill instead of cloudpickle (e.g. env factories)."""

    def __init__(self, var: Any):
        self.var = var

    def __getstate__(self) -> Any:
        return dill.dumps(self.var)

    def __setstate__(self, var: Any) -> None:
        self.var = dill.loads(var)

And use DillWrapper instead of CloudpickleWrapper in stable_baselines3/common/vec_env/subproc_vec_env.py.
With this change, the deadlock seems to occur less often, but it is not completely solved.
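
For reference, the swap happens roughly here inside SubprocVecEnv.__init__. This is only a minimal sketch of the patch, not an official SB3 change; the exact lines vary between SB3 versions, DillWrapper is the class defined above, and the rest of the loop stays as in SB3:

# stable_baselines3/common/vec_env/subproc_vec_env.py (sketch of the patch)
from stable_baselines3.common.vec_env.base_vec_env import DillWrapper  # instead of CloudpickleWrapper

# inside SubprocVecEnv.__init__, where the worker processes are started:
for work_remote, remote, env_fn in zip(self.work_remotes, self.remotes, env_fns):
    # serialize the env factory with dill instead of cloudpickle before handing it to the worker
    args = (work_remote, remote, DillWrapper(env_fn))
    process = ctx.Process(target=_worker, args=args, daemon=True)
    process.start()
    self.processes.append(process)
    work_remote.close()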

I am running the code with the following command:
apptainer exec \
  --env PYTHON_JULIACALL_SYSIMAGE=/cluster/home/bartva/BOAT/Simulation/Kite/trainer/.JlEnv.so \
  --bind /cluster/work/bartva \
  --nv \
  .kite-app.sif python -u BOAT/Simulation/Kite/trainer/hyperparam-tuning.py \
  --trials 500 \
  --startup_trials 5 \
  --evaluations 5 \
  --steps 200000 \
  --episodes 10 \
  --cpus 0 \
  --verbose 2

Code example

KiteEnv.py:

import os
from shutil import copy, rmtree
import numpy as np
import gymnasium as gym
from gymnasium import spaces
import random
import yaml
from uuid import uuid4
from .utils import get_args

args = get_args()

curdir = os.path.dirname(__file__)
path = os.path.join(curdir, "../.JlEnv.so")
main_data_dir = "/cluster/work/bartva/data"



class KiteEnv(gym.Env):
  
  def __init__(self):        
    self.action_space = spaces.Box(low=-1, high=1, shape=(2,), dtype=np.float32)
            # range:
            #   depower: [0, 1], steering [-1, 1]
            # conversion: 
    self.observation_space = spaces.Box(low=-1, high=1, shape=(9,), dtype=np.float32)
            # [w_old, x_old, y_old, z_old, w, x, y, z, force]

    from juliacall import Main as jl  
    jl.seval("using Pkg")
    jl.Pkg.activate(os.path.join(curdir, "../Environment"))
    jl.seval("using Environment") # this is a custom package i built from Environment.jl
    
    self.Environment = jl.Environment
    
    self.data_dir = os.path.join(main_data_dir, str(uuid4()))
    os.makedirs(self.data_dir)
    copy(os.path.join(main_data_dir, "system.yaml"), 
                os.path.join(self.data_dir, "system.yaml"))
    copy(os.path.join(main_data_dir, "settings.yaml"), 
                os.path.join(self.data_dir, "settings.yaml"))
    print("Made directory: ", self.data_dir)
    self.Environment.set_data_path(self.data_dir)
   
    self.verbose = args.verbose
    self.no_random = args.no_random
    self.render_mode = None  

  def _get_real_action(self, action):
    action = np.squeeze(action)
    power = action[0] / 2 + 0.5
    steering = action[1]
    return np.array([power, steering])
  
  def _normalize_obs(self, observation):
    observation = np.squeeze(observation)
    max_force = 4000
    normalized_force = 2*observation[8]/max_force - 1
    return np.append(observation[0:8], normalized_force)
  
  def reset(self, seed=None, options=None):
    super().reset(seed=seed)
    
    reset_complete = False
    reset_value = None
    while reset_complete == False:
      try:
        with open(os.path.join(main_data_dir, "settings.yaml"), 'r') as file:
          settings = yaml.safe_load(file)
          
        if (not self.no_random):
          settings['initial']['elevation'] = random.uniform(65, 70)
        
        with open(os.path.join(self.data_dir, "settings.yaml"), 'w') as file:
          yaml.safe_dump(settings, file)
          
        reset_value = self.Environment.reset()[:4]
        reset_complete = True
      except Exception as e:
        print(e)
        print(self.observation)
        reset_complete = False
    
    self.cumulative_force = 0.0
    self.observation = self._normalize_obs(
      np.append(
        reset_value[:4],
        self.Environment.get_next_step(0.5,0.0)
      )
    )
    # self.step_count = 0
    return np.array(self.observation, dtype=np.float32), {}

  def step(self, action):
    action = self._get_real_action(action)
    # self.step_count += 1
    # done = self.step_count >= self.steps_per_episode
    done = False
    
    reward = 0.0
    try:
      self.observation = self._normalize_obs(
        np.append(
        self.observation[4:8],
        self.Environment.get_next_step(float(action[0]), float(action[1]))
        )
      )
      force = self.observation[8]
      reward = force
    except Exception as e:
      if(self.verbose >= 3):
        print("ERROR")
        print(e)
        print(float(action[0]),  float(action[1]))
      done = True
      reward = -10
    
    return np.array(self.observation, dtype=np.float32), reward, done, False, {}
  
  def close(self):
    try:
      rmtree(self.data_dir)
      print("Removed directory: ", self.data_dir)
    except Exception as e:
      print(e)

hyperparam-tuning.py:

# from multiprocessing import set_start_method, get_start_method
import optuna
# from dask.distributed import Client
# from dask.distributed import wait
from typing import Any
from typing import Dict
import time
import psutil
import gymnasium as gym
from optuna.pruners import MedianPruner
from optuna.samplers import TPESampler
from stable_baselines3 import SAC
from stable_baselines3.common.callbacks import EvalCallback

from stable_baselines3.common.vec_env import SubprocVecEnv
from stable_baselines3.common.env_util import make_vec_env

from Components.KiteEnv import KiteEnv
from Components import utils
from multiprocessing import cpu_count

args = utils.get_args()
if args.cpus == 0:
    cpus = cpu_count()
else:
    cpus = args.cpus
print("CPUs: ", cpus)

N_TRIALS = args.trials
N_STARTUP_TRIALS = args.startup_trials
N_EVALUATIONS = args.evaluations
N_TIMESTEPS = args.steps
EVAL_FREQ = int(N_TIMESTEPS / N_EVALUATIONS)
N_EVAL_EPISODES = args.episodes

ENV_ID = "KiteEnv-v0"
gym.envs.registration.register(
    id=ENV_ID,
    entry_point=KiteEnv,
)

def sample_sac_params(trial: optuna.Trial) -> Dict[str, Any]:
    """
    Sampler for SAC hyperparams.

    :param trial:
    :return:
    """
    gamma = trial.suggest_categorical("gamma", [0.9, 0.95, 0.98, 0.99, 0.995, 0.999, 0.9999])
    learning_rate = trial.suggest_float("learning_rate", 1e-6, 0.1, log=True)
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128, 256, 512, 1024, 2048])
    buffer_size = trial.suggest_categorical("buffer_size", [int(1e4), int(1e5), int(1e6)])
    learning_starts = trial.suggest_categorical("learning_starts", [0, 1000, 10000, 20000])
    # train_freq = trial.suggest_categorical('train_freq', [1, 10, 100, 300])
    train_freq = trial.suggest_categorical("train_freq", [1, 4, 8, 16, 32, 64, 128, 256, 512])
    # Polyak coeff
    tau = trial.suggest_categorical("tau", [0.001, 0.005, 0.01, 0.02, 0.05, 0.08])
    # gradient_steps takes too much time
    # gradient_steps = trial.suggest_categorical('gradient_steps', [1, 100, 300])
    gradient_steps = train_freq
    # ent_coef = trial.suggest_categorical('ent_coef', ['auto', 0.5, 0.1, 0.05, 0.01, 0.0001])
    ent_coef = "auto"
    # You can comment that out when not using gSDE
    log_std_init = trial.suggest_float("log_std_init", -4, 1)
    # NOTE: Add "verybig" to net_arch when tuning HER
    net_arch_type = trial.suggest_categorical("net_arch", ["small", "medium", "big"])
    # activation_fn = trial.suggest_categorical('activation_fn', [nn.Tanh, nn.ReLU, nn.ELU, nn.LeakyReLU])

    net_arch = {
        "small": [256, 256],
        "medium": [400, 300],
        "big": [500, 400, 300],
    }[net_arch_type]

    target_entropy = "auto"
    # if ent_coef == 'auto':
    #     # target_entropy = trial.suggest_categorical('target_entropy', ['auto', 5, 1, 0, -1, -5, -10, -20, -50])
    #     target_entropy = trial.suggest_float('target_entropy', -10, 10)

    return {
        "gamma": gamma,
        "learning_rate": learning_rate,
        "batch_size": batch_size,
        "buffer_size": buffer_size,
        "learning_starts": learning_starts,
        "train_freq": train_freq,
        "gradient_steps": gradient_steps,
        "ent_coef": ent_coef,
        "tau": tau,
        "target_entropy": target_entropy,
        "policy_kwargs": dict(log_std_init=log_std_init, net_arch=net_arch),
    }

class TrialEvalCallback(EvalCallback):
    """Callback used for evaluating and reporting a trial."""

    def __init__(
        self,
        eval_env: gym.Env,
        trial: optuna.Trial,
        n_eval_episodes: int = 5,
        eval_freq: int = 10000,
        deterministic: bool = True,
        verbose: int = 0,
    ):
        super().__init__(
            eval_env=eval_env,
            n_eval_episodes=n_eval_episodes,
            eval_freq=eval_freq,
            deterministic=deterministic,
            verbose=verbose,
        )
        self.trial = trial
        self.eval_idx = 0
        self.is_pruned = False

    def _on_step(self) -> bool:
        if self.eval_freq > 0 and self.n_calls % self.eval_freq == 0:
            super()._on_step()
            self.eval_idx += 1
            self.trial.report(self.last_mean_reward, self.eval_idx)
            # Prune trial if needed.
            if self.trial.should_prune():
                self.is_pruned = True
                return False
        return True


def objective(trial: optuna.Trial, env: gym.Env, eval_env: gym.Env) -> float:
    default_hyperparams = {"policy": "MlpPolicy", "env": env}
    kwargs = default_hyperparams.copy()
    kwargs.update(sample_sac_params(trial))    
    model = SAC(**kwargs, verbose=args.verbose)

    eval_callback = TrialEvalCallback(
        eval_env, trial, n_eval_episodes=N_EVAL_EPISODES, eval_freq=int(EVAL_FREQ/cpus), deterministic=True
    )

    nan_encountered = False
    try:
        print("Starting learn...")
        model.learn(N_TIMESTEPS, callback=eval_callback)
    except (AssertionError, ValueError) as e:
        # Sometimes, random hyperparams can generate NaN.
        if(args.verbose >= 2):
            print("ERROR")
            print(e)
        nan_encountered = True
        
    # Tell the optimizer that the trial failed.
    if nan_encountered:
        return float("nan")

    if eval_callback.is_pruned:
        raise optuna.exceptions.TrialPruned()

    return eval_callback.last_mean_reward

if __name__ == "__main__":
    # set_start_method("spawn", force=True)
    
    # print("Start_method: ", get_start_method())
    # gym.envs.registration.register(
    #     id=ENV_ID,
    #     entry_point=KiteEnv,
    # )
    try:
        start_time = time.time()
        sampler = TPESampler(n_startup_trials=N_STARTUP_TRIALS)
        pruner = MedianPruner(n_startup_trials=N_STARTUP_TRIALS, n_warmup_steps=N_EVALUATIONS // 3)
        study = optuna.create_study(sampler=sampler, pruner=pruner, direction="maximize")
        
        env = make_vec_env(ENV_ID, n_envs=cpus, vec_env_cls=SubprocVecEnv, vec_env_kwargs=dict(start_method="spawn"))
        # env = SubprocVecEnv([make_env(ENV_ID, i) for i in range(cpus)], start_method="spawn")
        # eval_env = SubprocVecEnv([make_env(ENV_ID, i) for i in range(cpus)], start_method="spawn")
        eval_env = make_vec_env(ENV_ID, n_envs=cpus, vec_env_cls=SubprocVecEnv, vec_env_kwargs=dict(start_method="spawn"))
        try:
            print("Starting optimization...")
            study.optimize(lambda trial: objective(trial, env, eval_env), n_trials=N_TRIALS)
        except KeyboardInterrupt:
            pass

        print("Number of finished trials: ", len(study.trials))
        print("Best trial:")
        trial = study.best_trial
        print("  Value: ", trial.value)
        print("  Params: ")
        for key, value in trial.params.items():
            print("    {}: {}".format(key, value))
        print("  User attrs:")
        for key, value in trial.user_attrs.items():
            print("    {}: {}".format(key, value))
        print(f"Time used: {time.time()-start_time}")
    except Exception as e:
        print(e)
    finally:
        env.close()
        eval_env.close()

Environment.jl (Has to be built as a custom package!)

module Environment

using Timers; tic()
using KiteModels
using KiteUtils
# using PyCall #removed pycall!!
# using Plots


const Model = KPS4

set_data_path(joinpath(@__DIR__, "../../Simulator/data"))
kcu = KCU(se());
kps4 = Model(kcu);
dt = 1/se().sample_freq
steps = 1000
step = 0
logger = Logger(se().segments + 5, steps) 

GC.gc();
toc();

integrator = KiteModels.init_sim!(kps4, stiffness_factor=0.04);

function get_next_step(depower, steering)
    global step
    depower = Float32(depower)
    steering = Float32(steering)

    v_ro = 0.0

    if depower < 0.22; depower = 0.22; end
    set_depower_steering(kps4.kcu, depower, steering)

    t_sim = 0.0
    open("next_step_io.txt", "w") do io
        redirect_stdout(io) do
            t_sim = @elapsed KiteModels.next_step!(kps4, integrator, v_ro=v_ro, dt=dt)
        end
    end

    GC.gc(false)
    
    sys_state = SysState(kps4)
    step += 1

    return sys_state.orient[1], sys_state.orient[2], sys_state.orient[3], sys_state.orient[4], sys_state.force
end

function reset()
    global kcu
    global kps4
    global integrator
    global step
    global sys_state
    update_settings()
    save_log(logger)
    kcu = KCU(se());
    kps4 = Model(kcu);
    integrator = KiteModels.init_sim!(kps4, stiffness_factor=0.04)
    step = 0
    sys_state = SysState(kps4)
    GC.gc();
    return sys_state.orient[1], sys_state.orient[2], sys_state.orient[3], sys_state.orient[4], sys_state.force
end

function render()
    global sys_state, logger, step, steps
    if(step < steps)
        log!(logger, SysState(kps4))
    end
end


end

Relevant log output / Error message

After 1-2 trials, there is a deadlock.
When using the DillWrapper, the deadlock only occurs after around 7 trials.
When using the "spawn" start method together with the DillWrapper, the problem seems to be solved.

System Info

I am running the code in an Ubuntu Apptainer container on a high-performance cluster:
https://www.hpc.ntnu.no/idun/


1-Bart-1 added the custom gym env label Jan 24, 2024
araffin added the check the checklist label Jan 24, 2024
@araffin
Member

araffin commented Jan 24, 2024

Hello,
thanks for the bug report and proposed solution.
I'm not sure if we can do much from the SB3 side as it seems to be a very specific use case.

@1-Bart-1
Author

1-Bart-1 commented Jan 24, 2024 via email

@araffin
Member

araffin commented Jan 29, 2024

Could you please run python -c 'import stable_baselines3 as sb3; sb3.get_system_info()' and also give the version of dill that was tested?
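
For the dill version, a one-liner like the following should work (just an example command, not part of SB3):

python -c "import dill; print(dill.__version__)"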

@1-Bart-1
Author

1-Bart-1 commented Feb 8, 2024 via email
