Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to support multi-threaded parallel data preprocessing? #870

Open
YixinSong-e opened this issue Jan 14, 2024 · 11 comments
Open

How to support multi-threaded parallel data preprocessing? #870

YixinSong-e opened this issue Jan 14, 2024 · 11 comments
Labels
enhancement New feature or request

Comments

@YixinSong-e
Copy link

I want to pretrain an LLM with 2T tokens using llm-foundry. But before training, the data processing time is too long. Is there any way to accelerate it?

@YixinSong-e YixinSong-e added the enhancement New feature or request label Jan 14, 2024
@MFajcik
Copy link

MFajcik commented Jan 15, 2024

Agree, this would be very useful.

Would it be possible to implement sharding for convert_dataset_json.py? Simply add extra parameters to specify # of shards and index of shard. Script could then be run on multiple machines, targeting the same output directory. I checked the code, but I am not sure how to do it with MDSWriter yet.

@rlrs
Copy link

rlrs commented Jan 16, 2024

I think the example conversion script is perhaps not very good. One thing that helps a lot is to use the Datasets .map() to batch tokenize the dataset. I'm not sure how writing to the MDS file can be parallelized, but it probably can.

Also, there is a bug in tokenizers that might make it way slower than you would like - see huggingface/tokenizers#1413.

@dakinggg
Copy link
Collaborator

The text to MDS conversion script (https://github.com/mosaicml/llm-foundry/blob/main/scripts/data_prep/convert_text_to_mds.py) is parallelized, is that what you are looking for (or at least a good starting point)?

@YixinSong-e
Copy link
Author

The text to MDS conversion script (https://github.com/mosaicml/llm-foundry/blob/main/scripts/data_prep/convert_text_to_mds.py) is parallelized, is that what you are looking for (or at least a good starting point)?

Thanks, I will look into it.

@MFajcik
Copy link

MFajcik commented Jan 17, 2024

Isn't enough to just run the script in parallel, and merge the mds shards with this method?

def merge_shard_groups(root: str) -> None:

Currently, I am trying it like this.

I have large jsonl file. I used split -l to split it into number of procs files. Then I call convert_dataset_json.py independently on each of these, obtaining 1 output folder for each process, the output folder is in some output_root_folder.

Lastly, I hope it will be enough to just call the mentioned merge method on output_root_folder

(Will update once the progress is finished.).

@dakinggg
Copy link
Collaborator

Yes @MFajcik , that should work!

@MFajcik
Copy link

MFajcik commented Jan 20, 2024

Isn't enough to just run the script in parallel, and merge the mds shards with this method?

def merge_shard_groups(root: str) -> None:

Currently, I am trying it like this.

I have large jsonl file. I used split -l to split it into number of procs files. Then I call convert_dataset_json.py independently on each of these, obtaining 1 output folder for each process, the output folder is in some output_root_folder.

Lastly, I hope it will be enough to just call the mentioned merge method on output_root_folder

(Will update once the progress is finished.).

Yes @MFajcik , that should work!

It does work! Preprocessing was done in notime. Training is running right now. Thanks for the hint!

@Riccorl
Copy link

Riccorl commented Feb 13, 2024

I changed ConcatTokensDataset.__iter__ to this:

def __iter__(self) -> Iterable[Dict[str, bytes]]:

        buffer = []
        # self.write_batch_size = 10_000
        shards = self.hf_dataset.num_rows // self.write_batch_size + 1
        for i in range(shards):
            shard = self.hf_dataset[
                i * self.write_batch_size : (i + 1) * self.write_batch_size
            ]
            encoded_shard = self.tokenizer(
                shard["text"], truncation=False, padding=False
            )
            for encoded in encoded_shard["input_ids"]:
                iids = encoded  # ['input_ids']
                buffer = buffer + self.bos_tokens + iids + self.eos_tokens
                while len(buffer) >= self.max_length:
                    concat_sample = buffer[: self.max_length]
                    buffer = buffer[self.max_length :] if self.should_wrap else []
                    yield {
                        # convert to bytes to store in MDS binary format
                        "tokens": np.asarray(concat_sample).tobytes(),
                        "num_tokens": len(concat_sample),
                    }

Processing 7B tokens takes around 20 hours with the original code and 30 min with this change. It's not very robust though and doesn't scale very well: a fast tokenizer hangs after a while with very long text and more than 16 threads seem not to give you any speedup.

@YixinSong-e
Copy link
Author

I changed ConcatTokensDataset.__iter__ to this:

def __iter__(self) -> Iterable[Dict[str, bytes]]:

        buffer = []
        # self.write_batch_size = 10_000
        shards = self.hf_dataset.num_rows // self.write_batch_size + 1
        for i in range(shards):
            shard = self.hf_dataset[
                i * self.write_batch_size : (i + 1) * self.write_batch_size
            ]
            encoded_shard = self.tokenizer(
                shard["text"], truncation=False, padding=False
            )
            for encoded in encoded_shard["input_ids"]:
                iids = encoded  # ['input_ids']
                buffer = buffer + self.bos_tokens + iids + self.eos_tokens
                while len(buffer) >= self.max_length:
                    concat_sample = buffer[: self.max_length]
                    buffer = buffer[self.max_length :] if self.should_wrap else []
                    yield {
                        # convert to bytes to store in MDS binary format
                        "tokens": np.asarray(concat_sample).tobytes(),
                        "num_tokens": len(concat_sample),
                    }

Processing 7B tokens takes around 20 hours with the original code and 30 min with this change. It's not very robust though and doesn't scale very well: a fast tokenizer hangs after a while with very long text and more than 16 threads seem not to give you any speedup.

Thanks for your update! Do you modify other files to enable multithread?

@Riccorl
Copy link

Riccorl commented Feb 19, 2024

Thanks for your update! Do you modify other files to enable multithreaded?

Yes sorry, I also removed os.environ["TOKENIZERS_PARALLELISM"] = "false" from ConcatTokensDataset.__init__.

@YixinSong-e
Copy link
Author

Thanks for your update! Do you modify other files to enable multithreaded?

Yes sorry, I also removed os.environ["TOKENIZERS_PARALLELISM"] = "false" from ConcatTokensDataset.__init__.

It helps a lot. I can process 100B tokens within in 7 hours with your code! :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

5 participants