📚 BullMQ basics
BullMQ é a fila padrão do ecossistema Node — sucessora do Bull, baseada em Redis. Três classes formam a base: Queue (produz jobs), Worker (consome) e QueueEvents (escuta lifecycle de qualquer processo). Toda comunicação acontece via Redis — nenhum processo conhece o outro diretamente.
Para agendamento de posts, a fila resolve dois problemas que setTimeout não resolve: durabilidade (sobreviver a restart do processo) e distribuição (vários workers consumindo em paralelo).
// src/queue/connection.ts
import { Queue, Worker, QueueEvents } from 'bullmq';
import IORedis from 'ioredis';
// Conexão dedicada — BullMQ exige maxRetriesPerRequest: null
export const connection = new IORedis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null,
enableReadyCheck: false,
});
// Queue: produtor. Adicione jobs com queue.add()
export const publishQueue = new Queue('publish', { connection });
// QueueEvents: escuta lifecycle (completed, failed, delayed...)
export const publishEvents = new QueueEvents('publish', { connection });
publishEvents.on('completed', ({ jobId }) => {
console.log(`[queue] job ${jobId} completed`);
});
publishEvents.on('failed', ({ jobId, failedReason }) => {
console.error(`[queue] job ${jobId} failed: ${failedReason}`);
});
💡 Dica prática
Nunca compartilhe a mesma IORedis entre Queue e código de aplicação. BullMQ exige maxRetriesPerRequest: null para suportar comandos blocking (BRPOPLPUSH) — se sua API usar a mesma instância para cache, vai quebrar.
Conceitos-chave
Produtor. Adiciona jobs no Redis.
Consumidor. Processa jobs em paralelo.
Pub/sub de lifecycle, opcional mas útil.
IORedis com config específica.
⏱️ Job com delay (agendado)
Para agendar um post para "daqui a 2 horas", calcule o delta em milissegundos e passe em { delay }. BullMQ guarda o job num sorted set do Redis ordenado pelo timestamp de processamento — quando o relógio bate, o job vai para a fila ativa.
Sempre use jobId determinístico (ex: post:${post.id}). Isso evita agendar o mesmo post duas vezes se o usuário clicar "salvar" rápido — BullMQ rejeita duplicatas com mesmo ID.
// src/scheduler/schedulePost.ts
import { publishQueue } from '../queue/connection';
export async function schedulePost(post: {
id: string;
providerId: string;
scheduledFor: Date;
payload: Record<string, unknown>;
}) {
const delay = post.scheduledFor.getTime() - Date.now();
if (delay < 0) {
throw new Error('scheduledFor está no passado');
}
const job = await publishQueue.add(
'publish',
{
postId: post.id,
providerId: post.providerId,
payload: post.payload,
},
{
jobId: `post:${post.id}`, // idempotente
delay, // ms até processar
attempts: 5, // ver tópico 4
backoff: { type: 'exponential', delay: 60_000 },
removeOnComplete: { age: 86_400 }, // mantém 24h após sucesso
removeOnFail: { age: 7 * 86_400 }, // 7 dias para investigar
}
);
return { jobId: job.id, runAt: post.scheduledFor };
}
// Cancelar agendamento (usuário editou ou deletou)
export async function cancelPost(postId: string) {
const job = await publishQueue.getJob(`post:${postId}`);
if (job) await job.remove();
}
⚙️ Anatomia das opções de job
- delay: ms até o job ficar elegível. Aceita até 24 dias (limite do Redis sorted set).
- jobId: ID custom — chave da idempotência. Sem ele, BullMQ gera um numérico sequencial.
- attempts: número total de tentativas (incluindo a primeira).
- removeOnComplete / removeOnFail: TTL no Redis. Sem isso a fila incha indefinidamente.
Conceitos-chave
Milissegundos até o job ser elegível.
Chave única — base da idempotência.
Estrutura Redis que ordena jobs por timestamp.
Mesmo jobId não duplica execução.
👷 Worker que publica
O Worker é um processo separado da API. Ele faz BRPOPLPUSH no Redis, pega o próximo job e chama o handler. Concurrency controla quantos jobs cada instância processa em paralelo — comece com 5 e ajuste medindo.
O handler resolve o Provider certo via factory (módulo 5.3), chama publish() e retorna o resultado. Qualquer throw dentro do handler dispara o retry — não engula erros.
// src/workers/publish.worker.ts
import { Worker } from 'bullmq';
import { connection } from '../queue/connection';
import { getProvider } from '../providers/registry';
import { prisma } from '../db/client';
type PublishJob = {
postId: string;
providerId: string; // 'twitter' | 'linkedin' | ...
payload: Record<string, unknown>;
};
export const publishWorker = new Worker<PublishJob>(
'publish',
async (job) => {
const { postId, providerId, payload } = job.data;
// 1. Recuperar credenciais do BD (não passar token via payload!)
const account = await prisma.account.findUniqueOrThrow({
where: { id: postId },
include: { credentials: true },
});
// 2. Resolver provider e publicar
const provider = getProvider(providerId);
const result = await provider.publish(payload, account.credentials);
// 3. Persistir resultado para auditoria
await prisma.post.update({
where: { id: postId },
data: {
status: 'published',
externalId: result.externalId,
publishedAt: new Date(),
},
});
return result; // vira job.returnvalue
},
{
connection,
concurrency: 5, // 5 jobs em paralelo por instância
limiter: { // rate limit global da fila
max: 60,
duration: 60_000, // 60 jobs/min
},
}
);
publishWorker.on('failed', (job, err) => {
console.error(`[worker] ${job?.id} failed:`, err.message);
});
publishWorker.on('completed', (job) => {
console.log(`[worker] ${job.id} ok in ${job.processedOn! - job.timestamp}ms`);
});
⚡ Dica prática
Rode o worker num processo Node separado da API (node dist/workers/publish.worker.js). Worker bloqueando o event loop da API web é o jeito mais rápido de derrubar o serviço — e impede escalar horizontalmente.
Conceitos-chave
Função async chamada por job.
Jobs paralelos por instância.
Rate limit global da fila.
Valor retornado fica armazenado no job.
🔄 Retry com backoff
APIs de redes sociais caem, dão 429 e 5xx transiente. Retry imediato piora a situação — backoff exponencial espalha as tentativas: 1min, 2min, 4min, 8min, 16min. Cinco tentativas cobrem ~30min de instabilidade.
Distinga erros recuperáveis (timeout, 429, 503) dos permanentes (401 token revogado, 400 payload inválido). Para os permanentes, lance UnrecoverableError — BullMQ pula os retries restantes e manda direto pra failed.
// src/workers/publish.worker.ts (handler refinado)
import { UnrecoverableError } from 'bullmq';
async (job) => {
try {
const provider = getProvider(job.data.providerId);
return await provider.publish(job.data.payload, credentials);
} catch (err: any) {
// 401/403: token quebrado — não adianta tentar de novo
if (err.status === 401 || err.status === 403) {
await prisma.post.update({
where: { id: job.data.postId },
data: { status: 'auth_failed' },
});
throw new UnrecoverableError(`auth: ${err.message}`);
}
// 400: payload errado — usuário precisa corrigir
if (err.status === 400) {
throw new UnrecoverableError(`invalid: ${err.message}`);
}
// 429: respeitar Retry-After do provider
if (err.status === 429 && err.retryAfter) {
await job.moveToDelayed(Date.now() + err.retryAfter * 1000, job.token!);
throw new Error('rate limited, delayed');
}
// Timeout, 5xx: retry com backoff
throw err;
}
}
// Job options já definidas no produtor:
// attempts: 5
// backoff: { type: 'exponential', delay: 60_000 }
// Resultado: 1min → 2min → 4min → 8min → 16min
✓ O que FAZER
- ✓Backoff exponencial com base ≥ 30s (evita martelar API).
- ✓
UnrecoverableErrorem 4xx que não é 429. - ✓Respeitar
Retry-Afterdo header. - ✓Logar tentativa atual (
job.attemptsMade).
✗ O que NÃO fazer
- ✗Retry fixo de 1 segundo — DDOS no provider.
- ✗
attempts: Infinity— job zumbi para sempre. - ✗Engolir erro com
catch {}vazio. - ✗Retry em 401 — só queima rate limit.
Conceitos-chave
Total de tentativas incluindo a primeira.
Espera crescente entre retries.
Erro que pula retries restantes.
Header HTTP que diz quando voltar.
💀 Dead letter queue
Quando um job esgota todas as attempts, ele vai para o estado failed — esse é o equivalente nativo de DLQ no BullMQ. Falha final = humano precisa olhar. Sem alerta, o post simplesmente não publica e o usuário descobre na semana seguinte.
Use QueueEvents num processo de monitoramento, ou hook on('failed') com job.attemptsMade === job.opts.attempts para disparar alerta apenas no esgotamento (não em cada tentativa).
// src/workers/dlq-monitor.ts
import { QueueEvents } from 'bullmq';
import { connection, publishQueue } from '../queue/connection';
import { sendAlert } from '../alerts/send';
const events = new QueueEvents('publish', { connection });
events.on('failed', async ({ jobId, failedReason, prev }) => {
const job = await publishQueue.getJob(jobId);
if (!job) return;
const final = job.attemptsMade >= (job.opts.attempts ?? 1);
if (!final) return; // ainda vai tentar de novo
// 1. Marcar post como falho no BD
await prisma.post.update({
where: { id: job.data.postId },
data: { status: 'failed', errorMessage: failedReason },
});
// 2. Alertar humano (Slack, e-mail, etc)
await sendAlert({
severity: 'high',
title: `Post ${job.data.postId} falhou em definitivo`,
body: `Provider: ${job.data.providerId}\nErro: ${failedReason}\nTentativas: ${job.attemptsMade}`,
action: `Reprocessar manualmente após investigar`,
});
});
// Reprocessar jobs failed (operação manual via admin)
export async function retryFailedJob(jobId: string) {
const job = await publishQueue.getJob(jobId);
if (!job) throw new Error('job não existe');
await job.retry();
}
// Listar todos os failed para dashboard
export async function listFailedJobs(limit = 50) {
return publishQueue.getJobs(['failed'], 0, limit - 1);
}
🚨 Por que alerta importa
- Job silencioso = post perdido: sem alerta, ninguém sabe que falhou.
- Token revogado em massa: 200 posts falhando em 5 minutos é incidente, não bug.
- Padrões revelam bugs: mesmo provider falhando sempre = problema no adapter.
- Reprocesso manual: admin precisa de UI para listar failed e fazer
job.retry().
Conceitos-chave
Estado failed = dead letter no BullMQ.
Notificação humana só na falha final.
Move failed → waiting de novo.
Mantenha failed jobs por dias para investigar.
🚀 Migrar para Temporal
BullMQ resolve o caso "agendar e republicar com retry". Mas quando o fluxo cresce — approval em múltiplas etapas, timers de dias, cancelamento condicional, compensação (postou no Twitter mas falhou no LinkedIn, o que fazer?) — você está reimplementando workflows em cima de jobs. É hora de olhar para Temporal.
Temporal oferece workflows duráveis: o código do workflow é replayed do log de eventos a cada falha, então variáveis locais, await sleep('7 days') e signals "simplesmente funcionam" mesmo com restart do worker no meio.
// Exemplo Temporal — workflow de publicação multi-provider
// src/workflows/publishPost.workflow.ts
import { proxyActivities, sleep, defineSignal, setHandler } from '@temporalio/workflow';
import type * as activities from '../activities';
const { publishToProvider, markPublished, markFailed } = proxyActivities<typeof activities>({
startToCloseTimeout: '2 minutes',
retry: { initialInterval: '1m', backoffCoefficient: 2, maximumAttempts: 5 },
});
export const cancelSignal = defineSignal('cancel');
export async function publishPostWorkflow(input: {
postId: string;
providers: string[];
scheduledFor: string;
}) {
let cancelled = false;
setHandler(cancelSignal, () => { cancelled = true; });
// Espera durável — sobrevive a restarts de worker
const delay = new Date(input.scheduledFor).getTime() - Date.now();
await sleep(delay);
if (cancelled) return { status: 'cancelled' };
// Publicar em paralelo, capturar falhas individuais
const results = await Promise.allSettled(
input.providers.map((p) => publishToProvider(input.postId, p))
);
const ok = results.filter((r) => r.status === 'fulfilled');
const fail = results.filter((r) => r.status === 'rejected');
if (fail.length === 0) {
await markPublished(input.postId);
return { status: 'published', count: ok.length };
}
await markFailed(input.postId, fail.map((f: any) => f.reason.message));
return { status: 'partial', ok: ok.length, fail: fail.length };
}
⚖️ BullMQ vs Temporal
Continue com BullMQ se…
- • Jobs são single-step (publicar e acabou).
- • Delays até dias, não meses.
- • Time já conhece Redis.
- • Sem necessidade de orquestração entre múltiplas atividades.
Migre para Temporal se…
- • Workflows com múltiplas etapas e signals.
- • Timers longos (7 dias, 30 dias) precisam ser duráveis.
- • Compensação/saga entre serviços externos.
- • Visibilidade temporal de eventos é requisito.
🧭 Dica prática
Não migre por hype. Temporal adiciona um cluster inteiro (history, matching, frontend, worker) e uma curva de aprendizado real. Comece com BullMQ, e migre quando você se pegar implementando "estado de workflow numa tabela do Postgres" — esse é o sinal claro de que cruzou a fronteira.
Conceitos-chave
Função durável replayed do event log.
Side effect (chamada externa, BD).
Mensagem assíncrona para workflow rodando.
Estado preservado a restarts e crashes.
🎯 Resumo do Módulo
maxRetriesPerRequest: null.queue.add('publish', data, { delay, jobId }) com TTL e idempotência.getProvider(), concurrency e rate limit.attempts: 5, exponential delay e UnrecoverableError para 4xx fatais.job.retry() manual.Próximo Módulo:
5.5 — Multi-tenancy, billing e operação em produção