# Event-Driven Job Queues: Why Polling is Dead
Traditional job queues poll for work at fixed intervals. This wastes resources when idle and introduces latency when busy. Event-driven architecture solves both problems.
## The Polling Problem
Consider a typical polling implementation:
// DON'T DO THIS
setInterval(async () => {
  const jobs = await queue.getNextJobs(10);
  if (jobs.length === 0) {
    // Wasted cycle - nothing to process
    return;
  }
  for (const job of jobs) {
    await processJob(job);
  }
}, 5000); // Check every 5 seconds
Problems:
- Up to 5 seconds of latency before a new job is picked up
- CPU cycles wasted checking an empty queue
- No backpressure when overwhelmed
- Difficult to scale horizontally
## Event-Driven Architecture
React to jobs immediately as they arrive:
import { EventEmitter } from 'node:events';

/**
 * Minimal event-driven job queue: jobs start the moment they are added
 * (no polling), bounded by a concurrency cap.
 *
 * Events: 'job:added', 'job:started', 'job:completed', 'job:failed'.
 * Subclasses implement the actual work in executeJob().
 */
class EventQueue extends EventEmitter {
  constructor() {
    super();
    this.jobs = [];              // FIFO of jobs waiting to start
    this.processing = new Set(); // jobs currently executing
    this.maxConcurrent = 5;      // concurrency cap
  }

  /**
   * Enqueue a job and immediately try to start it.
   * @param {object} job - arbitrary job payload
   */
  add(job) {
    this.jobs.push(job);
    this.emit('job:added', job);
    this.processNext();
  }

  /** Start the next waiting job if we are below the concurrency cap. */
  processNext() {
    if (this.processing.size >= this.maxConcurrent) {
      return; // At capacity
    }
    const job = this.jobs.shift();
    if (!job) {
      return; // No jobs waiting
    }
    // Track the job object itself, not job.id: a Set of ids undercounts
    // when ids are missing or duplicated, letting more than
    // maxConcurrent jobs run at once.
    this.processing.add(job);
    // Stamp the start time so 'job:completed' listeners can compute
    // job duration (see the monitoring section).
    job.startedAt = Date.now();
    this.emit('job:started', job);
    this.executeJob(job)
      .then(() => this.emit('job:completed', job))
      .catch((err) => this.emit('job:failed', job, err))
      .finally(() => {
        this.processing.delete(job);
        this.processNext(); // Check for more work
      });
  }

  /**
   * Perform the actual work for one job. Override in a subclass.
   * @param {object} job
   * @returns {Promise<*>}
   */
  async executeJob(job) {
    throw new Error('Not implemented');
  }
}
## Smart Retry Logic
Not all failures are equal. Handle them appropriately:
/**
 * EventQueue subclass that retries transient failures with exponential
 * backoff. Permanent failures (e.g. validation errors) fail immediately.
 *
 * Note: maxRetries is the total number of attempts (the first try counts
 * as attempt 1), not the number of retries after the first failure.
 *
 * Emits 'job:retry' (job, attempt, delay) before each backoff sleep.
 * Subclasses implement processJob(job).
 */
class RetryableQueue extends EventQueue {
  constructor(options = {}) {
    super();
    // ?? instead of ||: lets explicit falsy configs take effect,
    // e.g. retryDelay: 0 means "retry immediately", not "use 1000ms".
    this.maxRetries = options.maxRetries ?? 3;
    this.retryDelay = options.retryDelay ?? 1000;
    this.retryMultiplier = options.retryMultiplier ?? 2;
  }

  async executeJob(job) {
    let lastError;
    for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
      try {
        return await this.processJob(job);
      } catch (error) {
        lastError = error;
        if (!this.isRetryable(error)) {
          throw error; // Permanent failure - fail immediately
        }
        if (attempt < this.maxRetries) {
          // Exponential backoff: delay, delay*m, delay*m^2, ...
          const delay = this.retryDelay * Math.pow(this.retryMultiplier, attempt - 1);
          this.emit('job:retry', job, attempt, delay);
          await this.sleep(delay);
        }
      }
    }
    throw lastError; // All attempts exhausted
  }

  /**
   * Transient network errors are worth retrying; anything else
   * (validation errors, programming errors) is not.
   */
  isRetryable(error) {
    const retryableCodes = ['ECONNRESET', 'ETIMEDOUT', 'ENOTFOUND', 'ECONNREFUSED'];
    return retryableCodes.includes(error.code);
  }

  sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
## Per-Worker Concurrency
Different job types need different concurrency limits:
/**
 * Routes jobs to type-specific handlers, each with its own concurrency
 * ceiling. Jobs arriving over the ceiling wait in a per-type queue.
 */
class WorkerPool {
  constructor() {
    this.workers = new Map(); // type -> { handler, maxConcurrent, active, queue }
  }

  /**
   * Register a handler for a job type.
   * @param {string} type
   * @param {Function} handler - async (job) => result
   * @param {{maxConcurrent?: number}} [options]
   */
  register(type, handler, options = {}) {
    const maxConcurrent = options.maxConcurrent || 5;
    this.workers.set(type, { handler, maxConcurrent, active: 0, queue: [] });
  }

  /**
   * Run a job through its type's handler, queueing when at capacity.
   * @returns {Promise<*>} the handler's result
   * @throws {Error} when no handler is registered for job.type
   */
  async dispatch(job) {
    const worker = this.workers.get(job.type);
    if (worker === undefined) {
      throw new Error(`No worker for job type: ${job.type}`);
    }
    const atCapacity = worker.active >= worker.maxConcurrent;
    if (!atCapacity) {
      return this.execute(worker, job);
    }
    // Park the job until a slot frees up; settled by processQueue().
    return new Promise((resolve, reject) => {
      worker.queue.push({ job, resolve, reject });
    });
  }

  async execute(worker, job) {
    worker.active += 1;
    try {
      return await worker.handler(job);
    } finally {
      worker.active -= 1;
      this.processQueue(worker); // Wake a parked job, if any
    }
  }

  processQueue(worker) {
    const hasWaiting = worker.queue.length > 0;
    const hasSlot = worker.active < worker.maxConcurrent;
    if (!hasWaiting || !hasSlot) {
      return;
    }
    const next = worker.queue.shift();
    this.execute(worker, next.job).then(next.resolve, next.reject);
  }
}
Usage:
const pool = new WorkerPool();

// CPU-intensive: limit to 2 concurrent
pool.register('image-processing', processImage, { maxConcurrent: 2 });

// I/O-bound: allow more concurrency
pool.register('email-send', sendEmail, { maxConcurrent: 20 });

// External API: respect rate limits
pool.register('api-call', callExternalApi, { maxConcurrent: 5 });
## Filesystem-Based Persistence
For simple deployments without Redis:
import { promises as fs } from 'node:fs';
import path from 'node:path';

/**
 * Durable job queue backed by four directories (pending, processing,
 * completed, failed). Each job is one JSON file; moving between states
 * is a rename, which is atomic on a single filesystem.
 */
class PersistentQueue {
  constructor(dataDir) {
    this.dataDir = dataDir;
    this.pendingDir = path.join(dataDir, 'pending');
    this.processingDir = path.join(dataDir, 'processing');
    this.completedDir = path.join(dataDir, 'completed');
    this.failedDir = path.join(dataDir, 'failed');
  }

  /** Create the directory layout and re-queue jobs orphaned by a crash. */
  async init() {
    // The four mkdirs are independent - create them in parallel.
    await Promise.all(
      [this.pendingDir, this.processingDir, this.completedDir, this.failedDir]
        .map((dir) => fs.mkdir(dir, { recursive: true }))
    );
    // Recover jobs from processing (crashed workers)
    await this.recoverProcessing();
  }

  /**
   * Persist a job as pending. The timestamp prefix keeps a sorted
   * readdir() in FIFO order.
   */
  async add(job) {
    const filename = `${Date.now()}-${job.id}.json`;
    const filepath = path.join(this.pendingDir, filename);
    await fs.writeFile(filepath, JSON.stringify(job));
  }

  /**
   * Claim the oldest pending job, or return null when the queue is empty.
   *
   * rename() is the atomic claim: when several workers race for the same
   * file only one rename succeeds; the losers get ENOENT and simply move
   * on to the next candidate instead of crashing.
   *
   * @returns {Promise<object|null>} the job plus a _filename tracking key
   */
  async getNext() {
    // Sort by timestamp prefix (oldest first)
    const files = (await fs.readdir(this.pendingDir)).sort();
    for (const filename of files) {
      const src = path.join(this.pendingDir, filename);
      const dest = path.join(this.processingDir, filename);
      try {
        await fs.rename(src, dest); // Atomic move to processing
      } catch (err) {
        if (err.code === 'ENOENT') continue; // lost the race for this file
        throw err;
      }
      const content = await fs.readFile(dest, 'utf8');
      return { ...JSON.parse(content), _filename: filename };
    }
    return null;
  }

  /** Mark a claimed job as done: processing -> completed. */
  async complete(job) {
    const src = path.join(this.processingDir, job._filename);
    const dest = path.join(this.completedDir, job._filename);
    await fs.rename(src, dest);
  }

  /**
   * Record the failure reason on the job, then move it out of processing.
   * Write-then-unlink (rather than rename) because the payload changes.
   */
  async fail(job, error) {
    job.error = error.message;
    job.failedAt = Date.now();
    const src = path.join(this.processingDir, job._filename);
    const dest = path.join(this.failedDir, job._filename);
    await fs.writeFile(dest, JSON.stringify(job));
    await fs.unlink(src);
  }

  /** Move any leftover processing files back to pending (crash recovery). */
  async recoverProcessing() {
    const files = await fs.readdir(this.processingDir);
    for (const filename of files) {
      const src = path.join(this.processingDir, filename);
      const dest = path.join(this.pendingDir, filename);
      await fs.rename(src, dest);
    }
  }
}
## Performance Comparison
| Approach | Latency (idle) | CPU (idle) | Throughput |
|---|---|---|---|
| Polling (5s) | 2.5s avg | Constant | Limited |
| Polling (1s) | 0.5s avg | 5x more | Limited |
| Event-driven | <1ms | Near zero | Unlimited* |
*Limited by worker capacity, not architecture
## Monitoring Events
Track everything through events:
// Mirror every queue lifecycle event into metrics, one listener each.
queue.on('job:added', (job) => {
  metrics.increment('jobs.added', { type: job.type });
});

queue.on('job:started', () => {
  metrics.gauge('jobs.processing', queue.processing.size);
});

queue.on('job:completed', (job) => {
  metrics.timing('jobs.duration', Date.now() - job.startedAt);
});

queue.on('job:failed', (job, error) => {
  metrics.increment('jobs.failed', { type: job.type, error: error.code });
});

queue.on('job:retry', (job, attempt) => {
  metrics.increment('jobs.retries', { type: job.type, attempt });
});
## Conclusion
Event-driven job queues eliminate polling waste while providing instant response to new work. Combined with smart retry logic and per-worker concurrency controls, this architecture handles any workload efficiently.
The filesystem persistence option makes deployment simple—no Redis or database required for many use cases. Start simple, scale up as needed.