Files
Catalyst/server/db.js
josh 8f35724bde
All checks were successful
CI / test (pull_request) Successful in 13s
CI / build-dev (pull_request) Has been skipped
fix: queue on-create jobs sequentially and fix history ordering
runJobsOnCreate now awaits each job before starting the next,
ensuring they don't stomp each other's DB writes in parallel.

getInstanceHistory changed to ORDER BY changed_at ASC, id ASC so
the creation event (lowest id) is always first regardless of
same-second timestamps.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-28 20:09:32 -04:00

348 lines
15 KiB
JavaScript

import { DatabaseSync } from 'node:sqlite';
import { mkdirSync } from 'fs';
import { dirname, join } from 'path';
import { fileURLToPath } from 'url';
const __dirname = dirname(fileURLToPath(import.meta.url));
const DEFAULT_PATH = join(__dirname, '../data/catalyst.db');
let db;
function init(path) {
if (path !== ':memory:') {
mkdirSync(dirname(path), { recursive: true });
}
db = new DatabaseSync(path);
db.exec('PRAGMA journal_mode = WAL');
db.exec('PRAGMA foreign_keys = ON');
db.exec('PRAGMA synchronous = NORMAL');
createSchema();
if (path !== ':memory:') { seed(); seedJobs(); }
}
function createSchema() {
db.exec(`
CREATE TABLE IF NOT EXISTS instances (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL CHECK(length(name) BETWEEN 1 AND 100),
state TEXT NOT NULL DEFAULT 'deployed'
CHECK(state IN ('deployed','testing','degraded')),
stack TEXT NOT NULL DEFAULT 'development'
CHECK(stack IN ('production','development')),
vmid INTEGER NOT NULL UNIQUE CHECK(vmid > 0),
atlas INTEGER NOT NULL DEFAULT 0 CHECK(atlas IN (0,1)),
argus INTEGER NOT NULL DEFAULT 0 CHECK(argus IN (0,1)),
semaphore INTEGER NOT NULL DEFAULT 0 CHECK(semaphore IN (0,1)),
patchmon INTEGER NOT NULL DEFAULT 0 CHECK(patchmon IN (0,1)),
tailscale INTEGER NOT NULL DEFAULT 0 CHECK(tailscale IN (0,1)),
andromeda INTEGER NOT NULL DEFAULT 0 CHECK(andromeda IN (0,1)),
tailscale_ip TEXT NOT NULL DEFAULT '',
hardware_acceleration INTEGER NOT NULL DEFAULT 0 CHECK(hardware_acceleration IN (0,1)),
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_instances_state ON instances(state);
CREATE INDEX IF NOT EXISTS idx_instances_stack ON instances(stack);
CREATE TABLE IF NOT EXISTS instance_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
vmid INTEGER NOT NULL,
field TEXT NOT NULL,
old_value TEXT,
new_value TEXT,
changed_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_history_vmid ON instance_history(vmid);
CREATE TABLE IF NOT EXISTS config (
key TEXT PRIMARY KEY,
value TEXT NOT NULL DEFAULT ''
);
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
key TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
description TEXT NOT NULL DEFAULT '',
enabled INTEGER NOT NULL DEFAULT 0 CHECK(enabled IN (0,1)),
schedule INTEGER NOT NULL DEFAULT 15,
config TEXT NOT NULL DEFAULT '{}'
);
CREATE TABLE IF NOT EXISTS job_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id INTEGER NOT NULL,
started_at TEXT NOT NULL DEFAULT (datetime('now')),
ended_at TEXT,
status TEXT NOT NULL DEFAULT 'running' CHECK(status IN ('running','success','error')),
result TEXT NOT NULL DEFAULT ''
);
CREATE INDEX IF NOT EXISTS idx_job_runs_job_id ON job_runs(job_id);
`);
}
const SEED = [
{ name: 'plex', state: 'deployed', stack: 'production', vmid: 117, atlas: 1, argus: 1, semaphore: 0, patchmon: 1, tailscale: 1, andromeda: 0, tailscale_ip: '100.64.0.1', hardware_acceleration: 1 },
{ name: 'foldergram', state: 'testing', stack: 'development', vmid: 137, atlas: 0, argus: 0, semaphore: 0, patchmon: 0, tailscale: 0, andromeda: 0, tailscale_ip: '', hardware_acceleration: 0 },
{ name: 'homeassistant', state: 'deployed', stack: 'production', vmid: 102, atlas: 1, argus: 1, semaphore: 1, patchmon: 1, tailscale: 1, andromeda: 0, tailscale_ip: '100.64.0.5', hardware_acceleration: 0 },
{ name: 'gitea', state: 'deployed', stack: 'production', vmid: 110, atlas: 1, argus: 0, semaphore: 1, patchmon: 1, tailscale: 1, andromeda: 0, tailscale_ip: '100.64.0.8', hardware_acceleration: 0 },
{ name: 'postgres-primary', state: 'degraded', stack: 'production', vmid: 201, atlas: 1, argus: 1, semaphore: 0, patchmon: 1, tailscale: 0, andromeda: 1, tailscale_ip: '', hardware_acceleration: 0 },
{ name: 'nextcloud', state: 'testing', stack: 'development', vmid: 144, atlas: 0, argus: 0, semaphore: 0, patchmon: 0, tailscale: 1, andromeda: 0, tailscale_ip: '100.64.0.12', hardware_acceleration: 0 },
{ name: 'traefik', state: 'deployed', stack: 'production', vmid: 100, atlas: 1, argus: 1, semaphore: 0, patchmon: 1, tailscale: 1, andromeda: 0, tailscale_ip: '100.64.0.2', hardware_acceleration: 0 },
{ name: 'monitoring-stack', state: 'testing', stack: 'development', vmid: 155, atlas: 0, argus: 0, semaphore: 1, patchmon: 0, tailscale: 0, andromeda: 0, tailscale_ip: '', hardware_acceleration: 0 },
];
function seed() {
const count = db.prepare('SELECT COUNT(*) as n FROM instances').get().n;
if (count > 0) return;
const insert = db.prepare(`
INSERT INTO instances
(name, state, stack, vmid, atlas, argus, semaphore, patchmon,
tailscale, andromeda, tailscale_ip, hardware_acceleration)
VALUES
(@name, @state, @stack, @vmid, @atlas, @argus, @semaphore, @patchmon,
@tailscale, @andromeda, @tailscale_ip, @hardware_acceleration)
`);
db.exec('BEGIN');
for (const s of SEED) insert.run(s);
db.exec('COMMIT');
}
function seedJobs() {
const upsert = db.prepare(`
INSERT OR IGNORE INTO jobs (key, name, description, enabled, schedule, config)
VALUES (?, ?, ?, ?, ?, ?)
`);
const apiKey = getConfig('tailscale_api_key');
const tailnet = getConfig('tailscale_tailnet');
const tsSchedule = parseInt(getConfig('tailscale_poll_minutes', '15'), 10) || 15;
const tsEnabled = getConfig('tailscale_enabled') === '1' ? 1 : 0;
upsert.run('tailscale_sync', 'Tailscale Sync',
'Syncs Tailscale device status and IPs to instances by matching hostnames.',
tsEnabled, tsSchedule, JSON.stringify({ api_key: apiKey, tailnet }));
upsert.run('patchmon_sync', 'Patchmon Sync',
'Syncs Patchmon host registration status to instances by matching hostnames.',
0, 60, JSON.stringify({ api_url: 'http://patchmon:3000/api/v1/api/hosts', api_token: '' }));
upsert.run('semaphore_sync', 'Semaphore Sync',
'Syncs Semaphore inventory membership to instances by matching hostnames.',
0, 60, JSON.stringify({ api_url: 'http://semaphore:3000/api/project/1/inventory/1', api_token: '' }));
}
// ── Queries ───────────────────────────────────────────────────────────────────
export function getInstances(filters = {}) {
const parts = ['SELECT * FROM instances WHERE 1=1'];
const params = {};
if (filters.search) {
parts.push('AND (name LIKE @search OR CAST(vmid AS TEXT) LIKE @search OR stack LIKE @search)');
params.search = `%${filters.search}%`;
}
if (filters.state) { parts.push('AND state = @state'); params.state = filters.state; }
if (filters.stack) { parts.push('AND stack = @stack'); params.stack = filters.stack; }
parts.push('ORDER BY name ASC');
return db.prepare(parts.join(' ')).all(params);
}
export function getInstance(vmid) {
return db.prepare('SELECT * FROM instances WHERE vmid = ?').get(vmid) ?? null;
}
export function getDistinctStacks() {
return db.prepare(`SELECT DISTINCT stack FROM instances WHERE stack != '' ORDER BY stack`)
.all().map(r => r.stack);
}
// ── Mutations ─────────────────────────────────────────────────────────────────
const HISTORY_FIELDS = [
'name', 'state', 'stack', 'vmid', 'tailscale_ip',
'atlas', 'argus', 'semaphore', 'patchmon', 'tailscale', 'andromeda',
'hardware_acceleration',
];
export function createInstance(data) {
db.prepare(`
INSERT INTO instances
(name, state, stack, vmid, atlas, argus, semaphore, patchmon,
tailscale, andromeda, tailscale_ip, hardware_acceleration)
VALUES
(@name, @state, @stack, @vmid, @atlas, @argus, @semaphore, @patchmon,
@tailscale, @andromeda, @tailscale_ip, @hardware_acceleration)
`).run(data);
db.prepare(
`INSERT INTO instance_history (vmid, field, old_value, new_value) VALUES (?, 'created', NULL, NULL)`
).run(data.vmid);
}
export function updateInstance(vmid, data) {
const old = getInstance(vmid);
db.prepare(`
UPDATE instances SET
name=@name, state=@state, stack=@stack, vmid=@newVmid,
atlas=@atlas, argus=@argus, semaphore=@semaphore, patchmon=@patchmon,
tailscale=@tailscale, andromeda=@andromeda, tailscale_ip=@tailscale_ip,
hardware_acceleration=@hardware_acceleration, updated_at=datetime('now')
WHERE vmid=@vmid
`).run({ ...data, newVmid: data.vmid, vmid });
const newVmid = data.vmid;
const insertEvt = db.prepare(
`INSERT INTO instance_history (vmid, field, old_value, new_value) VALUES (?, ?, ?, ?)`
);
for (const field of HISTORY_FIELDS) {
const oldVal = String(old[field] ?? '');
const newVal = String(field === 'vmid' ? newVmid : (data[field] ?? ''));
if (oldVal !== newVal) insertEvt.run(newVmid, field, oldVal, newVal);
}
}
export function deleteInstance(vmid) {
db.prepare('DELETE FROM instance_history WHERE vmid = ?').run(vmid);
db.prepare('DELETE FROM instances WHERE vmid = ?').run(vmid);
}
export function importInstances(rows, historyRows = []) {
db.exec('BEGIN');
db.exec('DELETE FROM instance_history');
db.exec('DELETE FROM instances');
const insert = db.prepare(`
INSERT INTO instances
(name, state, stack, vmid, atlas, argus, semaphore, patchmon,
tailscale, andromeda, tailscale_ip, hardware_acceleration)
VALUES
(@name, @state, @stack, @vmid, @atlas, @argus, @semaphore, @patchmon,
@tailscale, @andromeda, @tailscale_ip, @hardware_acceleration)
`);
for (const row of rows) insert.run(row);
if (historyRows.length) {
const insertHist = db.prepare(
`INSERT INTO instance_history (vmid, field, old_value, new_value, changed_at) VALUES (?, ?, ?, ?, ?)`
);
for (const h of historyRows) insertHist.run(h.vmid, h.field, h.old_value ?? null, h.new_value ?? null, h.changed_at);
}
db.exec('COMMIT');
}
export function getInstanceHistory(vmid) {
return db.prepare(
'SELECT * FROM instance_history WHERE vmid = ? ORDER BY changed_at ASC, id ASC'
).all(vmid);
}
export function getAllHistory() {
return db.prepare('SELECT * FROM instance_history ORDER BY vmid, changed_at').all();
}
export function getAllJobs() {
return db.prepare('SELECT id, key, name, description, enabled, schedule, config FROM jobs ORDER BY id').all();
}
export function getAllJobRuns() {
return db.prepare('SELECT * FROM job_runs ORDER BY job_id, id').all();
}
export function importJobs(jobRows, jobRunRows = []) {
db.exec('BEGIN');
db.exec('DELETE FROM job_runs');
db.exec('DELETE FROM jobs');
const insertJob = db.prepare(`
INSERT INTO jobs (id, key, name, description, enabled, schedule, config)
VALUES (@id, @key, @name, @description, @enabled, @schedule, @config)
`);
for (const j of jobRows) insertJob.run(j);
if (jobRunRows.length) {
const insertRun = db.prepare(`
INSERT INTO job_runs (id, job_id, started_at, ended_at, status, result)
VALUES (@id, @job_id, @started_at, @ended_at, @status, @result)
`);
for (const r of jobRunRows) insertRun.run(r);
}
db.exec('COMMIT');
}
export function getConfig(key, defaultVal = '') {
const row = db.prepare('SELECT value FROM config WHERE key = ?').get(key);
return row ? row.value : defaultVal;
}
export function setConfig(key, value) {
db.prepare(
`INSERT INTO config (key, value) VALUES (?, ?)
ON CONFLICT(key) DO UPDATE SET value = excluded.value`
).run(key, String(value));
}
// ── Jobs ──────────────────────────────────────────────────────────────────────
const JOB_WITH_LAST_RUN = `
SELECT j.*,
r.id AS last_run_id,
r.started_at AS last_run_at,
r.status AS last_status,
r.result AS last_result
FROM jobs j
LEFT JOIN job_runs r
ON r.id = (SELECT id FROM job_runs WHERE job_id = j.id ORDER BY id DESC LIMIT 1)
`;
export function getJobs() {
return db.prepare(JOB_WITH_LAST_RUN + ' ORDER BY j.id').all();
}
export function getJob(id) {
return db.prepare(JOB_WITH_LAST_RUN + ' WHERE j.id = ?').get(id) ?? null;
}
export function createJob(data) {
db.prepare(`
INSERT INTO jobs (key, name, description, enabled, schedule, config)
VALUES (@key, @name, @description, @enabled, @schedule, @config)
`).run(data);
}
export function updateJob(id, { enabled, schedule, config }) {
db.prepare(`
UPDATE jobs SET enabled=@enabled, schedule=@schedule, config=@config WHERE id=@id
`).run({ id, enabled, schedule, config });
}
export function createJobRun(jobId) {
return Number(db.prepare('INSERT INTO job_runs (job_id) VALUES (?)').run(jobId).lastInsertRowid);
}
export function completeJobRun(runId, status, result) {
db.prepare(`
UPDATE job_runs SET ended_at=datetime('now'), status=@status, result=@result WHERE id=@id
`).run({ id: runId, status, result });
}
export function getJobRuns(jobId) {
return db.prepare('SELECT * FROM job_runs WHERE job_id = ? ORDER BY id DESC').all(jobId);
}
// ── Test helpers ──────────────────────────────────────────────────────────────
export function _resetForTest() {
if (db) db.close();
init(':memory:');
}
// ── Boot ──────────────────────────────────────────────────────────────────────
// Skipped in test environment — parallel Vitest workers would race to open
// the same file, causing "database is locked". _resetForTest() in beforeEach
// handles initialisation for every test worker using :memory: instead.
if (process.env.NODE_ENV !== 'test') {
const DB_PATH = process.env.DB_PATH ?? DEFAULT_PATH;
try {
init(DB_PATH);
} catch (e) {
console.error('[catalyst] fatal: could not open database at', DB_PATH);
console.error('[catalyst] ensure the data directory exists and is writable by the server process.');
console.error(e);
process.exit(1);
}
}