Add ingestion function for ingesting files to vector search #532

Open · wants to merge 15 commits into base: main
3 changes: 3 additions & 0 deletions src/tiledb/cloud/utilities/_common.py
@@ -310,13 +310,15 @@ def wrapper(*args, **kwargs) -> Dict[str, object]:
- acn: Access Credentials Name (ACN) registered in TileDB Cloud (ARN type)
- access_credentials_name: alias for acn, for backwards compatibility
- resources: resources to allocate for the UDF, defaults to None
- image_name: Docker image_name to use for UDFs, defaults to None
"""

name = kwargs.get("name", func.__name__)
namespace = kwargs.get("namespace", None)
acn = kwargs.get("acn", kwargs.pop("access_credentials_name", None))
kwargs["acn"] = acn # for backwards compatibility
resources = kwargs.pop("resources", None)
image_name = kwargs.pop("image_name", None)

# Create a new DAG
graph = dag.DAG(
@@ -332,6 +334,7 @@ def wrapper(*args, **kwargs) -> Dict[str, object]:
name=name,
access_credentials_name=acn,
resources=resources,
image_name=image_name,
**_filter_kwargs(func, kwargs),
)

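The hunk above pops `resources` and `image_name` off the caller's kwargs before forwarding the rest to the UDF. A minimal, self-contained sketch of that plumbing pattern (the wrapper and demo function here are illustrative, not the real `as_batch` implementation):

```python
import functools
from typing import Callable, Dict


def as_batch_sketch(func: Callable) -> Callable:
    """Illustrative sketch of the kwargs plumbing above: infrastructure
    options (resources, image_name) are popped before the remaining
    kwargs are forwarded to the wrapped function."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs) -> Dict[str, object]:
        resources = kwargs.pop("resources", None)
        image_name = kwargs.pop("image_name", None)
        # In the real wrapper these would configure the DAG node;
        # here we simply report them alongside the function result.
        return {
            "node_config": {"resources": resources, "image_name": image_name},
            "result": func(*args, **kwargs),
        }

    return wrapper


@as_batch_sketch
def ingest_demo(uri: str) -> str:
    return f"ingested {uri}"


out = ingest_demo(
    "s3://bucket/docs",
    resources={"cpu": "2", "memory": "8Gi"},
    image_name="vectorsearch",
)
```

Because the infrastructure kwargs are popped (not just read), they never leak into the wrapped function's signature, which is why `_filter_kwargs` in the real wrapper only sees the UDF's own arguments.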
5 changes: 5 additions & 0 deletions src/tiledb/cloud/vector_search/__init__.py
@@ -0,0 +1,5 @@
from .file_ingestion import ingest_files

__all__ = [
"ingest_files",
]
201 changes: 201 additions & 0 deletions src/tiledb/cloud/vector_search/file_ingestion.py
@@ -0,0 +1,201 @@
from typing import Dict, List, Mapping, Optional, Sequence

from tiledb.cloud import dag
from tiledb.cloud.utilities import as_batch


def ingest_files_udf(
file_dir_uri: str,
Member: This does not work as expected. Passing in a TileDB file URI gets ignored. Please test this and add unit tests for the relevant cases. Currently this does not cover the required use cases.

Contributor Author: As it is implemented at the moment, this should be the group URI, and we pick up the files from the group (applying regexp patterns if provided). What are the cases you are looking to support here?

Member: The requirements are to support TileDB FileStore files or a group of files. This has been a hard requirement from day one and is outlined in our planning document.

Contributor Author: Renamed file_dir_uri to search_uri to be consistent with the other ingestion jobs. search_uri supports FileStore URIs. If we want to index one file using its FileStore URI, we can pass it as search_uri. @Shelnutt2 is this covering your expectations of the function signature?

index_uri: str,
file_name: Optional[str] = None,
Member: This, along with include/exclude, doesn't make sense. How is this to work with TileDB files? There is no check of the TileDB file name, nor any parsing of the TileDB URIs. The goal, again as outlined in the requirements, is to use this with TileDB files, either standalone or from a group.

Contributor Author: Removed the file_name option. FileStore URIs work directly via the search_uri param.

acn: Optional[str] = None,
config=None,
environment_variables: Optional[Mapping[str, str]] = None,
namespace: Optional[str] = None,
verbose: bool = False,
trace_id: Optional[str] = None,
# Index creation params
create_index: bool = False,
index_type: str = "IVF_FLAT",
index_creation_kwargs: Optional[Dict] = None,
# DirectoryTextReader params
include: str = "*",
exclude: Optional[Sequence[str]] = ("[.]*", "*/[.]*"),
suffixes: Optional[Sequence[str]] = None,
max_files: Optional[int] = None,
text_splitter: str = "RecursiveCharacterTextSplitter",
text_splitter_kwargs: Optional[Dict] = {
"chunk_size": 500,
"chunk_overlap": 50,
},
# Embedding params
embedding_class: str = "LangChainEmbedding",
embedding_kwargs: Optional[Dict] = {
"dimensions": 1536,
"embedding_class": "OpenAIEmbeddings",
"embedding_kwargs": {
"model": "text-embedding-ada-002",
},
},
openai_key: Optional[str] = None,
# Index update params
index_timestamp: Optional[int] = None,
workers: int = -1,
worker_resources: Optional[Dict] = None,
Member: This is not plumbing all the different resource parameters; is there a reason?

Contributor Author: You mean the vector indexing resources? Those can be passed in index_update_kwargs.

worker_image: Optional[str] = None,
extra_worker_modules: Optional[List[str]] = None,
driver_resources: Optional[Dict] = {"cpu": "2", "memory": "8Gi"},
driver_image: Optional[str] = None,
extra_driver_modules: Optional[List[str]] = None,
max_tasks_per_stage: int = -1,
embeddings_generation_mode: dag.Mode = dag.Mode.LOCAL,
Member: Everything must default to batch mode. Running this in local mode is unexpected. The goal is that, like all other verticals, we support and default to batch ingestion capabilities.

Contributor Author: Document indexing has multiple execution steps that can run in different modes:

  • ingest_files creates a BATCH taskgraph that runs all the indexing. This means that all processing happens within a BATCH taskgraph with access credentials, even if the options here are set to LOCAL.
  • embeddings_generation reads the documents and creates text embeddings. This can spawn its own taskgraph.
  • vector_indexing creates a vector index from the produced embeddings. This can spawn its own taskgraph.

The default configuration at the moment is:

  • ingest_files creates a BATCH taskgraph that runs all the indexing.
  • embeddings_generation and vector_indexing run in LOCAL mode within a UDF of the ingest_files taskgraph. Both of these tasks can leverage the available parallelism within the single worker.

This is expected to be a good default execution configuration for cost and latency, even for sets of thousands of documents.

Do you want all the execution steps to be executed in BATCH mode by default?

Member (@Shelnutt2, Apr 26, 2024): The requirement again, as we discussed and as spelled out in the story, is to have a robust batch-mode ingestion that can scale to millions of documents. Local mode is a bad default and does not meet our intended goal; please change it, and please be sure you actually test at scale. These issues are easy to see even running just our same test datasets.

Contributor Author: Changed the embeddings_generation_mode and vector_indexing modes to BATCH. Note: this setting can introduce some latency and resource overhead for small to medium size document datasets.

There were some bugs that led to execution failure for this setup. This was addressed in TileDB-Inc/TileDB-Vector-Search#351. Also added tests for the BATCH execution in Cloud.

embeddings_generation_driver_mode: dag.Mode = dag.Mode.LOCAL,
vector_indexing_mode: dag.Mode = dag.Mode.LOCAL,
index_update_kwargs: Optional[Dict] = {"files_per_partition": 100},
):
"""
Ingest files into a vector search text index.

:param file_dir_uri: directory of the files to be loaded. For individual files,
also pass the `file_name` param.
:param index_uri: URI of the vector index to load files to.
:param file_name: Name of the file to be loaded.
:param acn: Access Credentials Name (ACN) registered in TileDB Cloud (ARN type),
defaults to None.
:param config: config dictionary, defaults to None.
:param environment_variables: Environment variables to use during ingestion.
:param namespace: TileDB-Cloud namespace, defaults to None.
:param verbose: verbose logging, defaults to False.
:param trace_id: trace ID for logging, defaults to None.
# Index creation params
:param create_index: If true, creates a new vector search index.
:param index_type: Vector search index type ("FLAT", "IVF_FLAT").
:param index_creation_kwargs: Arguments to be passed to the index creation
method.
# DirectoryTextReader params.
:param include: File pattern to include relative to `file_dir_uri`. By default
set to include all files.
:param exclude: File patterns to exclude relative to `file_dir_uri`. By default
set to ignore all hidden files.
:param suffixes: Provide to keep only files with these suffixes.
Useful when wanting to keep files with different suffixes.
Suffixes must include the dot, e.g. ".txt".
:param max_files: Maximum number of files to include.
:param text_splitter: Text splitter class to use, defaults to
"RecursiveCharacterTextSplitter".
:param text_splitter_kwargs: Arguments for the splitter class.
# Embedding params.
:param embedding_class: Name of the embedding class to use, defaults to
"LangChainEmbedding".
:param embedding_kwargs: Arguments for the embedding class.
:param openai_key: OpenAI API key; when set, it is exported as the
OPENAI_API_KEY environment variable.
# Index update params.
:param index_timestamp: Timestamp to add index updates at.
:param workers: If `embeddings_generation_mode=BATCH` this is the number of
distributed workers to be used.
:param worker_resources: If `embeddings_generation_mode=BATCH` this can be used
to specify the worker resources.
:param worker_image: If `embeddings_generation_mode=BATCH` this can be used
to specify the worker Docker image.
:param extra_worker_modules: If `embeddings_generation_mode=BATCH` this can be
used to install extra pip packages to the image.
:param driver_resources: If `embeddings_generation_driver_mode=BATCH` this can
be used to specify the driver resources.
:param driver_image: If `embeddings_generation_driver_mode=BATCH` this can be
used to specify the driver Docker image.
:param extra_driver_modules: If `embeddings_generation_driver_mode=BATCH` this
can be used to install extra pip packages to the image.
:param max_tasks_per_stage: Maximum number of UDF tasks per computation stage.
:param embeddings_generation_mode: TaskGraph execution mode for embeddings
generation.
:param embeddings_generation_driver_mode: TaskGraph execution mode for the
ingestion driver.
:param vector_indexing_mode: TaskGraph execution mode for the vector indexing.
:param index_update_kwargs: Extra arguments to pass to the index update job.
"""
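The `chunk_size`/`chunk_overlap` defaults above mean consecutive chunks share a 50-character window. A simplified fixed-size splitter showing only that overlap arithmetic (the PR actually delegates to LangChain's RecursiveCharacterTextSplitter, which additionally splits on separators; this sketch is not that implementation):

```python
from typing import List


def split_text(text: str, chunk_size: int = 500, chunk_overlap: int = 50) -> List[str]:
    """Fixed-size splitter: each chunk starts chunk_size - chunk_overlap
    characters after the previous one, so neighboring chunks overlap."""
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break
    return chunks


# 1200 characters of varied text -> 3 chunks with 50-character overlaps.
sample = "".join(chr(65 + i % 26) for i in range(1200))
chunks = split_text(sample)
```

The overlap exists so that a sentence cut at a chunk boundary still appears whole in at least one chunk, which improves embedding quality at the edges.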
import importlib

import tiledb
from tiledb.vector_search.object_api import object_index
from tiledb.vector_search.object_readers import DirectoryTextReader

if environment_variables is None:
environment_variables = {}
if index_creation_kwargs is None:
index_creation_kwargs = {}
if index_update_kwargs is None:
index_update_kwargs = {}
if embedding_kwargs is None:
embedding_kwargs = {}
if text_splitter_kwargs is None:
text_splitter_kwargs = {}

reader = DirectoryTextReader(
uri=file_dir_uri,
include=f"{file_name}" if file_name is not None else include,
exclude=exclude,
suffixes=suffixes,
max_files=max_files,
text_splitter=text_splitter,
text_splitter_kwargs=text_splitter_kwargs,
)
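The reader above filters files by glob patterns. Assuming fnmatch-style semantics for `include`/`exclude`/`suffixes` (an assumption; the actual matching rules live in TileDB-Vector-Search's DirectoryTextReader), the selection logic can be sketched as:

```python
import fnmatch
from typing import List, Optional, Sequence


def select_files(
    paths: Sequence[str],
    include: str = "*",
    exclude: Sequence[str] = ("[.]*", "*/[.]*"),
    suffixes: Optional[Sequence[str]] = None,
    max_files: Optional[int] = None,
) -> List[str]:
    """Sketch of glob-style file selection under the assumed semantics:
    keep paths matching `include`, drop any matching an `exclude` pattern
    (the defaults drop hidden files), optionally restrict by suffix,
    and stop after `max_files` matches."""
    selected = []
    for p in paths:
        if not fnmatch.fnmatch(p, include):
            continue
        if any(fnmatch.fnmatch(p, pat) for pat in exclude):
            continue
        if suffixes is not None and not any(p.endswith(s) for s in suffixes):
            continue
        selected.append(p)
        if max_files is not None and len(selected) >= max_files:
            break
    return selected


kept = select_files(
    ["docs/a.txt", ".hidden", "docs/.b.txt", "docs/c.pdf"],
    suffixes=[".txt"],
)
```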

if openai_key is not None:
environment_variables["OPENAI_API_KEY"] = openai_key
embeddings_module = importlib.import_module("tiledb.vector_search.embeddings")
embedding_class_ = getattr(embeddings_module, embedding_class)
embedding = embedding_class_(**embedding_kwargs)
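The embedding class is resolved dynamically by name via importlib/getattr. A self-contained sketch of that dispatch pattern, using a stdlib class (`datetime.date`) as a stand-in for an embedding class:

```python
import importlib


def load_class(module_name: str, class_name: str, **kwargs):
    """Resolve a class by name from a module and instantiate it with
    keyword arguments, mirroring the dispatch above."""
    module = importlib.import_module(module_name)
    cls = getattr(module, class_name)  # raises AttributeError on a bad name
    return cls(**kwargs)


obj = load_class("datetime", "date", year=2024, month=4, day=26)
```

Passing the class name as a string keeps the UDF signature serializable across the taskgraph; the trade-off is that a typo only surfaces at runtime as an AttributeError.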

with tiledb.scope_ctx(config):
Member: This should be done as the first stage in the graph, not local to the caller.

Contributor Author: I am not sure I understand this; ingest_files_udf is running within the taskgraph.

Contributor Author: I created an alternative version of the PR that uses an extra taskgraph and creates the dataset as a first node in the taskgraph: #547. Is this what you are expecting the ingestion structure to look like?

Contributor Author: @Shelnutt2 is the alternative version matching your expectation? Let me know if you still have concerns.

Contributor Author: Applied the alternative taskgraph structure in this PR.

Contributor Author: @Shelnutt2 this PR needs your approval to move forward. Let me know if you need any more changes.

index_uri_exists = tiledb.object_type(index_uri) == "group"
if create_index:
if index_uri_exists:
raise ValueError(f"{index_uri} already exists and `create_index` was set.")
else:
index = object_index.create(
uri=index_uri,
index_type=index_type,
object_reader=reader,
embedding=embedding,
config=config,
environment_variables=environment_variables,
**index_creation_kwargs,
)
else:
if index_uri_exists:
index = object_index.ObjectIndex(
uri=index_uri,
environment_variables=environment_variables,
load_metadata_in_memory=False,
memory_budget=1,
)
else:
index = object_index.create(
uri=index_uri,
index_type=index_type,
object_reader=reader,
embedding=embedding,
config=config,
environment_variables=environment_variables,
**index_creation_kwargs,
)
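The branching above reduces to a small decision table: raise when `create_index` is set but the index already exists, open an existing index when one is found, and create a fresh one in every remaining case. A sketch of just that decision (the returned labels are illustrative, not API calls):

```python
def resolve_index_action(create_index: bool, index_exists: bool) -> str:
    """Decision table for the create/open branch above."""
    if create_index and index_exists:
        # Mirrors the ValueError raised in the real code.
        raise ValueError("index already exists and `create_index` was set")
    if index_exists:
        return "open"  # object_index.ObjectIndex(...)
    return "create"    # object_index.create(...)
```

Note that `create_index=False` with a missing index still creates one, which is why the create call appears in two branches of the original code.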

index.update_index(
index_timestamp=index_timestamp,
workers=workers,
worker_resources=worker_resources,
worker_image=worker_image,
extra_worker_modules=extra_worker_modules,
driver_resources=driver_resources,
driver_image=driver_image,
extra_driver_modules=extra_driver_modules,
worker_access_credentials_name=acn,
max_tasks_per_stage=max_tasks_per_stage,
embeddings_generation_mode=embeddings_generation_mode,
embeddings_generation_driver_mode=embeddings_generation_driver_mode,
vector_indexing_mode=vector_indexing_mode,
config=config,
environment_variables=environment_variables,
namespace=namespace,
verbose=verbose,
trace_id=trace_id,
**index_update_kwargs,
)


ingest = as_batch(ingest_files_udf)