Building Reliable LLM Pipelines with Pydantic and LangChain
Introduction: Why Structured Output Matters for LLM Applications
Large Language Models (LLMs) excel at generating human-like text, but their unstructured outputs often pose challenges for production applications. While ChatGPT might give you a perfectly formatted response in conversation, integrating that same capability into your application requires reliable, parseable data structures.
This is where Pydantic and LangChain become essential tools. Pydantic provides robust data validation and parsing capabilities, while LangChain offers multiple approaches to seamlessly integrate structured output generation with various LLM providers. The modern approach uses LangChain’s with_structured_output() method, which leverages native model capabilities for the most reliable results, while the traditional PydanticOutputParser provides a fallback for older models or custom scenarios.
Understanding Pydantic: The Foundation of Structured Data
What is Pydantic and Why Use It?
Pydantic is a Python library that uses type hints to validate data and serialize complex data structures. It’s particularly powerful for LLM applications for several critical reasons:
- Type Safety and Validation: Pydantic ensures that LLM outputs conform to expected data types and constraints, preventing runtime errors from malformed data.
- Automatic Parsing: It can automatically convert JSON-like structures from LLM outputs into Python objects with proper typing.
- Error Handling: Provides comprehensive error messages when validation fails, making it easier to debug and improve your prompts.
- Schema Generation: Automatically generates JSON schemas that can be used in prompts to guide the LLM toward producing correctly structured outputs.
- Integration with Frameworks: Works seamlessly with FastAPI, LangChain, and other modern Python frameworks.
This allows us to have much more control over our LLM applications. Here’s a simple example of a Pydantic model:
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime
class PersonInfo(BaseModel):
    name: str = Field(..., description="Full name of the person")
    age: int = Field(..., ge=0, le=150, description="Age in years")
    email: Optional[str] = Field(None, pattern=r'^[^@]+@[^@]+\.[^@]+$')
    skills: List[str] = Field(default_factory=list)
    created_at: datetime = Field(default_factory=datetime.now)
This model not only defines the structure of your data but also enforces validation rules (age between 0-150, valid email format) and provides default values where appropriate.
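As a quick illustration (a minimal sketch reusing the PersonInfo model above), validation, type coercion, and schema generation look like this:
from pydantic import ValidationError

# Valid input is parsed into a typed object; the string "36" is coerced to an int
person = PersonInfo(name="Ada Lovelace", age="36", skills=["math"])
print(person.age)  # 36

# Invalid input raises a ValidationError describing every failed constraint
try:
    PersonInfo(name="Bob", age=-5, email="not-an-email")
except ValidationError as e:
    print(e)

# The JSON schema can be embedded in prompts to guide the LLM
print(PersonInfo.model_json_schema())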
Core Pydantic Concepts for LLM Integration
BaseModel serves as the foundation for all structured data classes. Every model you create inherits from BaseModel, providing automatic validation, serialization, and JSON parsing capabilities.
Field definitions allow you to specify constraints, default values, and descriptions that can be used to generate better prompts for LLMs:
from pydantic import BaseModel, Field, field_validator
class ProductReview(BaseModel):
    product_name: str = Field(..., description="Name of the product being reviewed")
    rating: int = Field(..., ge=1, le=5, description="Rating from 1-5 stars")
    sentiment: str = Field(..., pattern=r'^(positive|negative|neutral)$')
    summary: str = Field(..., max_length=200)

    @field_validator('summary')
    @classmethod
    def validate_summary(cls, v):
        if len(v.split()) < 5:
            raise ValueError('Summary must be at least 5 words')
        return v
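To see the custom validator in action, here is a small sketch assuming the ProductReview model above:
from pydantic import ValidationError

review = ProductReview(
    product_name="Noise-Cancelling Headphones",
    rating=4,
    sentiment="positive",
    summary="Great sound quality and very comfortable for long sessions"
)
print(review.model_dump())

try:
    ProductReview(product_name="Headphones", rating=4, sentiment="positive", summary="Too short")
except ValidationError as e:
    print(e)  # the field_validator rejects summaries shorter than 5 words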
The Challenge of Raw LLM Outputs
Common Issues with Unstructured LLM Responses
Raw LLM outputs present several challenges for production applications:
- Inconsistent formatting - The same prompt might return data in different formats across multiple requests
- Parsing difficulties - Natural language responses require complex regex or string manipulation
- Type safety issues - No guarantee that numbers are actually numeric or dates are valid
- Error handling complexity - Difficult to handle edge cases and malformed responses gracefully
Consider this typical LLM response to a product analysis request:
The iPhone 14 Pro is excellent with a rating of 4.5/5.
Price: $999. Released in September 2022.
Key features include: improved camera, A16 chip, Dynamic Island.
Overall sentiment: very positive.
Parsing this reliably across different LLM responses becomes a maintenance nightmare without structured output.
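To make that concrete, here is the kind of brittle, hand-rolled parsing that structured output replaces (an illustrative sketch written against the sample response above, not code from any library):
import re

def parse_product_text(text: str) -> dict:
    # Fragile: each pattern assumes one exact phrasing and silently
    # returns None the moment the model words things differently
    rating = re.search(r'rating of ([\d.]+)/5', text)
    price = re.search(r'Price: \$([\d,]+)', text)
    features = re.search(r'features include: (.+?)\.', text)
    return {
        "rating": float(rating.group(1)) if rating else None,
        "price": float(price.group(1).replace(",", "")) if price else None,
        "features": [f.strip() for f in features.group(1).split(",")] if features else [],
    }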
LangChain’s Modern Structured Output Approach
The with_structured_output() Method (Recommended)
LangChain’s with_structured_output() method is the easiest and most reliable way to get structured outputs from language models. It leverages the native structured output capabilities of modern LLMs, making it more robust than prompt-based parsing approaches.
Here’s how to use it:
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from typing import List
class ProductAnalysis(BaseModel):
    name: str = Field(description="Product name")
    price: float = Field(description="Price in USD")
    rating: float = Field(description="Rating out of 5")
    features: List[str] = Field(description="Key features list")
    sentiment: str = Field(description="Overall sentiment (positive/negative/neutral)")
# Create structured LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
structured_llm = llm.with_structured_output(ProductAnalysis)
# Direct invocation - no prompt engineering needed!
result = structured_llm.invoke("iPhone 14 Pro - $999, excellent camera, A16 chip, 4.5 stars")
print(result) # Returns ProductAnalysis object directly
print(result.name) # "iPhone 14 Pro"
print(result.price) # 999.0
Key Advantages of with_structured_output()
- Native Model Integration: Uses the model’s built-in function calling capabilities
- No Prompt Engineering: No need to craft complex format instructions
- Built-in Validation: Automatic Pydantic validation and error handling
- Higher Reliability: More consistent results than text-based parsing
- Streaming Support: Supports streaming when the output schema is dictionary-based (see the sketch below)
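The streaming point deserves a brief illustration. When the schema is a TypedDict (or a JSON Schema dict) rather than a Pydantic class, the output can be streamed as progressively larger dicts; the following is a minimal sketch of that pattern:
from typing_extensions import Annotated, TypedDict
from langchain_openai import ChatOpenAI

class ProductSummary(TypedDict):
    """Summary of a product description."""
    name: Annotated[str, ..., "Product name"]
    sentiment: Annotated[str, ..., "Overall sentiment"]

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
structured_llm = llm.with_structured_output(ProductSummary)

# Each chunk is the partially completed dict, growing as tokens arrive
for chunk in structured_llm.stream("iPhone 14 Pro - $999, excellent camera, A16 chip"):
    print(chunk)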
Advanced Usage with Error Handling
For production applications, you can handle parsing errors gracefully:
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field, ValidationError
from typing import Optional

class ContactInfo(BaseModel):
    name: str = Field(description="Full name of the person")
    email: str = Field(description="Email address")
    phone: Optional[str] = Field(default=None, description="Phone number")
    company: Optional[str] = Field(default=None, description="Company name")

def extract_contact_info(text: str) -> ContactInfo | None:
    llm = ChatOpenAI(model="gpt-4o-mini")
    structured_llm = llm.with_structured_output(ContactInfo)
    try:
        result = structured_llm.invoke(f"Extract contact information: {text}")
        return result
    except ValidationError as e:
        print(f"Validation error: {e}")
        return None
    except Exception as e:
        print(f"Extraction error: {e}")
        return None

# Usage
text = "Contact John Smith at john.smith@company.com or call (555) 123-4567. He works at Tech Solutions Inc."
contact = extract_contact_info(text)
if contact:
    print(f"Name: {contact.name}, Email: {contact.email}")
Alternative Approach: PydanticOutputParser
When to Use PydanticOutputParser
While with_structured_output() is recommended for most use cases, PydanticOutputParser is still useful for:
- Older Models: Models that don’t support function calling
- Custom Prompting: When you need fine-grained control over prompt engineering
- Debugging: When you want to see the raw model output before parsing
- Legacy Systems: Maintaining existing implementations
Implementing PydanticOutputParser
Here’s how to set up a structured output pipeline using the traditional approach:
from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from typing import List
class ProductAnalysis(BaseModel):
    name: str = Field(description="Product name")
    price: float = Field(description="Price in USD")
    rating: float = Field(description="Rating out of 5")
    features: List[str] = Field(description="Key features list")
    sentiment: str = Field(description="Overall sentiment (positive/negative/neutral)")

# Set up the parser
parser = PydanticOutputParser(pydantic_object=ProductAnalysis)

# Create prompt template
prompt = PromptTemplate(
    template="Analyze the following product description and extract structured information.\n{format_instructions}\n{query}\n",
    input_variables=["query"],
    partial_variables={"format_instructions": parser.get_format_instructions()}
)

# Initialize LLM and create chain
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
chain = prompt | llm | parser

# Parse the output
try:
    result = chain.invoke({"query": "iPhone 14 Pro - $999, excellent camera, A16 chip, 4.5 stars"})
    print(result)
except Exception as e:
    print(f"Parsing error: {e}")
Choosing Between the Approaches
| Feature | with_structured_output() | PydanticOutputParser |
|---|---|---|
| Ease of Use | ✅ Very simple | ⚠️ Requires prompt engineering |
| Reliability | ✅ High (native model support) | ⚠️ Depends on prompt quality |
| Model Requirements | ⚠️ Needs function calling support | ✅ Works with any model |
| Customization | ⚠️ Limited prompt control | ✅ Full prompt control |
| Performance | ✅ Generally faster | ⚠️ May require retries |
| Recommended For | Most production use cases | Legacy systems, debugging |
Migration Path
If you’re using PydanticOutputParser, migrating to with_structured_output() is straightforward:
# Old approach
parser = PydanticOutputParser(pydantic_object=MyModel)
prompt = PromptTemplate(
template="Extract data: {format_instructions}\n{text}",
input_variables=["text"],
partial_variables={"format_instructions": parser.get_format_instructions()}
)
chain = prompt | llm | parser
result = chain.invoke({"text": input_text})
# New approach
structured_llm = llm.with_structured_output(MyModel)
result = structured_llm.invoke(input_text)
Designing Effective Pydantic Models for LLM Output
Best Practices for Schema Design
Start with clear, descriptive field names and descriptions:
class EmailClassification(BaseModel):
    subject: str = Field(..., description="Email subject line")
    category: str = Field(..., description="Category: urgent, normal, spam, or promotional")
    priority_score: int = Field(..., ge=1, le=10, description="Priority from 1 (low) to 10 (high)")
    action_required: bool = Field(..., description="Whether immediate action is required")
    key_topics: List[str] = Field(default_factory=list, description="Main topics discussed")
    estimated_response_time: Optional[str] = Field(None, description="Suggested response timeframe")
Use appropriate field types and constraints:
from enum import Enum
from decimal import Decimal
class SentimentType(str, Enum):
    POSITIVE = "positive"
    NEGATIVE = "negative"
    NEUTRAL = "neutral"

class FinancialAnalysis(BaseModel):
    company_name: str = Field(..., min_length=1, max_length=100)
    stock_price: Decimal = Field(..., gt=0)
    sentiment: SentimentType
    confidence_score: float = Field(..., ge=0.0, le=1.0)
    risk_factors: List[str] = Field(default_factory=list, max_length=10)
Handling Complex Nested Structures
For complex data, use nested Pydantic models:
class ContactInfo(BaseModel):
    email: str = Field(..., pattern=r'^[^@]+@[^@]+\.[^@]+$')
    phone: Optional[str] = Field(None, pattern=r'^\+?[\d\s\-\(\)]{10,}$')

class Address(BaseModel):
    street: str
    city: str
    state: str
    zip_code: str = Field(..., pattern=r'^\d{5}(-\d{4})?$')

class CustomerProfile(BaseModel):
    name: str = Field(..., min_length=2)
    contact: ContactInfo
    address: Address
    account_type: str = Field(..., pattern=r'^(premium|standard|basic)$')
    signup_date: datetime
    total_purchases: Decimal = Field(default=Decimal('0.00'), ge=0)
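Nested models validate recursively, so a single call checks the entire structure. Here is a brief sketch (assuming the models and imports above) of building a profile from a nested dict:
profile = CustomerProfile.model_validate({
    "name": "Jane Doe",
    "contact": {"email": "jane@example.com", "phone": "+1 555 123 4567"},
    "address": {"street": "1 Main St", "city": "Springfield", "state": "IL", "zip_code": "62701"},
    "account_type": "premium",
    "signup_date": "2024-03-15T09:30:00",
})
print(profile.contact.email)      # nested models are fully typed objects
print(profile.model_dump_json())  # serializes the whole tree back to JSON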
Practical Implementation Examples
Example 1: Simple Data Extraction (Modern Approach)
Extract contact information using with_structured_output():
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from typing import Optional
class ContactExtraction(BaseModel):
    name: Optional[str] = Field(None, description="Person's full name")
    email: Optional[str] = Field(None, description="Email address")
    phone: Optional[str] = Field(None, description="Phone number")
    company: Optional[str] = Field(None, description="Company name")

def extract_contact_info(text: str) -> ContactExtraction:
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    structured_llm = llm.with_structured_output(ContactExtraction)
    prompt = f"""Extract contact information from the following text.
If any information is not present, use null.

Text: {text}
"""
    return structured_llm.invoke(prompt)
# Usage
text = "Contact John Smith at john.smith@company.com or call (555) 123-4567. He works at Tech Solutions Inc."
result = extract_contact_info(text)
print(result.model_dump()) # Using model_dump() instead of deprecated dict()
print(f"Name: {result.name}, Company: {result.company}")
Example 1b: Using PydanticOutputParser (Alternative)
For comparison, here’s the same example using the traditional approach:
from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
def extract_contact_info_traditional(text: str) -> ContactExtraction:
    parser = PydanticOutputParser(pydantic_object=ContactExtraction)
    prompt = PromptTemplate(
        template="""Extract contact information from the following text.
If any information is not present, use null.

{format_instructions}

Text: {text}
""",
        input_variables=["text"],
        partial_variables={"format_instructions": parser.get_format_instructions()}
    )
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    chain = prompt | llm | parser
    return chain.invoke({"text": text})
# Usage
text = "Contact John Smith at john.smith@company.com or call (555) 123-4567. He works at Tech Solutions Inc."
result = extract_contact_info_traditional(text)
print(result.model_dump())
Example 2: Complex Document Analysis
Analyze business documents with multiple data points using the modern approach:
from typing import Dict, Union, List
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
class DocumentAnalysis(BaseModel):
    document_type: str = Field(..., description="Type of document (contract, invoice, report, etc.)")
    key_dates: List[str] = Field(default_factory=list, description="Important dates mentioned")
    financial_figures: List[Dict[str, Union[str, float]]] = Field(
        default_factory=list,
        description="Financial amounts with descriptions"
    )
    action_items: List[str] = Field(default_factory=list, description="Required actions or next steps")
    parties_involved: List[str] = Field(default_factory=list, description="People or organizations mentioned")
    risk_level: str = Field(..., pattern=r'^(low|medium|high)$', description="Overall risk assessment")
    summary: str = Field(..., max_length=500, description="Brief summary of document content")

def analyze_document(document_text: str) -> DocumentAnalysis:
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0, max_tokens=2000)
    structured_llm = llm.with_structured_output(DocumentAnalysis)
    prompt = f"""Analyze this document thoroughly and extract structured information.
Pay attention to dates, financial information, and action items.

Document:
{document_text}
"""
    return structured_llm.invoke(prompt)
# Usage example
sample_document = """
CONSULTING AGREEMENT
Date: March 15, 2024
Parties: TechCorp Inc. and DataSolutions LLC
Amount: $50,000 for 6-month engagement
Payment Terms: Net 30 days
Deliverables: Complete data migration by June 30, 2024
Risk: Medium - dependent on client data quality
Next Steps: Schedule kickoff meeting, finalize technical requirements
"""
analysis = analyze_document(sample_document)
print(f"Document Type: {analysis.document_type}")
print(f"Risk Level: {analysis.risk_level}")
print(f"Financial Figures: {analysis.financial_figures}")
print(f"Action Items: {analysis.action_items}")
Example 3: Multi-Step Reasoning with Structured Output
Build a system that performs analysis in structured steps:
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from typing import List
class ReasoningStep(BaseModel):
    step_number: int = Field(..., description="Sequential step number")
    description: str = Field(..., description="What this step accomplishes")
    input_data: str = Field(..., description="Data being analyzed in this step")
    reasoning: str = Field(..., description="Logical reasoning applied")
    output: str = Field(..., description="Result of this step")
    confidence: float = Field(..., ge=0.0, le=1.0, description="Confidence in this step's conclusion")

class StructuredAnalysis(BaseModel):
    problem_statement: str = Field(..., description="Clear statement of the problem being solved")
    reasoning_steps: List[ReasoningStep] = Field(..., description="Sequential reasoning steps")
    final_conclusion: str = Field(..., description="Final answer or recommendation")
    overall_confidence: float = Field(..., ge=0.0, le=1.0, description="Overall confidence in conclusion")
    assumptions: List[str] = Field(default_factory=list, description="Key assumptions made")
    limitations: List[str] = Field(default_factory=list, description="Limitations of this analysis")

def perform_structured_analysis(problem: str) -> StructuredAnalysis:
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0, max_tokens=3000)
    structured_llm = llm.with_structured_output(StructuredAnalysis)
    prompt = f"""Analyze the following problem using structured reasoning.
Break down your analysis into clear steps, showing your reasoning process.

Problem: {problem}
"""
    return structured_llm.invoke(prompt)
# Usage example
problem = "A company's sales dropped 25% last quarter. Analyze potential causes and recommend solutions."
analysis = perform_structured_analysis(problem)
print(f"Problem: {analysis.problem_statement}")
print(f"Steps: {len(analysis.reasoning_steps)}")
print(f"Conclusion: {analysis.final_conclusion}")
print(f"Confidence: {analysis.overall_confidence}")
Advanced Techniques and Error Handling
Custom Validators and Post-Processing
Implement custom validation logic for domain-specific requirements:
from pydantic import field_validator, model_validator
from typing import Dict, Any
class SalesAnalysis(BaseModel):
    quarter: str = Field(..., pattern=r'^Q[1-4] \d{4}$')
    revenue: float = Field(..., gt=0)
    growth_rate: float = Field(..., description="Growth rate as decimal (0.1 = 10%)")
    top_products: List[str] = Field(..., min_length=1, max_length=5)

    @field_validator('growth_rate')
    @classmethod
    def validate_growth_rate(cls, v):
        if v < -1.0 or v > 10.0:  # -100% to 1000% seems reasonable
            raise ValueError('Growth rate seems unrealistic')
        return v

    @model_validator(mode='after')
    def validate_consistency(self):
        quarter = self.quarter
        revenue = self.revenue
        if quarter and quarter.startswith('Q1') and revenue > 1000000:
            # Custom business logic validation
            pass
        return self
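As a quick sanity check that the custom validators fire as expected (a small sketch assuming the SalesAnalysis model above):
from pydantic import ValidationError

ok = SalesAnalysis(quarter="Q2 2024", revenue=1_250_000, growth_rate=0.12, top_products=["Widget A"])
print(ok.growth_rate)  # 0.12

try:
    SalesAnalysis(quarter="Q2 2024", revenue=1_250_000, growth_rate=42.0, top_products=["Widget A"])
except ValidationError as e:
    print(e)  # the field_validator rejects a 4200% growth rate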
Robust Error Handling Strategies
Implement comprehensive error handling for production systems using the modern approach:
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field, ValidationError
import logging
from typing import Optional, Tuple, Type
class RobustStructuredExtractor:
    def __init__(self, pydantic_model: Type[BaseModel], llm_model: str = "gpt-4o-mini", max_retries: int = 3):
        self.pydantic_model = pydantic_model
        self.llm = ChatOpenAI(model=llm_model, temperature=0)
        self.max_retries = max_retries
        self.logger = logging.getLogger(__name__)

    def extract(self, text: str, custom_prompt: Optional[str] = None) -> Tuple[Optional[BaseModel], bool, Optional[str]]:
        """
        Extract structured data with retry logic.
        Returns (result, success, error_message)
        """
        structured_llm = self.llm.with_structured_output(self.pydantic_model)
        prompt = custom_prompt or f"Extract structured information from: {text}"
        for attempt in range(self.max_retries):
            try:
                result = structured_llm.invoke(prompt)
                return result, True, None
            except ValidationError as e:
                self.logger.warning(f"Validation error on attempt {attempt + 1}: {str(e)}")
                # Try with more specific instructions
                prompt = f"""Extract structured information from the following text.
Be precise with data types and ensure all required fields are provided.
If information is missing, use appropriate default values.

Text: {text}
"""
            except Exception as e:
                self.logger.warning(f"Attempt {attempt + 1} failed: {str(e)}")
                if attempt == self.max_retries - 1:
                    return None, False, str(e)
        return None, False, "Max retries exceeded"

    def extract_with_fallback(self, text: str, fallback_model: Optional[Type[BaseModel]] = None) -> Tuple[Optional[BaseModel], bool, str]:
        """
        Extract with fallback to simpler model if main extraction fails.
        """
        # Try primary model first
        result, success, error = self.extract(text)
        if success:
            return result, True, "Primary extraction successful"
        # Try fallback model if provided
        if fallback_model:
            self.logger.info("Attempting extraction with fallback model")
            fallback_extractor = RobustStructuredExtractor(fallback_model)
            result, success, error = fallback_extractor.extract(text)
            if success:
                return result, True, "Fallback extraction successful"
        return None, False, f"All extraction attempts failed: {error}"
# Usage example
class SimpleContact(BaseModel):
    name: str = Field(description="Person's name")
    email: str = Field(description="Email address")

class DetailedContact(BaseModel):
    name: str = Field(description="Person's full name")
    email: str = Field(description="Email address")
    phone: Optional[str] = Field(None, description="Phone number")
    company: Optional[str] = Field(None, description="Company name")
    title: Optional[str] = Field(None, description="Job title")

# Setup extractor
extractor = RobustStructuredExtractor(DetailedContact)

# Extract with fallback
text = "John Smith works at TechCorp, reach him at john@techcorp.com"
result, success, message = extractor.extract_with_fallback(text, fallback_model=SimpleContact)

if success:
    print(f"Extraction successful: {result}")
    print(f"Method: {message}")
else:
    print(f"Extraction failed: {message}")
Performance Optimization and Best Practices
Prompt Engineering for Better Structured Output
Design prompts that encourage consistent, parseable responses:
def create_structured_prompt(pydantic_model, additional_instructions=""):
    parser = PydanticOutputParser(pydantic_object=pydantic_model)
    base_template = """You are a data extraction expert. Your task is to analyze the given text and extract information in the exact JSON format specified.

CRITICAL REQUIREMENTS:
1. Output ONLY valid JSON that matches the schema
2. Use null for missing information, never leave fields empty
3. Ensure all required fields are present
4. Follow the data types exactly as specified

{format_instructions}

{additional_instructions}

Text to analyze: {text}

JSON Output:"""
    return PromptTemplate(
        template=base_template,
        input_variables=["text"],
        partial_variables={
            "format_instructions": parser.get_format_instructions(),
            "additional_instructions": additional_instructions
        }
    )
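Such a prompt factory slots straight into a chain. A brief usage sketch, assuming the ProductAnalysis model and ChatOpenAI setup from earlier:
prompt = create_structured_prompt(ProductAnalysis, "Focus on pricing and sentiment.")
parser = PydanticOutputParser(pydantic_object=ProductAnalysis)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

chain = prompt | llm | parser
result = chain.invoke({"text": "iPhone 14 Pro - $999, excellent camera, A16 chip, 4.5 stars"})
print(result.sentiment)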
Caching and Performance Strategies
Implement caching for repeated operations:
from functools import lru_cache
import hashlib
import json
class CachedStructuredExtractor:
    def __init__(self, pydantic_model, llm):
        self.parser = PydanticOutputParser(pydantic_object=pydantic_model)
        self.llm = llm
        self._cache = {}

    def _get_cache_key(self, text: str, prompt: str) -> str:
        content = f"{text}||{prompt}"
        return hashlib.md5(content.encode()).hexdigest()

    def extract_with_cache(self, text: str, prompt_template: PromptTemplate):
        cache_key = self._get_cache_key(text, prompt_template.template)
        if cache_key in self._cache:
            return self._cache[cache_key]
        formatted_prompt = prompt_template.format_prompt(text=text)
        output = self.llm.invoke(formatted_prompt.to_string())
        # Chat models return a message object; plain LLMs already return a string
        result = self.parser.parse(output.content if hasattr(output, "content") else output)
        self._cache[cache_key] = result
        return result
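A short usage sketch for the cache, assuming the ContactExtraction model and the prompt factory above (the cache is in-memory, so it only helps within a single process):
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
extractor = CachedStructuredExtractor(ContactExtraction, llm)
prompt_template = create_structured_prompt(ContactExtraction)

text = "Contact John Smith at john.smith@company.com."
first = extractor.extract_with_cache(text, prompt_template)   # calls the LLM
second = extractor.extract_with_cache(text, prompt_template)  # served from the cache
print(first == second)  # True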
Real-World Use Cases and Applications
Use Case 1: Automated Invoice Processing
from decimal import Decimal
from typing import Dict, Union, List, Optional
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
class InvoiceData(BaseModel):
    invoice_number: str = Field(..., description="Invoice number")
    date: str = Field(..., description="Invoice date")
    vendor_name: str = Field(..., description="Vendor/supplier name")
    total_amount: Decimal = Field(..., gt=0, description="Total amount due")
    line_items: List[Dict[str, Union[str, Decimal, int]]] = Field(
        default_factory=list,
        description="Individual line items with description, quantity, and price"
    )
    payment_terms: Optional[str] = Field(None, description="Payment terms if specified")
    due_date: Optional[str] = Field(None, description="Payment due date")

class ModernInvoiceProcessor:
    def __init__(self, llm_model: str = "gpt-4o-mini"):
        self.llm = ChatOpenAI(model=llm_model, temperature=0)
        self.structured_llm = self.llm.with_structured_output(InvoiceData)

    def process_invoice(self, invoice_text: str) -> InvoiceData:
        prompt = f"""Extract structured data from this invoice.
Be precise with numbers and ensure all amounts are decimal values.
For line items, include description, quantity, and unit price for each item.

Invoice Text:
{invoice_text}
"""
        return self.structured_llm.invoke(prompt)

    def process_invoice_with_validation(self, invoice_text: str) -> tuple[InvoiceData | None, bool, str]:
        """Process invoice with comprehensive validation."""
        try:
            result = self.process_invoice(invoice_text)
            # Additional business logic validation
            if result.total_amount <= 0:
                return None, False, "Invalid total amount"
            if not result.invoice_number.strip():
                return None, False, "Missing invoice number"
            return result, True, "Successfully processed"
        except Exception as e:
            return None, False, f"Processing failed: {str(e)}"
# Usage example
processor = ModernInvoiceProcessor()
sample_invoice = """
INVOICE #INV-2024-001
Date: March 20, 2024
From: Office Supplies Inc.
To: TechCorp
Line Items:
- Laptop computers (Qty: 2, Price: $1,200 each)
- Office chairs (Qty: 5, Price: $150 each)
- Software licenses (Qty: 10, Price: $50 each)
Subtotal: $3,650
Tax: $292
Total: $3,942
Terms: Net 30
Due Date: April 19, 2024
"""
result, success, message = processor.process_invoice_with_validation(sample_invoice)
if success:
    print("Invoice processed successfully!")
    print(f"Invoice #: {result.invoice_number}")
    print(f"Vendor: {result.vendor_name}")
    print(f"Total: ${result.total_amount}")
    print(f"Line Items: {len(result.line_items)}")
else:
    print(f"Processing failed: {message}")
Use Case 2: Content Moderation System
from typing import List
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
class ContentModerationResult(BaseModel):
    content_type: str = Field(..., description="Type of content (text, image_caption, etc.)")
    toxicity_score: float = Field(..., ge=0.0, le=1.0, description="Toxicity score from 0-1")
    categories_detected: List[str] = Field(
        default_factory=list,
        description="Harmful categories detected (hate_speech, harassment, etc.)"
    )
    severity: str = Field(..., pattern=r'^(low|medium|high|critical)$')
    recommended_action: str = Field(..., description="Recommended moderation action")
    confidence: float = Field(..., ge=0.0, le=1.0, description="Confidence in assessment")
    explanation: str = Field(..., description="Brief explanation of the decision")
    requires_human_review: bool = Field(..., description="Whether human review is needed")

class ContentModerator:
    def __init__(self, llm_model: str = "gpt-4o-mini"):
        self.llm = ChatOpenAI(model=llm_model, temperature=0)
        self.structured_llm = self.llm.with_structured_output(ContentModerationResult)

    def moderate_content(self, content: str, content_type: str = "text") -> ContentModerationResult:
        prompt = f"""Analyze this {content_type} content for harmful elements and provide a moderation assessment.

Consider toxicity, harassment, hate speech, spam, and other harmful content.
Be objective and consistent in your evaluation.
Provide scores and categories based on the content's actual harmfulness.
If content appears safe, use low toxicity scores and appropriate recommendations.

Content to moderate: {content}
"""
        return self.structured_llm.invoke(prompt)

    def batch_moderate(self, content_list: List[str]) -> List[ContentModerationResult]:
        """Moderate multiple pieces of content."""
        results = []
        for content in content_list:
            try:
                result = self.moderate_content(content)
                results.append(result)
            except Exception as e:
                # Create a default result for failed moderation
                default_result = ContentModerationResult(
                    content_type="text",
                    toxicity_score=0.5,
                    categories_detected=["moderation_error"],
                    severity="medium",
                    recommended_action="manual_review",
                    confidence=0.0,
                    explanation=f"Moderation failed: {str(e)}",
                    requires_human_review=True
                )
                results.append(default_result)
        return results
# Usage example
moderator = ContentModerator()
# Test with various content types
test_content = [
    "This is a normal message about today's weather.",
    "I love this new product! Highly recommended.",
    "Check out this amazing offer - limited time only!"
]
results = moderator.batch_moderate(test_content)

for i, result in enumerate(results):
    print(f"Content {i+1}:")
    print(f"  Toxicity Score: {result.toxicity_score}")
    print(f"  Severity: {result.severity}")
    print(f"  Action: {result.recommended_action}")
    print(f"  Human Review: {result.requires_human_review}")
    print(f"  Explanation: {result.explanation}")
    print()
Testing and Quality Assurance
Testing Strategies for Pydantic-LLM Integration
import pytest
from unittest.mock import Mock, patch
class TestStructuredExtraction:
    def setup_method(self):
        self.mock_llm = Mock()
        self.parser = PydanticOutputParser(pydantic_object=ContactExtraction)

    def test_valid_parsing(self):
        # Test with valid JSON output
        valid_json = '''{"name": "John Doe", "email": "john@example.com", "phone": "555-1234", "company": "Acme Corp"}'''
        self.mock_llm.return_value = valid_json
        result = self.parser.parse(valid_json)
        assert result.name == "John Doe"
        assert result.email == "john@example.com"

    def test_invalid_parsing(self):
        # Test error handling: ContactExtraction expects strings, so a numeric name fails validation
        invalid_json = '{"name": 12345, "email": "john@example.com"}'
        with pytest.raises(ValueError):
            self.parser.parse(invalid_json)

    def test_missing_optional_fields(self):
        # Test handling of optional fields
        minimal_json = '{"name": "John Doe"}'
        result = self.parser.parse(minimal_json)
        assert result.name == "John Doe"
        assert result.email is None
        assert result.phone is None
Monitoring and Quality Metrics
from dataclasses import dataclass
from typing import Dict, List
import time
@dataclass
class ExtractionMetrics:
    total_requests: int = 0
    successful_extractions: int = 0
    parsing_errors: int = 0
    validation_errors: int = 0
    average_response_time: float = 0.0
    error_patterns: Dict[str, int] = None

    def __post_init__(self):
        if self.error_patterns is None:
            self.error_patterns = {}

class MonitoredStructuredExtractor:
    def __init__(self, pydantic_model, llm):
        self.parser = PydanticOutputParser(pydantic_object=pydantic_model)
        self.llm = llm
        self.metrics = ExtractionMetrics()

    def extract_with_monitoring(self, text: str, prompt_template: PromptTemplate):
        start_time = time.time()
        self.metrics.total_requests += 1
        try:
            formatted_prompt = prompt_template.format_prompt(text=text)
            output = self.llm.invoke(formatted_prompt.to_string())
            # Chat models return a message object; plain LLMs already return a string
            result = self.parser.parse(output.content if hasattr(output, "content") else output)
            self.metrics.successful_extractions += 1
            return result, None
        except ValueError as e:
            self.metrics.validation_errors += 1
            error_type = type(e).__name__
            self.metrics.error_patterns[error_type] = self.metrics.error_patterns.get(error_type, 0) + 1
            return None, str(e)
        except Exception as e:
            self.metrics.parsing_errors += 1
            error_type = type(e).__name__
            self.metrics.error_patterns[error_type] = self.metrics.error_patterns.get(error_type, 0) + 1
            return None, str(e)
        finally:
            response_time = time.time() - start_time
            # Update running average
            total_time = self.metrics.average_response_time * (self.metrics.total_requests - 1) + response_time
            self.metrics.average_response_time = total_time / self.metrics.total_requests

    def get_success_rate(self) -> float:
        if self.metrics.total_requests == 0:
            return 0.0
        return self.metrics.successful_extractions / self.metrics.total_requests
Advanced Integration Patterns
Combining with LangChain Agents
from langchain.agents import Tool, AgentExecutor
from langchain.agents import create_react_agent
from langchain_core.agents import AgentAction, AgentFinish
class StructuredAnalysisTool:
    def __init__(self, pydantic_model, llm):
        self.parser = PydanticOutputParser(pydantic_object=pydantic_model)
        self.llm = llm

    def analyze(self, text: str) -> str:
        prompt = PromptTemplate(
            template="""Analyze the following text and extract structured information:

{format_instructions}

Text: {text}
""",
            input_variables=["text"],
            partial_variables={"format_instructions": self.parser.get_format_instructions()}
        )
        formatted_prompt = prompt.format_prompt(text=text)
        output = self.llm.invoke(formatted_prompt.to_string())
        # Chat models return a message object; plain LLMs already return a string
        result = self.parser.parse(output.content if hasattr(output, "content") else output)
        return result.model_dump_json()  # Pydantic v2 replacement for the deprecated .json()

# Create tools for agent
analysis_tool = StructuredAnalysisTool(DocumentAnalysis, llm)
tools = [
    Tool(
        name="Document Analysis",
        description="Analyze documents and extract structured information",
        func=analysis_tool.analyze
    )
]
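To wire the tool into a running agent, one option is the standard ReAct setup; this is a hedged sketch that assumes the llm defined earlier and the stock ReAct prompt from the LangChain Hub:
from langchain import hub

# Pull a stock ReAct prompt; any prompt with the expected input variables works
react_prompt = hub.pull("hwchase17/react")

agent = create_react_agent(llm, tools, react_prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)

response = agent_executor.invoke(
    {"input": "Analyze this contract excerpt and summarize the key risks: ..."}
)
print(response["output"])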
Integration with Vector Databases
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from datetime import datetime

class StructuredDocumentProcessor:
    def __init__(self, pydantic_model, llm, embeddings):
        self.parser = PydanticOutputParser(pydantic_object=pydantic_model)
        self.llm = llm
        self.embeddings = embeddings
        self.vectorstore = Chroma(embedding_function=embeddings)

    def process_and_store(self, document_text: str, document_id: str):
        # Extract structured data
        prompt = PromptTemplate(
            template="""Extract key information from this document:

{format_instructions}

Document: {text}
""",
            input_variables=["text"],
            partial_variables={"format_instructions": self.parser.get_format_instructions()}
        )
        formatted_prompt = prompt.format_prompt(text=document_text)
        output = self.llm.invoke(formatted_prompt.to_string())
        # Chat models return a message object; plain LLMs already return a string
        structured_data = self.parser.parse(output.content if hasattr(output, "content") else output)

        # Store both original and structured data
        self.vectorstore.add_texts(
            texts=[document_text],
            metadatas=[{
                "document_id": document_id,
                "structured_data": structured_data.model_dump_json(),
                "extraction_timestamp": datetime.now().isoformat()
            }]
        )
        return structured_data
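Retrieval then returns both the raw text and the structured metadata. A brief usage sketch, assuming the DocumentAnalysis model, the sample_document from Example 2, and an OpenAI API key:
import json

processor = StructuredDocumentProcessor(
    pydantic_model=DocumentAnalysis,
    llm=ChatOpenAI(model="gpt-4o-mini", temperature=0),
    embeddings=OpenAIEmbeddings()
)
processor.process_and_store(sample_document, document_id="contract-001")

# Later: similarity search returns documents whose metadata carries the structured extraction
hits = processor.vectorstore.similarity_search("consulting agreement risks", k=1)
for hit in hits:
    stored = json.loads(hit.metadata["structured_data"])
    print(hit.metadata["document_id"], stored["risk_level"])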
Future Considerations and Emerging Patterns
Function Calling Integration
Modern LLMs increasingly support function calling, which can be combined with Pydantic for even more reliable structured output:
import json
from typing import Any, Dict

def create_function_schema(pydantic_model) -> Dict[str, Any]:
    """Convert a Pydantic model to an OpenAI function schema"""
    schema = pydantic_model.model_json_schema()
    return {
        "name": f"extract_{pydantic_model.__name__.lower()}",
        "description": f"Extract {pydantic_model.__name__} data from text",
        "parameters": {
            "type": "object",
            "properties": schema["properties"],
            "required": schema.get("required", [])
        }
    }

class FunctionCallingExtractor:
    def __init__(self, pydantic_model):
        self.model = pydantic_model
        self.function_schema = create_function_schema(pydantic_model)

    def extract_with_function_calling(self, text: str, openai_client):
        response = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "Extract structured data from the provided text."},
                {"role": "user", "content": text}
            ],
            functions=[self.function_schema],
            function_call={"name": self.function_schema["name"]}
        )
        function_call = response.choices[0].message.function_call
        if function_call:
            arguments = json.loads(function_call.arguments)
            return self.model(**arguments)
        raise ValueError("No function call returned")
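A minimal usage sketch, assuming the openai package is installed, OPENAI_API_KEY is set, and the ContactExtraction model from earlier; note that functions/function_call is the legacy OpenAI parameter style, which newer code typically replaces with tools/tool_choice:
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment
extractor = FunctionCallingExtractor(ContactExtraction)

contact = extractor.extract_with_function_calling(
    "Reach Jane Doe at jane.doe@example.com, Acme Corp.", client
)
print(contact.model_dump())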
Scaling for Production
For high-volume production environments, consider these patterns:
import asyncio
from concurrent.futures import ThreadPoolExecutor
import queue
import threading
from typing import List, Any
class BatchStructuredProcessor:
    def __init__(self, pydantic_model, llm, batch_size=10, max_workers=4):
        self.parser = PydanticOutputParser(pydantic_object=pydantic_model)
        self.llm = llm
        self.batch_size = batch_size
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.queue = queue.Queue()
        self.results = {}

    async def process_batch(self, texts: List[str]) -> List[Any]:
        """Process multiple texts concurrently"""
        tasks = []
        for text in texts:
            task = asyncio.create_task(self._process_single(text))
            tasks.append(task)
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

    async def _process_single(self, text: str):
        """Process a single text item"""
        loop = asyncio.get_running_loop()

        def _extract():
            # Build the prompt, call the model, and parse the structured result
            prompt = PromptTemplate(
                template="{format_instructions}\n\nText: {text}",
                input_variables=["text"],
                partial_variables={"format_instructions": self.parser.get_format_instructions()}
            )
            formatted_prompt = prompt.format_prompt(text=text)
            output = self.llm.invoke(formatted_prompt.to_string())
            # Chat models return a message object; plain LLMs already return a string
            return self.parser.parse(output.content if hasattr(output, "content") else output)

        return await loop.run_in_executor(self.executor, _extract)
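A short driver for the batch processor (a sketch assuming the ContactExtraction model and a ChatOpenAI instance; asyncio.gather with return_exceptions=True returns exceptions in place of failed results, so check each item):
async def main():
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    processor = BatchStructuredProcessor(ContactExtraction, llm, max_workers=4)
    texts = [
        "Contact John Smith at john.smith@company.com.",
        "Reach Jane Doe at jane.doe@example.com, Acme Corp.",
    ]
    results = await processor.process_batch(texts)
    for item in results:
        if isinstance(item, Exception):
            print(f"Failed: {item}")
        else:
            print(item.model_dump())

asyncio.run(main())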
Conclusion and Key Takeaways
Structured output from LLMs using Pydantic and LangChain transforms unreliable text generation into robust, production-ready data extraction systems. The modern with_structured_output() method makes this approach significantly more accessible and reliable than prompt-based parsing.
Key Benefits of the Modern Approach
Native Integration - The with_structured_output() method leverages models’ built-in function calling capabilities for maximum reliability.
Simplified Development - No complex prompt engineering required - just define your Pydantic model and invoke.
Type Safety and Validation - Pydantic ensures your data meets expected formats and constraints before it reaches your application logic.
Built-in Error Handling - Automatic validation and parsing error handling prevents runtime failures and provides clear feedback for debugging.
Performance - Native model support typically provides faster and more consistent results than text-based parsing.
When to Use Each Approach
Use with_structured_output() for:
- Most production applications (recommended default)
- New projects requiring structured output
- Models that support function calling (GPT-4, GPT-3.5-turbo, etc.)
- Applications prioritizing reliability and simplicity
Use PydanticOutputParser for:
- Legacy systems that can’t be easily migrated
- Older models without function calling support
- Cases requiring fine-grained prompt control
- Debugging and development scenarios
Getting Started Recommendations
- Start with with_structured_output() - Use the modern approach unless you have specific requirements for the traditional method
- Design Clear Schemas - Use descriptive field names and comprehensive descriptions
- Implement Error Handling - Always wrap extractions in try-catch blocks for production use
- Test Thoroughly - Implement comprehensive testing for edge cases and validation scenarios
- Monitor Performance - Track success rates and response times in production
- Plan for Scale - Consider batch processing and caching for high-volume applications
Migration Strategy
If you’re currently using PydanticOutputParser, consider migrating to with_structured_output():
- Assess Compatibility - Ensure your LLM provider supports function calling
- Update Code Gradually - Migrate one component at a time
- Test Thoroughly - Compare outputs between old and new approaches
- Maintain Fallbacks - Keep legacy code available during transition period
The combination of Pydantic’s robust data validation with LangChain’s modern structured output capabilities provides the most reliable foundation for building production LLM applications. The with_structured_output() method represents a significant step forward in making structured output both more accessible to developers and more reliable in production environments.
For more comprehensive information on advanced techniques and best practices, explore the official documentation for both Pydantic and LangChain, particularly the structured output guide. Consider implementing monitoring and testing strategies appropriate for your specific use case and scale requirements.