
Distributed Evaluation (#2775)
* Distributed eval.

* More work on distributed_eval.

* Fix mp_eval. Support dumping parallel logs.

* Self feeding change should not have slipped in there.

* Update docstrings.

* Typos.
stephenroller committed Jul 16, 2020
1 parent 20cc87d commit 1a65ad0
Showing 7 changed files with 302 additions and 117 deletions.
51 changes: 51 additions & 0 deletions parlai/scripts/distributed_eval.py
@@ -0,0 +1,51 @@
#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

"""
Distributed evaluation script. NOT MEANT TO BE CALLED DIRECTLY BY USER.
This script is meant to be in conjunction with
`SLURM <https://slurm.schedmd.com/>`, which provides environmental variables
describing the environment.
An example sbatch script is below, for a 2-host, 8-GPU setup (16 total gpus):
.. code-block:: bash\n\n
#!/bin/sh
#SBATCH --job-name=distributed_example
#SBATCH --output=/path/to/savepoint/stdout.%j
#SBATCH --error=/path/to/savepoint/stderr.%j
#SBATCH --partition=priority
#SBATCH --nodes=2
#SBATCH --time=0:10:00
#SBATCH --signal=SIGINT
#SBATCH --gres=gpu:8
#SBATCH --ntasks-per-node=8
#SBATCH --mem=64G
#SBATCH --cpus-per-task=10
srun python -u -m parlai.scripts.distributed_eval \
-m seq2seq -t convai2 --dict-file /path/to/dict-file
"""

import os

import parlai.scripts.eval_model as eval_model
import parlai.utils.distributed as distributed_utils


def main():
    parser = eval_model.setup_args()
    parser.add_distributed_training_args()
    parser.add_argument('--port', type=int, default=61337, help='TCP port number')
    opt = parser.parse_args(print_args=(os.environ['SLURM_PROCID'] == '0'))

    with distributed_utils.slurm_distributed_context(opt) as opt:
        return eval_model.eval_model(opt)


if __name__ == '__main__':
    main()
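The slurm_distributed_context helper used above is introduced by this commit in parlai.utils.distributed. As a rough sketch of where those coordinates come from — reconstructed from the dist_train logic removed from distributed_train.py below, not from the helper's actual code — the global rank, local GPU index, and rendezvous host are typically recovered from SLURM like this:

import os
import subprocess


def slurm_coordinates():
    """Recover distributed coordinates from SLURM's environment.

    Mirrors the logic of the dist_train function removed below: SLURM exposes
    the global rank and the per-node task index as environment variables, and
    `scontrol show hostnames` expands the node list so every worker agrees on
    which host is the rendezvous point.
    """
    rank = int(os.environ['SLURM_PROCID'])        # global rank across all nodes
    local_gpu = int(os.environ['SLURM_LOCALID'])  # index of this task on its node
    node_list = os.environ['SLURM_JOB_NODELIST']
    hostnames = subprocess.check_output(
        ['scontrol', 'show', 'hostnames', node_list]
    )
    main_host = hostnames.split()[0].decode('utf-8')
    return rank, local_gpu, main_host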
49 changes: 3 additions & 46 deletions parlai/scripts/distributed_train.py
@@ -31,14 +31,9 @@
-m seq2seq -t convai2 --dict-file /path/to/dict-file
"""

import os
import socket
import subprocess

import parlai.scripts.train_model as single_train
import parlai.utils.logging as logging
from parlai.scripts.multiprocessing_train import multiprocess_train
from parlai.scripts.script import ParlaiScript
import parlai.utils.distributed as distributed_utils


def setup_args():
@@ -48,52 +43,14 @@ def setup_args():
    return parser


def dist_train(opt, node_list):
    # We can determine the init method automatically for Slurm.
    try:
        # Figure out the main host, and which rank we are.
        hostnames = subprocess.check_output(
            ['scontrol', 'show', 'hostnames', node_list]
        )
        main_host = hostnames.split()[0].decode('utf-8')
        distributed_rank = int(os.environ['SLURM_PROCID'])
        if opt.get('model_parallel'):
            # -1 signals to multiprocessing_train to use all GPUs available.
            # (A value of None signals to multiprocessing_train to use the GPU
            # corresponding to the rank.)
            device_id = -1
        else:
            device_id = int(os.environ['SLURM_LOCALID'])
        port = opt['port']
        logging.info(
            f'Initializing host {socket.gethostname()} as rank {distributed_rank}, '
            f'main is {main_host}'
        )
        # Begin distributed training
        multiprocess_train(distributed_rank, opt, port, 0, device_id, main_host)
    except subprocess.CalledProcessError as e:
        # scontrol failed
        raise e
    except FileNotFoundError:
        # Slurm is not installed
        raise RuntimeError('SLURM does not appear to be installed.')


class DistributedTrain(ParlaiScript):
    @classmethod
    def setup_args(cls):
        return setup_args()

    def run(self):
        # double check we're using SLURM
        node_list = os.environ.get('SLURM_JOB_NODELIST')
        if node_list is None:
            raise RuntimeError(
                'Does not appear to be in a SLURM environment. '
                'You should not call this script directly; '
                'see launch_distributed.py'
            )
        return dist_train(self.opt, node_list)
        with distributed_utils.slurm_distributed_context(self.opt) as opt:
            return single_train.TrainLoop(opt).train_model()


if __name__ == '__main__':
32 changes: 27 additions & 5 deletions parlai/scripts/eval_model.py
@@ -20,7 +20,11 @@
from parlai.core.params import ParlaiParser, print_announcements
from parlai.core.agents import create_agent
from parlai.core.logs import TensorboardLogger
from parlai.core.metrics import aggregate_named_reports, Metric
from parlai.core.metrics import (
    aggregate_named_reports,
    aggregate_unnamed_reports,
    Metric,
)
from parlai.core.worlds import create_task
from parlai.utils.misc import TimeLogger, nice_report
from parlai.utils.world_logging import WorldLogger
@@ -30,6 +34,13 @@
import json
import random

from parlai.utils.distributed import (
    is_primary_worker,
    all_gather_list,
    is_distributed,
    get_rank,
)


def setup_args(parser=None):
    if parser is None:
@@ -85,6 +96,8 @@ def setup_args(parser=None):


def _save_eval_stats(opt, report):
    if not is_primary_worker():
        return
    report_fname = opt['report_filename']
    if report_fname == '':
        return
@@ -122,6 +135,10 @@ def _eval_single_world(opt, agent, task):
    # max number of examples to evaluate
    max_cnt = opt['num_examples'] if opt['num_examples'] > 0 else float('inf')
    cnt = 0
    total_cnt = world.num_examples()

    if is_distributed():
        logging.warn('Progress bar is approximate in distributed mode.')

    while not world.epoch_done() and cnt < max_cnt:
        cnt += opt.get('batchsize', 1)
@@ -134,18 +151,22 @@
        if log_time.time() > log_every_n_secs:
            report = world.report()
            text, report = log_time.log(
                report.get('exs', 0), min(max_cnt, world.num_examples()), report
                report.get('exs', 0), min(max_cnt, total_cnt), report
            )
            logging.info(text)

    report = world.report()
    report = aggregate_unnamed_reports(all_gather_list(world.report()))
    world.reset()

    if world_logger is not None:
        # dump world acts to file
        world_logger.reset()  # add final acts to logs
        base_outfile = opt['report_filename'].split('.')[0]
        outfile = base_outfile + f'_{task}_replies.jsonl'
        if is_distributed():
            rank = get_rank()
            outfile = base_outfile + f'_{task}_{rank}_replies.jsonl'
        else:
            outfile = base_outfile + f'_{task}_replies.jsonl'
        world_logger.write(outfile, world, file_format=opt['save_format'])

    return report
@@ -195,6 +216,7 @@ def eval_model(opt, print_parser=None):
    logging.info(
        f'Finished evaluating tasks {tasks} using datatype {opt.get("datatype")}'
    )

    print(nice_report(report))
    _save_eval_stats(opt, report)
    return report
@@ -206,7 +228,7 @@ def setup_args(cls):
        return setup_args()

    def run(self):
        return eval_model(self.opt)
        return eval_model(self.opt, print_parser=self.parser)


if __name__ == '__main__':
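For context on the aggregate_unnamed_reports(all_gather_list(world.report())) line above: each worker gathers every other worker's report and merges them, so every rank ends up with the same totals. The sketch below is a hand-rolled, single-process illustration of the merge step only, assuming reports are plain dicts with an 'exs' example count and exs-weighted metric values; it is not ParlAI's Metric-based implementation, and the function and sample dicts are illustrative.

from typing import Dict, List


def merge_reports(per_worker: List[Dict[str, float]]) -> Dict[str, float]:
    """Illustrative merge of per-worker eval reports.

    Sums the 'exs' counters and treats every other value as an exs-weighted
    mean. ParlAI's real Metric objects know how to add themselves; this is
    only a plain-dict stand-in.
    """
    total_exs = sum(r.get('exs', 0) for r in per_worker)
    merged: Dict[str, float] = {'exs': total_exs}
    for report in per_worker:
        weight = report.get('exs', 0)
        for key, value in report.items():
            if key == 'exs':
                continue
            merged[key] = merged.get(key, 0.0) + value * weight
    for key in list(merged):
        if key != 'exs' and total_exs > 0:
            merged[key] /= total_exs
    return merged


if __name__ == '__main__':
    # two hypothetical workers that each evaluated part of the data
    rank0 = {'exs': 100, 'ppl': 12.0, 'accuracy': 0.50}
    rank1 = {'exs': 50, 'ppl': 18.0, 'accuracy': 0.20}
    print(merge_reports([rank0, rank1]))
    # -> {'exs': 150, 'ppl': 14.0, 'accuracy': 0.4}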
81 changes: 81 additions & 0 deletions parlai/scripts/multiprocessing_eval.py
@@ -0,0 +1,81 @@
#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.


"""
Main launch script for single-host, multi-GPU evaluation.
This is a drop-in replacement for eval_model.py. This script will launch N
subprocess, each which runs the full eval loop independently.
Uses torch.nn.parallel.DistributedDataParallel for its main uses. Agents must
specifically implement the wrapper of DistributedDataParallel, but all
TorchRankerAgents and TorchGeneratorAgents support this.
"""

import torch
import random
import os
import signal
import parlai.utils.distributed as distributed_utils
import parlai.scripts.eval_model as eval_model


def multiprocess_eval(
    rank, opt, port=61337, rank_offset=0, gpu=None, hostname='localhost'
):
    """
    Run a multiprocessing evaluation.

    Invoked by launch_and_eval, not called directly.
    """
    with distributed_utils.distributed_context(
        rank, opt, port, rank_offset, gpu, hostname
    ) as opt:
        return eval_model.eval_model(opt)


def launch_and_eval(opt, port):
    """
    Launch one evaluation process per GPU and run evaluation in each.
    """
    # Launch multiple subprocesses
    spawncontext = torch.multiprocessing.spawn(
        multiprocess_eval,
        # Give a rank offset of 1: the main process takes rank 0, and
        # spawn() does not let us control the starting rank of its children.
        (opt, port, 1),
        nprocs=opt['distributed_world_size'] - 1,  # main proc will also run loop
        join=False,
    )

    try:
        retval = multiprocess_eval(0, opt, port)
        spawncontext.join()
        return retval
    except KeyboardInterrupt:
        # tell the subprocesses to stop too
        for p in spawncontext.processes:
            if p.is_alive():
                os.kill(p.pid, signal.SIGINT)
        raise


def setup_args():
    parser = eval_model.setup_args()
    parser.add_distributed_training_args()
    parser.set_defaults(distributed_world_size=torch.cuda.device_count())
    return parser


def main():
    opt = setup_args().parse_args()
    port = random.randint(32000, 48000)
    return launch_and_eval(opt, port)


if __name__ == '__main__':
    main()
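The rank bookkeeping in launch_and_eval above — the parent process acts as rank 0 while torch.multiprocessing.spawn covers ranks 1 through N-1 via a rank offset — can be seen in isolation in the toy sketch below. The worker function and its arguments are illustrative, not ParlAI code.

import torch.multiprocessing as mp


def worker(spawn_index, rank_offset, total):
    # spawn() numbers its children 0..nprocs-1; the offset reserves global
    # rank 0 for the parent process, exactly as launch_and_eval does above.
    rank = spawn_index + rank_offset
    print(f'spawn_index={spawn_index} -> global rank {rank} of {total}')


def main():
    world_size = 4
    # children cover global ranks 1..world_size-1
    context = mp.spawn(worker, args=(1, world_size), nprocs=world_size - 1, join=False)
    # the parent itself acts as rank 0
    worker(0, 0, world_size)
    context.join()


if __name__ == '__main__':
    main()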
61 changes: 3 additions & 58 deletions parlai/scripts/multiprocessing_train.py
@@ -18,74 +18,19 @@

import torch
import random
import copy
import os
import signal
import torch.distributed as dist
import parlai.scripts.train_model as single_train
import parlai.utils.distributed as distributed_utils
import parlai.utils.logging as logging
from parlai.scripts.script import ParlaiScript


def multiprocess_train(
    rank, opt, port=61337, rank_offset=0, gpu=None, hostname='localhost'
):
    """
    Subprocess that initializes distributed training and begins training.

    This should be launched n times for n GPUs; this is handled either in main
    or via srun.

    :param int rank: This process's rank - 1. (Starts at -1 ... n - 2). See comments.
    :param opt: command line options
    :param int port: A TCP port to use. This will need to be changed to run
        multiple distributed training setups on the same machine.
    :param int gpu: Which GPU to use. Defaults to using rank and local devices,
        but must be manually specified when using many hosts.
    :param str hostname: Hostname of the main server.
    """
    # Set per-host options
    opt = copy.deepcopy(opt)
    # we need to manually adjust the rank differently in multiprocessing
    # and distributed train
    rank = rank + rank_offset
    opt['rank'] = rank
    if gpu is None:
        # default assumption is local GPUs
        gpu = rank % torch.cuda.device_count()
    opt['gpu'] = gpu
    # make sure we don't just use whatever GPU was saved in the model file
    if 'override' not in opt:
        opt['override'] = {}
    opt['override']['gpu'] = gpu

    # Suppress output of workers except the main host.
    if opt.get('verbose') or rank != 0:
        print_prefix = 'rank:{:3d} |'.format(rank)
    else:
        print_prefix = None
    suppress_output = not opt.get('verbose') and rank != 0

    with distributed_utils.override_print(suppress_output, print_prefix):
        # perform distributed setup, ensuring all hosts are ready
        if opt['gpu'] != -1:
            torch.cuda.set_device(opt['gpu'])
        dist.init_process_group(
            backend="nccl",
            init_method="tcp://{}:{}".format(hostname, port),
            world_size=opt['distributed_world_size'],
            rank=rank,
        )
        logging.info("Distributed group initialized")

        # manual_seed can be a noop without this
        torch.cuda.init()
        # make sure all parameters will be in sync
        torch.manual_seed(42)
        # force a sync so that no one gets ahead, and all are seeded together
        distributed_utils.sync_object(None)

    with distributed_utils.distributed_context(
        rank, opt, port, rank_offset, gpu, hostname
    ) as opt:
        # Run the actual training
        return single_train.TrainLoop(opt).train()

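Everything the deleted body above did by hand — copying opt, computing the global rank, pinning a GPU, initializing the NCCL process group, and seeding workers identically — is what the new distributed_utils.distributed_context manager is expected to bundle for both training and evaluation. The sketch below is a rough reconstruction from those deleted lines, under the assumption that the helper behaves this way; the real implementation lives in parlai/utils/distributed.py and differs in detail (for example, output suppression and the post-init sync are omitted here).

import contextlib
import copy

import torch
import torch.distributed as dist


@contextlib.contextmanager
def distributed_context_sketch(
    rank, opt, port=61337, rank_offset=0, gpu=None, hostname='localhost'
):
    """Hand-rolled approximation of the setup the deleted code performed."""
    opt = copy.deepcopy(opt)
    rank = rank + rank_offset
    opt['rank'] = rank
    if gpu is None:
        # default assumption: one process per local GPU
        gpu = rank % torch.cuda.device_count()
    opt['gpu'] = gpu

    if gpu != -1:
        torch.cuda.set_device(gpu)
    dist.init_process_group(
        backend='nccl',
        init_method=f'tcp://{hostname}:{port}',
        world_size=opt['distributed_world_size'],
        rank=rank,
    )
    # seed every worker identically so parameters start in sync
    torch.manual_seed(42)
    try:
        yield opt
    finally:
        dist.destroy_process_group()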
