Document Loading and Preprocessing for RAG with LangChain and ChromaDB
Master document ingestion, preprocessing, and chunking strategies for optimal RAG performance
langchain
chromadb
rag
document-processing
data-preprocessing
text-chunking
by Bui An Du
📄 Document Loading and Preprocessing for RAG
🔄 From Raw Data to RAG-Ready Content
Transform diverse document sources into optimized vector embeddings for superior retrieval
Understanding Document Sources
File Format Support in LangChain
LangChain supports a wide variety of document formats through specialized loaders:
python
from langchain.document_loaders import (
PyPDFLoader, # PDF documents
TextLoader, # Plain text files
UnstructuredHTMLLoader, # HTML pages
UnstructuredMarkdownLoader, # Markdown files
Docx2txtLoader, # Word documents
CSVLoader, # CSV files
JSONLoader, # JSON data
UnstructuredPowerPointLoader, # PowerPoint presentations
UnstructuredExcelLoader, # Excel spreadsheets
)
# Example: Loading multiple document types
loaders = [
PyPDFLoader("data/company_handbook.pdf"),
TextLoader("data/policies.txt"),
UnstructuredMarkdownLoader("data/readme.md"),
Docx2txtLoader("data/meeting_notes.docx"),
]
documents = []
for loader in loaders:
documents.extend(loader.load())
print(f"Loaded {len(documents)} documents from {len(loaders)} sources")Directory-Based Loading
python
from langchain.document_loaders import DirectoryLoader
from langchain.document_loaders import TextLoader
import os
# Load all files from a directory
def load_directory(directory_path: str):
"""Load all supported documents from a directory"""
# Define file extensions and their loaders
file_loaders = {
'.pdf': PyPDFLoader,
'.txt': TextLoader,
'.md': UnstructuredMarkdownLoader,
'.docx': Docx2txtLoader,
'.html': UnstructuredHTMLLoader,
'.json': JSONLoader,  # note: JSONLoader usually also needs a jq_schema argument; handle it separately if so
}
all_documents = []
for root, dirs, files in os.walk(directory_path):
for file in files:
file_path = os.path.join(root, file)
file_ext = os.path.splitext(file)[1].lower()
if file_ext in file_loaders:
try:
loader_class = file_loaders[file_ext]
loader = loader_class(file_path)
docs = loader.load()
# Add source metadata
for doc in docs:
doc.metadata['source_file'] = file_path
doc.metadata['file_type'] = file_ext
all_documents.extend(docs)
print(f"Loaded {len(docs)} documents from {file}")
except Exception as e:
print(f"Error loading {file_path}: {e}")
return all_documents
# Usage
documents = load_directory("data/company_documents/")
print(f"Total documents loaded: {len(documents)}")Advanced Document Processing
Web Scraping and URL Loading
python
from langchain.document_loaders import WebBaseLoader
from langchain.document_loaders import SeleniumURLLoader
import requests
from bs4 import BeautifulSoup
class AdvancedWebLoader:
def __init__(self, urls: list, chunk_size: int = 1000):
self.urls = urls
self.chunk_size = chunk_size
def load_documents(self):
"""Load and process web documents with advanced features"""
all_documents = []
for url in self.urls:
try:
# Use WebBaseLoader for simple scraping
loader = WebBaseLoader(url)
documents = loader.load()
# Enhanced metadata extraction
# WebBaseLoader strips markup from page_content, so fetch the raw HTML once for title/description parsing
raw_html = requests.get(url, timeout=10).text
for doc in documents:
doc.metadata.update({
'url': url,
'title': self._extract_title(raw_html),
'description': self._extract_description(raw_html),
'last_modified': self._get_last_modified(url),
'content_type': 'web_page'
})
all_documents.extend(documents)
print(f"Loaded content from {url}")
except Exception as e:
print(f"Error loading {url}: {e}")
return all_documents
def _extract_title(self, html_content: str) -> str:
"""Extract page title from HTML"""
soup = BeautifulSoup(html_content, 'html.parser')
title_tag = soup.find('title')
return title_tag.text.strip() if title_tag else "Untitled"
def _extract_description(self, html_content: str) -> str:
"""Extract meta description"""
soup = BeautifulSoup(html_content, 'html.parser')
meta_desc = soup.find('meta', attrs={'name': 'description'})
return meta_desc.get('content', '') if meta_desc else ""
def _get_last_modified(self, url: str) -> str:
"""Get last modified date from HTTP headers"""
try:
response = requests.head(url, timeout=10)
return response.headers.get('last-modified', '')
except requests.RequestException:
return ""
API-Based Document Loading
python
from langchain.document_loaders.base import BaseLoader
from langchain.docstore.document import Document
import requests
class APIDocumentLoader(BaseLoader):
"""Load documents from REST APIs"""
def __init__(self, api_endpoint: str, headers: dict = None, params: dict = None):
self.api_endpoint = api_endpoint
self.headers = headers or {}
self.params = params or {}
def load(self):
"""Load documents from API"""
try:
response = requests.get(
self.api_endpoint,
headers=self.headers,
params=self.params,
timeout=30
)
response.raise_for_status()
data = response.json()
documents = []
if isinstance(data, list):
# List of documents
for item in data:
content = self._extract_content(item)
metadata = self._extract_metadata(item)
documents.append(Document(
page_content=content,
metadata=metadata
))
else:
# Single document
content = self._extract_content(data)
metadata = self._extract_metadata(data)
documents.append(Document(
page_content=content,
metadata=metadata
))
return documents
except Exception as e:
print(f"Error loading from API: {e}")
return []
def _extract_content(self, item: dict) -> str:
"""Extract text content from API response item"""
# Customize based on your API response structure
content_fields = ['content', 'text', 'body', 'description']
for field in content_fields:
if field in item and isinstance(item[field], str):
return item[field]
# Fallback: convert entire item to string
return str(item)
def _extract_metadata(self, item: dict) -> dict:
"""Extract metadata from API response item"""
metadata = {
'source': 'api',
'api_endpoint': self.api_endpoint,
}
# Extract common metadata fields
meta_fields = ['id', 'title', 'author', 'created_at', 'updated_at', 'tags']
for field in meta_fields:
if field in item:
metadata[field] = item[field]
return metadata
# Usage example
api_loader = APIDocumentLoader(
api_endpoint="https://api.example.com/articles",
headers={"Authorization": "Bearer your-token"}
)
documents = api_loader.load()
Text Splitting Strategies
Intelligent Text Splitting
python
from langchain.text_splitter import (
RecursiveCharacterTextSplitter,
CharacterTextSplitter,
TokenTextSplitter,
MarkdownHeaderTextSplitter,
Language
)
from langchain.docstore.document import Document
import re
class IntelligentTextSplitter:
def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
def split_documents(self, documents):
"""Intelligently split documents based on their content type"""
split_documents = []
for doc in documents:
content_type = doc.metadata.get('file_type', '')
if content_type == '.md':
# Use markdown-aware splitting
splits = self._split_markdown(doc)
elif content_type == '.py' or content_type == '.js':
# Use code-aware splitting
splits = self._split_code(doc, content_type)
elif content_type == '.pdf':
# Use recursive splitting for PDFs
splits = self._split_recursive(doc)
else:
# Default recursive splitting
splits = self._split_recursive(doc)
split_documents.extend(splits)
return split_documents
def _split_markdown(self, document):
"""Split markdown documents preserving structure"""
# Split by headers first
headers_to_split_on = [
("# ", "Header 1"),
("## ", "Header 2"),
("### ", "Header 3"),
]
markdown_splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=headers_to_split_on
)
header_splits = markdown_splitter.split_text(document.page_content)
# Further split large chunks
final_splits = []
recursive_splitter = RecursiveCharacterTextSplitter(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap,
separators=["\n\n", "\n", " ", ""]
)
for split in header_splits:
if len(split.page_content) > self.chunk_size:
sub_splits = recursive_splitter.split_text(split.page_content)
for sub_split in sub_splits:
final_splits.append(Document(
page_content=sub_split,
metadata=document.metadata
))
else:
final_splits.append(split)
return final_splits
def _split_code(self, document, file_type):
"""Split code files preserving function/class boundaries"""
if file_type == '.py':
language = Language.PYTHON
elif file_type == '.js':
language = Language.JS
else:
language = Language.PYTHON # fallback
code_splitter = RecursiveCharacterTextSplitter.from_language(
language=language,
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap
)
return code_splitter.split_documents([document])
def _split_recursive(self, document):
"""Standard recursive text splitting"""
splitter = RecursiveCharacterTextSplitter(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap,
separators=["\n\n", "\n", ". ", " ", ""]
)
return splitter.split_documents([document])
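A minimal usage sketch, assuming the `documents` list produced by the loaders above (including the `file_type` metadata added in `load_directory`); the 1000/200 settings are illustrative defaults, not tuned values.
python
# Hypothetical usage of IntelligentTextSplitter on previously loaded documents
splitter = IntelligentTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = splitter.split_documents(documents)
print(f"Split {len(documents)} documents into {len(chunks)} chunks")

# Inspect one chunk and the metadata carried over from loading
if chunks:
    print(chunks[0].metadata)
    print(chunks[0].page_content[:200])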
Content-Aware Chunking
python
class SemanticTextSplitter:
def __init__(self, embeddings, chunk_size: int = 1000):
self.embeddings = embeddings
self.chunk_size = chunk_size
def split_text(self, text: str):
"""Split text based on semantic similarity"""
sentences = self._split_into_sentences(text)
chunks = []
current_chunk = []
for sentence in sentences:
# Check if adding this sentence would exceed chunk size
if len(' '.join(current_chunk + [sentence])) > self.chunk_size and current_chunk:
chunks.append(' '.join(current_chunk))
current_chunk = [sentence]
else:
current_chunk.append(sentence)
if current_chunk:
chunks.append(' '.join(current_chunk))
return chunks
def _split_into_sentences(self, text: str):
"""Split text into sentences"""
import re
# Simple sentence splitting - can be improved with NLP libraries
sentences = re.split(r'(?<=[.!?])\s+', text.strip())
return [s.strip() for s in sentences if s.strip()]
def semantic_chunking(self, text: str):
"""Advanced semantic chunking using embeddings"""
sentences = self._split_into_sentences(text)
if len(sentences) <= 1:
return [text]
# Get embeddings for each sentence
sentence_embeddings = self.embeddings.embed_documents(sentences)
chunks = []
current_chunk = [sentences[0]]
current_embedding = sentence_embeddings[0]
for i in range(1, len(sentences)):
sentence = sentences[i]
embedding = sentence_embeddings[i]
# Calculate similarity with current chunk
similarity = self._cosine_similarity(current_embedding, embedding)
# If similarity is low, start new chunk
if similarity < 0.7: # Adjust threshold as needed
chunks.append(' '.join(current_chunk))
current_chunk = [sentence]
current_embedding = embedding
else:
current_chunk.append(sentence)
# Update current embedding (simple average)
current_embedding = [
(a + b) / 2 for a, b in zip(current_embedding, embedding)
]
if current_chunk:
chunks.append(' '.join(current_chunk))
return chunks
def _cosine_similarity(self, vec1, vec2):
"""Calculate cosine similarity between two vectors"""
import numpy as np
vec1 = np.array(vec1)
vec2 = np.array(vec2)
dot_product = np.dot(vec1, vec2)
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
return dot_product / (norm1 * norm2) if norm1 != 0 and norm2 != 0 else 0
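A sketch of how the semantic splitter might be driven, assuming an OpenAI embeddings model (any LangChain embeddings implementation with `embed_documents` works); the 0.7 similarity threshold above is only a starting point to tune against your corpus.
python
# Hypothetical usage; OpenAIEmbeddings requires OPENAI_API_KEY to be set
from langchain.embeddings import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
semantic_splitter = SemanticTextSplitter(embeddings, chunk_size=1000)

sample_text = (
    "Our refund policy allows returns within 30 days. "
    "Refunds are processed to the original payment method. "
    "Separately, the engineering team deploys releases every Tuesday. "
    "Deployments are rolled back automatically if health checks fail."
)
for i, chunk in enumerate(semantic_splitter.semantic_chunking(sample_text)):
    print(f"Chunk {i}: {chunk}")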
Metadata Enhancement
Automatic Metadata Extraction
python
from langchain.document_loaders import UnstructuredFileLoader
from langchain.docstore.document import Document
import spacy
import datetime
class EnhancedDocumentProcessor:
def __init__(self):
# Load NLP model for entity extraction
try:
self.nlp = spacy.load("en_core_web_sm")
except OSError:  # model not installed; install with: python -m spacy download en_core_web_sm
self.nlp = None
def process_documents(self, documents):
"""Enhance documents with rich metadata"""
enhanced_docs = []
for doc in documents:
enhanced_doc = self._enhance_document(doc)
enhanced_docs.append(enhanced_doc)
return enhanced_docs
def _enhance_document(self, document: Document):
"""Add comprehensive metadata to a document"""
content = document.page_content
metadata = document.metadata.copy()
# Basic text statistics
metadata.update({
'word_count': len(content.split()),
'char_count': len(content),
'sentence_count': len(content.split('.')),
'processed_at': datetime.datetime.now().isoformat(),
})
# Content analysis
if self.nlp:
doc_nlp = self.nlp(content[:10000]) # Limit for performance
# Extract entities
entities = [(ent.text, ent.label_) for ent in doc_nlp.ents]
metadata['named_entities'] = entities
# Extract key phrases (noun chunks)
noun_chunks = [chunk.text for chunk in doc_nlp.noun_chunks]
metadata['noun_chunks'] = noun_chunks[:10] # Top 10
# Language detection (simplified)
metadata['detected_language'] = 'en' # Could use langdetect library
# Content quality metrics
metadata.update({
'has_tables': '|' in content and '\n' in content,
'has_code': '```' in content or '    ' in content,  # fenced or indented code (rough heuristic)
'has_lists': '- ' in content or '* ' in content or '1. ' in content,
'reading_time_minutes': max(1, len(content.split()) // 200), # ~200 words per minute
})
# Content categorization (simple keyword-based)
categories = self._categorize_content(content)
metadata['categories'] = categories
return Document(
page_content=content,
metadata=metadata
)
def _categorize_content(self, content: str):
"""Categorize content based on keywords"""
categories = []
content_lower = content.lower()
category_keywords = {
'technical': ['api', 'code', 'programming', 'software', 'development', 'algorithm'],
'business': ['company', 'policy', 'procedure', 'strategy', 'management', 'finance'],
'documentation': ['guide', 'tutorial', 'reference', 'manual', 'documentation'],
'academic': ['research', 'study', 'analysis', 'theory', 'methodology'],
'legal': ['contract', 'agreement', 'terms', 'policy', 'compliance', 'regulation'],
}
for category, keywords in category_keywords.items():
if any(keyword in content_lower for keyword in keywords):
categories.append(category)
return categories if categories else ['general']
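A usage sketch, assuming the `documents` list from the loading step and that the spaCy model has been installed first (`python -m spacy download en_core_web_sm`); without the model the processor silently skips entity extraction.
python
# Hypothetical metadata-enrichment pass over previously loaded documents
processor = EnhancedDocumentProcessor()
enriched_docs = processor.process_documents(documents)

sample = enriched_docs[0]
print(sample.metadata["word_count"], sample.metadata["categories"])
print(sample.metadata.get("named_entities", [])[:5])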
Data Quality and Cleaning
Text Cleaning Pipeline
python
import re
from typing import List
from langchain.docstore.document import Document
class TextCleaner:
def __init__(self):
self.cleaning_rules = [
self._remove_excessive_whitespace,
self._normalize_quotes,
self._remove_special_characters,
self._fix_encoding_issues,
self._standardize_line_breaks,
]
def clean_documents(self, documents: List[Document]):
"""Apply cleaning pipeline to documents"""
cleaned_docs = []
for doc in documents:
cleaned_content = doc.page_content
# Apply each cleaning rule
for rule in self.cleaning_rules:
cleaned_content = rule(cleaned_content)
# Create cleaned document
cleaned_doc = Document(
page_content=cleaned_content,
metadata=doc.metadata
)
cleaned_docs.append(cleaned_doc)
return cleaned_docs
def _remove_excessive_whitespace(self, text: str) -> str:
"""Remove excessive whitespace and normalize spacing"""
# Remove multiple spaces
text = re.sub(r' +', ' ', text)
# Remove multiple newlines
text = re.sub(r'\n{3,}', '\n\n', text)
# Strip whitespace
return text.strip()
def _normalize_quotes(self, text: str) -> str:
"""Normalize different types of quotes"""
# Convert smart quotes to regular quotes (written as escapes so they survive copy/paste)
text = text.replace('\u201c', '"').replace('\u201d', '"')
text = text.replace('\u2018', "'").replace('\u2019', "'")
return text
def _remove_special_characters(self, text: str) -> str:
"""Remove or replace problematic special characters"""
# Keep common punctuation, remove others
allowed_chars = re.compile(r'[^\w\s.,!?-]')
text = allowed_chars.sub('', text)
return text
def _fix_encoding_issues(self, text: str) -> str:
"""Fix common encoding issues"""
# Replace common mojibake artifacts (UTF-8 text mis-decoded as Windows-1252)
replacements = {
'â€™': "'",  # right single quotation mark
'â€œ': '"',  # left double quotation mark
'â€\x9d': '"',  # right double quotation mark
'â€\u201d': '—',  # em dash
'â€\u201c': '–',  # en dash
}
for old, new in replacements.items():
text = text.replace(old, new)
return text
def _standardize_line_breaks(self, text: str) -> str:
"""Standardize line breaks"""
# Convert different line break styles to \n
text = text.replace('\r\n', '\n').replace('\r', '\n')
return text
Content Filtering
python
import re
from typing import List
from langchain.docstore.document import Document
class ContentFilter:
def __init__(self, min_length: int = 50, max_length: int = 10000):
self.min_length = min_length
self.max_length = max_length
def filter_documents(self, documents: List[Document]):
"""Filter documents based on quality criteria"""
filtered_docs = []
for doc in documents:
if self._passes_filters(doc):
filtered_docs.append(doc)
return filtered_docs
def _passes_filters(self, document: Document) -> bool:
"""Check if document passes all filters"""
content = document.page_content
# Length filters
if len(content) < self.min_length:
return False
if len(content) > self.max_length:
return False
# Content quality filters
if not self._has_minimum_words(content):
return False
if self._is_low_quality(content):
return False
if self._contains_spam(content):
return False
return True
def _has_minimum_words(self, content: str) -> bool:
"""Check if content has minimum number of meaningful words"""
words = re.findall(r'\b\w+\b', content.lower())
meaningful_words = [w for w in words if len(w) > 2 and w not in {'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}]
return len(meaningful_words) >= 10
def _is_low_quality(self, content: str) -> bool:
"""Detect low-quality content"""
# Too many repeated words
words = content.lower().split()
if len(words) > 0:
most_common_word_ratio = max(words.count(word) for word in set(words)) / len(words)
if most_common_word_ratio > 0.3: # More than 30% of content is same word
return True
# Too many special characters
special_char_ratio = len(re.findall(r'[^a-zA-Z0-9\s]', content)) / len(content)
if special_char_ratio > 0.5: # More than 50% special characters
return True
return False
def _contains_spam(self, content: str) -> bool:
"""Basic spam detection"""
spam_indicators = [
'buy now', 'click here', 'free money', 'work from home',
'guaranteed results', 'limited time offer', 'act now'
]
content_lower = content.lower()
return any(indicator in content_lower for indicator in spam_indicators)
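A sketch of how the cleaner and filter might be chained before chunking, assuming the `documents` list from the loading step; the length bounds are the illustrative defaults above, not recommended values.
python
# Hypothetical cleaning-and-filtering pass prior to chunking
cleaner = TextCleaner()
content_filter = ContentFilter(min_length=50, max_length=10000)

cleaned = cleaner.clean_documents(documents)
filtered = content_filter.filter_documents(cleaned)
print(f"{len(documents)} raw -> {len(cleaned)} cleaned -> {len(filtered)} kept")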
Integration with ChromaDB
Optimized Document Storage
python
import chromadb
from chromadb.config import Settings
from typing import List, Dict, Any
import datetime
import hashlib
from langchain.docstore.document import Document
class OptimizedChromaLoader:
def __init__(self, collection_name: str = "documents"):
self.client = chromadb.PersistentClient(
path="./chroma_db",
settings=Settings(
anonymized_telemetry=False,
allow_reset=True,
)
)
self.collection_name = collection_name
def load_documents(self, documents: List[Document], embeddings):
"""Load documents with optimized batching and metadata"""
collection = self.client.get_or_create_collection(
name=self.collection_name,
metadata={
"description": "RAG document collection",
"created": str(datetime.datetime.now()),
"embedding_model": "text-embedding-ada-002"
}
)
# Prepare data for batch insertion
texts = [doc.page_content for doc in documents]
metadatas = []
for doc in documents:
metadata = doc.metadata.copy()
# Ensure metadata values are ChromaDB-compatible
for key, value in metadata.items():
if isinstance(value, (list, dict)):
metadata[key] = str(value) # Convert complex types to strings
elif value is None:
metadata[key] = "" # Convert None to empty string
metadatas.append(metadata)
# Generate deterministic IDs (hashlib is stable across runs, unlike the built-in hash())
ids = [f"doc_{i}_{hashlib.md5(text[:100].encode('utf-8')).hexdigest()[:12]}" for i, text in enumerate(texts)]
# Batch embed and insert
batch_size = 100
for i in range(0, len(texts), batch_size):
batch_texts = texts[i:i + batch_size]
batch_metadatas = metadatas[i:i + batch_size]
batch_ids = ids[i:i + batch_size]
try:
# Embed the batch with the supplied LangChain embeddings model, then insert
batch_embeddings = embeddings.embed_documents(batch_texts)
collection.add(
documents=batch_texts,
embeddings=batch_embeddings,
metadatas=batch_metadatas,
ids=batch_ids
print(f"Inserted batch {i//batch_size + 1} ({len(batch_texts)} documents)")
except Exception as e:
print(f"Error inserting batch {i//batch_size + 1}: {e}")
print(f"Successfully loaded {len(documents)} documents into ChromaDB")
def get_collection_stats(self):
"""Get collection statistics"""
try:
collection = self.client.get_collection(self.collection_name)
count = collection.count()
# Sample metadata to understand structure
if count > 0:
sample = collection.get(limit=1, include=['metadatas'])
metadata_keys = list(sample['metadatas'][0].keys()) if sample['metadatas'] else []
return {
"document_count": count,
"metadata_fields": metadata_keys if count > 0 else [],
"collection_name": self.collection_name
}
except Exception as e:
return {"error": str(e)}Best Practices Summary
Best Practices Summary
Document Loading
- Use appropriate loaders for different file types
- Implement error handling and retry logic (see the retry sketch after this list)
- Add comprehensive metadata during loading
- Validate document content and structure
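A minimal retry wrapper, as one way to implement the error handling mentioned above; the attempt count and delay are arbitrary placeholders.
python
import time

def load_with_retry(loader, max_attempts: int = 3, delay_seconds: float = 2.0):
    """Retry a LangChain loader a few times before giving up (illustrative defaults)."""
    for attempt in range(1, max_attempts + 1):
        try:
            return loader.load()
        except Exception as e:
            print(f"Attempt {attempt}/{max_attempts} failed: {e}")
            if attempt == max_attempts:
                return []  # give up but keep the pipeline running
            time.sleep(delay_seconds)

# Example: docs = load_with_retry(PyPDFLoader("data/company_handbook.pdf"))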
Text Preprocessing
- Clean and normalize text before chunking
- Preserve document structure when possible
- Implement content-aware chunking strategies
- Balance chunk size with semantic coherence
Quality Assurance
- Filter out low-quality or irrelevant content
- Validate metadata completeness
- Monitor document processing pipeline
- Implement content deduplication
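A simple content-hash pass is one hedged way to implement the deduplication point above; detecting near-duplicates (e.g., MinHash or embedding similarity) needs more machinery.
python
import hashlib
from typing import List
from langchain.docstore.document import Document

def deduplicate_documents(documents: List[Document]) -> List[Document]:
    """Drop documents whose normalized content hashes to an already-seen value."""
    seen_hashes = set()
    unique_docs = []
    for doc in documents:
        normalized = " ".join(doc.page_content.split()).lower()
        content_hash = hashlib.sha256(normalized.encode("utf-8")).hexdigest()
        if content_hash not in seen_hashes:
            seen_hashes.add(content_hash)
            unique_docs.append(doc)
    return unique_docs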
Performance Optimization
- Use batch processing for large document sets
- Implement parallel processing where possible (see the sketch after this list)
- Cache processed documents
- Monitor memory usage during processing
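One way to parallelize loading across files, as a sketch of the batch/parallel points above: loading is I/O-bound, so a thread pool is usually enough. It reuses the `load_with_retry` helper from the earlier sketch, and the worker count is an arbitrary placeholder.
python
from concurrent.futures import ThreadPoolExecutor, as_completed

def load_in_parallel(loaders, max_workers: int = 4):
    """Run several LangChain loaders concurrently and collect their documents."""
    all_docs = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(load_with_retry, loader): loader for loader in loaders}
        for future in as_completed(futures):
            all_docs.extend(future.result())
    return all_docs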
The quality of your RAG system's retrieval directly depends on how well you preprocess and organize your documents. Invest time in robust document processing pipelines to achieve optimal results. 📚✨