Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support pipeline parallelism model wrapper #1355

Open
wants to merge 48 commits into
base: main
Choose a base branch
from

Conversation

fanqiNO1
Copy link
Collaborator

@fanqiNO1 fanqiNO1 commented Sep 12, 2023

Background

As the model inference process requires more and more CUDA memory, we need a way to complete the model inference process in a variety of CUDA memory situations, mainly the following two cases:

  1. Insufficient CUDA memory
    The model inference process is accomplished by cpu offload, disk offload policy.
  2. Sufficient CUDA memory
    The model can be partitioned across multiple gpus, in which case the model inference should be done as efficiently as possible.

huggingface introduces the accelerate library, which can also allow users to complete the inference in the case of insufficient CUDA memory, but its utilization of resources is too inefficient.

Design

To accelerate the inference process by utilizing resources as much as possible, we will implement a pipeline parallelism-based model wrapper.

The pipeline parallelism-based model wrapper is primarily responsible for:

  • build model, load and dispatch weights
  • pipeline parallelism-based inference process

This PR will support MMPipelineParallel.

Environment

  • PyTorch: 2.0.0
  • CUDA: 11.8
  • GPU: 8 * A100, 80G

Validation

  • init_device_map
  • offload policy
  • pipeline parallelism

Experiment

ResNet-152

Accelerate Torchgpipe Ours
pipeline-1 1478.576 samples/sec 1476.419 sample/sec 1482.065 samples/sec
pipeline-2 935.565 samples/sec 1327.254 samples/sec 1733.871 samples/sec
pipeline-4 1023.315 samples/sec 1908.557 samples/sec 2757.441 samples/sec
pipeline-8 1051.154 samples/sec 2874.286 samples/sec 3742.485 samples/sec

Scipts

import torch
from torch import nn
from torchvision.models import resnet152


class MMResnet(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = resnet152()
        
    def forward(self, x):
        return self.model(x)
    
    def data_preprocessor(self, x, training=False):
        return x
        
    def test_step(self, data):
        data = data['input']
        return self.model(data)


if __name__ == '__main__':
    import time
    from mmengine.model import MMPipelineParallel
    from tqdm import tqdm
    from torchvision.models import resnet152
    from colorama import Fore

    SEED = 0x66ccff
    torch.manual_seed(SEED)
    torch.cuda.manual_seed(SEED)

    model = MMResnet()
    model.eval()

    SETTINGS = {
        1: {
            'num_chunks': 2,
            'batch_size': 220
        },
        2: {
            'num_chunks': 1667,
            'batch_size': 25000
        },
        4: {
            'num_chunks': 256,
            'batch_size': 5632
        },
        8: {
            'num_chunks': 150,
            'batch_size': 5400
        }
    }

    num_pipelines = 1
    num_chunks = SETTINGS[num_pipelines]['num_chunks']
    num_samples = 50000
    batch_size = SETTINGS[num_pipelines]['batch_size']
    # generate data
    dataset = []
    all_cuda_data = True
    num_batches = num_samples // batch_size
    others = num_samples % batch_size
    if all_cuda_data:
        data = torch.randn(batch_size, 3, 224, 224).to('cuda:0')
        for i in tqdm(range(num_batches)):
            dataset.append(data)
        if others > 0:
            dataset.append(data[:others])
    else:
        for i in range(num_batches):
            dataset.append(torch.randn(batch_size, 3, 224, 224))
        if others > 0:
            dataset.append(torch.randn(others, 3, 224, 224))
    print(f'{Fore.GREEN}Data generated{Fore.RESET}')
    # init inferencer
    inferencer = MMPipelineParallel(
        module=model,
        num_pipelines=num_pipelines,
        num_chunks=num_chunks,
        no_split_module_classes='Bottleneck',
        input_key='input'
    )
    # run
    EPOCHS = 10
    SKIP_EPOCHs = 1
    throughputs = []
    elapseds = []
    for i in range(EPOCHS):
        torch.cuda.synchronize()
        tick = time.time()
        # output = inferencer(dataset)
        for data in dataset:
            data = {'input': data}
            out = inferencer(data)
        torch.cuda.synchronize()
        tock = time.time()
        # calculate throughput
        elapsed = tock - tick
        throughput = num_samples / elapsed
        if i >= SKIP_EPOCHs:
            throughputs.append(throughput)
            elapseds.append(elapsed)
        print(f'{Fore.BLUE}Epoch {i+1} throughput: ' +
              f'{throughput:.3f}samples/sec with ' +
              f'{elapsed:.3f}secs{Fore.RESET}')
    throughput = sum(throughputs) / len(throughputs)
    elapsed = sum(elapseds) / len(elapseds)
    print(f'{Fore.RED}Pipeline {num_pipelines}, ' +
          f'Chunks {num_chunks}, Batchsize {batch_size} ' +
          f'Throughput: {throughput:.3f}samples/sec ' +
          f'with {elapsed:.3f}secs{Fore.RESET}')

@CLAassistant
Copy link

CLAassistant commented Sep 12, 2023

CLA assistant check
All committers have signed the CLA.


def __init__(self,
model: Union[dict, nn.Module],
weights: Optional[str] = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accepting weights and loading weights in model_wrapper is inconsistent with other model wrappers. we should consider combining with BaseInferencer to see if there's a better approach.

mmengine/model/wrappers/pipeline_distributed.py Outdated Show resolved Hide resolved
mmengine/model/wrappers/pipeline_distributed.py Outdated Show resolved Hide resolved
mmengine/model/wrappers/pipeline_distributed.py Outdated Show resolved Hide resolved
mmengine/model/wrappers/pipeline_distributed.py Outdated Show resolved Hide resolved
mmengine/model/wrappers/pipeline_distributed.py Outdated Show resolved Hide resolved
}
# handle tied weights
tied_weights = self.model_tree['tied_parameters']
for source, targets in tied_weights.items():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The key of tied_weights means param_name, and value means list of module_names. So, why do we use device_map[source] and device_map[target] here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants