SYSTEM_BLOG
TIME: 00:00:00
STATUS: ONLINE
~/blog/event-driven-job-queue-nodejs
$ cat event-driven-job-queue-nodejs.md _
| 2025-10-10 | 9 min

Event-Driven Job Queues: Why Polling is Dead

# Event-Driven Job Queues: Why Polling is Dead

Traditional job queues poll for work at fixed intervals. This wastes resources when idle and introduces latency when busy. Event-driven architecture solves both problems.

## The Polling Problem

Consider a typical polling implementation:

JAVASCRIPT
// DON'T DO THIS
setInterval(async () => {
  const jobs = await queue.getNextJobs(10);

if (jobs.length === 0) {
// Wasted cycle - nothing to process
return;
}

for (const job of jobs) {
await processJob(job);
}
}, 5000); // Check every 5 seconds

Problems:


## Event-Driven Architecture

React to jobs immediately as they arrive:

JAVASCRIPT
const EventEmitter = require('events');

class EventQueue extends EventEmitter {
constructor() {
super();
this.jobs = [];
this.processing = new Set();
this.maxConcurrent = 5;
}

add(job) {
this.jobs.push(job);
this.emit('job:added', job);
this.processNext();
}

processNext() {
if (this.processing.size >= this.maxConcurrent) {
return; // At capacity
}

const job = this.jobs.shift();
if (!job) {
return; // No jobs waiting
}

this.processing.add(job.id);
this.emit('job:started', job);

this.executeJob(job)
.then(() => this.emit('job:completed', job))
.catch(err => this.emit('job:failed', job, err))
.finally(() => {
this.processing.delete(job.id);
this.processNext(); // Check for more work
});
}

async executeJob(job) {
// Override in subclass
throw new Error('Not implemented');
}
}

## Smart Retry Logic

Not all failures are equal. Handle them appropriately:

JAVASCRIPT
class RetryableQueue extends EventQueue {
  constructor(options = {}) {
    super();
    this.maxRetries = options.maxRetries || 3;
    this.retryDelay = options.retryDelay || 1000;
    this.retryMultiplier = options.retryMultiplier || 2;
  }

async executeJob(job) {
let lastError;

for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
try {
return await this.processJob(job);
} catch (error) {
lastError = error;

if (!this.isRetryable(error)) {
throw error; // Fail immediately
}

if (attempt < this.maxRetries) {
const delay = this.retryDelay Math.pow(this.retryMultiplier, attempt - 1);
this.emit('job:retry', job, attempt, delay);
await this.sleep(delay);
}
}
}

throw lastError;
}

isRetryable(error) {
// Network errors, timeouts = retry
// Validation errors = don't retry
const retryableCodes = ['ECONNRESET', 'ETIMEDOUT', 'ENOTFOUND'];
return retryableCodes.includes(error.code);
}

sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}

## Per-Worker Concurrency

Different job types need different concurrency limits:

JAVASCRIPT
class WorkerPool {
  constructor() {
    this.workers = new Map();
  }

register(type, handler, options = {}) {
this.workers.set(type, {
handler,
maxConcurrent: options.maxConcurrent || 5,
active: 0,
queue: []
});
}

async dispatch(job) {
const worker = this.workers.get(job.type);

if (!worker) {
throw new Error(No worker for job type: ${job.type});
}

if (worker.active >= worker.maxConcurrent) {
// Queue for later
return new Promise((resolve, reject) => {
worker.queue.push({ job, resolve, reject });
});
}

return this.execute(worker, job);
}

async execute(worker, job) {
worker.active++;

try {
return await worker.handler(job);
} finally {
worker.active--;
this.processQueue(worker);
}
}

processQueue(worker) {
if (worker.queue.length === 0) return;
if (worker.active >= worker.maxConcurrent) return;

const { job, resolve, reject } = worker.queue.shift();

this.execute(worker, job)
.then(resolve)
.catch(reject);
}
}

Usage:

JAVASCRIPT
const pool = new WorkerPool();

// CPU-intensive: limit to 2 concurrent
pool.register('image-processing', processImage, { maxConcurrent: 2 });

// I/O-bound: allow more concurrency
pool.register('email-send', sendEmail, { maxConcurrent: 20 });

// External API: respect rate limits
pool.register('api-call', callExternalApi, { maxConcurrent: 5 });

## Filesystem-Based Persistence

For simple deployments without Redis:

JAVASCRIPT
const fs = require('fs').promises;
const path = require('path');

class PersistentQueue {
constructor(dataDir) {
this.dataDir = dataDir;
this.pendingDir = path.join(dataDir, 'pending');
this.processingDir = path.join(dataDir, 'processing');
this.completedDir = path.join(dataDir, 'completed');
this.failedDir = path.join(dataDir, 'failed');
}

async init() {
await fs.mkdir(this.pendingDir, { recursive: true });
await fs.mkdir(this.processingDir, { recursive: true });
await fs.mkdir(this.completedDir, { recursive: true });
await fs.mkdir(this.failedDir, { recursive: true });

// Recover jobs from processing (crashed workers)
await this.recoverProcessing();
}

async add(job) {
const filename = ${Date.now()}-${job.id}.json;
const filepath = path.join(this.pendingDir, filename);
await fs.writeFile(filepath, JSON.stringify(job));
}

async getNext() {
const files = await fs.readdir(this.pendingDir);
if (files.length === 0) return null;

// Sort by timestamp (oldest first)
files.sort();
const filename = files[0];

// Atomic move to processing
const src = path.join(this.pendingDir, filename);
const dest = path.join(this.processingDir, filename);
await fs.rename(src, dest);

const content = await fs.readFile(dest, 'utf8');
return { ...JSON.parse(content), filename: filename };
}

async complete(job) {
const src = path.join(this.processingDir, job.
filename);
const dest = path.join(this.completedDir, job.filename);
await fs.rename(src, dest);
}

async fail(job, error) {
job.error = error.message;
job.failedAt = Date.now();

const src = path.join(this.processingDir, job.filename);
const dest = path.join(this.failedDir, job._filename);
await fs.writeFile(dest, JSON.stringify(job));
await fs.unlink(src);
}

async recoverProcessing() {
const files = await fs.readdir(this.processingDir);
for (const filename of files) {
const src = path.join(this.processingDir, filename);
const dest = path.join(this.pendingDir, filename);
await fs.rename(src, dest);
}
}
}

## Performance Comparison

ApproachLatency (idle)CPU (idle)Throughput
Polling (5s)2.5s avgConstantLimited
Polling (1s)0.5s avg5x moreLimited
Event-driven<1msNear zeroUnlimited
*Limited by worker capacity, not architecture

## Monitoring Events

Track everything through events:

JAVASCRIPT
queue.on('job:added', (job) => {
  metrics.increment('jobs.added', { type: job.type });
});

queue.on('job:started', (job) => {
metrics.gauge('jobs.processing', queue.processing.size);
});

queue.on('job:completed', (job) => {
metrics.timing('jobs.duration', Date.now() - job.startedAt);
});

queue.on('job:failed', (job, error) => {
metrics.increment('jobs.failed', { type: job.type, error: error.code });
});

queue.on('job:retry', (job, attempt, delay) => {
metrics.increment('jobs.retries', { type: job.type, attempt });
});

## Conclusion

Event-driven job queues eliminate polling waste while providing instant response to new work. Combined with smart retry logic and per-worker concurrency controls, this architecture handles any workload efficiently.

The filesystem persistence option makes deployment simple—no Redis or database required for many use cases. Start simple, scale up as needed.