This guide covers how to implement task workers in the Conductor Rust SDK. Workers are the components that execute tasks in your workflows.
- Quick Start
- Worker Types
- TaskHandler
- Worker Configuration
- Error Handling
- Long-Running Tasks
- Metrics and Events
- Best Practices
use conductor::{Configuration, TaskHandler, FnWorker, WorkerOutput};
use conductor::models::Task;
// Entry point for the quick-start example: builds a client configuration,
// registers one closure-based worker, runs the TaskHandler, and shuts down
// gracefully on Ctrl-C.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Configuration::new("http://localhost:8080/api");
// Create a simple worker
// Polls for "greet_task"; the "name" input falls back to "World" when absent.
let worker = FnWorker::new("greet_task", |task: Task| async move {
let name: String = task.get_input("name").unwrap_or_else(|| "World".to_string());
Ok(WorkerOutput::completed_with_result(format!("Hello, {}!", name)))
});
// Start the handler
let mut handler = TaskHandler::new(config)?;
handler.add_worker(worker);
handler.start().await?;
// Wait for shutdown signal
tokio::signal::ctrl_c().await?;
// stop() waits for in-flight tasks before returning (graceful shutdown)
handler.stop().await?;
Ok(())
}The simplest way to create a worker using a closure:
use conductor::{FnWorker, WorkerOutput};
use conductor::models::Task;
let worker = FnWorker::new("my_task", |task: Task| async move {
// Access task inputs
let value: i32 = task.get_input("value").unwrap_or(0);
// Process the task
let result = value * 2;
// Return the result
Ok(WorkerOutput::completed_with_result(result))
})
.with_thread_count(10) // Concurrent executions
.with_poll_interval_millis(100) // Polling frequency
.with_domain("production"); // Task routing domain

For high-throughput scenarios, use FnWorkerArc which avoids cloning the task:
use conductor::{FnWorkerArc, WorkerOutput};
use conductor::models::Task;
use std::sync::Arc;
let worker = FnWorkerArc::new("high_throughput_task", |task: Arc<Task>| async move {
// Task is passed as Arc, no cloning needed
let data: String = task.get_input("data").unwrap_or_default();
// Process efficiently
Ok(WorkerOutput::completed_with_result(data.len()))
})
.with_thread_count(50)
.with_poll_interval_millis(10);

For complex workers, implement the Worker trait:
use async_trait::async_trait;
use conductor::worker::{Worker, WorkerOutput};
use conductor::models::Task;
use conductor::error::Result;
// Worker that executes queries against a shared connection pool.
// Implementing the Worker trait directly (rather than using FnWorker)
// lets the worker hold shared state such as db_pool.
struct MyWorker {
db_pool: DatabasePool,
}
#[async_trait]
impl Worker for MyWorker {
// Name of the task definition this worker polls for.
fn task_definition_name(&self) -> &str {
"database_task"
}
// Executes one task: reads the "query" input (empty string when absent)
// and runs it through the shared pool.
async fn execute(&self, task: &Task) -> Result<WorkerOutput> {
let query: String = task.get_input("query").unwrap_or_default();
// Use shared resources
let result = self.db_pool.execute(&query).await?;
Ok(WorkerOutput::completed_with_result(result))
}
// Up to 20 concurrent executions.
fn thread_count(&self) -> usize {
20
}
// Poll the server every 50 ms.
fn poll_interval_millis(&self) -> u64 {
50
}
// Route polling through the "database" domain.
fn domain(&self) -> Option<&str> {
Some("database")
}
}The TaskHandler manages the lifecycle of workers:
use conductor::{Configuration, TaskHandler, FnWorker};
let config = Configuration::new("http://localhost:8080/api");
let mut handler = TaskHandler::new(config)?;
// Add workers
handler.add_worker(worker1);
handler.add_worker(worker2);
// Start all workers
handler.start().await?;
// ... application runs ...
// Graceful shutdown (waits for in-flight tasks)
handler.stop().await?;

use conductor::{TaskHandler, Configuration, MetricsSettings};
let handler = TaskHandler::builder(config)
.worker(worker1)
.worker(worker2)
.metrics(MetricsSettings::default())
.build()?;

// Pause a specific worker
handler.pause_worker("my_task");
// Resume it
handler.resume_worker("my_task");
// Pause all workers
handler.pause_all();
// Resume all
handler.resume_all();

The TaskHandler provides access to Conductor clients:
// Get the full client
let conductor_client = handler.conductor_client();
// Get specific clients
let workflow_client = conductor_client.workflow_client();
let metadata_client = handler.metadata_client();
let task_client = handler.task_client();

let worker = FnWorker::new("my_task", handler_fn)
.with_thread_count(10) // Max concurrent executions
.with_poll_interval_millis(100) // Poll every 100ms
.with_domain("my_domain") // Task routing domain
.with_identity("worker-1"); // Worker identifier

Workers can be configured via environment variables:
# Global defaults
export CONDUCTOR_WORKER_POLL_INTERVAL=100
export CONDUCTOR_WORKER_DOMAIN=default
# Task-specific configuration
export CONDUCTOR_WORKER_MY_TASK_POLL_INTERVAL=50
export CONDUCTOR_WORKER_MY_TASK_THREAD_COUNT=20
export CONDUCTOR_WORKER_MY_TASK_DOMAIN=production

Configuration precedence:
- Environment variables (highest)
- Programmatic configuration
- Default values (lowest)
Task completed successfully:
// With a single result value
Ok(WorkerOutput::completed_with_result("success"))
// With a map of outputs
let mut output = HashMap::new();
output.insert("status".to_string(), json!("processed"));
output.insert("count".to_string(), json!(42));
Ok(WorkerOutput::completed(output))
// With no output data
Ok(WorkerOutput::complete())

Task failed (will retry based on task definition):

Ok(WorkerOutput::failed("Database connection error"))

Task is still running (for long-running tasks):

// Call back in 60 seconds
Ok(WorkerOutput::in_progress(60))

// Using WorkerOutput::failed - task fails, may retry
let worker = FnWorker::new("my_task", |task| async move {
if let Err(e) = validate_input(&task) {
return Ok(WorkerOutput::failed(format!("Validation error: {}", e)));
}
// ... process ...
Ok(WorkerOutput::complete())
});
// Using Err - task fails with error, may retry
let worker = FnWorker::new("my_task", |task| async move {
let data = fetch_data().await
.map_err(|e| ConductorError::worker(format!("Fetch failed: {}", e)))?;
Ok(WorkerOutput::completed_with_result(data))
});

Configure retry behavior in the task definition:
use conductor::models::{TaskDef, RetryLogic, TimeoutPolicy};
let task_def = TaskDef::new("my_task")
.with_retry(3, RetryLogic::ExponentialBackoff, 60) // 3 retries, exponential backoff, 60s base delay
.with_timeout(300, TimeoutPolicy::Retry); // 5 min timeout, retry on timeout

For tasks that take longer than the response timeout:
use conductor::models::TaskInProgress;
let worker = FnWorker::new("long_task", |task: Task| async move {
let progress: i32 = task.get_input("_progress").unwrap_or(0);
if progress < 100 {
// Do some work
let new_progress = progress + 10;
// Return in-progress with callback
let in_progress = TaskInProgress::new(30) // Call back in 30 seconds
.with_output("progress", new_progress);
return Ok(WorkerOutput::InProgress(in_progress));
}
// Completed
Ok(WorkerOutput::completed_with_result("Done!"))
});

let worker = FnWorker::new("my_task", |task: Task| async move {
// Get typed input with default
let count: i32 = task.get_input("count").unwrap_or(0);
let name: String = task.get_input("name").unwrap_or_else(|| "default".to_string());
// Get optional input
let optional: Option<String> = task.get_input("optional");
// Get raw JSON value
if let Some(value) = task.input_data.get("complex") {
// Handle complex value
}
Ok(WorkerOutput::complete())
});

Access task metadata directly or via the TaskContext:
let worker = FnWorker::new("my_task", |task: Task| async move {
// Direct access to common fields
let task_id = task.task_id();
let workflow_id = task.workflow_instance_id();
let poll_count = task.poll_count();
let retry_count = task.retry_count();
// Check execution state
if task.is_first_poll() {
println!("First time processing this task");
}
if task.is_retry() {
println!("This is retry #{}", retry_count);
}
Ok(WorkerOutput::complete())
});

For more detailed task context, use task.context():
use conductor::worker::TaskContext;
let worker = FnWorker::new("my_task", |task: Task| async move {
let ctx = task.context();
// All task metadata in one place
println!("Task ID: {}", ctx.task_id());
println!("Workflow ID: {}", ctx.workflow_instance_id());
println!("Task Type: {}", ctx.task_type());
println!("Reference Name: {}", ctx.reference_task_name());
// Execution state
println!("Poll Count: {}", ctx.poll_count());
println!("Retry Count: {}", ctx.retry_count());
println!("Iteration: {}", ctx.iteration());
// Timing (epoch milliseconds)
println!("Scheduled: {}", ctx.scheduled_time());
println!("Started: {}", ctx.start_time());
// Optional fields
if let Some(correlation_id) = ctx.correlation_id() {
println!("Correlation ID: {}", correlation_id);
}
if let Some(domain) = ctx.domain() {
println!("Domain: {}", domain);
}
// State checks
if ctx.is_first_poll() {
// Initialize resources for first poll
}
if ctx.is_retry() {
// Handle retry-specific logic
}
Ok(WorkerOutput::complete())
});

Use poll count to track progress in long-running tasks:
let worker = FnWorker::new("batch_processor", |task: Task| async move {
let ctx = task.context();
let batch_size = 100;
let offset = ctx.poll_count() * batch_size;
// Process a batch of items
let items = fetch_items(offset, batch_size).await?;
if items.len() < batch_size {
// All done
Ok(WorkerOutput::completed_with_result("Processing complete"))
} else {
// More to process - callback in 5 seconds
Ok(WorkerOutput::in_progress(5))
}
});

use conductor::metrics::MetricsSettings;
let mut handler = TaskHandler::new(config)?;
handler.enable_metrics(MetricsSettings {
port: 9090,
enabled: true,
});

Metrics available at http://localhost:9090/metrics:

- conductor_worker_tasks_polled_total - Total tasks polled
- conductor_worker_tasks_executed_total - Total tasks executed
- conductor_worker_task_duration_seconds - Task execution duration
- conductor_worker_poll_errors_total - Poll errors
use conductor::events::{TaskRunnerEventsListener, TaskEvent};
use std::sync::Arc;
// Listener that logs worker lifecycle events to stdout.
struct MyListener;
impl TaskRunnerEventsListener for MyListener {
fn on_task_started(&self, event: &TaskEvent) {
println!("Task started: {}", event.task_id);
}
fn on_task_completed(&self, event: &TaskEvent) {
println!("Task completed: {}", event.task_id);
}
fn on_task_failed(&self, event: &TaskEvent) {
// event.error is optional, hence the {:?} debug formatting
println!("Task failed: {} - {:?}", event.task_id, event.error);
}
}
handler.add_event_listener(Arc::new(MyListener));

Define input/output schemas for validation:
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
// Input schema for the "schema_task" example.
// `Default` is derived because the companion worker example retrieves this
// struct with `task.get_input("input").unwrap_or_default()`, which requires
// `MyInput: Default`; all fields (String, i32, Option<String>) support it.
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
struct MyInput {
name: String,
count: i32,
// Missing field deserializes as None
#[serde(default)]
optional: Option<String>,
}
// Output schema produced by the worker.
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
struct MyOutput {
result: String,
processed_count: i32,
}
let worker = FnWorker::new("schema_task", |task: Task| async move {
let input: MyInput = task.get_input("input").unwrap_or_default();
Ok(WorkerOutput::completed_with_result(MyOutput {
result: format!("Hello, {}!", input.name),
processed_count: input.count,
}))
})
.with_input_schema_from::<MyInput>(true) // strict mode
.with_output_schema_from::<MyOutput>(true);

Design workers to be idempotent - safe to execute multiple times:
let worker = FnWorker::new("process_order", |task: Task| async move {
let order_id: String = task.get_input("order_id").unwrap_or_default();
// Check if already processed
if is_order_processed(&order_id).await? {
return Ok(WorkerOutput::completed_with_result("Already processed"));
}
// Process and mark as done atomically
process_and_mark_complete(&order_id).await?;
Ok(WorkerOutput::complete())
});

// I/O-bound tasks: higher thread count
let io_worker = FnWorker::new("api_call", handler)
.with_thread_count(50);
// CPU-bound tasks: match CPU cores
let cpu_worker = FnWorker::new("compute", handler)
.with_thread_count(num_cpus::get());
// External rate-limited APIs: limit concurrency
let rate_limited_worker = FnWorker::new("external_api", handler)
.with_thread_count(5);

let worker = FnWorker::new("robust_task", |task: Task| async move {
// Validate inputs
let required: String = task.get_input("required")
.ok_or_else(|| ConductorError::worker("Missing required input"))?;
// Handle transient errors with retry
let result = retry_with_backoff(|| async {
external_api_call(&required).await
}).await?;
Ok(WorkerOutput::completed_with_result(result))
});

use tracing::{info, warn, error, instrument};
// Processes a task with structured tracing: the span carries task_id and
// workflow_id, and both success and failure paths emit log events.
#[instrument(skip(task), fields(task_id = %task.task_id, workflow_id = %task.workflow_instance_id))]
async fn process_task(task: Task) -> Result<WorkerOutput> {
info!("Processing task");
match do_work(&task).await {
Ok(result) => {
info!(result = ?result, "Task completed successfully");
Ok(WorkerOutput::completed_with_result(result))
}
Err(e) => {
error!(error = %e, "Task failed");
// Returning Ok(failed(..)) reports the failure to Conductor (which
// may retry) instead of surfacing a worker-side error.
Ok(WorkerOutput::failed(e.to_string()))
}
}
}// Share expensive resources across executions
// Worker that shares an expensive resource (a connection pool) across all
// task executions via Arc rather than creating one per task.
struct DatabaseWorker {
pool: Arc<DatabasePool>,
}
#[async_trait]
impl Worker for DatabaseWorker {
fn task_definition_name(&self) -> &str {
"db_task"
}
async fn execute(&self, task: &Task) -> Result<WorkerOutput> {
// Reuse connection from pool
let conn = self.pool.get().await?;
// "query" input falls back to the empty string when absent
let result = conn.query(&task.get_input::<String>("query").unwrap_or_default()).await?;
Ok(WorkerOutput::completed_with_result(result))
}
}- Task Management - Low-level task APIs
- Workflow Management - Running workflows
- Metadata Management - Registering task definitions