Add ingestion function for ingesting files to vector search #532
base: main
New file, `@@ -0,0 +1,5 @@`:

```python
from .file_ingestion import ingest

__all__ = [
    "ingest",
]
```
New file, `@@ -0,0 +1,200 @@`:

```python
from typing import Dict, List, Mapping, Optional, Sequence

from tiledb.cloud import dag
from tiledb.cloud.utilities import as_batch


def ingest_files_udf(
    file_dir_uri: str,
    index_uri: str,
    file_name: Optional[str] = None,
```

> **Reviewer:** This, along with `include`/`exclude`, doesn't make sense. How is this to work with TileDB files? There is no check of the TileDB file name, nor any parsing of the TileDB URIs. The goal, again as outlined in the requirements, is to use this with TileDB files, either standalone or from a group.
>
> **Author:** Removed the […]
```python
    acn: Optional[str] = None,
    config=None,
    environment_variables: Optional[Mapping[str, str]] = None,
    namespace: Optional[str] = None,
    verbose: bool = False,
    trace_id: Optional[str] = None,
    # Index creation params
    create_index: bool = False,
```
```python
    index_type: str = "IVF_FLAT",
    index_creation_kwargs: Optional[Dict] = None,
    # DirectoryTextReader params
    include: str = "*",
    exclude: Optional[Sequence[str]] = ("[.]*", "*/[.]*"),
    suffixes: Optional[Sequence[str]] = None,
    max_files: Optional[int] = None,
    text_splitter: str = "RecursiveCharacterTextSplitter",
    text_splitter_kwargs: Optional[Dict] = None,
    # Embedding params
    embedding_class: str = "LangChainEmbedding",
    embedding_kwargs: Optional[Dict] = None,
    openai_key: Optional[str] = None,
    # Index update params
    index_timestamp: Optional[int] = None,
    workers: int = -1,
    worker_resources: Optional[Dict] = None,
```

> **Reviewer:** This is not plumbing through all the different resource parameters — is there a reason?
>
> **Author:** Do you mean the vector indexing resources? These can be passed in […]
```python
    worker_image: Optional[str] = None,
    extra_worker_modules: Optional[List[str]] = None,
    driver_resources: Optional[Dict] = None,
    driver_image: Optional[str] = None,
    extra_driver_modules: Optional[List[str]] = None,
    max_tasks_per_stage: int = -1,
    embeddings_generation_mode: dag.Mode = dag.Mode.LOCAL,
```

> **Reviewer:** Everything must default to batch mode. Running this in local mode is unexpected. The goal is that, like all other verticals, we support and default to batch ingestion capabilities.
>
> **Author:** Document indexing has multiple execution steps that can be run in different modes: […] The default configuration at the moment is: […] This is expected to be a good default execution configuration for cost and latency, even for sets of thousands of documents. Do you want all the execution steps to be executed in […]
>
> **Reviewer:** The requirement, again, as we discussed and as spelled out in the story, is to have a robust batch-mode ingestion that can scale to millions of documents. Local mode is a bad default and does not meet our intended goal; please change it, and please be sure you actually test at scale. These issues are easy to see even when running just our same test datasets.
>
> **Author:** Changed […] There were some bugs that led to execution failures for this setup; these were addressed in TileDB-Inc/TileDB-Vector-Search#351. Also added tests for the BATCH execution in Cloud.
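For context on the thread above: the signature exposes three independent stage modes (`embeddings_generation_mode`, `embeddings_generation_driver_mode`, `vector_indexing_mode`), each a `dag.Mode`. Purely as a toy illustration of per-stage mode selection with a batch default — using a stand-in `Mode` enum, not the real `tiledb.cloud.dag` API:

```python
from enum import Enum


class Mode(Enum):
    # Stand-in for tiledb.cloud's dag.Mode; member names mirror the
    # modes discussed in the review (LOCAL vs. BATCH).
    LOCAL = "local"
    BATCH = "batch"


def resolve_stage_modes(default: Mode, **overrides: Mode) -> dict:
    """Give every pipeline stage the default mode unless overridden."""
    stages = (
        "embeddings_generation_mode",
        "embeddings_generation_driver_mode",
        "vector_indexing_mode",
    )
    return {stage: overrides.get(stage, default) for stage in stages}


# Default every stage to BATCH, as the reviewer requests, while still
# letting a caller pin a single stage to LOCAL:
modes = resolve_stage_modes(Mode.BATCH, vector_indexing_mode=Mode.LOCAL)
```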
```python
    embeddings_generation_driver_mode: dag.Mode = dag.Mode.LOCAL,
    vector_indexing_mode: dag.Mode = dag.Mode.LOCAL,
    index_update_kwargs: Optional[Dict] = None,
):
```
""" | ||
Ingest files into a vector search text index. | ||
|
||
:param file_dir_uri: directory of the files to be loaded. For individual files, | ||
also pass the `file_name` param. | ||
:param index_uri: URI of the vector index to load files to. | ||
:param file_name: Name of the file to be loaded. | ||
:param acn: Access Credentials Name (ACN) registered in TileDB Cloud (ARN type), | ||
defaults to None. | ||
:param config: config dictionary, defaults to None. | ||
:param environment_variables: Environment variables to use during ingestion. | ||
:param namespace: TileDB-Cloud namespace, defaults to None. | ||
:param verbose: verbose logging, defaults to False. | ||
:param trace_id: trace ID for logging, defaults to None. | ||
# Index creation params | ||
:param create_index: If true, creates a new vector search index. | ||
:param index_type: Vector search index type ("FLAT", "IVF_FLAT"). | ||
:param index_creation_kwargs: Arguments to be passed to the index creation | ||
method. | ||
# DirectoryTextReader params. | ||
:param include: File pattern to iclude relative to `file_dir_uri`. By default | ||
set to include all files. | ||
:param exclude: File patterns to exclude relative to `file_dir_uri`. By default | ||
set to ignore all hidden files. | ||
:param suffixes: Provide to keep only files with these suffixes | ||
Useful when wanting to keep files with different suffixes | ||
Suffixes must include the dot, e.g. ".txt" | ||
:param max_files: Maximum number of files to include. | ||
:param text_splitter_kwargs: Arguments for the splitter class. | ||
# SentenceTransformersEmbedding params. | ||
:param model_name_or_path: Huggingface SentenceTransformer model name or path | ||
# Index update params. | ||
:param index_timestamp: Timestamp to add index updates at. | ||
:param workers: If `embeddings_generation_mode=BATCH` this is the number of | ||
distributed workers to be used. | ||
:param worker_resources: If `embeddings_generation_mode=BATCH` this can be used | ||
to specify the worker resources. | ||
:param worker_image: If `embeddings_generation_mode=BATCH` this can be used | ||
to specify the worker Docker image. | ||
:param extra_worker_modules: If `embeddings_generation_mode=BATCH` this can be | ||
used to install extra pip package to the image. | ||
:param driver_resources: If `embeddings_generation_driver_mode=BATCH` this can | ||
be used to specify the driver resources. | ||
:param driver_image: If `embeddings_generation_driver_mode=BATCH` this can be | ||
used to specify the driver Docker image. | ||
:param extra_driver_modules: If `embeddings_generation_driver_mode=BATCH` this | ||
can be used to install extra pip package to the image. | ||
:param max_tasks_per_stage: Number of maximum udf tasks per computation stage. | ||
:param embeddings_generation_mode: TaskGraph execution mode for embeddings | ||
generation. | ||
:param embeddings_generation_driver_mode: TaskGraph execution mode for the | ||
ingestion driver. | ||
:param vector_indexing_mode: TaskGraph execution mode for the vector indexing. | ||
:param index_update_kwargs: Extra arguments to pass to the index update job. | ||
""" | ||
```python
    import tiledb
    import tiledb.vector_search.embeddings as embeddings_module
    from tiledb.vector_search.object_api import object_index
    from tiledb.vector_search.object_readers import DirectoryTextReader

    if environment_variables is None:
        environment_variables = {}
    if index_creation_kwargs is None:
        index_creation_kwargs = {}
    if index_update_kwargs is None:
        index_update_kwargs = {"files_per_partition": 100}
    if embedding_kwargs is None:
        embedding_kwargs = {
            "dimensions": 1536,
            "embedding_class": "OpenAIEmbeddings",
            "embedding_kwargs": {
                "model": "text-embedding-ada-002",
            },
        }
    if text_splitter_kwargs is None:
        text_splitter_kwargs = {
            "chunk_size": 500,
            "chunk_overlap": 50,
        }
    if driver_resources is None:
        driver_resources = {"cpu": "2", "memory": "8Gi"}
```

> **Reviewer:** Did you mean worker or driver here?
>
> **Author:** Changed […]
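The block above fills in defaults for the `Optional[Dict] = None` parameters inside the function body. Using `None` as a sentinel is the standard way to avoid Python's shared-mutable-default pitfall, sketched here in isolation (hypothetical `bad`/`good` helpers, not from the diff):

```python
def bad(counter={}):  # the default dict is created ONCE and shared
    counter["n"] = counter.get("n", 0) + 1
    return counter["n"]


def good(counter=None):  # None sentinel, as in ingest_files_udf above
    if counter is None:
        counter = {}  # a fresh dict on every call
    counter["n"] = counter.get("n", 0) + 1
    return counter["n"]


bad()
second_bad = bad()    # state leaked across calls: returns 2
good()
second_good = good()  # calls stay independent: returns 1
```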
```python
    reader = DirectoryTextReader(
        search_uri=file_dir_uri,
        include=file_name if file_name is not None else include,
        exclude=exclude,
        suffixes=suffixes,
        max_files=max_files,
        text_splitter=text_splitter,
        text_splitter_kwargs=text_splitter_kwargs,
    )

    if openai_key is not None:
        environment_variables["OPENAI_API_KEY"] = openai_key
    embedding_class_ = getattr(embeddings_module, embedding_class)
    embedding = embedding_class_(**embedding_kwargs)

    with tiledb.scope_ctx(config):
```

> **Reviewer:** This should be done as the first stage in the graph, not local to the caller.
>
> **Author:** I am not sure I understand this […]
>
> **Author:** I created an alternative version of the PR that uses an extra task graph and creates the dataset as the first node in the task graph (#547). Is this what you are expecting the ingestion structure to look like?
>
> **Author:** @Shelnutt2 is the alternative version matching your expectation? Let me know if you still have concerns.
>
> **Author:** Applied the alternative task graph structure in this PR.
>
> **Author:** @Shelnutt2 this PR needs your approval to move forward. Let me know if you need any more changes.
```python
        index_uri_exists = tiledb.object_type(index_uri) == "group"
        if create_index:
            if index_uri_exists:
                raise ValueError(
                    f"{index_uri} already exists and `create_index` was set."
                )
```
```python
            index = object_index.create(
                uri=index_uri,
                index_type=index_type,
                object_reader=reader,
                embedding=embedding,
                config=config,
                environment_variables=environment_variables,
                **index_creation_kwargs,
            )
        else:
            if index_uri_exists:
                index = object_index.ObjectIndex(
                    uri=index_uri,
                    environment_variables=environment_variables,
                    load_metadata_in_memory=False,
                    memory_budget=1,
                )
            else:
                index = object_index.create(
                    uri=index_uri,
                    index_type=index_type,
                    object_reader=reader,
                    embedding=embedding,
                    config=config,
                    environment_variables=environment_variables,
                    **index_creation_kwargs,
                )

        index.update_index(
            index_timestamp=index_timestamp,
            workers=workers,
            worker_resources=worker_resources,
            worker_image=worker_image,
            extra_worker_modules=extra_worker_modules,
            driver_resources=driver_resources,
            driver_image=driver_image,
            extra_driver_modules=extra_driver_modules,
            worker_access_credentials_name=acn,
            max_tasks_per_stage=max_tasks_per_stage,
            embeddings_generation_mode=embeddings_generation_mode,
            embeddings_generation_driver_mode=embeddings_generation_driver_mode,
            vector_indexing_mode=vector_indexing_mode,
            config=config,
            environment_variables=environment_variables,
            namespace=namespace,
            verbose=verbose,
            trace_id=trace_id,
            **index_update_kwargs,
        )
```
```python


ingest = as_batch(ingest_files_udf)
```
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not work as expected. Passing in a TileDB file URI get ignored. Please test this and add unit tests for the relevant cases. Currently this does not cover the required use cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As it is implemented atm this should be the group URI and we pick up the file from the group(applying regexp patterns if provided). What are the cases you are looking for supporting here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have some testcases for this here https://github.com/TileDB-Inc/TileDB-Vector-Search/blob/main/apis/python/test/test_directory_reader.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The requirements are to support TileDB FileStore files or a group of files. This has been a hard requirement from day one and is outlined in our planning document.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed
file_dir_uri
tosearch_uri
to be consistent with other ingestion jobs.search_uri
supports FileStore URIs. If we want to index one file using the FileStore URI we can pass it assearch_uri
. @Shelnutt2 is this covering your expectations of the function signature?
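For readers unfamiliar with the pattern used in the last line of the diff: `as_batch` (imported from `tiledb.cloud.utilities` at the top) wraps `ingest_files_udf` so that calling `ingest(...)` dispatches the work as a TileDB Cloud batch task graph. The real implementation lives in `tiledb.cloud`; the following is only a toy stand-in illustrating the wrapper shape, not the actual API:

```python
import functools


def as_batch_sketch(func):
    """Toy stand-in: count dispatches, then run the wrapped UDF inline.

    The real as_batch submits the call to a TileDB Cloud task graph
    instead of executing it locally.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        wrapper.dispatched += 1  # a real wrapper would submit a task graph here
        return func(*args, **kwargs)

    wrapper.dispatched = 0
    return wrapper


@as_batch_sketch
def ingest_stub(search_uri: str, index_uri: str) -> str:
    # Hypothetical stand-in for ingest_files_udf; URIs are placeholders.
    return f"ingest {search_uri} -> {index_uri}"


result = ingest_stub("s3://bucket/docs/", "tiledb://ns/index")
```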