Queue / Worker (Distributed)
Decouple requesting an agent run from executing it: producers enqueue jobs onto a Redis-backed queue, and a pool of workers dequeue and run them. This gives you horizontal scale, backpressure under load, retries, and fault isolation — a slow or failing worker doesn't block producers.
The example below is compiled code pulled from the `paladin-doc-examples` crate via mdBook `{{#include}}`. The Redis calls compile but are not executed by the check gate, so the example stays in sync with the `RedisQueueAdapter` API without needing a live Redis.
Prerequisites: run `make dev` (starts Redis) first, and enable the `redis-queue` feature on `paladin-storage`.
When to choose it
- Choose it when you need scale-out across workers/hosts, backpressure for bursty load, automatic retries, or isolation between job execution and your request path.
- Look elsewhere when load is low and in-process execution suffices (embedded library), or you only need synchronous request/response (HTTP service host).
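The decoupling and backpressure listed above can be seen in miniature without Redis. The sketch below is an in-process analogy only: it swaps the Redis queue for a bounded `std::sync::mpsc::sync_channel` and tokio tasks for OS threads, and `Job` and `run_pool` are hypothetical names, not Paladin APIs. Because the channel holds at most `capacity` waiting jobs, the producer's `send` blocks whenever the workers fall behind, which is the backpressure a bounded queue gives you.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical job type for this sketch (not Paladin's `AgentJob`).
#[derive(Debug)]
struct Job {
    id: u32,
    input: String,
}

/// Runs `jobs` on `workers` threads that share one bounded queue holding at most
/// `capacity` waiting jobs. Returns `(job id, result)` pairs sorted by id.
fn run_pool(jobs: Vec<Job>, workers: usize, capacity: usize) -> Vec<(u32, usize)> {
    // Bounded channel: `send` blocks while `capacity` jobs are already queued.
    let (tx, rx) = mpsc::sync_channel::<Job>(capacity);
    // std's Receiver has a single consumer, so the workers share it behind a mutex.
    let rx = Arc::new(Mutex::new(rx));
    let (done_tx, done_rx) = mpsc::channel();

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            let done_tx = done_tx.clone();
            thread::spawn(move || loop {
                // The lock guard is dropped at the end of this statement.
                let next = rx.lock().unwrap().recv();
                match next {
                    // "Execute" the job; here the result is just the input length.
                    Ok(job) => done_tx.send((job.id, job.input.len())).unwrap(),
                    // All senders dropped and the queue is empty: stop this worker.
                    Err(_) => break,
                }
            })
        })
        .collect();
    drop(done_tx); // only the workers' clones remain

    for job in jobs {
        tx.send(job).unwrap(); // backpressure: waits here when the queue is full
    }
    drop(tx); // close the queue so workers exit once it drains

    for handle in handles {
        handle.join().unwrap();
    }
    let mut results: Vec<(u32, usize)> = done_rx.iter().collect();
    results.sort();
    results
}

fn main() {
    // Ten jobs, three workers, and room for only two waiting jobs at a time.
    let jobs: Vec<Job> = (0..10)
        .map(|i| Job { id: i, input: "x".repeat(i as usize) })
        .collect();
    for (id, len) in run_pool(jobs, 3, 2) {
        println!("job {id}: input length {len}");
    }
}
```

Unlike this sketch, the Redis-backed queue lives outside any one process, so producers and workers on different hosts can share it.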
Producer and worker
The producer enqueues a typed `AgentJob`; the worker dequeues it (as generic JSON), runs the agent through a `PaladinExecutionService`, and marks the item complete:
```rust
use serde::{Deserialize, Serialize};

use paladin_core::base::entity::message::{Location, Message};
use paladin_core::platform::container::queue_item::QueueItem;
use paladin_ports::output::queue_port::QueuePort;
use paladin_storage::redis::{RedisQueueAdapter, RedisQueueConfig};
use paladin::application::services::paladin::paladin_execution_service::PaladinExecutionService;
use paladin::prelude::*; // Paladin, PaladinResult, ...

const QUEUE: &str = "agent-jobs";

/// The unit of work a producer enqueues and a worker executes.
#[derive(Clone, Serialize, Deserialize)]
struct AgentJob {
    agent: String,
    input: String,
}

/// **Producer**: connect to Redis and enqueue an agent job. Many producers can
/// enqueue concurrently; the queue absorbs bursts and applies backpressure.
pub async fn enqueue_job() -> Result<(), Box<dyn std::error::Error>> {
    let queue = RedisQueueAdapter::new(RedisQueueConfig::default(), None).await?;
    queue.create_queue(QUEUE.to_string(), None).await?;

    let job = AgentJob {
        agent: "summarizer".to_string(),
        input: "Summarise the Q3 earnings call.".to_string(),
    };
    let message = Message::new(
        Location::service("producer"),
        Location::service("worker"),
        job,
    );
    let item = QueueItem::new(QUEUE.to_string(), message, None);

    let id = queue.enqueue(QUEUE, item).await?;
    println!("enqueued job {id}");
    Ok(())
}

/// **Worker**: pull jobs off the queue and run them through a
/// `PaladinExecutionService`. Run many of these (in this process or across hosts)
/// to scale out. Each item is marked in-progress, then completed with its result.
pub async fn run_worker(
    queue: &RedisQueueAdapter,
    service: &PaladinExecutionService,
    agent: &Paladin,
) -> Result<(), Box<dyn std::error::Error>> {
    // Drains the queue: the loop ends as soon as `dequeue` yields `None`.
    while let Some(item) = queue.dequeue(QUEUE).await? {
        let item_id = item.action.id;
        queue
            .start_processing(QUEUE, item_id, "worker-1".to_string())
            .await?;

        // The dequeued payload is generic JSON; read the agent input from it.
        // A missing or non-string "input" field falls back to "".
        let input = item.message.payload()["input"].as_str().unwrap_or_default();

        // `?` returns early on failure and leaves the item in progress; a production
        // worker would report the error via `fail_processing` so the item can retry.
        let result: PaladinResult = service.execute(agent, input).await?;

        queue
            .complete_processing(
                QUEUE,
                item_id,
                Some(serde_json::json!({ "output": result.output })),
            )
            .await?;
    }
    Ok(())
}
```
Run many workers, either as several tokio tasks in this process or as separate processes across hosts, all pulling from the same queue. `start_processing`, `complete_processing`, and `fail_processing` track each item's lifecycle, and failed items can be retried up to the configured `max_retries` limit.
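To make that lifecycle concrete, here is a minimal in-memory model of it. `Status`, `Item`, and `drain` are hypothetical and only mirror the roles of `start_processing`, `complete_processing`, and `fail_processing`; they are not how `RedisQueueAdapter` stores items. The sketch reads `max_retries` as the number of retries after the first attempt, so `max_retries: 3` allows up to four attempts; check the adapter's documentation for its exact semantics.

```rust
use std::collections::VecDeque;

/// Hypothetical lifecycle states; a model of the idea, not the adapter's types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Pending,
    InProgress,
    Completed,
    DeadLettered,
}

#[derive(Debug)]
struct Item {
    id: u32,
    attempts: u32,
    status: Status,
}

/// Drains `queue`, running `work` on each item. A failure re-enqueues the item
/// until it has been attempted `max_retries + 1` times, then dead-letters it.
fn drain<F>(mut queue: VecDeque<Item>, max_retries: u32, mut work: F) -> Vec<Item>
where
    F: FnMut(&Item) -> Result<(), String>,
{
    let mut finished = Vec::new();
    while let Some(mut item) = queue.pop_front() {
        item.status = Status::InProgress; // plays the role of start_processing
        item.attempts += 1;
        match work(&item) {
            Ok(()) => {
                item.status = Status::Completed; // plays the role of complete_processing
                finished.push(item);
            }
            Err(_) if item.attempts <= max_retries => {
                item.status = Status::Pending; // fail_processing with retries left
                queue.push_back(item);
            }
            Err(_) => {
                item.status = Status::DeadLettered; // retries exhausted
                finished.push(item);
            }
        }
    }
    finished
}

fn main() {
    let queue: VecDeque<Item> = (1..=3)
        .map(|id| Item { id, attempts: 0, status: Status::Pending })
        .collect();
    // Item 2 always fails; item 3 fails once, then succeeds.
    let mut failed_once = false;
    let done = drain(queue, 3, |item| match item.id {
        2 => Err("boom".to_string()),
        3 if !failed_once => {
            failed_once = true;
            Err("transient".to_string())
        }
        _ => Ok(()),
    });
    for item in &done {
        println!("item {} -> {:?} after {} attempt(s)", item.id, item.status, item.attempts);
    }
}
```

With `max_retries = 3`, item 2 is dead-lettered after four attempts, while item 3 completes on its second.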
Configuring the queue
`RedisQueueConfig` is typically populated from `config.yml`:

```yaml
queue:
  redis_host: "localhost"
  redis_port: 6379
  redis_db: 0
  connection_timeout: 30
  key_prefix: "paladin:queue"
  max_retries: 3
```
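The `queue:` keys map naturally onto a Rust struct. The sketch below is a hypothetical mirror: `QueueSettings` is not `RedisQueueConfig`'s real definition, and its `Default` simply copies the example values above. It shows how host, port, and database combine into the standard `redis://host:port/db` connection URL, and it assumes `connection_timeout` is in seconds, which the YAML does not state.

```rust
use std::time::Duration;

/// Hypothetical mirror of the `queue:` block; field names follow the YAML keys.
#[derive(Debug, Clone)]
struct QueueSettings {
    redis_host: String,
    redis_port: u16,
    redis_db: u32,
    connection_timeout: u64, // assumption: seconds
    key_prefix: String,
    max_retries: u32,
}

impl Default for QueueSettings {
    fn default() -> Self {
        // Copies the example config.yml values; not the adapter's own defaults.
        Self {
            redis_host: "localhost".to_string(),
            redis_port: 6379,
            redis_db: 0,
            connection_timeout: 30,
            key_prefix: "paladin:queue".to_string(),
            max_retries: 3,
        }
    }
}

impl QueueSettings {
    /// Standard Redis connection URL: `redis://host:port/db`.
    fn redis_url(&self) -> String {
        format!("redis://{}:{}/{}", self.redis_host, self.redis_port, self.redis_db)
    }

    /// Connection timeout as a `Duration`, assuming the YAML value is in seconds.
    fn timeout(&self) -> Duration {
        Duration::from_secs(self.connection_timeout)
    }
}

fn main() {
    let settings = QueueSettings::default();
    println!(
        "{} (timeout {:?}, prefix {:?}, max_retries {})",
        settings.redis_url(),
        settings.timeout(),
        settings.key_prefix,
        settings.max_retries
    );
}
```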
See also
- Standing up Redis and the adapter in detail — Redis Queue Adapter Setup.
- Each worker is itself an embedded agent host; a worker can also run a Battalion as its unit of work.