Ingest deprecation (#2558)
adolkhan committed Aug 24, 2023
1 parent b5541a9 commit fc7e347
Showing 1 changed file with 8 additions and 40 deletions.
48 changes: 8 additions & 40 deletions deeplake/core/vectorstore/vector_search/dataset/dataset.py
@@ -456,46 +456,14 @@ def extend_or_ingest_dataset(
     total_samples_processed,
     logger,
 ):
-    first_item = next(iter(processed_tensors))
-
-    htypes = [
-        dataset[item].meta.htype for item in dataset.tensors
-    ]  # Inspect raw htype (not parsed htype like tensor.htype) in order to avoid parsing links and sequences separately.
-    threshold_by_htype = [
-        VECTORSTORE_EXTEND_MAX_SIZE_BY_HTYPE.get(h, int(1e10)) for h in htypes
-    ]
-    extend_threshold = min(threshold_by_htype + [VECTORSTORE_EXTEND_MAX_SIZE])
-
-    if len(processed_tensors[first_item]) <= extend_threshold:
-        extend(
-            embedding_function,
-            embedding_data,
-            embedding_tensor,
-            processed_tensors,
-            dataset,
-        )
-    else:
-        elements = create_elements(processed_tensors)
-
-        num_workers_auto = ceil(len(elements) / ingestion_batch_size)
-        if num_workers_auto < num_workers:
-            logger.warning(
-                f"Number of workers is {num_workers}, but {len(elements)} rows of data are being added and the ingestion_batch_size is {ingestion_batch_size}. "
-                f"Setting the number of workers to {num_workers_auto} instead, in order to reduce overhead from excessive workers that will not accelerate ingestion. "
-                f"If you want to parallelize using more workers, please reduce ``ingestion_batch_size``."
-            )
-        num_workers = min(num_workers_auto, num_workers)
-
-        ingest_data.run_data_ingestion(
-            elements=elements,
-            dataset=dataset,
-            embedding_function=embedding_function,
-            embedding_tensor=embedding_tensor,
-            ingestion_batch_size=ingestion_batch_size,
-            num_workers=num_workers,
-            total_samples_processed=total_samples_processed,
-            logger=logger,
-        )
+    # TODO: Add back the old logic with checkpointing after indexing is fixed
+    extend(
+        embedding_function,
+        embedding_data,
+        embedding_tensor,
+        processed_tensors,
+        dataset,
+    )
 
 
 def chunk_by_bytes(data, target_byte_size=TARGET_BYTE_SIZE):
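
For reference, the branch removed above chose between a direct extend and batched ingestion by comparing the number of incoming rows against per-htype size ceilings, and capped the worker count at the number of ingestion batches. Below is a minimal standalone sketch of that decision logic; the constants, function name, and sample values are placeholders chosen for illustration, not deeplake's actual settings or API.

from math import ceil

# Sketch of the dispatch logic removed by this commit; the constants below are
# illustrative placeholders, not deeplake's actual values.
VECTORSTORE_EXTEND_MAX_SIZE = 100_000
VECTORSTORE_EXTEND_MAX_SIZE_BY_HTYPE = {"embedding": 50_000, "text": 100_000}

def choose_ingestion_path(num_rows, htypes, ingestion_batch_size, num_workers):
    # Per-htype ceilings, with a very large default for htypes without a limit.
    threshold_by_htype = [
        VECTORSTORE_EXTEND_MAX_SIZE_BY_HTYPE.get(h, int(1e10)) for h in htypes
    ]
    extend_threshold = min(threshold_by_htype + [VECTORSTORE_EXTEND_MAX_SIZE])

    if num_rows <= extend_threshold:
        return "extend", num_workers

    # Batched ingestion: never use more workers than there are batches.
    num_workers_auto = ceil(num_rows / ingestion_batch_size)
    return "run_data_ingestion", min(num_workers_auto, num_workers)

print(choose_ingestion_path(1_000, ["text", "embedding"], 500, 4))   # ('extend', 4)
print(choose_ingestion_path(80_000, ["text", "embedding"], 500, 4))  # ('run_data_ingestion', 4)

After this commit, extend_or_ingest_dataset always takes the plain extend path; per the TODO, the checkpointed ingestion logic is meant to return once indexing is fixed.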
