Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG-REPORT] vaex save error #2396

Open
shawn0wang opened this issue Oct 16, 2023 · 0 comments
Open

[BUG-REPORT] vaex save error #2396

shawn0wang opened this issue Oct 16, 2023 · 0 comments

Comments

@shawn0wang
Copy link

shawn0wang commented Oct 16, 2023

When I use python multi-process and vaex, I want to save the text as embedding. Everything is normal in the early stage of the program running, but after a while, the saved hdf5 becomes like this:
image
everything is lost, here is my code:

`import gzip
import hashlib
import json
import logging
import os
import warnings
from multiprocessing import Pool

import numpy as np
import vaex
from sentence_transformers import SentenceTransformer

warnings.filterwarnings("ignore")

matching_files = ["x1.json.gz", "x2.json.gz", "x3.json.gz", ...]

print("TOTAL # JOBS:", len(matching_files))
print(matching_files)

def save_embedding(file_path):
cuda_num = int(file_path.split(".")[0][-4:]) % 8
save_name = file_path.split("/")[-1].split(".")[0]
save_path = "xxx"

# log
logging.basicConfig(filename=f"logs/{save_name}.log", level=logging.INFO,
                    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(f"{save_name}_logger")

print("file_path is: ", file_path, "device is: ", cuda_num, "save name is: ", save_name)
# process
model = SentenceTransformer("infgrad/stella-large-zh", device=f"cuda:{cuda_num}")
all_md5 = []
all_text = []
content_embedding = []
rdy_num = 0
total_num = 0
with gzip.open(file_path, "rt") as f:
    for line in f:
        total_num += 1
        json_data = json.loads(line.strip())
        content = json_data["content"]
        embedding = model.encode(
            sentences=content,
            batch_size=1,
            show_progress_bar=False,
            device=f"cuda:{cuda_num}",
            normalize_embeddings=True)
        text_md5 = hashlib.md5(content.encode(encoding="utf-8")).hexdigest()

        all_text.append(content)
        all_md5.append(text_md5)
        content_embedding.append(embedding)
        rdy_num += 1
        # save memory
        if total_num % 10000 == 0:
            print(f"{save_name}*** rdy_num: {rdy_num}, total_num: {total_num}")
            logger.info(
                f"rdy_num: {rdy_num}, total_num: {total_num}, finish: {rdy_num / total_num * 100}%")
            all_text = np.array(all_text)
            all_md5 = np.array(all_md5)
            content_embedding = np.array(content_embedding)
            df = vaex.from_arrays(text=all_text, md5=all_md5, content_embedding=content_embedding)
            if os.path.exists(save_path):
                old_df = vaex.open(save_path)
                new_df = vaex.concat([old_df, df], resolver="strict")
                # if not del old file, it only save new part, not all data, lost old part
                os.remove(save_path)
            else:
                new_df = df
            new_df.export_hdf5(save_path)
            all_md5 = []
            all_text = []
            content_embedding = []
# last part
if os.path.exists(save_path) and len(all_text) != 0 and len(all_md5) != 0:
    logger.info(
        f"rdy_num: {rdy_num}, total_num: {total_num}, finish: {rdy_num / total_num * 100}%")
    all_text = np.array(all_text)
    all_md5 = np.array(all_md5)
    content_embedding = np.array(content_embedding)
    df = vaex.from_arrays(text=all_text, md5=all_md5, content_embedding=content_embedding)
    old_df = vaex.open(save_path)
    new_df = vaex.concat([old_df, df], resolver="strict")
    os.remove(save_path)
    new_df.export_hdf5(save_path)

with Pool(8) as p:
p.map(save_embedding, matching_files)
`

@shawn0wang shawn0wang changed the title [BUG-REPORT] [BUG-REPORT] vaex save error Oct 16, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant