Milvus¶
Milvus is an enterprise-scale, cloud-native vector database designed for massive datasets and high-throughput workloads.
When to Use Milvus¶
- Billions of vectors
- GPU acceleration required
- Multi-tenancy
- Kubernetes-native deployment
- Real-time streaming ingestion
For smaller workloads, consider ChromaDB or Qdrant.
Installation¶
Docker Compose (Standalone)¶
# docker-compose.yml
services:
etcd:
image: quay.io/coreos/etcd:v3.5.5
container_name: milvus-etcd
environment:
- ETCD_AUTO_COMPACTION_MODE=revision
- ETCD_AUTO_COMPACTION_RETENTION=1000
- ETCD_QUOTA_BACKEND_BYTES=4294967296
- ETCD_SNAPSHOT_COUNT=50000
volumes:
- etcd_data:/etcd
command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
minio:
image: minio/minio:latest
container_name: milvus-minio
environment:
MINIO_ACCESS_KEY: minioadmin
MINIO_SECRET_KEY: minioadmin
volumes:
- minio_data:/minio_data
command: minio server /minio_data
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
milvus:
image: milvusdb/milvus:latest
container_name: milvus-standalone
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
ports:
- "19530:19530"
- "9091:9091"
volumes:
- milvus_data:/var/lib/milvus
depends_on:
- etcd
- minio
volumes:
etcd_data:
minio_data:
milvus_data:
Python Client¶
Basic Usage¶
Connect¶
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
# Connect
connections.connect(host="localhost", port="19530")
Create Collection¶
# Define schema
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=512),
]
schema = CollectionSchema(fields=fields, description="Document embeddings")
# Create collection
collection = Collection(name="documents", schema=schema)
Create Index¶
# Index is required for search
index_params = {
"metric_type": "COSINE", # or L2, IP
"index_type": "IVF_FLAT", # or HNSW, IVF_SQ8, etc.
"params": {"nlist": 1024}
}
collection.create_index(field_name="embedding", index_params=index_params)
Insert Data¶
import numpy as np
# Prepare data
embeddings = np.random.rand(1000, 768).tolist()
texts = [f"Document {i}" for i in range(1000)]
sources = ["source.pdf"] * 1000
# Insert
data = [embeddings, texts, sources]
collection.insert(data)
# Flush to persist
collection.flush()
Search¶
# Load collection into memory
collection.load()
# Search
query_vector = np.random.rand(1, 768).tolist()
results = collection.search(
data=query_vector,
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"nprobe": 10}},
limit=5,
output_fields=["text", "source"]
)
for hits in results:
for hit in hits:
print(f"ID: {hit.id}, Score: {hit.score}")
print(f"Text: {hit.entity.get('text')[:100]}...")
Index Types¶
| Index | Best For | Memory | Speed |
|---|---|---|---|
| FLAT | Small datasets, exact search | High | Slow |
| IVF_FLAT | Balanced recall/speed | Medium | Fast |
| IVF_SQ8 | Memory constrained | Low | Fast |
| HNSW | High recall requirement | High | Very fast |
| GPU_IVF_FLAT | GPU acceleration | GPU | Very fast |
HNSW Index¶
index_params = {
"metric_type": "COSINE",
"index_type": "HNSW",
"params": {
"M": 16,
"efConstruction": 200
}
}
collection.create_index("embedding", index_params)
GPU Index¶
index_params = {
"metric_type": "L2",
"index_type": "GPU_IVF_FLAT",
"params": {"nlist": 1024}
}
collection.create_index("embedding", index_params)
Filtering¶
Scalar Filtering¶
# Filter by field
results = collection.search(
data=query_vector,
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"nprobe": 10}},
limit=5,
expr='source == "manual.pdf"',
output_fields=["text", "source"]
)
# Range filter
results = collection.search(
data=query_vector,
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"nprobe": 10}},
limit=5,
expr='page >= 10 and page <= 50',
output_fields=["text", "page"]
)
# IN clause
results = collection.search(
data=query_vector,
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"nprobe": 10}},
limit=5,
expr='category in ["tech", "science"]',
output_fields=["text", "category"]
)
Partitions¶
Organize data for efficient querying:
# Create partition
collection.create_partition("2024_data")
# Insert into partition
collection.insert(data, partition_name="2024_data")
# Search specific partition
results = collection.search(
data=query_vector,
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"nprobe": 10}},
limit=5,
partition_names=["2024_data"]
)
Collection Management¶
List Collections¶
Collection Info¶
Drop Collection¶
LangChain Integration¶
from langchain_community.vectorstores import Milvus
from langchain_community.embeddings import OllamaEmbeddings
embeddings = OllamaEmbeddings(model="nomic-embed-text")
# Create from documents
vectorstore = Milvus.from_documents(
documents,
embeddings,
connection_args={"host": "localhost", "port": "19530"},
collection_name="langchain_docs"
)
# Load existing
vectorstore = Milvus(
embeddings,
connection_args={"host": "localhost", "port": "19530"},
collection_name="langchain_docs"
)
# Search
results = vectorstore.similarity_search("query", k=4)
Performance Tuning¶
Memory Management¶
# Release collection from memory when not in use
collection.release()
# Load when needed
collection.load()
# Load specific partitions only
collection.load(partition_names=["recent_data"])
Batch Operations¶
# Batch insert
batch_size = 10000
for i in range(0, len(embeddings), batch_size):
batch_data = [
embeddings[i:i + batch_size],
texts[i:i + batch_size],
sources[i:i + batch_size]
]
collection.insert(batch_data)
collection.flush()
Index Parameters¶
# Tune search parameters
search_params = {
"metric_type": "COSINE",
"params": {
"nprobe": 16, # Higher = better recall, slower
"ef": 64 # For HNSW
}
}
Monitoring¶
Metrics¶
Grafana Dashboard¶
Milvus provides official Grafana dashboards for monitoring: - Query performance - Memory usage - Index status - Segment statistics
Kubernetes Deployment¶
For production, use Milvus Operator:
# Install operator
kubectl apply -f https://raw.githubusercontent.com/milvus-io/milvus-operator/main/deploy/manifests/deployment.yaml
# Deploy cluster
kubectl apply -f - <<EOF
apiVersion: milvus.io/v1beta1
kind: Milvus
metadata:
name: milvus-cluster
spec:
mode: cluster
dependencies:
storage:
external: false
EOF
See Also¶
- Vector Databases Overview
- Qdrant - Simpler alternative for production
- RAG Implementation