Commit Diff


commit - d0d125e7ac0a5cc3e9924691160301dae740bcc0
commit + 8d2a532b2c861b3e326e1855adb1cb2a8a9dcec4
blob - 6ffc0ca8fa7694b4627f0701462b98f3b70b378d
blob + a6929f9db205d5b1d1339e82fb1f8fd7e7489a8b
--- rag_backend.py
+++ rag_backend.py
@@ -1,10 +1,14 @@
 import argparse
 import logging
+from pathlib import Path
 
 from chromadb import Settings
 from langchain_chroma import Chroma
 from langchain.prompts import ChatPromptTemplate
+from langchain_community.document_loaders import TextLoader, PyPDFDirectoryLoader
+from langchain_core.documents import Document
 from langchain_ollama import OllamaLLM
+from langchain_text_splitters import RecursiveCharacterTextSplitter
 
 from configuration import embeddings, DB_PATH, PROMPT_TEMPLATE
 
@@ -31,7 +35,94 @@ class RagBackend:
         sources = [doc.metadata.get("id", None) for doc, _score in context_docs]
         return [response_text, sources]
 
+    @staticmethod
+    def load_pdf_documents(path: str) -> list[Document]:  # rag_indexer.py calls this once per entry of args.sources
+        return PyPDFDirectoryLoader(Path(path)).load()
 
+    @staticmethod
+    def load_text_documents(path: str) -> list[Document]:
+        # Walk `path` recursively; only *.txt names not starting with "." match.
+        loaded: list[Document] = []
+        for txt_file in Path(path).glob("**/[!.]*.txt"):
+            loaded.extend(TextLoader(txt_file).load())
+        return loaded
+
+    @staticmethod
+    def split_documents(documents: list[Document]) -> list[Document]:
+        # chunk_size 800 with an overlap of 80, both measured with len().
+        splitter_options = dict(
+            chunk_size=800, chunk_overlap=80,
+            length_function=len, is_separator_regex=False,
+        )
+        splitter = RecursiveCharacterTextSplitter(**splitter_options)
+        return splitter.split_documents(documents)
+
+    def get_docs(self, ids: set[str]) -> set[str]:  # ids are "source:page:index"; source itself may contain ":"
+        return {chunk_id.rsplit(":", 2)[0] for chunk_id in ids}
+
+    def get_ids(self) -> set[str]:
+        # Only the "ids" entry is read; include=[] presumably skips other payload - TODO confirm.
+        chunk_ids = set(self.db.get(include=[])["ids"])
+        logging.info(f"have {len(self.get_docs(chunk_ids))} documents with {len(chunk_ids)} chunks")
+        return chunk_ids
+
+    def add_to_index(self, chunks: list[Document]) -> None:
+        # Stamp every chunk with its "source:page:index" id first.
+        labelled = self.calculate_chunk_ids(chunks)
+
+        # Keep only chunks whose id the store does not hold yet.
+        known_ids = self.get_ids()
+        pending = [
+            doc for doc in labelled
+            if doc.metadata["id"] not in known_ids
+        ]
+
+        # Guard clause: nothing to write.
+        if not pending:
+            logging.warning("no new or updated chunks found")
+            return
+
+        # Write in slices of 5000 (the per-call maximum is reported to
+        # lie somewhere between 5000 and 5600 chunks).
+        batch_size = 5000
+        for first in range(0, len(pending), batch_size):
+            batch = pending[first:first + batch_size]
+            last = first + len(batch)
+            self.db.add_documents(batch, ids=[doc.metadata["id"] for doc in batch])
+            logging.info(f"new chunk batch {first + 1} to {last} added")
+
+    @staticmethod
+    def calculate_chunk_ids(chunks: list[Document]) -> list[Document]:
+        # Assign each chunk an id of the form "source.ext:6:2"
+        # (page source : page number : chunk index) in metadata["id"].
+        #
+        # The chunks are modified in place and the same list is returned.
+        # "source" and "page" are read with .get(), so a missing key is
+        # rendered as the text "None" inside the id.
+        #
+        # The index is counted per "source:page" key instead of being
+        # compared with the previous chunk only; chunks of one page that
+        # are not adjacent in the list therefore still get distinct ids
+        # rather than each run restarting at 0.
+
+        # Next free chunk index for every page id seen so far.
+        next_index: dict[str, int] = {}
+
+        for chunk in chunks:
+            source = chunk.metadata.get("source")
+            page = chunk.metadata.get("page")
+            page_id = f"{source}:{page}"
+
+            chunk_index = next_index.get(page_id, 0)
+            next_index[page_id] = chunk_index + 1
+            chunk.metadata["id"] = f"{page_id}:{chunk_index}"
+
+        return chunks
+
+    def reset(self) -> None:  # rag_indexer.py calls this when args.reset is set
+        self.db.reset_collection()
+
+
 if __name__ == "__main__":
     logging.basicConfig(
         level=logging.INFO,
blob - 28da3f54b65e1e8b1cb46f9c296e7ae2e2e40e0b
blob + 49748affccd22cfa540322215a430a4fd1ad2f6d
--- rag_indexer.py
+++ rag_indexer.py
@@ -1,110 +1,10 @@
 import argparse
 import logging
 import sys
-from pathlib import Path
 
-from chromadb import Settings
-from langchain.schema.document import Document
-from langchain_chroma import Chroma
-from langchain_community.document_loaders.pdf import PyPDFDirectoryLoader
-from langchain_community.document_loaders.text import TextLoader
-from langchain_text_splitters import RecursiveCharacterTextSplitter
+from configuration import DB_PATH
+from rag_backend import RagBackend
 
-from configuration import embeddings, DB_PATH
-
-
-class RagIndexer:
-    def __init__(self, db_path: str = None):
-        self._db_path = Path(db_path if db_path else DB_PATH)
-        # Load the existing database.
-        self._db = Chroma(
-            persist_directory=str(self._db_path),
-            embedding_function=embeddings(),
-            client_settings=Settings(anonymized_telemetry=False)
-        )
-
-    def load_pdf_documents(self, path: str) -> list[Document]:
-        return PyPDFDirectoryLoader(Path(path)).load()
-
-    def load_text_documents(self, path: str) -> list[Document]:
-        items = Path(path).glob("**/[!.]*.txt")
-        documents: list[Document] = []
-        for item in items:
-            documents += TextLoader(item).load()
-        return documents
-
-    def split_documents(self, documents: list[Document]) -> list[Document]:
-        text_splitter = RecursiveCharacterTextSplitter(
-            chunk_size=800,
-            chunk_overlap=80,
-            length_function=len,
-            is_separator_regex=False,
-        )
-        return text_splitter.split_documents(documents)
-
-    def get_ids(self) -> set[str]:
-        existing_items = self._db.get(include=[])
-        existing_ids = set(existing_items["ids"])
-        existing_docs = set([id.split(":")[0] for id in existing_ids])
-        logging.info(f"found {len(existing_docs)} documents with {len(existing_ids)} chunks")
-        return existing_ids
-
-    def add_to_index(self, chunks: list[Document]) -> None:
-        # generate chunk ids from document list
-        chunks_with_ids = self.calculate_chunk_ids(chunks)
-
-        # check for updated or new chunks
-        existing_ids = self.get_ids()
-        new_chunks = []
-        for chunk in chunks_with_ids:
-            if chunk.metadata["id"] not in existing_ids:
-                new_chunks.append(chunk)
-
-        # add or update chunks
-        if len(new_chunks):
-            # batch size (max is somewhat between 5000-5600)
-            batch_size = 5000
-            total_chunks = len(new_chunks)
-            for start_idx in range(0, total_chunks, batch_size):
-                end_idx = min(start_idx + batch_size, total_chunks)
-                batch_chunks = new_chunks[start_idx:end_idx]
-                batch_chunk_ids = [chunk.metadata["id"] for chunk in batch_chunks]
-                self._db.add_documents(batch_chunks, ids=batch_chunk_ids)
-                logging.info(f"new chunk batch {start_idx + 1} to {end_idx} added")
-        else:
-            logging.warning("no new or updated chunks found")
-
-    def calculate_chunk_ids(self, chunks: list[Document]) -> list[Document]:
-        # This will create IDs like "source.ext:6:2"
-        # Page Source : Page Number : Chunk Index
-
-        last_page_id = None
-        current_chunk_index = 0
-
-        for chunk in chunks:
-            source = chunk.metadata.get("source")
-            page = chunk.metadata.get("page")
-            current_page_id = f"{source}:{page}"
-
-            # If the page ID is the same as the last one, increment the index.
-            if current_page_id == last_page_id:
-                current_chunk_index += 1
-            else:
-                current_chunk_index = 0
-
-            # Calculate the chunk ID.
-            chunk_id = f"{current_page_id}:{current_chunk_index}"
-            last_page_id = current_page_id
-
-            # Add it to the page meta-data.
-            chunk.metadata["id"] = chunk_id
-
-        return chunks
-
-    def reset(self) -> None:
-        self._db.reset_collection()
-
-
 if __name__ == "__main__":
     logging.basicConfig(
         level=logging.INFO,
@@ -122,7 +22,7 @@ if __name__ == "__main__":
         parser.print_help()
         sys.exit(1)
 
-    indexer = RagIndexer(args.db)
+    indexer = RagBackend(args.db)
     if args.reset:
         logging.info("deleting RAG indexer collection")
         indexer.reset()
@@ -130,8 +30,8 @@ if __name__ == "__main__":
 
     for source in args.sources:
         logging.info(f"searching {source}")
-        text_docs = indexer.load_text_documents(source)
         pdf_docs = indexer.load_pdf_documents(source)
         indexer.add_to_index(pdf_docs)
+        text_docs = indexer.load_text_documents(source)
         indexer.add_to_index(text_docs)
         logging.info(f"added {len(text_docs)} text documents to index")
blob - 54f332f4b361ede4044c6272cdf3d3da1c1fa3d1
blob + af114799cf2fb32ccad650221606b6a761d48f64
--- rag_interface.py
+++ rag_interface.py
@@ -31,9 +31,19 @@ def handle_mcp():
             "jsonrpc": "2.0",
             "result": {
                 'text': result,
-                'ref': refs},
+                'ref': refs
+            },
             "id": data.get("id")
         })
+    elif method == "sources":
+        return jsonify({
+            "jsonrpc": "2.0",
+            "result": {
+                "sources": sorted(rag.get_docs(rag.get_ids()))  # get_docs returns a set, which is not JSON-serializable
+            },
+            "id": data.get("id")
+        })
+
     # Add more tool handlers as needed
     return jsonify({"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": data.get("id")})