Custom Components

Custom components allow you to extend Stepflow with your own business logic, integrations, and data processing capabilities. This guide shows you how to create custom components using the Python SDK.

Quick Start

Installation

Install the Python SDK:

# Using uv (recommended)
uv add stepflow-py

# Using pip
pip install stepflow-py

For HTTP transport support:

# Using uv
uv add "stepflow-py[http]"

# Using pip
pip install "stepflow-py[http]"

Basic Component Server

Create a simple component server:

# my_server.py
from stepflow_py import StepflowServer
import msgspec

# Define input and output types
class GreetingInput(msgspec.Struct):
    name: str
    language: str = "en"

class GreetingOutput(msgspec.Struct):
    message: str
    language: str

# Create server instance
server = StepflowServer()

# Register a component
@server.component
def greet(input: GreetingInput) -> GreetingOutput:
    greetings = {
        "en": f"Hello, {input.name}!",
        "es": f"¡Hola, {input.name}!",
        "fr": f"Bonjour, {input.name}!",
    }

    message = greetings.get(input.language, greetings["en"])
    return GreetingOutput(message=message, language=input.language)

if __name__ == "__main__":
    server.run()

Configuration

Configure the component server in stepflow-config.yml:

plugins:
  my_components:
    type: stepflow
    transport: stdio
    command: python
    args: ["my_server.py"]

routes:
  "/my/{*component}":
    - plugin: my_components

See the Configuration Guide for more details.

Using in Workflows

Reference your components in workflow YAML files:

schema: https://stepflow.org/schemas/v1/flow.json
input_schema:
  type: object
  properties:
    user_name:
      type: string

steps:
  - id: greeting_step
    component: /my/greet
    input:
      name:
        $from: { workflow: input }
        path: user_name
      language: "es"

output:
  greeting:
    $from: { step: greeting_step }
    path: message

See Steps for more details on using components in steps within a flow.

Component Development

Type Definitions

Use msgspec.Struct for type-safe input and output definitions:

import msgspec
from typing import Optional, List, Dict, Any

class ProcessingInput(msgspec.Struct):
    # Required fields
    data: List[Dict[str, Any]]
    operation: str

    # Optional fields with defaults
    batch_size: int = 100
    timeout: Optional[int] = None

    # Complex nested types
    config: Optional[Dict[str, Any]] = None

class ProcessingOutput(msgspec.Struct):
    processed_count: int
    results: List[Dict[str, Any]]
    errors: List[str] = msgspec.field(default_factory=list)
    metadata: Dict[str, Any] = msgspec.field(default_factory=dict)

Synchronous Components

Simple components that don't require I/O operations:

@server.component
def calculate_statistics(input: DataInput) -> StatsOutput:
    """Calculate basic statistics for a dataset."""
    data = input.numbers

    return StatsOutput(
        count=len(data),
        sum=sum(data),
        mean=sum(data) / len(data) if data else 0,
        min=min(data) if data else None,
        max=max(data) if data else None
    )

Asynchronous Components

Components that perform I/O operations, API calls, or other async work:

import httpx

@server.component
async def fetch_user_data(input: UserInput) -> UserOutput:
    """Fetch user data from external API."""
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/users/{input.user_id}")
        response.raise_for_status()
        user_data = response.json()

    return UserOutput(
        user_id=user_data["id"],
        name=user_data["name"],
        email=user_data["email"],
        last_seen=user_data.get("last_seen")
    )

Error Handling

Handle different types of errors appropriately:

@server.component
async def process_with_validation(input: ProcessInput) -> ProcessOutput:
    """Process data with comprehensive error handling."""
    try:
        # Business logic validation
        if not input.data:
            return ProcessOutput(
                success=False,
                error="No data provided",
                error_type="validation_error"
            )

        # Process the data
        result = await process_data(input.data)

        return ProcessOutput(
            success=True,
            result=result,
            processed_count=len(input.data)
        )

    except ValueError as e:
        # Business logic error - workflow can handle this
        return ProcessOutput(
            success=False,
            error=f"Invalid data format: {e}",
            error_type="validation_error"
        )

    except Exception as e:
        # System error - this will halt workflow execution
        raise RuntimeError(f"Failed to process data: {e}")

Advanced Features with Context

Using StepflowContext

Components can access the Stepflow runtime through the StepflowContext parameter. This can be used to store and retrieve data using blob storage or execute sub-workflows.
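A component opts in to the context simply by declaring a second parameter annotated with StepflowContext, as the examples below do. The sketch here illustrates the pattern with a hypothetical in-memory stub standing in for the real runtime context; the `put_blob`/`get_blob` names follow the blob API used later in this guide, and `StubContext` itself is invented for illustration:

```python
import asyncio
import uuid

# Hypothetical in-memory stand-in for StepflowContext (illustration only)
class StubContext:
    def __init__(self):
        self._blobs = {}

    async def put_blob(self, data) -> str:
        # Store the data and hand back an opaque blob id
        blob_id = str(uuid.uuid4())
        self._blobs[blob_id] = data
        return blob_id

    async def get_blob(self, blob_id):
        return self._blobs[blob_id]

# A context-aware component: the runtime supplies the context as the
# second argument when the component is invoked
async def cache_payload(input: dict, context) -> str:
    return await context.put_blob(input)

ctx = StubContext()
blob_id = asyncio.run(cache_payload({"x": 1}, ctx))
assert asyncio.run(ctx.get_blob(blob_id)) == {"x": 1}
```

In a real server the decorator-registered component receives the actual StepflowContext; the stub only exists so the pattern can be exercised locally.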

Blob Storage Operations

Use blob storage for persisting data between workflow steps:

@server.component
async def data_aggregator(
    input: AggregatorInput,
    context: StepflowContext
) -> AggregatorOutput:
    """Aggregate data from multiple blob sources."""

    aggregated_data = []

    # Retrieve data from multiple blobs
    for blob_id in input.source_blob_ids:
        try:
            blob_data = await context.get_blob(blob_id)
            aggregated_data.extend(blob_data.get("items", []))
        except Exception as e:
            context.log(f"Failed to retrieve blob {blob_id}: {e}")

    # Process aggregated data
    processed = process_aggregated_data(aggregated_data)

    # Store result
    result_blob_id = await context.put_blob({
        "aggregated_count": len(aggregated_data),
        "processed_data": processed,
        "source_blobs": input.source_blob_ids
    })

    return AggregatorOutput(
        result_blob_id=result_blob_id,
        total_items=len(aggregated_data)
    )

Sub-workflow Execution

Execute other workflows from within components. This is particularly useful for orchestrating complex tasks that involve multiple steps. The example below defines a workflow_orchestrator component that runs sub-workflows; each sub-workflow is awaited immediately, but asyncio.gather could be used to run them in parallel.

@server.component
async def workflow_orchestrator(
    input: OrchestratorInput,
    context: StepflowContext
) -> OrchestratorOutput:
    """Execute sub-workflows based on input conditions."""

    results = []

    for task in input.tasks:
        if task.type == "data_processing":
            # Execute data processing workflow
            result = await context.evaluate_flow_by_id(
                flow_id=input.data_processing_flow_id,
                input={"data": task.data, "config": task.config}
            )
            results.append({
                "task_id": task.id,
                "type": "data_processing",
                "result": result
            })

        elif task.type == "ml_inference":
            # Execute ML inference workflow
            result = await context.evaluate_flow_by_id(
                flow_id=input.ml_flow_id,
                input={"model": task.model, "features": task.features}
            )
            results.append({
                "task_id": task.id,
                "type": "ml_inference",
                "result": result
            })

    return OrchestratorOutput(
        completed_tasks=len(results),
        results=results
    )
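To run the sub-workflows concurrently rather than one after another, the per-task calls can be collected as coroutines and awaited together with asyncio.gather. A minimal sketch with a stubbed evaluate_flow_by_id (the stub and flow ids are illustrative; in a component you would call the method on the injected context instead):

```python
import asyncio

# Stub standing in for context.evaluate_flow_by_id (illustration only)
async def evaluate_flow_by_id(flow_id: str, input: dict) -> dict:
    await asyncio.sleep(0.01)  # simulate I/O-bound sub-workflow execution
    return {"flow_id": flow_id, "echo": input}

async def run_tasks_in_parallel(tasks: list) -> list:
    # Build one coroutine per task, then await them all concurrently;
    # gather preserves the order of the input coroutines in its results
    coros = [
        evaluate_flow_by_id(flow_id=t["flow_id"], input=t["input"])
        for t in tasks
    ]
    return await asyncio.gather(*coros)

results = asyncio.run(run_tasks_in_parallel([
    {"flow_id": "flow-a", "input": {"n": 1}},
    {"flow_id": "flow-b", "input": {"n": 2}},
]))
assert len(results) == 2
assert results[0]["flow_id"] == "flow-a"
```

Because gather raises the first exception it encounters by default, orchestrators that should tolerate individual sub-workflow failures can pass `return_exceptions=True` and inspect each result.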