Merge pull request #2686 from activeloopai/incremental-cache
Incremental index and cache maintenance revisited
nvoxland committed Nov 15, 2023
2 parents c291a64 + f214fbe commit 89482ec
Showing 10 changed files with 257 additions and 385 deletions.
3 changes: 3 additions & 0 deletions deeplake/constants.py
@@ -342,3 +342,6 @@
     "bytes_per_minute": MAX_BYTES_PER_MINUTE,
     "batch_byte_size": TARGET_BYTE_SIZE,
 }
+
+# Size of dataset view to expose as indra dataset wrapper.
+INDRA_DATASET_SAMPLES_THRESHOLD = 10000000
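
The new constant caps when a dataset view gets exposed through the C++-backed indra dataset wrapper. A minimal sketch of the kind of gating such a constant enables; expose_view and both helpers are hypothetical stand-ins rather than deeplake API, and the direction of the comparison (larger views go to the wrapper) is an assumption:

INDRA_DATASET_SAMPLES_THRESHOLD = 10_000_000

def to_indra_wrapper(view):
    # Stand-in: hand the view to an indra-backed dataset wrapper.
    return ("indra", view)

def as_plain_view(view):
    # Stand-in: keep the ordinary Python-side view.
    return ("plain", view)

def expose_view(view):
    # Assumed gating: views at or above the threshold use the wrapper.
    if len(view) >= INDRA_DATASET_SAMPLES_THRESHOLD:
        return to_indra_wrapper(view)
    return as_plain_view(view)
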
7 changes: 3 additions & 4 deletions deeplake/core/dataset/dataset.py
@@ -3344,8 +3344,6 @@ def get_sample_from_engine(
                 self,
                 dml_type=_INDEX_OPERATION_MAPPING["UPDATE"],
                 rowids=list(self.index.values[0].indices(len(self))),
-                index_regeneration=True,
-                index_delete=False,
             )
             raise e
         finally:
@@ -4630,6 +4628,9 @@ def _pop(self, index: Optional[int] = None):
         )

         with self:
+            for tensor in self.tensors.values():
+                if tensor.num_samples > index:
+                    tensor._check_for_pop(index)
             for tensor in self.tensors.values():
                 if tensor.num_samples > index:
                     tensor._pop(index)
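
The added loop gives _pop two phases: every tensor is validated with _check_for_pop before any tensor is mutated, so a failure can no longer leave some tensors popped and others not. A runnable toy of that check-then-apply pattern (the classes are illustrative, not deeplake's):

class ToyTensor:
    def __init__(self, samples):
        self.samples = list(samples)

    def check_for_pop(self, index):
        # Phase 1: raise before anything has been mutated.
        if not 0 <= index < len(self.samples):
            raise IndexError(f"index {index} out of range")

    def pop(self, index):
        # Phase 2: runs only after every tensor passed phase 1.
        self.samples.pop(index)

def pop_row(tensors, index):
    for t in tensors:
        t.check_for_pop(index)
    for t in tensors:
        t.pop(index)
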
@@ -4652,7 +4653,6 @@ def pop(self, index: Optional[int] = None):
             self,
             dml_type=_INDEX_OPERATION_MAPPING["REMOVE"],
             rowids=row_ids,
-            index_regeneration=True,
         )

     @invalid_view_op
@@ -4668,7 +4668,6 @@ def pop_multiple(self, index: List[int], progressbar=False):
             self,
             dml_type=_INDEX_OPERATION_MAPPING["REMOVE"],
             rowids=index,
-            index_regeneration=True,
         )

     @property
175 changes: 68 additions & 107 deletions deeplake/core/index_maintenance.py
@@ -1,6 +1,5 @@
 from deeplake.core.distance_type import DistanceType
 from deeplake.core.storage import azure, gcs, google_drive, local, lru_cache, memory
-from deeplake.constants import _INDEX_OPERATION_MAPPING
 from enum import Enum


@@ -14,9 +13,8 @@
 class INDEX_OP_TYPE(Enum):
     NOOP = 0
     CREATE_INDEX = 1
-    REMOVE_INDEX = 2
-    REGENERATE_INDEX = 3
-    INCREMENTAL_INDEX = 4
+    REGENERATE_INDEX = 2
+    INCREMENTAL_INDEX = 3


 def is_embedding_tensor(tensor):
@@ -92,8 +90,12 @@ def parse_index_distance_metric_from_params(


 def check_index_params(self):
+    emb_tensor = fetch_embedding_tensor(self.dataset)
+    indexes = emb_tensor.get_vdb_indexes()
+    if len(indexes) == 0:
+        return False
     current_params = self.index_params
-    existing_params = fetch_embedding_tensor(self.dataset).get_vdb_indexes()[0]
+    existing_params = indexes[0]
     curr_distance_str = current_params.get("distance_metric", "COS")
     curr_distance = get_index_metric(curr_distance_str.upper())

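check_index_params now fetches the index list once and returns early when it is empty, instead of indexing get_vdb_indexes()[0] unguarded. A simplified restatement of the guard, with a plain string comparison standing in for the real get_index_metric mapping:

def params_match(index_params, indexes):
    if len(indexes) == 0:
        return False  # no index on disk, nothing to compare against
    existing = indexes[0]
    wanted = index_params.get("distance_metric", "COS").upper()
    return str(existing.get("distance", "")).upper() == wanted

print(params_match({"distance_metric": "cos"}, []))                     # False
print(params_match({"distance_metric": "cos"}, [{"distance": "COS"}]))  # True
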
@@ -107,48 +109,7 @@ def check_index_params(self):
     return False


-def check_incr_threshold(len_initial_data, len_changed_data):
-    """
-    Determine if the index should be regenerated or built incrementally.
-    :param len_initial_data: int, length of the original data
-    :param len_changed_data: int, length of the changed data
-    :return: bool, True if the index should be regenerated, False otherwise
-    """
-    threshold = 0.7 * len_initial_data
-    return len_changed_data < threshold
-
-
-def index_operation_type_vectorstore(
-    self, changed_data_len, index_regeneration, index_delete=False
-):
-    if not index_used(self.exec_option):
-        return INDEX_OP_TYPE.NOOP
-
-    if index_delete:
-        return INDEX_OP_TYPE.REMOVE_INDEX
-
-    if not index_exists(self.dataset):
-        threshold = self.index_params.get("threshold", -1)
-        below_threshold = threshold <= 0 or len(self.dataset) < threshold
-        if not below_threshold:
-            return INDEX_OP_TYPE.CREATE_INDEX
-    else:
-        if (
-            not index_regeneration
-            and check_index_params(self)
-            and check_incr_threshold(len(self.dataset), changed_data_len)
-        ):
-            return INDEX_OP_TYPE.INCREMENTAL_INDEX
-        else:
-            return INDEX_OP_TYPE.REGENERATE_INDEX
-
-    return INDEX_OP_TYPE.NOOP
-
-
-def index_operation_type_dataset(
-    self, num_rows, changed_data_len, index_regeneration, index_delete=False
-):
+def index_operation_type_dataset(self, num_rows, changed_data_len):
     if not index_exists(self):
         if self.index_params is None:
             return INDEX_OP_TYPE.NOOP
@@ -160,13 +121,7 @@ def index_operation_type_dataset(
     if not check_vdb_indexes(self):
         return INDEX_OP_TYPE.NOOP

-    if index_delete:
-        return INDEX_OP_TYPE.REMOVE_INDEX
-
-    if not index_regeneration and check_incr_threshold(num_rows, changed_data_len):
-        return INDEX_OP_TYPE.INCREMENTAL_INDEX
-    else:
-        return INDEX_OP_TYPE.REGENERATE_INDEX
+    return INDEX_OP_TYPE.INCREMENTAL_INDEX


 def get_index_metric(metric):
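
With REMOVE_INDEX and the 70% change-ratio heuristic gone, the routing visible in these hunks reduces to three outcomes: NOOP when no index is configured or present, CREATE_INDEX for a first build, and INCREMENTAL_INDEX once an index exists (REGENERATE_INDEX is still produced from a collapsed region of this file when stored parameters no longer match). A condensed, self-contained restatement; the threshold handling on the create path sits in a collapsed hunk, so treat that branch as an assumption:

from enum import Enum

class OpType(Enum):
    NOOP = 0
    CREATE_INDEX = 1
    INCREMENTAL_INDEX = 2

def decide(index_exists, index_params, num_rows, has_vdb_indexes):
    if not index_exists:
        if index_params is None:
            return OpType.NOOP
        threshold = index_params.get("threshold", -1)
        if threshold > 0 and num_rows >= threshold:
            return OpType.CREATE_INDEX  # assumed create condition
        return OpType.NOOP
    if not has_vdb_indexes:
        return OpType.NOOP
    return OpType.INCREMENTAL_INDEX     # every later DML goes incremental
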
@@ -225,59 +180,61 @@ def check_vdb_indexes(dataset):
     return False


-def index_cache_cleanup(dataset):
-    # Gdrive and In memory datasets are not supported for libdeeplake
-    if dataset.path.startswith("gdrive://") or dataset.path.startswith("mem://"):
-        return
+def _incr_maintenance_vdb_indexes(tensor, indexes, index_operation):
+    try:
+        is_embedding = tensor.htype == "embedding"
+        has_vdb_indexes = hasattr(tensor.meta, "vdb_indexes")
+        try:
+            vdb_index_ids_present = len(tensor.meta.vdb_indexes) > 0
+        except AttributeError:
+            vdb_index_ids_present = False

-    tensors = dataset.tensors
-    for _, tensor in tensors.items():
-        is_embedding = is_embedding_tensor(tensor)
-        if is_embedding:
-            tensor.unload_index_cache()
+        if is_embedding and has_vdb_indexes and vdb_index_ids_present:
+            for vdb_index in tensor.meta.vdb_indexes:
+                tensor.update_vdb_index(
+                    operation_kind=index_operation,
+                    row_ids=indexes,
+                )
+    except Exception as e:
+        raise Exception(f"An error occurred while regenerating VDB indexes: {e}")


 # Routine to identify the index Operation.
-def index_operation_vectorstore(
-    self, dml_type, rowids, index_regeneration: bool = False, index_delete: bool = False
-):
-    index_operation_type = index_operation_type_vectorstore(
-        self,
-        len(rowids) if rowids is not None else 0,
-        index_regeneration=index_regeneration,
-        index_delete=index_delete,
-    )
+def index_operation_vectorstore(self):
+    if not index_used(self.exec_option):
+        return None
+
     emb_tensor = fetch_embedding_tensor(self.dataset)

-    if index_operation_type == INDEX_OP_TYPE.NOOP:
-        return
+    if index_exists(self.dataset) and check_index_params(self):
+        return emb_tensor.get_vdb_indexes()[0]["distance"]

-    index_cache_cleanup(self.dataset)
-    if index_operation_type == INDEX_OP_TYPE.CREATE_INDEX:
-        distance_str = self.index_params.get("distance_metric", "COS")
-        additional_params_dict = self.index_params.get("additional_params", None)
-        distance = get_index_metric(distance_str.upper())
-        if additional_params_dict and len(additional_params_dict) > 0:
-            param_dict = normalize_additional_params(additional_params_dict)
-            emb_tensor.create_vdb_index(
-                "hnsw_1", distance=distance, additional_params=param_dict
-            )
-        else:
-            emb_tensor.create_vdb_index("hnsw_1", distance=distance)
-    elif index_operation_type == INDEX_OP_TYPE.INCREMENTAL_INDEX:
-        emb_tensor._incr_maintenance_vdb_indexes(rowids, dml_type)
-    elif index_operation_type == INDEX_OP_TYPE.REGENERATE_INDEX:
-        emb_tensor._regenerate_vdb_indexes()
-    elif index_operation_type == INDEX_OP_TYPE.REMOVE_INDEX:
-        vdb_indexes = emb_tensor.get_vdb_indexes()
-        emb_tensor.delete_vdb_index(vdb_indexes["id"])
+    threshold = self.index_params.get("threshold", -1)
+    below_threshold = threshold < 0 or len(self.dataset) < threshold
+    if below_threshold:
+        return None
+
+    if not check_index_params(self):
+        try:
+            vdb_indexes = emb_tensor.get_vdb_indexes()
+            for vdb_index in vdb_indexes:
+                emb_tensor.delete_vdb_index(vdb_index["id"])
+        except Exception as e:
+            raise Exception(f"An error occurred while removing VDB indexes: {e}")
+    distance_str = self.index_params.get("distance_metric", "COS")
+    additional_params_dict = self.index_params.get("additional_params", None)
+    distance = get_index_metric(distance_str.upper())
+    if additional_params_dict and len(additional_params_dict) > 0:
+        param_dict = normalize_additional_params(additional_params_dict)
+        emb_tensor.create_vdb_index(
+            "hnsw_1", distance=distance, additional_params=param_dict
+        )
     else:
-        raise Exception("Unknown index operation")
+        emb_tensor.create_vdb_index("hnsw_1", distance=distance)
+    return distance


-def index_operation_dataset(
-    self, dml_type, rowids, index_regeneration: bool = False, index_delete: bool = False
-):
+def index_operation_dataset(self, dml_type, rowids):
     emb_tensor = fetch_embedding_tensor(self)
     if emb_tensor is None:
         return
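
The new module-level _incr_maintenance_vdb_indexes fans a single DML batch out to every VDB index registered on the embedding tensor via update_vdb_index, touching only the changed row ids instead of rebuilding. A runnable toy of that fan-out; ToyMeta and ToyTensor are stand-ins for the tensor and its meta, and "ADD" stands in for the real operation kind:

class ToyMeta:
    def __init__(self):
        self.vdb_indexes = [{"id": "hnsw_1"}]

class ToyTensor:
    htype = "embedding"

    def __init__(self):
        self.meta = ToyMeta()

    def update_vdb_index(self, operation_kind, row_ids):
        print(f"{operation_kind}: applying rows {row_ids} to hnsw_1")

tensor = ToyTensor()
# After appending rows 100-102, only those rows reach each index:
for vdb_index in tensor.meta.vdb_indexes:
    tensor.update_vdb_index(operation_kind="ADD", row_ids=[100, 101, 102])
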
@@ -286,15 +243,24 @@ def index_operation_dataset(
         self,
         emb_tensor.chunk_engine.num_samples,
         len(rowids),
-        index_regeneration=index_regeneration,
-        index_delete=index_delete,
     )

     if index_operation_type == INDEX_OP_TYPE.NOOP:
         return

-    index_cache_cleanup(self)
-    if index_operation_type == INDEX_OP_TYPE.CREATE_INDEX:
+    if (
+        index_operation_type == INDEX_OP_TYPE.CREATE_INDEX
+        or index_operation_type == INDEX_OP_TYPE.REGENERATE_INDEX
+    ):
+        if index_operation_type == INDEX_OP_TYPE.REGENERATE_INDEX:
+            try:
+                vdb_indexes = emb_tensor.get_vdb_indexes()
+                for vdb_index in vdb_indexes:
+                    emb_tensor.delete_vdb_index(vdb_index["id"])
+            except Exception as e:
+                raise Exception(
+                    f"An error occurred while regenerating VDB indexes: {e}"
+                )
         distance_str = self.index_params.get("distance_metric", "COS")
         additional_params_dict = self.index_params.get("additional_params", None)
         distance = get_index_metric(distance_str.upper())
@@ -306,11 +272,6 @@
         else:
             emb_tensor.create_vdb_index("hnsw_1", distance=distance)
     elif index_operation_type == INDEX_OP_TYPE.INCREMENTAL_INDEX:
-        emb_tensor._incr_maintenance_vdb_indexes(rowids, dml_type)
-    elif index_operation_type == INDEX_OP_TYPE.REGENERATE_INDEX:
-        emb_tensor._regenerate_vdb_indexes()
-    elif index_operation_type == INDEX_OP_TYPE.REMOVE_INDEX:
-        vdb_indexes = emb_tensor.get_vdb_indexes()
-        emb_tensor.delete_vdb_index(vdb_indexes["id"])
+        _incr_maintenance_vdb_indexes(emb_tensor, rowids, dml_type)
     else:
         raise Exception("Unknown index operation")
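Taken together, the dataset write path now decides once and dispatches: create and regenerate both rebuild through create_vdb_index (regenerate drops the stale indexes first), while every ordinary append, update, or pop flows through the incremental branch. A compact restatement of that dispatch over a duck-typed emb argument; maintain_index and the string op codes are hypothetical, not deeplake API:

def maintain_index(op_type, emb, row_ids, dml_type, distance="COS"):
    if op_type == "NOOP":
        return
    if op_type in ("CREATE_INDEX", "REGENERATE_INDEX"):
        if op_type == "REGENERATE_INDEX":
            for idx in emb.get_vdb_indexes():
                emb.delete_vdb_index(idx["id"])  # drop stale indexes first
        emb.create_vdb_index("hnsw_1", distance=distance)
    elif op_type == "INCREMENTAL_INDEX":
        # Only the changed rows are pushed into the existing index.
        emb.update_vdb_index(operation_kind=dml_type, row_ids=row_ids)
    else:
        raise Exception("Unknown index operation")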