⚡ AutomationsAI|Portal de Cursos →

Verificando acesso...

MÓDULO 5.4

⏰ Fila e agendamento

BullMQ na prática: Queue, Worker, QueueEvents, jobs com delay, retry com backoff exponencial, dead letter queue e quando migrar para Temporal.

6
Tópicos
40
Minutos
Avançado
Nível
Prático
Tipo
1

📚 BullMQ basics

BullMQ é a fila padrão do ecossistema Node — sucessora do Bull, baseada em Redis. Três classes formam a base: Queue (produz jobs), Worker (consome) e QueueEvents (escuta lifecycle de qualquer processo). Toda comunicação acontece via Redis — nenhum processo conhece o outro diretamente.

Para agendamento de posts, a fila resolve dois problemas que setTimeout não resolve: durabilidade (sobreviver a restart do processo) e distribuição (vários workers consumindo em paralelo).

// src/queue/connection.ts
import { Queue, Worker, QueueEvents } from 'bullmq';
import IORedis from 'ioredis';

// Conexão dedicada — BullMQ exige maxRetriesPerRequest: null
export const connection = new IORedis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null,
  enableReadyCheck: false,
});

// Queue: produtor. Adicione jobs com queue.add()
export const publishQueue = new Queue('publish', { connection });

// QueueEvents: escuta lifecycle (completed, failed, delayed...)
export const publishEvents = new QueueEvents('publish', { connection });

publishEvents.on('completed', ({ jobId }) => {
  console.log(`[queue] job ${jobId} completed`);
});

publishEvents.on('failed', ({ jobId, failedReason }) => {
  console.error(`[queue] job ${jobId} failed: ${failedReason}`);
});

💡 Dica prática

Nunca compartilhe a mesma IORedis entre Queue e código de aplicação. BullMQ exige maxRetriesPerRequest: null para suportar comandos blocking (BRPOPLPUSH) — se sua API usar a mesma instância para cache, vai quebrar.

Conceitos-chave

Queue

Produtor. Adiciona jobs no Redis.

Worker

Consumidor. Processa jobs em paralelo.

QueueEvents

Pub/sub de lifecycle, opcional mas útil.

Connection

IORedis com config específica.

2

⏱️ Job com delay (agendado)

Para agendar um post para "daqui a 2 horas", calcule o delta em milissegundos e passe em { delay }. BullMQ guarda o job num sorted set do Redis ordenado pelo timestamp de processamento — quando o relógio bate, o job vai para a fila ativa.

Sempre use jobId determinístico (ex: post:${post.id}). Isso evita agendar o mesmo post duas vezes se o usuário clicar "salvar" rápido — BullMQ rejeita duplicatas com mesmo ID.

// src/scheduler/schedulePost.ts
import { publishQueue } from '../queue/connection';

export async function schedulePost(post: {
  id: string;
  providerId: string;
  scheduledFor: Date;
  payload: Record<string, unknown>;
}) {
  const delay = post.scheduledFor.getTime() - Date.now();

  if (delay < 0) {
    throw new Error('scheduledFor está no passado');
  }

  const job = await publishQueue.add(
    'publish',
    {
      postId: post.id,
      providerId: post.providerId,
      payload: post.payload,
    },
    {
      jobId: `post:${post.id}`,        // idempotente
      delay,                            // ms até processar
      attempts: 5,                      // ver tópico 4
      backoff: { type: 'exponential', delay: 60_000 },
      removeOnComplete: { age: 86_400 }, // mantém 24h após sucesso
      removeOnFail: { age: 7 * 86_400 }, // 7 dias para investigar
    }
  );

  return { jobId: job.id, runAt: post.scheduledFor };
}

// Cancelar agendamento (usuário editou ou deletou)
export async function cancelPost(postId: string) {
  const job = await publishQueue.getJob(`post:${postId}`);
  if (job) await job.remove();
}

⚙️ Anatomia das opções de job

  • delay: ms até o job ficar elegível. Aceita até 24 dias (limite do Redis sorted set).
  • jobId: ID custom — chave da idempotência. Sem ele, BullMQ gera um numérico sequencial.
  • attempts: número total de tentativas (incluindo a primeira).
  • removeOnComplete / removeOnFail: TTL no Redis. Sem isso a fila incha indefinidamente.

Conceitos-chave

delay

Milissegundos até o job ser elegível.

jobId

Chave única — base da idempotência.

Sorted set

Estrutura Redis que ordena jobs por timestamp.

Idempotência

Mesmo jobId não duplica execução.

3

👷 Worker que publica

O Worker é um processo separado da API. Ele faz BRPOPLPUSH no Redis, pega o próximo job e chama o handler. Concurrency controla quantos jobs cada instância processa em paralelo — comece com 5 e ajuste medindo.

O handler resolve o Provider certo via factory (módulo 5.3), chama publish() e retorna o resultado. Qualquer throw dentro do handler dispara o retry — não engula erros.

// src/workers/publish.worker.ts
import { Worker } from 'bullmq';
import { connection } from '../queue/connection';
import { getProvider } from '../providers/registry';
import { prisma } from '../db/client';

type PublishJob = {
  postId: string;
  providerId: string;        // 'twitter' | 'linkedin' | ...
  payload: Record<string, unknown>;
};

export const publishWorker = new Worker<PublishJob>(
  'publish',
  async (job) => {
    const { postId, providerId, payload } = job.data;

    // 1. Recuperar credenciais do BD (não passar token via payload!)
    const account = await prisma.account.findUniqueOrThrow({
      where: { id: postId },
      include: { credentials: true },
    });

    // 2. Resolver provider e publicar
    const provider = getProvider(providerId);
    const result = await provider.publish(payload, account.credentials);

    // 3. Persistir resultado para auditoria
    await prisma.post.update({
      where: { id: postId },
      data: {
        status: 'published',
        externalId: result.externalId,
        publishedAt: new Date(),
      },
    });

    return result; // vira job.returnvalue
  },
  {
    connection,
    concurrency: 5,            // 5 jobs em paralelo por instância
    limiter: {                  // rate limit global da fila
      max: 60,
      duration: 60_000,         // 60 jobs/min
    },
  }
);

publishWorker.on('failed', (job, err) => {
  console.error(`[worker] ${job?.id} failed:`, err.message);
});

publishWorker.on('completed', (job) => {
  console.log(`[worker] ${job.id} ok in ${job.processedOn! - job.timestamp}ms`);
});

Dica prática

Rode o worker num processo Node separado da API (node dist/workers/publish.worker.js). Worker bloqueando o event loop da API web é o jeito mais rápido de derrubar o serviço — e impede escalar horizontalmente.

Conceitos-chave

Handler

Função async chamada por job.

Concurrency

Jobs paralelos por instância.

Limiter

Rate limit global da fila.

returnvalue

Valor retornado fica armazenado no job.

4

🔄 Retry com backoff

APIs de redes sociais caem, dão 429 e 5xx transiente. Retry imediato piora a situação — backoff exponencial espalha as tentativas: 1min, 2min, 4min, 8min, 16min. Cinco tentativas cobrem ~30min de instabilidade.

Distinga erros recuperáveis (timeout, 429, 503) dos permanentes (401 token revogado, 400 payload inválido). Para os permanentes, lance UnrecoverableError — BullMQ pula os retries restantes e manda direto pra failed.

// src/workers/publish.worker.ts (handler refinado)
import { UnrecoverableError } from 'bullmq';

async (job) => {
  try {
    const provider = getProvider(job.data.providerId);
    return await provider.publish(job.data.payload, credentials);
  } catch (err: any) {
    // 401/403: token quebrado — não adianta tentar de novo
    if (err.status === 401 || err.status === 403) {
      await prisma.post.update({
        where: { id: job.data.postId },
        data: { status: 'auth_failed' },
      });
      throw new UnrecoverableError(`auth: ${err.message}`);
    }

    // 400: payload errado — usuário precisa corrigir
    if (err.status === 400) {
      throw new UnrecoverableError(`invalid: ${err.message}`);
    }

    // 429: respeitar Retry-After do provider
    if (err.status === 429 && err.retryAfter) {
      await job.moveToDelayed(Date.now() + err.retryAfter * 1000, job.token!);
      throw new Error('rate limited, delayed');
    }

    // Timeout, 5xx: retry com backoff
    throw err;
  }
}

// Job options já definidas no produtor:
// attempts: 5
// backoff: { type: 'exponential', delay: 60_000 }
// Resultado: 1min → 2min → 4min → 8min → 16min

✓ O que FAZER

  • Backoff exponencial com base ≥ 30s (evita martelar API).
  • UnrecoverableError em 4xx que não é 429.
  • Respeitar Retry-After do header.
  • Logar tentativa atual (job.attemptsMade).

✗ O que NÃO fazer

  • Retry fixo de 1 segundo — DDOS no provider.
  • attempts: Infinity — job zumbi para sempre.
  • Engolir erro com catch {} vazio.
  • Retry em 401 — só queima rate limit.

Conceitos-chave

attempts

Total de tentativas incluindo a primeira.

Backoff

Espera crescente entre retries.

Unrecoverable

Erro que pula retries restantes.

Retry-After

Header HTTP que diz quando voltar.

5

💀 Dead letter queue

Quando um job esgota todas as attempts, ele vai para o estado failed — esse é o equivalente nativo de DLQ no BullMQ. Falha final = humano precisa olhar. Sem alerta, o post simplesmente não publica e o usuário descobre na semana seguinte.

Use QueueEvents num processo de monitoramento, ou hook on('failed') com job.attemptsMade === job.opts.attempts para disparar alerta apenas no esgotamento (não em cada tentativa).

// src/workers/dlq-monitor.ts
import { QueueEvents } from 'bullmq';
import { connection, publishQueue } from '../queue/connection';
import { sendAlert } from '../alerts/send';

const events = new QueueEvents('publish', { connection });

events.on('failed', async ({ jobId, failedReason, prev }) => {
  const job = await publishQueue.getJob(jobId);
  if (!job) return;

  const final = job.attemptsMade >= (job.opts.attempts ?? 1);
  if (!final) return; // ainda vai tentar de novo

  // 1. Marcar post como falho no BD
  await prisma.post.update({
    where: { id: job.data.postId },
    data: { status: 'failed', errorMessage: failedReason },
  });

  // 2. Alertar humano (Slack, e-mail, etc)
  await sendAlert({
    severity: 'high',
    title: `Post ${job.data.postId} falhou em definitivo`,
    body: `Provider: ${job.data.providerId}\nErro: ${failedReason}\nTentativas: ${job.attemptsMade}`,
    action: `Reprocessar manualmente após investigar`,
  });
});

// Reprocessar jobs failed (operação manual via admin)
export async function retryFailedJob(jobId: string) {
  const job = await publishQueue.getJob(jobId);
  if (!job) throw new Error('job não existe');
  await job.retry();
}

// Listar todos os failed para dashboard
export async function listFailedJobs(limit = 50) {
  return publishQueue.getJobs(['failed'], 0, limit - 1);
}

🚨 Por que alerta importa

  • Job silencioso = post perdido: sem alerta, ninguém sabe que falhou.
  • Token revogado em massa: 200 posts falhando em 5 minutos é incidente, não bug.
  • Padrões revelam bugs: mesmo provider falhando sempre = problema no adapter.
  • Reprocesso manual: admin precisa de UI para listar failed e fazer job.retry().

Conceitos-chave

DLQ

Estado failed = dead letter no BullMQ.

Alerta

Notificação humana só na falha final.

job.retry()

Move failed → waiting de novo.

Auditoria

Mantenha failed jobs por dias para investigar.

6

🚀 Migrar para Temporal

BullMQ resolve o caso "agendar e republicar com retry". Mas quando o fluxo cresce — approval em múltiplas etapas, timers de dias, cancelamento condicional, compensação (postou no Twitter mas falhou no LinkedIn, o que fazer?) — você está reimplementando workflows em cima de jobs. É hora de olhar para Temporal.

Temporal oferece workflows duráveis: o código do workflow é replayed do log de eventos a cada falha, então variáveis locais, await sleep('7 days') e signals "simplesmente funcionam" mesmo com restart do worker no meio.

// Exemplo Temporal — workflow de publicação multi-provider
// src/workflows/publishPost.workflow.ts
import { proxyActivities, sleep, defineSignal, setHandler } from '@temporalio/workflow';
import type * as activities from '../activities';

const { publishToProvider, markPublished, markFailed } = proxyActivities<typeof activities>({
  startToCloseTimeout: '2 minutes',
  retry: { initialInterval: '1m', backoffCoefficient: 2, maximumAttempts: 5 },
});

export const cancelSignal = defineSignal('cancel');

export async function publishPostWorkflow(input: {
  postId: string;
  providers: string[];
  scheduledFor: string;
}) {
  let cancelled = false;
  setHandler(cancelSignal, () => { cancelled = true; });

  // Espera durável — sobrevive a restarts de worker
  const delay = new Date(input.scheduledFor).getTime() - Date.now();
  await sleep(delay);

  if (cancelled) return { status: 'cancelled' };

  // Publicar em paralelo, capturar falhas individuais
  const results = await Promise.allSettled(
    input.providers.map((p) => publishToProvider(input.postId, p))
  );

  const ok = results.filter((r) => r.status === 'fulfilled');
  const fail = results.filter((r) => r.status === 'rejected');

  if (fail.length === 0) {
    await markPublished(input.postId);
    return { status: 'published', count: ok.length };
  }

  await markFailed(input.postId, fail.map((f: any) => f.reason.message));
  return { status: 'partial', ok: ok.length, fail: fail.length };
}

⚖️ BullMQ vs Temporal

Continue com BullMQ se…

  • • Jobs são single-step (publicar e acabou).
  • • Delays até dias, não meses.
  • • Time já conhece Redis.
  • • Sem necessidade de orquestração entre múltiplas atividades.

Migre para Temporal se…

  • • Workflows com múltiplas etapas e signals.
  • • Timers longos (7 dias, 30 dias) precisam ser duráveis.
  • • Compensação/saga entre serviços externos.
  • • Visibilidade temporal de eventos é requisito.

🧭 Dica prática

Não migre por hype. Temporal adiciona um cluster inteiro (history, matching, frontend, worker) e uma curva de aprendizado real. Comece com BullMQ, e migre quando você se pegar implementando "estado de workflow numa tabela do Postgres" — esse é o sinal claro de que cruzou a fronteira.

Conceitos-chave

Workflow

Função durável replayed do event log.

Activity

Side effect (chamada externa, BD).

Signal

Mensagem assíncrona para workflow rodando.

Durabilidade

Estado preservado a restarts e crashes.

🎯 Resumo do Módulo

BullMQ basics — Queue, Worker e QueueEvents conectados a Redis com maxRetriesPerRequest: null.
Job com delayqueue.add('publish', data, { delay, jobId }) com TTL e idempotência.
Worker que publica — handler async, getProvider(), concurrency e rate limit.
Retry com backoffattempts: 5, exponential delay e UnrecoverableError para 4xx fatais.
Dead letter queue — failed = humano, alerta na falha final e job.retry() manual.
Quando migrar para Temporal — workflows duráveis, timers longos, signals e compensação.

Próximo Módulo:

5.5 — Multi-tenancy, billing e operação em produção