Files
Vetting/internal/api/agent_handlers.go
T
josh 23c689aa5b
CI / Lint + build + test (push) Failing after 1m57s
Release / release (push) Has been cancelled
deep profile + threshold gating + firmware stage + Burn super-stage
Ships all five phases of the deep-profile overhaul together. Runs now
carry a profile (quick/deep/soak); every profile walks the same
11-stage order — Inventory → Firmware → SpecValidate → SMART →
CPUStress → Storage → Network → Burn → GPU → PSU → Reporting —
with only per-stage durations and concurrency scaled.

Phase 1: profiles.ProfileRegistry loaded from vetting.yaml; runs.profile
column + CreateWithProfile; threshold table + evaluator seeded per-run
from the shared vetting.thresholds block; breach flips result at
/sensor + /result.

Phase 2: upgraded CPUStress (stress-ng --cpu-method=all --verify +
EDAC/MCE poll), Storage (fio --verify=md5 + SMART start/end delta),
Network (sustained iperf + /proc/net/dev deltas) with per-profile
knobs from Deps.

Phase 3: Burn super-stage with goroutine fan-out for CPU + memory +
fio + iperf, PSU rails sampled across the Burn window, SensorMux
(2 s flush, 500-sample cap) to absorb backpressure.

Phase 4: Firmware stage + firmware_snapshots table; probes dmidecode
(BIOS), ipmitool (BMC), ethtool -i (NIC), nvme (sysfs + id-ctrl),
lspci (HBA), /proc/cpuinfo (microcode). spec.DiffFirmware folds into
SpecValidate with pin-by-identifier and fan-out-across-component
matching; mismatches park the run in FailedHolding.

Phase 5: profile radio on the host start form, profile chip on the
run header, Firmware section in the HTML report, coverage artifact
uploaded from CI, agent/tests/fakes/ scaffold with Deps.LookPath
seam + stress_ng and dmidecode example fakes.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-18 22:50:57 -04:00

1312 lines
44 KiB
Go

package api
import (
"context"
"crypto/sha256"
"crypto/subtle"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"log"
"net"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/go-chi/chi/v5"
"vetting/internal/config"
"vetting/internal/events"
"vetting/internal/hold"
"vetting/internal/logs"
"vetting/internal/model"
"vetting/internal/notify"
"vetting/internal/orchestrator"
"vetting/internal/pxe"
"vetting/internal/report"
"vetting/internal/spec"
"vetting/internal/store"
)
// Agent collects the collaborators used by agent-facing HTTP routes:
// the iPXE chainload endpoint and the /api/v1/runs/:id/* endpoints.
type Agent struct {
Hosts *store.Hosts
Runs *store.Runs
Stages *store.Stages
SubSteps *store.SubSteps
Artifacts *store.Artifacts
SpecDiffs *store.SpecDiffs
Measurements *store.Measurements
Thresholds *store.Thresholds // Phase 1: seeded per run; consulted on each /sensor batch
Firmware *store.Firmware // Phase 4: firmware snapshots (unused before then)
Profiles *config.ProfileRegistry // Phase 2: /claim resolves the run's profile → stage knobs
Runner *orchestrator.Runner
EventHub *events.Hub
Logs *logs.Hub
Notify *notify.Registry
ArtifactsDir string // ./var/artifacts
OrchestratorURL string // baked into iPXE cmdline
PublicURL string // user-visible URL base for notification click-throughs
LiveKernelURL string
LiveInitrdURL string
TLSCertFPR string // optional; empty = skip pinning
IperfPort int // orchestrator-supervised iperf3 port; 0 = 5201
}
// IPXEScript serves a per-MAC iPXE script. Called by iPXE itself after
// dnsmasq hands it the chainload URL. Unknown MAC → halt script.
// Known MAC with no active run → poweroff script. Known MAC with active
// run → real boot script; the fetch triggers PXEObserved.
func (a *Agent) IPXEScript(w http.ResponseWriter, r *http.Request) {
mac := strings.ToLower(strings.TrimSpace(chi.URLParam(r, "mac")))
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("Cache-Control", "no-store")
if !macRe.MatchString(mac) {
log.Printf("ipxe: rejected malformed mac %q from %s", mac, r.RemoteAddr)
_, _ = w.Write([]byte(pxe.NotRegisteredScript(mac)))
return
}
run, err := a.Runs.FindActiveByMAC(r.Context(), mac)
if err != nil {
log.Printf("ipxe: find run by mac %s: %v", mac, err)
http.Error(w, "internal error", http.StatusInternalServerError)
return
}
if run == nil {
_, _ = w.Write([]byte(pxe.NoActiveRunScript(mac)))
return
}
// The token hash in the DB is the sha256 of the plaintext. The
// plaintext itself cannot be recovered from the hash — we issued it
// once when the run was created. For iPXE we re-issue a fresh token
// on every PXE fetch: this is safe because the hash in the DB is
// rewritten to match and only the most recent PXE can be claimed.
plain, hash, err := orchestrator.IssueRunToken()
if err != nil {
http.Error(w, "token", http.StatusInternalServerError)
return
}
if err := a.Runs.RotateTokenHash(r.Context(), run.ID, hash); err != nil {
log.Printf("ipxe: rotate token run %d: %v", run.ID, err)
http.Error(w, "token", http.StatusInternalServerError)
return
}
script := pxe.BuildScript(pxe.IPXEParams{
OrchestratorURL: a.OrchestratorURL,
LiveKernelURL: a.LiveKernelURL,
LiveInitrdURL: a.LiveInitrdURL,
TLSCertFPR: a.TLSCertFPR,
RunID: run.ID,
MAC: mac,
Token: plain,
})
_, _ = w.Write([]byte(script))
// iPXE has now fetched the script — treat this as PXEObserved. If we
// were already in Booting the transition table allows staying.
if _, err := a.Runner.Transition(r.Context(), run.ID, orchestrator.TriggerPXEObserved); err != nil {
// Non-fatal: the agent may still claim via /claim.
log.Printf("ipxe: PXEObserved for run %d: %v", run.ID, err)
}
}
// Hello is the first call an agent makes once userspace is up. It's
// idempotent and only writes a log line; the authoritative transition
// comes from /claim. The agent sends Hello early so operators see a
// signal in the tile even before the token is validated.
func (a *Agent) Hello(w http.ResponseWriter, r *http.Request) {
runID, ok := runIDFromURL(w, r)
if !ok {
return
}
if _, ok := a.authenticate(w, r, runID); !ok {
return
}
log.Printf("agent hello: run=%d remote=%s", runID, r.RemoteAddr)
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "run_id": runID})
}
// Claim is the binding call: the agent proves it holds the plaintext
// token for this run, and in return the orchestrator transitions to
// InventoryCheck and seeds the stage rows. All destructive actions the
// agent takes later require a prior successful claim.
func (a *Agent) Claim(w http.ResponseWriter, r *http.Request) {
runID, ok := runIDFromURL(w, r)
if !ok {
return
}
run, ok := a.authenticate(w, r, runID)
if !ok {
return
}
var body struct {
AgentIP string `json:"agent_ip"`
}
if r.Body != nil {
// agent_ip is informational; if missing fall back to RemoteAddr.
_ = json.NewDecoder(r.Body).Decode(&body)
}
agentIP := strings.TrimSpace(body.AgentIP)
if agentIP == "" {
if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil {
agentIP = host
} else {
agentIP = r.RemoteAddr
}
}
// First claim seeds the stage rows; subsequent claims are a no-op
// so agent retries after transient network failures stay safe.
if len(mustListStages(a.Stages, r, runID)) == 0 {
if err := a.Stages.Seed(r.Context(), runID); err != nil {
log.Printf("claim: seed stages run %d: %v", runID, err)
http.Error(w, "seed stages", http.StatusInternalServerError)
return
}
}
// Drive the transition. If we're already past Booting this returns
// an error — treat as "already claimed" and report OK, don't 500.
if run.State == model.StateWaitingWoL || run.State == model.StateBooting {
if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerAgentClaimed); err != nil {
log.Printf("claim: transition run %d: %v", runID, err)
http.Error(w, "transition", http.StatusConflict)
return
}
}
// Re-fetch run state: the Transition above may have advanced us from
// Booting → InventoryCheck, and we want to hand that fresh state to
// the agent so a re-claim after a crash resumes at the stored state
// instead of silently replaying Inventory.
currentState := run.State
if fresh, err := a.Runs.Get(r.Context(), runID); err == nil && fresh != nil {
currentState = fresh.State
}
log.Printf("agent claimed: run=%d agent_ip=%s", runID, agentIP)
if a.Logs != nil {
if w, err := a.Logs.WriterFor(runID); err == nil {
w.Append(logs.Line{Level: "info", Text: fmt.Sprintf("agent claimed from %s — entering Inventory", agentIP)})
}
}
// Stage-driven agent needs a bit of per-run config: the device
// allowlist (serial + expected size) for Storage, and the iperf3
// server port for Network. Parse the host's expected spec here so
// the agent doesn't need to read YAML.
expectedDisks := []map[string]any{}
if host, err := a.Hosts.Get(r.Context(), run.HostID); err == nil && host != nil {
if parsed, err := spec.Parse(host.ExpectedSpecYAML); err == nil && parsed != nil {
for _, dd := range parsed.Disks {
expectedDisks = append(expectedDisks, map[string]any{
"serial": dd.Serial,
"size_gb": dd.SizeGB,
})
}
}
}
iperfPort := a.IperfPort
if iperfPort == 0 {
iperfPort = 5201
}
// Resolve the run's profile → agent-visible stage knobs. The agent
// reads these to size CPUStress / Storage / Network work. An empty
// profile (legacy runs seeded before Phase 1) falls back to "quick".
profileName := run.Profile
if profileName == "" {
profileName = config.ProfileQuick
}
var stageCfg config.StageConfig
if a.Profiles != nil {
stageCfg = a.Profiles.ResolveStageConfig(profileName)
} else {
stageCfg = config.StageConfig{Profile: profileName}
}
writeJSON(w, http.StatusOK, map[string]any{
"ok": true,
"run_id": runID,
"stages": store.DefaultStageOrder,
"expected_disks": expectedDisks,
"iperf_port": iperfPort,
"non_destructive": run.NonDestructive,
"current_state": string(currentState),
"stage_config": stageCfg,
})
}
// Heartbeat is the agent's periodic liveness ping. The response body
// acts as a control channel: cmd=continue is the normal case; cmd=abort
// once the run enters FailedHolding/Released; cmd=retry_stage when the
// operator has overridden a failed stage (wipe-probe override).
func (a *Agent) Heartbeat(w http.ResponseWriter, r *http.Request) {
runID, ok := runIDFromURL(w, r)
if !ok {
return
}
run, ok := a.authenticate(w, r, runID)
if !ok {
return
}
a.Runner.TouchHeartbeat(runID)
cmd := "continue"
resp := map[string]any{"state": run.State}
switch {
case run.State == model.StateCompleted:
// Pipeline succeeded — agent should power the host down.
cmd = "shutdown"
case run.State == model.StateCancelled:
// Operator clicked Cancel — agent cancels the active stage ctx,
// posts a cancelled outcome, and powers off.
cmd = "cancel_stage"
case run.State == model.StateFailedHolding || run.State == model.StateReleased:
cmd = "abort"
case run.FailedStage == "Storage" && overrideWipeSet(run.OverrideFlagsJSON):
// Operator pressed "Override wipe & retry". Agent should
// re-enter Storage with the wipe-probe bypass armed.
cmd = "retry_stage"
resp["stage"] = "Storage"
resp["override_flags"] = json.RawMessage(run.OverrideFlagsJSON)
}
resp["cmd"] = cmd
writeJSON(w, http.StatusOK, resp)
}
// overrideWipeSet inspects a Run.OverrideFlagsJSON blob for the wipe flag.
// Malformed JSON is ignored — the operator has to reapply the override if
// it didn't round-trip correctly.
func overrideWipeSet(blob string) bool {
if blob == "" {
return false
}
var flags struct {
Wipe bool `json:"wipe"`
}
_ = json.Unmarshal([]byte(blob), &flags)
return flags.Wipe
}
// authenticate verifies the Bearer token against the run's stored hash
// and returns the Run for downstream handlers. Responds 401/404 on
// failure and returns ok=false so the caller can bail early.
func (a *Agent) authenticate(w http.ResponseWriter, r *http.Request, runID int64) (*model.Run, bool) {
run, err := a.Runs.Get(r.Context(), runID)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
http.Error(w, "run not found", http.StatusNotFound)
return nil, false
}
http.Error(w, "internal error", http.StatusInternalServerError)
return nil, false
}
token := bearerToken(r)
if token == "" {
http.Error(w, "missing bearer", http.StatusUnauthorized)
return nil, false
}
presented := orchestrator.HashRunToken(token)
if subtle.ConstantTimeCompare([]byte(presented), []byte(run.AgentTokenHash)) != 1 {
http.Error(w, "bad token", http.StatusUnauthorized)
return nil, false
}
return run, true
}
func bearerToken(r *http.Request) string {
h := r.Header.Get("Authorization")
if !strings.HasPrefix(h, "Bearer ") {
return ""
}
return strings.TrimSpace(strings.TrimPrefix(h, "Bearer "))
}
func runIDFromURL(w http.ResponseWriter, r *http.Request) (int64, bool) {
idStr := chi.URLParam(r, "id")
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil || id <= 0 {
http.Error(w, "bad run id", http.StatusBadRequest)
return 0, false
}
return id, true
}
func writeJSON(w http.ResponseWriter, status int, body any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
_ = json.NewEncoder(w).Encode(body)
}
// mustListStages is a small wrapper that hides the error path from
// /claim — a DB read failure just pretends there are zero stages, and
// the subsequent Seed will surface the real error.
func mustListStages(s *store.Stages, r *http.Request, runID int64) []model.Stage {
rows, err := s.ListForRun(r.Context(), runID)
if err != nil {
return nil
}
return rows
}
// ===== Phase 3 endpoints =================================================
// LogBatch is what the agent POSTs to /log: zero or more lines with
// timestamp + level + text. Lines are written in order to the per-run
// file and fanned out on the SSE hub.
type LogBatch struct {
Lines []LogLine `json:"lines"`
}
type LogLine struct {
TS string `json:"ts,omitempty"` // RFC3339Nano; server clock used if empty
Level string `json:"level,omitempty"` // info|warn|error|debug
Stage string `json:"stage,omitempty"` // optional stage tag for per-stage log fan-out
Text string `json:"text"`
}
// Log accepts a batch of log lines from the agent. Empty batches are
// legal (useful for agent-side flush ping).
func (a *Agent) Log(w http.ResponseWriter, r *http.Request) {
runID, ok := runIDFromURL(w, r)
if !ok {
return
}
if _, ok := a.authenticate(w, r, runID); !ok {
return
}
var batch LogBatch
if err := json.NewDecoder(r.Body).Decode(&batch); err != nil {
http.Error(w, "bad json", http.StatusBadRequest)
return
}
writer, err := a.Logs.WriterFor(runID)
if err != nil {
http.Error(w, "open log: "+err.Error(), http.StatusInternalServerError)
return
}
for _, l := range batch.Lines {
ts, _ := time.Parse(time.RFC3339Nano, l.TS)
writer.Append(logs.Line{TS: ts, Level: l.Level, Stage: l.Stage, Text: l.Text})
}
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "written": len(batch.Lines)})
}
// StageResult is the body of /result. Kind is the stage name (from
// DefaultStageOrder); Passed drives StageCompleted vs StageFailed.
// Inventory is optional and only set when kind == "Inventory" — the
// orchestrator persists it as an artifact and feeds it to spec.Diff.
//
// SubSteps is agent-authored granular rows (CPU/Memory pass, per-disk
// SMART, per-device GPU, …). Empty for stages with no natural
// breakdown. Persisted after the mismatch guard fires; per-row SSE is
// emitted at the same time so the detail pane can surface them without
// a full page reload.
type StageResult struct {
Stage string `json:"stage"`
Passed bool `json:"passed"`
Summary json.RawMessage `json:"summary,omitempty"`
Inventory *spec.Inventory `json:"inventory,omitempty"`
Firmware []FirmwareLine `json:"firmware,omitempty"`
Message string `json:"message,omitempty"`
SubSteps []SubStepResultLine `json:"sub_steps,omitempty"`
}
// FirmwareLine is a single firmware snapshot POSTed alongside the
// Firmware stage's /result body. Mirrors agent/probes.FirmwareSnapshot.
// The server converts each line to a store.FirmwareSnapshot and persists
// it under the run — SpecValidate reads these back to diff against the
// host's expected_firmware.
type FirmwareLine struct {
Component string `json:"component"`
Identifier string `json:"identifier"`
Version string `json:"version"`
Vendor string `json:"vendor,omitempty"`
Raw map[string]string `json:"raw,omitempty"`
}
// SubStepResultLine is one entry in StageResult.SubSteps. Ordinal is
// assigned from slice index server-side; the agent doesn't set it.
type SubStepResultLine struct {
Name string `json:"name"`
Passed bool `json:"passed"`
Skipped bool `json:"skipped,omitempty"`
StartedAt string `json:"started_at,omitempty"`
CompletedAt string `json:"completed_at,omitempty"`
Summary json.RawMessage `json:"summary,omitempty"`
}
// Result receives a stage's outcome. Flow:
// 1. Mark the stage row passed/failed + record summary JSON.
// 2. For Inventory: persist the inventory artifact.
// 3. For Inventory (on pass): run spec diff server-side, persist rows,
// bump the run into SpecValidate and immediately resolve SpecValidate
// from that diff — the agent isn't involved in SpecValidate at all.
// 4. Transition the run via StageCompleted/StageFailed.
func (a *Agent) Result(w http.ResponseWriter, r *http.Request) {
runID, ok := runIDFromURL(w, r)
if !ok {
return
}
run, ok := a.authenticate(w, r, runID)
if !ok {
return
}
var body StageResult
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "bad json", http.StatusBadRequest)
return
}
body.Stage = strings.TrimSpace(body.Stage)
if _, ok := orchestrator.StateForStage(body.Stage); !ok {
http.Error(w, "unknown stage: "+body.Stage, http.StatusBadRequest)
return
}
// Silent-skip guard. Orchestrator advances the run state via
// TriggerStageCompleted against the *current* state, not against
// body.Stage — so an Inventory result posted while the run is in
// StateCPUStress would silently advance CPUStress → Storage and mark
// CPUStress as passed without it ever running. That's exactly what
// happened on Orion when the agent OOM-crashed mid-CPUStress,
// systemd restarted it, and the restarted agent (which hardcoded
// "Inventory" as its first stage) re-ran Inventory and reported it.
// Guard: if body.Stage doesn't match the stage the run is currently
// in, park the run in FailedHolding so the operator can investigate
// rather than trusting the claim and cascading silent passes.
expectedStage := orchestrator.StageNameForState(run.State)
if expectedStage != "" && body.Stage != expectedStage {
failedLabel := fmt.Sprintf("%s (expected %s)", body.Stage, expectedStage)
if err := a.Runs.SetFailedStage(r.Context(), runID, failedLabel); err != nil {
log.Printf("result: set failed stage on mismatch run %d: %v", runID, err)
}
if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageMismatch); err != nil {
log.Printf("result: stage-mismatch transition run %d: %v", runID, err)
}
hostName := a.hostNameFor(r.Context(), run.HostID)
a.dispatchEvent(notify.Event{
Kind: notify.KindStageFailed,
Severity: notify.SeverityCritical,
RunID: runID,
HostName: hostName,
Title: fmt.Sprintf("[vetting] %s stage mismatch: %s", hostName, body.Stage),
Body: fmt.Sprintf("Run %d reported stage %s while orchestrator expected %s — parked in FailedHolding to prevent silent skip.",
runID, body.Stage, expectedStage),
URL: a.runLinkURL(runID),
})
log.Printf("result: stage mismatch run=%d got=%s expected=%s — parked", runID, body.Stage, expectedStage)
http.Error(w, "stage mismatch: got "+body.Stage+", expected "+expectedStage, http.StatusConflict)
return
}
// Aggregate threshold gate: flip Passed=false server-side when any
// critical breach landed for this stage. The agent's verdict is
// advisory — a stage-executor can miss a runaway sample that the
// sidecar caught. We check this *before* writing the stage state
// so the DB reflects the server-side decision.
thresholdDetail := ""
if body.Passed {
if breached, detail := a.stageHadCriticalBreach(r.Context(), runID, body.Stage); breached {
body.Passed = false
thresholdDetail = detail
a.appendLog(runID, "error", fmt.Sprintf("%s reported passed but %s — flipping to failed", body.Stage, detail))
}
}
stageState := model.StagePassed
if !body.Passed {
stageState = model.StageFailed
}
summaryJSON := ""
if len(body.Summary) > 0 {
summaryJSON = string(body.Summary)
}
if err := a.Runner.CompleteStage(r.Context(), runID, body.Stage, stageState, summaryJSON); err != nil {
http.Error(w, "complete stage: "+err.Error(), http.StatusInternalServerError)
return
}
if thresholdDetail != "" && body.Message == "" {
body.Message = thresholdDetail
}
// Agent-authored sub-steps: persist in slice order (ordinal = index)
// and fan out a per-row SSE event each so the detail pane shows them
// without a reload. Best-effort — a persistence error is logged but
// doesn't fail the whole /result.
a.persistSubSteps(r.Context(), runID, body.Stage, body.SubSteps)
// Inventory-specific: persist artifact + compute spec diff.
if body.Stage == "Inventory" && body.Inventory != nil {
if err := a.persistInventory(r, run, body.Inventory); err != nil {
log.Printf("persist inventory run %d: %v", runID, err)
}
}
// Firmware-specific: persist each snapshot into firmware_snapshots.
// SpecValidate reads them back to diff against expected_firmware.
if body.Stage == "Firmware" && len(body.Firmware) > 0 {
if err := a.persistFirmware(r.Context(), runID, body.Firmware); err != nil {
log.Printf("persist firmware run %d: %v", runID, err)
}
}
if !body.Passed {
if err := a.Runs.SetFailedStage(r.Context(), runID, body.Stage); err != nil {
log.Printf("set failed stage: %v", err)
}
if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageFailed); err != nil {
log.Printf("result: failed-transition run %d: %v", runID, err)
http.Error(w, "transition", http.StatusConflict)
return
}
hostName := a.hostNameFor(r.Context(), run.HostID)
detail := body.Message
if detail == "" {
detail = "stage reported failure"
}
a.dispatchEvent(notify.Event{
Kind: notify.KindStageFailed,
Severity: notify.SeverityCritical,
RunID: runID,
HostName: hostName,
Title: fmt.Sprintf("[vetting] %s FAILED: %s", hostName, body.Stage),
Body: fmt.Sprintf("Run %d on %s failed at stage %s.\n%s", runID, hostName, body.Stage, detail),
URL: a.runLinkURL(runID),
})
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "next_state": "FailedHolding"})
return
}
// Passed: advance to the next stage in the pipeline.
next, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageCompleted)
if err != nil {
http.Error(w, "advance: "+err.Error(), http.StatusConflict)
return
}
log.Printf("result: run %d stage %s passed → %s", runID, body.Stage, next)
// If the just-advanced-into state is SpecValidate or Reporting, the
// orchestrator owns those stages entirely. The resolve function may
// transition further (→ next stage on pass, → FailedHolding on fail,
// → Completed for Reporting), so we re-read the run after each.
if next == model.StateSpecValidate {
a.resolveSpecValidate(r, runID)
if after, err := a.Runs.Get(r.Context(), runID); err == nil {
next = after.State
}
}
if next == model.StateReporting {
a.resolveReporting(r, runID)
if after, err := a.Runs.Get(r.Context(), runID); err == nil {
next = after.State
}
}
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "next_state": string(next)})
}
// persistSubSteps writes each reported sub-step as a row keyed by
// (runID, stage, ordinal) where ordinal is the slice index, then emits
// a per-row SSE event so an open detail page updates without a reload.
// Silently no-ops when SubSteps is unwired (tests that don't supply a
// store) or the slice is empty.
func (a *Agent) persistSubSteps(ctx context.Context, runID int64, stage string, lines []SubStepResultLine) {
if a.SubSteps == nil || len(lines) == 0 {
return
}
for i, line := range lines {
state := model.StagePassed
switch {
case line.Skipped:
state = model.StageSkipped
case !line.Passed:
state = model.StageFailed
}
started := parseResultTime(line.StartedAt)
completed := parseResultTime(line.CompletedAt)
summaryJSON := ""
if len(line.Summary) > 0 {
summaryJSON = string(line.Summary)
}
ss := model.SubStep{
RunID: runID,
StageName: stage,
Ordinal: i,
Name: line.Name,
State: state,
StartedAt: started,
CompletedAt: completed,
SummaryJSON: summaryJSON,
}
if err := a.SubSteps.Upsert(ctx, ss); err != nil {
log.Printf("substep upsert run=%d stage=%s ord=%d: %v", runID, stage, i, err)
continue
}
if a.Runner != nil {
a.Runner.PublishSubStepUpdate(ctx, ss)
}
}
}
// parseResultTime tolerates RFC3339 / RFC3339Nano and returns nil for
// empty or unparseable values so a missing timestamp doesn't block the
// persist path.
func parseResultTime(s string) *time.Time {
if s == "" {
return nil
}
if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
return &t
}
if t, err := time.Parse(time.RFC3339, s); err == nil {
return &t
}
return nil
}
// persistFirmware writes the reported snapshots. A nil/unset a.Firmware
// store is a no-op so tests that don't wire it up stay green; a mid-run
// persist error is logged but doesn't fail the stage (Firmware is
// advisory — SpecValidate is the gate).
func (a *Agent) persistFirmware(ctx context.Context, runID int64, lines []FirmwareLine) error {
if a.Firmware == nil || len(lines) == 0 {
return nil
}
rows := make([]store.FirmwareSnapshot, 0, len(lines))
for _, l := range lines {
raw := "{}"
if len(l.Raw) > 0 {
if b, err := json.Marshal(l.Raw); err == nil {
raw = string(b)
}
}
rows = append(rows, store.FirmwareSnapshot{
RunID: runID,
Component: l.Component,
Identifier: l.Identifier,
Version: l.Version,
Vendor: l.Vendor,
RawJSON: raw,
})
}
return a.Firmware.CreateBatch(ctx, rows)
}
func (a *Agent) persistInventory(r *http.Request, run *model.Run, inv *spec.Inventory) error {
dir := filepath.Join(a.ArtifactsDir, fmt.Sprintf("run-%d", run.ID))
if err := os.MkdirAll(dir, 0o755); err != nil {
return err
}
path := filepath.Join(dir, "inventory.json")
buf, err := json.MarshalIndent(inv, "", " ")
if err != nil {
return err
}
if err := os.WriteFile(path, buf, 0o644); err != nil {
return err
}
sum := sha256.Sum256(buf)
_, err = a.Artifacts.Create(r.Context(), store.Artifact{
RunID: run.ID,
Kind: "inventory",
Path: path,
SHA256: hex.EncodeToString(sum[:]),
SizeBytes: int64(len(buf)),
})
return err
}
// resolveSpecValidate runs the expected-vs-actual diff against the
// just-stored inventory artifact, persists spec_diffs rows, and drives
// the state machine — all on the server. The agent does nothing for
// this stage.
func (a *Agent) resolveSpecValidate(r *http.Request, runID int64) {
run, err := a.Runs.Get(r.Context(), runID)
if err != nil {
log.Printf("specvalidate: get run: %v", err)
return
}
host, err := a.Hosts.Get(r.Context(), run.HostID)
if err != nil {
log.Printf("specvalidate: get host: %v", err)
return
}
expected, err := spec.Parse(host.ExpectedSpecYAML)
if err != nil {
log.Printf("specvalidate: parse expected yaml: %v", err)
a.failStage(r, runID, "SpecValidate", "malformed expected spec: "+err.Error())
return
}
inv, err := a.readInventoryArtifact(r, runID)
if err != nil {
log.Printf("specvalidate: read inventory: %v", err)
a.failStage(r, runID, "SpecValidate", "missing inventory artifact")
return
}
diffs := spec.Diff(expected, inv)
if a.Firmware != nil && len(expected.Firmware) > 0 {
snaps, err := a.Firmware.ListForRun(r.Context(), runID)
if err != nil {
log.Printf("specvalidate: list firmware: %v", err)
} else {
observed := make([]spec.FirmwareObserved, 0, len(snaps))
for _, s := range snaps {
observed = append(observed, spec.FirmwareObserved{
Component: s.Component,
Identifier: s.Identifier,
Version: s.Version,
})
}
diffs = append(diffs, spec.DiffFirmware(expected.Firmware, observed)...)
}
}
if err := a.SpecDiffs.ReplaceForRun(r.Context(), runID, diffs); err != nil {
log.Printf("specvalidate: write diffs: %v", err)
}
if err := a.Stages.StartByName(r.Context(), runID, "SpecValidate"); err != nil {
log.Printf("specvalidate: start stage: %v", err)
}
critical := 0
for _, d := range diffs {
if d.Severity == "critical" && !d.Ignored {
critical++
}
}
summaryBuf, _ := json.Marshal(map[string]any{
"diffs": len(diffs),
"critical": critical,
})
if critical > 0 {
_ = a.Runner.CompleteStage(r.Context(), runID, "SpecValidate", model.StageFailed, string(summaryBuf))
_ = a.Runs.SetFailedStage(r.Context(), runID, "SpecValidate")
if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageFailed); err != nil {
log.Printf("specvalidate: failed-transition: %v", err)
}
a.appendLog(runID, "error", fmt.Sprintf("SpecValidate: %d critical diff(s) — holding host", critical))
hostName := a.hostNameFor(r.Context(), run.HostID)
a.dispatchEvent(notify.Event{
Kind: notify.KindSpecMismatch,
Severity: notify.SeverityCritical,
RunID: runID,
HostName: hostName,
Title: fmt.Sprintf("[vetting] %s spec mismatch (%d critical)", hostName, critical),
Body: fmt.Sprintf("SpecValidate found %d critical diff(s) on %s. Host is held for inspection.", critical, hostName),
URL: a.runLinkURL(runID),
})
} else {
_ = a.Runner.CompleteStage(r.Context(), runID, "SpecValidate", model.StagePassed, string(summaryBuf))
if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageCompleted); err != nil {
log.Printf("specvalidate: advance: %v", err)
}
a.appendLog(runID, "info", "SpecValidate: all fields match expected spec")
}
}
func (a *Agent) readInventoryArtifact(r *http.Request, runID int64) (*spec.Inventory, error) {
arts, err := a.Artifacts.ListForRun(r.Context(), runID)
if err != nil {
return nil, err
}
for i := len(arts) - 1; i >= 0; i-- {
if arts[i].Kind == "inventory" {
buf, err := os.ReadFile(arts[i].Path)
if err != nil {
return nil, err
}
var inv spec.Inventory
if err := json.Unmarshal(buf, &inv); err != nil {
return nil, err
}
return &inv, nil
}
}
return nil, errors.New("no inventory artifact")
}
func (a *Agent) failStage(r *http.Request, runID int64, stage, message string) {
_ = a.Runner.CompleteStage(r.Context(), runID, stage, model.StageFailed, fmt.Sprintf(`{"error":%q}`, message))
_ = a.Runs.SetFailedStage(r.Context(), runID, stage)
if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageFailed); err != nil {
log.Printf("failStage: transition run %d: %v", runID, err)
}
a.appendLog(runID, "error", stage+": "+message)
}
func (a *Agent) appendLog(runID int64, level, text string) {
if a.Logs == nil {
return
}
w, err := a.Logs.WriterFor(runID)
if err != nil {
log.Printf("appendLog: %v", err)
return
}
w.Append(logs.Line{Level: level, Text: text})
}
// Hold issues the per-run ephemeral ed25519 keypair: the agent gets
// the authorized_keys line, the orchestrator keeps the privkey on disk.
// Hold also records the agent's reported IP so the tile can print the
// ssh invocation.
type HoldRequest struct {
AgentIP string `json:"agent_ip"`
}
type HoldResponse struct {
AuthorizedKey string `json:"authorized_key"`
RunID int64 `json:"run_id"`
}
func (a *Agent) Hold(w http.ResponseWriter, r *http.Request) {
runID, ok := runIDFromURL(w, r)
if !ok {
return
}
if _, ok := a.authenticate(w, r, runID); !ok {
return
}
var body HoldRequest
_ = json.NewDecoder(r.Body).Decode(&body)
agentIP := strings.TrimSpace(body.AgentIP)
if agentIP == "" {
if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil {
agentIP = host
}
}
if agentIP != "" {
if err := a.Runs.SetHoldIP(r.Context(), runID, agentIP); err != nil {
log.Printf("hold: set hold_ip: %v", err)
}
}
kp, err := hold.Issue(runID)
if err != nil {
http.Error(w, "generate key: "+err.Error(), http.StatusInternalServerError)
return
}
keyPath := filepath.Join(a.ArtifactsDir, fmt.Sprintf("run-%d", runID), "hold.key")
abs, err := kp.WritePrivateTo(keyPath)
if err != nil {
http.Error(w, "write key: "+err.Error(), http.StatusInternalServerError)
return
}
sum := sha256.Sum256(kp.PrivatePEM)
if _, err := a.Artifacts.Create(r.Context(), store.Artifact{
RunID: runID,
Kind: "hold_key",
Path: abs,
SHA256: hex.EncodeToString(sum[:]),
SizeBytes: int64(len(kp.PrivatePEM)),
}); err != nil {
log.Printf("hold: record artifact: %v", err)
}
a.appendLog(runID, "info", fmt.Sprintf("Hold key issued. SSH in with: ssh -i %s root@%s", abs, agentIP))
hostID := mustHostID(a, r, runID)
if hostID != 0 {
hostName := a.hostNameFor(r.Context(), hostID)
a.dispatchEvent(notify.Event{
Kind: notify.KindHoldingOpened,
Severity: notify.SeverityCritical,
RunID: runID,
HostName: hostName,
Title: fmt.Sprintf("[vetting] %s holding — SSH ready", hostName),
Body: fmt.Sprintf("Host %s is holding at %s.\nssh -i %s root@%s", hostName, agentIP, abs, agentIP),
URL: a.runLinkURL(runID),
})
}
// Refresh the tile + all detail-page fragments so the operator
// sees the ssh command and the hold banner without reloading.
if id := mustHostID(a, r, runID); id != 0 && a.Runner != nil {
a.Runner.PublishTileUpdate(r.Context(), id)
}
writeJSON(w, http.StatusOK, HoldResponse{AuthorizedKey: kp.AuthorizedKey, RunID: runID})
}
// dispatchEvent hands an already-populated Event to the notify Registry
// if one is wired. Handler code uses hostNameFor to resolve the host
// name for the event payload; this keeps call sites terse.
func (a *Agent) dispatchEvent(ev notify.Event) {
if a.Notify == nil {
return
}
a.Notify.Dispatch(ev)
}
// hostNameFor returns a human-readable host name for a run, or "host-N"
// if the lookup fails — notifications should never fail silently over a
// missing name.
func (a *Agent) hostNameFor(ctx context.Context, hostID int64) string {
if host, err := a.Hosts.Get(ctx, hostID); err == nil && host != nil {
return host.Name
}
return fmt.Sprintf("host-%d", hostID)
}
func (a *Agent) runLinkURL(runID int64) string {
if a.PublicURL == "" {
return ""
}
return strings.TrimRight(a.PublicURL, "/") + "/reports/" + fmt.Sprintf("%d", runID)
}
func mustHostID(a *Agent, r *http.Request, runID int64) int64 {
run, err := a.Runs.Get(r.Context(), runID)
if err != nil || run == nil {
return 0
}
return run.HostID
}
// ===== Phase 4 endpoints =================================================
// SensorBatch is what the agent POSTs to /sensor: a stream of numeric
// samples (temps, fan rpm, PSU rails, iperf throughput). Each sample is
// (kind, key, value, unit). Timestamps default to server-now when empty
// so the thermal sidecar doesn't have to carry a clock.
type SensorBatch struct {
Samples []SensorSample `json:"samples"`
}
type SensorSample struct {
TS string `json:"ts,omitempty"`
Kind string `json:"kind"` // temp|fan|psu_volt|iperf|fio|smart_attr
Key string `json:"key"`
Value float64 `json:"value"`
Unit string `json:"unit,omitempty"`
}
// Sensor persists a batch of numeric samples. The thermal sidecar hits
// this on a tick; stage executors (iperf, fio) also drop here. Each
// sample is evaluated against the run's seeded thresholds — critical
// breaches fail the run immediately (thermal runaway, EDAC UE, voltage
// sag); warning breaches are recorded for the report only.
func (a *Agent) Sensor(w http.ResponseWriter, r *http.Request) {
runID, ok := runIDFromURL(w, r)
if !ok {
return
}
run, ok := a.authenticate(w, r, runID)
if !ok {
return
}
if a.Measurements == nil {
http.Error(w, "measurements store not wired", http.StatusInternalServerError)
return
}
var body SensorBatch
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "bad json", http.StatusBadRequest)
return
}
rows := make([]model.Measurement, 0, len(body.Samples))
sampleStages := make([]string, 0, len(body.Samples))
for _, s := range body.Samples {
ts, _ := time.Parse(time.RFC3339Nano, s.TS)
if ts.IsZero() {
ts = time.Now().UTC()
}
rows = append(rows, model.Measurement{
RunID: runID,
TS: ts,
Kind: s.Kind,
Key: s.Key,
Value: s.Value,
Unit: s.Unit,
})
// Stage the sample belongs to drives threshold selector
// matching. We use the run's current state — the agent does
// not tag samples with a stage.
sampleStages = append(sampleStages, orchestrator.StageNameForState(run.State))
}
if err := a.Measurements.CreateBatch(r.Context(), rows); err != nil {
http.Error(w, "write samples: "+err.Error(), http.StatusInternalServerError)
return
}
critical := a.evaluateSensorBatch(r.Context(), runID, rows, sampleStages)
writeJSON(w, http.StatusOK, map[string]any{
"ok": true,
"written": len(rows),
"breach": critical != "",
"breach_kind": critical,
})
if critical != "" {
a.failRunOnCriticalBreach(r, run, critical)
}
}
// evaluateSensorBatch runs each sample through the run's thresholds,
// persists evaluations, and returns a short human-readable label for
// the first critical breach it sees (empty when all samples pass or
// only hit warning-severity rules).
func (a *Agent) evaluateSensorBatch(ctx context.Context, runID int64, rows []model.Measurement, sampleStages []string) string {
if a.Thresholds == nil || len(rows) == 0 {
return ""
}
rules, err := a.Thresholds.ListForRun(ctx, runID)
if err != nil {
log.Printf("sensor: list thresholds run %d: %v", runID, err)
return ""
}
if len(rules) == 0 {
return ""
}
evalRules := make([]orchestrator.Threshold, 0, len(rules))
for _, r := range rules {
evalRules = append(evalRules, orchestrator.Threshold{
ID: r.ID,
Stage: r.Stage,
Kind: r.Kind,
Key: r.Key,
Op: orchestrator.ThresholdOp(r.Op),
Value: r.Threshold,
Nominal: r.Nominal,
Severity: orchestrator.ThresholdSeverity(r.Severity),
})
}
evals := make([]store.ThresholdEvaluation, 0, len(rows))
critical := ""
for i, m := range rows {
sample := orchestrator.Sample{
Stage: sampleStages[i],
Kind: m.Kind,
Key: m.Key,
Value: m.Value,
}
for _, res := range orchestrator.Evaluate(sample, evalRules) {
evals = append(evals, store.ThresholdEvaluation{
RunID: runID,
ThresholdID: res.Threshold.ID,
Stage: sample.Stage,
Kind: sample.Kind,
Key: sample.Key,
TS: m.TS,
Observed: res.Observed,
Passed: res.Passed,
})
if critical == "" && res.CriticalBreach() {
critical = fmt.Sprintf("%s %s=%g breached %s %g",
res.Threshold.Kind, sample.Key, res.Observed, res.Threshold.Op, res.Threshold.Value)
}
}
}
if err := a.Thresholds.RecordBatch(ctx, evals); err != nil {
log.Printf("sensor: record evals run %d: %v", runID, err)
}
return critical
}
// stageHadCriticalBreach returns true if any critical-severity
// threshold evaluation for this run matched samples attributed to the
// given stage (stage selector "*" or exact). Called at /result close
// so even an agent that reports Passed=true gets overridden when the
// aggregate view says the stage tripped a gate.
func (a *Agent) stageHadCriticalBreach(ctx context.Context, runID int64, stage string) (bool, string) {
if a.Thresholds == nil {
return false, ""
}
breaches, err := a.Thresholds.CriticalBreaches(ctx, runID)
if err != nil {
log.Printf("result: list breaches run %d: %v", runID, err)
return false, ""
}
for _, b := range breaches {
if b.Stage == stage || b.Stage == "" || b.Stage == "*" {
return true, fmt.Sprintf("critical threshold breach: %s %s=%g", b.Kind, b.Key, b.Observed)
}
}
return false, ""
}
// failRunOnCriticalBreach flips the run to FailedHolding in response
// to a live threshold breach (thermal runaway, EDAC UE, rail sag).
// The agent's pending /result for the current stage may still arrive —
// the silent-skip guard handles that by refusing to double-transition.
func (a *Agent) failRunOnCriticalBreach(r *http.Request, run *model.Run, detail string) {
stage := orchestrator.StageNameForState(run.State)
if stage == "" {
stage = "threshold"
}
if err := a.Runs.SetFailedStage(r.Context(), run.ID, stage+" (threshold)"); err != nil {
log.Printf("sensor: set failed stage run %d: %v", run.ID, err)
}
if _, err := a.Runner.Transition(r.Context(), run.ID, orchestrator.TriggerStageFailed); err != nil {
// If we're already in FailedHolding the transition errors —
// that's fine, the first breach wins.
log.Printf("sensor: fail-transition run %d: %v", run.ID, err)
return
}
hostName := a.hostNameFor(r.Context(), run.HostID)
a.dispatchEvent(notify.Event{
Kind: notify.KindStageFailed,
Severity: notify.SeverityCritical,
RunID: run.ID,
HostName: hostName,
Title: fmt.Sprintf("[vetting] %s FAILED: %s (threshold)", hostName, stage),
Body: fmt.Sprintf("Run %d on %s tripped a critical threshold during %s: %s", run.ID, hostName, stage, detail),
URL: a.runLinkURL(run.ID),
})
a.appendLog(run.ID, "error", fmt.Sprintf("threshold breach during %s: %s — run parked in FailedHolding", stage, detail))
}
// resolveReporting runs when the pipeline advances into StateReporting.
// It's an orchestrator-owned stage like SpecValidate: no agent action.
// Writes a JSON report bundling run + stages + diffs + measurements,
// then advances the run to Completed. Heartbeat will then return abort
// and the agent will power the host off in Phase 5.
func (a *Agent) resolveReporting(r *http.Request, runID int64) {
ctx := r.Context()
if err := a.Stages.StartByName(ctx, runID, "Reporting"); err != nil {
log.Printf("reporting: start stage: %v", err)
}
run, err := a.Runs.Get(ctx, runID)
if err != nil {
log.Printf("reporting: get run: %v", err)
return
}
host, err := a.Hosts.Get(ctx, run.HostID)
if err != nil {
log.Printf("reporting: get host: %v", err)
return
}
stages, err := a.Stages.ListForRun(ctx, runID)
if err != nil {
log.Printf("reporting: list stages: %v", err)
}
diffs, err := a.SpecDiffs.ListForRun(ctx, runID)
if err != nil {
log.Printf("reporting: list diffs: %v", err)
}
var measurements []model.Measurement
if a.Measurements != nil {
measurements, err = a.Measurements.ListForRun(ctx, runID)
if err != nil {
log.Printf("reporting: list measurements: %v", err)
}
}
var firmware []store.FirmwareSnapshot
if a.Firmware != nil {
firmware, err = a.Firmware.ListForRun(ctx, runID)
if err != nil {
log.Printf("reporting: list firmware: %v", err)
}
}
bundle := map[string]any{
"run": run,
"host": host,
"stages": stages,
"spec_diffs": diffs,
"measurements": measurements,
"firmware": firmware,
"generated_at": time.Now().UTC().Format(time.RFC3339),
}
buf, err := json.MarshalIndent(bundle, "", " ")
if err != nil {
log.Printf("reporting: marshal: %v", err)
a.failStage(r, runID, "Reporting", "marshal report: "+err.Error())
return
}
dir := filepath.Join(a.ArtifactsDir, fmt.Sprintf("run-%d", runID))
if err := os.MkdirAll(dir, 0o755); err != nil {
a.failStage(r, runID, "Reporting", "mkdir: "+err.Error())
return
}
path := filepath.Join(dir, "report.json")
if err := os.WriteFile(path, buf, 0o644); err != nil {
a.failStage(r, runID, "Reporting", "write: "+err.Error())
return
}
sum := sha256.Sum256(buf)
if _, err := a.Artifacts.Create(ctx, store.Artifact{
RunID: runID,
Kind: "report",
Path: path,
SHA256: hex.EncodeToString(sum[:]),
SizeBytes: int64(len(buf)),
}); err != nil {
log.Printf("reporting: record artifact: %v", err)
}
// Also render the operator-facing HTML summary alongside the JSON.
// Failures here are non-fatal — the JSON is the source of truth.
if host != nil {
fwRows := make([]report.FirmwareSnapshot, 0, len(firmware))
for _, f := range firmware {
fwRows = append(fwRows, report.FirmwareSnapshot{
Component: f.Component,
Identifier: f.Identifier,
Version: f.Version,
Vendor: f.Vendor,
})
}
htmlData := report.Data{
GeneratedAt: time.Now().UTC(),
Run: *run,
Host: *host,
Stages: stages,
SpecDiffs: diffs,
Aggregates: report.AggregateMeasurements(measurements),
Firmware: fwRows,
}
if htmlBuf, err := report.RenderHTML(htmlData); err != nil {
log.Printf("reporting: render html: %v", err)
} else {
htmlPath := filepath.Join(dir, "report.html")
if err := os.WriteFile(htmlPath, htmlBuf, 0o644); err != nil {
log.Printf("reporting: write html: %v", err)
} else {
htmlSum := sha256.Sum256(htmlBuf)
if _, err := a.Artifacts.Create(ctx, store.Artifact{
RunID: runID,
Kind: "report_html",
Path: htmlPath,
SHA256: hex.EncodeToString(htmlSum[:]),
SizeBytes: int64(len(htmlBuf)),
}); err != nil {
log.Printf("reporting: record html artifact: %v", err)
}
}
}
}
summaryBuf, _ := json.Marshal(map[string]any{
"report_path": path,
"stages": len(stages),
"diffs": len(diffs),
})
if err := a.Runner.CompleteStage(ctx, runID, "Reporting", model.StagePassed, string(summaryBuf)); err != nil {
log.Printf("reporting: complete stage: %v", err)
}
if err := a.Runs.MarkCompleted(ctx, runID, path); err != nil {
log.Printf("reporting: mark completed: %v", err)
}
a.appendLog(runID, "info", "Reporting: wrote "+path+"; run completed.")
// Publish a final tile + detail update so the dashboard flips to
// pass mood and the detail page's summary/actions update without
// the operator reloading.
if host != nil && a.Runner != nil {
a.Runner.PublishTileUpdate(ctx, host.ID)
}
hostName := "host"
if host != nil {
hostName = host.Name
}
a.dispatchEvent(notify.Event{
Kind: notify.KindRunCompleted,
Severity: notify.SeverityInfo,
RunID: runID,
HostName: hostName,
Title: fmt.Sprintf("[vetting] %s passed vetting", hostName),
Body: fmt.Sprintf("Run %d on %s completed all stages. Report: %s", runID, hostName, path),
URL: a.runLinkURL(runID),
})
}