# RAG Integration

Connecting RAG pipelines to applications, chat interfaces, and workflows.

## Chat Interface Integration

### Streamlit Chat App
```python
# app.py
import streamlit as st

from rag_pipeline import RAGChain, VectorStore

st.title("Document Q&A")


# Initialize RAG
@st.cache_resource
def get_rag():
    vectorstore = VectorStore(
        persist_directory="./data/vectordb",
        collection_name="documents",
    )
    return RAGChain(vectorstore=vectorstore)


rag = get_rag()

# Chat interface
if "messages" not in st.session_state:
    st.session_state.messages = []

for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

if prompt := st.chat_input("Ask about your documents"):
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)

    with st.chat_message("assistant"):
        with st.spinner("Searching..."):
            result = rag.query_with_sources(prompt)
        st.markdown(result["answer"])

        with st.expander("Sources"):
            for source in result["sources"]:
                st.markdown(f"- {source['metadata'].get('source', 'Unknown')}")

    st.session_state.messages.append({
        "role": "assistant",
        "content": result["answer"]
    })
```
### Gradio Interface
```python
import gradio as gr

from rag_pipeline import RAGChain, VectorStore

vectorstore = VectorStore(
    persist_directory="./data/vectordb",
    collection_name="documents",
)
rag = RAGChain(vectorstore=vectorstore)


def query_rag(question: str, history: list) -> str:
    result = rag.query_with_sources(question)
    answer = result["answer"]
    sources = "\n".join([
        f"- {s['metadata'].get('source', 'Unknown')}"
        for s in result["sources"]
    ])
    return f"{answer}\n\n**Sources:**\n{sources}"


demo = gr.ChatInterface(
    fn=query_rag,
    title="Document Q&A",
    description="Ask questions about your documents",
    examples=["What are the main topics?", "Summarize the key points"],
)

demo.launch()
```
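If the demo needs to be reachable from other machines or gated behind a simple login, `launch()` also accepts host, port, and basic-auth options. A minimal sketch; the credentials and port below are placeholders:

```python
# Bind to all interfaces on a fixed port with basic username/password auth.
# Replace the placeholder credentials before deploying.
demo.launch(
    server_name="0.0.0.0",
    server_port=7860,
    auth=("admin", "change-me"),
)
```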
## OpenAI-Compatible API

Wrap RAG as an OpenAI-compatible endpoint:
```python
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional
import uuid
import time

from rag_pipeline import RAGChain, VectorStore

app = FastAPI()

# Initialize RAG once at startup
vectorstore = VectorStore(
    persist_directory="./data/vectordb",
    collection_name="documents",
)
rag = RAGChain(vectorstore=vectorstore)


class Message(BaseModel):
    role: str
    content: str


class ChatRequest(BaseModel):
    model: str
    messages: list[Message]
    temperature: Optional[float] = 0.7
    stream: Optional[bool] = False


class ChatResponse(BaseModel):
    id: str
    object: str
    created: int
    model: str
    choices: list[dict]
    usage: dict


@app.post("/v1/chat/completions")
async def chat_completions(request: ChatRequest):
    # Extract the last user message
    user_message = next(
        (m.content for m in reversed(request.messages) if m.role == "user"),
        ""
    )

    # Query RAG
    result = rag.query_with_sources(user_message)

    return ChatResponse(
        id=f"chatcmpl-{uuid.uuid4().hex[:8]}",
        object="chat.completion",
        created=int(time.time()),
        model=request.model,
        choices=[{
            "index": 0,
            "message": {
                "role": "assistant",
                "content": result["answer"]
            },
            "finish_reason": "stop"
        }],
        usage={
            "prompt_tokens": len(user_message.split()),
            "completion_tokens": len(result["answer"].split()),
            "total_tokens": len(user_message.split()) + len(result["answer"].split())
        }
    )
```
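Because the endpoint mirrors the OpenAI schema, any OpenAI-compatible client can call it. A quick sketch using the official `openai` package, assuming the app is served locally on port 8000 (for example via uvicorn); the model name is arbitrary since the server echoes whatever it receives:

```python
from openai import OpenAI

# Point the client at the local RAG server; the API key is unused but required by the client.
client = OpenAI(base_url="http://localhost:8000/v1", api_key="not-needed")

response = client.chat.completions.create(
    model="rag-documents",  # placeholder model name
    messages=[{"role": "user", "content": "What are the key findings?"}],
)
print(response.choices[0].message.content)
```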
## MCP Server Integration

Create an MCP server for RAG:
```python
#!/usr/bin/env python3
"""RAG MCP Server"""
import asyncio

from mcp.server import Server
from mcp.types import Tool, TextContent

from rag_pipeline import RAGChain, VectorStore

server = Server("rag-server")

# Initialize RAG
vectorstore = VectorStore(
    persist_directory="./data/vectordb",
    collection_name="documents",
)
rag = RAGChain(vectorstore=vectorstore)


@server.list_tools()
async def list_tools():
    return [
        Tool(
            name="query_documents",
            description="Search and answer questions from indexed documents",
            inputSchema={
                "type": "object",
                "properties": {
                    "question": {
                        "type": "string",
                        "description": "The question to answer"
                    }
                },
                "required": ["question"]
            }
        ),
        Tool(
            name="search_documents",
            description="Search for relevant document chunks",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Search query"
                    },
                    "k": {
                        "type": "integer",
                        "description": "Number of results",
                        "default": 4
                    }
                },
                "required": ["query"]
            }
        )
    ]


@server.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "query_documents":
        result = rag.query_with_sources(arguments["question"])
        return [TextContent(
            type="text",
            text=f"Answer: {result['answer']}\n\nSources: {[s['metadata'].get('source') for s in result['sources']]}"
        )]
    elif name == "search_documents":
        docs = vectorstore.search(
            arguments["query"],
            k=arguments.get("k", 4)
        )
        results = "\n\n---\n\n".join([
            f"Source: {doc.metadata.get('source', 'Unknown')}\n{doc.page_content[:500]}..."
            for doc in docs
        ])
        return [TextContent(type="text", text=results)]
    raise ValueError(f"Unknown tool: {name}")


async def main():
    from mcp.server.stdio import stdio_server

    async with stdio_server() as (read, write):
        await server.run(read, write, server.create_initialization_options())


if __name__ == "__main__":
    asyncio.run(main())
```
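To exercise the server outside of an MCP-aware client, you can connect with the SDK's stdio client. A sketch, assuming the server above is saved as `rag_mcp_server.py` (the filename is an assumption):

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def ask(question: str) -> None:
    # Launch the server script over stdio; the script name is a placeholder.
    params = StdioServerParameters(command="python", args=["rag_mcp_server.py"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool("query_documents", {"question": question})
            print(result.content[0].text)


asyncio.run(ask("What are the key findings?"))
```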
## Webhook Integration

Trigger RAG queries from external events:
```python
from fastapi import FastAPI, Request, HTTPException
import hmac
import hashlib

from rag_pipeline import RAGChain, VectorStore

app = FastAPI()

WEBHOOK_SECRET = "your-secret-key"

vectorstore = VectorStore(
    persist_directory="./data/vectordb",
    collection_name="documents",
)
rag = RAGChain(vectorstore=vectorstore)


def verify_signature(payload: bytes, signature: str) -> bool:
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(f"sha256={expected}", signature)


@app.post("/webhook/slack")
async def slack_webhook(request: Request):
    """Handle Slack slash commands."""
    form = await request.form()
    question = form.get("text", "")

    if not question:
        return {"text": "Please provide a question"}

    result = rag.query_with_sources(question)

    return {
        "response_type": "in_channel",
        "text": result["answer"],
        "attachments": [{
            "text": f"Sources: {', '.join([s['metadata'].get('source', 'Unknown') for s in result['sources']])}"
        }]
    }


@app.post("/webhook/github")
async def github_webhook(request: Request):
    """Handle GitHub issue comments."""
    signature = request.headers.get("X-Hub-Signature-256", "")
    payload = await request.body()

    if not verify_signature(payload, signature):
        raise HTTPException(status_code=401)

    data = await request.json()

    if data.get("action") == "created" and "comment" in data:
        comment = data["comment"]["body"]
        if comment.startswith("/ask"):
            question = comment[4:].strip()
            result = rag.query(question)
            # Post response (implement GitHub API call)
            # post_github_comment(data["issue"]["number"], result)

    return {"status": "ok"}
```
## Batch Processing

Process multiple queries efficiently:
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

from rag_pipeline import RAGChain


class BatchRAG:
    def __init__(self, rag_chain: RAGChain, max_workers: int = 4):
        self.rag = rag_chain
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def query_batch(self, questions: list[str]) -> list[dict]:
        """Process multiple questions concurrently."""
        loop = asyncio.get_running_loop()
        tasks = [
            loop.run_in_executor(
                self.executor,
                self.rag.query_with_sources,
                question
            )
            for question in questions
        ]
        return await asyncio.gather(*tasks)

    def query_batch_sync(self, questions: list[str]) -> list[dict]:
        """Synchronous batch processing."""
        return asyncio.run(self.query_batch(questions))


# Usage
batch_rag = BatchRAG(rag)
questions = [
    "What is the main topic?",
    "Who are the authors?",
    "What are the key findings?",
]
results = batch_rag.query_batch_sync(questions)
```
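When the caller is already inside an event loop (for example a FastAPI route), use the async method directly; `query_batch_sync` would fail there because `asyncio.run` cannot be nested inside a running loop. A minimal sketch:

```python
async def handle_batch(questions: list[str]) -> list[dict]:
    # Inside an async context, await the batch directly instead of query_batch_sync
    return await batch_rag.query_batch(questions)
```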
## Conversation Memory

Maintain context across queries:
```python
from langchain.memory import ConversationBufferWindowMemory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

from rag_pipeline import RAGChain


class ConversationalRAG:
    def __init__(self, rag_chain: RAGChain, memory_window: int = 5):
        self.rag = rag_chain
        self.memory = ConversationBufferWindowMemory(
            k=memory_window,
            return_messages=True,
            memory_key="chat_history"
        )
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a helpful assistant answering questions based on documents.
Use the context provided to answer questions.
If you don't know, say so."""),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", """Context: {context}

Question: {question}"""),
        ])

    def query(self, question: str) -> str:
        # Retrieve context
        docs = self.rag.retriever.invoke(question)
        context = "\n\n".join([doc.page_content for doc in docs])

        # Get chat history
        history = self.memory.load_memory_variables({})["chat_history"]

        # Generate response
        messages = self.prompt.format_messages(
            chat_history=history,
            context=context,
            question=question
        )
        response = self.rag.llm.invoke(messages)
        answer = response.content

        # Save to memory
        self.memory.save_context(
            {"input": question},
            {"output": answer}
        )

        return answer

    def clear_memory(self):
        self.memory.clear()
```
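A short usage sketch, assuming `rag` is the `RAGChain` instance from the earlier examples; the follow-up question is resolved against the stored chat history:

```python
conv_rag = ConversationalRAG(rag, memory_window=5)

print(conv_rag.query("What are the key findings?"))
print(conv_rag.query("Can you expand on the second one?"))  # answered using chat history

conv_rag.clear_memory()  # start a fresh conversation
```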
## Scheduled Ingestion

Automatically update the knowledge base:
```python
import schedule
import time
from pathlib import Path

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler


class DocumentHandler(FileSystemEventHandler):
    def __init__(self, vectorstore, loader, chunker):
        self.vectorstore = vectorstore
        self.loader = loader
        self.chunker = chunker

    def on_created(self, event):
        if event.is_directory:
            return
        path = Path(event.src_path)
        if path.suffix in [".pdf", ".md", ".txt"]:
            print(f"New document: {path}")
            self.ingest_file(path)

    def ingest_file(self, path: Path):
        docs = self.loader.load_file(path)
        chunks = self.chunker.chunk(docs)
        self.vectorstore.add_documents(chunks)
        print(f"Ingested {len(chunks)} chunks from {path.name}")


# File watcher (vectorstore, loader, and chunker come from the ingestion pipeline)
def start_watcher(documents_dir: Path):
    handler = DocumentHandler(vectorstore, loader, chunker)
    observer = Observer()
    observer.schedule(handler, str(documents_dir), recursive=True)
    observer.start()
    return observer


# Scheduled re-index
def full_reindex():
    print("Starting full reindex...")
    vectorstore.delete_collection()
    documents = loader.load_all()
    chunks = chunker.chunk(documents)
    vectorstore.add_documents(chunks)
    print(f"Reindexed {len(chunks)} chunks")


schedule.every().sunday.at("02:00").do(full_reindex)


# Run scheduler
def run_scheduler():
    while True:
        schedule.run_pending()
        time.sleep(60)
```
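One way to wire the watcher and the scheduler together is to start the observer (which runs in its own thread) and then block in the scheduler loop. A sketch; the documents directory path is an assumption:

```python
if __name__ == "__main__":
    observer = start_watcher(Path("./data/documents"))  # placeholder path
    try:
        # The watchdog observer runs in a background thread; block here on the scheduler.
        run_scheduler()
    except KeyboardInterrupt:
        observer.stop()
        observer.join()
```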