import type { WebhookEventName } from '@vector/shared'; import { prisma } from '@vector/db'; import * as webhooksSvc from '../services/webhooks.js'; import { logger } from './logger.js'; // Recursion guard: deliveries include this header so receivers know the payload // originated from Vector and can short-circuit echo loops. Worker-side BullMQ // delivery (planned in the Phase 7 follow-up) will honor the same header plus a // max-depth check. export const VECTOR_HOOK_HEADER = 'x-vector-webhook'; const DELIVERY_TIMEOUT_MS = 8_000; const MAX_ATTEMPTS = 3; const BACKOFF_MS = [0, 2_000, 10_000]; interface EmitOptions { event: WebhookEventName; payload: Record; } // Fire-and-forget: collects active subscriptions for the event and schedules delivery // to each. Never throws into caller. This is the interim in-process implementation; // the plan calls for a BullMQ worker — keep the signature stable so swapping stays // a one-line change in `emit`. export async function emit({ event, payload }: EmitOptions): Promise { const subs = await prisma .$transaction((tx) => webhooksSvc.listActiveForEvent(tx, event)) .catch((err) => { logger.warn({ err, event }, 'webhook emit: subscription lookup failed'); return []; }); if (subs.length === 0) return; const body = JSON.stringify({ event, data: payload, emittedAt: new Date().toISOString() }); for (const sub of subs) { if (!sub.secret) continue; void deliver(sub.id, sub.url, sub.secret, body, event).catch((err) => { logger.warn({ err, event, subId: sub.id }, 'webhook delivery crashed'); }); } } async function deliver( subId: string, url: string, secret: string, body: string, event: WebhookEventName, ): Promise { for (let attempt = 0; attempt < MAX_ATTEMPTS; attempt += 1) { const wait = BACKOFF_MS[attempt] ?? 0; if (wait > 0) await new Promise((r) => setTimeout(r, wait)); const timestamp = Math.floor(Date.now() / 1000); const signature = webhooksSvc.signBody(secret, body, timestamp); const controller = new AbortController(); const timeout = setTimeout(() => controller.abort(), DELIVERY_TIMEOUT_MS); try { const res = await fetch(url, { method: 'POST', headers: { 'content-type': 'application/json', [VECTOR_HOOK_HEADER]: 'v1', 'x-vector-event': event, 'x-vector-timestamp': String(timestamp), 'x-vector-signature': signature, }, body, signal: controller.signal, }); clearTimeout(timeout); if (res.ok) { logger.debug({ subId, event, status: res.status, attempt }, 'webhook delivered'); return; } logger.warn( { subId, event, status: res.status, attempt }, 'webhook non-2xx, will retry', ); } catch (err) { clearTimeout(timeout); logger.warn({ err, subId, event, attempt }, 'webhook delivery error'); } } logger.error({ subId, event }, 'webhook delivery exhausted retries'); }