AWS Bedrock RAG (Retrieval-Augmented Generation) Guide
Table of Contents
- Overview
- What is RAG?
- Why Use RAG?
- RAG Architecture
- Bedrock RAG Options
- RAG with AWS Bedrock
- Implementation Approaches
- Bedrock Knowledge Bases (Managed Approach)
- Custom RAG Implementation
- Vector Databases for RAG
- Embeddings Models
- Best Practices
- Optimization Techniques
- Complete Examples
- Troubleshooting
Overview
RAG (Retrieval-Augmented Generation) is a technique that enhances Large Language Models (LLMs) by providing them with relevant external knowledge retrieved from a database or document store. Instead of relying solely on the model's training data, RAG allows models to access up-to-date, domain-specific, or private information.
AWS Bedrock provides two main approaches for implementing RAG: 1. Managed Knowledge Bases - Fully managed RAG solution 2. Custom RAG - Build your own using Bedrock APIs and vector databases
Key Benefits: - Access to current and private data - Reduced hallucinations - Source attribution - No model retraining required - Cost-effective knowledge updates
What is RAG?
The Problem RAG Solves
Traditional LLM Limitations:
User: "What's our company's Q4 revenue?"
Traditional LLM:
❌ "I don't have access to your company's financial data"
❌ "Based on my training data from 2023..." (outdated)
❌ Makes up numbers (hallucination)
Problem:
- LLMs only know what they were trained on
- Training data has a cutoff date
- No access to private/proprietary data
- Cannot access real-time information
RAG Solution:
User: "What's our company's Q4 revenue?"
RAG-Enhanced LLM:
1. Retrieve: Search company documents for "Q4 revenue"
2. Find: "Q4 2024 revenue was $50M, up 25% YoY"
3. Generate: "According to the Q4 financial report,
revenue was $50M, representing 25% growth."
✅ Accurate, current, sourced information
How RAG Works
RAG Process Flow:
┌─────────────────────────────────────────────────────────────┐
│ USER QUERY │
│ "What is our return policy for electronics?" │
└────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ STEP 1: QUERY PROCESSING │
│ • Convert query to embedding vector │
│ • Optimize for semantic search │
└────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ STEP 2: RETRIEVAL │
│ • Search vector database │
│ • Find semantically similar documents │
│ • Rank by relevance score │
│ │
│ Retrieved Documents: │
│ 1. "Electronics can be returned within 30 days..." (0.95) │
│ 2. "Original packaging required for returns..." (0.87) │
│ 3. "Refunds processed within 5-7 business days..." (0.82) │
└────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ STEP 3: AUGMENTATION │
│ • Combine retrieved context with user query │
│ • Create enhanced prompt for LLM │
│ │
│ Enhanced Prompt: │
│ "Based on these company policies: │
│ [Retrieved Document 1] │
│ [Retrieved Document 2] │
│ [Retrieved Document 3] │
│ │
│ Answer: What is our return policy for electronics?" │
└────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ STEP 4: GENERATION │
│ • LLM processes enhanced prompt │
│ • Generates response using retrieved context │
│ • Includes source citations │
└────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ RESPONSE │
│ "Electronics can be returned within 30 days of purchase │
│ with original packaging. Refunds are processed within │
│ 5-7 business days. │
│ │
│ Source: Return Policy Document, Section 3.2" │
└─────────────────────────────────────────────────────────────┘
Detailed Workflow:
# Simplified RAG workflow
def rag_pipeline(user_query):
# 1. Convert query to embedding
query_embedding = embed_text(user_query)
# Result: [0.23, -0.45, 0.67, ...] (1536-dim vector)
# 2. Search vector database
similar_docs = vector_db.search(
query_embedding,
top_k=5,
min_score=0.7
)
# Result: [
# {"text": "...", "score": 0.95, "source": "policy.pdf"},
# {"text": "...", "score": 0.87, "source": "faq.pdf"}
# ]
# 3. Build augmented prompt
context = "\n\n".join([doc["text"] for doc in similar_docs])
augmented_prompt = f"""
Use this information to answer the question:
{context}
Question: {user_query}
Provide an accurate answer and cite your sources.
"""
# 4. Generate response
response = llm.generate(augmented_prompt)
return response
RAG vs Other Approaches
| Approach | How It Works | Pros | Cons | Use Case |
|---|---|---|---|---|
| Base LLM | Use model as-is | Simple, fast | Limited knowledge, outdated | General Q&A |
| Fine-tuning | Retrain model on custom data | Model learns domain | Expensive, static | Specialized domains |
| Prompt Engineering | Add context in prompt | Flexible | Token limits | Small context |
| RAG | Retrieve + Generate | Dynamic, scalable | Complexity | Large knowledge bases |
Comparison Example:
Question: "What's the status of order #12345?"
┌─────────────────────────────────────────────────────────────┐
│ BASE LLM │
│ "I don't have access to order information" │
│ ❌ Cannot access external data │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ FINE-TUNED MODEL │
│ "Orders typically take 3-5 days to ship" │
│ ⚠️ Generic answer, not specific to order #12345 │
│ 💰 Expensive to retrain for every order update │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ PROMPT ENGINEERING │
│ Include all orders in prompt: "Order 12345: shipped..." │
│ ⚠️ Hits token limits with many orders │
│ 💰 Expensive to include all data in every request │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ RAG │
│ 1. Search order database for #12345 │
│ 2. Retrieve: "Order #12345 shipped on Jan 15, tracking..." │
│ 3. Generate: "Your order #12345 was shipped on January 15 │
│ via FedEx. Tracking: 1Z999..." │
│ ✅ Accurate, current, specific │
│ ✅ Scales to millions of orders │
│ ✅ Updates automatically │
└─────────────────────────────────────────────────────────────┘
Why Use RAG?
1. Access to Private/Current Data
Problem: LLMs don't know your company's data or recent events
RAG Solution:
# Access private company data
query = "What were the key decisions from yesterday's board meeting?"
# RAG retrieves from internal documents
retrieved = [
"Board Meeting Minutes - Jan 15, 2025",
"Decision: Approved $5M budget for AI initiative",
"Decision: Expanded to European market in Q2"
]
# LLM generates answer with current, private information
response = "Based on yesterday's board meeting, two key decisions were made:
1. Approval of $5M budget for AI initiative
2. Plans to expand to European market in Q2 2025"
2. Reduced Hallucinations
Problem: LLMs sometimes make up information
Without RAG:
User: "What's the warranty on Model X-2000?"
LLM: "The Model X-2000 comes with a 2-year warranty"
❌ Made up answer (actual warranty is 3 years)
With RAG:
User: "What's the warranty on Model X-2000?"
Retrieved: "Model X-2000 Specifications: 3-year comprehensive warranty"
LLM: "The Model X-2000 includes a 3-year comprehensive warranty.
Source: Product Specifications Document"
✅ Accurate answer based on retrieved facts
3. Cost-Effective Knowledge Updates
Fine-tuning Approach:
New product launched → Retrain entire model → $$$
Policy updated → Retrain entire model → $$$
Price changed → Retrain entire model → $$$
Cost: $10,000+ per update
Time: Days to weeks
RAG Approach:
New product launched → Add document to knowledge base → $
Policy updated → Update document → $
Price changed → Update document → $
Cost: Pennies per update
Time: Minutes
4. Source Attribution
RAG provides citations:
response = {
"answer": "Our return policy allows 30-day returns for electronics",
"sources": [
{
"document": "Return Policy v2.3",
"page": 5,
"section": "Electronics Returns",
"confidence": 0.95
}
]
}
Benefits: - ✅ Verify accuracy - ✅ Build trust - ✅ Audit trail - ✅ Compliance
5. Domain Expertise
RAG enables instant domain experts:
Medical RAG:
Knowledge Base: Medical journals, research papers, clinical guidelines
Result: AI assistant with medical knowledge
Legal RAG:
Knowledge Base: Case law, statutes, legal precedents
Result: AI assistant with legal knowledge
Financial RAG:
Knowledge Base: Financial reports, market data, regulations
Result: AI assistant with financial knowledge
RAG Architecture
Core Components
┌─────────────────────────────────────────────────────────────┐
│ RAG SYSTEM ARCHITECTURE │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ 1. DOCUMENT INGESTION │ │
│ │ • Load documents (PDF, TXT, HTML, etc.) │ │
│ │ • Parse and extract text │ │
│ │ • Clean and normalize │ │
│ └────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ 2. CHUNKING │ │
│ │ • Split documents into chunks │ │
│ │ • Typical size: 500-1000 tokens │ │
│ │ • Maintain context overlap │ │
│ └────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ 3. EMBEDDING GENERATION │ │
│ │ • Convert chunks to vectors │ │
│ │ • Use embedding model │ │
│ │ • Typical dimension: 1536 │ │
│ └────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ 4. VECTOR STORAGE │ │
│ │ • Store embeddings in vector DB │ │
│ │ • Index for fast similarity search │ │
│ │ • Store metadata (source, page, etc.) │ │
│ └────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ 5. QUERY PROCESSING │ │
│ │ • User asks question │ │
│ │ • Convert query to embedding │ │
│ │ • Search vector DB │ │
│ └────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ 6. RETRIEVAL │ │
│ │ • Find top-k similar chunks │ │
│ │ • Rank by relevance score │ │
│ │ • Apply filters (date, source, etc.) │ │
│ └────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ 7. AUGMENTATION │ │
│ │ • Build prompt with context │ │
│ │ • Format retrieved chunks │ │
│ │ • Add instructions │ │
│ └────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ 8. GENERATION │ │
│ │ • Send to LLM │ │
│ │ • Generate response │ │
│ │ • Include citations │ │
│ └────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Data Flow
Indexing Phase (One-time):
Documents → Chunking → Embeddings → Vector DB
│ │ │ │
│ │ │ └─ Store for retrieval
│ │ └─ Convert to vectors
│ └─ Split into pieces
└─ Source data
Query Phase (Every request):
User Query → Embedding → Vector Search → Retrieved Docs
│ │ │
│ │ └─ Top-k relevant chunks
│ └─ Find similar vectors
└─ Convert to vector
Retrieved Docs → Augment Prompt → LLM → Response
│ │ │ │
│ │ │ └─ Final answer
│ │ └─ Generate
│ └─ Add context
└─ Relevant information
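The two phases translate into a small amount of code. A minimal sketch, assuming the get_embedding helper and the FAISS-style vector_store that are built later in this guide:
# Indexing phase (run once, or whenever documents change)
def index_documents(documents, vector_store, chunk_size=500):
    """Chunk, embed, and store documents for later retrieval."""
    chunks, metadata = [], []
    for doc in documents:
        words = doc['text'].split()
        for i in range(0, len(words), chunk_size):
            chunks.append(' '.join(words[i:i + chunk_size]))
            metadata.append({'source': doc['source']})
    embeddings = [get_embedding(chunk) for chunk in chunks]
    vector_store.add(embeddings, chunks, metadata)

# Query phase (run for every request)
def retrieve_context(query, vector_store, top_k=3):
    """Embed the query and return the most similar chunks for prompt augmentation."""
    query_embedding = get_embedding(query)
    return vector_store.search(query_embedding, top_k=top_k)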
Architecture Overview
Complete RAG System:
┌──────────────────────────────────────────────────────────────────┐
│ DATA SOURCES │
│ • PDFs • Word Docs • Web Pages • Databases • APIs │
└────────────────────────┬─────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ DOCUMENT PROCESSING │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Extract │→ │ Clean │→ │ Chunk │ │
│ │ Text │ │ Normalize │ │ Split │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└────────────────────────┬─────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ EMBEDDING SERVICE │
│ (AWS Bedrock Titan Embeddings) │
│ │
│ Text Chunk → [0.23, -0.45, 0.67, ..., 0.12] (1536 dimensions) │
└────────────────────────┬─────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ VECTOR DATABASE │
│ (OpenSearch, Pinecone, FAISS, Chroma, etc.) │
│ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Chunk 1: [0.23, -0.45, ...] → "Return policy..." │ │
│ │ Chunk 2: [0.12, 0.89, ...] → "Warranty info..." │ │
│ │ Chunk 3: [-0.34, 0.56, ...] → "Shipping details..." │ │
│ │ ... │ │
│ └────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
│
│ (Query Time)
│
┌──────────────────────────────────────────────────────────────────┐
│ USER QUERY │
│ "What is the return policy?" │
└────────────────────────┬─────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ QUERY EMBEDDING │
│ "What is the return policy?" → [0.25, -0.43, 0.69, ...] │
└────────────────────────┬─────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ SIMILARITY SEARCH │
│ Find vectors closest to query vector │
│ │
│ Results: │
│ 1. Chunk 1 (similarity: 0.95) → "Return policy..." │
│ 2. Chunk 5 (similarity: 0.87) → "Refund process..." │
│ 3. Chunk 9 (similarity: 0.82) → "Exchange policy..." │
└────────────────────────┬─────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ PROMPT AUGMENTATION │
│ │
│ Context: [Retrieved chunks] │
│ Question: "What is the return policy?" │
│ Instructions: "Answer based on context, cite sources" │
└────────────────────────┬─────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ LLM (Claude, etc.) │
│ Generate answer with context │
└────────────────────────┬─────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ RESPONSE │
│ "Items can be returned within 30 days with receipt. │
│ Refunds processed in 5-7 business days. │
│ Source: Return Policy Document, Section 3" │
└──────────────────────────────────────────────────────────────────┘
Bedrock RAG Options
AWS Bedrock offers multiple ways to implement RAG:
| Option | Complexity | Control | Use Case |
|---|---|---|---|
| Knowledge Bases | Low | Low | Quick start, managed solution |
| Custom RAG | High | High | Full customization needed |
| Hybrid | Medium | Medium | Mix managed + custom |
RAG with AWS Bedrock
Option 1: Bedrock Knowledge Bases (Managed)
Fully managed RAG solution - AWS handles everything:
┌─────────────────────────────────────────────────────────────┐
│ BEDROCK KNOWLEDGE BASES (MANAGED) │
├─────────────────────────────────────────────────────────────┤
│ │
│ AWS Manages: │
│ ✅ Document ingestion │
│ ✅ Chunking strategy │
│ ✅ Embedding generation │
│ ✅ Vector storage (OpenSearch) │
│ ✅ Retrieval logic │
│ ✅ Scaling and availability │
│ │
│ You Provide: │
│ 📄 Documents (S3 bucket) │
│ 🔧 Configuration (chunk size, etc.) │
│ 🤖 Model selection │
│ │
└─────────────────────────────────────────────────────────────┘
Pros: - ✅ Quick setup (minutes) - ✅ No infrastructure management - ✅ Automatic scaling - ✅ Built-in best practices - ✅ Integrated with Agents
Cons: - ❌ Less customization - ❌ Limited choice of vector stores - ❌ Limited chunking options
Best for: - Getting started quickly - Standard use cases - Teams without ML expertise - Integration with Bedrock Agents
Option 2: Custom RAG with Bedrock
Build your own RAG pipeline using Bedrock APIs:
┌─────────────────────────────────────────────────────────────┐
│ CUSTOM RAG WITH BEDROCK │
├─────────────────────────────────────────────────────────────┤
│ │
│ You Control: │
│ 🔧 Document processing │
│ 🔧 Chunking strategy │
│ 🔧 Vector database choice │
│ 🔧 Retrieval algorithm │
│ 🔧 Prompt engineering │
│ │
│ Use Bedrock For: │
│ 🤖 Embeddings (Titan Embeddings) │
│ 🤖 Generation (Claude, Titan, etc.) │
│ │
└─────────────────────────────────────────────────────────────┘
Pros: - ✅ Full control - ✅ Any vector database - ✅ Custom chunking - ✅ Advanced retrieval - ✅ Optimized for your use case
Cons: - ❌ More complex - ❌ Manage infrastructure - ❌ Requires ML knowledge - ❌ More code to maintain
Best for: - Advanced use cases - Specific requirements - Existing vector DB - Maximum optimization
Option 3: Hybrid Approach
Combine managed and custom components:
Example 1: Knowledge Bases + Custom Retrieval
- Use KB for storage
- Custom logic for retrieval
Example 2: Custom Embeddings + KB Storage
- Your embedding model
- KB for vector storage
Example 3: KB + Custom Post-Processing
- KB for retrieval
- Custom reranking logic
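A minimal sketch of Example 3, assuming an existing Knowledge Base (the ID KB123456 is a placeholder) and using the retrieve and invoke_model APIs shown later in this guide; the "keep the longest chunks" step is just a stand-in for whatever custom post-processing you need:
import boto3
import json

bedrock_agent_runtime = boto3.client('bedrock-agent-runtime')
bedrock_runtime = boto3.client('bedrock-runtime')

def hybrid_kb_query(query, kb_id='KB123456', top_k=8):
    """Managed retrieval from a Knowledge Base, followed by custom post-processing and generation."""
    # 1. Managed retrieval
    retrieved = bedrock_agent_runtime.retrieve(
        knowledgeBaseId=kb_id,
        retrievalQuery={'text': query},
        retrievalConfiguration={'vectorSearchConfiguration': {'numberOfResults': top_k}}
    )
    docs = [r['content']['text'] for r in retrieved['retrievalResults']]

    # 2. Custom post-processing (placeholder: keep the three longest chunks)
    docs = sorted(docs, key=len, reverse=True)[:3]

    # 3. Custom generation with your own prompt
    prompt = "Answer using only this context:\n\n" + "\n\n".join(docs) + f"\n\nQuestion: {query}"
    response = bedrock_runtime.invoke_model(
        modelId='anthropic.claude-3-sonnet-20240229-v1:0',
        body=json.dumps({
            'anthropic_version': 'bedrock-2023-05-31',
            'messages': [{'role': 'user', 'content': prompt}],
            'max_tokens': 500
        })
    )
    return json.loads(response['body'].read())['content'][0]['text']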
Implementation Approaches
Quick Comparison
# APPROACH 1: Bedrock Knowledge Bases (Managed)
# ============================================
# Setup time: 10 minutes
# Code: ~50 lines
# 1. Create Knowledge Base (Console or API)
kb = bedrock_agent.create_knowledge_base(...)
# 2. Add data source (S3 bucket)
data_source = bedrock_agent.create_data_source(...)
# 3. Sync data
bedrock_agent.start_ingestion_job(...)
# 4. Query
response = bedrock_agent_runtime.retrieve_and_generate(
input={'text': 'What is the return policy?'},
retrieveAndGenerateConfiguration={
'type': 'KNOWLEDGE_BASE',
'knowledgeBaseConfiguration': {
'knowledgeBaseId': 'KB123',
'modelArn': 'arn:aws:bedrock:...:claude-3-sonnet'
}
}
)
print(response['output']['text'])
# Done! ✅
# APPROACH 2: Custom RAG
# ============================================
# Setup time: 2-3 days
# Code: ~500 lines
# 1. Load documents
docs = load_documents('data/')
# 2. Chunk documents
chunks = chunk_documents(docs, chunk_size=1000)
# 3. Generate embeddings
embeddings = []
for chunk in chunks:
emb = bedrock_runtime.invoke_model(
modelId='amazon.titan-embed-text-v1',
body=json.dumps({'inputText': chunk})
)
embeddings.append(emb)
# 4. Store in vector DB
vector_db.upsert(embeddings, chunks)
# 5. Query
query_emb = get_embedding(user_query)
results = vector_db.search(query_emb, top_k=5)
# 6. Build prompt
prompt = build_rag_prompt(user_query, results)
# 7. Generate
response = bedrock_runtime.invoke_model(
modelId='anthropic.claude-3-sonnet',
body=json.dumps({'messages': [{'role': 'user', 'content': prompt}]})
)
# Much more code, but full control ✅
Bedrock Knowledge Bases (Managed Approach)
What are Knowledge Bases?
Bedrock Knowledge Bases are fully managed RAG solutions that handle:
┌─────────────────────────────────────────────────────────────┐
│ KNOWLEDGE BASE COMPONENTS │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. DATA SOURCE │
│ • S3 bucket with documents │
│ • Supported: PDF, TXT, MD, HTML, DOC, CSV │
│ • Automatic monitoring for changes │
│ │
│ 2. EMBEDDING MODEL │
│ • Titan Embeddings G1 - Text │
│ • Titan Embeddings V2 │
│ • Cohere Embed models │
│ │
│ 3. VECTOR STORE │
│ • Amazon OpenSearch Serverless │
│ • Amazon OpenSearch Service │
│ • Pinecone │
│ • Redis Enterprise Cloud │
│ │
│ 4. CHUNKING STRATEGY │
│ • Fixed-size chunking │
│ • Default: 300 tokens │
│ • Configurable overlap │
│ │
│ 5. RETRIEVAL CONFIGURATION │
│ • Number of results │
│ • Metadata filtering │
│ • Hybrid search options │
│ │
└─────────────────────────────────────────────────────────────┘
Creating a Knowledge Base
Step-by-Step Guide:
import boto3
import json
bedrock_agent = boto3.client('bedrock-agent')
# Step 1: Create Knowledge Base
kb_response = bedrock_agent.create_knowledge_base(
name='company-docs-kb',
description='Company documentation and policies',
roleArn='arn:aws:iam::ACCOUNT:role/BedrockKBRole',
knowledgeBaseConfiguration={
'type': 'VECTOR',
'vectorKnowledgeBaseConfiguration': {
'embeddingModelArn': 'arn:aws:bedrock:us-east-1::foundation-model/amazon.titan-embed-text-v1'
}
},
storageConfiguration={
'type': 'OPENSEARCH_SERVERLESS',
'opensearchServerlessConfiguration': {
'collectionArn': 'arn:aws:aoss:us-east-1:ACCOUNT:collection/kb-collection',
'vectorIndexName': 'bedrock-knowledge-base-index',
'fieldMapping': {
'vectorField': 'bedrock-knowledge-base-default-vector',
'textField': 'AMAZON_BEDROCK_TEXT_CHUNK',
'metadataField': 'AMAZON_BEDROCK_METADATA'
}
}
}
)
kb_id = kb_response['knowledgeBase']['knowledgeBaseId']
print(f"Knowledge Base created: {kb_id}")
# Step 2: Create Data Source (S3)
ds_response = bedrock_agent.create_data_source(
knowledgeBaseId=kb_id,
name='s3-docs-source',
description='S3 bucket with company documents',
dataSourceConfiguration={
'type': 'S3',
's3Configuration': {
'bucketArn': 'arn:aws:s3:::my-company-docs',
'inclusionPrefixes': ['policies/', 'procedures/']
}
},
vectorIngestionConfiguration={
'chunkingConfiguration': {
'chunkingStrategy': 'FIXED_SIZE',
'fixedSizeChunkingConfiguration': {
'maxTokens': 300,
'overlapPercentage': 20
}
}
}
)
ds_id = ds_response['dataSource']['dataSourceId']
print(f"Data Source created: {ds_id}")
# Step 3: Start Ingestion Job
ingestion_response = bedrock_agent.start_ingestion_job(
knowledgeBaseId=kb_id,
dataSourceId=ds_id
)
job_id = ingestion_response['ingestionJob']['ingestionJobId']
print(f"Ingestion job started: {job_id}")
# Step 4: Wait for ingestion to complete
import time
while True:
job_status = bedrock_agent.get_ingestion_job(
knowledgeBaseId=kb_id,
dataSourceId=ds_id,
ingestionJobId=job_id
)
status = job_status['ingestionJob']['status']
print(f"Ingestion status: {status}")
if status in ['COMPLETE', 'FAILED']:
break
time.sleep(10)
print("Knowledge Base ready!")
IAM Role for Knowledge Base:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::my-company-docs",
"arn:aws:s3:::my-company-docs/*"
]
},
{
"Effect": "Allow",
"Action": [
"bedrock:InvokeModel"
],
"Resource": [
"arn:aws:bedrock:*::foundation-model/amazon.titan-embed-text-v1"
]
},
{
"Effect": "Allow",
"Action": [
"aoss:APIAccessAll"
],
"Resource": [
"arn:aws:aoss:us-east-1:ACCOUNT:collection/*"
]
}
]
}
Querying Knowledge Bases
Two Query Methods:
Method 1: Retrieve Only (Get Documents)
bedrock_agent_runtime = boto3.client('bedrock-agent-runtime')
# Just retrieve relevant documents
retrieve_response = bedrock_agent_runtime.retrieve(
knowledgeBaseId='KB123456',
retrievalQuery={
'text': 'What is the return policy for electronics?'
},
retrievalConfiguration={
'vectorSearchConfiguration': {
'numberOfResults': 5,
'overrideSearchType': 'HYBRID' # or 'SEMANTIC'
}
}
)
# Process results
for result in retrieve_response['retrievalResults']:
print(f"Score: {result['score']}")
print(f"Content: {result['content']['text']}")
print(f"Source: {result['location']['s3Location']['uri']}")
print(f"Metadata: {result['metadata']}")
print("---")
Method 2: Retrieve and Generate (RAG)
# Retrieve + Generate answer
rag_response = bedrock_agent_runtime.retrieve_and_generate(
input={
'text': 'What is the return policy for electronics?'
},
retrieveAndGenerateConfiguration={
'type': 'KNOWLEDGE_BASE',
'knowledgeBaseConfiguration': {
'knowledgeBaseId': 'KB123456',
'modelArn': 'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0',
'retrievalConfiguration': {
'vectorSearchConfiguration': {
'numberOfResults': 5
}
},
'generationConfiguration': {
'promptTemplate': {
'textPromptTemplate': '''
You are a helpful customer service assistant.
Use the following context to answer the question.
If you don't know the answer, say so.
Always cite your sources.
Context:
$search_results$
Question: $query$
Answer:
'''
},
'inferenceConfig': {
'textInferenceConfig': {
'temperature': 0.7,
'maxTokens': 500
}
}
}
}
}
)
# Get answer
answer = rag_response['output']['text']
print(f"Answer: {answer}")
# Get citations
for citation in rag_response['citations']:
for reference in citation['retrievedReferences']:
print(f"Source: {reference['location']['s3Location']['uri']}")
print(f"Content: {reference['content']['text']}")
With Metadata Filtering:
# Filter by metadata
response = bedrock_agent_runtime.retrieve_and_generate(
input={'text': 'What are the Q4 results?'},
retrieveAndGenerateConfiguration={
'type': 'KNOWLEDGE_BASE',
'knowledgeBaseConfiguration': {
'knowledgeBaseId': 'KB123456',
'modelArn': 'arn:aws:bedrock:...:claude-3-sonnet',
'retrievalConfiguration': {
'vectorSearchConfiguration': {
'numberOfResults': 5,
'filter': {
'equals': {
'key': 'year',
'value': '2024'
}
}
}
}
}
}
)
Integration with Agents
Knowledge Bases work seamlessly with Bedrock Agents:
# Create agent with Knowledge Base
agent_response = bedrock_agent.create_agent(
agentName='customer-support-agent',
foundationModel='anthropic.claude-3-sonnet-20240229-v1:0',
instruction='''
You are a customer support agent.
Use the knowledge base to answer questions about:
- Return policies
- Product information
- Shipping details
- Warranty information
Always be helpful and cite your sources.
''',
agentResourceRoleArn='arn:aws:iam::ACCOUNT:role/AgentRole'
)
agent_id = agent_response['agent']['agentId']
# Associate Knowledge Base with Agent
bedrock_agent.associate_agent_knowledge_base(
agentId=agent_id,
agentVersion='DRAFT',
knowledgeBaseId='KB123456',
description='Company policies and product documentation',
knowledgeBaseState='ENABLED'
)
# Prepare and create alias
bedrock_agent.prepare_agent(agentId=agent_id)
alias_response = bedrock_agent.create_agent_alias(
agentId=agent_id,
agentAliasName='production'
)
alias_id = alias_response['agentAlias']['agentAliasId']
# Now agent can use KB automatically
agent_response = bedrock_agent_runtime.invoke_agent(
agentId=agent_id,
agentAliasId=alias_id,
sessionId='session-123',
inputText='What is the warranty on Model X-2000?'
)
# Agent will:
# 1. Recognize it needs product information
# 2. Query the Knowledge Base
# 3. Use retrieved context to answer
# 4. Cite sources
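invoke_agent returns a streamed event stream rather than a single text field. A minimal sketch for collecting the agent's reply (event structure as in the bedrock-agent-runtime API; check the boto3 docs for your SDK version):
# Read the streamed agent response
completion_text = ""
for event in agent_response['completion']:
    if 'chunk' in event:
        completion_text += event['chunk']['bytes'].decode('utf-8')

print(completion_text)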
Custom RAG Implementation
Step 1: Document Processing
Load and prepare documents:
import boto3
import json
from pathlib import Path
def load_documents(directory):
"""
Load documents from directory
"""
documents = []
for file_path in Path(directory).rglob('*'):
if file_path.suffix in ['.txt', '.md', '.pdf']:
# Extract text based on file type
if file_path.suffix == '.pdf':
text = extract_pdf_text(file_path)
else:
text = file_path.read_text(encoding='utf-8')
documents.append({
'text': text,
'source': str(file_path),
'metadata': {
'filename': file_path.name,
'type': file_path.suffix,
'size': file_path.stat().st_size
}
})
return documents
def extract_pdf_text(pdf_path):
"""
Extract text from PDF
"""
import PyPDF2
text = ""
with open(pdf_path, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
for page in pdf_reader.pages:
            text += page.extract_text() or ""  # extract_text() can return None for image-only pages
return text
def clean_text(text):
"""
Clean and normalize text
"""
# Remove extra whitespace
text = ' '.join(text.split())
# Remove special characters if needed
# text = re.sub(r'[^\w\s.,!?-]', '', text)
return text
# Load documents
docs = load_documents('data/company-docs/')
print(f"Loaded {len(docs)} documents")
Step 2: Generate Embeddings
Use Bedrock Titan Embeddings:
def get_embedding(text, model_id='amazon.titan-embed-text-v1'):
"""
Generate embedding for text using Bedrock
"""
bedrock_runtime = boto3.client('bedrock-runtime')
# Prepare request
body = json.dumps({
'inputText': text
})
# Call Bedrock
response = bedrock_runtime.invoke_model(
modelId=model_id,
body=body,
contentType='application/json',
accept='application/json'
)
# Parse response
response_body = json.loads(response['body'].read())
embedding = response_body['embedding']
return embedding
def get_embeddings_batch(texts, batch_size=25):
    """
    Generate embeddings for multiple texts.
    Titan embeds one input per request, so batch_size here only
    controls how often progress is printed.
    """
embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
for text in batch:
emb = get_embedding(text)
embeddings.append(emb)
print(f"Processed {min(i + batch_size, len(texts))}/{len(texts)} texts")
return embeddings
# Example usage
text = "Our return policy allows 30-day returns for all electronics."
embedding = get_embedding(text)
print(f"Embedding dimension: {len(embedding)}") # 1536 for Titan v1
print(f"First 5 values: {embedding[:5]}")
Using Titan Embeddings V2:
def get_embedding_v2(text, dimensions=1024):
"""
Use Titan Embeddings V2 with configurable dimensions
"""
bedrock_runtime = boto3.client('bedrock-runtime')
body = json.dumps({
'inputText': text,
'dimensions': dimensions, # 256, 512, or 1024
'normalize': True
})
response = bedrock_runtime.invoke_model(
modelId='amazon.titan-embed-text-v2:0',
body=body,
contentType='application/json',
accept='application/json'
)
response_body = json.loads(response['body'].read())
return response_body['embedding']
Step 3: Store in Vector Database
Option A: FAISS (Local/Simple)
import faiss
import numpy as np
import pickle
class FAISSVectorStore:
def __init__(self, dimension=1536):
self.dimension = dimension
self.index = faiss.IndexFlatL2(dimension)
self.documents = []
self.metadata = []
def add(self, embeddings, documents, metadata=None):
"""
Add embeddings to index
"""
embeddings_array = np.array(embeddings).astype('float32')
self.index.add(embeddings_array)
self.documents.extend(documents)
if metadata:
self.metadata.extend(metadata)
def search(self, query_embedding, top_k=5):
"""
Search for similar vectors
"""
query_array = np.array([query_embedding]).astype('float32')
distances, indices = self.index.search(query_array, top_k)
results = []
for i, idx in enumerate(indices[0]):
if idx < len(self.documents):
results.append({
'document': self.documents[idx],
'metadata': self.metadata[idx] if self.metadata else {},
'distance': float(distances[0][i]),
'score': 1 / (1 + float(distances[0][i])) # Convert to similarity
})
return results
def save(self, path):
"""
Save index to disk
"""
faiss.write_index(self.index, f"{path}/index.faiss")
with open(f"{path}/documents.pkl", 'wb') as f:
pickle.dump({'documents': self.documents, 'metadata': self.metadata}, f)
def load(self, path):
"""
Load index from disk
"""
self.index = faiss.read_index(f"{path}/index.faiss")
with open(f"{path}/documents.pkl", 'rb') as f:
data = pickle.load(f)
self.documents = data['documents']
self.metadata = data['metadata']
# Usage
vector_store = FAISSVectorStore(dimension=1536)
# Add documents
chunks = ["chunk 1 text", "chunk 2 text", "chunk 3 text"]
embeddings = get_embeddings_batch(chunks)
metadata = [{'source': 'doc1.pdf'}, {'source': 'doc1.pdf'}, {'source': 'doc2.pdf'}]
vector_store.add(embeddings, chunks, metadata)
# Search
query = "What is the return policy?"
query_emb = get_embedding(query)
results = vector_store.search(query_emb, top_k=3)
for result in results:
print(f"Score: {result['score']:.3f}")
print(f"Text: {result['document']}")
print(f"Source: {result['metadata']['source']}")
print("---")
# Save for later use
vector_store.save('vector_store')
Option B: Pinecone (Cloud)
import pinecone
# Initialize Pinecone (pinecone-client v2 style; newer SDK releases use the Pinecone class instead)
pinecone.init(
api_key='YOUR_API_KEY',
environment='us-west1-gcp'
)
# Create index
index_name = 'company-docs'
if index_name not in pinecone.list_indexes():
pinecone.create_index(
name=index_name,
dimension=1536,
metric='cosine'
)
index = pinecone.Index(index_name)
# Add vectors
def add_to_pinecone(chunks, embeddings, metadata):
"""
Add vectors to Pinecone
"""
vectors = []
for i, (chunk, emb, meta) in enumerate(zip(chunks, embeddings, metadata)):
vectors.append({
'id': f'doc_{i}',
'values': emb,
'metadata': {
'text': chunk,
**meta
}
})
# Upsert in batches
batch_size = 100
for i in range(0, len(vectors), batch_size):
batch = vectors[i:i + batch_size]
index.upsert(vectors=batch)
# Query
def query_pinecone(query_text, top_k=5):
"""
Query Pinecone index
"""
query_emb = get_embedding(query_text)
results = index.query(
vector=query_emb,
top_k=top_k,
include_metadata=True
)
return results['matches']
# Usage
add_to_pinecone(chunks, embeddings, metadata)
results = query_pinecone("What is the return policy?", top_k=3)
for match in results:
print(f"Score: {match['score']:.3f}")
print(f"Text: {match['metadata']['text']}")
print("---")
Option C: OpenSearch
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import boto3
# AWS credentials
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(
credentials.access_key,
credentials.secret_key,
'us-east-1',
'es',
session_token=credentials.token
)
# Connect to OpenSearch
client = OpenSearch(
hosts=[{'host': 'your-opensearch-endpoint.us-east-1.es.amazonaws.com', 'port': 443}],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection
)
# Create index with vector field
index_name = 'company-docs'
index_body = {
'settings': {
'index': {
'knn': True,
'knn.algo_param.ef_search': 100
}
},
'mappings': {
'properties': {
'text': {'type': 'text'},
'embedding': {
'type': 'knn_vector',
'dimension': 1536,
'method': {
'name': 'hnsw',
'space_type': 'cosinesimil',
'engine': 'nmslib'
}
},
'metadata': {'type': 'object'}
}
}
}
if not client.indices.exists(index=index_name):
client.indices.create(index=index_name, body=index_body)
# Add documents
def add_to_opensearch(chunks, embeddings, metadata):
"""
Add documents to OpenSearch
"""
for i, (chunk, emb, meta) in enumerate(zip(chunks, embeddings, metadata)):
doc = {
'text': chunk,
'embedding': emb,
'metadata': meta
}
client.index(index=index_name, id=str(i), body=doc)
# Query
def query_opensearch(query_text, top_k=5):
"""
Query OpenSearch with vector similarity
"""
query_emb = get_embedding(query_text)
query_body = {
'size': top_k,
'query': {
'knn': {
'embedding': {
'vector': query_emb,
'k': top_k
}
}
}
}
response = client.search(index=index_name, body=query_body)
results = []
for hit in response['hits']['hits']:
results.append({
'text': hit['_source']['text'],
'metadata': hit['_source']['metadata'],
'score': hit['_score']
})
return results
# Usage
add_to_opensearch(chunks, embeddings, metadata)
results = query_opensearch("What is the return policy?", top_k=3)
Step 4: Retrieval
Implement retrieval with reranking:
def retrieve_with_reranking(query, top_k=5, rerank_top_k=3):
"""
Retrieve documents with optional reranking
"""
# Step 1: Initial retrieval (get more than needed)
query_emb = get_embedding(query)
initial_results = vector_store.search(query_emb, top_k=top_k * 2)
# Step 2: Rerank using cross-encoder or LLM
reranked = rerank_results(query, initial_results)
# Step 3: Return top results
return reranked[:rerank_top_k]
def rerank_results(query, results):
"""
Rerank results using LLM
"""
# Simple reranking: ask LLM to score relevance
bedrock_runtime = boto3.client('bedrock-runtime')
scored_results = []
for result in results:
prompt = f"""
Query: {query}
Document: {result['document']}
On a scale of 0-10, how relevant is this document to the query?
Respond with only a number.
"""
response = bedrock_runtime.invoke_model(
modelId='anthropic.claude-3-haiku-20240307-v1:0',
body=json.dumps({
'anthropic_version': 'bedrock-2023-05-31',
'messages': [{'role': 'user', 'content': prompt}],
'max_tokens': 10,
'temperature': 0
})
)
response_body = json.loads(response['body'].read())
score = float(response_body['content'][0]['text'].strip())
result['rerank_score'] = score
scored_results.append(result)
# Sort by rerank score
scored_results.sort(key=lambda x: x['rerank_score'], reverse=True)
return scored_results
Hybrid Search (Vector + Keyword):
def hybrid_search(query, top_k=5, alpha=0.5):
"""
Combine vector search with keyword search
alpha: weight for vector search (1-alpha for keyword)
"""
# Vector search
query_emb = get_embedding(query)
vector_results = vector_store.search(query_emb, top_k=top_k * 2)
# Keyword search (simple BM25)
keyword_results = keyword_search(query, top_k=top_k * 2)
# Combine scores
combined = {}
for result in vector_results:
doc_id = result['metadata'].get('id')
combined[doc_id] = {
'document': result['document'],
'metadata': result['metadata'],
'score': alpha * result['score']
}
for result in keyword_results:
doc_id = result['metadata'].get('id')
if doc_id in combined:
combined[doc_id]['score'] += (1 - alpha) * result['score']
else:
combined[doc_id] = {
'document': result['document'],
'metadata': result['metadata'],
'score': (1 - alpha) * result['score']
}
# Sort by combined score
results = sorted(combined.values(), key=lambda x: x['score'], reverse=True)
return results[:top_k]
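The keyword_search helper above is not defined in this guide. One possible sketch uses the rank_bm25 package (an external assumption, not part of Bedrock), indexing the same chunks and metadata lists that were stored in the vector database:
from rank_bm25 import BM25Okapi

# Build a BM25 index over the same chunks stored in the vector DB
tokenized_chunks = [chunk.lower().split() for chunk in chunks]
bm25 = BM25Okapi(tokenized_chunks)

def keyword_search(query, top_k=5):
    """Score chunks with BM25 and return the top_k in the same shape as the vector results."""
    scores = bm25.get_scores(query.lower().split())
    ranked = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:top_k]
    max_score = max(scores[i] for i in ranked) or 1.0
    return [
        {
            'document': chunks[i],
            'metadata': metadata[i],
            'score': scores[i] / max_score  # normalize to roughly 0-1 so it blends with vector scores
        }
        for i in ranked
    ]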
Step 5: Generation
Build RAG prompt and generate:
def build_rag_prompt(query, retrieved_docs):
"""
Build prompt with retrieved context
"""
context = "\n\n".join([
f"Document {i+1} (Source: {doc['metadata'].get('source', 'Unknown')}):\n{doc['document']}"
for i, doc in enumerate(retrieved_docs)
])
prompt = f"""
You are a helpful assistant. Use the following context to answer the question.
If the answer is not in the context, say "I don't have enough information to answer that."
Always cite which document(s) you used.
Context:
{context}
Question: {query}
Answer:
"""
return prompt
def generate_rag_response(query, retrieved_docs):
"""
Generate response using RAG
"""
bedrock_runtime = boto3.client('bedrock-runtime')
# Build prompt
prompt = build_rag_prompt(query, retrieved_docs)
# Call Claude
response = bedrock_runtime.invoke_model(
modelId='anthropic.claude-3-sonnet-20240229-v1:0',
body=json.dumps({
'anthropic_version': 'bedrock-2023-05-31',
'messages': [
{
'role': 'user',
'content': prompt
}
],
'max_tokens': 1000,
'temperature': 0.7
})
)
response_body = json.loads(response['body'].read())
answer = response_body['content'][0]['text']
return answer
# Complete RAG pipeline
def rag_query(query, top_k=3):
"""
Complete RAG query pipeline
"""
# 1. Retrieve
retrieved_docs = retrieve_with_reranking(query, top_k=top_k)
# 2. Generate
answer = generate_rag_response(query, retrieved_docs)
# 3. Return with sources
return {
'answer': answer,
'sources': [
{
'text': doc['document'],
'source': doc['metadata'].get('source'),
'score': doc['score']
}
for doc in retrieved_docs
]
}
# Usage
result = rag_query("What is the return policy for electronics?")
print(f"Answer: {result['answer']}\n")
print("Sources:")
for source in result['sources']:
print(f"- {source['source']} (score: {source['score']:.3f})")
Streaming Response:
def generate_rag_response_streaming(query, retrieved_docs):
"""
Generate streaming response
"""
bedrock_runtime = boto3.client('bedrock-runtime')
prompt = build_rag_prompt(query, retrieved_docs)
response = bedrock_runtime.invoke_model_with_response_stream(
modelId='anthropic.claude-3-sonnet-20240229-v1:0',
body=json.dumps({
'anthropic_version': 'bedrock-2023-05-31',
'messages': [{'role': 'user', 'content': prompt}],
'max_tokens': 1000,
'temperature': 0.7
})
)
# Stream response
for event in response['body']:
chunk = json.loads(event['chunk']['bytes'])
if chunk['type'] == 'content_block_delta':
if 'delta' in chunk and 'text' in chunk['delta']:
yield chunk['delta']['text']
# Usage
print("Answer: ", end='', flush=True)
for chunk in generate_rag_response_streaming(query, retrieved_docs):
print(chunk, end='', flush=True)
print()
Vector Databases for RAG
Comparison of Vector Database Options:
| Database | Type | Pros | Cons | Best For |
|---|---|---|---|---|
| FAISS | Local | Fast, free, simple | In-memory library, single machine | Development, small scale |
| Pinecone | Cloud | Managed, scalable | Cost, vendor lock-in | Production, easy setup |
| OpenSearch | Self-hosted/Cloud | Full-text + vector, AWS native | Complex setup | AWS environments |
| Chroma | Local/Cloud | Simple API, open source | Newer, less mature | Development, prototyping |
| Weaviate | Self-hosted/Cloud | Feature-rich, GraphQL | Complex | Advanced use cases |
| Qdrant | Self-hosted/Cloud | Fast, Rust-based | Smaller community | Performance-critical |
Choosing a Vector Database:
Decision Tree:
Are you just prototyping?
├─ Yes → Use FAISS (simple, local)
└─ No → Continue
Do you want fully managed?
├─ Yes → Use Pinecone or AWS OpenSearch Serverless
└─ No → Continue
Already using AWS?
├─ Yes → Use OpenSearch (integrates well)
└─ No → Continue
Need hybrid search (vector + keyword)?
├─ Yes → Use OpenSearch or Weaviate
└─ No → Use Qdrant or Chroma
Need maximum performance?
└─ Use Qdrant or FAISS with custom infrastructure
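Chroma appears in the comparison but has no example elsewhere in this guide. A minimal sketch with the chromadb package (an assumption) and the get_embedding helper from the custom RAG section:
import chromadb

# Local, persistent Chroma collection
client = chromadb.PersistentClient(path='./chroma_store')
collection = client.get_or_create_collection(
    name='company-docs',
    metadata={'hnsw:space': 'cosine'}
)

# Add chunks with pre-computed Bedrock embeddings
collection.add(
    ids=[f'doc_{i}' for i in range(len(chunks))],
    embeddings=[get_embedding(chunk) for chunk in chunks],
    documents=chunks,
    metadatas=metadata
)

# Query
query = "What is the return policy?"
results = collection.query(query_embeddings=[get_embedding(query)], n_results=3)
for text, meta, dist in zip(results['documents'][0], results['metadatas'][0], results['distances'][0]):
    print(f"Distance: {dist:.3f} | Source: {meta.get('source')} | {text[:80]}")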
Embeddings Models
Available Bedrock Embedding Models:
# Titan Embeddings G1 - Text
model_id = 'amazon.titan-embed-text-v1'
# - Dimension: 1536 (fixed)
# - Max input: 8192 tokens
# - Cost: $0.0001 per 1K tokens
# - Best for: General purpose
# Titan Embeddings V2
model_id = 'amazon.titan-embed-text-v2:0'
# - Dimensions: 256, 512, or 1024 (configurable)
# - Max input: 8192 tokens
# - Normalization option
# - Better performance than V1
# Cohere Embed English
model_id = 'cohere.embed-english-v3'
# - Dimension: 1024
# - Max input: 512 tokens
# - Optimized for English
# Cohere Embed Multilingual
model_id = 'cohere.embed-multilingual-v3'
# - Dimension: 1024
# - Max input: 512 tokens
# - Supports 100+ languages
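The Cohere models expect a different request format from Titan. A hedged sketch of invoking cohere.embed-english-v3 through Bedrock (verify the request schema against the current model documentation):
import boto3
import json

def get_cohere_embedding(text, input_type='search_document'):
    """
    Embed text with Cohere Embed on Bedrock.
    Use input_type='search_document' when indexing and 'search_query' at query time.
    """
    bedrock_runtime = boto3.client('bedrock-runtime')
    response = bedrock_runtime.invoke_model(
        modelId='cohere.embed-english-v3',
        body=json.dumps({
            'texts': [text],
            'input_type': input_type
        }),
        contentType='application/json',
        accept='application/json'
    )
    response_body = json.loads(response['body'].read())
    return response_body['embeddings'][0]  # 1024-dimensional vector
Because Cohere distinguishes document and query embeddings, index your chunks with search_document and embed user questions with search_query.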
Choosing an Embedding Model:
def choose_embedding_model(use_case):
"""
Recommendation based on use case
"""
recommendations = {
'general': 'amazon.titan-embed-text-v2:0',
'multilingual': 'cohere.embed-multilingual-v3',
'english_only': 'cohere.embed-english-v3',
'cost_sensitive': 'amazon.titan-embed-text-v2:0', # Smaller dimensions
'high_accuracy': 'amazon.titan-embed-text-v1' # Larger dimensions
}
return recommendations.get(use_case, 'amazon.titan-embed-text-v2:0')
Best Practices
1. Chunking Strategy
Optimal chunk size depends on your use case:
Small chunks (200-300 tokens) - Pros: Precise retrieval, less noise - Cons: May lose context, need more chunks - Best for: Q&A, fact lookup
Medium chunks (500-1000 tokens) - Pros: Good balance, maintains context - Cons: None for most cases - Best for: Most use cases, general documents
Large chunks (1500-2000 tokens) - Pros: More context, fewer chunks - Cons: Less precise, more noise - Best for: Long-form content, summaries
# Smart chunking with overlap
def smart_chunk(text, chunk_size=1000, overlap=200):
"""
Chunk text with overlap to maintain context
"""
words = text.split()
chunks = []
for i in range(0, len(words), chunk_size - overlap):
chunk_text = ' '.join(words[i:i + chunk_size])
chunks.append(chunk_text)
return chunks
# Semantic chunking (better)
def semantic_chunking(text):
"""
Chunk by semantic boundaries (paragraphs, sections)
"""
# Split by double newlines (paragraphs)
paragraphs = text.split('\n\n')
chunks = []
current_chunk = ""
for para in paragraphs:
if len(current_chunk) + len(para) < 1000:
current_chunk += para + "\n\n"
else:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = para
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
2. Metadata Management
Store rich metadata for better filtering:
def create_document_with_metadata(text, source_file):
"""
Create document with comprehensive metadata
"""
return {
'text': text,
'metadata': {
'source': source_file,
'filename': Path(source_file).name,
'type': Path(source_file).suffix,
'created_at': datetime.now().isoformat(),
'word_count': len(text.split()),
'char_count': len(text),
# Domain-specific metadata
'version': '1',
'department': 'HR',
'category': 'policy',
'last_updated': '2024-01-15',
'author': 'HR Team',
'tags': ['return', 'policy', 'electronics']
}
}
# Query with metadata filters
def query_with_filters(query, filters):
"""
Query with metadata filtering
"""
results = vector_store.search(query, top_k=20)
# Apply filters
filtered = []
for result in results:
meta = result['metadata']
if filters.get('department') and meta.get('department') != filters['department']:
continue
if filters.get('category') and meta.get('category') != filters['category']:
continue
if filters.get('min_date') and meta.get('last_updated') < filters.get('min_date'):
continue
filtered.append(result)
return filtered[:5]
# Usage
results = query_with_filters(
"What is the return policy?",
filters={
'department': 'HR',
'category': 'policy',
'min_date': '2024-01-01'
}
)
3. Query Optimization
Enhance user queries for better retrieval:
def optimize_query(user_query):
"""
Enhance user query for better retrieval
"""
bedrock_runtime = boto3.client('bedrock-runtime')
prompt = f"""Rewrite this user query to be more specific and effective for document search.
Add relevant keywords and context.
Original query: {user_query}
Optimized query:"""
response = bedrock_runtime.invoke_model(
modelId='anthropic.claude-3-haiku-20240307-v1:0',
body=json.dumps({
'anthropic_version': 'bedrock-2023-05-31',
'messages': [{
'role': 'user',
'content': prompt
}],
'max_tokens': 200,
'temperature': 0
})
)
response_body = json.loads(response['body'].read())
optimized = response_body['content'][0]['text'].strip()
return optimized
# Query expansion
def expand_query(query):
"""
Generate multiple query variations
"""
variations = [
query,
f"What is {query}",
f"Details about {query}?",
f"Information on {query}",
]
# Get embeddings for all variations
embeddings = [get_embedding(q) for q in variations]
# Average embeddings
avg_embedding = np.mean(embeddings, axis=0).tolist()
return avg_embedding
4. Response Quality
Ensure high-quality responses with validation:
def generate_with_quality_checks(query, retrieved_docs):
"""
Generate response with quality validation
"""
# Check if retrieved docs are relevant
if not retrieved_docs or retrieved_docs[0]['score'] < 0.7:
return {
'answer': "I don't have enough relevant information to answer that question confidently.",
'confidence': 'low',
'sources': []
}
# Generate response
answer = generate_rag_response(query, retrieved_docs)
# Validate response
if "I don't know" in answer or "not sure" in answer or "cannot" in answer:
confidence = 'low'
elif len(retrieved_docs) >= 3 and retrieved_docs[0]['score'] > 0.8:
confidence = 'high'
else:
confidence = 'medium'
return {
'answer': answer,
'confidence': confidence,
'sources': [doc['metadata']['source'] for doc in retrieved_docs]
}
# Add inline citations
def add_citations(answer, sources):
"""
Add inline citations to answer
"""
# Simple citation format
cited_answer = answer
for i, source in enumerate(sources, 1):
source_name = source['metadata'].get('filename', 'Unknown')
cited_answer += f"\n\n[{i}] {source_name}"
return cited_answer
5. Monitoring and Evaluation
Track RAG system performance:
class RAGMetrics:
def __init__(self):
self.queries = []
def log_query(self, query, results, response_time):
"""
Log query metrics
"""
self.queries.append({
'timestamp': datetime.now().isoformat(),
'query': query,
'num_results': len(results),
'top_score': results[0]['score'] if results else 0,
'response_time': response_time
})
def get_stats(self):
"""
Get performance statistics
"""
if not self.queries:
return {}
return {
'total_queries': len(self.queries),
'avg_response_time': np.mean([q['response_time'] for q in self.queries]),
'avg_top_score': np.mean([q['top_score'] for q in self.queries]),
'low_confidence_queries': len([q for q in self.queries if q['top_score'] < 0.7])
}
# Usage
metrics = RAGMetrics()
def rag_query_with_metrics(query):
"""
RAG query with performance tracking
"""
start_time = time.time()
# Retrieve
results = retrieve_with_reranking(query)
# Generate
answer = generate_rag_response(query, results)
response_time = time.time() - start_time
# Log metrics
metrics.log_query(query, results, response_time)
return answer
# Periodic evaluation
def evaluate_rag_quality(test_queries):
"""
Evaluate RAG system quality
"""
results = []
for query, expected_answer in test_queries:
answer = rag_query(query)
# Compare (simplified)
similarity = calculate_similarity(answer['answer'], expected_answer)
results.append({
'query': query,
'similarity': similarity,
'sources_used': len(answer['sources'])
})
avg_similarity = np.mean([r['similarity'] for r in results])
pass_rate = len([r for r in results if r['similarity'] > 0.8]) / len(results)
return {
'avg_similarity': avg_similarity,
'pass_rate': pass_rate
}
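calculate_similarity above is not defined; one simple option (an assumption, not the only choice) is cosine similarity between the embeddings of the generated and expected answers:
import numpy as np

def calculate_similarity(text_a, text_b):
    """Cosine similarity between the embeddings of two texts (roughly -1 to 1, higher is more similar)."""
    emb_a = np.array(get_embedding(text_a))
    emb_b = np.array(get_embedding(text_b))
    return float(np.dot(emb_a, emb_b) / (np.linalg.norm(emb_a) * np.linalg.norm(emb_b)))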
Optimization Techniques
1. Caching
Cache embeddings and results:
from functools import lru_cache
import hashlib
class EmbeddingCache:
def __init__(self):
self.cache = {}
def get_cache_key(self, text):
"""
Generate cache key from text
"""
return hashlib.md5(text.encode()).hexdigest()
def get_embedding(self, text):
"""
Get embedding with caching
"""
cache_key = self.get_cache_key(text)
if cache_key in self.cache:
return self.cache[cache_key]
        # Generate embedding (reusing the get_embedding helper defined earlier)
        embedding = get_embedding(text)
# Cache it
self.cache[cache_key] = embedding
return embedding
# Query result caching
@lru_cache(maxsize=1000)
def cached_rag_query(query):
"""
Cache RAG query results
"""
return rag_query(query)
2. Batch Processing
Process multiple queries efficiently:
def batch_rag_queries(queries, batch_size=10):
"""
Process multiple queries in batches
"""
results = []
for i in range(0, len(queries), batch_size):
batch = queries[i:i + batch_size]
# Get embeddings in batch
embeddings = get_embeddings_batch(batch)
# Retrieve for each
batch_results = []
for query, emb in zip(batch, embeddings):
retrieved = vector_store.search(emb, top_k=3)
answer = generate_rag_response(query, retrieved)
batch_results.append(answer)
results.extend(batch_results)
return results
3. Async Processing
Handle concurrent requests:
import asyncio
import aioboto3
async def async_get_embedding(text, session):
    """
    Async embedding generation
    """
    async with session.client('bedrock-runtime') as client:
response = await client.invoke_model(
modelId='amazon.titan-embed-text-v1',
body=json.dumps({
'inputText': text
})
)
body = await response['body'].read()
return json.loads(body)['embedding']
async def async_rag_query(query):
"""
Async RAG query
"""
session = aioboto3.Session()
# Get query embedding
    query_emb = await async_get_embedding(query, session)
# Search (assuming async vector store)
results = await vector_store.async_search(query_emb)
# Generate
answer = await async_generate_response(query, results)
return answer
async def process_queries_concurrent(queries):
"""
Process multiple queries concurrently
"""
tasks = [async_rag_query(q) for q in queries]
results = await asyncio.gather(*tasks)
return results
# Usage
queries = ["query 1", "query 2", "query 3"]
results = asyncio.run(process_queries_concurrent(queries))
4. Vector Index Optimization
Optimize FAISS index for speed:
import faiss
def create_optimized_index(dimension=1536, num_clusters=100):
"""
Create optimized FAISS index with IVF (Inverted File Index)
"""
# Quantizer
quantizer = faiss.IndexFlatL2(dimension)
# IVF index
index = faiss.IndexIVFFlat(quantizer, dimension, num_clusters)
return index
def train_and_add(index, embeddings):
"""
Train index and add vectors
"""
embeddings_array = np.array(embeddings).astype('float32')
# Train
index.train(embeddings_array)
# Add vectors
index.add(embeddings_array)
return index
def optimized_search(index, query_embedding, top_k=5, nprobe=10):
"""
Search with optimized parameters
"""
# Set nprobe (number of clusters to search)
index.nprobe = nprobe
query_array = np.array([query_embedding]).astype('float32')
distances, indices = index.search(query_array, top_k)
return indices, distances
Complete Examples
Example 1: Simple RAG System
"""
Complete simple RAG system
"""
import boto3
import json
import faiss
import numpy as np
from pathlib import Path
class SimpleRAG:
def __init__(self):
self.bedrock = boto3.client('bedrock-runtime')
self.documents = []
self.vector_store = None
def load_documents(self, directory):
"""
Load documents from directory
"""
for file_path in Path(directory).glob('*.txt'):
text = file_path.read_text()
self.documents.append({
'text': text,
'source': str(file_path)
})
print(f"Loaded {len(self.documents)} documents")
def chunk_documents(self, chunk_size=1000):
"""
Chunk documents
"""
chunks = []
for doc in self.documents:
words = doc['text'].split()
for i in range(0, len(words), chunk_size):
chunk_text = ' '.join(words[i:i + chunk_size])
chunks.append({
'text': chunk_text,
'source': doc['source']
})
self.documents = chunks
print(f"Created {len(chunks)} chunks")
def create_embeddings(self):
"""
Generate embeddings for all chunks
"""
embeddings = []
for i, doc in enumerate(self.documents):
if (i + 1) % 10 == 0:
print(f"Processed {i + 1}/{len(self.documents)}")
response = self.bedrock.invoke_model(
modelId='amazon.titan-embed-text-v1',
body=json.dumps({
'inputText': doc['text']
})
)
emb = json.loads(response['body'].read())['embedding']
embeddings.append(emb)
# Create FAISS index
embeddings_array = np.array(embeddings).astype('float32')
self.vector_store = faiss.IndexFlatL2(len(embeddings[0]))
self.vector_store.add(embeddings_array)
print("Vector store created")
def retrieve(self, query, top_k=3):
"""
Retrieve relevant documents
"""
# Get query embedding
response = self.bedrock.invoke_model(
modelId='amazon.titan-embed-text-v1',
body=json.dumps({'inputText': query})
)
query_emb = json.loads(response['body'].read())['embedding']
# Search
query_array = np.array([query_emb]).astype('float32')
distances, indices = self.vector_store.search(query_array, top_k)
# Get documents
results = []
for idx in indices[0]:
results.append(self.documents[idx])
return results
def generate(self, query, context_docs):
"""
Generate answer with context
"""
# Build prompt
context = "\n\n".join([
f"Document {i+1}:\n{doc['text']}"
for i, doc in enumerate(context_docs)
])
prompt = f"""Use this context to answer the question:
Context:
{context}
Question: {query}
Answer the question based on the provided context. Include citations."""
response = self.bedrock.invoke_model(
modelId='anthropic.claude-3-sonnet-20240229-v1:0',
body=json.dumps({
'anthropic_version': 'bedrock-2023-05-31',
'messages': [{
'role': 'user',
'content': prompt
}],
'max_tokens': 1000
})
)
answer = json.loads(response['body'].read())['content'][0]['text']
return answer
def query(self, question):
"""
Complete RAG query
"""
# Retrieve
docs = self.retrieve(question, top_k=3)
# Generate
answer = self.generate(question, docs)
return {
'answer': answer,
'sources': [doc['source'] for doc in docs]
}
# Usage
rag = SimpleRAG()
rag.load_documents('data/company-docs/')
rag.chunk_documents(chunk_size=1000)
rag.create_embeddings()
# Query
result = rag.query("What is the return policy?")
print(f"Answer: {result['answer']}")
print(f"Sources: {result['sources']}")
Example 2: Production RAG with Knowledge Bases
"""
Production RAG using Bedrock Knowledge Bases
"""
import boto3
import json
class ProductionRAG:
def __init__(self, kb_id, model_arn):
self.kb_id = kb_id
self.model_arn = model_arn
self.client = boto3.client('bedrock-agent-runtime')
def query(self, question, filters=None, top_k=5):
"""
Query with optional metadata filtering
"""
config = {
'type': 'KNOWLEDGE_BASE',
'knowledgeBaseConfiguration': {
'knowledgeBaseId': self.kb_id,
'modelArn': self.model_arn,
'retrievalConfiguration': {
'vectorSearchConfiguration': {
'numberOfResults': top_k
}
}
}
}
# Add filters if provided
if filters:
config['knowledgeBaseConfiguration']['retrievalConfiguration']['vectorSearchConfiguration']['filter'] = filters
response = self.client.retrieve_and_generate(
input={'text': question},
retrieveAndGenerateConfiguration=config
)
return {
'answer': response['output']['text'],
'citations': response.get('citations', []),
'session_id': response.get('sessionId')
}
def multi_turn_conversation(self, session_id, question):
"""
Continue conversation with context
"""
response = self.client.retrieve_and_generate(
input={'text': question},
retrieveAndGenerateConfiguration={
'type': 'KNOWLEDGE_BASE',
'knowledgeBaseConfiguration': {
'knowledgeBaseId': self.kb_id,
'modelArn': self.model_arn
}
},
sessionId=session_id
)
return {
'answer': response['output']['text'],
'citations': response.get('citations', [])
}
# Usage
rag = ProductionRAG(
kb_id='KB123456',
model_arn='arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0'
)
# Single query
result = rag.query("What is the return policy?")
print(f"Answer: {result['answer']}")
# With filters
result = rag.query(
"What are the Q4 results?",
filters={'equals': {'key': 'year', 'value': '2024'}}
)
# Multi-turn conversation
session_id = result.get('session_id')
followup = rag.multi_turn_conversation(session_id, "What about Q3?")
Troubleshooting
Common Issues and Solutions
Issue 1: Low Retrieval Quality
Problem: Retrieved documents not relevant
Solutions: 1. Adjust chunk size
# Try smaller chunks
chunks = chunk_documents(docs, chunk_size=500)
2. Use better embeddings
# Use Titan Embeddings V2
embedding_v2 = get_embedding_v2(text, dimensions=1024) # Instead of 1536
3. Add query optimization
optimized_query = optimize_query(user_query)
results = retrieve(optimized_query)
4. Use hybrid search
# Combine vector search + keyword
results = hybrid_search(query, alpha=0.7) # 70% vector + 30% keyword
Issue 2: Slow Performance
Problem: Queries taking too long
Solutions: 1. Use caching
cache = EmbeddingCache()
embedding = cache.get_embedding(text)
2. Optimize the index
# Create optimized FAISS index
index = create_optimized_index(dimension=1536, num_clusters=100)
3. Reduce top_k
# Instead of 10
results = retrieve(query, top_k=3)
4. Use async processing
results = await async_rag_query(query)
Issue 3: High Embedding Costs
Problem: Embedding costs too high
Solutions: 1. Use smaller dimensions
# Use Titan Embeddings V2 with 256 dimensions instead of 1536
embedding = get_embedding_v2(text, dimensions=256)
2. Cache embeddings
# Don't regenerate for same text
embedding = cache.get_embedding(text)
3. Batch process
# Instead of one at a time
embeddings = get_embeddings_batch(texts, batch_size=25)
4. Use cheaper models for reranking
# Use Haiku instead of Sonnet for scoring
score = rerank_with_haiku(query, docs)
Issue 4: Hallucination
Problem: Model making up information
Solutions: 1. Stricter prompts
prompt = """ONLY use information from the provided context.
If the answer is not in the context, say "I don't have that information."
Context: {context}
Question: {query}"""
2. Check retrieval score
if results[0]['score'] < 0.7:
return "I don't have enough relevant information"
3. Add confidence scoring
confidence = calculate_confidence(results)
if confidence < 0.8:
return "Low confidence warning: ..."
Issue 5: Token Limit Exceeded
Problem: Context too large for model
Solutions: 1. Reduce number of retrieved docs
# Instead of 10
results = retrieve(query, top_k=2)
2. Truncate long documents
def truncate_doc(doc, max_tokens=500):
words = doc['text'].split()
return ' '.join(words[:max_tokens])
3. Use summarization (a sketch of summarize_context appears after this list)
# Summarize context before generation
summary = summarize_context(retrieved_docs)
answer = generate(query, summary)
4. Use models with larger context
# Claude 3 Sonnet supports 200K tokens
# Use for larger context needs
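A possible sketch of the summarize_context helper referenced above, compressing retrieved chunks with a cheaper model (Haiku) before generation (illustrative only):
import boto3
import json

def summarize_context(retrieved_docs, max_tokens=400):
    """Compress retrieved chunks into a short summary that fits the generation model's context window."""
    bedrock_runtime = boto3.client('bedrock-runtime')
    combined = "\n\n".join(doc['text'] for doc in retrieved_docs)
    response = bedrock_runtime.invoke_model(
        modelId='anthropic.claude-3-haiku-20240307-v1:0',
        body=json.dumps({
            'anthropic_version': 'bedrock-2023-05-31',
            'messages': [{
                'role': 'user',
                'content': f"Summarize the key facts in this text, keeping figures and policy details:\n\n{combined}"
            }],
            'max_tokens': max_tokens,
            'temperature': 0
        })
    )
    return json.loads(response['body'].read())['content'][0]['text']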
Summary
RAG (Retrieval-Augmented Generation) enhances LLMs by providing them with relevant external knowledge, enabling: - Access to current and private data - Reduced hallucinations - Source attribution - Cost-effective knowledge updates
AWS Bedrock offers two main approaches: 1. Knowledge Bases - Fully managed, quick setup, integrated with Agents 2. Custom RAG - Full control, advanced features, custom vector database
Key Components: - Document processing and chunking - Embedding generation (Titan, Cohere) - Vector storage (OpenSearch, Pinecone, FAISS) - Retrieval with reranking - Response generation with citations
Best Practices: - Choose appropriate chunk size (500-1000 tokens) - Store rich metadata for filtering - Optimize queries before retrieval - Implement quality checks - Monitor performance metrics
Optimization: - Cache embeddings and results - Batch process multiple queries - Use async for concurrency - Optimize vector indexes
Start with Knowledge Bases for quick prototyping, then move to custom RAG when you need full control over chunking, retrieval, and generation strategies, and optimize from there.