Queue / Worker (Distributed)

Decouple requesting an agent run from executing it: producers enqueue jobs onto a Redis-backed queue, and a pool of workers dequeues and runs them. This gives you horizontal scale, backpressure under load, retries, and fault isolation: a slow or failing worker doesn't block producers.

The example below is pulled from the paladin-doc-examples crate via mdBook {{#include}}. The check gate compiles the Redis calls but does not execute them, so the example stays in sync with the RedisQueueAdapter API without needing a live Redis.

Prerequisites: Run make dev (starts Redis) first, and enable the redis-queue feature on paladin-storage.

When to choose it

  • Choose it when you need scale-out across workers/hosts, backpressure for bursty load, automatic retries, or isolation between job execution and your request path.
  • Look elsewhere when load is low and in-process execution suffices (embedded library), or you only need synchronous request/response (HTTP service host).
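To make "backpressure" concrete, here is a minimal in-process sketch using only the standard library. It is not the Redis-backed queue and uses none of Paladin's types: `Job`, `bounded_queue`, `try_enqueue`, and `run_worker` are hypothetical names chosen for illustration. A bounded channel stands in for the queue, so a producer that outruns the worker is refused (or blocked) once the buffer is full.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical stand-in for a unit of work (not Paladin's `AgentJob`).
#[derive(Debug, Clone, PartialEq)]
pub struct Job {
    pub agent: String,
    pub input: String,
}

/// Create a queue that buffers at most `capacity` jobs.
pub fn bounded_queue(capacity: usize) -> (SyncSender<Job>, Receiver<Job>) {
    sync_channel(capacity)
}

/// Non-blocking enqueue: hands the job back to the caller when the queue is
/// full (or the worker is gone), so the producer can shed load or retry later.
pub fn try_enqueue(tx: &SyncSender<Job>, job: Job) -> Result<(), Job> {
    match tx.try_send(job) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(job)) | Err(TrySendError::Disconnected(job)) => Err(job),
    }
}

/// Worker loop: drains jobs until every producer handle has been dropped.
/// "Running" a job is faked here by formatting a string.
pub fn run_worker(rx: Receiver<Job>) -> Vec<String> {
    rx.iter()
        .map(|job| format!("[{}] {}", job.agent, job.input))
        .collect()
}
```

With a capacity of 2 and no worker draining, the third `try_enqueue` call returns the job to the caller; once a worker thread is running, a blocking `send` on the same handle waits for free space instead.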

Producer and worker

The producer enqueues a typed AgentJob; the worker dequeues it (as generic JSON), runs the agent through a PaladinExecutionService, and marks the item complete:

#![allow(unused)]
fn main() {
use std::sync::Arc;
use std::time::Duration;

use serde::{Deserialize, Serialize};

use paladin_core::base::entity::message::{Location, Message};
use paladin_core::platform::container::queue_item::QueueItem;
use paladin_ports::output::queue_port::QueuePort;
use paladin_storage::redis::{RedisQueueAdapter, RedisQueueConfig};

use paladin::application::services::paladin::paladin_execution_service::PaladinExecutionService;
use paladin::prelude::*; // Paladin, PaladinResult, ...

const QUEUE: &str = "agent-jobs";

/// The unit of work a producer enqueues and a worker executes.
#[derive(Clone, Serialize, Deserialize)]
struct AgentJob {
    agent: String,
    input: String,
}

/// **Producer** — connect to Redis and enqueue an agent job. Many producers can
/// enqueue concurrently; the queue absorbs bursts and applies backpressure.
pub async fn enqueue_job() -> Result<(), Box<dyn std::error::Error>> {
    let queue = RedisQueueAdapter::new(RedisQueueConfig::default(), None).await?;
    queue.create_queue(QUEUE.to_string(), None).await?;

    let job = AgentJob {
        agent: "summarizer".to_string(),
        input: "Summarise the Q3 earnings call.".to_string(),
    };
    let message = Message::new(
        Location::service("producer"),
        Location::service("worker"),
        job,
    );
    let item = QueueItem::new(QUEUE.to_string(), message, None);

    let id = queue.enqueue(QUEUE, item).await?;
    println!("enqueued job {id}");
    Ok(())
}

/// **Worker** — pull jobs off the queue and run them through a
/// `PaladinExecutionService`. Run many of these (in this process or across hosts)
/// to scale out. Each item is marked in-progress, then completed with its result.
pub async fn run_worker(
    queue: &RedisQueueAdapter,
    service: &PaladinExecutionService,
    agent: &Paladin,
) -> Result<(), Box<dyn std::error::Error>> {
    while let Some(item) = queue.dequeue(QUEUE).await? {
        let item_id = item.action.id;
        queue
            .start_processing(QUEUE, item_id, "worker-1".to_string())
            .await?;

        // The dequeued payload is generic JSON; read the agent input from it.
        let input = item.message.payload()["input"].as_str().unwrap_or_default();
        let result: PaladinResult = service.execute(agent, input).await?;

        queue
            .complete_processing(
                QUEUE,
                item_id,
                Some(serde_json::json!({ "output": result.output })),
            )
            .await?;
    }
    Ok(())
}
}

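Run many workers, either as several tokio tasks in this process or as separate processes across hosts, all pulling from the same queue. start_processing, complete_processing, and fail_processing track each item's lifecycle, and a failed item can be retried up to the configured limit. Note that the example worker propagates errors with ?, so a failed run exits the loop without calling fail_processing; a long-running worker would normally report the failure and keep going.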
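The lifecycle and retry behaviour can be sketched with a small in-memory model. This is not the RedisQueueAdapter: `LifecycleQueue`, `ItemState`, and the method signatures below are hypothetical and only borrow the method names from the text. The retry rule is also an assumption: an item is requeued until its failure count exceeds `max_retries`, then marked failed. The worker loop routes errors to `fail_processing` rather than returning early.

```rust
use std::collections::{HashMap, VecDeque};

/// Lifecycle states for one queued item (illustrative, not Paladin's types).
#[derive(Debug, Clone, PartialEq)]
pub enum ItemState {
    Pending,
    InProgress { worker: String },
    Completed { output: String },
    Failed { error: String },
}

struct Item {
    payload: String,
    failures: u32,
    state: ItemState,
}

/// Hypothetical in-memory queue that tracks each item's lifecycle.
pub struct LifecycleQueue {
    max_retries: u32,
    next_id: u64,
    ready: VecDeque<u64>,
    items: HashMap<u64, Item>,
}

impl LifecycleQueue {
    pub fn new(max_retries: u32) -> Self {
        Self { max_retries, next_id: 0, ready: VecDeque::new(), items: HashMap::new() }
    }

    pub fn enqueue(&mut self, payload: &str) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        let item = Item { payload: payload.to_string(), failures: 0, state: ItemState::Pending };
        self.items.insert(id, item);
        self.ready.push_back(id);
        id
    }

    /// Pop the next ready item, returning its id and a copy of its payload.
    pub fn dequeue(&mut self) -> Option<(u64, String)> {
        let id = self.ready.pop_front()?;
        let payload = self.items.get(&id)?.payload.clone();
        Some((id, payload))
    }

    pub fn start_processing(&mut self, id: u64, worker: &str) {
        if let Some(item) = self.items.get_mut(&id) {
            item.state = ItemState::InProgress { worker: worker.to_string() };
        }
    }

    pub fn complete_processing(&mut self, id: u64, output: &str) {
        if let Some(item) = self.items.get_mut(&id) {
            item.state = ItemState::Completed { output: output.to_string() };
        }
    }

    /// Record a failure. Returns `true` if the item was requeued for a retry.
    pub fn fail_processing(&mut self, id: u64, error: &str) -> bool {
        let item = match self.items.get_mut(&id) {
            Some(item) => item,
            None => return false,
        };
        item.failures += 1;
        if item.failures <= self.max_retries {
            item.state = ItemState::Pending;
            self.ready.push_back(id);
            true
        } else {
            item.state = ItemState::Failed { error: error.to_string() };
            false
        }
    }

    pub fn state(&self, id: u64) -> Option<&ItemState> {
        self.items.get(&id).map(|item| &item.state)
    }
}

/// Worker loop: a failed run is reported to the queue and the loop continues.
pub fn run_worker<F>(queue: &mut LifecycleQueue, worker: &str, mut run: F)
where
    F: FnMut(&str) -> Result<String, String>,
{
    while let Some((id, payload)) = queue.dequeue() {
        queue.start_processing(id, worker);
        match run(&payload) {
            Ok(output) => queue.complete_processing(id, &output),
            Err(error) => {
                queue.fail_processing(id, &error);
            }
        }
    }
}
```

Under this model, a queue built with `LifecycleQueue::new(1)` runs an always-failing item twice (one attempt plus one retry) before leaving it in `Failed`, while other items complete normally.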

Configuring the queue

RedisQueueConfig is typically populated from config.yml:

queue:
  redis_host: "localhost"
  redis_port: 6379
  redis_db: 0
  connection_timeout: 30
  key_prefix: "paladin:queue"
  max_retries: 3
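As a rough illustration of how these fields might be consumed, the sketch below mirrors the YAML keys in a plain struct. `QueueSettings` and its methods are hypothetical and are not the real RedisQueueConfig, whose fields and constructors may differ. Two assumptions are baked in and flagged in the comments: that connection_timeout is in seconds, and that queue keys are formed as `<key_prefix>:<queue name>`.

```rust
use std::time::Duration;

/// Hypothetical mirror of the `queue:` block above (NOT the real `RedisQueueConfig`).
#[derive(Debug, Clone, PartialEq)]
pub struct QueueSettings {
    pub redis_host: String,
    pub redis_port: u16,
    pub redis_db: u8,
    pub connection_timeout: u64,
    pub key_prefix: String,
    pub max_retries: u32,
}

impl Default for QueueSettings {
    /// Defaults copied from the example `config.yml` values.
    fn default() -> Self {
        Self {
            redis_host: "localhost".to_string(),
            redis_port: 6379,
            redis_db: 0,
            connection_timeout: 30,
            key_prefix: "paladin:queue".to_string(),
            max_retries: 3,
        }
    }
}

impl QueueSettings {
    /// Connection URL in the conventional `redis://host:port/db` form.
    pub fn redis_url(&self) -> String {
        format!("redis://{}:{}/{}", self.redis_host, self.redis_port, self.redis_db)
    }

    /// Assumption: `connection_timeout` is expressed in seconds.
    pub fn timeout(&self) -> Duration {
        Duration::from_secs(self.connection_timeout)
    }

    /// Assumption: Redis keys are namespaced as `<key_prefix>:<queue name>`.
    pub fn queue_key(&self, queue: &str) -> String {
        format!("{}:{}", self.key_prefix, queue)
    }
}
```

A distinct key_prefix per environment would keep, for example, staging and development queues apart on a shared Redis instance, if the adapter does namespace keys this way.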
