/
vectorstore.py
148 lines (115 loc) · 5.91 KB
/
vectorstore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# Copyright 2024 Holosun ApS
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from ..logging import setup_logging
from ..utils.config import load_config_key
logging = setup_logging()
def load_memories(vector_name):
    """Load the memory settings configured for a vector name.

    Looks up the memory settings for ``vector_name`` in the
    'config/llm_config.yaml' file.

    Args:
        vector_name: Name of the vector whose memory settings are wanted.

    Returns:
        The memory settings for ``vector_name``, or None if no memory
        settings are found.
    """
    # NOTE(review): no statements follow the docstring here, so as written
    # this function returns None unconditionally - confirm the body that
    # implements the documented lookup was not lost.
def _supabase_vectorstore(vector_name, embeddings):
    """Build a SupabaseVectorStore backed by the table named ``vector_name``."""
    from supabase import Client, create_client
    from langchain.vectorstores import SupabaseVectorStore
    from ..database.database import setup_supabase

    logging.debug(f"Initiating Supabase store: {vector_name}")
    setup_supabase(vector_name)

    # Credentials are read from the environment.
    supabase_url = os.getenv('SUPABASE_URL')
    supabase_key = os.getenv('SUPABASE_KEY')
    logging.debug(f"Supabase URL: {supabase_url} vector_name: {vector_name}")

    supabase: Client = create_client(supabase_url, supabase_key)
    vectorstore = SupabaseVectorStore(
        supabase,
        embeddings,
        table_name=vector_name,
        query_name=f'match_documents_{vector_name}',
    )
    logging.debug("Chose Supabase")
    return vectorstore


def _cloudsql_vectorstore(vector_name, embeddings):
    """Build a PGVector store whose collection is named ``vector_name``."""
    from langchain.vectorstores.pgvector import PGVector
    from ..database.database import get_vector_size

    logging.debug("Inititaing CloudSQL pgvector")
    # https://python.langchain.com/docs/modules/data_connection/vectorstores/integrations/pgvector
    # Expected shape: postgresql://<user>:<password>@<host>:5432/<database>
    connection_string = os.environ.get("PGVECTOR_CONNECTION_STRING")

    # The vector size is exported through the environment before the store
    # is constructed (presumably the pgvector integration reads it there -
    # confirm against the installed langchain version).
    vector_size = get_vector_size(vector_name)
    os.environ["PGVECTOR_VECTOR_SIZE"] = str(vector_size)

    vectorstore = PGVector(
        connection_string=connection_string,
        embedding_function=embeddings,
        collection_name=vector_name,
    )
    logging.debug("Chose CloudSQL")
    return vectorstore


def _alloydb_vectorstore(vector_name, embeddings):
    """Build an AlloyDBVectorStore on the table named ``vector_name``."""
    alloydb_config = load_config_key(
        'alloydb_config',
        vector_name=vector_name,
        filename="config/llm_config.yaml",
    )
    if alloydb_config is None:
        # Fail here with a clear message instead of handing None to the
        # engine factory and failing later with an unrelated error.
        message = "No alloydb_config was found"
        logging.error(message)
        raise ValueError(message)

    from langchain_google_alloydb_pg import AlloyDBVectorStore
    from ..database.alloydb import create_alloydb_table, create_alloydb_engine

    engine = create_alloydb_engine(alloydb_config, vector_name)
    create_alloydb_table(vector_name, engine)

    logging.info("Chose AlloyDB")
    return AlloyDBVectorStore.create_sync(
        engine=engine,
        table_name=vector_name,
        embedding_service=embeddings,
        metadata_columns=["source"],
    )


def _lancedb_vectorstore(vector_name, embeddings):
    """Build a LanceDB store on the table named ``vector_name``.

    The table is opened from the location given by the LANCEDB_BUCKET
    environment variable and is created if it cannot be opened.
    """
    bucket = os.environ.get("LANCEDB_BUCKET")
    if bucket is None:
        # Previously this only logged and then went on to report the
        # variable as "found" and connect with None.
        message = f"Could not locate LANCEDB_BUCKET environment variable for {vector_name}"
        logging.error(message)
        raise ValueError(message)

    from ..patches.langchain.lancedb import LanceDB
    import lancedb

    logging.info(f"LANCEDB_BUCKET environment variable found for {vector_name} - {bucket}")

    db = lancedb.connect(bucket)
    logging.info(f"LanceDB Tables: {db.table_names()} using {bucket}")
    logging.info(f"Opening LanceDB table: {vector_name} using {bucket}")

    try:
        table = db.open_table(vector_name)
    except FileNotFoundError as err:
        logging.info(f"{err} - Could not open table for {vector_name} - creating new table")
        # A new table is created with a single placeholder row whose vector
        # is the embedding of the placeholder text itself.
        init = f"Creating new table for {vector_name}"
        table = db.create_table(
            vector_name,
            data=[
                {
                    "vector": embeddings.embed_query(init),
                    "text": init,
                    "id": "1",
                }
            ],
            mode="overwrite",
        )

    logging.info(f"Inititaing LanceDB object for {vector_name} using {bucket}")
    vectorstore = LanceDB(
        connection=table,
        embedding=embeddings,
    )
    logging.info(f"Chose LanceDB for {vector_name} using {bucket}")
    return vectorstore


def pick_vectorstore(vs_str, vector_name, embeddings):
    """Create the vector store selected by ``vs_str``.

    Supported values of ``vs_str`` are 'supabase', 'cloudsql', 'alloydb'
    and 'lancedb'. Backend libraries are imported lazily inside the
    per-backend helper, so only the selected backend has to be installed.

    Args:
        vs_str: Identifier of the vector store backend to use.
        vector_name: Name used for the backing table / collection and for
            looking up backend configuration.
        embeddings: Embeddings object handed to the vector store.

    Returns:
        The vector store object for the selected backend.

    Raises:
        NotImplementedError: If ``vs_str`` is not a supported backend.
        ValueError: If the 'alloydb' backend has no ``alloydb_config`` or
            the 'lancedb' backend has no LANCEDB_BUCKET environment
            variable.
    """
    logging.debug('Picking vectorstore')

    if vs_str == 'supabase':
        return _supabase_vectorstore(vector_name, embeddings)
    elif vs_str == 'cloudsql':
        return _cloudsql_vectorstore(vector_name, embeddings)
    elif vs_str == 'alloydb':
        return _alloydb_vectorstore(vector_name, embeddings)
    elif vs_str == 'lancedb':
        return _lancedb_vectorstore(vector_name, embeddings)
    else:
        raise NotImplementedError(f'No vectorstore implemented for {vs_str}')


def pick_retriever(vector_name, embeddings=None):
    """Create the retrievers configured for a vector name.

    Builds a list of retrievers based on the memory settings for
    ``vector_name`` and returns a ContextualCompressionRetriever object.
    If no retrievers are created, it returns None.

    Args:
        vector_name: Name of the vector whose memory settings are used.
        embeddings: Optional embeddings parameter.
    """
    # NOTE(review): no statements follow the docstring here, so as written
    # this function returns None unconditionally - confirm the body that
    # implements the documented behaviour was not lost.