
Implementing Workers

This guide describes how to implement a Stepflow worker in any programming language. Workers are standalone processes that pull tasks from the orchestrator, execute components, and report results back.

While the Python SDK provides a high-level API that handles protocol details automatically, you can implement workers in any language by following this specification.

What is a Worker?

A worker is a process that hosts one or more workflow components and executes them on behalf of the orchestrator. Workers are registered through routing configuration, which maps component paths to named queues. Workers:

  • Pull task assignments from a named queue
  • Execute components and report results via CompleteTask
  • Send heartbeats to signal liveness during execution
  • Can make callbacks to the orchestrator (e.g., sub-run submission)
  • Run as independent processes, enabling language flexibility and fault isolation

See the Protocol Overview for how workers fit into the overall architecture, and the Task Lifecycle for the detailed task flow.

Requirements

Protocol Requirements

Requirement | Description
--- | ---
Queue Connection | Workers MUST connect to the configured queue backend (gRPC PullTasks or NATS consumer)
Task Completion | Workers MUST report results via CompleteTask on OrchestratorService
Heartbeats | Workers MUST send TaskHeartbeat before and during execution
Component Listing | Workers MUST handle list_components task assignments
Component Execution | Workers MUST handle execute task assignments
Error Reporting | Workers MUST report failures with an appropriate TaskErrorCode
Bidirectional Calls | Workers MAY call SubmitRun and GetRun during execution

Observability Requirements

Requirement | Description
--- | ---
OTLP Tracing | Workers SHOULD support OpenTelemetry for distributed tracing
Structured Logging | Workers SHOULD use structured JSON logging with diagnostic context
Context Propagation | Workers SHOULD propagate ObservabilityContext in bidirectional calls
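As an illustration of structured JSON logging with diagnostic context, the sketch below uses only Python's standard `logging` and `json` modules. It is not the Python SDK's logging setup, and the context field names (`task_id`, `run_id`, `component`, `trace_id`, `span_id`) are hypothetical placeholders for whatever diagnostic context your worker tracks.

```python
import json
import logging
import sys


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object per line."""

    # Hypothetical diagnostic-context fields; the real set is up to your worker.
    CONTEXT_FIELDS = ("task_id", "run_id", "component", "trace_id", "span_id")

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Values passed via `extra={...}` become attributes on the record.
        for field in self.CONTEXT_FIELDS:
            value = getattr(record, field, None)
            if value is not None:
                entry[field] = value
        if record.exc_info:
            entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(entry)


def make_logger(name: str = "stepflow-worker") -> logging.Logger:
    """Build a logger that writes JSON lines to stderr."""
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger(name)
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False  # avoid duplicate, non-JSON output via the root logger
    return logger
```

A call such as `make_logger().info("executing", extra={"task_id": "t-1"})` then emits one JSON object per line on stderr, which log collectors can parse without custom patterns.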

Worker Configuration

Workers discover the orchestrator and queue configuration through environment variables. When the orchestrator launches a worker as a subprocess, it sets these automatically:

Variable | Description
--- | ---
STEPFLOW_TRANSPORT | Queue backend: grpc or nats
STEPFLOW_QUEUE_NAME | Queue name to pull tasks from
STEPFLOW_TASKS_URL | Orchestrator gRPC address (for gRPC transport)
STEPFLOW_BLOB_URL | BlobService gRPC address

For remote workers (e.g., in Kubernetes), configure these in the deployment manifest.
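A minimal sketch of reading this configuration in Python, assuming the variable names above. The validation policy is an assumption of this example rather than part of the specification: it treats STEPFLOW_TRANSPORT and STEPFLOW_QUEUE_NAME as required, and STEPFLOW_TASKS_URL as required only for the `grpc` transport. Taking the environment as a mapping keeps the function easy to test.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class WorkerConfig:
    transport: str            # "grpc" or "nats"
    queue_name: str
    tasks_url: Optional[str]  # orchestrator address, used by the gRPC transport
    blob_url: Optional[str]   # BlobService address


def load_worker_config(env: Mapping[str, str] = os.environ) -> WorkerConfig:
    transport = env.get("STEPFLOW_TRANSPORT", "").strip().lower()
    if transport not in ("grpc", "nats"):
        raise ValueError(f"STEPFLOW_TRANSPORT must be 'grpc' or 'nats', got {transport!r}")

    queue_name = env.get("STEPFLOW_QUEUE_NAME")
    if not queue_name:
        raise ValueError("STEPFLOW_QUEUE_NAME is required")

    tasks_url = env.get("STEPFLOW_TASKS_URL")
    # Assumption: a gRPC worker cannot pull tasks without the orchestrator address.
    if transport == "grpc" and not tasks_url:
        raise ValueError("STEPFLOW_TASKS_URL is required for the grpc transport")

    return WorkerConfig(transport, queue_name, tasks_url, env.get("STEPFLOW_BLOB_URL"))
```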

Protocol Methods

Component Listing (MUST)

When the worker receives a list_components task, it MUST enumerate all registered components and report them via CompleteTask with a ListComponentsResult.
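One way to structure this in Python is a small registry that records each handler alongside its metadata, so the same table can answer both list_components and execute tasks. The `ComponentInfo` and `ListComponentsResult` dataclasses here are hypothetical stand-ins for the protocol messages, and the fields shown (path, description, input schema) are illustrative rather than the actual message schema.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class ComponentInfo:
    # Hypothetical stand-in for one entry of ListComponentsResult.
    path: str
    description: str = ""
    input_schema: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ListComponentsResult:
    # Hypothetical stand-in for the protocol message of the same name.
    components: List[ComponentInfo]


class ComponentRegistry:
    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[Any], Any]] = {}
        self._info: Dict[str, ComponentInfo] = {}

    def register(self, path: str, description: str = "",
                 input_schema: Optional[Dict[str, Any]] = None):
        """Decorator that records a handler and its metadata under `path`."""
        def wrap(fn: Callable[[Any], Any]) -> Callable[[Any], Any]:
            self._handlers[path] = fn
            self._info[path] = ComponentInfo(path, description, input_schema or {})
            return fn
        return wrap

    def get(self, path: str) -> Optional[Callable[[Any], Any]]:
        """Look up a handler; None means the component is not on this worker."""
        return self._handlers.get(path)

    def list_components(self) -> ListComponentsResult:
        # Enumerate every registered component, sorted for stable output.
        return ListComponentsResult([self._info[p] for p in sorted(self._info)])
```

A `None` from `get` is the natural point to report COMPONENT_NOT_FOUND for an execute task.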

Component Execution (MUST)

When the worker receives an execute task, it MUST:

  1. Send a TaskHeartbeat immediately (transitions the task to EXECUTING)
  2. Execute the component with the provided input
  3. Send periodic heartbeats during execution (resets the crash-detection timer)
  4. Report the result via CompleteTask (success or failure)
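The four steps can be sketched with a background thread that sends heartbeats until the component returns. The `send_heartbeat` and `complete_task` callables are hypothetical hooks you would bind to TaskHeartbeat and CompleteTask on your transport; the optional `progress` callable feeds a progress value into each heartbeat. The 5-second default interval is an assumption and should be set well below the orchestrator's crash-detection timeout.

```python
import threading
from typing import Any, Callable, Optional


def run_with_heartbeats(
    execute: Callable[[], Any],
    send_heartbeat: Callable[[Optional[float]], None],
    complete_task: Callable[[Any, Optional[BaseException]], None],
    interval_s: float = 5.0,
    progress: Callable[[], Optional[float]] = lambda: None,
) -> None:
    """Run one execute task following the four steps above."""
    # Step 1: heartbeat immediately so the task transitions to EXECUTING.
    send_heartbeat(progress())

    stop = threading.Event()

    def ticker() -> None:
        # Step 3: Event.wait returns False on timeout and True once stop is set.
        while not stop.wait(interval_s):
            send_heartbeat(progress())

    thread = threading.Thread(target=ticker, daemon=True)
    thread.start()

    result: Any = None
    error: Optional[BaseException] = None
    try:
        result = execute()  # Step 2: run the component with its input.
    except Exception as exc:  # failures are reported, not raised
        error = exc
    finally:
        stop.set()
        thread.join()

    # Step 4: report success (error is None) or failure via CompleteTask.
    complete_task(result, error)
```

Stopping and joining the heartbeat thread before calling `complete_task` guarantees that no heartbeat is sent for a task that has already been reported complete.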

Bidirectional Callbacks (MAY)

During execution, workers MAY call back to the orchestrator:

RPC | Description
--- | ---
SubmitRun | Submit a sub-workflow for execution
GetRun | Query run status and results

Workers use the orchestrator_service_url from TaskContext for callbacks. See the Bidirectional Callbacks section of Task Lifecycle for details.
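If a component needs to block until a sub-run finishes, one option is to submit it and then poll GetRun. In the sketch below, `submit_run` and `get_run` are hypothetical callables that you would bind to a client connected to the orchestrator_service_url from TaskContext. The dict-shaped request and response and the terminal status names are assumptions, so check the protocol reference for the real message fields.

```python
import time
from typing import Any, Callable, Dict

# Hypothetical terminal statuses; the real values come from the protocol definition.
TERMINAL_STATUSES = {"COMPLETED", "FAILED", "CANCELLED"}


def submit_and_wait(
    submit_run: Callable[[Dict[str, Any]], str],
    get_run: Callable[[str], Dict[str, Any]],
    request: Dict[str, Any],
    poll_interval_s: float = 0.5,
    timeout_s: float = 60.0,
) -> Dict[str, Any]:
    """Submit a sub-run via SubmitRun, then poll GetRun until it reaches a terminal status."""
    run_id = submit_run(request)
    deadline = time.monotonic() + timeout_s
    while True:
        run = get_run(run_id)
        if run.get("status") in TERMINAL_STATUSES:
            return run
        if time.monotonic() >= deadline:
            raise TimeoutError(f"sub-run {run_id} did not finish within {timeout_s}s")
        time.sleep(poll_interval_s)
```

While waiting, keep the task's heartbeats running (for example, by calling this from inside the execute step) so that a long sub-run does not let the crash-detection timer expire.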

Blob Storage

Workers access blob storage via the BlobService gRPC API. See Blob Storage for details.

Error Handling

Workers MUST report failures via CompleteTask with an appropriate TaskErrorCode:

Code | When to Use
--- | ---
COMPONENT_FAILED | Component executed but returned a business-logic failure
INVALID_INPUT | Input validation failure
COMPONENT_NOT_FOUND | Requested component doesn't exist on this worker
RESOURCE_UNAVAILABLE | External resource unavailable
WORKER_ERROR | Unexpected worker/SDK error

See Error Handling for the complete reference and retry behavior.
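A worker typically needs one place that turns exceptions into a TaskErrorCode. The sketch below mirrors the five codes from the table as a Python enum. The exception classes and the mapping of `ConnectionError`/`TimeoutError` to RESOURCE_UNAVAILABLE are assumptions of this example; a real worker would use the enum from its generated protocol bindings.

```python
from enum import Enum
from typing import Tuple


class TaskErrorCode(Enum):
    # Mirrors the table; the real enum comes from the protocol bindings.
    COMPONENT_FAILED = "COMPONENT_FAILED"
    INVALID_INPUT = "INVALID_INPUT"
    COMPONENT_NOT_FOUND = "COMPONENT_NOT_FOUND"
    RESOURCE_UNAVAILABLE = "RESOURCE_UNAVAILABLE"
    WORKER_ERROR = "WORKER_ERROR"


# Hypothetical exception types a worker SDK might raise or expose to components.
class ComponentNotFound(Exception):
    pass


class InvalidInput(Exception):
    pass


class ComponentFailure(Exception):
    pass


def classify_error(exc: BaseException) -> Tuple[TaskErrorCode, str]:
    """Map an exception to a TaskErrorCode plus a human-readable message."""
    if isinstance(exc, ComponentNotFound):
        code = TaskErrorCode.COMPONENT_NOT_FOUND
    elif isinstance(exc, InvalidInput):
        code = TaskErrorCode.INVALID_INPUT
    elif isinstance(exc, ComponentFailure):
        code = TaskErrorCode.COMPONENT_FAILED
    elif isinstance(exc, (ConnectionError, TimeoutError)):
        code = TaskErrorCode.RESOURCE_UNAVAILABLE
    else:
        code = TaskErrorCode.WORKER_ERROR
    return code, f"{type(exc).__name__}: {exc}"
```

Anything not explicitly recognized falls through to WORKER_ERROR, which keeps unexpected SDK bugs distinguishable from genuine component failures.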

Observability

Each task carries an ObservabilityContext with OpenTelemetry trace/span IDs. Workers SHOULD extract the parent context, create child spans for component execution, and propagate the context in bidirectional calls.
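To make the parent/child relationship concrete without depending on an OpenTelemetry installation, the stdlib-only sketch below validates parent IDs and derives a child span in W3C Trace Context `traceparent` form. It assumes ObservabilityContext exposes the trace ID and span ID as hex strings, which this page does not specify; a production worker would normally let the OpenTelemetry SDK handle this (in Python, for example, via `TraceContextTextMapPropagator`).

```python
import re
import secrets
from dataclasses import dataclass

# W3C traceparent: version "00", 32-hex trace ID, 16-hex parent span ID, 2-hex flags.
_TRACEPARENT = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


@dataclass(frozen=True)
class SpanRef:
    trace_id: str       # 32 lowercase hex characters
    span_id: str        # 16 lowercase hex characters
    flags: str = "01"   # "01" marks the trace as sampled

    def traceparent(self) -> str:
        return f"00-{self.trace_id}-{self.span_id}-{self.flags}"


def parse_parent(trace_id: str, span_id: str, flags: str = "01") -> SpanRef:
    """Validate the IDs taken from the task's ObservabilityContext (assumed hex strings)."""
    ref = SpanRef(trace_id.lower(), span_id.lower(), flags)
    if not _TRACEPARENT.match(ref.traceparent()):
        raise ValueError("malformed trace or span ID")
    if set(ref.trace_id) == {"0"} or set(ref.span_id) == {"0"}:
        raise ValueError("all-zero IDs are invalid")
    return ref


def child_of(parent: SpanRef) -> SpanRef:
    # A child span keeps the trace ID and gets a fresh random 8-byte span ID;
    # the parent's span_id is what the child records as its parent.
    return SpanRef(parent.trace_id, secrets.token_hex(8), parent.flags)
```

The child keeps the parent's trace ID and gets a fresh span ID, so the worker's spans, and those of any sub-runs it submits with this context attached, join the same distributed trace.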

Workers SHOULD support these environment variables:

Variable | Description | Default
--- | --- | ---
STEPFLOW_OTLP_ENDPOINT | OTLP collector endpoint | none
STEPFLOW_SERVICE_NAME | Service name for traces/logs | stepflow-worker
STEPFLOW_TRACE_ENABLED | Enable tracing | true if endpoint set
STEPFLOW_LOG_LEVEL | Log level | INFO
STEPFLOW_LOG_DESTINATION | Where to log (stderr, file, otlp) | otlp if endpoint set, else stderr
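The defaults in this table can be resolved in a single function. This Python sketch follows the table's defaults; the accepted spellings for STEPFLOW_TRACE_ENABLED (`1`, `true`, `yes`, `on`) and the rejection of unknown log destinations are assumptions of the example.

```python
from dataclasses import dataclass
from typing import Mapping, Optional

_TRUTHY = {"1", "true", "yes", "on"}  # assumed accepted spellings
_DESTINATIONS = ("stderr", "file", "otlp")


@dataclass(frozen=True)
class ObservabilityConfig:
    otlp_endpoint: Optional[str]
    service_name: str
    trace_enabled: bool
    log_level: str
    log_destination: str


def load_observability_config(env: Mapping[str, str]) -> ObservabilityConfig:
    endpoint = env.get("STEPFLOW_OTLP_ENDPOINT") or None  # treat "" as unset

    # Tracing defaults to on only when an endpoint is configured.
    raw_trace = env.get("STEPFLOW_TRACE_ENABLED")
    if raw_trace is None:
        trace_enabled = endpoint is not None
    else:
        trace_enabled = raw_trace.strip().lower() in _TRUTHY

    # Logs default to OTLP when an endpoint is configured, else stderr.
    destination = env.get("STEPFLOW_LOG_DESTINATION") or ("otlp" if endpoint else "stderr")
    if destination not in _DESTINATIONS:
        raise ValueError(f"unknown STEPFLOW_LOG_DESTINATION: {destination!r}")

    return ObservabilityConfig(
        otlp_endpoint=endpoint,
        service_name=env.get("STEPFLOW_SERVICE_NAME", "stepflow-worker"),
        trace_enabled=trace_enabled,
        log_level=env.get("STEPFLOW_LOG_LEVEL", "INFO").upper(),
        log_destination=destination,
    )
```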

Implementation Checklist

Required (MUST)

  • Connect to queue backend (gRPC PullTasks or NATS consumer)
  • Handle list_components task assignments
  • Handle execute task assignments
  • Send TaskHeartbeat before execution starts
  • Send periodic heartbeats during execution
  • Report results via CompleteTask
  • Report failures with TaskError and TaskErrorCode

Recommended (SHOULD)

  • OpenTelemetry tracing with parent context extraction
  • Structured JSON logging with diagnostic context
  • OTLP export for traces and logs
  • Graceful shutdown handling
  • Progress reporting in heartbeats
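The overall worker loop, including graceful shutdown handling, might look like the sketch below. `pull_task`, the task dict with a `kind` key, and the `handlers`/`reject` callables are hypothetical stand-ins for your queue client (gRPC PullTasks or a NATS consumer) and your list_components/execute logic. The loop checks the stop flag only between tasks, so a SIGTERM lets the in-flight task finish and report via CompleteTask before the process exits.

```python
import signal
import threading
from typing import Any, Callable, Dict, Optional

Task = Dict[str, Any]  # hypothetical task shape, e.g. {"kind": "execute", ...}


def install_shutdown_handlers(stop: threading.Event) -> None:
    """Set `stop` on SIGTERM/SIGINT instead of killing the process mid-task."""
    def _handle(signum, frame):
        stop.set()
    signal.signal(signal.SIGTERM, _handle)
    signal.signal(signal.SIGINT, _handle)


def run_worker(
    pull_task: Callable[[float], Optional[Task]],
    handlers: Dict[str, Callable[[Task], None]],
    reject: Callable[[Task], None],
    stop: threading.Event,
    poll_timeout_s: float = 1.0,
) -> int:
    """Pull and dispatch tasks until `stop` is set; return how many were handled."""
    handled = 0
    while not stop.is_set():
        # A short pull timeout bounds how long shutdown waits on an idle queue.
        task = pull_task(poll_timeout_s)
        if task is None:
            continue
        handler = handlers.get(task.get("kind"))
        if handler is None:
            reject(task)  # e.g. report WORKER_ERROR for an unsupported task kind
            continue
        handler(task)  # runs to completion even if `stop` is set meanwhile
        handled += 1
    return handled
```

In the main thread you would call `install_shutdown_handlers(stop)` before `run_worker(...)`; Python only allows installing signal handlers from the main thread.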

Reference Implementations

See Also