From 3b60df3b820f2a9ca49e889cdc3ee3fa78a809e9 Mon Sep 17 00:00:00 2001 From: Nikos Papailiou Date: Fri, 8 Mar 2024 10:47:36 +0200 Subject: [PATCH 01/15] Add ingestion fuction for ingesting files to vector search --- src/tiledb/cloud/vector_search/__init__.py | 5 + .../cloud/vector_search/file_ingestion.py | 313 ++++++++++++++++++ 2 files changed, 318 insertions(+) create mode 100644 src/tiledb/cloud/vector_search/__init__.py create mode 100644 src/tiledb/cloud/vector_search/file_ingestion.py diff --git a/src/tiledb/cloud/vector_search/__init__.py b/src/tiledb/cloud/vector_search/__init__.py new file mode 100644 index 00000000..1df365e8 --- /dev/null +++ b/src/tiledb/cloud/vector_search/__init__.py @@ -0,0 +1,5 @@ +from .file_ingestion import ingest_files + +__all__ = [ + "ingest_files", +] diff --git a/src/tiledb/cloud/vector_search/file_ingestion.py b/src/tiledb/cloud/vector_search/file_ingestion.py new file mode 100644 index 00000000..ca84f775 --- /dev/null +++ b/src/tiledb/cloud/vector_search/file_ingestion.py @@ -0,0 +1,313 @@ +from typing import Any, Mapping, Optional, Sequence, Union, Dict, List + +from tiledb.cloud import dag +from tiledb.cloud.utilities import as_batch + +def ingest_files_dag( + file_dir_uri: str, + index_uri: str, + file_name: Optional[str] = None, + acn: Optional[str] = None, + config=None, + namespace: Optional[str] = None, + verbose: bool = False, + trace_id: Optional[str] = None, + # Index creation params + create_index: bool = True, + index_type: str = "IVF_FLAT", + index_creation_kwargs: Dict = {}, + # DirectoryTextReader params + glob: str = "**/[!.]*", + exclude: Sequence[str] = (), + suffixes: Optional[Sequence[str]] = None, + text_splitter: str = "RecursiveCharacterTextSplitter", + text_splitter_kwargs: Optional[Dict] = { + "chunk_size": 500, + "chunk_overlap": 50, + }, + # SentenceTransformersEmbedding params + model_name_or_path: str = 'BAAI/bge-small-en-v1.5', + # Index update params + index_timestamp: int = None, + workers: int = -1, + worker_resources: Dict = None, + worker_image: str = None, + extra_worker_modules: Optional[List[str]] = None, + driver_resources: Dict = {"cpu": "2", "memory": "8Gi"}, + driver_image: str = None, + extra_driver_modules: Optional[List[str]] = None, + max_tasks_per_stage: int = -1, + embeddings_generation_mode: dag.Mode = dag.Mode.LOCAL, + embeddings_generation_driver_mode: dag.Mode = dag.Mode.LOCAL, + vector_indexing_mode: dag.Mode = dag.Mode.LOCAL, + index_update_kwargs: Dict = {"files_per_partition": 100}, +): + """ + Ingest files into a vector search text index. + + :param file_dir_uri: directory of the files to be loaded. For individual files, + also pass the `file_name` param. + :param index_uri: URI of the vector index to load files to. + :param file_name: Name of the file to be loaded. + :param acn: Access Credentials Name (ACN) registered in TileDB Cloud (ARN type), + defaults to None. + :param config: config dictionary, defaults to None. + :param namespace: TileDB-Cloud namespace, defaults to None. + :param verbose: verbose logging, defaults to False. + :param trace_id: trace ID for logging, defaults to None. + # Index creation params + :param create_index: If true, creates a new vector search index. + :param index_type: Vector search index type ("FLAT", "IVF_FLAT"). + :param index_creation_kwargs: Arguments to be passed to the index creation method + # DirectoryTextReader params. + :param glob: Glob pattern relative to the specified path by default set to pick up + all non-hidden files. + :param exclude: Patterns to exclude from results, use glob syntax. + :param suffixes: Load only files with these suffixes. Suffixes must include the + dot, e.g. ".txt". + :param text_splitter_kwargs: Arguments for the splitter class. + # SentenceTransformersEmbedding params. + :param model_name_or_path: Huggingface SentenceTransformer model name or path + # Index update params. + :param index_timestamp: Timestamp to add index updates at. + :param workers: If `embeddings_generation_mode=BATCH` this is the number of + distributed workers to be used. + :param worker_resources: If `embeddings_generation_mode=BATCH` this can be used + to specify the worker resources. + :param worker_image: If `embeddings_generation_mode=BATCH` this can be used + to specify the worker Docker image. + :param extra_worker_modules: If `embeddings_generation_mode=BATCH` this can be used + to install extra pip package to the image. + :param driver_resources: If `embeddings_generation_driver_mode=BATCH` this can be used + to specify the driver resources. + :param driver_image: If `embeddings_generation_driver_mode=BATCH` this can be used + to specify the driver Docker image. + :param extra_driver_modules: If `embeddings_generation_driver_mode=BATCH` this can + be used to install extra pip package to the image. + :param max_tasks_per_stage: Number of maximum udf tasks per computation stage. + :param embeddings_generation_mode: TaskGraph execution mode for embeddings generation. + :param embeddings_generation_driver_mode: TaskGraph execution mode for the ingestion + driver. + :param vector_indexing_mode: TaskGraph execution mode for the vector indexing. + :param index_update_kwargs: Extra arguments to pass to the index update job. + """ + def ingest_files_udf( + file_dir_uri: str, + index_uri: str, + file_name: Optional[str] = None, + acn: Optional[str] = None, + config=None, + namespace: Optional[str] = None, + verbose: bool = False, + trace_id: Optional[str] = None, + # Index creation params + create_index: bool = True, + index_type: str = "IVF_FLAT", + index_creation_kwargs: Dict = {}, + # DirectoryTextReader params + glob: str = "**/[!.]*", + exclude: Sequence[str] = (), + suffixes: Optional[Sequence[str]] = None, + text_splitter: str = "RecursiveCharacterTextSplitter", + text_splitter_kwargs: Optional[Dict] = { + "chunk_size": 500, + "chunk_overlap": 50, + }, + # SentenceTransformersEmbedding params + model_name_or_path: str = 'BAAI/bge-small-en-v1.5', + # Index update params + index_timestamp: int = None, + workers: int = -1, + worker_resources: Dict = None, + worker_image: str = None, + extra_worker_modules: Optional[List[str]] = None, + driver_resources: Dict = {"cpu": "2", "memory": "8Gi"}, + driver_image: str = None, + extra_driver_modules: Optional[List[str]] = None, + max_tasks_per_stage: int = -1, + embeddings_generation_mode: dag.Mode = dag.Mode.LOCAL, + embeddings_generation_driver_mode: dag.Mode = dag.Mode.LOCAL, + vector_indexing_mode: dag.Mode = dag.Mode.LOCAL, + index_update_kwargs: Dict = {"files_per_partition": 100}, + ): + """ + Ingest files into a vector search text index. + + :param file_dir_uri: directory of the files to be loaded. For individual files, + also pass the `file_name` param. + :param index_uri: URI of the vector index to load files to. + :param file_name: Name of the file to be loaded. + :param acn: Access Credentials Name (ACN) registered in TileDB Cloud (ARN type), + defaults to None. + :param config: config dictionary, defaults to None. + :param namespace: TileDB-Cloud namespace, defaults to None. + :param verbose: verbose logging, defaults to False. + :param trace_id: trace ID for logging, defaults to None. + # Index creation params + :param create_index: If true, creates a new vector search index. + :param index_type: Vector search index type ("FLAT", "IVF_FLAT"). + :param index_creation_kwargs: Arguments to be passed to the index creation method + # DirectoryTextReader params. + :param glob: Glob pattern relative to the specified path by default set to pick up + all non-hidden files. + :param exclude: Patterns to exclude from results, use glob syntax. + :param suffixes: Load only files with these suffixes. Suffixes must include the + dot, e.g. ".txt". + :param text_splitter_kwargs: Arguments for the splitter class. + # SentenceTransformersEmbedding params. + :param model_name_or_path: Huggingface SentenceTransformer model name or path + # Index update params. + :param index_timestamp: Timestamp to add index updates at. + :param workers: If `embeddings_generation_mode=BATCH` this is the number of + distributed workers to be used. + :param worker_resources: If `embeddings_generation_mode=BATCH` this can be used + to specify the worker resources. + :param worker_image: If `embeddings_generation_mode=BATCH` this can be used + to specify the worker Docker image. + :param extra_worker_modules: If `embeddings_generation_mode=BATCH` this can be used + to install extra pip package to the image. + :param driver_resources: If `embeddings_generation_driver_mode=BATCH` this can be used + to specify the driver resources. + :param driver_image: If `embeddings_generation_driver_mode=BATCH` this can be used + to specify the driver Docker image. + :param extra_driver_modules: If `embeddings_generation_driver_mode=BATCH` this can + be used to install extra pip package to the image. + :param max_tasks_per_stage: Number of maximum udf tasks per computation stage. + :param embeddings_generation_mode: TaskGraph execution mode for embeddings generation. + :param embeddings_generation_driver_mode: TaskGraph execution mode for the ingestion + driver. + :param vector_indexing_mode: TaskGraph execution mode for the vector indexing. + :param index_update_kwargs: Extra arguments to pass to the index update job. + """ + import tiledb + # Install extra packages not yet available in vectorsearch UDF image. + # TODO(nikos) remove the pacakge installations after the image is updated + def install_extra_worker_modules(): + import os + import subprocess + import sys + + sys.path.insert(0, "/home/udf/.local/bin") + sys.path.insert(0, "/home/udf/.local/lib/python3.9/site-packages") + os.environ["PATH"] = f"/home/udf/.local/bin:{os.environ['PATH']}" + subprocess.check_call( + [ + sys.executable, "-m", "pip", "install", "--force-reinstall", + "tiledb-vector-search==0.0.23", + "tiledb==0.25.0", + "tiledb-cloud==0.11.10", + ] + ) + subprocess.check_call( + [ + sys.executable, "-m", "pip", "install", + "transformers==4.37.1", + "PyMuPDF", + "beautifulsoup4", + ] + ) + install_extra_worker_modules() + import importlib + importlib.reload(tiledb) + + from tiledb.vector_search.object_api import object_index + from tiledb.vector_search.object_readers import DirectoryTextReader + from tiledb.vector_search.embeddings import SentenceTransformersEmbedding + + reader = DirectoryTextReader( + uri=file_dir_uri, + glob=f"**/{file_name}" if file_name is not None else glob, + exclude=exclude, + suffixes=suffixes, + text_splitter=text_splitter, + text_splitter_kwargs=text_splitter_kwargs, + ) + + embedding = SentenceTransformersEmbedding( + model_name_or_path=model_name_or_path, + ) + if create_index: + index = object_index.create( + uri=index_uri, + index_type=index_type, + object_reader=reader, + embedding=embedding, + config=config, + **index_creation_kwargs, + ) + else: + index = object_index.ObjectIndex( + uri=index_uri, + load_metadata_in_memory=False, + memory_budget=1 + ) + + index.update_index( + index_timestamp=index_timestamp, + workers=workers, + worker_resources=worker_resources, + worker_image=worker_image, + extra_worker_modules=extra_worker_modules, + driver_resources=driver_resources, + driver_image=driver_image, + extra_driver_modules=extra_driver_modules, + worker_access_credentials_name=acn, + max_tasks_per_stage=max_tasks_per_stage, + embeddings_generation_mode=embeddings_generation_mode, + embeddings_generation_driver_mode=embeddings_generation_driver_mode, + vector_indexing_mode=vector_indexing_mode, + config=config, + namespace=namespace, + verbose=verbose, + trace_id=trace_id, + **index_update_kwargs, + ) + + graph = dag.DAG( + name="file-vector-search-ingestion", + mode=dag.Mode.BATCH, + max_workers=1, + namespace=namespace, + ) + graph.submit( + ingest_files_udf, + file_dir_uri, + index_uri, + name="file-vector-search-ingestion", + access_credentials_name=acn, + resources=driver_resources, + image_name="vectorsearch", + acn=acn, + config=config, + namespace=namespace, + verbose=verbose, + trace_id=trace_id, + create_index=create_index, + index_type=index_type, + index_creation_kwargs=index_creation_kwargs, + file_name=file_name, + glob=glob, + exclude=exclude, + suffixes=suffixes, + text_splitter=text_splitter, + text_splitter_kwargs=text_splitter_kwargs, + model_name_or_path=model_name_or_path, + index_timestamp=index_timestamp, + workers=workers, + worker_resources=worker_resources, + worker_image=worker_image, + extra_worker_modules=extra_worker_modules, + driver_resources=driver_resources, + driver_image=driver_image, + extra_driver_modules=extra_driver_modules, + max_tasks_per_stage=max_tasks_per_stage, + embeddings_generation_mode=embeddings_generation_mode, + embeddings_generation_driver_mode=embeddings_generation_driver_mode, + vector_indexing_mode=vector_indexing_mode, + index_update_kwargs=index_update_kwargs, + ) + graph.compute() + graph.wait() + + +ingest_files = as_batch(ingest_files_dag) \ No newline at end of file From 6671ab074036002be4b5a3c9eb5dc43af47f931e Mon Sep 17 00:00:00 2001 From: Nikos Papailiou Date: Fri, 8 Mar 2024 10:51:43 +0200 Subject: [PATCH 02/15] Format --- .../cloud/vector_search/file_ingestion.py | 50 +++++++++++-------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/src/tiledb/cloud/vector_search/file_ingestion.py b/src/tiledb/cloud/vector_search/file_ingestion.py index ca84f775..abbe54fb 100644 --- a/src/tiledb/cloud/vector_search/file_ingestion.py +++ b/src/tiledb/cloud/vector_search/file_ingestion.py @@ -1,8 +1,9 @@ -from typing import Any, Mapping, Optional, Sequence, Union, Dict, List +from typing import Dict, List, Optional, Sequence from tiledb.cloud import dag from tiledb.cloud.utilities import as_batch + def ingest_files_dag( file_dir_uri: str, index_uri: str, @@ -26,7 +27,7 @@ def ingest_files_dag( "chunk_overlap": 50, }, # SentenceTransformersEmbedding params - model_name_or_path: str = 'BAAI/bge-small-en-v1.5', + model_name_or_path: str = "BAAI/bge-small-en-v1.5", # Index update params index_timestamp: int = None, workers: int = -1, @@ -91,6 +92,7 @@ def ingest_files_dag( :param vector_indexing_mode: TaskGraph execution mode for the vector indexing. :param index_update_kwargs: Extra arguments to pass to the index update job. """ + def ingest_files_udf( file_dir_uri: str, index_uri: str, @@ -114,7 +116,7 @@ def ingest_files_udf( "chunk_overlap": 50, }, # SentenceTransformersEmbedding params - model_name_or_path: str = 'BAAI/bge-small-en-v1.5', + model_name_or_path: str = "BAAI/bge-small-en-v1.5", # Index update params index_timestamp: int = None, workers: int = -1, @@ -180,6 +182,7 @@ def ingest_files_udf( :param index_update_kwargs: Extra arguments to pass to the index update job. """ import tiledb + # Install extra packages not yet available in vectorsearch UDF image. # TODO(nikos) remove the pacakge installations after the image is updated def install_extra_worker_modules(): @@ -192,7 +195,11 @@ def install_extra_worker_modules(): os.environ["PATH"] = f"/home/udf/.local/bin:{os.environ['PATH']}" subprocess.check_call( [ - sys.executable, "-m", "pip", "install", "--force-reinstall", + sys.executable, + "-m", + "pip", + "install", + "--force-reinstall", "tiledb-vector-search==0.0.23", "tiledb==0.25.0", "tiledb-cloud==0.11.10", @@ -200,28 +207,33 @@ def install_extra_worker_modules(): ) subprocess.check_call( [ - sys.executable, "-m", "pip", "install", + sys.executable, + "-m", + "pip", + "install", "transformers==4.37.1", - "PyMuPDF", + "PyMuPDF", "beautifulsoup4", ] ) + install_extra_worker_modules() import importlib + importlib.reload(tiledb) + from tiledb.vector_search.embeddings import SentenceTransformersEmbedding from tiledb.vector_search.object_api import object_index from tiledb.vector_search.object_readers import DirectoryTextReader - from tiledb.vector_search.embeddings import SentenceTransformersEmbedding reader = DirectoryTextReader( - uri=file_dir_uri, - glob=f"**/{file_name}" if file_name is not None else glob, - exclude=exclude, - suffixes=suffixes, - text_splitter=text_splitter, - text_splitter_kwargs=text_splitter_kwargs, - ) + uri=file_dir_uri, + glob=f"**/{file_name}" if file_name is not None else glob, + exclude=exclude, + suffixes=suffixes, + text_splitter=text_splitter, + text_splitter_kwargs=text_splitter_kwargs, + ) embedding = SentenceTransformersEmbedding( model_name_or_path=model_name_or_path, @@ -237,9 +249,7 @@ def install_extra_worker_modules(): ) else: index = object_index.ObjectIndex( - uri=index_uri, - load_metadata_in_memory=False, - memory_budget=1 + uri=index_uri, load_metadata_in_memory=False, memory_budget=1 ) index.update_index( @@ -308,6 +318,6 @@ def install_extra_worker_modules(): ) graph.compute() graph.wait() - - -ingest_files = as_batch(ingest_files_dag) \ No newline at end of file + + +ingest_files = as_batch(ingest_files_dag) From df9fbecdb097d3b683e825431c9d380e4cf60c96 Mon Sep 17 00:00:00 2001 From: Nikos Papailiou Date: Fri, 8 Mar 2024 11:01:15 +0200 Subject: [PATCH 03/15] Format --- .../cloud/vector_search/file_ingestion.py | 41 ++++++++++--------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/src/tiledb/cloud/vector_search/file_ingestion.py b/src/tiledb/cloud/vector_search/file_ingestion.py index abbe54fb..9c261fd8 100644 --- a/src/tiledb/cloud/vector_search/file_ingestion.py +++ b/src/tiledb/cloud/vector_search/file_ingestion.py @@ -79,14 +79,15 @@ def ingest_files_dag( to specify the worker Docker image. :param extra_worker_modules: If `embeddings_generation_mode=BATCH` this can be used to install extra pip package to the image. - :param driver_resources: If `embeddings_generation_driver_mode=BATCH` this can be used - to specify the driver resources. + :param driver_resources: If `embeddings_generation_driver_mode=BATCH` this can be + used to specify the driver resources. :param driver_image: If `embeddings_generation_driver_mode=BATCH` this can be used to specify the driver Docker image. :param extra_driver_modules: If `embeddings_generation_driver_mode=BATCH` this can be used to install extra pip package to the image. :param max_tasks_per_stage: Number of maximum udf tasks per computation stage. - :param embeddings_generation_mode: TaskGraph execution mode for embeddings generation. + :param embeddings_generation_mode: TaskGraph execution mode for embeddings + generation. :param embeddings_generation_driver_mode: TaskGraph execution mode for the ingestion driver. :param vector_indexing_mode: TaskGraph execution mode for the vector indexing. @@ -148,13 +149,14 @@ def ingest_files_udf( # Index creation params :param create_index: If true, creates a new vector search index. :param index_type: Vector search index type ("FLAT", "IVF_FLAT"). - :param index_creation_kwargs: Arguments to be passed to the index creation method + :param index_creation_kwargs: Arguments to be passed to the index creation + method. # DirectoryTextReader params. - :param glob: Glob pattern relative to the specified path by default set to pick up - all non-hidden files. + :param glob: Glob pattern relative to the specified path by default set to + pick up all non-hidden files. :param exclude: Patterns to exclude from results, use glob syntax. - :param suffixes: Load only files with these suffixes. Suffixes must include the - dot, e.g. ".txt". + :param suffixes: Load only files with these suffixes. Suffixes must include + the dot, e.g. ".txt". :param text_splitter_kwargs: Arguments for the splitter class. # SentenceTransformersEmbedding params. :param model_name_or_path: Huggingface SentenceTransformer model name or path @@ -166,18 +168,19 @@ def ingest_files_udf( to specify the worker resources. :param worker_image: If `embeddings_generation_mode=BATCH` this can be used to specify the worker Docker image. - :param extra_worker_modules: If `embeddings_generation_mode=BATCH` this can be used - to install extra pip package to the image. - :param driver_resources: If `embeddings_generation_driver_mode=BATCH` this can be used - to specify the driver resources. - :param driver_image: If `embeddings_generation_driver_mode=BATCH` this can be used - to specify the driver Docker image. - :param extra_driver_modules: If `embeddings_generation_driver_mode=BATCH` this can - be used to install extra pip package to the image. + :param extra_worker_modules: If `embeddings_generation_mode=BATCH` this can be + used to install extra pip package to the image. + :param driver_resources: If `embeddings_generation_driver_mode=BATCH` this can + be used to specify the driver resources. + :param driver_image: If `embeddings_generation_driver_mode=BATCH` this can be + used to specify the driver Docker image. + :param extra_driver_modules: If `embeddings_generation_driver_mode=BATCH` this + can be used to install extra pip package to the image. :param max_tasks_per_stage: Number of maximum udf tasks per computation stage. - :param embeddings_generation_mode: TaskGraph execution mode for embeddings generation. - :param embeddings_generation_driver_mode: TaskGraph execution mode for the ingestion - driver. + :param embeddings_generation_mode: TaskGraph execution mode for embeddings + generation. + :param embeddings_generation_driver_mode: TaskGraph execution mode for the + ingestion driver. :param vector_indexing_mode: TaskGraph execution mode for the vector indexing. :param index_update_kwargs: Extra arguments to pass to the index update job. """ From 4e9ebbdb4621b7bf4076376f496e0bbd275aa497 Mon Sep 17 00:00:00 2001 From: Nikos Papailiou Date: Tue, 12 Mar 2024 09:13:49 +0200 Subject: [PATCH 04/15] Address review comments --- .../cloud/vector_search/file_ingestion.py | 90 ++++++++----------- 1 file changed, 38 insertions(+), 52 deletions(-) diff --git a/src/tiledb/cloud/vector_search/file_ingestion.py b/src/tiledb/cloud/vector_search/file_ingestion.py index 9c261fd8..3700ac5c 100644 --- a/src/tiledb/cloud/vector_search/file_ingestion.py +++ b/src/tiledb/cloud/vector_search/file_ingestion.py @@ -14,7 +14,7 @@ def ingest_files_dag( verbose: bool = False, trace_id: Optional[str] = None, # Index creation params - create_index: bool = True, + create_index: bool = False, index_type: str = "IVF_FLAT", index_creation_kwargs: Dict = {}, # DirectoryTextReader params @@ -185,50 +185,17 @@ def ingest_files_udf( :param index_update_kwargs: Extra arguments to pass to the index update job. """ import tiledb - - # Install extra packages not yet available in vectorsearch UDF image. - # TODO(nikos) remove the pacakge installations after the image is updated - def install_extra_worker_modules(): - import os - import subprocess - import sys - - sys.path.insert(0, "/home/udf/.local/bin") - sys.path.insert(0, "/home/udf/.local/lib/python3.9/site-packages") - os.environ["PATH"] = f"/home/udf/.local/bin:{os.environ['PATH']}" - subprocess.check_call( - [ - sys.executable, - "-m", - "pip", - "install", - "--force-reinstall", - "tiledb-vector-search==0.0.23", - "tiledb==0.25.0", - "tiledb-cloud==0.11.10", - ] - ) - subprocess.check_call( - [ - sys.executable, - "-m", - "pip", - "install", - "transformers==4.37.1", - "PyMuPDF", - "beautifulsoup4", - ] - ) - - install_extra_worker_modules() - import importlib - - importlib.reload(tiledb) - from tiledb.vector_search.embeddings import SentenceTransformersEmbedding from tiledb.vector_search.object_api import object_index from tiledb.vector_search.object_readers import DirectoryTextReader + def index_exists( + index_uri: str, + config=None, + ) -> bool: + with tiledb.scope_ctx(config): + return tiledb.object_type(index_uri) == "group" + reader = DirectoryTextReader( uri=file_dir_uri, glob=f"**/{file_name}" if file_name is not None else glob, @@ -241,19 +208,38 @@ def install_extra_worker_modules(): embedding = SentenceTransformersEmbedding( model_name_or_path=model_name_or_path, ) + index_uri_exists = index_exists( + index_uri=index_uri, + config=config, + ) if create_index: - index = object_index.create( - uri=index_uri, - index_type=index_type, - object_reader=reader, - embedding=embedding, - config=config, - **index_creation_kwargs, - ) + if index_uri_exists: + raise ValueError( + f"Index: {index_uri} allready exists and `create_index` was set to True." + ) + else: + index = object_index.create( + uri=index_uri, + index_type=index_type, + object_reader=reader, + embedding=embedding, + config=config, + **index_creation_kwargs, + ) else: - index = object_index.ObjectIndex( - uri=index_uri, load_metadata_in_memory=False, memory_budget=1 - ) + if index_uri_exists: + index = object_index.ObjectIndex( + uri=index_uri, load_metadata_in_memory=False, memory_budget=1 + ) + else: + index = object_index.create( + uri=index_uri, + index_type=index_type, + object_reader=reader, + embedding=embedding, + config=config, + **index_creation_kwargs, + ) index.update_index( index_timestamp=index_timestamp, From dabe1f527d2417646190a94410c582c7c286d532 Mon Sep 17 00:00:00 2001 From: Nikos Papailiou Date: Tue, 12 Mar 2024 09:18:54 +0200 Subject: [PATCH 05/15] Format --- src/tiledb/cloud/vector_search/file_ingestion.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tiledb/cloud/vector_search/file_ingestion.py b/src/tiledb/cloud/vector_search/file_ingestion.py index 3700ac5c..e91bc62f 100644 --- a/src/tiledb/cloud/vector_search/file_ingestion.py +++ b/src/tiledb/cloud/vector_search/file_ingestion.py @@ -215,7 +215,7 @@ def index_exists( if create_index: if index_uri_exists: raise ValueError( - f"Index: {index_uri} allready exists and `create_index` was set to True." + f"{index_uri} allready exists and `create_index` was set." ) else: index = object_index.create( From 6d59df5ecda6e2e0d3663442112f3b57c444c6e2 Mon Sep 17 00:00:00 2001 From: Nikos Papailiou Date: Fri, 15 Mar 2024 13:21:01 +0200 Subject: [PATCH 06/15] Add support for OpenAI embeddings and consolidate directory listing args --- .../cloud/vector_search/file_ingestion.py | 118 ++++++++++++------ 1 file changed, 80 insertions(+), 38 deletions(-) diff --git a/src/tiledb/cloud/vector_search/file_ingestion.py b/src/tiledb/cloud/vector_search/file_ingestion.py index e91bc62f..ec73d0d2 100644 --- a/src/tiledb/cloud/vector_search/file_ingestion.py +++ b/src/tiledb/cloud/vector_search/file_ingestion.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Optional, Sequence +from typing import Dict, List, Mapping, Optional, Sequence from tiledb.cloud import dag from tiledb.cloud.utilities import as_batch @@ -10,6 +10,7 @@ def ingest_files_dag( file_name: Optional[str] = None, acn: Optional[str] = None, config=None, + environment_variables: Mapping[str, str] = {}, namespace: Optional[str] = None, verbose: bool = False, trace_id: Optional[str] = None, @@ -18,24 +19,33 @@ def ingest_files_dag( index_type: str = "IVF_FLAT", index_creation_kwargs: Dict = {}, # DirectoryTextReader params - glob: str = "**/[!.]*", - exclude: Sequence[str] = (), + include: str = "*", + exclude: Sequence[str] = ["[.]*", "*/[.]*"], suffixes: Optional[Sequence[str]] = None, + max_files: Optional[int] = None, text_splitter: str = "RecursiveCharacterTextSplitter", - text_splitter_kwargs: Optional[Dict] = { + text_splitter_kwargs: Dict = { "chunk_size": 500, "chunk_overlap": 50, }, - # SentenceTransformersEmbedding params - model_name_or_path: str = "BAAI/bge-small-en-v1.5", + # Embedding params + embedding_class: str = "LangChainEmbedding", + embedding_kwargs: Dict = { + "dimensions": 1536, + "embedding_class": "OpenAIEmbeddings", + "embedding_kwargs": { + "model": "text-embedding-ada-002", + }, + }, + openai_key: Optional[str] = None, # Index update params - index_timestamp: int = None, + index_timestamp: Optional[int] = None, workers: int = -1, - worker_resources: Dict = None, - worker_image: str = None, + worker_resources: Optional[Dict] = None, + worker_image: Optional[str] = None, extra_worker_modules: Optional[List[str]] = None, driver_resources: Dict = {"cpu": "2", "memory": "8Gi"}, - driver_image: str = None, + driver_image: Optional[str] = None, extra_driver_modules: Optional[List[str]] = None, max_tasks_per_stage: int = -1, embeddings_generation_mode: dag.Mode = dag.Mode.LOCAL, @@ -61,11 +71,14 @@ def ingest_files_dag( :param index_type: Vector search index type ("FLAT", "IVF_FLAT"). :param index_creation_kwargs: Arguments to be passed to the index creation method # DirectoryTextReader params. - :param glob: Glob pattern relative to the specified path by default set to pick up - all non-hidden files. - :param exclude: Patterns to exclude from results, use glob syntax. - :param suffixes: Load only files with these suffixes. Suffixes must include the - dot, e.g. ".txt". + :param include: File pattern to iclude relative to `file_dir_uri`. By default set + to include all files. + :param exclude: File patterns to exclude relative to `file_dir_uri`. By default + set to ignore all hidden files. + :param suffixes: Provide to keep only files with these suffixes + Useful when wanting to keep files with different suffixes + Suffixes must include the dot, e.g. ".txt" + :param max_files: Maximum number of files to include. :param text_splitter_kwargs: Arguments for the splitter class. # SentenceTransformersEmbedding params. :param model_name_or_path: Huggingface SentenceTransformer model name or path @@ -100,32 +113,42 @@ def ingest_files_udf( file_name: Optional[str] = None, acn: Optional[str] = None, config=None, + environment_variables: Mapping[str, str] = {}, namespace: Optional[str] = None, verbose: bool = False, trace_id: Optional[str] = None, # Index creation params - create_index: bool = True, + create_index: bool = False, index_type: str = "IVF_FLAT", index_creation_kwargs: Dict = {}, # DirectoryTextReader params - glob: str = "**/[!.]*", - exclude: Sequence[str] = (), + include: str = "*", + exclude: Sequence[str] = ["[.]*", "*/[.]*"], suffixes: Optional[Sequence[str]] = None, + max_files: Optional[int] = None, text_splitter: str = "RecursiveCharacterTextSplitter", - text_splitter_kwargs: Optional[Dict] = { + text_splitter_kwargs: Dict = { "chunk_size": 500, "chunk_overlap": 50, }, - # SentenceTransformersEmbedding params - model_name_or_path: str = "BAAI/bge-small-en-v1.5", + # Embedding params + embedding_class: str = "LangChainEmbedding", + embedding_kwargs: Dict = { + "dimensions": 1536, + "embedding_class": "OpenAIEmbeddings", + "embedding_kwargs": { + "model": "text-embedding-ada-002", + }, + }, + openai_key: Optional[str] = None, # Index update params - index_timestamp: int = None, + index_timestamp: Optional[int] = None, workers: int = -1, - worker_resources: Dict = None, - worker_image: str = None, + worker_resources: Optional[Dict] = None, + worker_image: Optional[str] = None, extra_worker_modules: Optional[List[str]] = None, driver_resources: Dict = {"cpu": "2", "memory": "8Gi"}, - driver_image: str = None, + driver_image: Optional[str] = None, extra_driver_modules: Optional[List[str]] = None, max_tasks_per_stage: int = -1, embeddings_generation_mode: dag.Mode = dag.Mode.LOCAL, @@ -143,6 +166,7 @@ def ingest_files_udf( :param acn: Access Credentials Name (ACN) registered in TileDB Cloud (ARN type), defaults to None. :param config: config dictionary, defaults to None. + :param environment_variables: Environment variables to use during ingestion. :param namespace: TileDB-Cloud namespace, defaults to None. :param verbose: verbose logging, defaults to False. :param trace_id: trace ID for logging, defaults to None. @@ -152,11 +176,14 @@ def ingest_files_udf( :param index_creation_kwargs: Arguments to be passed to the index creation method. # DirectoryTextReader params. - :param glob: Glob pattern relative to the specified path by default set to - pick up all non-hidden files. - :param exclude: Patterns to exclude from results, use glob syntax. - :param suffixes: Load only files with these suffixes. Suffixes must include - the dot, e.g. ".txt". + :param include: File pattern to iclude relative to `file_dir_uri`. By default + set to include all files. + :param exclude: File patterns to exclude relative to `file_dir_uri`. By default + set to ignore all hidden files. + :param suffixes: Provide to keep only files with these suffixes + Useful when wanting to keep files with different suffixes + Suffixes must include the dot, e.g. ".txt" + :param max_files: Maximum number of files to include. :param text_splitter_kwargs: Arguments for the splitter class. # SentenceTransformersEmbedding params. :param model_name_or_path: Huggingface SentenceTransformer model name or path @@ -184,8 +211,9 @@ def ingest_files_udf( :param vector_indexing_mode: TaskGraph execution mode for the vector indexing. :param index_update_kwargs: Extra arguments to pass to the index update job. """ + import importlib + import tiledb - from tiledb.vector_search.embeddings import SentenceTransformersEmbedding from tiledb.vector_search.object_api import object_index from tiledb.vector_search.object_readers import DirectoryTextReader @@ -198,16 +226,20 @@ def index_exists( reader = DirectoryTextReader( uri=file_dir_uri, - glob=f"**/{file_name}" if file_name is not None else glob, + include=f"{file_name}" if file_name is not None else include, exclude=exclude, suffixes=suffixes, + max_files=max_files, text_splitter=text_splitter, text_splitter_kwargs=text_splitter_kwargs, ) - embedding = SentenceTransformersEmbedding( - model_name_or_path=model_name_or_path, - ) + if openai_key is not None: + environment_variables["OPENAI_API_KEY"] = openai_key + embeddings_module = importlib.import_module("tiledb.vector_search.embeddings") + embedding_class_ = getattr(embeddings_module, embedding_class) + embedding = embedding_class_(**embedding_kwargs) + index_uri_exists = index_exists( index_uri=index_uri, config=config, @@ -224,12 +256,16 @@ def index_exists( object_reader=reader, embedding=embedding, config=config, + environment_variables=environment_variables, **index_creation_kwargs, ) else: if index_uri_exists: index = object_index.ObjectIndex( - uri=index_uri, load_metadata_in_memory=False, memory_budget=1 + uri=index_uri, + environment_variables=environment_variables, + load_metadata_in_memory=False, + memory_budget=1, ) else: index = object_index.create( @@ -238,6 +274,7 @@ def index_exists( object_reader=reader, embedding=embedding, config=config, + environment_variables=environment_variables, **index_creation_kwargs, ) @@ -256,6 +293,7 @@ def index_exists( embeddings_generation_driver_mode=embeddings_generation_driver_mode, vector_indexing_mode=vector_indexing_mode, config=config, + environment_variables=environment_variables, namespace=namespace, verbose=verbose, trace_id=trace_id, @@ -278,6 +316,7 @@ def index_exists( image_name="vectorsearch", acn=acn, config=config, + environment_variables=environment_variables, namespace=namespace, verbose=verbose, trace_id=trace_id, @@ -285,12 +324,15 @@ def index_exists( index_type=index_type, index_creation_kwargs=index_creation_kwargs, file_name=file_name, - glob=glob, + include=include, exclude=exclude, suffixes=suffixes, + max_files=max_files, text_splitter=text_splitter, text_splitter_kwargs=text_splitter_kwargs, - model_name_or_path=model_name_or_path, + embedding_class=embedding_class, + embedding_kwargs=embedding_class, + openai_key=openai_key, index_timestamp=index_timestamp, workers=workers, worker_resources=worker_resources, From a4ea59e957f7502a82e88f6f6ae1c7208b7d8be1 Mon Sep 17 00:00:00 2001 From: Nikos Papailiou Date: Wed, 27 Mar 2024 11:55:39 +0200 Subject: [PATCH 07/15] Address review comments --- src/tiledb/cloud/utilities/_common.py | 14 +- .../cloud/vector_search/file_ingestion.py | 335 +++++------------- 2 files changed, 104 insertions(+), 245 deletions(-) diff --git a/src/tiledb/cloud/utilities/_common.py b/src/tiledb/cloud/utilities/_common.py index 360ba9c6..40502947 100644 --- a/src/tiledb/cloud/utilities/_common.py +++ b/src/tiledb/cloud/utilities/_common.py @@ -379,13 +379,24 @@ def as_batch(func: _CT) -> _CT: @functools.wraps(func) def wrapper(*args, **kwargs) -> Dict[str, object]: - """Run the function as a batch UDF on TileDB Cloud.""" + """ + Run the function as a batch UDF on TileDB Cloud. + + kwargs optionally includes: + - name: name of the node in the DAG, defaults to func.__name__ + - namespace: TileDB Cloud namespace, defaults to the user's default namespace + - acn: Access Credentials Name (ACN) registered in TileDB Cloud (ARN type) + - access_credentials_name: alias for acn, for backwards compatibility + - resources: resources to allocate for the UDF, defaults to None + - image_name: Docker image_name to use for UDFs, defaults to None + """ name = kwargs.get("name", func.__name__) namespace = kwargs.get("namespace", None) acn = kwargs.get("acn", kwargs.pop("access_credentials_name", None)) kwargs["acn"] = acn # for backwards compatibility resources = kwargs.pop("resources", None) + image_name = kwargs.pop("image_name", None) # Create a new DAG graph = dag.DAG( @@ -401,6 +412,7 @@ def wrapper(*args, **kwargs) -> Dict[str, object]: name=name, access_credentials_name=acn, resources=resources, + image_name=image_name, **_filter_kwargs(func, kwargs), ) diff --git a/src/tiledb/cloud/vector_search/file_ingestion.py b/src/tiledb/cloud/vector_search/file_ingestion.py index ec73d0d2..35722f12 100644 --- a/src/tiledb/cloud/vector_search/file_ingestion.py +++ b/src/tiledb/cloud/vector_search/file_ingestion.py @@ -4,33 +4,33 @@ from tiledb.cloud.utilities import as_batch -def ingest_files_dag( +def ingest_files_udf( file_dir_uri: str, index_uri: str, file_name: Optional[str] = None, acn: Optional[str] = None, config=None, - environment_variables: Mapping[str, str] = {}, + environment_variables: Optional[Mapping[str, str]] = None, namespace: Optional[str] = None, verbose: bool = False, trace_id: Optional[str] = None, # Index creation params create_index: bool = False, index_type: str = "IVF_FLAT", - index_creation_kwargs: Dict = {}, + index_creation_kwargs: Optional[Dict] = None, # DirectoryTextReader params include: str = "*", - exclude: Sequence[str] = ["[.]*", "*/[.]*"], + exclude: Optional[Sequence[str]] = ("[.]*", "*/[.]*"), suffixes: Optional[Sequence[str]] = None, max_files: Optional[int] = None, text_splitter: str = "RecursiveCharacterTextSplitter", - text_splitter_kwargs: Dict = { + text_splitter_kwargs: Optional[Dict] = { "chunk_size": 500, "chunk_overlap": 50, }, # Embedding params embedding_class: str = "LangChainEmbedding", - embedding_kwargs: Dict = { + embedding_kwargs: Optional[Dict] = { "dimensions": 1536, "embedding_class": "OpenAIEmbeddings", "embedding_kwargs": { @@ -44,14 +44,14 @@ def ingest_files_dag( worker_resources: Optional[Dict] = None, worker_image: Optional[str] = None, extra_worker_modules: Optional[List[str]] = None, - driver_resources: Dict = {"cpu": "2", "memory": "8Gi"}, + driver_resources: Optional[Dict] = {"cpu": "2", "memory": "8Gi"}, driver_image: Optional[str] = None, extra_driver_modules: Optional[List[str]] = None, max_tasks_per_stage: int = -1, embeddings_generation_mode: dag.Mode = dag.Mode.LOCAL, embeddings_generation_driver_mode: dag.Mode = dag.Mode.LOCAL, vector_indexing_mode: dag.Mode = dag.Mode.LOCAL, - index_update_kwargs: Dict = {"files_per_partition": 100}, + index_update_kwargs: Optional[Dict] = {"files_per_partition": 100}, ): """ Ingest files into a vector search text index. @@ -63,16 +63,18 @@ def ingest_files_dag( :param acn: Access Credentials Name (ACN) registered in TileDB Cloud (ARN type), defaults to None. :param config: config dictionary, defaults to None. + :param environment_variables: Environment variables to use during ingestion. :param namespace: TileDB-Cloud namespace, defaults to None. :param verbose: verbose logging, defaults to False. :param trace_id: trace ID for logging, defaults to None. # Index creation params :param create_index: If true, creates a new vector search index. :param index_type: Vector search index type ("FLAT", "IVF_FLAT"). - :param index_creation_kwargs: Arguments to be passed to the index creation method + :param index_creation_kwargs: Arguments to be passed to the index creation + method. # DirectoryTextReader params. - :param include: File pattern to iclude relative to `file_dir_uri`. By default set - to include all files. + :param include: File pattern to iclude relative to `file_dir_uri`. By default + set to include all files. :param exclude: File patterns to exclude relative to `file_dir_uri`. By default set to ignore all hidden files. :param suffixes: Provide to keep only files with these suffixes @@ -90,249 +92,90 @@ def ingest_files_dag( to specify the worker resources. :param worker_image: If `embeddings_generation_mode=BATCH` this can be used to specify the worker Docker image. - :param extra_worker_modules: If `embeddings_generation_mode=BATCH` this can be used - to install extra pip package to the image. - :param driver_resources: If `embeddings_generation_driver_mode=BATCH` this can be - used to specify the driver resources. - :param driver_image: If `embeddings_generation_driver_mode=BATCH` this can be used - to specify the driver Docker image. - :param extra_driver_modules: If `embeddings_generation_driver_mode=BATCH` this can - be used to install extra pip package to the image. + :param extra_worker_modules: If `embeddings_generation_mode=BATCH` this can be + used to install extra pip package to the image. + :param driver_resources: If `embeddings_generation_driver_mode=BATCH` this can + be used to specify the driver resources. + :param driver_image: If `embeddings_generation_driver_mode=BATCH` this can be + used to specify the driver Docker image. + :param extra_driver_modules: If `embeddings_generation_driver_mode=BATCH` this + can be used to install extra pip package to the image. :param max_tasks_per_stage: Number of maximum udf tasks per computation stage. :param embeddings_generation_mode: TaskGraph execution mode for embeddings generation. - :param embeddings_generation_driver_mode: TaskGraph execution mode for the ingestion - driver. + :param embeddings_generation_driver_mode: TaskGraph execution mode for the + ingestion driver. :param vector_indexing_mode: TaskGraph execution mode for the vector indexing. :param index_update_kwargs: Extra arguments to pass to the index update job. """ + import importlib - def ingest_files_udf( - file_dir_uri: str, - index_uri: str, - file_name: Optional[str] = None, - acn: Optional[str] = None, - config=None, - environment_variables: Mapping[str, str] = {}, - namespace: Optional[str] = None, - verbose: bool = False, - trace_id: Optional[str] = None, - # Index creation params - create_index: bool = False, - index_type: str = "IVF_FLAT", - index_creation_kwargs: Dict = {}, - # DirectoryTextReader params - include: str = "*", - exclude: Sequence[str] = ["[.]*", "*/[.]*"], - suffixes: Optional[Sequence[str]] = None, - max_files: Optional[int] = None, - text_splitter: str = "RecursiveCharacterTextSplitter", - text_splitter_kwargs: Dict = { - "chunk_size": 500, - "chunk_overlap": 50, - }, - # Embedding params - embedding_class: str = "LangChainEmbedding", - embedding_kwargs: Dict = { - "dimensions": 1536, - "embedding_class": "OpenAIEmbeddings", - "embedding_kwargs": { - "model": "text-embedding-ada-002", - }, - }, - openai_key: Optional[str] = None, - # Index update params - index_timestamp: Optional[int] = None, - workers: int = -1, - worker_resources: Optional[Dict] = None, - worker_image: Optional[str] = None, - extra_worker_modules: Optional[List[str]] = None, - driver_resources: Dict = {"cpu": "2", "memory": "8Gi"}, - driver_image: Optional[str] = None, - extra_driver_modules: Optional[List[str]] = None, - max_tasks_per_stage: int = -1, - embeddings_generation_mode: dag.Mode = dag.Mode.LOCAL, - embeddings_generation_driver_mode: dag.Mode = dag.Mode.LOCAL, - vector_indexing_mode: dag.Mode = dag.Mode.LOCAL, - index_update_kwargs: Dict = {"files_per_partition": 100}, - ): - """ - Ingest files into a vector search text index. - - :param file_dir_uri: directory of the files to be loaded. For individual files, - also pass the `file_name` param. - :param index_uri: URI of the vector index to load files to. - :param file_name: Name of the file to be loaded. - :param acn: Access Credentials Name (ACN) registered in TileDB Cloud (ARN type), - defaults to None. - :param config: config dictionary, defaults to None. - :param environment_variables: Environment variables to use during ingestion. - :param namespace: TileDB-Cloud namespace, defaults to None. - :param verbose: verbose logging, defaults to False. - :param trace_id: trace ID for logging, defaults to None. - # Index creation params - :param create_index: If true, creates a new vector search index. - :param index_type: Vector search index type ("FLAT", "IVF_FLAT"). - :param index_creation_kwargs: Arguments to be passed to the index creation - method. - # DirectoryTextReader params. - :param include: File pattern to iclude relative to `file_dir_uri`. By default - set to include all files. - :param exclude: File patterns to exclude relative to `file_dir_uri`. By default - set to ignore all hidden files. - :param suffixes: Provide to keep only files with these suffixes - Useful when wanting to keep files with different suffixes - Suffixes must include the dot, e.g. ".txt" - :param max_files: Maximum number of files to include. - :param text_splitter_kwargs: Arguments for the splitter class. - # SentenceTransformersEmbedding params. - :param model_name_or_path: Huggingface SentenceTransformer model name or path - # Index update params. - :param index_timestamp: Timestamp to add index updates at. - :param workers: If `embeddings_generation_mode=BATCH` this is the number of - distributed workers to be used. - :param worker_resources: If `embeddings_generation_mode=BATCH` this can be used - to specify the worker resources. - :param worker_image: If `embeddings_generation_mode=BATCH` this can be used - to specify the worker Docker image. - :param extra_worker_modules: If `embeddings_generation_mode=BATCH` this can be - used to install extra pip package to the image. - :param driver_resources: If `embeddings_generation_driver_mode=BATCH` this can - be used to specify the driver resources. - :param driver_image: If `embeddings_generation_driver_mode=BATCH` this can be - used to specify the driver Docker image. - :param extra_driver_modules: If `embeddings_generation_driver_mode=BATCH` this - can be used to install extra pip package to the image. - :param max_tasks_per_stage: Number of maximum udf tasks per computation stage. - :param embeddings_generation_mode: TaskGraph execution mode for embeddings - generation. - :param embeddings_generation_driver_mode: TaskGraph execution mode for the - ingestion driver. - :param vector_indexing_mode: TaskGraph execution mode for the vector indexing. - :param index_update_kwargs: Extra arguments to pass to the index update job. - """ - import importlib - - import tiledb - from tiledb.vector_search.object_api import object_index - from tiledb.vector_search.object_readers import DirectoryTextReader - - def index_exists( - index_uri: str, - config=None, - ) -> bool: - with tiledb.scope_ctx(config): - return tiledb.object_type(index_uri) == "group" - - reader = DirectoryTextReader( - uri=file_dir_uri, - include=f"{file_name}" if file_name is not None else include, - exclude=exclude, - suffixes=suffixes, - max_files=max_files, - text_splitter=text_splitter, - text_splitter_kwargs=text_splitter_kwargs, - ) - - if openai_key is not None: - environment_variables["OPENAI_API_KEY"] = openai_key - embeddings_module = importlib.import_module("tiledb.vector_search.embeddings") - embedding_class_ = getattr(embeddings_module, embedding_class) - embedding = embedding_class_(**embedding_kwargs) + import tiledb + from tiledb.vector_search.object_api import object_index + from tiledb.vector_search.object_readers import DirectoryTextReader - index_uri_exists = index_exists( - index_uri=index_uri, - config=config, - ) - if create_index: - if index_uri_exists: - raise ValueError( - f"{index_uri} allready exists and `create_index` was set." - ) - else: - index = object_index.create( - uri=index_uri, - index_type=index_type, - object_reader=reader, - embedding=embedding, - config=config, - environment_variables=environment_variables, - **index_creation_kwargs, - ) - else: - if index_uri_exists: - index = object_index.ObjectIndex( - uri=index_uri, - environment_variables=environment_variables, - load_metadata_in_memory=False, - memory_budget=1, - ) - else: - index = object_index.create( - uri=index_uri, - index_type=index_type, - object_reader=reader, - embedding=embedding, - config=config, - environment_variables=environment_variables, - **index_creation_kwargs, - ) - - index.update_index( - index_timestamp=index_timestamp, - workers=workers, - worker_resources=worker_resources, - worker_image=worker_image, - extra_worker_modules=extra_worker_modules, - driver_resources=driver_resources, - driver_image=driver_image, - extra_driver_modules=extra_driver_modules, - worker_access_credentials_name=acn, - max_tasks_per_stage=max_tasks_per_stage, - embeddings_generation_mode=embeddings_generation_mode, - embeddings_generation_driver_mode=embeddings_generation_driver_mode, - vector_indexing_mode=vector_indexing_mode, - config=config, - environment_variables=environment_variables, - namespace=namespace, - verbose=verbose, - trace_id=trace_id, - **index_update_kwargs, - ) + if environment_variables is None: + environment_variables = {} + if index_creation_kwargs is None: + index_creation_kwargs = {} + if index_update_kwargs is None: + index_update_kwargs = {} + if embedding_kwargs is None: + embedding_kwargs = {} + if text_splitter_kwargs is None: + text_splitter_kwargs = {} - graph = dag.DAG( - name="file-vector-search-ingestion", - mode=dag.Mode.BATCH, - max_workers=1, - namespace=namespace, - ) - graph.submit( - ingest_files_udf, - file_dir_uri, - index_uri, - name="file-vector-search-ingestion", - access_credentials_name=acn, - resources=driver_resources, - image_name="vectorsearch", - acn=acn, - config=config, - environment_variables=environment_variables, - namespace=namespace, - verbose=verbose, - trace_id=trace_id, - create_index=create_index, - index_type=index_type, - index_creation_kwargs=index_creation_kwargs, - file_name=file_name, - include=include, + reader = DirectoryTextReader( + uri=file_dir_uri, + include=f"{file_name}" if file_name is not None else include, exclude=exclude, suffixes=suffixes, max_files=max_files, text_splitter=text_splitter, text_splitter_kwargs=text_splitter_kwargs, - embedding_class=embedding_class, - embedding_kwargs=embedding_class, - openai_key=openai_key, + ) + + if openai_key is not None: + environment_variables["OPENAI_API_KEY"] = openai_key + embeddings_module = importlib.import_module("tiledb.vector_search.embeddings") + embedding_class_ = getattr(embeddings_module, embedding_class) + embedding = embedding_class_(**embedding_kwargs) + + with tiledb.scope_ctx(config): + index_uri_exists = tiledb.object_type(index_uri) == "group" + if create_index: + if index_uri_exists: + raise ValueError(f"{index_uri} allready exists and `create_index` was set.") + else: + index = object_index.create( + uri=index_uri, + index_type=index_type, + object_reader=reader, + embedding=embedding, + config=config, + environment_variables=environment_variables, + **index_creation_kwargs, + ) + else: + if index_uri_exists: + index = object_index.ObjectIndex( + uri=index_uri, + environment_variables=environment_variables, + load_metadata_in_memory=False, + memory_budget=1, + ) + else: + index = object_index.create( + uri=index_uri, + index_type=index_type, + object_reader=reader, + embedding=embedding, + config=config, + environment_variables=environment_variables, + **index_creation_kwargs, + ) + + index.update_index( index_timestamp=index_timestamp, workers=workers, worker_resources=worker_resources, @@ -341,14 +184,18 @@ def index_exists( driver_resources=driver_resources, driver_image=driver_image, extra_driver_modules=extra_driver_modules, + worker_access_credentials_name=acn, max_tasks_per_stage=max_tasks_per_stage, embeddings_generation_mode=embeddings_generation_mode, embeddings_generation_driver_mode=embeddings_generation_driver_mode, vector_indexing_mode=vector_indexing_mode, - index_update_kwargs=index_update_kwargs, + config=config, + environment_variables=environment_variables, + namespace=namespace, + verbose=verbose, + trace_id=trace_id, + **index_update_kwargs, ) - graph.compute() - graph.wait() -ingest_files = as_batch(ingest_files_dag) +ingest = as_batch(ingest_files_udf) From b620a9b3315d524be818aefa2efb119bd849c8eb Mon Sep 17 00:00:00 2001 From: Nikos Papailiou Date: Thu, 28 Mar 2024 13:39:46 +0200 Subject: [PATCH 08/15] Fix param --- src/tiledb/cloud/vector_search/file_ingestion.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tiledb/cloud/vector_search/file_ingestion.py b/src/tiledb/cloud/vector_search/file_ingestion.py index 35722f12..535c007c 100644 --- a/src/tiledb/cloud/vector_search/file_ingestion.py +++ b/src/tiledb/cloud/vector_search/file_ingestion.py @@ -126,7 +126,7 @@ def ingest_files_udf( text_splitter_kwargs = {} reader = DirectoryTextReader( - uri=file_dir_uri, + search_uri=file_dir_uri, include=f"{file_name}" if file_name is not None else include, exclude=exclude, suffixes=suffixes, From 1a3f0205fd909e93145a29d61218f6a294cb7806 Mon Sep 17 00:00:00 2001 From: Nikos Papailiou Date: Fri, 29 Mar 2024 17:09:33 +0200 Subject: [PATCH 09/15] Address review comments --- .../cloud/vector_search/file_ingestion.py | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/src/tiledb/cloud/vector_search/file_ingestion.py b/src/tiledb/cloud/vector_search/file_ingestion.py index 535c007c..0e3cbe6a 100644 --- a/src/tiledb/cloud/vector_search/file_ingestion.py +++ b/src/tiledb/cloud/vector_search/file_ingestion.py @@ -108,9 +108,8 @@ def ingest_files_udf( :param vector_indexing_mode: TaskGraph execution mode for the vector indexing. :param index_update_kwargs: Extra arguments to pass to the index update job. """ - import importlib - import tiledb + import tiledb.vector_search.embeddings as embeddings_module from tiledb.vector_search.object_api import object_index from tiledb.vector_search.object_readers import DirectoryTextReader @@ -127,7 +126,7 @@ def ingest_files_udf( reader = DirectoryTextReader( search_uri=file_dir_uri, - include=f"{file_name}" if file_name is not None else include, + include=file_name if file_name is not None else include, exclude=exclude, suffixes=suffixes, max_files=max_files, @@ -137,7 +136,6 @@ def ingest_files_udf( if openai_key is not None: environment_variables["OPENAI_API_KEY"] = openai_key - embeddings_module = importlib.import_module("tiledb.vector_search.embeddings") embedding_class_ = getattr(embeddings_module, embedding_class) embedding = embedding_class_(**embedding_kwargs) @@ -146,16 +144,15 @@ def ingest_files_udf( if create_index: if index_uri_exists: raise ValueError(f"{index_uri} allready exists and `create_index` was set.") - else: - index = object_index.create( - uri=index_uri, - index_type=index_type, - object_reader=reader, - embedding=embedding, - config=config, - environment_variables=environment_variables, - **index_creation_kwargs, - ) + index = object_index.create( + uri=index_uri, + index_type=index_type, + object_reader=reader, + embedding=embedding, + config=config, + environment_variables=environment_variables, + **index_creation_kwargs, + ) else: if index_uri_exists: index = object_index.ObjectIndex( From 81ade3a160640885950f47137d59462017ca56bd Mon Sep 17 00:00:00 2001 From: Nikos Papailiou Date: Fri, 29 Mar 2024 17:24:44 +0200 Subject: [PATCH 10/15] Removed default arg dicts --- .../cloud/vector_search/file_ingestion.py | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/src/tiledb/cloud/vector_search/file_ingestion.py b/src/tiledb/cloud/vector_search/file_ingestion.py index 0e3cbe6a..907c83ce 100644 --- a/src/tiledb/cloud/vector_search/file_ingestion.py +++ b/src/tiledb/cloud/vector_search/file_ingestion.py @@ -24,19 +24,10 @@ def ingest_files_udf( suffixes: Optional[Sequence[str]] = None, max_files: Optional[int] = None, text_splitter: str = "RecursiveCharacterTextSplitter", - text_splitter_kwargs: Optional[Dict] = { - "chunk_size": 500, - "chunk_overlap": 50, - }, + text_splitter_kwargs: Optional[Dict] = None, # Embedding params embedding_class: str = "LangChainEmbedding", - embedding_kwargs: Optional[Dict] = { - "dimensions": 1536, - "embedding_class": "OpenAIEmbeddings", - "embedding_kwargs": { - "model": "text-embedding-ada-002", - }, - }, + embedding_kwargs: Optional[Dict] = None, openai_key: Optional[str] = None, # Index update params index_timestamp: Optional[int] = None, @@ -44,14 +35,14 @@ def ingest_files_udf( worker_resources: Optional[Dict] = None, worker_image: Optional[str] = None, extra_worker_modules: Optional[List[str]] = None, - driver_resources: Optional[Dict] = {"cpu": "2", "memory": "8Gi"}, + driver_resources: Optional[Dict] = None, driver_image: Optional[str] = None, extra_driver_modules: Optional[List[str]] = None, max_tasks_per_stage: int = -1, embeddings_generation_mode: dag.Mode = dag.Mode.LOCAL, embeddings_generation_driver_mode: dag.Mode = dag.Mode.LOCAL, vector_indexing_mode: dag.Mode = dag.Mode.LOCAL, - index_update_kwargs: Optional[Dict] = {"files_per_partition": 100}, + index_update_kwargs: Optional[Dict] = None, ): """ Ingest files into a vector search text index. @@ -118,11 +109,22 @@ def ingest_files_udf( if index_creation_kwargs is None: index_creation_kwargs = {} if index_update_kwargs is None: - index_update_kwargs = {} + index_update_kwargs = {"files_per_partition": 100} if embedding_kwargs is None: - embedding_kwargs = {} + embedding_kwargs = { + "dimensions": 1536, + "embedding_class": "OpenAIEmbeddings", + "embedding_kwargs": { + "model": "text-embedding-ada-002", + }, + } if text_splitter_kwargs is None: - text_splitter_kwargs = {} + text_splitter_kwargs = { + "chunk_size": 500, + "chunk_overlap": 50, + } + if driver_resources is None: + driver_resources = {"cpu": "2", "memory": "8Gi"} reader = DirectoryTextReader( search_uri=file_dir_uri, From 2bdcd70a607f23944dcddc9d7c69660f0131801f Mon Sep 17 00:00:00 2001 From: Nikos Papailiou Date: Tue, 2 Apr 2024 15:15:46 +0300 Subject: [PATCH 11/15] Fix function name --- src/tiledb/cloud/vector_search/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tiledb/cloud/vector_search/__init__.py b/src/tiledb/cloud/vector_search/__init__.py index 1df365e8..9256bc78 100644 --- a/src/tiledb/cloud/vector_search/__init__.py +++ b/src/tiledb/cloud/vector_search/__init__.py @@ -1,5 +1,5 @@ -from .file_ingestion import ingest_files +from .file_ingestion import ingest __all__ = [ - "ingest_files", + "ingest", ] From 743c7c02ff297f1c0fd8e4b6d07243522a4289ac Mon Sep 17 00:00:00 2001 From: Nikos Papailiou Date: Wed, 3 Apr 2024 11:18:05 +0300 Subject: [PATCH 12/15] Fix typoand remove create_index argument --- .../cloud/vector_search/file_ingestion.py | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/src/tiledb/cloud/vector_search/file_ingestion.py b/src/tiledb/cloud/vector_search/file_ingestion.py index 907c83ce..cd732d92 100644 --- a/src/tiledb/cloud/vector_search/file_ingestion.py +++ b/src/tiledb/cloud/vector_search/file_ingestion.py @@ -15,7 +15,6 @@ def ingest_files_udf( verbose: bool = False, trace_id: Optional[str] = None, # Index creation params - create_index: bool = False, index_type: str = "IVF_FLAT", index_creation_kwargs: Optional[Dict] = None, # DirectoryTextReader params @@ -59,7 +58,6 @@ def ingest_files_udf( :param verbose: verbose logging, defaults to False. :param trace_id: trace ID for logging, defaults to None. # Index creation params - :param create_index: If true, creates a new vector search index. :param index_type: Vector search index type ("FLAT", "IVF_FLAT"). :param index_creation_kwargs: Arguments to be passed to the index creation method. @@ -142,21 +140,7 @@ def ingest_files_udf( embedding = embedding_class_(**embedding_kwargs) with tiledb.scope_ctx(config): - index_uri_exists = tiledb.object_type(index_uri) == "group" - if create_index: - if index_uri_exists: - raise ValueError(f"{index_uri} allready exists and `create_index` was set.") - index = object_index.create( - uri=index_uri, - index_type=index_type, - object_reader=reader, - embedding=embedding, - config=config, - environment_variables=environment_variables, - **index_creation_kwargs, - ) - else: - if index_uri_exists: + if tiledb.object_type(index_uri) == "group": index = object_index.ObjectIndex( uri=index_uri, environment_variables=environment_variables, From f3897b7d106080b1c4dda074906f3311014d7377 Mon Sep 17 00:00:00 2001 From: Nikos Papailiou Date: Thu, 11 Apr 2024 14:30:13 +0300 Subject: [PATCH 13/15] Apply code restructure suggestion --- .../cloud/vector_search/file_ingestion.py | 299 ++++++++++++++---- 1 file changed, 235 insertions(+), 64 deletions(-) diff --git a/src/tiledb/cloud/vector_search/file_ingestion.py b/src/tiledb/cloud/vector_search/file_ingestion.py index cd732d92..65ecc568 100644 --- a/src/tiledb/cloud/vector_search/file_ingestion.py +++ b/src/tiledb/cloud/vector_search/file_ingestion.py @@ -4,7 +4,7 @@ from tiledb.cloud.utilities import as_batch -def ingest_files_udf( +def ingest_files( file_dir_uri: str, index_uri: str, file_name: Optional[str] = None, @@ -57,7 +57,7 @@ def ingest_files_udf( :param namespace: TileDB-Cloud namespace, defaults to None. :param verbose: verbose logging, defaults to False. :param trace_id: trace ID for logging, defaults to None. - # Index creation params + # Vector Index params :param index_type: Vector search index type ("FLAT", "IVF_FLAT"). :param index_creation_kwargs: Arguments to be passed to the index creation method. @@ -97,88 +97,259 @@ def ingest_files_udf( :param vector_indexing_mode: TaskGraph execution mode for the vector indexing. :param index_update_kwargs: Extra arguments to pass to the index update job. """ + import logging + import tiledb - import tiledb.vector_search.embeddings as embeddings_module - from tiledb.vector_search.object_api import object_index - from tiledb.vector_search.object_readers import DirectoryTextReader - - if environment_variables is None: - environment_variables = {} - if index_creation_kwargs is None: - index_creation_kwargs = {} - if index_update_kwargs is None: - index_update_kwargs = {"files_per_partition": 100} - if embedding_kwargs is None: - embedding_kwargs = { - "dimensions": 1536, - "embedding_class": "OpenAIEmbeddings", - "embedding_kwargs": { - "model": "text-embedding-ada-002", - }, - } - if text_splitter_kwargs is None: - text_splitter_kwargs = { - "chunk_size": 500, - "chunk_overlap": 50, - } - if driver_resources is None: + from tiledb.cloud import dag + from tiledb.cloud.utilities import get_logger + from tiledb.cloud.utilities import run_dag + + DEFAULT_IMG_NAME = "vectorsearch" + + def get_logger_wrapper( + verbose: bool = False, + ) -> logging.Logger: + """ + Get a logger instance and log version information. + + :param verbose: verbose logging, defaults to False + :return: logger instance + """ + + level = logging.DEBUG if verbose else logging.INFO + logger = get_logger(level) + + logger.debug( + "tiledb.cloud=%s, tiledb=%s, libtiledb=%s", + tiledb.cloud.version.version, + tiledb.version(), + tiledb.libtiledb.version(), + ) + + return logger + + # -------------------------------------------------------------------- + # UDFs + # -------------------------------------------------------------------- + + def create_dataset_udf( + file_dir_uri: str, + index_uri: str, + file_name: Optional[str] = None, + config=None, + environment_variables: Optional[Mapping[str, str]] = None, + verbose: bool = False, + # Index creation params + index_type: str = "IVF_FLAT", + index_creation_kwargs: Optional[Dict] = None, + # DirectoryTextReader params + include: str = "*", + exclude: Optional[Sequence[str]] = ("[.]*", "*/[.]*"), + suffixes: Optional[Sequence[str]] = None, + max_files: Optional[int] = None, + text_splitter: str = "RecursiveCharacterTextSplitter", + text_splitter_kwargs: Optional[Dict] = None, + # Embedding params + embedding_class: str = "LangChainEmbedding", + embedding_kwargs: Optional[Dict] = None, + ) -> str: + """ + Create a TileDB vector search dataset. + """ + import tiledb.vector_search as vs + import tiledb.vector_search.embeddings as embeddings_module + from tiledb.vector_search.object_api import object_index + from tiledb.vector_search.object_readers import DirectoryTextReader + + logger = get_logger_wrapper(verbose) + logger.debug("tiledb-vector-search=%s", vs.__version__) + + # Check if the dataset already exists + with tiledb.scope_ctx(config): + if environment_variables is None: + environment_variables = {} + if index_creation_kwargs is None: + index_creation_kwargs = {} + if embedding_kwargs is None: + embedding_kwargs = { + "dimensions": 1536, + "embedding_class": "OpenAIEmbeddings", + "embedding_kwargs": { + "model": "text-embedding-ada-002", + }, + } + if text_splitter_kwargs is None: + text_splitter_kwargs = { + "chunk_size": 500, + "chunk_overlap": 50, + } + + reader = DirectoryTextReader( + search_uri=file_dir_uri, + include=file_name if file_name is not None else include, + exclude=exclude, + suffixes=suffixes, + max_files=max_files, + text_splitter=text_splitter, + text_splitter_kwargs=text_splitter_kwargs, + ) + + embedding_class_ = getattr(embeddings_module, embedding_class) + embedding = embedding_class_(**embedding_kwargs) + + if tiledb.object_type(index_uri) == "group": + logger.info("Existing dataset: %r. Updating reader.", index_uri) + index = object_index.ObjectIndex( + uri=index_uri, + environment_variables=environment_variables, + load_embedding=False, + load_metadata_in_memory=False, + memory_budget=1, + ) + index.update_object_reader(reader) + return index_uri + else: + logger.info("Creating dataset: %r", index_uri) + object_index.create( + uri=index_uri, + index_type=index_type, + object_reader=reader, + embedding=embedding, + config=config, + environment_variables=environment_variables, + **index_creation_kwargs, + ) + return index_uri + + def ingest_files_udf( + index_uri: str, + acn: Optional[str] = None, + config=None, + environment_variables: Optional[Mapping[str, str]] = None, + openai_key: Optional[str] = None, + namespace: Optional[str] = None, + verbose: bool = False, + trace_id: Optional[str] = None, + # Index update params + index_timestamp: Optional[int] = None, + workers: int = -1, + worker_resources: Optional[Dict] = None, + worker_image: Optional[str] = None, + extra_worker_modules: Optional[List[str]] = None, + driver_resources: Optional[Dict] = None, + driver_image: Optional[str] = None, + extra_driver_modules: Optional[List[str]] = None, + max_tasks_per_stage: int = -1, + embeddings_generation_mode: dag.Mode = dag.Mode.LOCAL, + embeddings_generation_driver_mode: dag.Mode = dag.Mode.LOCAL, + vector_indexing_mode: dag.Mode = dag.Mode.LOCAL, + index_update_kwargs: Optional[Dict] = None, + ): + """ + Ingest files into a vector search text index. + """ + from tiledb.vector_search.object_api import object_index + + if environment_variables is None: + environment_variables = {} + if openai_key is not None: + environment_variables["OPENAI_API_KEY"] = openai_key + if index_update_kwargs is None: + index_update_kwargs = {"files_per_partition": 100} + + index = object_index.ObjectIndex( + uri=index_uri, + environment_variables=environment_variables, + load_metadata_in_memory=False, + memory_budget=1, + ) + index.update_index( + index_timestamp=index_timestamp, + workers=workers, + worker_resources=worker_resources, + worker_image=worker_image, + extra_worker_modules=extra_worker_modules, + driver_resources=driver_resources, + driver_image=driver_image, + extra_driver_modules=extra_driver_modules, + worker_access_credentials_name=acn, + max_tasks_per_stage=max_tasks_per_stage, + embeddings_generation_mode=embeddings_generation_mode, + embeddings_generation_driver_mode=embeddings_generation_driver_mode, + vector_indexing_mode=vector_indexing_mode, + config=config, + environment_variables=environment_variables, + namespace=namespace, + verbose=verbose, + trace_id=trace_id, + **index_update_kwargs, + ) + + # -------------------------------------------------------------------- + # DAG + # -------------------------------------------------------------------- + + graph = dag.DAG( + name="vector-search-file-indexing", + namespace=namespace, + mode=dag.Mode.BATCH, + ) + if worker_resources is None: driver_resources = {"cpu": "2", "memory": "8Gi"} - reader = DirectoryTextReader( - search_uri=file_dir_uri, - include=file_name if file_name is not None else include, + create_index_node = graph.submit( + create_dataset_udf, + file_dir_uri=file_dir_uri, + index_uri=index_uri, + file_name=file_name, + config=config, + environment_variables=environment_variables, + verbose=verbose, + index_type=index_type, + index_creation_kwargs=index_creation_kwargs, + include=include, exclude=exclude, suffixes=suffixes, max_files=max_files, text_splitter=text_splitter, text_splitter_kwargs=text_splitter_kwargs, + embedding_class=embedding_class, + embedding_kwargs=embedding_kwargs, + name="Create Vector Search index", + access_credentials_name=acn, + resources={"cpu": "1", "memory": "2Gi"}, + image_name=DEFAULT_IMG_NAME, ) - if openai_key is not None: - environment_variables["OPENAI_API_KEY"] = openai_key - embedding_class_ = getattr(embeddings_module, embedding_class) - embedding = embedding_class_(**embedding_kwargs) - - with tiledb.scope_ctx(config): - if tiledb.object_type(index_uri) == "group": - index = object_index.ObjectIndex( - uri=index_uri, - environment_variables=environment_variables, - load_metadata_in_memory=False, - memory_budget=1, - ) - else: - index = object_index.create( - uri=index_uri, - index_type=index_type, - object_reader=reader, - embedding=embedding, - config=config, - environment_variables=environment_variables, - **index_creation_kwargs, - ) - - index.update_index( - index_timestamp=index_timestamp, + ingest_files_node = graph.submit( + ingest_files_udf, + index_uri=index_uri, + acn=acn, + config=config, + environment_variables=environment_variables, + openai_key=openai_key, + namespace=namespace, + verbose=verbose, + trace_id=trace_id, workers=workers, - worker_resources=worker_resources, worker_image=worker_image, extra_worker_modules=extra_worker_modules, driver_resources=driver_resources, driver_image=driver_image, extra_driver_modules=extra_driver_modules, - worker_access_credentials_name=acn, max_tasks_per_stage=max_tasks_per_stage, embeddings_generation_mode=embeddings_generation_mode, embeddings_generation_driver_mode=embeddings_generation_driver_mode, vector_indexing_mode=vector_indexing_mode, - config=config, - environment_variables=environment_variables, - namespace=namespace, - verbose=verbose, - trace_id=trace_id, - **index_update_kwargs, + index_update_kwargs=index_update_kwargs, + name="Ingest Files to Vector Search index", + access_credentials_name=acn, + resources=worker_resources, + image_name=DEFAULT_IMG_NAME, ) + ingest_files_node.depends_on(create_index_node) + run_dag(graph, debug=verbose) + -ingest = as_batch(ingest_files_udf) +ingest = as_batch(ingest_files) From 3b717c57ed36ba5cdc3af36ca469c186f9df6260 Mon Sep 17 00:00:00 2001 From: Nikos Papailiou Date: Mon, 29 Apr 2024 17:10:29 +0300 Subject: [PATCH 14/15] Address review comments --- .../cloud/vector_search/file_ingestion.py | 37 +++++++++++-------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/src/tiledb/cloud/vector_search/file_ingestion.py b/src/tiledb/cloud/vector_search/file_ingestion.py index 65ecc568..cddfa0c4 100644 --- a/src/tiledb/cloud/vector_search/file_ingestion.py +++ b/src/tiledb/cloud/vector_search/file_ingestion.py @@ -5,9 +5,8 @@ def ingest_files( - file_dir_uri: str, + search_uri: str, index_uri: str, - file_name: Optional[str] = None, acn: Optional[str] = None, config=None, environment_variables: Optional[Mapping[str, str]] = None, @@ -38,18 +37,17 @@ def ingest_files( driver_image: Optional[str] = None, extra_driver_modules: Optional[List[str]] = None, max_tasks_per_stage: int = -1, - embeddings_generation_mode: dag.Mode = dag.Mode.LOCAL, + embeddings_generation_mode: dag.Mode = dag.Mode.BATCH, embeddings_generation_driver_mode: dag.Mode = dag.Mode.LOCAL, - vector_indexing_mode: dag.Mode = dag.Mode.LOCAL, + vector_indexing_mode: dag.Mode = dag.Mode.BATCH, index_update_kwargs: Optional[Dict] = None, ): """ Ingest files into a vector search text index. - :param file_dir_uri: directory of the files to be loaded. For individual files, - also pass the `file_name` param. + :param search_uri: Uri to load files from. This can be a directory URI or a FileStore + file URI. :param index_uri: URI of the vector index to load files to. - :param file_name: Name of the file to be loaded. :param acn: Access Credentials Name (ACN) registered in TileDB Cloud (ARN type), defaults to None. :param config: config dictionary, defaults to None. @@ -62,9 +60,9 @@ def ingest_files( :param index_creation_kwargs: Arguments to be passed to the index creation method. # DirectoryTextReader params. - :param include: File pattern to iclude relative to `file_dir_uri`. By default + :param include: File pattern to iclude relative to `search_uri`. By default set to include all files. - :param exclude: File patterns to exclude relative to `file_dir_uri`. By default + :param exclude: File patterns to exclude relative to `search_uri`. By default set to ignore all hidden files. :param suffixes: Provide to keep only files with these suffixes Useful when wanting to keep files with different suffixes @@ -133,9 +131,8 @@ def get_logger_wrapper( # -------------------------------------------------------------------- def create_dataset_udf( - file_dir_uri: str, + search_uri: str, index_uri: str, - file_name: Optional[str] = None, config=None, environment_variables: Optional[Mapping[str, str]] = None, verbose: bool = False, @@ -185,8 +182,8 @@ def create_dataset_udf( } reader = DirectoryTextReader( - search_uri=file_dir_uri, - include=file_name if file_name is not None else include, + search_uri=search_uri, + include=include, exclude=exclude, suffixes=suffixes, max_files=max_files, @@ -202,8 +199,11 @@ def create_dataset_udf( index = object_index.ObjectIndex( uri=index_uri, environment_variables=environment_variables, + # We don't want to perform any queries here. We open the index without + # loading the embedding model, metadata and vector data. load_embedding=False, load_metadata_in_memory=False, + # `memory_budget=1` avoids loading the array data in main memory. memory_budget=1, ) index.update_object_reader(reader) @@ -260,7 +260,11 @@ def ingest_files_udf( index = object_index.ObjectIndex( uri=index_uri, environment_variables=environment_variables, + # We don't want to perform any queries here. We open the index without + # loading the embedding model, metadata and vector data. + load_embedding=False, load_metadata_in_memory=False, + # `memory_budget=1` avoids loading the array data in main memory. memory_budget=1, ) index.update_index( @@ -295,13 +299,12 @@ def ingest_files_udf( mode=dag.Mode.BATCH, ) if worker_resources is None: - driver_resources = {"cpu": "2", "memory": "8Gi"} + worker_resources = {"cpu": "2", "memory": "8Gi"} create_index_node = graph.submit( create_dataset_udf, - file_dir_uri=file_dir_uri, + search_uri=search_uri, index_uri=index_uri, - file_name=file_name, config=config, environment_variables=environment_variables, verbose=verbose, @@ -331,7 +334,9 @@ def ingest_files_udf( namespace=namespace, verbose=verbose, trace_id=trace_id, + index_timestamp=index_timestamp, workers=workers, + worker_resources=worker_resources, worker_image=worker_image, extra_worker_modules=extra_worker_modules, driver_resources=driver_resources, From 35ef6e75a22a21bdf25d8f3dd09dd48f7f3a0944 Mon Sep 17 00:00:00 2001 From: Nikos Papailiou Date: Mon, 29 Apr 2024 17:14:19 +0300 Subject: [PATCH 15/15] Fix format --- src/tiledb/cloud/vector_search/file_ingestion.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/tiledb/cloud/vector_search/file_ingestion.py b/src/tiledb/cloud/vector_search/file_ingestion.py index cddfa0c4..abbcac62 100644 --- a/src/tiledb/cloud/vector_search/file_ingestion.py +++ b/src/tiledb/cloud/vector_search/file_ingestion.py @@ -45,8 +45,8 @@ def ingest_files( """ Ingest files into a vector search text index. - :param search_uri: Uri to load files from. This can be a directory URI or a FileStore - file URI. + :param search_uri: Uri to load files from. This can be a directory URI or + a FileStore file URI. :param index_uri: URI of the vector index to load files to. :param acn: Access Credentials Name (ACN) registered in TileDB Cloud (ARN type), defaults to None. @@ -199,8 +199,8 @@ def create_dataset_udf( index = object_index.ObjectIndex( uri=index_uri, environment_variables=environment_variables, - # We don't want to perform any queries here. We open the index without - # loading the embedding model, metadata and vector data. + # We don't want to perform any queries here. We open the index + # without loading the embedding model, metadata and vector data. load_embedding=False, load_metadata_in_memory=False, # `memory_budget=1` avoids loading the array data in main memory.