# LangChain Integration Example
This example demonstrates how to integrate LangChain with Stepflow, showcasing three practical approaches for using LangChain runnables as Stepflow components.
## Overview
The LangChain integration provides three different ways to use LangChain runnables:

- Decorated Runnable: Use the `@server.langchain_component` decorator to register factories as Stepflow components
- Named Runnable: Execute runnables directly by import path using the `/invoke_named` component
- UDF: Store Python code that creates LangChain runnables in blobs, executed via the `/udf` component
## Prerequisites
- Rust (for building Stepflow)
- Python 3.11+ with `uv`
- LangChain core library (`langchain-core`)
## Setup
1. Install dependencies:

   ```bash
   cd ../../sdks/python
   uv add --group dev langchain-core
   ```

2. Build Stepflow:

   ```bash
   cd ../../stepflow-rs
   cargo build
   ```
## Running the Examples
Each approach has its own dedicated workflow file for focused demonstration:
### Decorated Runnable

```bash
# Demonstrates @server.langchain_component decorator pattern
../../stepflow-rs/target/debug/stepflow run \
  --flow=decorated_runnable.yaml \
  --input=input.json \
  --config=stepflow-config.yml
```
### Named Runnable

```bash
# Demonstrates direct invocation with import paths via /invoke_named
../../stepflow-rs/target/debug/stepflow run \
  --flow=named_runnable.yaml \
  --input=input.json \
  --config=stepflow-config.yml
```
### UDF

```bash
# Demonstrates self-contained Python code creating runnables via /udf
../../stepflow-rs/target/debug/stepflow run \
  --flow=udf.yaml \
  --input=input.json \
  --config=stepflow-config.yml
```
## Three Integration Approaches Explained
### Decorated Runnable (`@server.langchain_component`)
Components are registered directly with the server using decorators:
```python
from langchain_core.runnables import RunnableLambda

@server.langchain_component(name="text_analyzer")
def create_text_analyzer():
    """Analyze text and return various metrics."""
    def analyze_text(text_input):
        text = text_input["text"]
        return {
            "word_count": len(text.split()),
            "char_count": len(text),
            # ... more analysis
        }
    return RunnableLambda(analyze_text)
```
Usage in workflow:
```yaml
- id: analyze_text
  component: /text_analyzer
  input:
    input:
      text: "Your text here"
    execution_mode: invoke
```
Components available:

- `/text_analyzer`: Text analysis (word count, char count, etc.)
- `/sentiment_classifier`: Sentiment analysis using keyword matching (sketched below)
- `/math_operations`: Parallel mathematical operations
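
For reference, a keyword-matching classifier like `/sentiment_classifier` can be registered with the same decorator pattern. The following is an illustrative sketch only; the keyword sets and return shape are assumptions, not the exact code in `langchain_server.py`:

```python
from langchain_core.runnables import RunnableLambda

@server.langchain_component(name="sentiment_classifier")
def create_sentiment_classifier():
    """Classify sentiment via simple keyword matching (illustrative sketch)."""
    positive = {"great", "fantastic", "good", "love", "excellent"}
    negative = {"bad", "terrible", "hate", "awful", "poor"}

    def classify(data):
        words = set(data["text"].lower().split())
        score = len(words & positive) - len(words & negative)
        label = "positive" if score > 0 else "negative" if score < 0 else "neutral"
        return {"sentiment": label, "score": score}

    return RunnableLambda(classify)
```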
### Named Runnable (via `/invoke_named` with Import Paths)
Runnables are imported from Python modules and executed directly using standard import path syntax:
```python
# Create runnables in a module (example_runnables.py)
from langchain_core.runnables import RunnableLambda

def process_text(data):
    text = data["text"]
    words = text.split()
    return {
        "processed_text": " ".join(word.capitalize() for word in words),
        "word_count": len(words),
        "original_length": len(text)
    }

# Make the runnable available for import
text_processor = RunnableLambda(process_text)

# Use the SDK-provided invoke_named component with caching
from stepflow_py import create_invoke_named_component
create_invoke_named_component(server)
```
Usage in workflow:
```yaml
# Directly invoke runnable from import path in a single step
- id: process_text
  component: /invoke_named
  input:
    import_path: example_runnables.text_processor
    input:
      text: "Your text here"
```
Import path examples:

- `example_runnables.text_processor`: Module.attribute syntax
- `mypackage.submodule:my_runnable`: Module:attribute syntax (alternative)
- `utils.text.capitalizer`: Nested module paths
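
Conceptually, both path styles boil down to "import the module, then look up an attribute". The snippet below is a simplified sketch of that resolution; the SDK's `get_runnable_from_import_path` layers caching and error handling on top of the same idea:

```python
import importlib

def resolve_import_path(import_path: str):
    """Resolve 'pkg.module.attr' or 'pkg.module:attr' to the named object."""
    if ":" in import_path:
        # Explicit 'module:attribute' form
        module_name, attr = import_path.split(":", 1)
    else:
        # 'module.attribute' form: the last dotted segment is the attribute
        module_name, _, attr = import_path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attr)

# e.g. resolve_import_path("example_runnables.text_processor")
```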
Benefits:

- Single-step execution (no serialization/deserialization overhead)
- Automatic caching by import path (similar to UDF blob IDs)
- Much simpler workflow structure
- Direct execution without intermediate steps
- Provided by core SDK - no need to reimplement in each project
- Cache can be cleared with `clear_import_cache()` if needed
SDK Functions:

```python
from stepflow_py import (
    create_invoke_named_component,  # Add /invoke_named to your server
    invoke_named_runnable,          # Core async function for direct use
    get_runnable_from_import_path,  # Import runnables with caching
    clear_import_cache,             # Clear the import cache
)

# Use in your own components
result = await invoke_named_runnable(
    import_path=input.import_path,
    input_data=input.input,
    config=input.config,
    context=context,
    use_cache=True,
)
```
### UDF (Python code creating runnables)

Python code is created as a blob and executed via the `/udf` component. The workflow demonstrates a completely self-contained approach:
```yaml
# Step 1: Create blob containing Python code
- id: create_text_processor_blob
  component: /builtin/create_blob
  input:
    data:
      code: |
        from langchain_core.runnables import RunnableLambda
        def process_text(data):
            text = data["text"]
            # Custom processing logic here
            return {
                "processed_text": text.upper(),
                "word_count": len(text.split()),
                "processed_by": "user_udf"
            }
        # UDF must assign the runnable to 'result'
        result = RunnableLambda(process_text)

# Step 2: Execute the UDF code
- id: execute_udf
  component: /udf
  input:
    blob_id:
      $from: { step: create_text_processor_blob }
      path: blob_id
    input:
      text: "Your text here"
```
Flexibility:
- Users can create any LangChain runnable they want
- No restrictions on functionality - full Python/LangChain capabilities
- Completely self-contained - no external blob_id required
- Code is created dynamically within the workflow
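
Since the blob holds ordinary Python, the UDF code can also build composed runnables. Below is a sketch of blob code that chains two stages with LangChain's `|` operator; the stages are made up for illustration, and any runnable works as long as it ends up assigned to `result`:

```python
from langchain_core.runnables import RunnableLambda

def normalize(data):
    # First stage: clean up the incoming text
    return {"text": data["text"].strip().lower()}

def summarize(data):
    # Second stage: report on the normalized text
    words = data["text"].split()
    return {"first_words": " ".join(words[:5]), "word_count": len(words)}

# UDF must assign the runnable to 'result'; '|' composes the stages into a sequence
result = RunnableLambda(normalize) | RunnableLambda(summarize)
```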
## Sample Inputs
Each approach has different input requirements:
### Decorated Runnable

```json
{
  "text": "This is a fantastic example of registered LangChain components!"
}
```
### Named Runnable

```json
{
  "text": "Process this text",
  "numbers": [1, 2, 3, 4, 5],
  "template": "Hello {name}!",
  "values": {"name": "World"}
}
```
### UDF

```json
{
  "text": "This demonstrates self-contained user-defined LangChain runnables!"
}
```
## Architecture
### Configuration

The `stepflow-config.yml` configures:

- `langchain` plugin: Python server running the LangChain components
- Routing: Components first try the `langchain` plugin, then fall back to builtin
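
As a rough illustration of that shape (the key names below are an assumption, not copied from this example; the `stepflow-config.yml` in this directory is the authoritative version):

```yaml
# Illustrative sketch only - consult the real stepflow-config.yml for exact keys
plugins:
  langchain:
    type: stepflow
    transport: stdio
    command: uv
    args: ["run", "python", "langchain_server.py"]

routes:
  "/{*component}":
    - plugin: langchain   # try the LangChain server first
    - plugin: builtin     # fall back to builtin components
```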
### LangChain Server

The `langchain_server.py` file demonstrates:
- All three integration approaches in one server
- Component registration using different patterns
- Integration with Stepflow's bidirectional communication
- Type-safe component definitions with msgspec
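
A stripped-down skeleton of how such a server can be wired together is shown below. The server class name and `run()` entry point are assumptions; the decorator and `create_invoke_named_component` usage follow the patterns shown earlier. See `langchain_server.py` for the real implementation:

```python
from langchain_core.runnables import RunnableLambda
from stepflow_py import StepflowServer, create_invoke_named_component  # class name assumed

server = StepflowServer()

# Approach 1: decorated runnable factory
@server.langchain_component(name="text_analyzer")
def create_text_analyzer():
    return RunnableLambda(lambda data: {"word_count": len(data["text"].split())})

# Approach 2: register the SDK-provided /invoke_named component
create_invoke_named_component(server)

# Approach 3: /udf executes blob-stored code via the SDK's UDF support
# (see langchain_server.py for the exact setup)

if __name__ == "__main__":
    server.run()
```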
## Key Features
- Multiple Integration Patterns: Three practical ways to achieve the same goals
- Async Compatibility: Both Stepflow and LangChain use async/await patterns
- Type Safety: msgspec integration with JSON Schema support
- Bidirectional Communication: Components access the Stepflow runtime via `StepflowContext`
- Flexible Execution: Support for different runnable creation and execution patterns
- Schema Generation: Automatic schema extraction where applicable
## When to Use Each Approach
### Decorated Runnable
- Best for: Stable, reusable components with fixed logic
- Pros: Simple to implement, automatic registration, good performance
- Cons: Less flexible, requires server restart for changes
### Named Runnable
- Best for: Reusable modules, standard library patterns, team-shared components
- Pros: Single-step execution, uses standard Python import syntax, promotes modular design, cacheable by import path, no serialization overhead
- Cons: Requires importable modules, path must be available at runtime
### UDF
- Best for: Dynamic code generation, user-provided logic, maximum flexibility
- Pros: Complete flexibility, can generate code dynamically, supports user input
- Cons: Security considerations, compilation overhead, more complex
## Files
### Core Files
- `langchain_server.py`: Python server demonstrating all three approaches
- `example_runnables.py`: Example module with importable runnables
- `stepflow-config.yml`: Stepflow configuration
- `input.json`: Sample input data
- `README.md`: This documentation
### Workflow Files (One per Approach)
- `decorated_runnable.yaml`: Demonstrates registered components (`@server.langchain_component`)
- `named_runnable.yaml`: Demonstrates direct import path invocation via `/invoke_named`
- `udf.yaml`: Demonstrates self-contained UDF with Python code creating runnables
## Troubleshooting
### Common Issues
- Missing LangChain: Install with `uv add langchain-core`
- Import errors: Ensure the Stepflow Python SDK is properly installed
- Serialization issues: Some complex runnables may not serialize properly
- Type annotation issues: Check that all msgspec structs are properly defined
### Performance Considerations
- Decorated runnable has the best performance (pre-registered, no overhead)
- Named runnable has excellent performance (cached imports, no serialization overhead)
- UDF has compilation overhead but is very flexible
## Next Steps
To extend this example:
- Add more LangChain integrations: Try different runnable types (chains, agents, etc.)
- Custom components: Create domain-specific business logic components
- Streaming support: Implement streaming for long-running operations
- Error handling: Add comprehensive error handling and recovery
- Performance optimization: Leverage import caching, optimize hot paths