Redis Queue Adapter Setup
This section describes how to set up and use the Redis queue adapter for the paladin framework.
Prerequisites
- Docker and Docker Compose
- Rust 1.75 or later
- Redis 7.0 or later (if running locally)
Quick Start
1. Start with Docker Compose
The easiest way to get started is using Docker Compose:
# Clone the repository
git clone <repository-url>
cd paladin
# Start Redis and the application
docker-compose -f docker/docker-compose.yml up -d
# Check service health
docker-compose ps
2. Development Setup
For development with auto-reload:
# Start Redis and development tools
docker-compose -f docker/docker-compose.yml -f docker/docker-compose.dev.yml up -d
# Or run locally with Redis in Docker
docker run -d --name redis -p 6379:6379 redis:7-alpine
# Run the application locally
RUST_LOG=debug cargo run
3. Testing
Run the integration tests:
# Using Docker (recommended)
docker-compose -f docker/docker-compose.test.yml up --build test-runner
# Or locally (requires Redis running)
cargo test queue_integration_tests
Configuration
Environment Variables
The Redis queue adapter can be configured using environment variables:
# Redis connection
export APP_REDIS_HOST=localhost
export APP_REDIS_PORT=6379
export APP_REDIS_PASSWORD=your_password # Optional
export APP_REDIS_DB=0
export APP_REDIS_CONNECTION_TIMEOUT=30
# Queue settings
export APP_REDIS_KEY_PREFIX=paladin:queue
export APP_REDIS_MAX_RETRIES=3
export APP_REDIS_ENABLE_PRIORITY_QUEUES=true
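As a rough illustration of how these variables could map onto settings, the sketch below reads them with std::env and falls back to the values shown above. RedisEnvConfig, from_lookup, and from_env are hypothetical names chosen for this example, not the adapter's API. The adapter's real parsing and defaults may differ, and the unit of the connection timeout is not stated in this guide.

```rust
use std::env;

/// Hypothetical settings struct; fields mirror the variables listed above.
#[derive(Debug, PartialEq)]
struct RedisEnvConfig {
    host: String,
    port: u16,
    password: Option<String>,
    db: u8,
    connection_timeout: u64, // unit not stated in this guide
    key_prefix: String,
    max_retries: u32,
    enable_priority_queues: bool,
}

impl RedisEnvConfig {
    /// Builds the config from any lookup function, so the logic can be
    /// exercised without touching the process environment.
    fn from_lookup(get: impl Fn(&str) -> Option<String>) -> Self {
        // Parse a value, falling back to the default when unset or malformed.
        fn parse_or<T: std::str::FromStr>(value: Option<String>, default: T) -> T {
            value.and_then(|s| s.parse().ok()).unwrap_or(default)
        }

        Self {
            host: get("APP_REDIS_HOST").unwrap_or_else(|| "localhost".to_string()),
            port: parse_or(get("APP_REDIS_PORT"), 6379),
            // Assumption: an empty password means "no password".
            password: get("APP_REDIS_PASSWORD").filter(|p| !p.is_empty()),
            db: parse_or(get("APP_REDIS_DB"), 0),
            connection_timeout: parse_or(get("APP_REDIS_CONNECTION_TIMEOUT"), 30),
            key_prefix: get("APP_REDIS_KEY_PREFIX")
                .unwrap_or_else(|| "paladin:queue".to_string()),
            max_retries: parse_or(get("APP_REDIS_MAX_RETRIES"), 3),
            enable_priority_queues: parse_or(get("APP_REDIS_ENABLE_PRIORITY_QUEUES"), true),
        }
    }

    /// Reads the variables from the process environment.
    fn from_env() -> Self {
        Self::from_lookup(|key| env::var(key).ok())
    }
}

fn main() {
    let config = RedisEnvConfig::from_env();
    println!("{:?}", config);
}
```

Taking the lookup as a closure keeps the parsing logic testable with a fixed set of values instead of real environment variables.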
Configuration File
Add queue configuration to your config.toml:
[queue]
redis_host = "localhost"
redis_port = 6379
redis_password = "" # Optional
redis_db = 0
connection_timeout = 30
key_prefix = "paladin:queue"
max_retries = 3
enable_priority_queues = true
Queue Operations
Basic Usage
use paladin::infrastructure::adapters::queue::redis::RedisQueueAdapter;
use paladin::paladin_ports::output::queue_port::QueuePort;
// RedisQueueConfig, Message, Location, and QueueItem must also be in scope;
// their module paths are not shown in this guide.

// Initialize the adapter
let config = RedisQueueConfig::default();
let adapter = RedisQueueAdapter::new(config, None).await?;

// Create a queue
adapter.create_queue("my-queue".to_string(), None).await?;

// Enqueue an item
let message = Message::new(
    Location::service("producer"),
    Location::service("consumer"),
    serde_json::json!({"task": "process_data", "id": 123}),
);
let queue_item = QueueItem::new("my-queue".to_string(), message, None);
let item_id = adapter.enqueue("my-queue", queue_item).await?;

// Dequeue an item
if let Some(item) = adapter.dequeue("my-queue").await? {
    // Process the item
    adapter.start_processing("my-queue", item.id(), "worker-1".to_string()).await?;

    // Complete processing
    let result = serde_json::json!({"status": "completed"});
    adapter.complete_processing("my-queue", item.id(), Some(result)).await?;
}
Priority Queues
use paladin::core::base::entity::message::MessagePriority;

// Enqueue with priority
adapter.enqueue_with_priority("priority-queue", high_priority_item, MessagePriority::High).await?;

// Dequeue highest priority first
let item = adapter.dequeue_highest_priority("priority-queue").await?;
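To make the dequeue order concrete, here is a small in-memory model of a priority queue with one FIFO lane per level. It is only a sketch: the ordering Critical, High, Normal, Low and the FIFO behavior within a level are assumptions based on the level names, and Priority and PriorityQueueModel are hypothetical types, not the adapter's own.

```rust
use std::collections::VecDeque;

/// Assumed ordering: lower discriminant is served first.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Priority {
    Critical = 0,
    High = 1,
    Normal = 2,
    Low = 3,
}

/// In-memory stand-in: one FIFO lane per priority level.
struct PriorityQueueModel {
    lanes: [VecDeque<String>; 4],
}

impl PriorityQueueModel {
    fn new() -> Self {
        Self { lanes: Default::default() }
    }

    /// Appends the item to the back of its priority lane.
    fn enqueue(&mut self, item: &str, priority: Priority) {
        self.lanes[priority as usize].push_back(item.to_string());
    }

    /// Pops from the first non-empty lane, scanning from Critical to Low.
    fn dequeue_highest_priority(&mut self) -> Option<String> {
        self.lanes.iter_mut().find_map(|lane| lane.pop_front())
    }
}

fn main() {
    let mut queue = PriorityQueueModel::new();
    queue.enqueue("report", Priority::Low);
    queue.enqueue("email", Priority::Normal);
    queue.enqueue("alert", Priority::Critical);
    queue.enqueue("invoice", Priority::High);

    while let Some(item) = queue.dequeue_highest_priority() {
        println!("{item}");
    }
}
```

Under these assumptions the items come out as alert, invoice, email, report, regardless of the order in which they were enqueued.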
Batch Operations
// Enqueue multiple items at once
let items = vec![item1, item2, item3];
let item_ids = adapter.enqueue_batch("batch-queue", items).await?;

// Dequeue multiple items
let items = adapter.dequeue_batch("batch-queue", 5).await?;
Monitoring and Management
Redis Commander (Development)
Access Redis Commander for queue inspection:
# Start with development profile
docker-compose --profile dev up -d
# Access Redis Commander
open http://localhost:8081 # macOS; on Linux use xdg-open
# Login: admin/admin (configurable via environment)
Queue Statistics
// Get queue statistics
let stats = adapter.get_queue_stats("my-queue").await?;
println!(
    "Pending: {}, Processing: {}, Completed: {}, Failed: {}",
    stats.pending_items, stats.processing_items, stats.completed_items, stats.failed_items
);

// Get all queue statistics
let all_stats = adapter.get_all_stats().await;
for (queue_name, stats) in all_stats {
    println!("Queue {}: {} total items", queue_name, stats.total_items);
}
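The four counters are easier to read with the item lifecycle in mind. The sketch below models one plausible relationship between them: an item is pending after enqueue, moves to processing when a worker starts it, and ends as completed or failed. This is an assumption for illustration only. QueueStats here is a local stand-in, its on_* methods are hypothetical, and total_items is computed as the sum of the four counters, which the adapter may define differently.

```rust
/// Local stand-in using the counter names printed above.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
struct QueueStats {
    pending_items: u64,
    processing_items: u64,
    completed_items: u64,
    failed_items: u64,
}

impl QueueStats {
    /// Assumption: the total is the sum of all four states.
    fn total_items(&self) -> u64 {
        self.pending_items + self.processing_items + self.completed_items + self.failed_items
    }

    /// A new item starts out pending.
    fn on_enqueue(&mut self) {
        self.pending_items += 1;
    }

    /// Moves one item from pending to processing; false if nothing is pending.
    fn on_start_processing(&mut self) -> bool {
        if self.pending_items == 0 {
            return false;
        }
        self.pending_items -= 1;
        self.processing_items += 1;
        true
    }

    /// Moves one item from processing to completed or failed;
    /// false if nothing is being processed.
    fn on_finish(&mut self, succeeded: bool) -> bool {
        if self.processing_items == 0 {
            return false;
        }
        self.processing_items -= 1;
        if succeeded {
            self.completed_items += 1;
        } else {
            self.failed_items += 1;
        }
        true
    }
}

fn main() {
    let mut stats = QueueStats::default();
    stats.on_enqueue();
    stats.on_enqueue();
    stats.on_start_processing();
    stats.on_finish(true);
    println!("{:?}, total: {}", stats, stats.total_items());
}
```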
Health Checks
// Check adapter health
let is_healthy = adapter.health_check().await?;
Queue Management
Retry Failed Items
// Retry a specific failed item
adapter.retry_item("my-queue", failed_item_id).await?;
Purge Completed/Failed Items
// Clean up completed items
let purged_completed = adapter.purge_completed("my-queue").await?;

// Clean up failed items
let purged_failed = adapter.purge_failed("my-queue").await?;
Pause/Resume Queues
// Pause queue processing
adapter.pause_queue("my-queue").await?;

// Resume queue processing
adapter.resume_queue("my-queue").await?;
Redis Key Structure
The adapter uses the following Redis key patterns:
paladin:queue:{queue_name} # Main queue (FIFO list)
paladin:queue:{queue_name}:high # High priority queue
paladin:queue:{queue_name}:normal # Normal priority queue
paladin:queue:{queue_name}:low # Low priority queue
paladin:queue:{queue_name}:critical # Critical priority queue
paladin:queue:meta:{queue_name} # Queue metadata (hash)
paladin:queue:processing:{queue_name} # Items being processed (hash)
paladin:queue:completed:{queue_name} # Completed items (hash)
paladin:queue:failed:{queue_name} # Failed items (hash)
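The patterns above can be expressed as a small key builder, which is handy when inspecting keys by hand or writing tests. This is a sketch that simply reproduces the listed patterns. KeyBuilder and Priority are hypothetical helper types, not part of the adapter, and the prefix is assumed to be the configured key_prefix.

```rust
/// Priority suffixes that appear in the key patterns above.
#[derive(Debug, Clone, Copy)]
enum Priority {
    Critical,
    High,
    Normal,
    Low,
}

impl Priority {
    fn suffix(self) -> &'static str {
        match self {
            Priority::Critical => "critical",
            Priority::High => "high",
            Priority::Normal => "normal",
            Priority::Low => "low",
        }
    }
}

/// Hypothetical helper that formats keys under a configurable prefix.
struct KeyBuilder {
    prefix: String,
}

impl KeyBuilder {
    fn new(prefix: &str) -> Self {
        Self { prefix: prefix.to_string() }
    }

    /// Main FIFO list: {prefix}:{queue_name}
    fn main_queue(&self, queue: &str) -> String {
        format!("{}:{}", self.prefix, queue)
    }

    /// Per-priority queue: {prefix}:{queue_name}:{priority}
    fn priority_queue(&self, queue: &str, priority: Priority) -> String {
        format!("{}:{}:{}", self.prefix, queue, priority.suffix())
    }

    /// Bookkeeping hashes: {prefix}:{kind}:{queue_name},
    /// where kind is meta, processing, completed, or failed.
    fn bookkeeping(&self, kind: &str, queue: &str) -> String {
        format!("{}:{}:{}", self.prefix, kind, queue)
    }
}

fn main() {
    let keys = KeyBuilder::new("paladin:queue");
    println!("{}", keys.main_queue("my-queue"));
    println!("{}", keys.priority_queue("my-queue", Priority::High));
    println!("{}", keys.bookkeeping("failed", "my-queue"));
}
```

With the default prefix, the three lines printed are paladin:queue:my-queue, paladin:queue:my-queue:high, and paladin:queue:failed:my-queue.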
Error Handling
Adapter operations return a QueueError, so callers can match on specific failure cases:
use paladin::core::platform::manager::queue_service::QueueError;

match adapter.enqueue("my-queue", item).await {
    Ok(item_id) => println!("Enqueued item: {}", item_id),
    Err(QueueError::QueueNotFound(name)) => println!("Queue {} not found", name),
    Err(QueueError::QueueFull { queue_name, capacity }) => {
        println!("Queue {} is full (capacity: {})", queue_name, capacity)
    }
    Err(QueueError::OperationFailed(msg)) => println!("Operation failed: {}", msg),
    Err(e) => println!("Other error: {}", e),
}
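One common follow-up to matching on errors is deciding which ones are worth retrying. The sketch below shows one way to do that with a local stand-in enum. The enum only mirrors the three variants matched above, so the real QueueError may have more variants and different field types. The retry policy in is_retryable is an assumption for illustration, not behavior documented by the adapter.

```rust
use std::fmt;

/// Local stand-in mirroring the variants matched above.
/// Field types (for example usize for capacity) are assumptions.
#[derive(Debug)]
enum QueueError {
    QueueNotFound(String),
    QueueFull { queue_name: String, capacity: usize },
    OperationFailed(String),
}

impl fmt::Display for QueueError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            QueueError::QueueNotFound(name) => write!(f, "queue {name} not found"),
            QueueError::QueueFull { queue_name, capacity } => {
                write!(f, "queue {queue_name} is full (capacity: {capacity})")
            }
            QueueError::OperationFailed(msg) => write!(f, "operation failed: {msg}"),
        }
    }
}

impl std::error::Error for QueueError {}

/// Assumed policy: a missing queue will not fix itself, while a full
/// queue or a failed operation may succeed on a later attempt.
fn is_retryable(err: &QueueError) -> bool {
    match err {
        QueueError::QueueNotFound(_) => false,
        QueueError::QueueFull { .. } | QueueError::OperationFailed(_) => true,
    }
}

fn main() {
    let errors = [
        QueueError::QueueNotFound("my-queue".to_string()),
        QueueError::QueueFull { queue_name: "my-queue".to_string(), capacity: 100 },
        QueueError::OperationFailed("connection reset".to_string()),
    ];
    for err in &errors {
        println!("{err} (retryable: {})", is_retryable(err));
    }
}
```

Keeping the policy in one function means the calling code can stay a simple loop that retries while is_retryable returns true.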
Performance Considerations
Connection Pooling
The adapter uses a Redis connection manager to manage and reuse connections:
// Connections are managed automatically
// No manual connection handling is needed
Batch Operations
Use batch operations to avoid issuing one request per item:
// Option 1 (slower): one enqueue call per item
// for item in items {
//     adapter.enqueue("queue", item).await?;
// }

// Option 2 (faster): a single batch enqueue
adapter.enqueue_batch("queue", items).await?;
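For very large inputs it can help to cap the size of each batch rather than sending everything in one call. The sketch below splits a slice into fixed-size chunks and passes each chunk to a batch function. enqueue_in_chunks is a hypothetical helper, and the closure is a synchronous stand-in for adapter.enqueue_batch, whose exact signature and return type are assumed here.

```rust
/// Sends items in chunks of at most chunk_size, collecting the returned IDs.
/// Stops at the first error and returns it.
fn enqueue_in_chunks<T: Clone, E>(
    items: &[T],
    chunk_size: usize,
    mut enqueue_batch: impl FnMut(Vec<T>) -> Result<Vec<String>, E>,
) -> Result<Vec<String>, E> {
    // slice::chunks panics on a zero size, so fail early with a clear message.
    assert!(chunk_size > 0, "chunk_size must be greater than zero");

    let mut ids = Vec::with_capacity(items.len());
    for chunk in items.chunks(chunk_size) {
        ids.extend(enqueue_batch(chunk.to_vec())?);
    }
    Ok(ids)
}

fn main() {
    let items: Vec<u32> = (1..=7).collect();
    let mut calls = 0;

    // Stand-in for the adapter's batch enqueue: one fake ID per item.
    let result: Result<Vec<String>, String> = enqueue_in_chunks(&items, 3, |batch| {
        calls += 1;
        Ok(batch.iter().map(|i| format!("item-{i}")).collect())
    });

    println!("{} ids in {} batch calls", result.unwrap().len(), calls);
}
```

With seven items and a chunk size of three, the stand-in is called three times (3, 3, and 1 items). A suitable chunk size depends on item size and Redis latency, so it is best chosen by measurement.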
Pipeline Operations
The adapter internally uses Redis pipelines for efficient batch operations.
Troubleshooting
Common Issues
- Connection Failed
# Check Redis is running
docker ps | grep redis
# Check Redis connectivity
redis-cli ping
- Permission Denied
# Check Redis password configuration
# Ensure APP_REDIS_PASSWORD matches Redis requirepass
- Memory Issues
# Check Redis memory usage
redis-cli info memory
# Configure maxmemory policy in redis.conf
# Caution: allkeys-lru can evict queue keys under memory pressure;
# use noeviction if queued items must not be dropped
maxmemory-policy allkeys-lru
Debug Logging
Enable debug logging for detailed queue operations:
RUST_LOG=debug cargo run
Redis Logs
Check Redis logs for connection and operation issues:
# Docker logs
docker logs paladin-redis
# Or check Redis info
redis-cli info
Production Deployment
Redis Configuration
For production, ensure proper Redis configuration:
- Persistence: Enable AOF for durability
- Memory: Set appropriate maxmemory and policy
- Security: Use password authentication
- Monitoring: Enable slow log and latency monitoring
High Availability
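Consider Redis Sentinel or Cluster for high availability. The example below only sets up a master with one replica; on its own it does not provide automatic failover: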
# docker-compose.prod.yml
services:
  redis-master:
    image: redis:7-alpine
    command: redis-server --appendonly yes --requirepass ${REDIS_PASSWORD}
  redis-replica:
    image: redis:7-alpine
    # The replica needs masterauth because the master sets requirepass
    command: redis-server --appendonly yes --replicaof redis-master 6379 --masterauth ${REDIS_PASSWORD}
Monitoring
Use Redis monitoring tools:
- Redis Insight for GUI-based monitoring
- Prometheus Redis exporter for metrics
- Custom health checks in your application
Testing
The adapter includes comprehensive integration tests. Run them with:
# Full test suite
cargo test
# Queue-specific tests
cargo test queue_integration_tests
# With logging
RUST_LOG=debug cargo test queue_integration_tests -- --nocapture
Examples
See the examples/ directory for complete usage examples:
- examples/basic_queue.rs - Basic queue operations
- examples/priority_queue.rs - Priority queue usage
- examples/batch_processing.rs - Batch operations
- examples/error_handling.rs - Error handling patterns