Files
Vector/apps/api/src/lib/webhook-emitter.ts
T
josh 7c0d422228
CI / Lint · Typecheck · Test · Build (push) Failing after 5m41s
CI / Playwright (smoke) (push) Has been skipped
chore: initial Vector 2.0 monorepo
Ground-up TypeScript rewrite of the Vector hardware parts inventory
system. Ships the full roadmap (Phases 0-8) in one initial commit:

- pnpm + Turbo monorepo: apps/{api,web,e2e}, packages/{db,shared,ui,config}
- Express 5 + Prisma 5 + zod validation + JWT w/ refresh-token rotation
- React 19 + Vite + shadcn/ui + TanStack Query/Table + nuqs URL state
- Repair/RMA, tags, bulk ops, saved views, CSV audit export
- Analytics dashboard on Recharts + EOL tracking
- Signed webhook subscriptions (HMAC-SHA256) with in-process emitter
- Vitest unit tests (shared schemas, api services/helpers) + Playwright skeleton
- Gitea Actions CI (lint, typecheck, test+coverage, build) + Renovate

Deferred follow-ups: Postgres cutover (data-migration script ready),
BullMQ worker for webhook delivery, @react-pdf PDF export, CSV import wizard.
2026-04-16 20:52:32 -04:00

85 lines
3.0 KiB
TypeScript

import type { WebhookEventName } from '@vector/shared';
import { prisma } from '@vector/db';
import * as webhooksSvc from '../services/webhooks.js';
import { logger } from './logger.js';
/**
 * Recursion guard. Every outbound delivery carries this header so receivers
 * know the payload originated from Vector and can short-circuit echo loops.
 * The worker-side BullMQ delivery planned in the Phase 7 follow-up will honor
 * the same header plus a max-depth check.
 */
export const VECTOR_HOOK_HEADER = 'x-vector-webhook';

/** Milliseconds an in-flight delivery request may run before it is aborted. */
const DELIVERY_TIMEOUT_MS = 8_000;

/**
 * Delay in milliseconds applied before each delivery attempt, indexed by the
 * zero-based attempt number. The first attempt is sent immediately.
 */
const BACKOFF_MS: readonly number[] = [0, 2_000, 10_000];

/** Upper bound on delivery attempts made for one subscription per event. */
const MAX_ATTEMPTS = 3;
/** Arguments accepted by `emit`. */
interface EmitOptions {
  /**
   * Name of the event being emitted. Used to look up the active subscriptions
   * and sent to receivers in the `x-vector-event` header.
   */
  event: WebhookEventName;

  /** Event data; serialized under the `data` key of the delivery body. */
  payload: Record<string, unknown>;
}
/**
 * Fire-and-forget webhook fan-out: collects the active subscriptions for
 * `event` and schedules a delivery to each one without awaiting the result.
 *
 * Never throws into the caller: lookup, serialization, and delivery failures
 * are logged and swallowed. This is the interim in-process implementation;
 * the plan calls for a BullMQ worker — keep the signature stable so swapping
 * stays a one-line change in `emit`.
 *
 * @param options.event   Event name used to select subscriptions.
 * @param options.payload Event data, sent under the `data` key of the body.
 * @returns Resolves once deliveries have been scheduled (not completed).
 */
export async function emit({ event, payload }: EmitOptions): Promise<void> {
  // A failed lookup degrades to "no subscribers" instead of rejecting.
  const subs = await prisma
    .$transaction((tx) => webhooksSvc.listActiveForEvent(tx, event))
    .catch((err) => {
      logger.warn({ err, event }, 'webhook emit: subscription lookup failed');
      return [];
    });
  if (subs.length === 0) return;

  // JSON.stringify throws on circular references and BigInt values. Guard it
  // so a bad payload cannot break the "never throws into caller" contract.
  let body: string;
  try {
    body = JSON.stringify({ event, data: payload, emittedAt: new Date().toISOString() });
  } catch (err) {
    logger.warn({ err, event }, 'webhook emit: payload serialization failed');
    return;
  }

  for (const sub of subs) {
    // Deliveries are signed, so a subscription without a secret is skipped.
    if (!sub.secret) continue;
    // Intentionally not awaited; the catch covers anything `deliver` lets escape.
    void deliver(sub.id, sub.url, sub.secret, body, event).catch((err) => {
      logger.warn({ err, event, subId: sub.id }, 'webhook delivery crashed');
    });
  }
}
/**
 * POSTs one pre-serialized webhook body to a single subscriber, retrying on
 * non-2xx responses and request errors.
 *
 * Up to `MAX_ATTEMPTS` attempts are made, each preceded by the matching
 * `BACKOFF_MS` delay and bounded by `DELIVERY_TIMEOUT_MS`. Request failures
 * are logged rather than thrown; exhausting all attempts logs an error.
 *
 * @param subId  Subscription id, used for log context only.
 * @param url    Receiver endpoint.
 * @param secret Subscription secret passed to `webhooksSvc.signBody`.
 * @param body   JSON string sent verbatim as the request body.
 * @param event  Event name, sent in the `x-vector-event` header.
 */
async function deliver(
  subId: string,
  url: string,
  secret: string,
  body: string,
  event: WebhookEventName,
): Promise<void> {
  for (let attempt = 0; attempt < MAX_ATTEMPTS; attempt += 1) {
    const wait = BACKOFF_MS[attempt] ?? 0;
    if (wait > 0) await new Promise((r) => setTimeout(r, wait));

    // Signed per attempt so the timestamp (in seconds) reflects the send time.
    const timestamp = Math.floor(Date.now() / 1000);
    const signature = webhooksSvc.signBody(secret, body, timestamp);

    // Abort the request if the receiver does not respond in time.
    const controller = new AbortController();
    const timeout = setTimeout(() => controller.abort(), DELIVERY_TIMEOUT_MS);
    try {
      const res = await fetch(url, {
        method: 'POST',
        headers: {
          'content-type': 'application/json',
          [VECTOR_HOOK_HEADER]: 'v1',
          'x-vector-event': event,
          'x-vector-timestamp': String(timestamp),
          'x-vector-signature': signature,
        },
        body,
        signal: controller.signal,
      });

      // Only the status matters. Cancel the unread body so the underlying
      // connection is released promptly instead of waiting for GC.
      void res.body?.cancel().catch(() => undefined);

      if (res.ok) {
        logger.debug({ subId, event, status: res.status, attempt }, 'webhook delivered');
        return;
      }
      logger.warn(
        { subId, event, status: res.status, attempt },
        'webhook non-2xx, will retry',
      );
    } catch (err) {
      // Network failure or timeout abort: log and fall through to the next attempt.
      logger.warn({ err, subId, event, attempt }, 'webhook delivery error');
    } finally {
      // Single cleanup point for the timer on success, non-2xx, and error paths.
      clearTimeout(timeout);
    }
  }
  logger.error({ subId, event }, 'webhook delivery exhausted retries');
}