0%
[/]JOSHIDEAS
SYSTEM_BLOG
TIME:13:33:23
STATUS:ONLINE
~/blog/event-driven-job-queue-nodejs
$cat event-driven-job-queue-nodejs.md_

Event-Driven Job Queues: Why Polling is Dead

#Node.js#Architecture#Queues

# Event-Driven Job Queues: Why Polling is Dead

Traditional job queues poll for work at fixed intervals. This wastes resources when idle and introduces latency when busy. Event-driven architecture solves both problems.

## The Polling Problem

Consider a typical polling implementation:

javascript
// DON'T DO THIS
setInterval(async () => {
  const batch = await queue.getNextJobs(10);
  if (batch.length === 0) {
    // Nothing queued — this whole wake-up was wasted work
    return;
  }
  for (const job of batch) {
    await processJob(job);
  }
}, 5000); // Wake up every 5 seconds regardless of load

Problems:

  • Up to 5 seconds of added latency for new jobs
  • CPU cycles wasted checking an empty queue
  • No backpressure when overwhelmed
  • Difficult to scale horizontally

## Event-Driven Architecture

React to jobs immediately as they arrive:

javascript
import { EventEmitter } from 'node:events';

/**
 * In-memory job queue that starts work the moment a job arrives,
 * instead of polling on a timer.
 *
 * Lifecycle events emitted: 'job:added', 'job:started',
 * 'job:completed', 'job:failed'.
 */
class EventQueue extends EventEmitter {
  constructor() {
    super();
    this.jobs = [];              // FIFO of jobs waiting to start
    this.processing = new Set(); // ids of jobs currently executing
    this.maxConcurrent = 5;      // cap on simultaneous executions
  }

  /**
   * Enqueue a job and immediately try to start it.
   * @param {object} job - must carry a unique `id` used for in-flight tracking
   */
  add(job) {
    this.jobs.push(job);
    this.emit('job:added', job);
    this.processNext();
  }

  /**
   * Start the next waiting job if a concurrency slot is free.
   * Re-invoked from each job's completion, so the queue drains itself.
   */
  processNext() {
    if (this.processing.size >= this.maxConcurrent) {
      return; // At capacity
    }
    const job = this.jobs.shift();
    if (!job) {
      return; // No jobs waiting
    }
    this.processing.add(job.id);
    this.emit('job:started', job);
    this.executeJob(job)
      .then(() => this.emit('job:completed', job))
      .catch((err) => this.emit('job:failed', job, err))
      .finally(() => {
        this.processing.delete(job.id);
        this.processNext(); // Check for more work
      });
  }

  /**
   * Perform the actual work for one job. Override in a subclass;
   * the base implementation always rejects.
   * @param {object} job
   */
  async executeJob(job) {
    // Override in subclass
    throw new Error('Not implemented');
  }
}

## Smart Retry Logic

Not all failures are equal. Handle them appropriately:

javascript
/**
 * EventQueue that retries transient failures with exponential backoff.
 *
 * Emits 'job:retry' (job, attempt, delay) before each backoff sleep.
 * A subclass must provide `processJob(job)` — it is not defined here
 * or in EventQueue (assumed contract; confirm against your worker).
 */
class RetryableQueue extends EventQueue {
  /**
   * @param {object} [options]
   * @param {number} [options.maxRetries=3] - total attempts per job
   * @param {number} [options.retryDelay=1000] - base backoff in ms
   * @param {number} [options.retryMultiplier=2] - backoff growth factor
   */
  constructor(options = {}) {
    super();
    // ?? instead of ||: an explicit 0 (e.g. retryDelay: 0 in tests)
    // must be honored, not silently replaced by the default.
    this.maxRetries = options.maxRetries ?? 3;
    this.retryDelay = options.retryDelay ?? 1000;
    this.retryMultiplier = options.retryMultiplier ?? 2;
  }

  /**
   * Run a job, retrying retryable errors with exponential backoff.
   * Non-retryable errors propagate immediately; the last retryable
   * error is rethrown once attempts are exhausted.
   */
  async executeJob(job) {
    // Always make at least one attempt, even if maxRetries is 0 or
    // misconfigured — otherwise we would throw `undefined` below.
    const attempts = Math.max(1, this.maxRetries);
    let lastError;
    for (let attempt = 1; attempt <= attempts; attempt++) {
      try {
        return await this.processJob(job);
      } catch (error) {
        lastError = error;
        if (!this.isRetryable(error)) {
          throw error; // Fail immediately
        }
        if (attempt < attempts) {
          // Exponential backoff: delay, delay*m, delay*m^2, ...
          const delay = this.retryDelay * Math.pow(this.retryMultiplier, attempt - 1);
          this.emit('job:retry', job, attempt, delay);
          await this.sleep(delay);
        }
      }
    }
    throw lastError;
  }

  /**
   * Decide whether an error is worth retrying.
   * Network errors and timeouts = retry; validation errors = don't.
   */
  isRetryable(error) {
    const retryableCodes = ['ECONNRESET', 'ETIMEDOUT', 'ENOTFOUND'];
    return retryableCodes.includes(error.code);
  }

  /** Promise-based delay helper. */
  sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}

## Per-Worker Concurrency

Different job types need different concurrency limits:

javascript
/**
 * Routes jobs to type-specific handlers, each with its own
 * concurrency cap. Jobs arriving while a type is at capacity are
 * parked and started as slots free up.
 */
class WorkerPool {
  constructor() {
    // type -> { handler, maxConcurrent, active, queue }
    this.workers = new Map();
  }

  /**
   * Register a handler for a job type.
   * options.maxConcurrent caps parallelism for that type (default 5).
   */
  register(type, handler, options = {}) {
    const entry = {
      handler,
      maxConcurrent: options.maxConcurrent || 5,
      active: 0,
      queue: [],
    };
    this.workers.set(type, entry);
  }

  /** Run a job now, or park it until a slot for its type opens. */
  async dispatch(job) {
    const entry = this.workers.get(job.type);
    if (entry === undefined) {
      throw new Error(`No worker for job type: ${job.type}`);
    }
    const atCapacity = entry.active >= entry.maxConcurrent;
    if (atCapacity) {
      // Defer: settle the caller's promise when the job eventually runs.
      return new Promise((resolve, reject) => {
        entry.queue.push({ job, resolve, reject });
      });
    }
    return this.execute(entry, job);
  }

  /** Invoke the handler, then hand the freed slot to a parked job. */
  async execute(worker, job) {
    worker.active += 1;
    try {
      return await worker.handler(job);
    } finally {
      worker.active -= 1;
      this.processQueue(worker);
    }
  }

  /** Start the oldest parked job if a slot is available. */
  processQueue(worker) {
    const hasWaiting = worker.queue.length > 0;
    const hasSlot = worker.active < worker.maxConcurrent;
    if (!hasWaiting || !hasSlot) {
      return;
    }
    const next = worker.queue.shift();
    this.execute(worker, next.job).then(next.resolve, next.reject);
  }
}

Usage:

javascript
const pool = new WorkerPool();

// CPU-intensive: limit to 2 concurrent
pool.register('image-processing', processImage, { maxConcurrent: 2 });

// I/O-bound: allow more concurrency
pool.register('email-send', sendEmail, { maxConcurrent: 20 });

// External API: respect rate limits
pool.register('api-call', callExternalApi, { maxConcurrent: 5 });

## Filesystem-Based Persistence

For simple deployments without Redis:

javascript
import { promises as fs } from 'node:fs';
import path from 'node:path';

/**
 * Durable job queue backed by four directories on disk:
 * pending/ -> processing/ -> completed/ (or failed/).
 * A job is one JSON file; claiming a job is an atomic rename, so
 * multiple workers can share the same data directory.
 */
class PersistentQueue {
  /** @param {string} dataDir - root directory for queue state */
  constructor(dataDir) {
    this.dataDir = dataDir;
    this.pendingDir = path.join(dataDir, 'pending');
    this.processingDir = path.join(dataDir, 'processing');
    this.completedDir = path.join(dataDir, 'completed');
    this.failedDir = path.join(dataDir, 'failed');
  }

  /**
   * Create the directory layout and re-queue any jobs left in
   * processing/ by a crashed worker.
   */
  async init() {
    const dirs = [this.pendingDir, this.processingDir, this.completedDir, this.failedDir];
    for (const dir of dirs) {
      await fs.mkdir(dir, { recursive: true });
    }
    // Recover jobs from processing (crashed workers)
    await this.recoverProcessing();
  }

  /**
   * Persist a new job. The timestamp prefix gives oldest-first
   * lexicographic ordering; job.id keeps same-millisecond adds unique.
   */
  async add(job) {
    const filename = `${Date.now()}-${job.id}.json`;
    const filepath = path.join(this.pendingDir, filename);
    await fs.writeFile(filepath, JSON.stringify(job));
  }

  /**
   * Claim the oldest pending job, or return null if none.
   * If another worker renames a file first, skip it and try the next
   * one instead of throwing (the original behavior raised ENOENT,
   * breaking the multi-worker story).
   * @returns {Promise<object|null>} job plus a private `_filename` tag
   */
  async getNext() {
    const files = await fs.readdir(this.pendingDir);
    files.sort(); // timestamp prefix => oldest first
    for (const filename of files) {
      const src = path.join(this.pendingDir, filename);
      const dest = path.join(this.processingDir, filename);
      try {
        await fs.rename(src, dest); // atomic claim
      } catch (err) {
        if (err.code === 'ENOENT') {
          continue; // lost the race to another worker — try the next file
        }
        throw err;
      }
      const content = await fs.readFile(dest, 'utf8');
      return { ...JSON.parse(content), _filename: filename };
    }
    return null;
  }

  /** Archive a finished job (processing/ -> completed/). */
  async complete(job) {
    const src = path.join(this.processingDir, job._filename);
    const dest = path.join(this.completedDir, job._filename);
    await fs.rename(src, dest);
  }

  /**
   * Record a failure: annotate the job, write it to failed/, then
   * remove the processing copy. Write-before-unlink so the job is
   * never lost if we crash in between.
   */
  async fail(job, error) {
    job.error = error.message;
    job.failedAt = Date.now();
    const src = path.join(this.processingDir, job._filename);
    const dest = path.join(this.failedDir, job._filename);
    await fs.writeFile(dest, JSON.stringify(job));
    await fs.unlink(src);
  }

  /** Move any orphaned processing/ jobs back to pending/. */
  async recoverProcessing() {
    const files = await fs.readdir(this.processingDir);
    for (const filename of files) {
      const src = path.join(this.processingDir, filename);
      const dest = path.join(this.pendingDir, filename);
      await fs.rename(src, dest);
    }
  }
}

## Performance Comparison

| Approach | Latency (idle) | CPU (idle) | Throughput |
| --- | --- | --- | --- |
| Polling (5s) | 2.5s avg | Constant | Limited |
| Polling (1s) | 0.5s avg | 5x more | Limited |
| Event-driven | <1ms | Near zero | Unlimited* |

*Limited by worker capacity, not architecture

## Monitoring Events

Track everything through events:

javascript
// Counter: one increment per enqueued job, tagged by type
queue.on('job:added', (job) => {
  metrics.increment('jobs.added', { type: job.type });
});

// Gauge: how many jobs are in flight right now
queue.on('job:started', (job) => {
  metrics.gauge('jobs.processing', queue.processing.size);
});

// Timing: wall-clock duration from start to completion
queue.on('job:completed', (job) => {
  metrics.timing('jobs.duration', Date.now() - job.startedAt);
});

// Counter: failures, tagged by type and error code
queue.on('job:failed', (job, error) => {
  metrics.increment('jobs.failed', { type: job.type, error: error.code });
});

// Counter: retries, tagged by type and attempt number
queue.on('job:retry', (job, attempt, delay) => {
  metrics.increment('jobs.retries', { type: job.type, attempt });
});

## Conclusion

Event-driven job queues eliminate polling waste while providing instant response to new work. Combined with smart retry logic and per-worker concurrency controls, this architecture handles any workload efficiently.

The filesystem persistence option makes deployment simple—no Redis or database required for many use cases. Start simple, scale up as needed.