commit - d0d125e7ac0a5cc3e9924691160301dae740bcc0
commit + 8d2a532b2c861b3e326e1855adb1cb2a8a9dcec4
blob - 6ffc0ca8fa7694b4627f0701462b98f3b70b378d
blob + a6929f9db205d5b1d1339e82fb1f8fd7e7489a8b
--- rag_backend.py
+++ rag_backend.py
import argparse
import logging
+from pathlib import Path
from chromadb import Settings
from langchain_chroma import Chroma
from langchain.prompts import ChatPromptTemplate
+from langchain_community.document_loaders import TextLoader, PyPDFDirectoryLoader
+from langchain_core.documents import Document
from langchain_ollama import OllamaLLM
+from langchain_text_splitters import RecursiveCharacterTextSplitter
from configuration import embeddings, DB_PATH, PROMPT_TEMPLATE
        sources = [doc.metadata.get("id", None) for doc, _score in context_docs]
        return [response_text, sources]
+
+    @staticmethod
+    def load_pdf_documents(path: str) -> list[Document]:
+        return PyPDFDirectoryLoader(Path(path)).load()
+
+    @staticmethod
+    def load_text_documents(path: str) -> list[Document]:
+        # recursively collect *.txt files, skipping hidden ones
+        items = Path(path).glob("**/[!.]*.txt")
+        documents: list[Document] = []
+        for item in items:
+            documents += TextLoader(item).load()
+        return documents
+
+    @staticmethod
+    def split_documents(documents: list[Document]) -> list[Document]:
+        text_splitter = RecursiveCharacterTextSplitter(
+            chunk_size=800,
+            chunk_overlap=80,
+            length_function=len,
+            is_separator_regex=False,
+        )
+        return text_splitter.split_documents(documents)
+
+    def get_docs(self, ids: set[str]) -> set[str]:
+        # reduce chunk ids ("source:page:index") to their source documents
+        return {chunk_id.split(":")[0] for chunk_id in ids}
+
+    def get_ids(self) -> set[str]:
+        existing_items = self.db.get(include=[])
+        existing_ids = set(existing_items["ids"])
+        logging.info(f"found {len(self.get_docs(existing_ids))} documents with {len(existing_ids)} chunks")
+        return existing_ids
+
+    def add_to_index(self, chunks: list[Document]) -> None:
+        # generate chunk ids for the document list
+        chunks_with_ids = self.calculate_chunk_ids(chunks)
+
+        # keep only chunks whose ids are not already in the index
+        existing_ids = self.get_ids()
+        new_chunks = []
+        for chunk in chunks_with_ids:
+            if chunk.metadata["id"] not in existing_ids:
+                new_chunks.append(chunk)
+
+        # add the new chunks in batches
+        if new_chunks:
+            # Chroma's maximum batch size lies somewhere between 5000 and
+            # 5600, so stay safely below it
+            batch_size = 5000
+            total_chunks = len(new_chunks)
+            for start_idx in range(0, total_chunks, batch_size):
+                end_idx = min(start_idx + batch_size, total_chunks)
+                batch_chunks = new_chunks[start_idx:end_idx]
+                batch_chunk_ids = [chunk.metadata["id"] for chunk in batch_chunks]
+                self.db.add_documents(batch_chunks, ids=batch_chunk_ids)
+                logging.info(f"new chunk batch {start_idx + 1} to {end_idx} added")
+        else:
+            logging.warning("no new chunks found")
+
+    @staticmethod
+    def calculate_chunk_ids(chunks: list[Document]) -> list[Document]:
+        # This will create IDs like "source.ext:6:2"
+        # Page Source : Page Number : Chunk Index
+
+        last_page_id = None
+        current_chunk_index = 0
+
+        for chunk in chunks:
+            source = chunk.metadata.get("source")
+            page = chunk.metadata.get("page")
+            current_page_id = f"{source}:{page}"
+
+            # If the page ID is the same as the last one, increment the index.
+            if current_page_id == last_page_id:
+                current_chunk_index += 1
+            else:
+                current_chunk_index = 0
+
+            # Calculate the chunk ID.
+            chunk_id = f"{current_page_id}:{current_chunk_index}"
+            last_page_id = current_page_id
+
+            # Add it to the page metadata.
+            chunk.metadata["id"] = chunk_id
+
+        return chunks
+
+    def reset(self) -> None:
+        self.db.reset_collection()
+
+
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
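
Note on the id scheme: calculate_chunk_ids keys every chunk as "source:page:index". A minimal sketch of the resulting ids, using hypothetical file names (text loaders set no "page" metadata, so their ids contain "None"):

    from langchain_core.documents import Document
    from rag_backend import RagBackend

    chunks = [
        Document(page_content="a", metadata={"source": "manual.pdf", "page": 6}),
        Document(page_content="b", metadata={"source": "manual.pdf", "page": 6}),
        Document(page_content="c", metadata={"source": "notes.txt"}),
    ]
    RagBackend.calculate_chunk_ids(chunks)
    # resulting ids: "manual.pdf:6:0", "manual.pdf:6:1", "notes.txt:None:0"
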
blob - 28da3f54b65e1e8b1cb46f9c296e7ae2e2e40e0b
blob + 49748affccd22cfa540322215a430a4fd1ad2f6d
--- rag_indexer.py
+++ rag_indexer.py
import argparse
import logging
import sys
-from pathlib import Path
-from chromadb import Settings
-from langchain.schema.document import Document
-from langchain_chroma import Chroma
-from langchain_community.document_loaders.pdf import PyPDFDirectoryLoader
-from langchain_community.document_loaders.text import TextLoader
-from langchain_text_splitters import RecursiveCharacterTextSplitter
+from configuration import DB_PATH
+from rag_backend import RagBackend
-from configuration import embeddings, DB_PATH
-
-
-class RagIndexer:
-    def __init__(self, db_path: str = None):
-        self._db_path = Path(db_path if db_path else DB_PATH)
-        # Load the existing database.
-        self._db = Chroma(
-            persist_directory=str(self._db_path),
-            embedding_function=embeddings(),
-            client_settings=Settings(anonymized_telemetry=False)
-        )
-
-    def load_pdf_documents(self, path: str) -> list[Document]:
-        return PyPDFDirectoryLoader(Path(path)).load()
-
-    def load_text_documents(self, path: str) -> list[Document]:
-        items = Path(path).glob("**/[!.]*.txt")
-        documents: list[Document] = []
-        for item in items:
-            documents += TextLoader(item).load()
-        return documents
-
-    def split_documents(self, documents: list[Document]) -> list[Document]:
-        text_splitter = RecursiveCharacterTextSplitter(
-            chunk_size=800,
-            chunk_overlap=80,
-            length_function=len,
-            is_separator_regex=False,
-        )
-        return text_splitter.split_documents(documents)
-
-    def get_ids(self) -> set[str]:
-        existing_items = self._db.get(include=[])
-        existing_ids = set(existing_items["ids"])
-        existing_docs = set([id.split(":")[0] for id in existing_ids])
-        logging.info(f"found {len(existing_docs)} documents with {len(existing_ids)} chunks")
-        return existing_ids
-
-    def add_to_index(self, chunks: list[Document]) -> None:
-        # generate chunk ids from document list
-        chunks_with_ids = self.calculate_chunk_ids(chunks)
-
-        # check for updated or new chunks
-        existing_ids = self.get_ids()
-        new_chunks = []
-        for chunk in chunks_with_ids:
-            if chunk.metadata["id"] not in existing_ids:
-                new_chunks.append(chunk)
-
-        # add or update chunks
-        if len(new_chunks):
-            # batch size (max is somewhat between 5000-5600)
-            batch_size = 5000
-            total_chunks = len(new_chunks)
-            for start_idx in range(0, total_chunks, batch_size):
-                end_idx = min(start_idx + batch_size, total_chunks)
-                batch_chunks = new_chunks[start_idx:end_idx]
-                batch_chunk_ids = [chunk.metadata["id"] for chunk in batch_chunks]
-                self._db.add_documents(batch_chunks, ids=batch_chunk_ids)
-                logging.info(f"new chunk batch {start_idx + 1} to {end_idx} added")
-        else:
-            logging.warning("no new or updated chunks found")
-
-    def calculate_chunk_ids(self, chunks: list[Document]) -> list[Document]:
-        # This will create IDs like "source.ext:6:2"
-        # Page Source : Page Number : Chunk Index
-
-        last_page_id = None
-        current_chunk_index = 0
-
-        for chunk in chunks:
-            source = chunk.metadata.get("source")
-            page = chunk.metadata.get("page")
-            current_page_id = f"{source}:{page}"
-
-            # If the page ID is the same as the last one, increment the index.
-            if current_page_id == last_page_id:
-                current_chunk_index += 1
-            else:
-                current_chunk_index = 0
-
-            # Calculate the chunk ID.
-            chunk_id = f"{current_page_id}:{current_chunk_index}"
-            last_page_id = current_page_id
-
-            # Add it to the page meta-data.
-            chunk.metadata["id"] = chunk_id
-
-        return chunks
-
-    def reset(self) -> None:
-        self._db.reset_collection()
-
-
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        parser.print_help()
        sys.exit(1)
-    indexer = RagIndexer(args.db)
+    indexer = RagBackend(args.db)
    if args.reset:
        logging.info("deleting RAG indexer collection")
        indexer.reset()
    for source in args.sources:
        logging.info(f"searching {source}")
-        text_docs = indexer.load_text_documents(source)
        pdf_docs = indexer.load_pdf_documents(source)
        indexer.add_to_index(pdf_docs)
+        text_docs = indexer.load_text_documents(source)
        indexer.add_to_index(text_docs)
        logging.info(f"added {len(text_docs)} text documents to index")
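
The indexer is now a thin CLI wrapper around RagBackend. A minimal programmatic sketch of the same flow, with placeholder paths:

    from rag_backend import RagBackend

    backend = RagBackend("./rag_db")    # placeholder database path
    backend.add_to_index(backend.load_pdf_documents("./docs"))
    backend.add_to_index(backend.load_text_documents("./docs"))
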
blob - 54f332f4b361ede4044c6272cdf3d3da1c1fa3d1
blob + af114799cf2fb32ccad650221606b6a761d48f64
--- rag_interface.py
+++ rag_interface.py
"jsonrpc": "2.0",
"result": {
'text': result,
- 'ref': refs},
+ 'ref': refs
+ },
"id": data.get("id")
})
+    elif method == "sources":
+        return jsonify({
+            "jsonrpc": "2.0",
+            "result": {
+                # a set is not JSON serializable, so return a sorted list
+                "sources": sorted(rag.get_docs(rag.get_ids()))
+            },
+            "id": data.get("id")
+        })
+
    # Add more tool handlers as needed
    return jsonify({"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": data.get("id")})
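
The new "sources" method can be called like any other JSON-RPC 2.0 method. A minimal client sketch, assuming the service listens on a placeholder local endpoint:

    import requests

    resp = requests.post(
        "http://localhost:5000/",    # placeholder endpoint URL
        json={"jsonrpc": "2.0", "method": "sources", "id": 1},
    )
    print(resp.json()["result"]["sources"])    # names of indexed documents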