Event-Driven Job Queues: Why Polling is Dead
# Event-Driven Job Queues: Why Polling is Dead
Traditional job queues poll for work at fixed intervals. This wastes resources when idle and introduces latency when busy. Event-driven architecture solves both problems.
## The Polling Problem
Consider a typical polling implementation:
// DON'T DO THIS
const POLL_INTERVAL_MS = 5000; // Check every 5 seconds
setInterval(async () => {
  const batch = await queue.getNextJobs(10);
  // An empty batch means this wake-up was pure overhead.
  for (const job of batch) {
    await processJob(job);
  }
}, POLL_INTERVAL_MS);
Problems:
- Up to 5 seconds of latency before a new job is picked up (2.5s on average)
- CPU cycles wasted checking an empty queue
- No backpressure when overwhelmed
- Difficult to scale horizontally
## Event-Driven Architecture
React to jobs immediately as they arrive:
const EventEmitter = require('events');
class EventQueue extends EventEmitter {
constructor() {
super();
this.jobs = [];
this.processing = new Set();
this.maxConcurrent = 5;
}
add(job) {
this.jobs.push(job);
this.emit('job:added', job);
this.processNext();
}
processNext() {
if (this.processing.size >= this.maxConcurrent) {
return; // At capacity
}
const job = this.jobs.shift();
if (!job) {
return; // No jobs waiting
}
this.processing.add(job.id);
this.emit('job:started', job);
this.executeJob(job)
.then(() => this.emit('job:completed', job))
.catch(err => this.emit('job:failed', job, err))
.finally(() => {
this.processing.delete(job.id);
this.processNext(); // Check for more work
});
}
async executeJob(job) {
// Override in subclass
throw new Error('Not implemented');
}
}
## Smart Retry Logic
Not all failures are equal. Handle them appropriately:
/**
 * EventQueue with exponential-backoff retries for transient failures.
 * Subclasses implement processJob(job); emits 'job:retry' before each wait.
 */
class RetryableQueue extends EventQueue {
  /**
   * @param {object} [options]
   * @param {number} [options.maxRetries=3]      - Total attempts per job.
   * @param {number} [options.retryDelay=1000]   - Base backoff delay in ms.
   * @param {number} [options.retryMultiplier=2] - Backoff growth factor.
   */
  constructor(options = {}) {
    super(options); // forward so EventQueue options (e.g. maxConcurrent) still apply
    this.maxRetries = options.maxRetries || 3;
    this.retryDelay = options.retryDelay || 1000;
    this.retryMultiplier = options.retryMultiplier || 2;
  }

  /**
   * Run processJob up to maxRetries times, backing off exponentially between
   * retryable failures. Non-retryable errors are rethrown immediately.
   * @throws the last error if every attempt fails
   */
  async executeJob(job) {
    let lastError;
    for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
      try {
        return await this.processJob(job);
      } catch (error) {
        lastError = error;
        if (!this.isRetryable(error)) {
          throw error; // Permanent failure — retrying would be pointless
        }
        if (attempt < this.maxRetries) {
          // Backoff schedule: delay, delay*m, delay*m^2, ...
          // FIX: original was missing the '*' operator (syntax error).
          const delay = this.retryDelay * Math.pow(this.retryMultiplier, attempt - 1);
          this.emit('job:retry', job, attempt, delay);
          await this.sleep(delay);
        }
      }
    }
    throw lastError;
  }

  /**
   * Transient network errors are worth retrying; anything else (e.g.
   * validation failures) is treated as permanent.
   */
  isRetryable(error) {
    const retryableCodes = ['ECONNRESET', 'ETIMEDOUT', 'ENOTFOUND'];
    return retryableCodes.includes(error.code);
  }

  /** Promise-based delay helper. */
  sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
## Per-Worker Concurrency
Different job types need different concurrency limits:
/**
 * Dispatches jobs to per-type handlers, each with its own concurrency limit.
 * Jobs arriving while a type is at capacity are queued and started as slots free.
 */
class WorkerPool {
  constructor() {
    this.workers = new Map(); // type -> { handler, maxConcurrent, active, queue }
  }

  /**
   * Register a handler for a job type.
   * @param {string} type
   * @param {(job: object) => Promise<*>} handler
   * @param {object} [options]
   * @param {number} [options.maxConcurrent=5] - Concurrency cap for this type.
   */
  register(type, handler, options = {}) {
    this.workers.set(type, {
      handler,
      maxConcurrent: options.maxConcurrent || 5, // '||' also guards a nonsensical 0
      active: 0,
      queue: [],
    });
  }

  /**
   * Run a job through its type's handler, or park it if the type is at capacity.
   * @returns {Promise<*>} the handler's result
   * @throws {Error} if no handler is registered for job.type
   */
  async dispatch(job) {
    const worker = this.workers.get(job.type);
    if (!worker) {
      // FIX: original lacked backticks around the template literal (syntax error).
      throw new Error(`No worker for job type: ${job.type}`);
    }
    if (worker.active >= worker.maxConcurrent) {
      // At capacity — queue and settle the promise when the job eventually runs.
      return new Promise((resolve, reject) => {
        worker.queue.push({ job, resolve, reject });
      });
    }
    return this.execute(worker, job);
  }

  /** Run the handler, then release the slot and pull the next queued job. */
  async execute(worker, job) {
    worker.active++;
    try {
      return await worker.handler(job);
    } finally {
      worker.active--;
      this.processQueue(worker);
    }
  }

  /** Start the oldest parked job for this worker if a slot is free. */
  processQueue(worker) {
    if (worker.queue.length === 0) return;
    if (worker.active >= worker.maxConcurrent) return;
    const { job, resolve, reject } = worker.queue.shift();
    this.execute(worker, job)
      .then(resolve)
      .catch(reject);
  }
}
Usage:
const pool = new WorkerPool();

// Concurrency limits tuned per workload:
const registrations = [
  ['image-processing', processImage, 2],  // CPU-intensive: keep it low
  ['email-send', sendEmail, 20],          // I/O-bound: allow more concurrency
  ['api-call', callExternalApi, 5],       // External API: respect rate limits
];
for (const [type, handler, maxConcurrent] of registrations) {
  pool.register(type, handler, { maxConcurrent });
}
## Filesystem-Based Persistence
For simple deployments without Redis:
const fs = require('fs').promises;
const path = require('path');
/**
 * Durable FIFO job queue backed by four directories:
 * pending/ -> processing/ -> completed/ or failed/.
 * Jobs are JSON files; moves between states use fs.rename, which is atomic
 * when all directories live on the same filesystem.
 */
class PersistentQueue {
  constructor(dataDir) {
    this.dataDir = dataDir;
    this.pendingDir = path.join(dataDir, 'pending');
    this.processingDir = path.join(dataDir, 'processing');
    this.completedDir = path.join(dataDir, 'completed');
    this.failedDir = path.join(dataDir, 'failed');
  }

  /** Create the directory layout and requeue jobs orphaned by a crash. */
  async init() {
    await Promise.all([
      fs.mkdir(this.pendingDir, { recursive: true }),
      fs.mkdir(this.processingDir, { recursive: true }),
      fs.mkdir(this.completedDir, { recursive: true }),
      fs.mkdir(this.failedDir, { recursive: true }),
    ]);
    // Anything still in processing/ belonged to a worker that died mid-job.
    await this.recoverProcessing();
  }

  /**
   * Persist a job to pending/. The timestamp prefix makes a plain lexicographic
   * sort return oldest-first.
   * NOTE(review): two adds in the same millisecond with the same job.id would
   * collide — confirm ids are unique per enqueue.
   */
  async add(job) {
    // FIX: original lacked backticks around the template literal (syntax error).
    const filename = `${Date.now()}-${job.id}.json`;
    const filepath = path.join(this.pendingDir, filename);
    await fs.writeFile(filepath, JSON.stringify(job));
  }

  /**
   * Claim the oldest pending job by atomically renaming it into processing/.
   * @returns {Promise<object|null>} the job (with its `filename` attached) or
   *   null when the queue is empty
   * NOTE(review): with multiple workers, two readers can race on the same file
   * and one rename will fail — callers should treat ENOENT here as "try again".
   */
  async getNext() {
    const files = await fs.readdir(this.pendingDir);
    if (files.length === 0) return null;
    files.sort(); // timestamp-prefixed names: lexicographic == oldest first
    const filename = files[0];
    const src = path.join(this.pendingDir, filename);
    const dest = path.join(this.processingDir, filename);
    await fs.rename(src, dest); // atomic claim
    const content = await fs.readFile(dest, 'utf8');
    return { ...JSON.parse(content), filename };
  }

  /** Move a claimed job from processing/ to completed/. */
  async complete(job) {
    await fs.rename(
      path.join(this.processingDir, job.filename),
      path.join(this.completedDir, job.filename)
    );
  }

  /**
   * Record a failed job in failed/ (annotated with the error message and a
   * timestamp) and remove it from processing/.
   */
  async fail(job, error) {
    job.error = error.message;
    job.failedAt = Date.now();
    const src = path.join(this.processingDir, job.filename);
    // FIX: original read job._filename (always undefined — the property set in
    // getNext() is `filename`), so the failed-job path was broken.
    const dest = path.join(this.failedDir, job.filename);
    await fs.writeFile(dest, JSON.stringify(job));
    await fs.unlink(src);
  }

  /** Move every file in processing/ back to pending/ (crash recovery). */
  async recoverProcessing() {
    const files = await fs.readdir(this.processingDir);
    for (const filename of files) {
      await fs.rename(
        path.join(this.processingDir, filename),
        path.join(this.pendingDir, filename)
      );
    }
  }
}
## Performance Comparison
| Approach | Latency (idle) | CPU (idle) | Throughput |
|---|---|---|---|
| Polling (5s) | 2.5s avg | Constant | Limited |
| Polling (1s) | 0.5s avg | 5x more | Limited |
| Event-driven | <1ms | Near zero | Unlimited |
## Monitoring Events
Track everything through events:
// Fan metric hooks out from the queue's lifecycle events.
const hooks = {
  'job:added': (job) => metrics.increment('jobs.added', { type: job.type }),
  'job:started': (job) => metrics.gauge('jobs.processing', queue.processing.size),
  'job:completed': (job) => metrics.timing('jobs.duration', Date.now() - job.startedAt),
  'job:failed': (job, error) => metrics.increment('jobs.failed', { type: job.type, error: error.code }),
  'job:retry': (job, attempt, delay) => metrics.increment('jobs.retries', { type: job.type, attempt }),
};
for (const [event, handler] of Object.entries(hooks)) {
  queue.on(event, handler);
}
## Conclusion
Event-driven job queues eliminate polling waste while providing instant response to new work. Combined with smart retry logic and per-worker concurrency controls, this architecture handles any workload efficiently.
The filesystem persistence option makes deployment simple—no Redis or database required for many use cases. Start simple, scale up as needed.