Add ingestion function for ingesting files to vector search #532

Open · wants to merge 15 commits into base: main
5 changes: 5 additions & 0 deletions src/tiledb/cloud/vector_search/__init__.py
@@ -0,0 +1,5 @@
from .file_ingestion import ingest_files

__all__ = [
"ingest_files",
]
326 changes: 326 additions & 0 deletions src/tiledb/cloud/vector_search/file_ingestion.py
@@ -0,0 +1,326 @@
from typing import Dict, List, Optional, Sequence

from tiledb.cloud import dag
from tiledb.cloud.utilities import as_batch


def ingest_files_dag(
file_dir_uri: str,
Member:
This does not work as expected. Passing in a TileDB file URI gets ignored. Please test this and add unit tests for the relevant cases. Currently this does not cover the required use cases.

Contributor Author:
As implemented at the moment, this should be the group URI and we pick up the files from the group (applying regexp patterns if provided). What are the cases you are looking to support here?

Member:
The requirements are to support TileDB FileStore files or a group of files. This has been a hard requirement from day one and is outlined in our planning document.

Contributor Author:
Renamed file_dir_uri to search_uri to be consistent with other ingestion jobs. search_uri supports FileStore URIs; if we want to index a single file using its FileStore URI we can pass it as search_uri. @Shelnutt2 is this covering your expectations for the function signature?
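
For illustration, a minimal usage sketch of the two cases discussed in this thread. It assumes the renamed search_uri parameter described above (the diff below still shows file_dir_uri), and the URIs and credential name are placeholders, not a confirmed final signature.

# Sketch only: search_uri is the rename discussed above; URIs/ACN are placeholders.
from tiledb.cloud.vector_search import ingest_files

# Case 1: index all (non-hidden) files found in a TileDB group.
ingest_files(
    search_uri="tiledb://my_namespace/my_document_group",
    index_uri="tiledb://my_namespace/s3://my-bucket/my_index",
    acn="my-access-credential",
)

# Case 2: index a single file by passing its TileDB FileStore URI directly.
ingest_files(
    search_uri="tiledb://my_namespace/my_filestore_file",
    index_uri="tiledb://my_namespace/s3://my-bucket/my_index",
    acn="my-access-credential",
)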

index_uri: str,
file_name: Optional[str] = None,
Member:
This, along with include/exclude, doesn't make sense. How is this supposed to work with TileDB files? There is no check of the TileDB file name nor any parsing of the TileDB URIs. The goal, again as outlined in the requirements, is to use this with TileDB files, either standalone or from a group.

Contributor Author:
Removed the file_name option. FileStore URIs work directly using the search_uri param.

acn: Optional[str] = None,
config=None,
namespace: Optional[str] = None,
verbose: bool = False,
trace_id: Optional[str] = None,
# Index creation params
create_index: bool = True,
index_type: str = "IVF_FLAT",
index_creation_kwargs: Dict = {},
# DirectoryTextReader params
glob: str = "**/[!.]*",
exclude: Sequence[str] = (),
suffixes: Optional[Sequence[str]] = None,
text_splitter: str = "RecursiveCharacterTextSplitter",
text_splitter_kwargs: Optional[Dict] = {
"chunk_size": 500,
"chunk_overlap": 50,
},
# SentenceTransformersEmbedding params
model_name_or_path: str = "BAAI/bge-small-en-v1.5",
# Index update params
index_timestamp: int = None,
workers: int = -1,
worker_resources: Dict = None,
worker_image: str = None,
extra_worker_modules: Optional[List[str]] = None,
driver_resources: Dict = {"cpu": "2", "memory": "8Gi"},
driver_image: str = None,
extra_driver_modules: Optional[List[str]] = None,
max_tasks_per_stage: int = -1,
embeddings_generation_mode: dag.Mode = dag.Mode.LOCAL,
Member:
Everything must default to batch mode. Running this in local mode is unexpected. The goal is that, like all other verticals, we support and default to batch ingestion capabilities.

Contributor Author:
Document indexing has multiple execution steps that can be run in different modes:

  • ingest_files creates a BATCH taskgraph that runs all of the indexing. This means that all processing happens within a BATCH taskgraph with access_credentials, even if the options here are set to LOCAL.
  • embeddings_generation: reads the documents and creates text embeddings. This can spawn its own taskgraph.
  • vector_indexing: creates a vector index from the produced embeddings. This can spawn its own taskgraph.

The default configuration at the moment is:

  • ingest_files creates a BATCH taskgraph that runs all of the indexing.
  • embeddings_generation and vector_indexing run in LOCAL mode within a UDF of the ingest_files taskgraph. Both of these tasks can leverage the available parallelism within the single worker.

This is expected to be a good default execution configuration for cost and latency, even for sets of thousands of documents (a sketch of how these modes map onto the function arguments follows this comment).

Do you want all the execution steps to be executed in BATCH mode by default?
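
For reference, a minimal sketch of how the two configurations described above map onto the keyword arguments in this diff (parameter names are taken from the diff; URIs are placeholders):

from tiledb.cloud import dag
from tiledb.cloud.vector_search import ingest_files

# Default under discussion: one BATCH taskgraph is created by ingest_files, and
# embeddings generation and vector indexing run LOCAL inside its driver UDF.
ingest_files(
    file_dir_uri="tiledb://my_namespace/my_document_group",
    index_uri="tiledb://my_namespace/s3://my-bucket/my_index",
    embeddings_generation_mode=dag.Mode.LOCAL,
    embeddings_generation_driver_mode=dag.Mode.LOCAL,
    vector_indexing_mode=dag.Mode.LOCAL,
)

# Fully distributed alternative: each step spawns its own BATCH taskgraph.
ingest_files(
    file_dir_uri="tiledb://my_namespace/my_document_group",
    index_uri="tiledb://my_namespace/s3://my-bucket/my_index",
    embeddings_generation_mode=dag.Mode.BATCH,
    embeddings_generation_driver_mode=dag.Mode.BATCH,
    vector_indexing_mode=dag.Mode.BATCH,
)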

Member (@Shelnutt2, Apr 26, 2024):
The requirement, again, as we discussed and as spelled out in the story, is to have a robust batch-mode ingestion that can scale to millions of documents. Local mode is a bad default and does not meet our intended goal; please change it and be sure you actually test at scale. These issues are easy to see even when running just our existing test datasets.

Contributor Author:
Changed the embeddings_generation_mode and vector_indexing_mode defaults to BATCH.
Note: this setting can introduce some latency and resource overhead for small to medium sized document datasets.

There were some bugs that led to execution failures for this setup; these were addressed in TileDB-Inc/TileDB-Vector-Search#351. Also added tests for the BATCH execution in Cloud.
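
For larger document sets, a hedged sketch of a scaled-out BATCH configuration using the worker- and resource-related parameters from this diff (the concrete values are illustrative only, not tuned recommendations):

from tiledb.cloud import dag
from tiledb.cloud.vector_search import ingest_files

# Illustrative scaled BATCH run; worker counts and resources are placeholders.
ingest_files(
    file_dir_uri="tiledb://my_namespace/my_large_document_group",
    index_uri="tiledb://my_namespace/s3://my-bucket/my_index",
    embeddings_generation_mode=dag.Mode.BATCH,
    embeddings_generation_driver_mode=dag.Mode.BATCH,
    vector_indexing_mode=dag.Mode.BATCH,
    workers=20,                                       # distributed embedding workers
    worker_resources={"cpu": "4", "memory": "16Gi"},  # per-worker resources
    max_tasks_per_stage=100,                          # cap UDF tasks per stage
    index_update_kwargs={"files_per_partition": 100},
)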

embeddings_generation_driver_mode: dag.Mode = dag.Mode.LOCAL,
vector_indexing_mode: dag.Mode = dag.Mode.LOCAL,
index_update_kwargs: Dict = {"files_per_partition": 100},
):
"""
Ingest files into a vector search text index.

:param file_dir_uri: directory of the files to be loaded. For individual files,
also pass the `file_name` param.
:param index_uri: URI of the vector index to load files to.
:param file_name: Name of the file to be loaded.
:param acn: Access Credentials Name (ACN) registered in TileDB Cloud (ARN type),
defaults to None.
:param config: config dictionary, defaults to None.
:param namespace: TileDB-Cloud namespace, defaults to None.
:param verbose: verbose logging, defaults to False.
:param trace_id: trace ID for logging, defaults to None.
# Index creation params
:param create_index: If true, creates a new vector search index.
:param index_type: Vector search index type ("FLAT", "IVF_FLAT").
:param index_creation_kwargs: Arguments to be passed to the index creation method
# DirectoryTextReader params.
:param glob: Glob pattern relative to the specified path; by default set to pick up
all non-hidden files.
:param exclude: Patterns to exclude from results, using glob syntax.
:param suffixes: Load only files with these suffixes. Suffixes must include the
dot, e.g. ".txt".
:param text_splitter_kwargs: Arguments for the splitter class.
# SentenceTransformersEmbedding params.
:param model_name_or_path: Huggingface SentenceTransformer model name or path
# Index update params.
:param index_timestamp: Timestamp to add index updates at.
:param workers: If `embeddings_generation_mode=BATCH` this is the number of
distributed workers to be used.
:param worker_resources: If `embeddings_generation_mode=BATCH` this can be used
to specify the worker resources.
:param worker_image: If `embeddings_generation_mode=BATCH` this can be used
to specify the worker Docker image.
:param extra_worker_modules: If `embeddings_generation_mode=BATCH` this can be used
to install extra pip packages into the image.
:param driver_resources: If `embeddings_generation_driver_mode=BATCH` this can be
used to specify the driver resources.
:param driver_image: If `embeddings_generation_driver_mode=BATCH` this can be used
to specify the driver Docker image.
:param extra_driver_modules: If `embeddings_generation_driver_mode=BATCH` this can
be used to install extra pip packages into the image.
:param max_tasks_per_stage: Maximum number of UDF tasks per computation stage.
:param embeddings_generation_mode: TaskGraph execution mode for embeddings
generation.
:param embeddings_generation_driver_mode: TaskGraph execution mode for the ingestion
driver.
:param vector_indexing_mode: TaskGraph execution mode for the vector indexing.
:param index_update_kwargs: Extra arguments to pass to the index update job.
"""

def ingest_files_udf(
file_dir_uri: str,
index_uri: str,
file_name: Optional[str] = None,
acn: Optional[str] = None,
config=None,
namespace: Optional[str] = None,
verbose: bool = False,
trace_id: Optional[str] = None,
# Index creation params
create_index: bool = True,
index_type: str = "IVF_FLAT",
index_creation_kwargs: Dict = {},
# DirectoryTextReader params
glob: str = "**/[!.]*",
exclude: Sequence[str] = (),
suffixes: Optional[Sequence[str]] = None,
text_splitter: str = "RecursiveCharacterTextSplitter",
text_splitter_kwargs: Optional[Dict] = {
"chunk_size": 500,
"chunk_overlap": 50,
},
# SentenceTransformersEmbedding params
model_name_or_path: str = "BAAI/bge-small-en-v1.5",
# Index update params
index_timestamp: int = None,
workers: int = -1,
worker_resources: Dict = None,
worker_image: str = None,
extra_worker_modules: Optional[List[str]] = None,
driver_resources: Dict = {"cpu": "2", "memory": "8Gi"},
driver_image: str = None,
extra_driver_modules: Optional[List[str]] = None,
max_tasks_per_stage: int = -1,
embeddings_generation_mode: dag.Mode = dag.Mode.LOCAL,
embeddings_generation_driver_mode: dag.Mode = dag.Mode.LOCAL,
vector_indexing_mode: dag.Mode = dag.Mode.LOCAL,
index_update_kwargs: Dict = {"files_per_partition": 100},
):
"""
Ingest files into a vector search text index.

:param file_dir_uri: directory of the files to be loaded. For individual files,
also pass the `file_name` param.
:param index_uri: URI of the vector index to load files to.
:param file_name: Name of the file to be loaded.
:param acn: Access Credentials Name (ACN) registered in TileDB Cloud (ARN type),
defaults to None.
:param config: config dictionary, defaults to None.
:param namespace: TileDB-Cloud namespace, defaults to None.
:param verbose: verbose logging, defaults to False.
:param trace_id: trace ID for logging, defaults to None.
# Index creation params
:param create_index: If true, creates a new vector search index.
:param index_type: Vector search index type ("FLAT", "IVF_FLAT").
:param index_creation_kwargs: Arguments to be passed to the index creation
method.
# DirectoryTextReader params.
:param glob: Glob pattern relative to the specified path; by default set to
pick up all non-hidden files.
:param exclude: Patterns to exclude from results, using glob syntax.
:param suffixes: Load only files with these suffixes. Suffixes must include
the dot, e.g. ".txt".
:param text_splitter_kwargs: Arguments for the splitter class.
# SentenceTransformersEmbedding params.
:param model_name_or_path: Huggingface SentenceTransformer model name or path
# Index update params.
:param index_timestamp: Timestamp to add index updates at.
:param workers: If `embeddings_generation_mode=BATCH` this is the number of
distributed workers to be used.
:param worker_resources: If `embeddings_generation_mode=BATCH` this can be used
to specify the worker resources.
:param worker_image: If `embeddings_generation_mode=BATCH` this can be used
to specify the worker Docker image.
:param extra_worker_modules: If `embeddings_generation_mode=BATCH` this can be
used to install extra pip packages into the image.
:param driver_resources: If `embeddings_generation_driver_mode=BATCH` this can
be used to specify the driver resources.
:param driver_image: If `embeddings_generation_driver_mode=BATCH` this can be
used to specify the driver Docker image.
:param extra_driver_modules: If `embeddings_generation_driver_mode=BATCH` this
can be used to install extra pip packages into the image.
:param max_tasks_per_stage: Maximum number of UDF tasks per computation stage.
:param embeddings_generation_mode: TaskGraph execution mode for embeddings
generation.
:param embeddings_generation_driver_mode: TaskGraph execution mode for the
ingestion driver.
:param vector_indexing_mode: TaskGraph execution mode for the vector indexing.
:param index_update_kwargs: Extra arguments to pass to the index update job.
"""
import tiledb

# Install extra packages not yet available in vectorsearch UDF image.
# TODO(nikos) remove the package installations after the image is updated
def install_extra_worker_modules():
import os
import subprocess
import sys

sys.path.insert(0, "/home/udf/.local/bin")
sys.path.insert(0, "/home/udf/.local/lib/python3.9/site-packages")
os.environ["PATH"] = f"/home/udf/.local/bin:{os.environ['PATH']}"
subprocess.check_call(
[
sys.executable,
"-m",
"pip",
"install",
"--force-reinstall",
"tiledb-vector-search==0.0.23",
"tiledb==0.25.0",
"tiledb-cloud==0.11.10",
]
)
subprocess.check_call(
[
sys.executable,
"-m",
"pip",
"install",
"transformers==4.37.1",
"PyMuPDF",
"beautifulsoup4",
]
)

install_extra_worker_modules()
import importlib

importlib.reload(tiledb)

from tiledb.vector_search.embeddings import SentenceTransformersEmbedding
from tiledb.vector_search.object_api import object_index
from tiledb.vector_search.object_readers import DirectoryTextReader

reader = DirectoryTextReader(
uri=file_dir_uri,
glob=f"**/{file_name}" if file_name is not None else glob,
exclude=exclude,
suffixes=suffixes,
text_splitter=text_splitter,
text_splitter_kwargs=text_splitter_kwargs,
)

embedding = SentenceTransformersEmbedding(
model_name_or_path=model_name_or_path,
)
if create_index:
index = object_index.create(
uri=index_uri,
index_type=index_type,
object_reader=reader,
embedding=embedding,
config=config,
**index_creation_kwargs,
)
else:
index = object_index.ObjectIndex(
uri=index_uri, load_metadata_in_memory=False, memory_budget=1
)

index.update_index(
index_timestamp=index_timestamp,
workers=workers,
worker_resources=worker_resources,
worker_image=worker_image,
extra_worker_modules=extra_worker_modules,
driver_resources=driver_resources,
driver_image=driver_image,
extra_driver_modules=extra_driver_modules,
worker_access_credentials_name=acn,
max_tasks_per_stage=max_tasks_per_stage,
embeddings_generation_mode=embeddings_generation_mode,
embeddings_generation_driver_mode=embeddings_generation_driver_mode,
vector_indexing_mode=vector_indexing_mode,
config=config,
namespace=namespace,
verbose=verbose,
trace_id=trace_id,
**index_update_kwargs,
)

graph = dag.DAG(
name="file-vector-search-ingestion",
mode=dag.Mode.BATCH,
max_workers=1,
namespace=namespace,
)
graph.submit(
ingest_files_udf,
file_dir_uri,
index_uri,
name="file-vector-search-ingestion",
access_credentials_name=acn,
resources=driver_resources,
image_name="vectorsearch",
acn=acn,
config=config,
namespace=namespace,
verbose=verbose,
trace_id=trace_id,
create_index=create_index,
index_type=index_type,
index_creation_kwargs=index_creation_kwargs,
file_name=file_name,
glob=glob,
exclude=exclude,
suffixes=suffixes,
text_splitter=text_splitter,
text_splitter_kwargs=text_splitter_kwargs,
model_name_or_path=model_name_or_path,
index_timestamp=index_timestamp,
workers=workers,
worker_resources=worker_resources,
worker_image=worker_image,
extra_worker_modules=extra_worker_modules,
driver_resources=driver_resources,
driver_image=driver_image,
extra_driver_modules=extra_driver_modules,
max_tasks_per_stage=max_tasks_per_stage,
embeddings_generation_mode=embeddings_generation_mode,
embeddings_generation_driver_mode=embeddings_generation_driver_mode,
vector_indexing_mode=vector_indexing_mode,
index_update_kwargs=index_update_kwargs,
)
graph.compute()
graph.wait()


ingest_files = as_batch(ingest_files_dag)
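
For context on the exported entry point, a hedged usage sketch of updating an existing index while filtering the input files (all parameter names come from the diff above; the URIs and credential name are placeholders, not a confirmed API):

from tiledb.cloud.vector_search import ingest_files

# Add only PDF and text files from a group to an index that already exists,
# skipping anything matching the "drafts/*" glob. Sketch only.
ingest_files(
    file_dir_uri="tiledb://my_namespace/my_document_group",
    index_uri="tiledb://my_namespace/s3://my-bucket/my_index",
    create_index=False,          # update an existing index instead of creating one
    suffixes=[".pdf", ".txt"],   # suffixes must include the dot
    exclude=["drafts/*"],        # glob patterns to exclude
    text_splitter_kwargs={"chunk_size": 500, "chunk_overlap": 50},
    acn="my-access-credential",
)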