import json
import os
import re
import tempfile
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv
from llama_index.core import (
    Settings,
    SimpleDirectoryReader,
    VectorStoreIndex,
    get_response_synthesizer,
)
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import CodeSplitter, JSONNodeParser, SemanticSplitterNodeParser
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.anthropic import Anthropic
from llama_index.llms.mistralai import MistralAI
from llama_index.llms.openai import OpenAI
from llama_index.readers.json import JSONReader
from llama_index.readers.web import BeautifulSoupWebReader, WholeSiteReader
from llama_index.vector_stores.qdrant import QdrantVectorStore
from qdrant_client import QdrantClient

from mail_manager import *  # provides fetch_latest_emails

# Load environment variables from a local .env file
load_dotenv()
class LLMSource(Enum):
OPENAI = "openai"
ANTHROPIC = "anthropic"
MISTRAL = "mistral"
LOCAL = "local"
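# NOTE: LLMSource documents the accepted source names; RAGTool currently takes the
# source as a plain string, e.g. llm_source=LLMSource.OPENAI.value ("openai").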
class RAGTool:
"""
A tool for Retrieval-Augmented Generation (RAG) which initializes different language models,
sets up query engines, and ingests documents to be searchable.
"""
def __init__(self, directory: str, model_name: str = "BAAI/bge-small-en-v1.5", llm_source: str = "local") -> None:
"""
Initializes the RAGTool with the specified directory, model, and language model source.
:param directory: The directory where the documents are stored.
:param model_name: The name of the model to use for embeddings.
:param llm_source: The source of the language model, can be 'openai', 'anthropic', 'mistral', or 'local'.
"""
        self._directory = directory
        self._model_name = model_name
        self._client = QdrantClient(url=os.getenv("QDRANT_URL"), api_key=os.getenv("QDRANT_API_KEY"))
        self._documents = []
        self._query_engines: Dict[str, RetrieverQueryEngine] = {}
        self._llm_source = llm_source
        self._llm = self.initialize_llm()
        self._embed_model = self.initialize_embed_model()
        self._document_type: Optional[str] = None
        self._node_parser = None
def initialize_llm(self) -> Any:
"""
Initializes the language model based on the _llm_source attribute.
:return: An instance of the language model.
"""
if self._llm_source == "openai":
llm = OpenAI(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4-turbo-preview")
elif self._llm_source == "anthropic":
llm = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-opus-20240229")
elif self._llm_source == "mistral":
# Assuming MistralAI is a class similar to OpenAI and Anthropic
llm = MistralAI(api_key=os.getenv("MISTRAL_API_KEY"))
elif self._llm_source == "local":
llm = OpenAI(base_url="http://localhost:1234/v1", api_base="http://localhost:1234/v1", api_key="not-needed")
else:
raise ValueError(f"Unsupported LLM source: {self._llm_source}")
Settings.llm = llm
return llm
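    # The remote sources above read their credentials from the environment:
    # OPENAI_API_KEY, ANTHROPIC_API_KEY, or MISTRAL_API_KEY (plus QDRANT_URL and
    # QDRANT_API_KEY for the vector store), typically supplied via the .env file.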
    def file_mapping(self, document_type: str) -> str:
        """
        Maps a document type to its corresponding file extension.
        :param document_type: The type of document (e.g., 'javascript', 'python').
        :return: The file extension for the given document type.
        """
        language_map = {
            'javascript': '.js',
            'python': '.py',
            'pdf': '.pdf',
            'docx': '.docx',
            'csv': '.csv',
        }
        try:
            return language_map[document_type]
        except KeyError:
            raise ValueError(f"Unsupported document type: {document_type}")
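    # Example: file_mapping("python") returns ".py", which SimpleDirectoryReader
    # then uses as a required extension filter when loading documents.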
def initialize_embed_model(self) -> HuggingFaceEmbedding:
"""
Initializes the embedding model using HuggingFaceEmbedding.
:return: An instance of HuggingFaceEmbedding.
"""
embed_model = HuggingFaceEmbedding(model_name=self._model_name)
Settings.embed_model = embed_model
return embed_model
@property
def directory(self) -> str:
"""
Gets the directory of the RAGTool.
:return: The directory as a string.
"""
return self._directory
@directory.setter
def directory(self, directory: str) -> None:
"""
Sets the directory of the RAGTool.
:param directory: The directory as a string.
"""
self._directory = directory
@property
def document_type(self):
"""
Gets the document_type of the RAGTool.
:return: The document_type as a string.
"""
return self._document_type
@document_type.setter
def document_type(self, document_type):
"""
Sets the document_type of the RAGTool.
:param document_type: The document_type as a string.
"""
self._document_type = document_type
@property
def documents(self):
"""
Gets the documents of the RAGTool.
:return: The documents as a LlamaIndex Document.
"""
return self._documents
@documents.setter
def documents(self, documents):
"""
Sets the documents of the RAGTool.
:param documents: The documents as a LlamaIndex Document.
"""
self._documents = documents
def create_temp_json_file(self, json_data: Any) -> str:
"""
Creates a temporary JSON file with the provided JSON data.
:param json_data: The JSON data to be written to the file.
:return: The file path of the created temporary JSON file.
"""
# Create a temporary directory
temp_dir = tempfile.mkdtemp()
# Define the file path
file_path = os.path.join(temp_dir, 'data.json')
# Write the JSON data to the file
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(json_data, f, ensure_ascii=False)
return file_path
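    # Example sketch: create_temp_json_file({"subject": "hi"}) writes the dict to
    # <tmpdir>/data.json and returns that path (the payload key is illustrative).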
def process_json_data_with_reader(self, json_data: Any) -> List:
"""
Processes JSON data by reading it through a JSONReader, and optionally cleans up the created temporary files.
:param json_data: The JSON data to be processed.
:return: A list of document nodes obtained from processing the JSON data.
"""
# First, create a temporary JSON file
json_file_path = self.create_temp_json_file(json_data)
# Initialize your JSONReader (with desired configuration)
json_reader = JSONReader(levels_back=1, collapse_length=100, ensure_ascii=False)
# Process the data
documents = json_reader.load_data(json_file_path)
# Optionally, cleanup the temporary file and directory if desired
os.remove(json_file_path)
os.rmdir(os.path.dirname(json_file_path))
return documents
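    # Example sketch: given a list of email dicts from fetch_latest_emails, this
    # round-trips them through a temp file and returns LlamaIndex Documents:
    #   docs = rag.process_json_data_with_reader([{"from": "a@b.c", "body": "..."}])
    # (the dict shape shown here is illustrative, not the mail_manager contract)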
def _load_documents(self, document_type: str, directory: Optional[str] = None, crawl_depth: int = 0, after: Optional[datetime] = None) -> None:
"""
Loads documents based on the specified document type and additional parameters for document sourcing.
:param document_type: The type of documents to load ('web', 'email', or others).
:param directory: Optional directory or URL from where to load the documents.
:param crawl_depth: The depth for web crawling, applicable for web documents.
:param after: The datetime for fetching emails after this time, applicable for emails.
"""
if document_type == "web":
if directory:
url = directory
self.directory = url
loader = BeautifulSoupWebReader()
documents = self.clean_documents(loader.load_data(urls=[url]))
# print(documents[0])
# loader = WholeSiteReader(
# prefix=url,
# max_depth=crawl_depth
# )
# documents = self.clean_documents(loader.load_data(base_url=url))
self.documents = documents
elif document_type == "email":
self.document_type = "email"
new_emails = fetch_latest_emails(after)
documents = self.clean_documents(self.process_json_data_with_reader(new_emails))
self.documents = documents
else:
if directory:
self.directory = directory
file_extension = self.file_mapping(document_type)
reader = SimpleDirectoryReader(self.directory, recursive=True, required_exts=[file_extension])
self.documents = self.clean_documents(reader.load_data())
def clean_up_text(self, content: str) -> str:
"""
Cleans up the text content by removing unnecessary characters and formatting.
:param content: The original text content.
:return: The cleaned-up text content.
"""
        # Rejoin words that were hyphen-split across line breaks
        content = re.sub(r'(\w+)-\n(\w+)', r'\1\2', content)
        # Strip newlines, decorative dash runs, and stray unicode artifacts
        unwanted_patterns = ["\\n", " —", "——————————", "—————————", "—————", r'\\u[\dA-Fa-f]{4}', r'\uf075', r'\uf0b7']
        for pattern in unwanted_patterns:
            content = re.sub(pattern, "", content)
        # Normalize spaced hyphens ("foo - bar" -> "foo-bar") and collapse whitespace
        content = re.sub(r'(\w)\s*-\s*(\w)', r'\1-\2', content)
        content = re.sub(r'\s+', ' ', content)
        return content
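    # Example: clean_up_text("data-\nbase  has   extra   spaces") rejoins the
    # hyphen-split word and collapses runs of whitespace to single spaces,
    # yielding "database has extra spaces".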
def clean_documents(self, documents: List) -> List:
"""
Processes a list of documents to clean up the text content.
:param documents: A list of document nodes.
:return: A list of cleaned document nodes.
"""
        # Clean every document's text so stored content is free of line-break noise
        cleaned_docs = []
for d in documents:
cleaned_text = self.clean_up_text(d.text)
d.text = cleaned_text
cleaned_docs.append(d)
return cleaned_docs
@property
def node_parser(self):
"""
Gets the node parser of the RAGTool.
:return: The node parser as a NodeParser.
"""
return self._node_parser
    @node_parser.setter
    def node_parser(self, document_type):
        """
        Sets the node parser of the RAGTool to a CodeSplitter, JSONNodeParser, or
        SemanticSplitterNodeParser (all inherit from NodeParser), based on the document type.
        :param document_type: The document type as a string.
        """
        if document_type in ("javascript", "python"):
            node_parser = CodeSplitter(
                language=document_type,
                chunk_lines=40,
                chunk_lines_overlap=15,
                max_chars=1500,
            )
        elif document_type == "web":
            node_parser = CodeSplitter(
                language="html",
                chunk_lines=40,
                chunk_lines_overlap=15,
                max_chars=1500,
            )
        elif document_type == "email":
            # JSONNodeParser needs no email-specific configuration
            node_parser = JSONNodeParser()
        elif document_type == "text":
            node_parser = SemanticSplitterNodeParser(
                buffer_size=1,
                breakpoint_percentile_threshold=95,
                embed_model=self._embed_model,
            )
        else:
            # Other document types (pdf, docx, csv) keep the framework's default parser
            return
        self._node_parser = node_parser
        Settings.node_parser = node_parser
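    # Parser selection summary: javascript/python/web -> CodeSplitter (JS, Python,
    # and HTML grammars), email -> JSONNodeParser, text -> SemanticSplitterNodeParser;
    # any other type falls through to the framework's default parser.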
    def get_or_create_query_engine(self, document_type: str, vector_store: Optional[QdrantVectorStore] = None) -> RetrieverQueryEngine:
        """
        Returns the query engine cached for a document type, creating it first if needed.
        :param document_type: The type of document the query engine serves.
        :param vector_store: An optional QdrantVectorStore to back a newly created engine.
        :return: The RetrieverQueryEngine for the document type.
        """
        if document_type not in self._query_engines:
            self.setup_query_engine(document_type, vector_store)
        return self._query_engines[document_type]
def setup_query_engine(self, document_type: str, vector_store: Optional[QdrantVectorStore] = None) -> None:
"""
Sets up a query engine for a specific document type.
:param document_type: The type of document the query engine will be used for.
:param vector_store: An optional instance of QdrantVectorStore to be used with the query engine.
"""
if document_type not in self._query_engines:
            if vector_store is None:
vector_store = QdrantVectorStore(client=self._client, collection_name=document_type)
vector_index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
retriever = VectorIndexRetriever(
index=vector_index,
similarity_top_k=5
)
response_synthesizer = get_response_synthesizer(response_mode="tree_summarize")
query_engine = RetrieverQueryEngine(retriever=retriever, response_synthesizer=response_synthesizer)
self._query_engines[document_type] = query_engine
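    # Each document type maps to its own Qdrant collection, so e.g. "python" and
    # "email" content is retrieved from separate indexes with top-5 similarity search.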
def run_pipeline(self, document_type: str, directory: Optional[str] = None, crawl_depth: int = 0, after: Optional[datetime] = None) -> None:
"""
Runs the ingestion pipeline for a specific document type.
:param document_type: The type of documents to process.
:param directory: Optional directory containing documents to process.
:param crawl_depth: The depth for web crawling (applicable for web documents).
:param after: The datetime for fetching emails after this time (applicable for emails).
"""
        self._load_documents(document_type, directory, crawl_depth, after)
        self.node_parser = document_type
        # Documents are cleaned at load time, so the stored content is already clean.
        # Passing the configured parser and embedding model explicitly keeps ingestion
        # aligned with the query-time Settings.
        vector_store = QdrantVectorStore(client=self._client, collection_name=document_type)
        pipeline = IngestionPipeline(
            transformations=[Settings.node_parser, Settings.embed_model],
            vector_store=vector_store,
        )
        pipeline.run(documents=self.documents, num_workers=2)
        self.get_or_create_query_engine(document_type, vector_store)
    def query(self, query_text: str, document_type: str):
        """
        Runs a query against the query engine for the given document type.
        :param query_text: The natural-language query.
        :param document_type: The document type whose collection should be searched.
        :return: The query engine's response.
        """
        query_engine = self.get_or_create_query_engine(document_type)
        response = query_engine.query(query_text)
        return response
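    # Example sketch (assumes the "python" collection was already ingested):
    #   answer = rag.query("How is the ingestion pipeline configured?", "python")
    #   print(answer)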
def initialize_rag_tool(directory: str = "", llm_source: str = "openai") -> RAGTool:
"""
Initializes and returns an instance of the RAGTool class.
:param directory: The directory where the documents are stored.
:param llm_source: The source of the language model, can be 'openai', 'anthropic', 'mistral', or 'local'.
:return: An instance of RAGTool.
"""
# Initialize and configure the RAGTool instance
rag_tool = RAGTool(directory=directory, llm_source=llm_source)
# You can run any initial setup here if necessary
return rag_tool
if __name__ == '__main__':
    # A minimal demonstration of the tool's initialization
rag_tool_demo = initialize_rag_tool()
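    # A hedged end-to-end sketch (assumes QDRANT_URL/QDRANT_API_KEY and
    # OPENAI_API_KEY are set, and that a ./docs directory contains Python files):
    # rag_tool_demo.run_pipeline("python", directory="./docs")
    # print(rag_tool_demo.query("What does this codebase do?", "python"))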