RAG Implementation Guide
Step-by-step guide to building a production-ready RAG pipeline.
Architecture Overview
┌─────────────────────────────────────────────────────────────────┐
│ Production RAG Stack │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ Ingestion │ │ Serving │ │ Monitoring │ │
│ │ Pipeline │ │ Layer │ │ │ │
│ │ │ │ │ │ - Query latency │ │
│ │ - Loaders │ │ - FastAPI │ │ - Retrieval quality │ │
│ │ - Chunkers │ │ - Streaming │ │ - Token usage │ │
│ │ - Embedders │ │ - Caching │ │ - Error rates │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
│ │ │ │
│ v v │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ Vector Database ││
│ │ (ChromaDB) ││
│ └─────────────────────────────────────────────────────────────┘│
│ │ │ │
│ v v │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ LLM (Ollama) ││
│ └─────────────────────────────────────────────────────────────┘│
│ │
└─────────────────────────────────────────────────────────────────┘
Project Setup
Directory Structure
rag-project/
├── src/
│ ├── __init__.py
│ ├── config.py
│ ├── ingestion/
│ │ ├── __init__.py
│ │ ├── loaders.py
│ │ ├── chunkers.py
│ │ └── embedders.py
│ ├── retrieval/
│ │ ├── __init__.py
│ │ ├── vectorstore.py
│ │ └── retrievers.py
│ ├── generation/
│ │ ├── __init__.py
│ │ ├── llm.py
│ │ └── chains.py
│ └── api/
│ ├── __init__.py
│ └── server.py
├── data/
│ ├── documents/
│ └── vectordb/
├── tests/
├── pyproject.toml
└── docker-compose.yml
Dependencies
# pyproject.toml
[project]
name = "rag-pipeline"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
    "langchain>=0.1.0",
    "langchain-community>=0.0.20",
    "chromadb>=0.4.0",
    "ollama>=0.1.0",
    "fastapi>=0.109.0",
    "uvicorn>=0.27.0",
    "python-multipart>=0.0.6",
    "pydantic-settings>=2.0.0",  # required by src/config.py
    "pypdf>=4.0.0",
    "unstructured>=0.12.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.23.0",
"ragas>=0.1.0",
]
Implementation
Configuration
# src/config.py
from pathlib import Path
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
# Paths
documents_dir: Path = Path("data/documents")
vectordb_dir: Path = Path("data/vectordb")
# Chunking
chunk_size: int = 1000
chunk_overlap: int = 200
# Embedding
embedding_model: str = "nomic-embed-text"
# LLM
llm_model: str = "llama3.2"
llm_base_url: str = "http://localhost:11434"
# Retrieval
retrieval_k: int = 4
# ChromaDB
collection_name: str = "documents"
    model_config = SettingsConfigDict(env_prefix="RAG_")
settings = Settings()
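Every field can be overridden with a RAG_-prefixed environment variable. A quick sketch to verify this (illustrative values; run from the repository root so that src is importable):

# Illustrative check that RAG_-prefixed environment variables override defaults.
import os

os.environ["RAG_CHUNK_SIZE"] = "500"   # overrides Settings.chunk_size
os.environ["RAG_RETRIEVAL_K"] = "8"    # overrides Settings.retrieval_k

from src.config import Settings

settings = Settings()
assert settings.chunk_size == 500
assert settings.retrieval_k == 8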
Document Loaders
# src/ingestion/loaders.py
from pathlib import Path
from typing import List
from langchain_core.documents import Document
from langchain_community.document_loaders import (
DirectoryLoader,
PyPDFLoader,
TextLoader,
UnstructuredMarkdownLoader,
)
class DocumentLoader:
"""Load documents from various sources."""
LOADERS = {
".pdf": PyPDFLoader,
".txt": TextLoader,
".md": UnstructuredMarkdownLoader,
}
def __init__(self, documents_dir: Path):
self.documents_dir = documents_dir
def load_all(self) -> List[Document]:
"""Load all documents from the directory."""
documents = []
for ext, loader_cls in self.LOADERS.items():
loader = DirectoryLoader(
str(self.documents_dir),
glob=f"**/*{ext}",
loader_cls=loader_cls,
show_progress=True,
)
documents.extend(loader.load())
return documents
def load_file(self, file_path: Path) -> List[Document]:
"""Load a single file."""
ext = file_path.suffix.lower()
loader_cls = self.LOADERS.get(ext)
if not loader_cls:
raise ValueError(f"Unsupported file type: {ext}")
loader = loader_cls(str(file_path))
return loader.load()
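A quick way to exercise the loader from a script or REPL (a sketch; assumes some files already exist under data/documents/):

# Usage sketch: load everything under data/documents/ and inspect the result.
from pathlib import Path

from src.ingestion.loaders import DocumentLoader

loader = DocumentLoader(Path("data/documents"))
docs = loader.load_all()
print(f"Loaded {len(docs)} documents")
for doc in docs[:3]:
    print(doc.metadata.get("source"), len(doc.page_content))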
Chunking
# src/ingestion/chunkers.py
from typing import List
from langchain_core.documents import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
class DocumentChunker:
"""Split documents into chunks."""
def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
self.splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
separators=["\n\n", "\n", ". ", " ", ""],
)
def chunk(self, documents: List[Document]) -> List[Document]:
"""Split documents into chunks."""
return self.splitter.split_documents(documents)
def chunk_with_metadata(
self,
documents: List[Document],
add_chunk_index: bool = True
) -> List[Document]:
"""Split documents and enrich metadata."""
chunks = self.splitter.split_documents(documents)
if add_chunk_index:
for i, chunk in enumerate(chunks):
chunk.metadata["chunk_index"] = i
chunk.metadata["chunk_total"] = len(chunks)
return chunks
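The tests/ directory in the project layout is a natural home for unit tests of components like the chunker. A minimal illustrative test (not part of the codebase above; assumes the repository root is on sys.path, for example via pythonpath = ["."] under [tool.pytest.ini_options]):

# tests/test_chunkers.py -- illustrative unit test for DocumentChunker.
from langchain_core.documents import Document

from src.ingestion.chunkers import DocumentChunker


def test_chunk_with_metadata_adds_indices():
    chunker = DocumentChunker(chunk_size=100, chunk_overlap=20)
    docs = [Document(page_content="word " * 200, metadata={"source": "test.txt"})]

    chunks = chunker.chunk_with_metadata(docs)

    assert len(chunks) > 1
    # Source metadata from the parent document is carried onto every chunk
    assert all(c.metadata["source"] == "test.txt" for c in chunks)
    # chunk_index / chunk_total were added by chunk_with_metadata
    assert chunks[0].metadata["chunk_index"] == 0
    assert chunks[0].metadata["chunk_total"] == len(chunks)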
Vector Store
# src/retrieval/vectorstore.py
from pathlib import Path
from typing import List, Optional
import chromadb
from chromadb.config import Settings as ChromaSettings
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OllamaEmbeddings
from langchain_core.documents import Document
class VectorStore:
"""Manage vector storage and retrieval."""
def __init__(
self,
persist_directory: Path,
collection_name: str,
embedding_model: str = "nomic-embed-text",
):
self.persist_directory = persist_directory
self.collection_name = collection_name
self.embeddings = OllamaEmbeddings(model=embedding_model)
self.client = chromadb.PersistentClient(
path=str(persist_directory),
settings=ChromaSettings(anonymized_telemetry=False),
)
self._vectorstore: Optional[Chroma] = None
@property
def vectorstore(self) -> Chroma:
"""Get or create the vectorstore."""
if self._vectorstore is None:
self._vectorstore = Chroma(
client=self.client,
collection_name=self.collection_name,
embedding_function=self.embeddings,
)
return self._vectorstore
def add_documents(self, documents: List[Document]) -> List[str]:
"""Add documents to the vectorstore."""
return self.vectorstore.add_documents(documents)
def search(
self,
query: str,
k: int = 4,
filter: Optional[dict] = None,
) -> List[Document]:
"""Search for similar documents."""
return self.vectorstore.similarity_search(
query,
k=k,
filter=filter,
)
def search_with_scores(
self,
query: str,
k: int = 4,
) -> List[tuple[Document, float]]:
"""Search with relevance scores."""
return self.vectorstore.similarity_search_with_score(query, k=k)
def delete_collection(self):
"""Delete the collection."""
self.client.delete_collection(self.collection_name)
self._vectorstore = None
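A usage sketch for the store in isolation (requires a local Ollama instance with the nomic-embed-text model pulled; the sample documents are illustrative):

# Usage sketch: embed two hand-written documents and search them.
from pathlib import Path

from langchain_core.documents import Document

from src.retrieval.vectorstore import VectorStore

store = VectorStore(
    persist_directory=Path("data/vectordb"),
    collection_name="documents",
)
store.add_documents([
    Document(page_content="ChromaDB persists embeddings on disk.",
             metadata={"source": "notes.md"}),
    Document(page_content="Ollama serves local LLMs over HTTP.",
             metadata={"source": "notes.md"}),
])

# Chroma returns a distance score alongside each hit (lower is closer)
for doc, score in store.search_with_scores("Where are embeddings stored?", k=1):
    print(score, doc.page_content)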
RAG Chain
# src/generation/chains.py
from typing import Optional
from langchain_community.llms import Ollama
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from ..retrieval.vectorstore import VectorStore
RAG_PROMPT = PromptTemplate.from_template("""
You are a helpful assistant answering questions based on the provided context.
INSTRUCTIONS:
- Only use information from the context below
- If the context doesn't contain the answer, say "I don't have enough information to answer that."
- Be concise but complete
- Cite the source document when relevant
CONTEXT:
{context}
QUESTION: {question}
ANSWER:""")
class RAGChain:
"""RAG question-answering chain."""
def __init__(
self,
vectorstore: VectorStore,
model: str = "llama3.2",
base_url: str = "http://localhost:11434",
k: int = 4,
):
self.vectorstore = vectorstore
self.k = k
self.llm = Ollama(
model=model,
base_url=base_url,
)
self.retriever = vectorstore.vectorstore.as_retriever(
search_kwargs={"k": k}
)
def _format_docs(self, docs) -> str:
"""Format retrieved documents."""
formatted = []
for i, doc in enumerate(docs, 1):
source = doc.metadata.get("source", "Unknown")
formatted.append(f"[{i}] Source: {source}\n{doc.page_content}")
return "\n\n---\n\n".join(formatted)
def create_chain(self):
"""Create the RAG chain."""
return (
{
"context": self.retriever | self._format_docs,
"question": RunnablePassthrough(),
}
| RAG_PROMPT
| self.llm
| StrOutputParser()
)
def query(self, question: str) -> str:
"""Query the RAG system."""
chain = self.create_chain()
return chain.invoke(question)
def query_with_sources(self, question: str) -> dict:
"""Query and return sources."""
docs = self.retriever.invoke(question)
answer = self.query(question)
return {
"answer": answer,
"sources": [
{
"content": doc.page_content[:200] + "...",
"metadata": doc.metadata,
}
for doc in docs
],
}
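Wiring the pieces together outside the API (a sketch; assumes documents have already been ingested and both Ollama models are available):

# Usage sketch: answer a question directly, without going through FastAPI.
from src.config import settings
from src.generation.chains import RAGChain
from src.retrieval.vectorstore import VectorStore

vectorstore = VectorStore(
    persist_directory=settings.vectordb_dir,
    collection_name=settings.collection_name,
    embedding_model=settings.embedding_model,
)
rag = RAGChain(
    vectorstore,
    model=settings.llm_model,
    base_url=settings.llm_base_url,
    k=settings.retrieval_k,
)

result = rag.query_with_sources("What are the main topics covered?")
print(result["answer"])
for source in result["sources"]:
    print("-", source["metadata"].get("source"))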
FastAPI Server
# src/api/server.py
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import List, Optional
import asyncio
from ..config import settings
from ..ingestion.loaders import DocumentLoader
from ..ingestion.chunkers import DocumentChunker
from ..retrieval.vectorstore import VectorStore
from ..generation.chains import RAGChain
app = FastAPI(title="RAG API")
# Initialize components
vectorstore = VectorStore(
persist_directory=settings.vectordb_dir,
collection_name=settings.collection_name,
embedding_model=settings.embedding_model,
)
rag_chain = RAGChain(
vectorstore=vectorstore,
model=settings.llm_model,
k=settings.retrieval_k,
)
class QueryRequest(BaseModel):
question: str
k: Optional[int] = None
class QueryResponse(BaseModel):
answer: str
sources: List[dict]
@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
"""Query the RAG system."""
try:
result = rag_chain.query_with_sources(request.question)
return QueryResponse(**result)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/ingest")
async def ingest_documents():
"""Ingest all documents from the documents directory."""
loader = DocumentLoader(settings.documents_dir)
chunker = DocumentChunker(settings.chunk_size, settings.chunk_overlap)
documents = loader.load_all()
chunks = chunker.chunk(documents)
ids = vectorstore.add_documents(chunks)
return {
"documents_loaded": len(documents),
"chunks_created": len(chunks),
"ids": ids[:5], # Return first 5 IDs
}
@app.post("/upload")
async def upload_document(file: UploadFile = File(...)):
"""Upload and ingest a single document."""
    # Save the uploaded file into the documents directory
    settings.documents_dir.mkdir(parents=True, exist_ok=True)
    file_path = settings.documents_dir / file.filename
    content = await file.read()
    file_path.write_bytes(content)
    # Process
    loader = DocumentLoader(settings.documents_dir)
    chunker = DocumentChunker(settings.chunk_size, settings.chunk_overlap)
    documents = loader.load_file(file_path)
chunks = chunker.chunk(documents)
ids = vectorstore.add_documents(chunks)
return {
"filename": file.filename,
"chunks_created": len(chunks),
}
@app.delete("/collection")
async def delete_collection():
"""Delete the vector collection."""
vectorstore.delete_collection()
return {"status": "deleted"}
@app.get("/health")
async def health():
"""Health check."""
return {"status": "healthy"}
Docker Compose Setup
# docker-compose.yml
services:
ollama:
image: ollama/ollama:latest
container_name: ollama
ports:
- "11434:11434"
volumes:
- ollama_data:/root/.ollama
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: all
capabilities: [gpu]
rag-api:
build: .
container_name: rag-api
ports:
- "8000:8000"
volumes:
- ./data:/app/data
environment:
- RAG_LLM_BASE_URL=http://ollama:11434
depends_on:
- ollama
volumes:
ollama_data:
Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY pyproject.toml .
RUN pip install .
# Copy source
COPY src/ src/
# Run
CMD ["uvicorn", "src.api.server:app", "--host", "0.0.0.0", "--port", "8000"]
Running the Pipeline
1. Start Services
# Start Ollama and pull models
docker compose up -d ollama
docker exec ollama ollama pull llama3.2
docker exec ollama ollama pull nomic-embed-text
# Start RAG API
docker compose up -d rag-api
2. Ingest Documents
# Add documents to data/documents/
cp your-docs/* data/documents/
# Trigger ingestion
curl -X POST http://localhost:8000/ingest
3. Query
curl -X POST http://localhost:8000/query \
-H "Content-Type: application/json" \
-d '{"question": "What are the main topics covered?"}'