Deep Memory support on AWS (#2635)
* first changes

* fixes

* test fix

* test fix

* mypy fix

* addressed comments

---------

Co-authored-by: adolkhan <adilkhan.sarsen@alumni.nu.edu.kz>
adolkhan committed Oct 4, 2023
1 parent 5593f58 commit 99bc81a
Showing 14 changed files with 165 additions and 33 deletions.
15 changes: 9 additions & 6 deletions deeplake/client/client.py
@@ -542,12 +542,15 @@ def deepmemory_is_available(self, org_id: str):
Returns:
bool: True if DeepMemory is available, False otherwise (also False if the availability check itself fails).
"""
response = self.request(
"GET",
f"/api/organizations/{org_id}/features/deepmemory",
endpoint=self.endpoint(),
)
return response.json()["available"]
try:
response = self.request(
"GET",
f"/api/organizations/{org_id}/features/deepmemory",
endpoint=self.endpoint(),
)
return response.json()["available"]
except Exception:
return False

def start_taining(
self,
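With the request wrapped in try/except, the availability probe is now fail-safe: any request failure is reported as unavailable rather than raised. A minimal sketch of how a caller might use it (the resolver function below is hypothetical, not part of this commit):

```python
from deeplake.core.vectorstore.deeplake_vectorstore import VectorStore
from deeplake.core.vectorstore.deepmemory_vectorstore import DeepMemoryVectorStore


def resolve_vectorstore_class(client, org_id: str):
    # With the try/except above, a network failure or a malformed response
    # is reported as False instead of raising, so callers can always fall
    # back to a plain VectorStore.
    if client.deepmemory_is_available(org_id):
        return DeepMemoryVectorStore
    return VectorStore
```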
11 changes: 10 additions & 1 deletion deeplake/core/vectorstore/deep_memory.py
@@ -14,6 +14,7 @@
from deeplake.util.bugout_reporter import (
feature_report_path,
)
from deeplake.util.path import get_path_type


class DeepMemory:
@@ -23,6 +24,7 @@ def __init__(
client: DeepMemoryBackendClient,
embedding_function: Optional[Any] = None,
token: Optional[str] = None,
creds: Optional[Dict[str, Any]] = None,
):
"""Based Deep Memory class to train and evaluate models on DeepMemory managed service.
@@ -31,6 +33,7 @@ def __init__(
client (DeepMemoryBackendClient): Client to interact with the DeepMemory managed service. Defaults to None.
embedding_function (Optional[Any], optional): Embedding function class used to convert queries/documents to embeddings. Defaults to None.
token (Optional[str], optional): API token for the DeepMemory managed service. Defaults to None.
creds (Optional[Dict[str, Any]], optional): Credentials to access the dataset. Defaults to None.
Raises:
ImportError: if indra is not installed
@@ -49,10 +52,12 @@ def __init__(
self.token = token
self.embedding_function = embedding_function
self.client = client
self.creds = creds or {}
self.queries_dataset = deeplake.dataset(
self.dataset.path + "_eval_queries",
token=token,
read_only=False,
creds=self.creds,
)
if len(self.queries_dataset) == 0:
self.queries_dataset.commit(allow_empty=True)
@@ -109,12 +114,16 @@ def train(
if embedding_function is None and self.embedding_function is not None:
embedding_function = self.embedding_function.embed_documents

runtime = None
if get_path_type(corpus_path) == "hub":
runtime = {"tensor_db": True}
queries_vs = VectorStore(
path=queries_path,
overwrite=True,
runtime={"tensor_db": True},
runtime=runtime,
embedding_function=embedding_function,
token=token or self.token,
creds=self.creds,
)

queries_vs.add(
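Together, the creds plumbing and the path-type check are what enable Deep Memory on datasets outside the managed tensor_db, e.g. on S3. A hedged sketch, with placeholder bucket, keys, and token:

```python
from deeplake import VectorStore

creds = {
    "aws_access_key_id": "...",      # placeholder credentials
    "aws_secret_access_key": "...",  # placeholder credentials
}

# The same creds dict is forwarded to the DeepMemory helper, which uses it
# to open or create the "<corpus>_eval_queries" dataset next to the corpus.
db = VectorStore(path="s3://my-bucket/corpus", creds=creds, token="...")

# Inside train(), get_path_type("s3://...") != "hub", so the eval-queries
# VectorStore is created with runtime=None rather than {"tensor_db": True}.
```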
24 changes: 13 additions & 11 deletions deeplake/core/vectorstore/deeplake_vectorstore.py
@@ -8,6 +8,7 @@
import deeplake
from deeplake.core.distance_type import DistanceType
from deeplake.util.dataset import try_flushing
from deeplake.util.path import convert_pathlib_to_string_if_needed

from deeplake.api import dataset
from deeplake.core.dataset import Dataset
@@ -24,10 +25,11 @@
from deeplake.core.vectorstore.vector_search import dataset as dataset_utils
from deeplake.core.vectorstore.vector_search import filter as filter_utils
from deeplake.core.vectorstore.vector_search.indra import index

from deeplake.util.bugout_reporter import (
feature_report_path,
)
from deeplake.util.path import get_path_type


logger = logging.getLogger(__name__)

@@ -130,8 +132,9 @@ def __init__(
self.indra_installed = False # pragma: no cover

self._token = token
self.path = path
self.path = convert_pathlib_to_string_if_needed(path)
self.logger = logger
self.org_id = org_id if get_path_type(self.path) == "local" else None

feature_report_path(
path,
@@ -158,22 +161,20 @@ def __init__(
self.ingestion_batch_size = ingestion_batch_size
self.index_params = utils.parse_index_params(index_params)
self.num_workers = num_workers

if creds is None:
creds = {}
self.creds = creds or {}

self.dataset = dataset_utils.create_or_load_dataset(
tensor_params,
path,
self.token,
creds,
self.creds,
self.logger,
read_only,
exec_option,
embedding_function,
overwrite,
runtime,
org_id,
self.org_id,
branch,
**kwargs,
)
@@ -439,7 +440,6 @@ def search(
return_view (bool): Return a Deep Lake dataset view that satisfies the search parameters, instead of a dictionary with data. Defaults to False. If ``True``, return_tensors is set to "*" because data is lazy-loaded and there is no cost to including all tensors in the view.
deep_memory (bool): Whether to use the Deep Memory model for improving search results. Defaults to False if deep_memory is not specified in the Vector Store initialization.
If True, the distance metric is set to "deepmemory_distance", which represents the metric with which the model was trained, and the search is performed using the Deep Memory model. If False, the distance metric is set to "COS" or whatever distance metric the user specifies.
token (str, optional): Activeloop token, used for fetching user credentials. This is Optional, tokens are normally autogenerated. Defaults to None.
..
# noqa: DAR101
@@ -485,9 +485,8 @@

if deep_memory and not self.deep_memory:
raise ValueError(
"Deep Memory dataset should have been created with runtime = {'tensor_db': True} during initialization. "
"Please create a new Vector Store with runtime = {'tensor_db': True} or deep copy with runtime = {'tensor_db': True}."
"NOTE: Deep Memory is only available for datasets stored in the Deep Lake Managed Database for paid accounts."
"Deep Memory is not available for this organization."
"Deep Memory is only available for waitlisted accounts."
)

utils.parse_search_args(
@@ -535,6 +534,9 @@
embedding_tensor=embedding_tensor,
return_tensors=return_tensors,
return_view=return_view,
deep_memory=deep_memory,
token=self.token,
org_id=self.org_id,
)

def delete(
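The threading of deep_memory, token, and org_id through search() is what the new local-dataset test below exercises; in user code it reduces to roughly this sketch (path, org id, token, and embedding are placeholders, and the organization is assumed to have Deep Memory enabled):

```python
from deeplake import VectorStore

# org_id is now kept only for local paths: get_path_type("./corpus")
# returns "local", so self.org_id stays set; for hub:// or s3:// paths
# it is forced to None.
db = VectorStore(path="./corpus", org_id="my_org", token="...")  # placeholders

results = db.search(
    embedding=[0.1] * 1536,  # placeholder query embedding
    deep_memory=True,        # forwarded to the indra search branch, along
    k=4,                     # with token and org_id, per the hunk above
)
```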
11 changes: 3 additions & 8 deletions deeplake/core/vectorstore/deepmemory_vectorstore.py
@@ -6,7 +6,6 @@
from deeplake.core.vectorstore.deeplake_vectorstore import VectorStore
from deeplake.core.vectorstore.deep_memory import DeepMemory
from deeplake.constants import DEFAULT_DEEPMEMORY_DISTANCE_METRIC
from deeplake.util.exceptions import LockedException


class DeepMemoryVectorStore(VectorStore):
@@ -17,6 +16,7 @@ def __init__(self, client, *arg, **kwargs):
token=self.token,
embedding_function=self.embedding_function,
client=client,
creds=self.creds,
)

def search(
@@ -34,12 +34,6 @@
return_view: bool = False,
deep_memory: bool = False,
) -> Union[Dict, Dataset]:
if exec_option is not None and exec_option != "tensor_db":
self.logger.warning(
"Specifying `exec_option` is not supported for this dataset. "
"The search will be executed on the Deep Lake Managed Database."
)

if deep_memory and not distance_metric:
distance_metric = DEFAULT_DEEPMEMORY_DISTANCE_METRIC

@@ -51,8 +45,9 @@
distance_metric=distance_metric,
query=query,
filter=filter,
exec_option="tensor_db",
exec_option=exec_option,
embedding_tensor=embedding_tensor,
return_tensors=return_tensors,
return_view=return_view,
deep_memory=deep_memory,
)
32 changes: 32 additions & 0 deletions deeplake/core/vectorstore/test_deepmemory.py
@@ -6,6 +6,7 @@

import deeplake
from deeplake import VectorStore
from deeplake.tests.common import requires_libdeeplake


class DummyEmbedder:
@@ -120,6 +121,7 @@ def test_deepmemory_train_and_cancel(
@pytest.mark.slow
@pytest.mark.timeout(600)
@pytest.mark.skipif(sys.platform == "win32", reason="Does not run on Windows")
@requires_libdeeplake
def test_deepmemory_evaluate(
corpus_query_relevances_copy,
questions_embeddings_and_relevances,
@@ -201,6 +203,7 @@ def test_deepmemory_evaluate(
@pytest.mark.slow
@pytest.mark.timeout(600)
@pytest.mark.skipif(sys.platform == "win32", reason="Does not run on Windows")
@requires_libdeeplake
def test_deepmemory_evaluate_log_queries(
corpus_query_relevances_copy,
questions_embeddings_and_relevances,
@@ -242,6 +245,7 @@ def test_deepmemory_evaluate_log_queries(
@pytest.mark.slow
@pytest.mark.timeout(600)
@pytest.mark.skipif(sys.platform == "win32", reason="Does not run on Windows")
@requires_libdeeplake
def test_deepmemory_evaluate_without_branch_name_with_logging(
corpus_query_relevances_copy,
questions_embeddings_and_relevances,
@@ -280,6 +284,7 @@ def test_deepmemory_evaluate_without_branch_name_with_logging(
@pytest.mark.slow
@pytest.mark.timeout(600)
@pytest.mark.skipif(sys.platform == "win32", reason="Does not run on Windows")
@requires_libdeeplake
def test_deepmemory_evaluate_without_logging(
corpus_query_relevances_copy,
questions_embeddings_and_relevances,
@@ -317,6 +322,7 @@ def test_deepmemory_evaluate_without_logging(
@pytest.mark.slow
@pytest.mark.timeout(600)
@pytest.mark.skipif(sys.platform == "win32", reason="Does not run on Windows")
@requires_libdeeplake
def test_deepmemory_evaluate_without_branch_name(
corpus_query_relevances_copy,
questions_embeddings_and_relevances,
@@ -353,6 +359,7 @@ def test_deepmemory_evaluate_without_branch_name(
@pytest.mark.slow
@pytest.mark.timeout(600)
@pytest.mark.skipif(sys.platform == "win32", reason="Does not run on Windows")
@requires_libdeeplake
def test_deepmemory_evaluate_without_qvs_params(
corpus_query_relevances_copy,
questions_embeddings_and_relevances,
@@ -390,6 +397,7 @@ def test_deepmemory_evaluate_without_qvs_params(
@pytest.mark.slow
@pytest.mark.timeout(600)
@pytest.mark.skipif(sys.platform == "win32", reason="Does not run on Windows")
@requires_libdeeplake
def test_deepmemory_evaluate_with_embedding_func_in_init(
corpus_query_relevances_copy,
questions_embeddings_and_relevances,
@@ -533,3 +541,27 @@ def test_deepmemory_search(
output = db.search(embedding=query_embedding, exec_option="compute_engine")
assert len(output) == 4
# TODO: add some logging checks


@pytest.mark.skipif(sys.platform == "win32", reason="Does not run on Windows")
@pytest.mark.slow
@requires_libdeeplake
def test_deepmemory_search_on_local_datasets(
deep_memory_local_dataset, hub_cloud_dev_token
):
corpus_path, queries_path = deep_memory_local_dataset

corpus = VectorStore(path=corpus_path, token=hub_cloud_dev_token)
queries = VectorStore(path=queries_path, token=hub_cloud_dev_token)

deep_memory_query_view = queries.search(
query="SELECT * WHERE deep_memory_recall > vector_search_recall",
return_view=True,
)

query_embedding = deep_memory_query_view[0].embedding.data()["value"]
correct_id = deep_memory_query_view[0].metadata.data()["value"]["relvence"][0][0]  # "relvence" [sic]: spelling matches the fixture's metadata key

output = corpus.search(embedding=query_embedding, deep_memory=True, k=10)

assert correct_id in output["id"]
2 changes: 0 additions & 2 deletions deeplake/core/vectorstore/vector_search/dataset/dataset.py
@@ -8,7 +8,6 @@
import numpy as np

import deeplake
from deeplake.util.path import get_path_type
from deeplake.core.vectorstore.vector_search import utils
from deeplake.core.vectorstore.vector_search.ingestion import ingest_data
from deeplake.constants import (
@@ -47,7 +46,6 @@ def create_or_load_dataset(
utils.check_indra_installation(
exec_option=exec_option, indra_installed=_INDRA_INSTALLED
)
org_id = org_id if get_path_type(dataset_path) == "local" else None

if not overwrite and dataset_exists(dataset_path, token, creds, **kwargs):
if tensor_params is not None and tensor_params != DEFAULT_VECTORSTORE_TENSORS:
26 changes: 26 additions & 0 deletions deeplake/core/vectorstore/vector_search/indra/search_algorithm.py
@@ -18,6 +18,9 @@ def search(
runtime: dict,
return_tensors: List[str],
return_view: bool = False,
deep_memory: bool = False,
token: Optional[str] = None,
org_id: Optional[str] = None,
) -> Union[Dict, DeepLakeDataset]:
"""Generalized search algorithm that uses indra. It combines vector search and other TQL queries.
@@ -32,6 +35,9 @@
runtime (dict): Runtime parameters for the query.
return_tensors (List[str]): List of tensors to return data for.
return_view (bool): Return a Deep Lake dataset view that satisfies the search parameters, instead of a dictionary with data. Defaults to False.
deep_memory (bool): Use DeepMemory for the search. Defaults to False.
token (Optional[str], optional): Token used for authentication. Defaults to None.
org_id (Optional[str], optional): Organization ID; needed only for local datasets. Defaults to None.
Raises:
ValueError: If both tql_string and tql_filter are specified.
@@ -65,6 +71,26 @@
tql_query, runtime=runtime, return_data=True
)
return_data = data
elif deep_memory:
if not INDRA_INSTALLED:
raise raise_indra_installation_error(indra_import_error=None)

from deeplake.enterprise.convert_to_libdeeplake import import_indra_api

api = import_indra_api()

indra_dataset = api.dataset(deeplake_dataset.path, token=token, org_id=org_id)
api.tql.prepare_deepmemory_metrics(indra_dataset)

indra_view = indra_dataset.query(tql_query)
indexes = indra_view.indexes
view = deeplake_dataset[indexes]

return_data = {}

for tensor in view.tensors:
return_data[tensor] = utils.parse_tensor_return(view[tensor])

else:
if not INDRA_INSTALLED:
raise raise_indra_installation_error(
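Condensed into a standalone helper, the new branch re-opens the dataset through indra, registers the trained Deep Memory metric with the TQL engine, runs the query, and materializes the matching rows back on the original dataset. The helper name below is illustrative; the calls themselves mirror the diff:

```python
from deeplake.enterprise.convert_to_libdeeplake import import_indra_api


def deep_memory_tql(deeplake_dataset, tql_query, token=None, org_id=None):
    api = import_indra_api()
    # org_id is only meaningful for local datasets (see the docstring above).
    indra_dataset = api.dataset(deeplake_dataset.path, token=token, org_id=org_id)
    # Make the "deepmemory_distance" metric available to the TQL engine.
    api.tql.prepare_deepmemory_metrics(indra_dataset)
    indexes = indra_dataset.query(tql_query).indexes
    return deeplake_dataset[indexes]  # view over the original dataset
```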
@@ -18,6 +18,9 @@ def vector_search(
k,
return_tensors,
return_view,
deep_memory,
token,
org_id,
) -> Union[Dict, DeepLakeDataset]:
try:
from indra import api # type: ignore
@@ -52,4 +55,7 @@
runtime=runtime,
return_tensors=return_tensors,
return_view=return_view,
deep_memory=deep_memory,
token=token,
org_id=org_id,
)
