
Distributed support (rework) #996

Draft · wants to merge 21 commits into base: master

Conversation

lrzpellegrini (Collaborator)

This draft contains the implementation for supporting distributed (multi-GPU) training.

The examples folder contains a simple example (to run naive, replay, replay+scheduler) and a bash script to launch it.

This is an alternative to #907, created with the idea of minimizing the changes required to the strategies.

@lrzpellegrini (Collaborator, Author)

It seems that unit tests failed due to a temporary error while downloading CIFAR. I triggered a re-run and the unit tests now pass!

Review thread on avalanche/training/plugins/evaluation.py (outdated, resolved)
return avalanche_forward(self.model, self.mb_x, self.mb_task_id)

@final
def model_adaptation(self, model=None):
Collaborator:

Not sure about this. Local adaptation probably needs all the data.

Collaborator Author:

The reason here is that we want users to override _model_adaptation instead of model_adaptation. _model_adaptation is then called from within the with self.use_local_model() context. This is needed to ensure that users change the local model instead of self.model (which may be the wrapped one).
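A minimal sketch of the pattern described here (names and structure are illustrative, not the actual Avalanche API): the public method is marked @final and only adds the context manager, while subclasses override the underscore-prefixed hook so that their changes always target the local, unwrapped model.

```python
from contextlib import contextmanager
from typing import final


class Strategy:
    def __init__(self, model):
        self.model = model          # may be a (DDP-)wrapped model
        self.local_model = model    # the unwrapped local model

    @contextmanager
    def use_local_model(self):
        # Temporarily expose the local (unwrapped) model as self.model.
        wrapped = self.model
        self.model = self.local_model
        try:
            yield
        finally:
            self.model = wrapped

    @final
    def model_adaptation(self, model=None):
        # Users must NOT override this: it guarantees the hook runs
        # against the local model, not the wrapped one.
        with self.use_local_model():
            return self._model_adaptation(model)

    def _model_adaptation(self, model=None):
        # The hook users are expected to override.
        return model if model is not None else self.model
```

Inside `_model_adaptation`, `self.model` is always the local model; outside, it may be the wrapped one.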

Review threads on examples/distributed_training.py and avalanche/training/templates/supervised.py (outdated, resolved)
def forward(self):
"""Compute the model's output given the current mini-batch."""
Collaborator:

Do we need both _forward and forward? I would keep only one of them. Does it make sense to move this method to the DistributedModelStrategy helper?

Collaborator Author:

This is the same situation as with _model_adaptation and model_adaptation. The wrapper is mainly needed to add the context managers.

return avalanche_forward(self.model, self.mb_x, self.mb_task_id)

@final
def model_adaptation(self, model=None):
Collaborator:

Same comments as for _forward above.

Review thread on avalanche/distributed/distributed_batch.py (outdated, resolved)
An intermediate abstract class in charge of synchronizing data batches.

This class can handle batches as either tuples of elements (as usual) or
even single values.
Collaborator:

I don't understand this. Why do we need a single class to deal with both tuples and single values? It seems easier to have different classes, each with its own merge and sync methods.

Collaborator Author:

A class that can handle both situations may be useful for unsupervised strategies, whose batch may be made of the X value only. In that case, the batch is not a tuple and things need to be managed in a slightly different way. Switching classes depending on that may be more complicated than doing this...
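As a rough illustration of such a dual-mode class (all names hypothetical; plain lists stand in for tensors and "merge" is a simple concatenation across simulated processes), a single merge entry point can branch on whether the batch is a tuple:

```python
class DistributedBatch:
    """Toy sketch: synchronize batches given either as tuples or as
    single values, as discussed in the comment above."""

    def _merge_single(self, values):
        # Merge one field collected from all processes.
        merged = []
        for v in values:
            merged.extend(v)
        return merged

    def merge(self, per_process_batches):
        first = per_process_batches[0]
        if isinstance(first, tuple):
            # Usual supervised case: batch is a tuple (x, y, t, ...);
            # merge it field-wise.
            return tuple(
                self._merge_single([b[i] for b in per_process_batches])
                for i in range(len(first))
            )
        # Unsupervised case: the batch is a single value (e.g. x only).
        return self._merge_single(per_process_batches)
```

The alternative discussed above would split the two branches into separate classes, each with its own merge and sync methods.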

Review thread on avalanche/distributed/distributed_tensor.py (outdated, resolved)
@AntonioCarta (Collaborator)

I left some general comments about the API and some things I didn't understand about the implementation. Overall, I think it's now much easier to add distributed support and integrate it. I think the API is a bit more verbose than it needs to be, especially due to the large number of methods that we need to add to a new distributed value. Personally, I would have chosen a more minimal set of methods, but the solution as-is is already quite good.

One thing that is missing, and it's quite important, is a test that checks which plugins actually work with distributed training, which may not be obvious right now.

@lrzpellegrini (Collaborator, Author)

Yes, the current issue is to check which plugins may not work. It seems that replay and the scheduler plugin are currently working as expected, but there are many more to test. I'm working on this!

@lrzpellegrini (Collaborator, Author)

@AntonioCarta I fixed the naming of the methods you mentioned and added the use_local strategy method. I also added, in the example, an in-code description of the changes to be made in the main script to support distributed training. I'll add more docstrings and fix the remaining things ASAP.

We still have to finalize a choice regarding the _forward, _model_adaptation, ... methods.

@coveralls

coveralls commented Apr 29, 2022

Pull Request Test Coverage Report for Build 2699048914

  • 508 of 966 (52.59%) changed or added relevant lines in 26 files are covered.
  • 2 unchanged lines in 1 file lost coverage.
  • Overall coverage decreased (-1.07%) to 71.195%

Changes missing coverage (covered / changed+added lines, %):
  • avalanche/benchmarks/utils/data_loader.py: 5 of 6 (83.33%)
  • avalanche/distributed/distributed_commons.py: 7 of 8 (87.5%)
  • avalanche/logging/base_logger.py: 2 of 3 (66.67%)
  • avalanche/training/plugins/evaluation.py: 9 of 10 (90.0%)
  • avalanche/training/supervised/deep_slda.py: 10 of 11 (90.91%)
  • avalanche/training/templates/supervised.py: 19 of 20 (95.0%)
  • avalanche/training/supervised/strategy_wrappers.py: 1 of 3 (33.33%)
  • avalanche/distributed/strategies/distributed_loss_strategy.py: 22 of 26 (84.62%)
  • avalanche/distributed/strategies/distributed_model_strategy.py: 20 of 24 (83.33%)
  • avalanche/distributed/strategies/distributed_strategy_support.py: 8 of 12 (66.67%)

Files with coverage reduction (new missed lines, %):
  • avalanche/benchmarks/scenarios/classification_scenario.py: 2 (86.08%)

Totals:
  • Change from base Build 2696355092: -1.07%
  • Covered Lines: 13238
  • Relevant Lines: 18594

💛 - Coveralls

@AntonioCarta (Collaborator)

Thanks!

> We still have to finalize a choice regarding the _forward, _model_adaptation, ... methods.

Maybe we could have a forward and a _forward_impl, with the idea that users call forward but override _forward_impl? Should be fine if documented.

@ContinualAI-bot (Collaborator)

Oh no! It seems there are some PEP8 errors! 😕
Don't worry, you can fix them! 💪
Here's a report about the errors and where you can find them:

avalanche/benchmarks/utils/collate_functions.py:131:81: E501 line too long (86 > 80 characters)
avalanche/benchmarks/utils/collate_functions.py:180:81: E501 line too long (81 > 80 characters)
avalanche/benchmarks/utils/collate_functions.py:183:81: E501 line too long (82 > 80 characters)
avalanche/benchmarks/utils/collate_functions.py:201:81: E501 line too long (84 > 80 characters)
avalanche/benchmarks/utils/collate_functions.py:220:81: E501 line too long (105 > 80 characters)
avalanche/benchmarks/utils/data_attribute.py:39:81: E501 line too long (94 > 80 characters)
avalanche/benchmarks/utils/detection_dataset.py:84:81: E501 line too long (94 > 80 characters)
avalanche/benchmarks/utils/detection_dataset.py:87:81: E501 line too long (94 > 80 characters)
avalanche/benchmarks/utils/detection_dataset.py:147:81: E501 line too long (82 > 80 characters)
avalanche/benchmarks/utils/detection_dataset.py:369:81: E501 line too long (84 > 80 characters)
avalanche/benchmarks/utils/detection_dataset.py:489:81: E501 line too long (85 > 80 characters)
avalanche/benchmarks/utils/flat_data.py:75:81: E501 line too long (83 > 80 characters)
avalanche/benchmarks/utils/flat_data.py:246:81: E501 line too long (84 > 80 characters)
avalanche/benchmarks/scenarios/classification_scenario.py:34:81: E501 line too long (84 > 80 characters)
avalanche/benchmarks/scenarios/detection_scenario.py:109:81: E501 line too long (84 > 80 characters)
avalanche/benchmarks/scenarios/detection_scenario.py:112:81: E501 line too long (83 > 80 characters)
avalanche/benchmarks/scenarios/lazy_dataset_sequence.py:15:81: E501 line too long (84 > 80 characters)
avalanche/training/templates/problem_type/supervised_problem.py:3:81: E501 line too long (113 > 80 characters)
avalanche/training/templates/problem_type/supervised_problem.py:26:81: E501 line too long (90 > 80 characters)
avalanche/training/templates/problem_type/supervised_problem.py:27:81: E501 line too long (88 > 80 characters)
avalanche/training/templates/observation_type/online_observation.py:69:81: E501 line too long (84 > 80 characters)
avalanche/distributed/distributed_batch.py:96:81: E501 line too long (84 > 80 characters)
avalanche/distributed/distributed_batch.py:128:81: E501 line too long (85 > 80 characters)
avalanche/distributed/distributed_helper.py:14:81: E501 line too long (82 > 80 characters)
tests/run_dist_tests.py:58:81: E501 line too long (84 > 80 characters)
tests/distributed/test_distributed_batch.py:79:81: E501 line too long (85 > 80 characters)
tests/distributed/test_distributed_batch.py:104:81: E501 line too long (81 > 80 characters)
tests/distributed/test_distributed_helper.py:10:81: E501 line too long (93 > 80 characters)
tests/distributed/test_distributed_helper.py:28:81: E501 line too long (91 > 80 characters)
tests/distributed/test_distributed_helper.py:29:81: E501 line too long (83 > 80 characters)
tests/distributed/test_distributed_helper.py:32:81: E501 line too long (95 > 80 characters)
tests/distributed/test_distributed_helper.py:38:81: E501 line too long (101 > 80 characters)
tests/distributed/test_distributed_helper.py:41:81: E501 line too long (82 > 80 characters)
tests/distributed/test_distributed_helper.py:43:81: E501 line too long (95 > 80 characters)
tests/distributed/test_distributed_helper.py:59:81: E501 line too long (95 > 80 characters)
tests/distributed/test_distributed_helper.py:71:81: E501 line too long (95 > 80 characters)
36      E501 line too long (86 > 80 characters)

…efault loggers creation. Added distributed training integration unit tests.
@ContinualAI-bot (Collaborator)

Oh no! It seems there are some PEP8 errors! 😕
Don't worry, you can fix them! 💪
Here's a report about the errors and where you can find them:

avalanche/benchmarks/utils/collate_functions.py:131:81: E501 line too long (86 > 80 characters)
avalanche/benchmarks/utils/collate_functions.py:180:81: E501 line too long (81 > 80 characters)
avalanche/benchmarks/utils/collate_functions.py:183:81: E501 line too long (82 > 80 characters)
avalanche/benchmarks/utils/collate_functions.py:201:81: E501 line too long (84 > 80 characters)
avalanche/benchmarks/utils/collate_functions.py:220:81: E501 line too long (105 > 80 characters)
avalanche/benchmarks/utils/data_attribute.py:39:81: E501 line too long (94 > 80 characters)
avalanche/benchmarks/utils/detection_dataset.py:84:81: E501 line too long (94 > 80 characters)
avalanche/benchmarks/utils/detection_dataset.py:87:81: E501 line too long (94 > 80 characters)
avalanche/benchmarks/utils/detection_dataset.py:147:81: E501 line too long (82 > 80 characters)
avalanche/benchmarks/utils/detection_dataset.py:369:81: E501 line too long (84 > 80 characters)
avalanche/benchmarks/utils/detection_dataset.py:489:81: E501 line too long (85 > 80 characters)
avalanche/benchmarks/utils/flat_data.py:75:81: E501 line too long (83 > 80 characters)
avalanche/benchmarks/utils/flat_data.py:246:81: E501 line too long (84 > 80 characters)
avalanche/benchmarks/scenarios/classification_scenario.py:34:81: E501 line too long (84 > 80 characters)
avalanche/benchmarks/scenarios/detection_scenario.py:109:81: E501 line too long (84 > 80 characters)
avalanche/benchmarks/scenarios/detection_scenario.py:112:81: E501 line too long (83 > 80 characters)
avalanche/benchmarks/scenarios/lazy_dataset_sequence.py:15:81: E501 line too long (84 > 80 characters)
avalanche/training/templates/base_sgd.py:53:81: E501 line too long (95 > 80 characters)
avalanche/training/templates/base_sgd.py:312:5: E303 too many blank lines (2)
avalanche/training/templates/base_sgd.py:338:81: E501 line too long (83 > 80 characters)
avalanche/training/templates/base_sgd.py:350:81: E501 line too long (90 > 80 characters)
avalanche/training/templates/base_sgd.py:377:81: E501 line too long (81 > 80 characters)
avalanche/training/templates/base_sgd.py:417:81: E501 line too long (81 > 80 characters)
avalanche/training/templates/problem_type/supervised_problem.py:3:81: E501 line too long (113 > 80 characters)
avalanche/training/templates/problem_type/supervised_problem.py:26:81: E501 line too long (90 > 80 characters)
avalanche/training/templates/problem_type/supervised_problem.py:27:81: E501 line too long (88 > 80 characters)
avalanche/training/templates/observation_type/online_observation.py:69:81: E501 line too long (84 > 80 characters)
avalanche/training/supervised/naive_object_detection.py:143:81: E501 line too long (81 > 80 characters)
avalanche/training/supervised/naive_object_detection.py:173:9: E125 continuation line with same indent as next logical line
avalanche/training/supervised/naive_object_detection.py:182:81: E501 line too long (81 > 80 characters)
avalanche/distributed/distributed_batch.py:96:81: E501 line too long (84 > 80 characters)
avalanche/distributed/distributed_batch.py:128:81: E501 line too long (85 > 80 characters)
avalanche/distributed/distributed_helper.py:14:81: E501 line too long (82 > 80 characters)
avalanche/distributed/strategies/distributed_mbatch_strategy.py:5:81: E501 line too long (111 > 80 characters)
avalanche/distributed/strategies/distributed_mbatch_strategy.py:171:81: E501 line too long (91 > 80 characters)
tests/run_dist_tests.py:46:81: E501 line too long (83 > 80 characters)
tests/run_dist_tests.py:54:81: E501 line too long (82 > 80 characters)
tests/run_dist_tests.py:71:81: E501 line too long (84 > 80 characters)
tests/distributed/distributed_test_utils.py:10:81: E501 line too long (82 > 80 characters)
tests/distributed/test_distributed_batch.py:10:81: E501 line too long (110 > 80 characters)
tests/distributed/test_distributed_batch.py:69:81: E501 line too long (85 > 80 characters)
tests/distributed/test_distributed_batch.py:94:81: E501 line too long (81 > 80 characters)
tests/distributed/test_distributed_helper.py:9:81: E501 line too long (93 > 80 characters)
tests/distributed/test_distributed_helper.py:12:81: E501 line too long (110 > 80 characters)
tests/distributed/test_distributed_helper.py:26:81: E501 line too long (101 > 80 characters)
tests/distributed/test_distributed_helper.py:29:81: E501 line too long (82 > 80 characters)
tests/distributed/test_distributed_model.py:7:81: E501 line too long (110 > 80 characters)
tests/distributed/test_distributed_strategy_support.py:11:81: E501 line too long (83 > 80 characters)
tests/distributed/test_distributed_strategy_support.py:15:81: E501 line too long (110 > 80 characters)
tests/distributed/test_distributed_strategy_support.py:92:81: E501 line too long (108 > 80 characters)
tests/distributed/test_distributed_strategy_support.py:111:81: E501 line too long (85 > 80 characters)
tests/distributed/test_distributed_strategy_support.py:115:81: E501 line too long (81 > 80 characters)
tests/distributed/test_distributed_strategy_support.py:123:81: E501 line too long (116 > 80 characters)
tests/distributed/test_distributed_strategy_support.py:129:81: E501 line too long (90 > 80 characters)
tests/distributed/test_distributed_strategy_support.py:138:81: E501 line too long (81 > 80 characters)
tests/distributed/test_distributed_strategy_support.py:170:81: E501 line too long (100 > 80 characters)
tests/distributed/test_distributed_strategy_support.py:178:81: E501 line too long (100 > 80 characters)
tests/distributed/test_distributed_tensor.py:8:81: E501 line too long (110 > 80 characters)
1       E125 continuation line with same indent as next logical line
1       E303 too many blank lines (2)
56      E501 line too long (86 > 80 characters)

@lrzpellegrini (Collaborator, Author)

@AntonioCarta The PR is almost complete; I'd like to add more unit tests to be sure everything works fine.

The only real part that I still need to figure out is the mb_output collate. Currently, I have slightly reworked the whole collate machinery around an abstract class that contains:

  • The collate_fn (the usual function passed to DataLoader)
  • A collate for single values (that can be used to collate only x, or y, or t)
  • A collate that merges batches (that were already collated) [needed for distributed training support]
  • A collate that merges single value batches (that were already collated) [will be needed in the future to make metrics/logging in distributed training more efficient]

The good part is that we can use instances of this class to populate the collate_fn field of AvalancheDatasets, because they have a call method that invokes the collate_fn. For the moment, I have implemented the Collate class for classification and detection. However, this only covers the input batch.
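The four operations listed above might be sketched roughly like this (method names are hypothetical and lists stand in for tensors; the actual Avalanche classes may differ):

```python
from abc import ABC, abstractmethod


class Collate(ABC):
    @abstractmethod
    def collate_fn(self, samples):
        """The usual function passed to DataLoader."""

    @abstractmethod
    def collate_single_value(self, values):
        """Collate only one field (e.g. only x, or y, or t)."""

    @abstractmethod
    def merge_batches(self, batches):
        """Merge already-collated batches (distributed support)."""

    @abstractmethod
    def merge_single_value_batches(self, batches):
        """Merge already-collated single-field batches."""

    def __call__(self, samples):
        # Instances can directly populate an AvalancheDataset's
        # collate_fn field thanks to this call method.
        return self.collate_fn(samples)


class ClassificationCollate(Collate):
    def collate_fn(self, samples):
        # samples: list of (x, y, ...) tuples -> tuple of field batches
        return tuple(list(field) for field in zip(*samples))

    def collate_single_value(self, values):
        return list(values)

    def merge_batches(self, batches):
        # Concatenate already-collated batches field-wise.
        return tuple(
            [v for b in batches for v in b[i]]
            for i in range(len(batches[0]))
        )

    def merge_single_value_batches(self, batches):
        return [v for b in batches for v in b]
```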

For distributed support, we also need to implement the same thing on the outputs. Synchronizing the output across processes is important to keep the plugins and metrics aligned. I already took care of the loss by averaging it across processes, but mb_output needs a proper collate. The problem is that it has to be different from the input one (as the format may differ between inputs and outputs). We can't leverage the collate_fn from the experience dataset unless we enrich the Collate class to also contain the collate methods needed to manage the output (this is one possible solution). A different solution is to have the strategy itself define the collate for the output.
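The asymmetry described here can be illustrated with a toy, torch-free sketch (all names hypothetical): the loss is a scalar, so synchronizing it is a plain average across processes, while mb_output is a structured batch that needs a format-aware collate chosen for the output, not the input:

```python
def sync_loss(per_process_losses):
    # Loss: scalar all-reduce, then divide by the world size.
    return sum(per_process_losses) / len(per_process_losses)


def sync_mb_output(per_process_outputs, output_collate):
    # mb_output: needs a collate chosen for the OUTPUT format, which
    # may differ from the input collate_fn of the dataset.
    return output_collate(per_process_outputs)


def classification_output_collate(outs):
    # e.g. classification logits: merging is row-wise concatenation.
    return [row for o in outs for row in o]
```

In a real multi-process run the scalar average would go through an all-reduce collective and the output merge through a gather, but the point is the same: only the second operation depends on the output format.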

Do you have any suggestions on this?

@AntonioCarta (Collaborator)

I need to think about this. Unfortunately the output collate cannot be part of the dataset since it depends on the model's definition and its output.

At this point, I'm wondering whether it's just easier to enforce a more structured and general mini-batch definition, something like tensordict. It would provide a single, well-defined method to collate values (also for scalar and tensor metrics).

@AndreaCossu (Collaborator)

I agree that TensorDict (the official torch API for that is still experimental) or simply dictionaries are a more flexible choice for manipulating the minibatch information (x, y, task labels, etc.). However, even with TensorDict you need to know the semantics of the tensors to know how to collate them (how to collate multiple x together, multiple y together, etc.).

I think if we use dictionaries, plus 1. a collate for single values and 2. a collate that merges single-value batches (often the same as 1.), we could recover all the other collate operations. Creating the minibatch in the dataloader amounts to using 1 + the dictionary; stacking together multiple mini-batches amounts to using 2 + the dictionary.
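A toy sketch of this dictionary-based proposal (field names illustrative, lists standing in for tensors): a single per-field collate doubles as the merge of already-collated single-field batches, and both full-minibatch operations fall out of applying it key-wise.

```python
def collate_single(values):
    # 1. Collate a list of per-sample values for one field; already
    # collated lists are merged, scalars are stacked into a list, so
    # this also serves as operation 2.
    out = []
    for v in values:
        out.extend(v if isinstance(v, list) else [v])
    return out


def collate_minibatch(samples):
    # Dataloader collate: list of dict samples -> dict of field batches.
    return {k: collate_single([s[k] for s in samples]) for k in samples[0]}


def merge_minibatches(batches):
    # Operation 2 applied key-wise: merge already-collated mini-batches.
    return {k: collate_single([b[k] for b in batches]) for k in batches[0]}
```

As noted in the follow-up discussion, this sketch assumes one collate works for every field, which is not true in general (e.g. detection targets vs. classification labels).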

@AntonioCarta (Collaborator)

I was definitely underestimating the complexity of possible collate functions.

> I think if we use dictionaries and 1. a collate for single values + 2. a collate that merges single value batches (often the same as 1.) we could recover all the other collate operations.

There is also an additional problem. In general, it may not be true that the single collate is the same for all the values in the minibatch.

@ContinualAI-bot (Collaborator)

Oh no! It seems there are some PEP8 errors! 😕
Don't worry, you can fix them! 💪
Here's a report about the errors and where you can find them:

tests/test_avalanche_classification_dataset.py:1716:27: E741 ambiguous variable name 'l'
1       E741 ambiguous variable name 'l'

@ContinualAI-bot (Collaborator)

Oh no! It seems there are some PEP8 errors! 😕
Don't worry, you can fix them! 💪
Here's a report about the errors and where you can find them:

avalanche/distributed/distributed_helper.py:285:81: E501 line too long (109 > 80 characters)
avalanche/distributed/distributed_helper.py:286:81: E501 line too long (114 > 80 characters)
avalanche/distributed/distributed_helper.py:287:81: E501 line too long (89 > 80 characters)
tests/test_avalanche_classification_dataset.py:1716:27: E741 ambiguous variable name 'l'
tests/distributed/test_distributed_helper.py:15:81: E501 line too long (82 > 80 characters)
tests/distributed/test_distributed_helper.py:16:81: E501 line too long (96 > 80 characters)
tests/distributed/test_distributed_helper.py:26:81: E501 line too long (95 > 80 characters)
tests/distributed/test_distributed_helper.py:165:81: E501 line too long (81 > 80 characters)
tests/distributed/test_distributed_helper.py:174:81: E501 line too long (97 > 80 characters)
tests/distributed/test_distributed_helper.py:188:81: E501 line too long (81 > 80 characters)
tests/distributed/test_distributed_helper.py:191:81: E501 line too long (96 > 80 characters)
tests/distributed/test_distributed_helper.py:194:81: E501 line too long (91 > 80 characters)
tests/distributed/test_distributed_helper.py:198:81: E501 line too long (98 > 80 characters)
tests/distributed/test_distributed_helper.py:203:81: E501 line too long (100 > 80 characters)
tests/distributed/test_distributed_helper.py:204:81: E501 line too long (104 > 80 characters)
tests/distributed/test_distributed_helper.py:205:81: E501 line too long (136 > 80 characters)
tests/distributed/test_distributed_helper.py:213:81: E501 line too long (83 > 80 characters)
tests/distributed/test_distributed_helper.py:219:81: E501 line too long (88 > 80 characters)
tests/distributed/test_distributed_helper.py:223:81: E501 line too long (106 > 80 characters)
tests/distributed/test_distributed_helper.py:232:81: E501 line too long (110 > 80 characters)
tests/distributed/test_distributed_helper.py:241:81: E501 line too long (117 > 80 characters)
tests/distributed/test_distributed_helper.py:246:81: E501 line too long (97 > 80 characters)
tests/distributed/test_distributed_helper.py:251:81: E501 line too long (99 > 80 characters)
tests/distributed/test_distributed_helper.py:252:81: E501 line too long (104 > 80 characters)
tests/distributed/test_distributed_helper.py:253:81: E501 line too long (136 > 80 characters)
tests/distributed/test_distributed_helper.py:260:81: E501 line too long (88 > 80 characters)
tests/distributed/test_distributed_helper.py:266:81: E501 line too long (97 > 80 characters)
tests/distributed/test_distributed_helper.py:271:81: E501 line too long (99 > 80 characters)
tests/distributed/test_distributed_helper.py:272:81: E501 line too long (104 > 80 characters)
tests/distributed/test_distributed_helper.py:273:81: E501 line too long (136 > 80 characters)
tests/distributed/test_distributed_helper.py:280:81: E501 line too long (106 > 80 characters)
tests/distributed/test_distributed_helper.py:291:81: E501 line too long (112 > 80 characters)
tests/distributed/test_distributed_helper.py:292:81: E501 line too long (104 > 80 characters)
tests/distributed/test_distributed_helper.py:293:81: E501 line too long (136 > 80 characters)
tests/distributed/test_distributed_helper.py:300:81: E501 line too long (83 > 80 characters)
tests/distributed/test_distributed_helper.py:389:13: E265 block comment should start with '# '
tests/distributed/test_distributed_helper.py:391:17: E265 block comment should start with '# '
tests/distributed/test_distributed_helper.py:394:21: E265 block comment should start with '# '
tests/distributed/test_distributed_helper.py:403:17: E265 block comment should start with '# '
tests/distributed/test_distributed_helper.py:408:21: E265 block comment should start with '# '
tests/distributed/test_distributed_helper.py:417:17: E265 block comment should start with '# '
tests/distributed/test_distributed_helper.py:421:81: E501 line too long (87 > 80 characters)
6       E265 block comment should start with '# '
35      E501 line too long (109 > 80 characters)
1       E741 ambiguous variable name 'l'

@botcs

botcs commented Jan 18, 2023

Hi,

Thanks for the amazing work!
Any update on this?

@lrzpellegrini (Collaborator, Author)

I'm moving the development status to #1315. This PR, in its current state, is very difficult to merge in a single step: it is too big and based on a codebase that is now too old.


6 participants