Queue

@veloxts/queue provides background job processing with type-safe job definitions, retries, delays, and priorities.

Terminal window
pnpm add @veloxts/queue
# For BullMQ (production)
pnpm add bullmq ioredis

import { queuePlugin } from '@veloxts/queue';

// Sync driver - jobs run immediately in-process
app.register(queuePlugin({
  driver: 'sync',
}));

import { defineJob } from '@veloxts/queue';
import { z } from 'zod';

export const SendWelcomeEmail = defineJob({
  name: 'send-welcome-email',
  schema: z.object({
    userId: z.string(),
    email: z.string().email(),
  }),
  handler: async ({ data, ctx, progress }) => {
    await progress(50);
    await ctx.mail.send(WelcomeEmail, { to: data.email, data: { ... } });
    await progress(100);
  },
  options: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
  },
});

// Dispatch immediately
await ctx.queue.dispatch(SendWelcomeEmail, {
  userId: '123',
  email: 'user@example.com',
});

// With delay
await ctx.queue.dispatch(SendWelcomeEmail, data, {
  delay: '10m',
});

// With priority (lower = higher priority)
await ctx.queue.dispatch(SendWelcomeEmail, data, {
  priority: 1,
});

// Batch dispatch
await ctx.queue.dispatchBatch(SendWelcomeEmail, [
  { userId: '1', email: 'a@example.com' },
  { userId: '2', email: 'b@example.com' },
]);
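
To make the `delay` and `priority` options concrete, here is a minimal sketch of how a queue might interpret them. `parseDelay` and `byPriority` are hypothetical helpers written for illustration; they are not part of @veloxts/queue, and the set of accepted duration units is an assumption.

```typescript
// Hypothetical helpers for illustration; not the @veloxts/queue implementation.
const UNIT_MS: Record<string, number> = {
  ms: 1,
  s: 1_000,
  m: 60_000,
  h: 3_600_000,
  d: 86_400_000,
};

// Convert a duration such as '10m' or '500ms' to milliseconds.
// Plain numbers are treated as milliseconds already.
function parseDelay(delay: string | number): number {
  if (typeof delay === 'number') return delay;
  const match = /^(\d+)(ms|s|m|h|d)$/.exec(delay.trim());
  if (!match) throw new Error(`Unsupported delay: ${delay}`);
  return Number(match[1]) * UNIT_MS[match[2]];
}

interface PendingJob {
  name: string;
  priority: number; // lower number = picked up earlier
  seq: number;      // insertion order, used as a tie-breaker
}

// Return a copy ordered by ascending priority, FIFO within the same priority.
function byPriority(jobs: PendingJob[]): PendingJob[] {
  return [...jobs].sort((a, b) => a.priority - b.priority || a.seq - b.seq);
}
```

Under these assumptions `parseDelay('10m')` is 600000 ms, and a job dispatched with `priority: 1` is picked up before one dispatched earlier with `priority: 5`.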

defineJob({
  name: 'my-job',
  schema: MySchema,
  handler: async ({ data }) => { ... },
  options: {
    attempts: 3,              // Retry count
    backoff: {
      type: 'exponential',    // or 'fixed'
      delay: 1000,            // Base delay (ms)
    },
    priority: 1,              // Lower = higher priority
    timeout: 30000,           // Job timeout (ms)
    removeOnComplete: true,   // Clean up completed
    removeOnFail: false,      // Keep failed for inspection
  },
});
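
The interaction between `attempts` and `backoff` is easier to see as numbers. The sketch below computes the wait before each retry under two assumptions that the text above does not confirm: that `attempts` counts the first run as well (as BullMQ's option of the same name does), and that `'exponential'` doubles the base delay on every retry. `backoffDelay` and `retrySchedule` are hypothetical names.

```typescript
// Illustrative only; the real delays are computed by the queue driver.
type Backoff = { type: 'fixed' | 'exponential'; delay: number };

// Delay in ms before retry number `retry` (1 = first retry).
// Assumption: 'exponential' doubles the base delay on each retry.
function backoffDelay(backoff: Backoff, retry: number): number {
  return backoff.type === 'fixed'
    ? backoff.delay
    : backoff.delay * 2 ** (retry - 1);
}

// Assumption: `attempts` is the total number of tries,
// so there are at most attempts - 1 retries.
function retrySchedule(attempts: number, backoff: Backoff): number[] {
  const delays: number[] = [];
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(backoffDelay(backoff, retry));
  }
  return delays;
}
```

With `attempts: 3` and `{ type: 'exponential', delay: 1000 }`, `retrySchedule` yields waits of 1000 ms and 2000 ms; with `type: 'fixed'` both waits are 1000 ms.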

Sync

Immediate execution for development and testing.

BullMQ

Redis-backed queues for production with retries and monitoring.

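| Feature              | Sync | BullMQ |
| -------------------- | ---- | ------ |
| Persistent jobs      | No   | Yes    |
| Retries with backoff | No   | Yes    |
| Delayed jobs         | No   | Yes    |
| Priority queues      | No   | Yes    |
| Job progress         | No   | Yes    |
| Multiple workers     | No   | Yes    |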

src/index.ts
app.register(queuePlugin({
  driver: 'bullmq',
  config: {
    url: process.env.REDIS_URL,
    prefix: 'myapp:queue:',
    defaultConcurrency: 5,
  },
}));

.env
REDIS_URL=redis://user:password@your-redis-host:6379
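
Since `process.env.REDIS_URL` may be unset, one option is to derive the plugin options from the environment in a single place. The following is a sketch, not part of the library: `queueOptionsFromEnv` is a hypothetical helper, the option shape simply mirrors the examples above, and failing fast in production is a design choice rather than documented behavior.

```typescript
// Hypothetical helper; the option names mirror the plugin examples in this page.
type QueueOptions =
  | { driver: 'sync' }
  | {
      driver: 'bullmq';
      config: { url: string; prefix: string; defaultConcurrency: number };
    };

function queueOptionsFromEnv(
  env: Record<string, string | undefined>,
): QueueOptions {
  const url = env.REDIS_URL;
  if (url) {
    return {
      driver: 'bullmq',
      config: { url, prefix: 'myapp:queue:', defaultConcurrency: 5 },
    };
  }
  // Assumption: silently falling back to in-process jobs in production
  // is undesirable, so fail fast instead.
  if (env.NODE_ENV === 'production') {
    throw new Error('REDIS_URL must be set in production');
  }
  // Development and tests: run jobs immediately in-process.
  return { driver: 'sync' };
}
```

The result could then be passed along as `queuePlugin(queueOptionsFromEnv(process.env))`, assuming the plugin accepts the option shapes shown earlier.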

In production, run workers as separate processes:

worker.ts
import { createWorker } from '@veloxts/queue';
import { SendWelcomeEmail, ProcessOrder } from './jobs';

const worker = await createWorker({
  connection: { url: process.env.REDIS_URL },
  jobs: [SendWelcomeEmail, ProcessOrder],
  concurrency: 5,
});

// Graceful shutdown
process.on('SIGTERM', async () => {
  await worker.close();
  process.exit(0);
});

  1. Redis for persistence - Jobs survive process restarts
  2. Separate worker processes - Job execution doesn't block the web server
  3. Graceful shutdown - Let in-flight jobs finish before the process exits
  4. Monitor failed jobs - Alert on failures
  5. Tune concurrency - Match it to your workload
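
As a starting point for the concurrency item in the checklist above, Little's law gives a rough estimate: the number of jobs in flight is approximately the arrival rate multiplied by the average time per job. The sketch below is a back-of-the-envelope aid, not something the library provides; `estimateConcurrency` and its `headroom` factor are hypothetical.

```typescript
// Rough sizing aid based on Little's law (in-flight ≈ rate × duration).
// `headroom` is a hypothetical safety factor for bursts; 1 means none.
function estimateConcurrency(
  jobsPerSecond: number,
  avgJobSeconds: number,
  headroom = 1.5,
): number {
  if (jobsPerSecond < 0 || avgJobSeconds < 0 || headroom < 1) {
    throw new RangeError('rates and durations must be >= 0, headroom >= 1');
  }
  // Always keep at least one slot so the worker can make progress.
  return Math.max(1, Math.ceil(jobsPerSecond * avgJobSeconds * headroom));
}
```

For example, 10 jobs per second at 0.5 s each keeps about 5 jobs in flight, which matches the `concurrency: 5` used above; with the default 1.5 headroom the estimate rises to 8. Treat the result as a first guess and adjust it against measured queue latency.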

| Provider    | Best For                    |
| ----------- | --------------------------- |
| Upstash     | Serverless, pay-per-request |
| Redis Cloud | Managed Redis clusters      |
| Railway     | Simple Redis add-on         |