diff --git a/api/delete.py b/api/delete.py
index 4c22f66a..53f5d40e 100644
--- a/api/delete.py
+++ b/api/delete.py
@@ -2,14 +2,18 @@
 
 from models.delete import RequestPayload, ResponsePayload
 from service.vector_database import VectorService, get_vector_service
+from service.embedding import get_encoder
 
 router = APIRouter()
 
 
 @router.delete("/delete", response_model=ResponsePayload)
 async def delete(payload: RequestPayload):
+    encoder = get_encoder(encoder_type=payload.encoder)
     vector_service: VectorService = get_vector_service(
-        index_name=payload.index_name, credentials=payload.vector_database
+        index_name=payload.index_name,
+        credentials=payload.vector_database,
+        encoder=encoder,
     )
-    await vector_service.delete(file_url=payload.file_url)
-    return {"success": True}
+    data = await vector_service.delete(file_url=payload.file_url)
+    return {"success": True, "data": data}
diff --git a/api/ingest.py b/api/ingest.py
index f8046489..3fa2ce10 100644
--- a/api/ingest.py
+++ b/api/ingest.py
@@ -17,15 +17,15 @@ async def ingest(payload: RequestPayload) -> Dict:
         index_name=payload.index_name,
         vector_credentials=payload.vector_database,
     )
-    documents = await embedding_service.generate_documents()
+    chunks = await embedding_service.generate_chunks()
     encoder = get_encoder(encoder_type=payload.encoder)
     summary_documents = await embedding_service.generate_summary_documents(
-        documents=documents
+        documents=chunks
     )
 
     await asyncio.gather(
         embedding_service.generate_embeddings(
-            documents=documents, encoder=encoder, index_name=payload.index_name
+            documents=chunks, encoder=encoder, index_name=payload.index_name
         ),
         embedding_service.generate_embeddings(
             documents=summary_documents,
diff --git a/api/query.py b/api/query.py
index a16b5498..68f777f2 100644
--- a/api/query.py
+++ b/api/query.py
@@ -1,6 +1,6 @@
 from fastapi import APIRouter
 
-from models.query import RequestPayload, ResponsePayload
+from models.query import RequestPayload, ResponseData, ResponsePayload
 from service.router import query as _query
 
 router = APIRouter()
@@ -8,5 +8,11 @@
 
 @router.post("/query", response_model=ResponsePayload)
 async def query(payload: RequestPayload):
-    output = await _query(payload=payload)
-    return {"success": True, "data": output}
+    chunks = await _query(payload=payload)
+    response_data = [
+        ResponseData(
+            content=chunk.content, doc_url=chunk.doc_url, page_label=chunk.page_number
+        )
+        for chunk in chunks
+    ]
+    return {"success": True, "data": response_data}
diff --git a/dev/embedding.ipynb b/dev/embedding.ipynb
index 104d9cc7..1c8e9890 100644
--- a/dev/embedding.ipynb
+++ b/dev/embedding.ipynb
@@ -17,7 +17,7 @@
     "\n",
     "file = File(\n",
     "    type=FileType.pdf,\n",
-    "    url=\"https://arxiv.org/pdf/2402.05131.pdf\"\n",
+    "    url=\"https://arxiv.org/pdf/2210.03629.pdf\"\n",
     ")\n",
     "vector_credentials = {\n",
     "    \"type\": \"pinecone\",\n",
@@ -40,7 +40,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "docs = await embedding_service.generate_documents()"
+    "docs = await embedding_service.generate_chunks()"
    ]
   },
   {
@@ -49,7 +49,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "chunks = await embedding_service.generate_chunks(docs)"
+    "texts = [doc.content for doc in docs]"
    ]
   },
   {
@@ -62,11 +62,11 @@
     "\n",
     "concatenated_document = \"\"\n",
     "\n",
-    "for i, chunk in enumerate(chunks):\n",
+    "for i, chunk in enumerate(texts):\n",
     "    color = colors[i % len(colors)]\n",
-    "    colored_text = colored(chunk.text, color)\n",
+    "    colored_text = colored(chunk, color)\n",
     "    print(colored_text)\n",
-    "    concatenated_document += chunk.text + \" \"\n",
+    "    concatenated_document += chunk + \" \"\n",
     "\n",
     "print(\"\\nConcatenated Document:\\n\", concatenated_document)"
    ]
diff --git a/dev/walkthrough.ipynb b/dev/walkthrough.ipynb
index a768a172..0c4f35de 100644
--- a/dev/walkthrough.ipynb
+++ b/dev/walkthrough.ipynb
@@ -35,7 +35,7 @@
     "    \"files\": [\n",
     "        {\n",
     "            \"type\": \"PDF\",\n",
-    "            \"url\": \"https://arxiv.org/pdf/2402.05131.pdf\"\n",
+    "            \"url\": \"https://arxiv.org/pdf/2210.03629.pdf\"\n",
     "        }\n",
     "    ],\n",
     "    \"vector_database\": {\n",
@@ -46,7 +46,7 @@
     "        }\n",
     "    },\n",
     "    \"index_name\": PINECONE_INDEX,\n",
-    "    \"encoder\": \"openai\",\n",
+    "    \"encoder\": \"cohere\",\n",
     "}\n",
     "\n",
     "response = requests.post(url, json=payload)\n",
@@ -64,7 +64,7 @@
     "query_url = f\"{API_URL}/api/v1/query\"\n",
     "\n",
     "query_payload = {\n",
-    "    \"input\": \"What is the best chunk strategy?\",\n",
+    "    \"input\": \"What is CoT?\",\n",
     "    \"vector_database\": {\n",
     "        \"type\": \"pinecone\",\n",
     "        \"config\": {\n",
@@ -73,12 +73,22 @@
     "        }\n",
     "    },\n",
     "    \"index_name\": PINECONE_INDEX,\n",
-    "    \"encoder\": \"openai\",\n",
+    "    \"encoder\": \"cohere\",\n",
     "}\n",
     "\n",
     "query_response = requests.post(query_url, json=query_payload)\n",
     "\n",
-    "print(query_response.json())\n"
+    "print(query_response.json())"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "data = query_response.json().get('data', [])\n",
+    "data"
    ]
   },
   {
@@ -91,7 +101,7 @@
     "query_url = f\"{API_URL}/api/v1/delete\"\n",
     "\n",
     "delete_payload = {\n",
-    "    \"file_url\": \"https://arxiv.org/pdf/2402.05131.pdf\",\n",
+    "    \"file_url\": \"https://arxiv.org/pdf/2210.03629.pdf\",\n",
     "    \"vector_database\": {\n",
     "        \"type\": \"pinecone\",\n",
     "        \"config\": {\n",
@@ -100,6 +110,7 @@
     "        }\n",
     "    },\n",
     "    \"index_name\": PINECONE_INDEX,\n",
+    "    \"encoder\": \"cohere\",\n",
     "}\n",
     "\n",
     "delete_response = requests.delete(query_url, json=delete_payload)\n",
diff --git a/encoders/cohere.py b/encoders/cohere.py
index f4507199..76209ba1 100644
--- a/encoders/cohere.py
+++ b/encoders/cohere.py
@@ -9,6 +9,7 @@
 class CohereEncoder(BaseEncoder):
     client: Optional[cohere.Client] = None
     type: str = "cohere"
+    dimension: int = 1024  # https://docs.cohere.com/reference/embed
 
     def __init__(
         self,
diff --git a/models/delete.py b/models/delete.py
index 2dccf200..9a10e415 100644
--- a/models/delete.py
+++ b/models/delete.py
@@ -1,4 +1,5 @@
 from pydantic import BaseModel
 
+from models.ingest import EncoderEnum
 from models.vector_database import VectorDatabase
 
@@ -7,7 +8,9 @@ class RequestPayload(BaseModel):
     index_name: str
     file_url: str
     vector_database: VectorDatabase
+    encoder: EncoderEnum
 
 
 class ResponsePayload(BaseModel):
     success: bool
+    data: dict = {}
diff --git a/models/document.py b/models/document.py
index 8027e325..71bd7207 100644
--- a/models/document.py
+++ b/models/document.py
@@ -1,8 +1,14 @@
 from pydantic import BaseModel
 
 
-class Document(BaseModel):
+class BaseDocument(BaseModel):
     id: str
-    text: str
-    file_url: str
+    content: str
+    doc_url: str
     metadata: dict | None = None
+
+
+class BaseDocumentChunk(BaseDocument):
+    document_id: str
+    page_number: str = ""
+    dense_embedding: list[float] | None = None
diff --git a/models/query.py b/models/query.py
index 6910d28f..301a4b35 100644
--- a/models/query.py
+++ b/models/query.py
@@ -15,7 +15,7 @@ class RequestPayload(BaseModel):
 
 class ResponseData(BaseModel):
     content: str
-    file_url: str
+    doc_url: str
     page_label: Optional[str]
 
 
diff --git a/service/embedding.py b/service/embedding.py
index e99eacb5..e4d9b74c 100644
--- a/service/embedding.py
+++ b/service/embedding.py
@@ -1,5 +1,6 @@
 import asyncio
 import copy
+import uuid
 from tempfile import NamedTemporaryFile
 from typing import Any, List, Optional
 
@@ -11,10 +12,11 @@
 
 import encoders
 from encoders import BaseEncoder
-from models.document import Document
+from models.document import BaseDocument, BaseDocumentChunk
 from models.file import File
 from models.ingest import EncoderEnum
 from service.vector_database import get_vector_service
+from utils.logger import logger
 from utils.summarise import completion
 
 
@@ -38,76 +40,175 @@ def _get_datasource_suffix(self, type: str) -> str:
         except KeyError:
             raise ValueError("Unsupported datasource type")
 
-    async def generate_documents(self) -> List[Document]:
-        documents = []
-        for file in tqdm(self.files, desc="Generating documents"):
-            suffix = self._get_datasource_suffix(file.type.value)
-            with NamedTemporaryFile(suffix=suffix, delete=True) as temp_file:
-                with requests.get(url=file.url) as response:  # Add context manager here
-                    temp_file.write(response.content)
-                    temp_file.flush()
-                    elements = partition(file=temp_file, include_page_breaks=True)
+    async def _download_and_extract_elements(
+        self, file, strategy="hi_res"
+    ) -> List[Any]:
+        """
+        Downloads the file and extracts elements using the partition function.
+        Returns a list of unstructured elements.
+        """
+        logger.info(
+            f"Downloading and extracting elements from {file.url}, "
+            f"using `{strategy}` strategy"
+        )
+        suffix = self._get_datasource_suffix(file.type.value)
+        with NamedTemporaryFile(suffix=suffix, delete=True) as temp_file:
+            with requests.get(url=file.url) as response:
+                temp_file.write(response.content)
+                temp_file.flush()
+                elements = partition(
+                    file=temp_file, include_page_breaks=True, strategy=strategy
+                )
+            return elements
+
+    async def generate_document(
+        self, file: File, elements: List[Any]
+    ) -> BaseDocument | None:
+        logger.info(f"Generating document from {file.url}")
+        try:
+            doc_content = "".join(element.text for element in elements)
+            if not doc_content:
+                logger.error(f"Cannot extract text from {file.url}")
+                return None
+            doc_metadata = {
+                "source": file.url,
+                "source_type": "document",
+                "document_type": self._get_datasource_suffix(file.type.value),
+            }
+            return BaseDocument(
+                id=f"doc_{uuid.uuid4()}",
+                content=doc_content,
+                doc_url=file.url,
+                metadata=doc_metadata,
+            )
+        except Exception as e:
+            logger.error(f"Error loading document {file.url}: {e}")
+
+    async def generate_summary_document(
+        self, documents: List[BaseDocument]
+    ) -> List[BaseDocument]:
+        pbar = tqdm(total=len(documents), desc="Summarizing documents")
+        pages = {}
+        for document in documents:
+            page_number = document.metadata.get("page_number")
+            if page_number not in pages:
+                doc = copy.deepcopy(document)
+                doc.text = await completion(document=doc)
+                pages[page_number] = doc
+            else:
+                pages[page_number].text += document.text
+        pbar.update()
+        pbar.close()
+        summary_documents = list(pages.values())
+        return summary_documents
+
+    async def generate_chunks(self) -> List[BaseDocumentChunk]:
+        doc_chunks = []
+        for file in tqdm(self.files, desc="Generating chunks"):
+            try:
+                elements = await self._download_and_extract_elements(file)
+                document = await self.generate_document(file, elements)
+                if not document:
+                    continue
                 chunks = chunk_by_title(elements)
                 for chunk in chunks:
-                    documents.append(
-                        Document(
-                            id=file.url,
-                            text=chunk.text,
-                            file_url=file.url,
-                            metadata={**chunk.metadata.to_dict()},
+                    # Ensure all metadata values are of a type acceptable to Pinecone
+                    sanitized_metadata = {
+                        key: (
+                            value
+                            if isinstance(value, (str, int, float, bool, list))
+                            else str(value)
+                        )
+                        for key, value in chunk.metadata.to_dict().items()
+                    }
+                    chunk_id = f"chk_{uuid.uuid4()}"
+                    doc_chunks.append(
+                        BaseDocumentChunk(
+                            id=chunk_id,
+                            document_id=document.id,
+                            content=chunk.text,
+                            doc_url=file.url,
+                            metadata={
+                                "chunk_id": chunk_id,
+                                "document_id": document.id,
+                                "source": file.url,
+                                "source_type": "document",
+                                "document_type": self._get_datasource_suffix(
+                                    file.type.value
+                                ),
+                                "content": chunk.text,
+                                **sanitized_metadata,
+                            },
                         )
                     )
-        return documents
+            except Exception as e:
+                logger.error(f"Error loading chunks from {file.url}: {e}")
+        return doc_chunks
 
     async def generate_embeddings(
         self,
-        documents: List[Document],
+        documents: List[BaseDocumentChunk],
         encoder: BaseEncoder,
         index_name: Optional[str] = None,
     ) -> List[tuple[str, list, dict[str, Any]]]:
         pbar = tqdm(total=len(documents), desc="Generating embeddings")
 
-        async def generate_embedding(document: Document):
+        async def safe_generate_embedding(document: BaseDocument):
+            try:
+                return await generate_embedding(document)
+            except Exception as e:
+                logger.error(f"Error embedding document {document.id}: {e}")
+                return None
+
+        async def generate_embedding(document: BaseDocument):
             if document is not None:
                 embeddings: List[np.ndarray] = [
-                    np.array(e) for e in encoder([document.text])
+                    np.array(e) for e in encoder([document.content])
                 ]
+
+                logger.info(f"Embedding: {document.id}, metadata: {document.metadata}")
                 embedding = (
                     document.id,
                     embeddings[0].tolist(),
-                    {
-                        **document.metadata,
-                        "content": document.text,
-                    },
+                    document.metadata,
                 )
                 pbar.update()
                 return embedding
 
-        tasks = [generate_embedding(document) for document in documents]
-        embeddings = await asyncio.gather(*tasks)
+        tasks = [safe_generate_embedding(document) for document in documents]
+        embeddings = await asyncio.gather(*tasks, return_exceptions=False)
         pbar.close()
 
-        vector_service = get_vector_service(
-            index_name=index_name or self.index_name,
-            credentials=self.vector_credentials,
-            encoder=encoder,
-        )
-        await vector_service.upsert(embeddings=[e for e in embeddings if e is not None])
-        return [e for e in embeddings if e is not None]
+        # Filter out None values which indicate failed tasks
+        embeddings = [e for e in embeddings if e is not None]
+
+        if embeddings:
+            vector_service = get_vector_service(
+                index_name=index_name or self.index_name,
+                credentials=self.vector_credentials,
+                encoder=encoder,
+            )
+            try:
+                await vector_service.upsert(embeddings=embeddings)
+            except Exception as e:
+                logger.error(f"Error upserting embeddings: {e}")
+                raise Exception(f"Error upserting embeddings: {e}")
+        return embeddings
 
+    # TODO: Do we summarize the documents or chunks here?
     async def generate_summary_documents(
-        self, documents: List[Document]
-    ) -> List[Document]:
+        self, documents: List[BaseDocumentChunk]
+    ) -> List[BaseDocumentChunk]:
         pbar = tqdm(total=len(documents), desc="Summarizing documents")
         pages = {}
         for document in documents:
-            page_number = document.metadata.get("page_number")
+            page_number = document.page_number
             if page_number not in pages:
                 doc = copy.deepcopy(document)
-                doc.text = await completion(document=doc)
+                doc.content = await completion(document=doc)
                 pages[page_number] = doc
             else:
-                pages[page_number].text += document.text
+                pages[page_number].content += document.content
             pbar.update()
         pbar.close()
         summary_documents = list(pages.values())
diff --git a/service/router.py b/service/router.py
index 82109017..0f90b413 100644
--- a/service/router.py
+++ b/service/router.py
@@ -1,13 +1,13 @@
-from typing import List
-
 from decouple import config
 from semantic_router.encoders import CohereEncoder
 from semantic_router.layer import RouteLayer
 from semantic_router.route import Route
 
+from models.document import BaseDocumentChunk
 from models.query import RequestPayload
 from service.embedding import get_encoder
 from service.vector_database import VectorService, get_vector_service
+from utils.logger import logger
 
 
 def create_route_layer() -> RouteLayer:
@@ -30,25 +30,25 @@ def create_route_layer() -> RouteLayer:
 
 
 async def get_documents(
     *, vector_service: VectorService, payload: RequestPayload
-) -> List:
-    chunks = await vector_service.query(input=payload.input, top_k=4)
-    documents = await vector_service.convert_to_rerank_format(chunks=chunks)
+) -> list[BaseDocumentChunk]:
+    chunks = await vector_service.query(input=payload.input, top_k=25)
 
-    if len(documents):
-        documents = await vector_service.rerank(
-            query=payload.input, documents=documents
-        )
-    return documents
+    if not len(chunks):
+        logger.error(f"No documents found for query: {payload.input}")
+        return []
+
+    reranked_chunks = await vector_service.rerank(query=payload.input, documents=chunks)
+    return reranked_chunks
 
 
-async def query(payload: RequestPayload) -> List:
+async def query(payload: RequestPayload) -> list[BaseDocumentChunk]:
     rl = create_route_layer()
     decision = rl(payload.input).name
     encoder = get_encoder(encoder_type=payload.encoder)
     if decision == "summarize":
         vector_service: VectorService = get_vector_service(
-            index_name=f"{payload.index_name}summary",
+            index_name=f"{payload.index_name}-summary",
             credentials=payload.vector_database,
             encoder=encoder,
         )
diff --git a/service/vector_database.py b/service/vector_database.py
index f0febb22..678552cb 100644
--- a/service/vector_database.py
+++ b/service/vector_database.py
@@ -11,6 +11,7 @@
 
 from encoders.base import BaseEncoder
 from encoders.openai import OpenAIEncoder
+from models.document import BaseDocumentChunk
 from models.vector_database import VectorDatabase
 from utils.logger import logger
 
@@ -29,12 +30,12 @@ async def upsert():
         pass
 
     @abstractmethod
-    async def query():
+    async def query(self, input: str, top_k: int = 25) -> List[BaseDocumentChunk]:
         pass
 
-    @abstractmethod
-    async def convert_to_rerank_format():
-        pass
+    # @abstractmethod
+    # async def convert_to_rerank_format():
+    #     pass
 
     @abstractmethod
     async def delete(self, file_url: str):
@@ -43,25 +44,46 @@ async def delete(self, file_url: str):
     async def _generate_vectors(self, input: str) -> List[List[float]]:
         return self.encoder([input])
 
-    async def rerank(self, query: str, documents: list, top_n: int = 4):
+    async def rerank(
+        self, query: str, documents: list[BaseDocumentChunk], top_n: int = 5
+    ) -> list[BaseDocumentChunk]:
         from cohere import Client
 
         api_key = config("COHERE_API_KEY")
         if not api_key:
             raise ValueError("API key for Cohere is not present.")
         cohere_client = Client(api_key=api_key)
-        docs = [doc["content"] for doc in tqdm(documents, desc="Reranking")]
-        re_ranked = cohere_client.rerank(
-            model="rerank-multilingual-v2.0",
-            query=query,
-            documents=docs,
-            top_n=top_n,
-        ).results
-        results = []
-        for r in tqdm(re_ranked, desc="Processing reranked results"):
-            doc = documents[r.index]
-            results.append(doc)
-        return results
+
+        # Avoid duplications, TODO: fix ingestion for duplications
+        # Deduplicate documents based on content while preserving order
+        seen = set()
+        deduplicated_documents = [
+            doc
+            for doc in documents
+            if doc.content not in seen and not seen.add(doc.content)
+        ]
+        docs_text = list(
+            doc.content
+            for doc in tqdm(
+                deduplicated_documents,
+                desc=f"Reranking {len(deduplicated_documents)} documents",
+            )
+        )
+        try:
+            re_ranked = cohere_client.rerank(
+                model="rerank-multilingual-v2.0",
+                query=query,
+                documents=docs_text,
+                top_n=top_n,
+            ).results
+            results = []
+            for r in tqdm(re_ranked, desc="Processing reranked results "):
+                doc = deduplicated_documents[r.index]
+                results.append(doc)
+            return results
+        except Exception as e:
+            logger.error(f"Error while reranking: {e}")
+            raise Exception(f"Error while reranking: {e}")
 
 
 class PineconeVectorService(VectorService):
@@ -84,17 +106,6 @@ def __init__(
         )
         self.index = pinecone.Index(name=self.index_name)
 
-    async def convert_to_rerank_format(self, chunks: List):
-        docs = [
-            {
-                "content": chunk.get("metadata")["content"],
-                "page_label": chunk.get("metadata")["page_label"],
-                "file_url": chunk.get("metadata")["file_url"],
-            }
-            for chunk in chunks
-        ]
-        return docs
-
     async def upsert(self, embeddings: List[tuple[str, list, dict[str, Any]]]):
         if self.index is None:
             raise ValueError(f"Pinecone index {self.index_name} is not initialized.")
@@ -104,25 +115,45 @@ async def upsert(self, embeddings: List[tuple[str, list, dict[str, Any]]]):
             pass
         self.index.upsert(vectors=embeddings)
 
-    async def query(self, input: str, top_k: int = 4, include_metadata: bool = True):
+    async def query(
+        self, input: str, top_k: int = 25, include_metadata: bool = True
+    ) -> list[BaseDocumentChunk]:
         if self.index is None:
             raise ValueError(f"Pinecone index {self.index_name} is not initialized.")
-        vectors = await self._generate_vectors(input=input)
+        query_vectors = await self._generate_vectors(input=input)
         results = self.index.query(
-            vector=vectors[0],
+            vector=query_vectors[0],
             top_k=top_k,
             include_metadata=include_metadata,
         )
-        return results["matches"]
+        document_chunks = []
+        for match in results["matches"]:
+            document_chunk = BaseDocumentChunk(
+                id=match["id"],
+                document_id=match["metadata"].get("document_id", ""),
+                content=match["metadata"]["content"],
+                doc_url=match["metadata"].get("source", ""),
+                page_number=str(
+                    match["metadata"].get("page_number", "")
+                ),  # TODO: is this correct?
+                metadata={
+                    key: value
+                    for key, value in match["metadata"].items()
+                    if key not in ["content", "file_url"]
+                },
+            )
+            document_chunks.append(document_chunk)
+        return document_chunks
 
-    async def delete(self, file_url: str) -> None:
+    async def delete(self, file_url: str) -> dict[str, int]:
         if self.index is None:
             raise ValueError(f"Pinecone index {self.index_name} is not initialized.")
 
         query_response = self.index.query(
             vector=[0.0] * self.dimension,
             top_k=1000,
-            filter={"file_url": {"$eq": file_url}},
+            include_metadata=True,
+            filter={"source": {"$eq": file_url}},
         )
         chunks = query_response.matches
         logger.info(
@@ -131,6 +162,7 @@ async def delete(self, file_url: str) -> None:
 
         if chunks:
             self.index.delete(ids=[chunk["id"] for chunk in chunks])
+        return {"num_of_deleted_chunks": len(chunks)}
 
 
 class QdrantService(VectorService):
@@ -160,6 +192,7 @@ def __init__(
             ),
         )
 
+    # TODO: remove this
     async def convert_to_rerank_format(self, chunks: List[rest.PointStruct]):
         docs = [
             {
@@ -200,6 +233,7 @@ async def query(self, input: str, top_k: int) -> List:
             # ),
             with_payload=True,
         )
+        # TODO: return list[BaseDocumentChunk]
         return search_result
 
     async def delete(self, file_url: str) -> None:
@@ -243,6 +277,7 @@ def __init__(
         if not self.client.schema.exists(self.index_name):
             self.client.schema.create_class(schema)
 
+    # TODO: remove this
    async def convert_to_rerank_format(self, chunks: List) -> List:
         docs = [
             {
@@ -278,6 +313,7 @@ async def query(self, input: str, top_k: int = 4) -> List:
             .with_limit(top_k)
             .do()
         )
+        # TODO: return list[BaseDocumentChunk]
         return result["data"]["Get"][self.index_name.capitalize()]
 
     async def delete(self, file_url: str) -> None:
@@ -308,6 +344,7 @@ def __init__(
         )
         self.collection = self.client.collection(collection_name=self.index_name)
 
+    # TODO: remove this
     async def convert_to_rerank_format(self, chunks: List) -> List:
         docs = [
             {
@@ -337,6 +374,7 @@ async def query(self, input: str, top_k: int = 4) -> List:
         results = self.collection.vector_find(
             vector=vectors, limit=top_k, fields={"text", "page_label", "file_url"}
         )
+        # TODO: return list[BaseDocumentChunk]
         return results
 
     async def delete(self, file_url: str) -> None:
diff --git a/utils/summarise.py b/utils/summarise.py
index bddcc38a..4191b694 100644
--- a/utils/summarise.py
+++ b/utils/summarise.py
@@ -1,26 +1,26 @@
 from decouple import config
 from openai import AsyncOpenAI
 
-from models.document import Document
+from models.document import BaseDocumentChunk
 
 client = AsyncOpenAI(
     api_key=config("OPENAI_API_KEY"),
 )
 
 
-def _generate_content(document: Document) -> str:
+def _generate_content(*, document: BaseDocumentChunk) -> str:
     return f"""Make an in depth summary the block of text below.
 
 Text:
 ------------------------------------------
-{document.text}
+{document.content}
 ------------------------------------------
 
 Your summary:"""
 
 
-async def completion(document: Document):
-    content = _generate_content(document)
+async def completion(*, document: BaseDocumentChunk) -> str:
+    content = _generate_content(document=document)
     completion = await client.chat.completions.create(
         messages=[
             {
@@ -31,4 +31,4 @@ async def completion(document: Document):
         model="gpt-3.5-turbo-16k",
     )
 
-    return completion.choices[0].message.content
+    return completion.choices[0].message.content or ""
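Usage sketch (not part of the patch): a minimal client-side example of how the reworked endpoints might be exercised, mirroring the walkthrough notebook. API_URL, PINECONE_INDEX and the Pinecone credentials below are placeholders, and the payload shapes simply follow the models changed above: the delete route now requires "encoder", query results come back under "data" as content/doc_url/page_label records, and the Pinecone delete path reports num_of_deleted_chunks.

import requests

API_URL = "http://localhost:8000"  # assumed local deployment
PINECONE_INDEX = "my-index"  # placeholder index name

vector_database = {
    "type": "pinecone",
    # Placeholders: supply real credentials for an actual run.
    "config": {"api_key": "PINECONE_API_KEY", "host": "PINECONE_HOST"},
}

# Query: each item in "data" now exposes content, doc_url and page_label.
query_payload = {
    "input": "What is CoT?",
    "vector_database": vector_database,
    "index_name": PINECONE_INDEX,
    "encoder": "cohere",
}
query_response = requests.post(f"{API_URL}/api/v1/query", json=query_payload)
for item in query_response.json().get("data", []):
    print(item["doc_url"], item.get("page_label"), item["content"][:80])

# Delete: the payload now carries "encoder", and the response includes "data"
# (for the Pinecone service, a num_of_deleted_chunks count).
delete_payload = {
    "file_url": "https://arxiv.org/pdf/2210.03629.pdf",
    "vector_database": vector_database,
    "index_name": PINECONE_INDEX,
    "encoder": "cohere",
}
delete_response = requests.delete(f"{API_URL}/api/v1/delete", json=delete_payload)
print(delete_response.json())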