#!/usr/bin/env python3
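"""
docs_loader.py -- loads documents from SOURCE_DIRECTORY, splits them into
chunks, embeds them with a HuggingFace sentence-transformers model, and
persists the result in a local Chroma vector store.

Typical invocation (assuming the variables read below are set in a .env file):

    python3 docs_loader.py
"""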
import os
from dotenv import load_dotenv
import markdown
import chromadb
from chromadb.config import Settings
import glob
from typing import List
from multiprocessing import Pool
from tqdm import tqdm
import logging
# Load environment variables from .env file
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO, filename='document_processing.log', filemode='a',
                    format='%(asctime)s - %(levelname)s - %(message)s')
from langchain_community.document_loaders import (
    UnstructuredFileLoader,
    CSVLoader,
    EverNoteLoader,
    PyMuPDFLoader,
    TextLoader,
    UnstructuredEmailLoader,
    UnstructuredEPubLoader,
    UnstructuredHTMLLoader,
    UnstructuredMarkdownLoader,
    UnstructuredODTLoader,
    UnstructuredPowerPointLoader,
    UnstructuredWordDocumentLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_huggingface import HuggingFaceEmbeddings
# Load environment variables
default_num_processes = os.getenv('DEFAULT_NUM_PROCESSES')
persist_directory = os.getenv('PERSISTENT_DATABASE', 'chroma_db')
anonymize_telemetry = os.getenv('ANONYMIZE_TELEMETRY', 'True') == 'True'
source_directory = os.getenv('SOURCE_DIRECTORY', 'private_documents')
embeddings_model_name = os.getenv('EMBEDDINGS_MODEL_NAME', 'sentence-transformers/all-MiniLM-L6-v2')
documents_batch_size = int(os.getenv('DOCUMENTS_BATCH_SIZE', 200))
embeddings_batch_size = int(os.getenv('BATCH_SIZE', 5461))
chunk_size = int(os.getenv('CHUNK_SIZE', 500))
chunk_overlap = int(os.getenv('CHUNK_OVERLAP', 3))
max_file_size_mb = int(os.getenv('MAX_FILE_SIZE_MB', 200))
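# Illustrative .env for the variables read above (values shown are the script's
# defaults or example assumptions, not requirements):
#   DEFAULT_NUM_PROCESSES=4
#   PERSISTENT_DATABASE=chroma_db
#   ANONYMIZE_TELEMETRY=True
#   SOURCE_DIRECTORY=private_documents
#   EMBEDDINGS_MODEL_NAME=sentence-transformers/all-MiniLM-L6-v2
#   DOCUMENTS_BATCH_SIZE=200
#   BATCH_SIZE=5461
#   CHUNK_SIZE=500
#   CHUNK_OVERLAP=3
#   MAX_FILE_SIZE_MB=200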
# Define anonymize telemetry for Chroma DB
client = chromadb.Client(Settings(anonymized_telemetry=anonymize_telemetry))
def submit_embeddings_in_batches(embeddings, texts, metadatas, ids, chroma_collection, batch_size=embeddings_batch_size):
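    """
    Add precomputed embeddings (with their texts, metadatas, and ids) to a
    Chroma collection in slices of batch_size so that no single add call grows
    unbounded. This helper is not invoked by main(); it is kept for manual use.
    """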
    total_batches = (len(embeddings) + batch_size - 1) // batch_size
    for i in tqdm(range(total_batches), desc="Submitting Embeddings", ncols=80):
        start_idx = i * batch_size
        end_idx = min(start_idx + batch_size, len(embeddings))
        batch_embeddings = embeddings[start_idx:end_idx]
        batch_texts = texts[start_idx:end_idx] if texts else None
        batch_metadatas = metadatas[start_idx:end_idx] if metadatas else None
        batch_ids = ids[start_idx:end_idx] if ids else None
        chroma_collection.add_texts(texts=batch_texts, embeddings=batch_embeddings, metadatas=batch_metadatas, ids=batch_ids)
class ElmLoader(UnstructuredEmailLoader):
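    """
    Wrapper around UnstructuredEmailLoader that falls back to the text/plain
    part of an .eml message when no text/html content is found, instead of
    failing outright.
    """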
    def load(self) -> List[Document]:
        try:
            docs = super().load()
        except ValueError as e:
            if 'text/html content not found in email' in str(e):
                self.unstructured_kwargs["content_source"] = "text/plain"
                docs = super().load()  # Retry loading with updated kwargs
            else:
                raise
        except Exception as e:
            raise type(e)(f"{self.file_path}: {e}") from e
        return docs
# Map file extensions to document loaders and their arguments
LOADER_MAPPING = {
    ".csv": (CSVLoader, {}),
    ".doc": (UnstructuredWordDocumentLoader, {}),
    ".docx": (UnstructuredWordDocumentLoader, {}),
    ".enex": (EverNoteLoader, {}),
    ".eml": (ElmLoader, {}),
    ".epub": (UnstructuredEPubLoader, {}),
    ".html": (UnstructuredHTMLLoader, {}),
    ".odt": (UnstructuredODTLoader, {}),
    ".pdf": (PyMuPDFLoader, {}),
    ".ppt": (UnstructuredPowerPointLoader, {}),
    ".pptx": (UnstructuredPowerPointLoader, {}),
    ".txt": (TextLoader, {"encoding": "utf8"}),
    ".md": (UnstructuredMarkdownLoader, {"encoding": "utf8"}),
    # Add more mappings for other file extensions and loaders as needed
}
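# For example, RTF support could be registered with an entry like the one below
# (assuming UnstructuredRTFLoader is imported from langchain_community.document_loaders):
#     ".rtf": (UnstructuredRTFLoader, {}),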
# Create console handler and set level to warning, so we can scare the cats
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.WARNING)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
# Add the handler to the root logger
logging.getLogger('').addHandler(console_handler)
def validate_file(file_path: str) -> bool:
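    """
    Return True only if the file exists, is non-empty, is no larger than
    MAX_FILE_SIZE_MB, and has an extension registered in LOADER_MAPPING.
    """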
    if not os.path.exists(file_path):
        logging.warning(f"File does not exist: {file_path}")
        return False
    file_size = os.path.getsize(file_path)
    # Check if the file is an empty promise
    if file_size == 0:
        logging.warning(f"Skipping empty file: {file_path}")
        return False
    # Check for file size, just in case...
    max_file_size_bytes = max_file_size_mb * 1024 * 1024
    if file_size > max_file_size_bytes:
        logging.warning(f"File too large (over {max_file_size_mb}MB): {file_path}")
        return False
    file_extension = os.path.splitext(file_path)[1].lower()
    # Ensure the file extension is supported
    if file_extension not in LOADER_MAPPING:
        logging.warning(f"Unsupported file extension '{file_extension}' for file: {file_path}")
        return False
    return True
def load_single_document(file_path: str) -> List[Document]:
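    """
    Load one file with the loader registered for its extension and return the
    resulting Document objects; invalid or unloadable files yield an empty list.
    """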
    ext = "." + file_path.rsplit(".", 1)[-1]
    if ext in LOADER_MAPPING:
        if not validate_file(file_path):
            log_message = f"Skipped the loading of invalid file: {file_path}"
            print(log_message)
            logging.warning(log_message)
            return []
        loader_class, loader_args = LOADER_MAPPING[ext]
        try:
            loader = loader_class(file_path, **loader_args)
            return loader.load()
        except Exception as e:
            log_message = f"Error loading file {file_path}: {e}"
            print(log_message)
            logging.error(log_message, exc_info=True)
            return []
    log_message = f"Unsupported file extension '{ext}' for file: {file_path}"
    print(log_message)
    logging.warning(log_message)
    return []
def load_documents(source_dir: str, ignored_files: List[str] = []) -> List[Document]:
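    """
    Recursively collect every supported file under source_dir (skipping paths
    listed in ignored_files) and load them in parallel with a multiprocessing Pool.
    """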
    all_files = []
    for ext in LOADER_MAPPING:
        all_files.extend(
            glob.glob(os.path.join(source_dir, f"**/*{ext}"), recursive=True)
        )
    filtered_files = [file_path for file_path in all_files if file_path not in ignored_files]
    # keep track of the definition source
    process_source = ""
    if default_num_processes and default_num_processes.isdigit():
        num_processes = int(default_num_processes)
        process_source = ".env"
    else:
        num_processes = os.cpu_count()
        process_source = "CPU discovery"
    # Display the number of processes used and their definition source
    print(f"Number of processes used: {num_processes} (defined by {process_source})")
    # Create a Pool with the determined number of processes
    with Pool(processes=num_processes) as pool:
        results = []
        with tqdm(total=len(filtered_files), desc='Loading new documents', ncols=80) as pbar:
            for i, docs in enumerate(pool.imap_unordered(load_single_document, filtered_files)):
                results.extend(docs)
                pbar.update()
    return results
def process_documents(ignored_files: List[str] = []) -> List[Document]:
    """
    Load documents and split them into chunks
    """
    print(f"Loading documents from {source_directory}")
    documents = load_documents(source_directory, ignored_files)
    if not documents:
        print("No new documents to load")
        exit(0)
    print(f"Loaded {len(documents)} new documents from {source_directory}")
    # Split the loaded documents into batches of documents_batch_size; adjust as needed
    document_batches = [documents[i:i+documents_batch_size] for i in range(0, len(documents), documents_batch_size)]
    # Process each batch of documents
    processed_texts = []
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    for batch in tqdm(document_batches, desc='Processing document batches', ncols=80):
        texts = text_splitter.split_documents(batch)
        processed_texts.extend(texts)
    print(f"Split into {len(processed_texts)} chunks of text (max. {chunk_size} characters each)")
    return processed_texts
# check if Chroma store exists
def does_vectorstore_exist(persist_directory: str) -> bool:
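    """Return True if a persisted Chroma database already exists in persist_directory."""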
    return os.path.exists(os.path.join(persist_directory, 'chroma.sqlite3'))
def main():
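    """
    Validate the source directory, build the embedding model, then either
    append new documents to an existing Chroma store or create a fresh one,
    embedding the chunks in batches of embeddings_batch_size.
    """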
    # Check if the source_directory exists and is accessible
    if not os.path.isdir(source_directory):
        logging.error(f"source_directory does not exist: {source_directory}")
        print(f"Error: Documents directory does not exist: {source_directory}")
        exit(1)
    elif not os.access(source_directory, os.R_OK):
        logging.error(f"source_directory is not accessible: {source_directory}")
        print(f"Error: Documents directory is not accessible: {source_directory}")
        exit(1)
    # Create embeddings
    embeddings = HuggingFaceEmbeddings(model_name=embeddings_model_name)
    if does_vectorstore_exist(persist_directory):
        # Update the existing local vectorstore
        print(f"Appending to existing vectorstore at {persist_directory}")
        db = Chroma(persist_directory=persist_directory, embedding_function=embeddings)
        collection = db.get()
        texts = process_documents([metadata['source'] for metadata in collection['metadatas']])
        print("Creating embeddings. Please wait...")
        # Calculate the total number of batches
        total_batches = (len(texts) + embeddings_batch_size - 1) // embeddings_batch_size
        # Process texts in batches with tqdm for progress tracking
        for i in tqdm(range(total_batches), desc="Creating embeddings", ncols=80):
            batch_texts = texts[i*embeddings_batch_size : (i+1)*embeddings_batch_size]
            db.add_documents(batch_texts)
    else:
        # Create and store a new local vectorstore
        print(f"Creating new vectorstore in {persist_directory}")
        texts = process_documents()
        print("Creating embeddings. Please wait...")
        # Calculate the total number of batches
        total_batches = (len(texts) + embeddings_batch_size - 1) // embeddings_batch_size
        # Initialize db outside of the loop to avoid reinitializing it for each batch
        db = None
        # Process texts in batches with tqdm for progress tracking
        for i in tqdm(range(total_batches), desc="Creating embeddings", ncols=80):
            batch_texts = texts[i*embeddings_batch_size : (i+1)*embeddings_batch_size]
            if db is None:
                db = Chroma.from_documents(batch_texts, embeddings, persist_directory=persist_directory)
            else:
                db.add_documents(batch_texts)
    print("Documents are ready! You can now run vaultChat.py to query your model with your private documents")
if __name__ == "__main__":
    main()