DeepMemory UX fixes (#2692)
* handle listing jobs when there are no jobs

* Adding type checking for deepmemory args

* added pydantic

* changing search to use embedding_function.embed_query instead of embedding_function.embed_documents

* adding deep memory search tests

* added relevance and query pkl file

* fixing error when the best model doesn't get written to the embedding.meta

* fixing list_jobs and status reporting

* mypy fix

---------

Co-authored-by: adolkhan <adilkhan.sarsen@alumni.nu.edu.kz>
adolkhan committed Nov 15, 2023
1 parent 9a01de0 commit 41b9e92
Showing 10 changed files with 221 additions and 52 deletions.
34 changes: 32 additions & 2 deletions deeplake/client/utils.py
@@ -280,19 +280,44 @@ def get_results(
progress = response["progress"]
for progress_key, progress_value in progress.items():
if progress_key == BEST_RECALL:
# verify that the recall and improvement coincide with the best recall
recall, improvement = get_best_recall_improvement(
recall, improvement, progress_value
)
if "(" not in improvement:
improvement = f"(+{improvement}%)"

output = f"recall@10: {str(recall)}% {improvement}"
return output


def get_best_recall_improvement(recall, improvement, best_recall):
    brecall, bimprovement = get_recall_improvement(best_recall)
    if float(improvement) > float(bimprovement):
        return recall, improvement
    elif float(improvement) < float(bimprovement):
        return brecall, bimprovement
    else:
        return recall, improvement


def remove_paranthesis(string: str):
    return string.replace("(", "").replace(")", "")


def get_recall_improvement(best_recall):
    recall, improvement = best_recall.split(" ")
    recall = recall[:-1]
    improvement = remove_paranthesis(improvement).replace("+", "")[:-1]
    return recall, improvement


def preprocess_progress(
response: Dict[str, Any],
progress_indent: str,
recall: str,
improvement: str,
add_vertical_bars: bool = False,
recall: Optional[str] = None,
improvement: Optional[str] = None,
):
allowed_progress_items = ["eta", BEST_RECALL, "error"]
progress_indent = (
@@ -355,6 +380,11 @@ def preprocess_progress(
key = "recall@10"
recall = recall or value.split("%")[0]
improvement = improvement or value.split("%")[1]

recall, improvement = get_best_recall_improvement(
recall, improvement, value
)

if "(" not in improvement:
improvement = f"(+{improvement}%)"
key_value_pair = f"{key}: {recall}% {improvement}"
49 changes: 46 additions & 3 deletions deeplake/core/vectorstore/deep_memory.py
@@ -1,13 +1,18 @@
import logging
import uuid
from collections import defaultdict
from pydantic import BaseModel, ValidationError
from typing import Any, Dict, Optional, List, Union, Callable, Tuple
from time import time

import numpy as np

import deeplake
from deeplake.enterprise.dataloader import indra_available
from deeplake.util.exceptions import (
IncorrectRelevanceTypeError,
IncorrectQueriesTypeError,
)
from deeplake.util.remove_cache import get_base_storage
from deeplake.constants import (
DEFAULT_QUERIES_VECTORSTORE_TENSORS,
@@ -28,6 +33,26 @@
from deeplake.util.version_control import load_meta


class Relevance(BaseModel):
    data: List[List[Tuple[str, int]]]


class Queries(BaseModel):
    data: List[str]


def validate_relevance_and_queries(relevance, queries):
    try:
        Relevance(data=relevance)
    except ValidationError:
        raise IncorrectRelevanceTypeError()

    try:
        Queries(data=queries)
    except ValidationError:
        raise IncorrectQueriesTypeError()


class DeepMemory:
def __init__(
self,
@@ -110,6 +135,8 @@ def train(
token=token or self.token,
)

validate_relevance_and_queries(relevance=relevance, queries=queries)

# TODO: Support for passing query_embeddings directly without embedding function
corpus_path = self.dataset.path
queries_path = corpus_path + "_queries"
@@ -278,7 +305,13 @@ def list_jobs(self, debug=False):

response_status_schema = JobResponseStatusSchema(response=response)

jobs = [job["id"] for job in response]
jobs = self._get_jobs(response)
if jobs is None:
reposnse_str = "No Deep Memory training jobs were found for this dataset"
print(reposnse_str)
if debug:
return reposnse_str
return None

recalls = {}
deltas = {}
@@ -407,6 +440,7 @@ def evaluate(
api.tql.prepare_deepmemory_metrics(indra_dataset)

parsed_qvs_params = parse_queries_params(qvs_params)
validate_relevance_and_queries(relevance=relevance, queries=queries)

start = time()
query_embs: Union[List[np.ndarray], List[List[float]]]
@@ -484,6 +518,12 @@ def evaluate(
self.queries_dataset.commit()
return recalls

def _get_jobs(self, response):
    jobs = None
    if response is not None and len(response) > 0:
        jobs = [job["id"] for job in response]
    return jobs


def recall_at_k(
indra_dataset: Any,
@@ -588,8 +628,11 @@ def _get_best_model(embedding: Any, job_id: str, latest_job: bool = False):
best_recall = 0
best_delta = 0
if latest_job:
best_recall = info["deepmemory/model.npy"]["recall@10"]
best_delta = info["deepmemory/model.npy"]["delta"]
try:
best_recall = info["deepmemory/model.npy"]["recall@10"]
best_delta = info["deepmemory/model.npy"]["delta"]
except KeyError:
pass

for job, value in info.items():
if job_id in job:
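
A quick sketch of how the pydantic-based argument validation introduced above behaves; the query and relevance values are illustrative, not taken from the tests.

from deeplake.core.vectorstore.deep_memory import validate_relevance_and_queries
from deeplake.util.exceptions import IncorrectQueriesTypeError

# queries must be a list of strings; relevance a list of lists of (doc_id, label) pairs.
queries = ["what is deep memory?"]
relevance = [[("doc_id_1", 1)]]
validate_relevance_and_queries(relevance=relevance, queries=queries)  # passes

try:
    # A bare string no longer slips through as a "list of queries".
    validate_relevance_and_queries(relevance=relevance, queries="what is deep memory?")
except IncorrectQueriesTypeError:
    print("queries must be a list of strings")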
18 changes: 15 additions & 3 deletions deeplake/core/vectorstore/test_deeplake_vectorstore.py
@@ -3,6 +3,7 @@
import sys
from math import isclose
from functools import partial
from typing import List

import numpy as np
import pytest
@@ -47,6 +48,16 @@
)


class OpenAILikeEmbedder:
    def embed_documents(self, docs: List[str]):
        return [np.ones(EMBEDDING_DIM) for _ in range(len(docs))]

    def embed_query(self, query: str):
        if not isinstance(query, str):
            raise ValueError("Query must be a string")
        return np.ones(EMBEDDING_DIM)


def embedding_fn(text, embedding_dim=EMBEDDING_DIM):
return np.zeros((len(text), EMBEDDING_DIM)) # pragma: no cover

@@ -240,6 +251,7 @@ def test_creds(gcs_path, gcs_creds):
@pytest.mark.slow
@requires_libdeeplake
def test_search_basic(local_path, hub_cloud_dev_token):
openai_embeddings = OpenAILikeEmbedder()
"""Test basic search features"""
# Initialize vector store object and add data
vector_store = DeepLakeVectorStore(
@@ -461,7 +473,7 @@ def filter_fn(x):
vector_store.add(embedding=embeddings, text=texts, metadata=metadatas)

data = vector_store.search(
embedding_function=embedding_fn3,
embedding_function=openai_embeddings.embed_query,
embedding_data=["dummy"],
return_view=True,
k=2,
@@ -471,7 +483,7 @@ def filter_fn(x):
assert data.embedding[0].numpy().size > 0

data = vector_store.search(
embedding_function=embedding_fn3,
embedding_function=openai_embeddings.embed_query,
embedding_data="dummy",
return_view=True,
k=2,
@@ -509,7 +521,7 @@ def filter_fn(x):

# Test that the embedding function during initalization works
vector_store = DeepLakeVectorStore(
path="mem://xyz", embedding_function=embedding_fn3
path="mem://xyz", embedding_function=openai_embeddings
)
assert vector_store.exec_option == "python"
vector_store.add(embedding=embeddings, text=texts, metadata=metadatas)
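
The search() call changes in this file follow from the commit-message item about switching search to embedding_function.embed_query; the OpenAILikeEmbedder test double rejects non-string input precisely to catch regressions there. A minimal call-site sketch (vector_store and its contents come from the surrounding test; the query text is illustrative):

embedder = OpenAILikeEmbedder()

results = vector_store.search(
    embedding_function=embedder.embed_query,  # the single query string is embedded via embed_query
    embedding_data="dummy",
    return_view=True,
    k=2,
)
# If search() still handed the embedder a list of strings, the
# isinstance(query, str) check in embed_query above would raise ValueError.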
106 changes: 83 additions & 23 deletions deeplake/core/vectorstore/test_deepmemory.py
@@ -10,7 +10,11 @@
from deeplake.core.vectorstore.unsupported_deep_memory import (
DeepMemory as UnsupportedDeepMemory,
)
from deeplake.util.exceptions import DeepMemoryWaitingListError
from deeplake.util.exceptions import (
DeepMemoryWaitingListError,
IncorrectQueriesTypeError,
IncorrectRelevanceTypeError,
)


class DummyEmbedder:
@@ -522,55 +526,54 @@ def test_deepmemory_status(capsys, job_id, corpus_query_pair_path, hub_cloud_dev
assert status.out[511:] == output_str[511:]


@pytest.mark.slow
@pytest.mark.skipif(sys.platform == "win32", reason="Does not run on Windows")
def test_deepmemory_search(
corpus_query_pair_path,
corpus_query_relevances_copy,
testing_relevance_query_deepmemory,
hub_cloud_dev_token,
):
corpus, _ = corpus_query_pair_path
corpus, _, _, _ = corpus_query_relevances_copy
relevance, query_embedding = testing_relevance_query_deepmemory

db = VectorStore(
path=corpus,
runtime={"tensor_db": True},
token=hub_cloud_dev_token,
)

query_embedding = np.random.uniform(low=-10, high=10, size=(1536)).astype(
np.float32
output = db.search(
embedding=query_embedding, deep_memory=True, return_tensors=["id"]
)

output = db.search(embedding=query_embedding)

assert db.deep_memory is not None
assert len(output) == 4
assert len(output["id"]) == 4
assert relevance in output["id"]

output = db.search(embedding=query_embedding, exec_option="compute_engine")
assert len(output) == 4
output = db.search(embedding=query_embedding)
assert len(output["id"]) == 4
assert relevance not in output["id"]
# TODO: add some logging checks


@pytest.mark.skipif(sys.platform == "win32", reason="Does not run on Windows")
@pytest.mark.slow
@requires_libdeeplake
def test_deepmemory_search_on_local_datasets(
deep_memory_local_dataset, hub_cloud_dev_token
deep_memory_local_dataset,
testing_relevance_query_deepmemory,
hub_cloud_dev_token,
):
corpus_path, queries_path = deep_memory_local_dataset
corpus_path = deep_memory_local_dataset
relevance, query_embedding = testing_relevance_query_deepmemory

corpus = VectorStore(path=corpus_path, token=hub_cloud_dev_token)
queries = VectorStore(path=queries_path, token=hub_cloud_dev_token)

deep_memory_query_view = queries.search(
query="SELECT * WHERE deep_memory_recall > vector_search_recall",
return_view=True,
)

query_embedding = deep_memory_query_view[0].embedding.data()["value"]
correct_id = deep_memory_query_view[0].metadata.data()["value"]["relvence"][0][0]

output = corpus.search(embedding=query_embedding, deep_memory=True, k=10)
assert relevance in output["id"]
assert "score" in output

assert correct_id in output["id"]
output = corpus.search(embedding=query_embedding, deep_memory=False, k=10)
assert relevance not in output["id"]
assert "score" in output


@@ -596,3 +599,60 @@ def test_unsupported_deepmemory_users():
queries=[],
relevance=[],
)


@pytest.mark.skipif(sys.platform == "win32", reason="Does not run on Windows")
@pytest.mark.slow
@requires_libdeeplake
def test_deepmemory_list_jobs_with_no_jobs(
corpus_query_relevances_copy, hub_cloud_dev_token
):
corpus, queries, relevances, _ = corpus_query_relevances_copy

db = VectorStore(
path=corpus,
runtime={"tensor_db": True},
token=hub_cloud_dev_token,
)

output_str = db.deep_memory.list_jobs(debug=True)
assert output_str == "No Deep Memory training jobs were found for this dataset"


@pytest.mark.skipif(sys.platform == "win32", reason="Does not run on Windows")
@pytest.mark.slow
@requires_libdeeplake
def test_not_supported_training_args(corpus_query_relevances_copy, hub_cloud_dev_token):
corpus, queries, relevances, _ = corpus_query_relevances_copy

db = VectorStore(
path=corpus,
runtime={"tensor_db": True},
token=hub_cloud_dev_token,
)

with pytest.raises(IncorrectQueriesTypeError):
db.deep_memory.train(
queries="queries",
relevance=relevances,
embedding_function=embedding_fn,
)

with pytest.raises(IncorrectRelevanceTypeError):
db.deep_memory.train(
queries=queries,
relevance="relevances",
embedding_function=embedding_fn,
)

with pytest.raises(IncorrectQueriesTypeError):
db.deep_memory.evaluate(
queries="queries",
relevance=relevances,
)

with pytest.raises(IncorrectRelevanceTypeError):
db.deep_memory.evaluate(
queries=queries,
relevance="relevances",
)
