Files
Catalyst/server/jobs.js
josh 8f35724bde
All checks were successful
CI / test (pull_request) Successful in 13s
CI / build-dev (pull_request) Has been skipped
fix: queue on-create jobs sequentially and fix history ordering
runJobsOnCreate now awaits each job before starting the next,
ensuring they don't stomp each other's DB writes in parallel.

getInstanceHistory changed to ORDER BY changed_at ASC, id ASC so
the creation event (lowest id) is always first regardless of
same-second timestamps.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-28 20:09:32 -04:00

151 lines
5.8 KiB
JavaScript

import { getJobs, getJob, getInstances, updateInstance, createJobRun, completeJobRun } from './db.js';
// ── Handlers ──────────────────────────────────────────────────────────────────
// Base URL of the Tailscale v2 REST API.
const TAILSCALE_API = 'https://api.tailscale.com/api/v2';

/**
 * Syncs the `tailscale` / `tailscale_ip` fields of every instance with the
 * device list of a tailnet.
 *
 * A device matches an instance when the device `hostname` equals the instance
 * `name`. The stored IP is the first device address starting with `100.`, or
 * an empty string when the device has none.
 *
 * @param {{api_key?: string, tailnet?: string}} cfg - Job configuration.
 * @returns {Promise<{summary: string}>} How many instances were updated.
 * @throws {Error} When the configuration is incomplete or the API responds
 *   with a non-OK status.
 */
async function tailscaleSyncHandler(cfg) {
  const { api_key: apiKey, tailnet } = cfg;
  if (!apiKey || !tailnet) {
    throw new Error('Tailscale not configured — set API key and tailnet');
  }

  const url = `${TAILSCALE_API}/tailnet/${encodeURIComponent(tailnet)}/devices`;
  const response = await fetch(url, { headers: { Authorization: `Bearer ${apiKey}` } });
  if (!response.ok) {
    throw new Error(`Tailscale API ${response.status}`);
  }

  const { devices } = await response.json();
  const ipByHostname = new Map(
    devices.map((device) => {
      const addresses = device.addresses ?? [];
      const tailnetIp = addresses.find((address) => address.startsWith('100.'));
      return [device.hostname, tailnetIp ?? ''];
    })
  );

  const rows = getInstances();
  let changed = 0;
  for (const row of rows) {
    const deviceIp = ipByHostname.get(row.name);

    // Start from the stored values; only a match or a previously-enabled
    // flag that no longer matches changes them.
    let nextFlag = row.tailscale;
    let nextIp = row.tailscale_ip;
    if (deviceIp !== undefined) {
      nextFlag = 1;
      nextIp = deviceIp;
    } else if (row.tailscale === 1) {
      nextFlag = 0;
      nextIp = '';
    }

    if (nextFlag === row.tailscale && nextIp === row.tailscale_ip) {
      continue;
    }

    // Drop the row's id and timestamps before handing the rest back to the DB layer.
    const { id: _rowId, created_at: _createdAt, updated_at: _updatedAt, ...fields } = row;
    updateInstance(row.vmid, { ...fields, tailscale: nextFlag, tailscale_ip: nextIp });
    changed += 1;
  }

  return { summary: `${changed} updated of ${rows.length}` };
}
// ── Patchmon Sync ─────────────────────────────────────────────────────────────
/**
 * Syncs the `patchmon` flag of every instance with the host list returned by
 * the configured Patchmon endpoint.
 *
 * The response may be a bare array or an object carrying the list under
 * `hosts` or `data`. Each entry is either a host-name string or an object
 * whose name is read from `name`, `hostname` or `host` (in that order).
 *
 * @param {{api_url?: string, api_token?: string}} cfg - Job configuration.
 * @returns {Promise<{summary: string}>} How many instances were updated.
 * @throws {Error} When the configuration is incomplete or the API responds
 *   with a non-OK status.
 */
async function patchmonSyncHandler(cfg) {
  const { api_url: endpoint, api_token: token } = cfg;
  if (!endpoint || !token) {
    throw new Error('Patchmon not configured — set API URL and token');
  }

  const response = await fetch(endpoint, {
    headers: { Authorization: `Basic ${token}` },
  });
  if (!response.ok) {
    throw new Error(`Patchmon API ${response.status}`);
  }

  const payload = await response.json();
  let entries;
  if (Array.isArray(payload)) {
    entries = payload;
  } else {
    entries = payload.hosts ?? payload.data ?? [];
  }

  const nameOf = (entry) => {
    if (typeof entry === 'string') {
      return entry;
    }
    return entry.name ?? entry.hostname ?? entry.host ?? '';
  };
  // Entries without a usable name are dropped before building the lookup set.
  const knownHosts = new Set(entries.map(nameOf).filter(Boolean));

  const rows = getInstances();
  let changed = 0;
  for (const row of rows) {
    const flag = knownHosts.has(row.name) ? 1 : 0;
    if (flag === row.patchmon) {
      continue;
    }
    // Drop the row's id and timestamps before handing the rest back to the DB layer.
    const { id: _rowId, created_at: _createdAt, updated_at: _updatedAt, ...fields } = row;
    updateInstance(row.vmid, { ...fields, patchmon: flag });
    changed += 1;
  }

  return { summary: `${changed} updated of ${rows.length}` };
}
// ── Semaphore Sync ────────────────────────────────────────────────────────────
/**
 * Collects the host names listed in an Ansible INI inventory string.
 *
 * Blank lines, `#` / `;` comment lines and `[section]` headers are skipped.
 * Lines inside `[group:vars]` sections (variable assignments) and
 * `[group:children]` sections (group names) are not hosts and are skipped
 * as well. For every remaining line the host is the text before the first
 * whitespace or `=`.
 *
 * @param {string} inventory - Raw inventory text.
 * @returns {Set<string>} Host names found in host sections.
 */
function parseInventoryHosts(inventory) {
  const nonHostHeader = /^\[[^\]]*:(?:vars|children)\s*\]/;
  const hosts = new Set();
  // Lines that appear before the first [section] header are ungrouped hosts.
  let inHostSection = true;

  for (const rawLine of inventory.split('\n')) {
    const line = rawLine.trim();
    if (!line || line.startsWith('#') || line.startsWith(';')) {
      continue;
    }
    if (line.startsWith('[')) {
      inHostSection = !nonHostHeader.test(line);
      continue;
    }
    if (!inHostSection) {
      continue;
    }
    const host = line.split(/[\s=]/)[0];
    if (host) {
      hosts.add(host);
    }
  }
  return hosts;
}

/**
 * Syncs the `semaphore` flag of every instance with the inventory returned by
 * the configured Semaphore endpoint.
 *
 * The response JSON is expected to carry an Ansible INI inventory string in
 * its `inventory` property; an instance is flagged when its `name` appears as
 * a host in that inventory.
 *
 * @param {{api_url?: string, api_token?: string}} cfg - Job configuration.
 * @returns {Promise<{summary: string}>} How many instances were updated.
 * @throws {Error} When the configuration is incomplete or the API responds
 *   with a non-OK status.
 */
async function semaphoreSyncHandler(cfg) {
  const { api_url, api_token } = cfg;
  if (!api_url || !api_token) throw new Error('Semaphore not configured — set API URL and token');

  const res = await fetch(api_url, {
    headers: { Authorization: `Bearer ${api_token}` },
  });
  if (!res.ok) throw new Error(`Semaphore API ${res.status}`);

  const data = await res.json();
  const hostSet = parseInventoryHosts(data.inventory ?? '');

  const instances = getInstances();
  let updated = 0;
  for (const inst of instances) {
    const newSemaphore = hostSet.has(inst.name) ? 1 : 0;
    if (newSemaphore !== inst.semaphore) {
      // Drop the row's id and timestamps before handing the rest back to the DB layer.
      const { id: _id, created_at: _ca, updated_at: _ua, ...instData } = inst;
      updateInstance(inst.vmid, { ...instData, semaphore: newSemaphore });
      updated++;
    }
  }
  return { summary: `${updated} updated of ${instances.length}` };
}
// ── Registry ──────────────────────────────────────────────────────────────────
// Maps a job's `key` to the async handler that implements it; runJob looks
// handlers up here by `job.key`.
const HANDLERS = {
tailscale_sync: tailscaleSyncHandler,
patchmon_sync: patchmonSyncHandler,
semaphore_sync: semaphoreSyncHandler,
};
// ── Public API ────────────────────────────────────────────────────────────────
/**
 * Runs one job by id and records the outcome as a job run.
 *
 * The job's `key` selects the handler from HANDLERS and its `config` (a JSON
 * string; empty means `{}`) is parsed and passed to that handler. A run is
 * created before the handler starts and completed with `'success'` plus the
 * handler's `summary`, or with `'error'` plus the failure message.
 *
 * @param {*} jobId - Identifier handed to getJob and createJobRun.
 * @returns {Promise<*>} Whatever the handler resolved with.
 * @throws {Error} `Job not found` when getJob returns nothing, or
 *   `No handler for '<key>'` when the key is not registered.
 * @throws {SyntaxError} When the job's config is not valid JSON.
 * @throws {*} Any handler failure, rethrown after being recorded.
 */
export async function runJob(jobId) {
  const job = getJob(jobId);
  if (!job) throw new Error('Job not found');

  // Own-property lookup only: a key such as 'constructor' or 'toString' must
  // not resolve to a function inherited from Object.prototype and be "run".
  const handler = Object.prototype.hasOwnProperty.call(HANDLERS, job.key)
    ? HANDLERS[job.key]
    : undefined;
  if (!handler) throw new Error(`No handler for '${job.key}'`);

  const cfg = JSON.parse(job.config || '{}');
  const runId = createJobRun(jobId);
  try {
    const result = await handler(cfg);
    // A handler that resolves with nothing still succeeded; record an empty summary.
    completeJobRun(runId, 'success', result?.summary ?? '');
    return result;
  } catch (e) {
    // Non-Error throws have no `.message`; fall back to their string form so
    // recording the failure cannot itself throw and mask the original error.
    completeJobRun(runId, 'error', e instanceof Error ? e.message : String(e));
    throw e;
  }
}
// Active scheduler timers keyed by job id; restartJobs clears and repopulates this map.
const _intervals = new Map();
/**
 * Runs every job whose config has a truthy `run_on_create`, one after the
 * other, awaiting each before starting the next.
 *
 * Failures are isolated per job: a job whose config is not valid JSON, or
 * whose run rejects, is logged with console.error and the loop continues
 * with the next job.
 *
 * @returns {Promise<void>} Resolves once every job has been attempted.
 */
export async function runJobsOnCreate() {
  for (const job of getJobs()) {
    try {
      // Parse inside the try so one corrupt config cannot abort the whole
      // queue; `?.` covers a config that parses to null.
      const cfg = JSON.parse(job.config || '{}');
      if (cfg?.run_on_create) {
        await runJob(job.id);
      }
    } catch (e) {
      console.error(`runJobsOnCreate job ${job.id}:`, e);
    }
  }
}
/**
 * Rebuilds the job scheduler: clears every timer held in `_intervals`, then
 * starts one repeating timer per enabled job that calls runJob for it.
 *
 * `job.schedule` is the period in minutes. Missing, zero or non-numeric
 * values fall back to 15, values below 1 are raised to 1, and the resulting
 * delay is capped at the largest value a timer accepts.
 *
 * Failures of scheduled runs are logged with console.error and never
 * propagate out of the timer callback.
 */
export function restartJobs() {
  // Largest delay a Node timer accepts; anything larger (or NaN) is coerced
  // to 1 ms, which would make the job fire continuously.
  const MAX_TIMER_MS = 2_147_483_647;

  for (const iv of _intervals.values()) clearInterval(iv);
  _intervals.clear();

  for (const job of getJobs()) {
    if (!job.enabled) continue;
    // Number(...) || 15 maps NaN/0/null/'' to the default instead of letting
    // NaN reach setInterval.
    const minutes = Math.max(1, Number(job.schedule) || 15);
    const ms = Math.min(minutes * 60_000, MAX_TIMER_MS);
    const id = job.id;
    const timer = setInterval(() => {
      runJob(id).catch((e) => {
        console.error(`restartJobs job ${id}:`, e);
      });
    }, ms);
    _intervals.set(id, timer);
  }
}