commit 8d2a532b2c861b3e326e1855adb1cb2a8a9dcec4
from: Matthias L. Jugel
date: Fri Jul 11 18:45:08 2025 UTC

modify architecture, remove duplicate chroma db settings

commit - d0d125e7ac0a5cc3e9924691160301dae740bcc0
commit + 8d2a532b2c861b3e326e1855adb1cb2a8a9dcec4
blob - 6ffc0ca8fa7694b4627f0701462b98f3b70b378d
blob + a6929f9db205d5b1d1339e82fb1f8fd7e7489a8b
--- rag_backend.py
+++ rag_backend.py
@@ -1,10 +1,14 @@
 import argparse
 import logging
+from pathlib import Path
 
 from chromadb import Settings
 from langchain_chroma import Chroma
 from langchain.prompts import ChatPromptTemplate
+from langchain_community.document_loaders import TextLoader, PyPDFDirectoryLoader
+from langchain_core.documents import Document
 from langchain_ollama import OllamaLLM
+from langchain_text_splitters import RecursiveCharacterTextSplitter
 
 from configuration import embeddings, DB_PATH, PROMPT_TEMPLATE
@@ -31,7 +35,94 @@ class RagBackend:
         sources = [doc.metadata.get("id", None) for doc, _score in context_docs]
         return [response_text, sources]
 
+    @staticmethod
+    def load_pdf_documents(path: str) -> list[Document]:
+        return PyPDFDirectoryLoader(Path(path)).load()
+
+    @staticmethod
+    def load_text_documents(path: str) -> list[Document]:
+        items = Path(path).glob("**/[!.]*.txt")
+        documents: list[Document] = []
+        for item in items:
+            documents += TextLoader(item).load()
+        return documents
+
+    @staticmethod
+    def split_documents(documents: list[Document]) -> list[Document]:
+        text_splitter = RecursiveCharacterTextSplitter(
+            chunk_size=800,
+            chunk_overlap=80,
+            length_function=len,
+            is_separator_regex=False,
+        )
+        return text_splitter.split_documents(documents)
+
+    def get_docs(self, ids: set[str]) -> set[str]:
+        return {chunk_id.split(":")[0] for chunk_id in ids}
+
+    def get_ids(self) -> set[str]:
+        existing_items = self.db.get(include=[])
+        existing_ids = set(existing_items["ids"])
+        logging.info(f"index has {len(self.get_docs(existing_ids))} documents with {len(existing_ids)} chunks")
+        return existing_ids
+
+    def add_to_index(self, chunks: list[Document]) -> None:
+        # generate chunk ids from document list
+        chunks_with_ids = self.calculate_chunk_ids(chunks)
+
+        # check for updated or new chunks
+        existing_ids = self.get_ids()
+        new_chunks = []
+        for chunk in chunks_with_ids:
+            if chunk.metadata["id"] not in existing_ids:
+                new_chunks.append(chunk)
+
+        # add or update chunks
+        if new_chunks:
+            # batch size (max is somewhere between 5000 and 5600)
+            batch_size = 5000
+            total_chunks = len(new_chunks)
+            for start_idx in range(0, total_chunks, batch_size):
+                end_idx = min(start_idx + batch_size, total_chunks)
+                batch_chunks = new_chunks[start_idx:end_idx]
+                batch_chunk_ids = [chunk.metadata["id"] for chunk in batch_chunks]
+                self.db.add_documents(batch_chunks, ids=batch_chunk_ids)
+                logging.info(f"new chunk batch {start_idx + 1} to {end_idx} added")
+        else:
+            logging.warning("no new or updated chunks found")
+
+    @staticmethod
+    def calculate_chunk_ids(chunks: list[Document]) -> list[Document]:
+        # This will create IDs like "source.ext:6:2"
+        # Page Source : Page Number : Chunk Index
+
+        last_page_id = None
+        current_chunk_index = 0
+
+        for chunk in chunks:
+            source = chunk.metadata.get("source")
+            page = chunk.metadata.get("page")
+            current_page_id = f"{source}:{page}"
+
+            # If the page ID is the same as the last one, increment the index.
+            if current_page_id == last_page_id:
+                current_chunk_index += 1
+            else:
+                current_chunk_index = 0
+
+            # Calculate the chunk ID.
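+            # e.g. "docs/report.pdf:6:2" = third chunk on page 6 (filename illustrative)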
+            chunk_id = f"{current_page_id}:{current_chunk_index}"
+            last_page_id = current_page_id
+
+            # Add it to the page meta-data.
+            chunk.metadata["id"] = chunk_id
+
+        return chunks
+
+    def reset(self) -> None:
+        self.db.reset_collection()
+
+
 if __name__ == "__main__":
     logging.basicConfig(
         level=logging.INFO,
blob - 28da3f54b65e1e8b1cb46f9c296e7ae2e2e40e0b
blob + 49748affccd22cfa540322215a430a4fd1ad2f6d
--- rag_indexer.py
+++ rag_indexer.py
@@ -1,110 +1,10 @@
 import argparse
 import logging
 import sys
-from pathlib import Path
 
-from chromadb import Settings
-from langchain.schema.document import Document
-from langchain_chroma import Chroma
-from langchain_community.document_loaders.pdf import PyPDFDirectoryLoader
-from langchain_community.document_loaders.text import TextLoader
-from langchain_text_splitters import RecursiveCharacterTextSplitter
+from configuration import DB_PATH
+from rag_backend import RagBackend
 
-from configuration import embeddings, DB_PATH
-
-
-class RagIndexer:
-    def __init__(self, db_path: str = None):
-        self._db_path = Path(db_path if db_path else DB_PATH)
-        # Load the existing database.
-        self._db = Chroma(
-            persist_directory=str(self._db_path),
-            embedding_function=embeddings(),
-            client_settings=Settings(anonymized_telemetry=False)
-        )
-
-    def load_pdf_documents(self, path: str) -> list[Document]:
-        return PyPDFDirectoryLoader(Path(path)).load()
-
-    def load_text_documents(self, path: str) -> list[Document]:
-        items = Path(path).glob("**/[!.]*.txt")
-        documents: list[Document] = []
-        for item in items:
-            documents += TextLoader(item).load()
-        return documents
-
-    def split_documents(self, documents: list[Document]) -> list[Document]:
-        text_splitter = RecursiveCharacterTextSplitter(
-            chunk_size=800,
-            chunk_overlap=80,
-            length_function=len,
-            is_separator_regex=False,
-        )
-        return text_splitter.split_documents(documents)
-
-    def get_ids(self) -> set[str]:
-        existing_items = self._db.get(include=[])
-        existing_ids = set(existing_items["ids"])
-        existing_docs = set([id.split(":")[0] for id in existing_ids])
-        logging.info(f"found {len(existing_docs)} documents with {len(existing_ids)} chunks")
-        return existing_ids
-
-    def add_to_index(self, chunks: list[Document]) -> None:
-        # generate chunk ids from document list
-        chunks_with_ids = self.calculate_chunk_ids(chunks)
-
-        # check for updated or new chunks
-        existing_ids = self.get_ids()
-        new_chunks = []
-        for chunk in chunks_with_ids:
-            if chunk.metadata["id"] not in existing_ids:
-                new_chunks.append(chunk)
-
-        # add or update chunks
-        if len(new_chunks):
-            # batch size (max is somewhat between 5000-5600)
-            batch_size = 5000
-            total_chunks = len(new_chunks)
-            for start_idx in range(0, total_chunks, batch_size):
-                end_idx = min(start_idx + batch_size, total_chunks)
-                batch_chunks = new_chunks[start_idx:end_idx]
-                batch_chunk_ids = [chunk.metadata["id"] for chunk in batch_chunks]
-                self._db.add_documents(batch_chunks, ids=batch_chunk_ids)
-                logging.info(f"new chunk batch {start_idx + 1} to {end_idx} added")
-        else:
-            logging.warning("no new or updated chunks found")
-
-    def calculate_chunk_ids(self, chunks: list[Document]) -> list[Document]:
-        # This will create IDs like "source.ext:6:2"
-        # Page Source : Page Number : Chunk Index
-
-        last_page_id = None
-        current_chunk_index = 0
-
-        for chunk in chunks:
-            source = chunk.metadata.get("source")
-            page = chunk.metadata.get("page")
-            current_page_id = f"{source}:{page}"
-
-            # If the page ID is the same as the last one, increment the index.
-            if current_page_id == last_page_id:
-                current_chunk_index += 1
-            else:
-                current_chunk_index = 0
-
-            # Calculate the chunk ID.
-            chunk_id = f"{current_page_id}:{current_chunk_index}"
-            last_page_id = current_page_id
-
-            # Add it to the page meta-data.
-            chunk.metadata["id"] = chunk_id
-
-        return chunks
-
-    def reset(self) -> None:
-        self._db.reset_collection()
-
-
 if __name__ == "__main__":
     logging.basicConfig(
         level=logging.INFO,
@@ -122,7 +22,7 @@ if __name__ == "__main__":
         parser.print_help()
         sys.exit(1)
 
-    indexer = RagIndexer(args.db)
+    indexer = RagBackend(args.db)
     if args.reset:
         logging.info("deleting RAG indexer collection")
         indexer.reset()
@@ -130,8 +30,8 @@ if __name__ == "__main__":
 
     for source in args.sources:
         logging.info(f"searching {source}")
-        text_docs = indexer.load_text_documents(source)
         pdf_docs = indexer.load_pdf_documents(source)
         indexer.add_to_index(pdf_docs)
+        text_docs = indexer.load_text_documents(source)
         indexer.add_to_index(text_docs)
         logging.info(f"added {len(text_docs)} text documents to index")
blob - 54f332f4b361ede4044c6272cdf3d3da1c1fa3d1
blob + af114799cf2fb32ccad650221606b6a761d48f64
--- rag_interface.py
+++ rag_interface.py
@@ -31,9 +31,19 @@ def handle_mcp():
             "jsonrpc": "2.0",
             "result": {
                 'text': result,
-                'ref': refs},
+                'ref': refs
+            },
             "id": data.get("id")
         })
+    elif method == "sources":
+        return jsonify({
+            "jsonrpc": "2.0",
+            "result": {
+                "sources": sorted(rag.get_docs(rag.get_ids()))  # sets are not JSON serializable
+            },
+            "id": data.get("id")
+        })
+
     # Add more tool handlers as needed
     return jsonify({"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": data.get("id")})
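
With the indexing logic folded into RagBackend, one class now serves both the indexer CLI and the MCP interface, which is what removes the duplicated Chroma client_settings=Settings(anonymized_telemetry=False) setup named in the commit message. A minimal usage sketch of the consolidated API, assuming the RagBackend(db_path) constructor used by rag_indexer.py above; the paths are placeholders, and the explicit split_documents() call is an assumption about how callers are meant to chunk documents before indexing:

    from rag_backend import RagBackend

    backend = RagBackend("./chroma_db")               # db path is a placeholder
    documents = backend.load_pdf_documents("./docs")  # directory is a placeholder
    documents += backend.load_text_documents("./docs")
    chunks = RagBackend.split_documents(documents)    # 800-char chunks, 80-char overlap
    backend.add_to_index(chunks)                      # only chunks with unseen ids are embedded

    # Chunk ids follow "source:page:chunk"; keeping only the part before the
    # first colon recovers the distinct source documents, which is what the
    # new "sources" JSON-RPC method returns.
    print(backend.get_docs(backend.get_ids()))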