package api

import (
	"context"
	"crypto/sha256"
	"crypto/subtle"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/go-chi/chi/v5"

	"vetting/internal/config"
	"vetting/internal/events"
	"vetting/internal/hold"
	"vetting/internal/logs"
	"vetting/internal/model"
	"vetting/internal/notify"
	"vetting/internal/orchestrator"
	"vetting/internal/pxe"
	"vetting/internal/report"
	"vetting/internal/spec"
	"vetting/internal/store"
)

// Agent collects the collaborators used by agent-facing HTTP routes:
// the iPXE chainload endpoint and the /api/v1/runs/:id/* endpoints.
//
// Several collaborators are optional: handlers in this file check them
// for nil and degrade (no-op or 500) instead of panicking. Those cases
// are noted on the individual fields.
type Agent struct {
	Hosts        *store.Hosts
	Runs         *store.Runs
	Stages       *store.Stages
	SubSteps     *store.SubSteps // nil makes persistSubSteps a no-op
	Artifacts    *store.Artifacts
	SpecDiffs    *store.SpecDiffs
	Measurements *store.Measurements    // nil makes /sensor answer 500
	Thresholds   *store.Thresholds      // Phase 1: seeded per run; consulted on each /sensor batch; nil skips evaluation
	Firmware     *store.Firmware        // Phase 4: firmware snapshots (unused before then); nil makes persistFirmware a no-op
	Profiles     *config.ProfileRegistry // Phase 2: /claim resolves the run's profile → stage knobs; nil falls back to a bare StageConfig
	Runner       *orchestrator.Runner
	EventHub     *events.Hub
	Logs         *logs.Hub        // nil makes appendLog a no-op
	Notify       *notify.Registry // nil makes dispatchEvent a no-op

	ArtifactsDir    string // ./var/artifacts
	OrchestratorURL string // baked into iPXE cmdline
	PublicURL       string // user-visible URL base for notification click-throughs
	LiveKernelURL   string
	LiveInitrdURL   string
	TLSCertFPR      string // optional; empty = skip pinning
	IperfPort       int    // orchestrator-supervised iperf3 port; 0 = 5201
}

// IPXEScript serves a per-MAC iPXE script. Called by iPXE itself after
// dnsmasq hands it the chainload URL. Unknown MAC → halt script.
// Known MAC with no active run → poweroff script. Known MAC with active
// run → real boot script; the fetch triggers PXEObserved.
func (a *Agent) IPXEScript(w http.ResponseWriter, r *http.Request) { mac := strings.ToLower(strings.TrimSpace(chi.URLParam(r, "mac"))) w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.Header().Set("Cache-Control", "no-store") if !macRe.MatchString(mac) { log.Printf("ipxe: rejected malformed mac %q from %s", mac, r.RemoteAddr) _, _ = w.Write([]byte(pxe.NotRegisteredScript(mac))) return } run, err := a.Runs.FindActiveByMAC(r.Context(), mac) if err != nil { log.Printf("ipxe: find run by mac %s: %v", mac, err) http.Error(w, "internal error", http.StatusInternalServerError) return } if run == nil { _, _ = w.Write([]byte(pxe.NoActiveRunScript(mac))) return } // The token hash in the DB is the sha256 of the plaintext. The // plaintext itself cannot be recovered from the hash — we issued it // once when the run was created. For iPXE we re-issue a fresh token // on every PXE fetch: this is safe because the hash in the DB is // rewritten to match and only the most recent PXE can be claimed. plain, hash, err := orchestrator.IssueRunToken() if err != nil { http.Error(w, "token", http.StatusInternalServerError) return } if err := a.Runs.RotateTokenHash(r.Context(), run.ID, hash); err != nil { log.Printf("ipxe: rotate token run %d: %v", run.ID, err) http.Error(w, "token", http.StatusInternalServerError) return } script := pxe.BuildScript(pxe.IPXEParams{ OrchestratorURL: a.OrchestratorURL, LiveKernelURL: a.LiveKernelURL, LiveInitrdURL: a.LiveInitrdURL, TLSCertFPR: a.TLSCertFPR, RunID: run.ID, MAC: mac, Token: plain, }) _, _ = w.Write([]byte(script)) // iPXE has now fetched the script — treat this as PXEObserved. If we // were already in Booting the transition table allows staying. if _, err := a.Runner.Transition(r.Context(), run.ID, orchestrator.TriggerPXEObserved); err != nil { // Non-fatal: the agent may still claim via /claim. log.Printf("ipxe: PXEObserved for run %d: %v", run.ID, err) } } // Hello is the first call an agent makes once userspace is up. 
It's // idempotent and only writes a log line; the authoritative transition // comes from /claim. The agent sends Hello early so operators see a // signal in the tile even before the token is validated. func (a *Agent) Hello(w http.ResponseWriter, r *http.Request) { runID, ok := runIDFromURL(w, r) if !ok { return } if _, ok := a.authenticate(w, r, runID); !ok { return } log.Printf("agent hello: run=%d remote=%s", runID, r.RemoteAddr) writeJSON(w, http.StatusOK, map[string]any{"ok": true, "run_id": runID}) } // Claim is the binding call: the agent proves it holds the plaintext // token for this run, and in return the orchestrator transitions to // InventoryCheck and seeds the stage rows. All destructive actions the // agent takes later require a prior successful claim. func (a *Agent) Claim(w http.ResponseWriter, r *http.Request) { runID, ok := runIDFromURL(w, r) if !ok { return } run, ok := a.authenticate(w, r, runID) if !ok { return } var body struct { AgentIP string `json:"agent_ip"` } if r.Body != nil { // agent_ip is informational; if missing fall back to RemoteAddr. _ = json.NewDecoder(r.Body).Decode(&body) } agentIP := strings.TrimSpace(body.AgentIP) if agentIP == "" { if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil { agentIP = host } else { agentIP = r.RemoteAddr } } // First claim seeds the stage rows; subsequent claims are a no-op // so agent retries after transient network failures stay safe. if len(mustListStages(a.Stages, r, runID)) == 0 { if err := a.Stages.Seed(r.Context(), runID); err != nil { log.Printf("claim: seed stages run %d: %v", runID, err) http.Error(w, "seed stages", http.StatusInternalServerError) return } } // Drive the transition. If we're already past Booting this returns // an error — treat as "already claimed" and report OK, don't 500. 
if run.State == model.StateWaitingWoL || run.State == model.StateBooting { if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerAgentClaimed); err != nil { log.Printf("claim: transition run %d: %v", runID, err) http.Error(w, "transition", http.StatusConflict) return } } // Re-fetch run state: the Transition above may have advanced us from // Booting → InventoryCheck, and we want to hand that fresh state to // the agent so a re-claim after a crash resumes at the stored state // instead of silently replaying Inventory. currentState := run.State if fresh, err := a.Runs.Get(r.Context(), runID); err == nil && fresh != nil { currentState = fresh.State } log.Printf("agent claimed: run=%d agent_ip=%s", runID, agentIP) if a.Logs != nil { if w, err := a.Logs.WriterFor(runID); err == nil { w.Append(logs.Line{Level: "info", Text: fmt.Sprintf("agent claimed from %s — entering Inventory", agentIP)}) } } // Stage-driven agent needs a bit of per-run config: the device // allowlist (serial + expected size) for Storage, and the iperf3 // server port for Network. Parse the host's expected spec here so // the agent doesn't need to read YAML. expectedDisks := []map[string]any{} if host, err := a.Hosts.Get(r.Context(), run.HostID); err == nil && host != nil { if parsed, err := spec.Parse(host.ExpectedSpecYAML); err == nil && parsed != nil { for _, dd := range parsed.Disks { expectedDisks = append(expectedDisks, map[string]any{ "serial": dd.Serial, "size_gb": dd.SizeGB, }) } } } iperfPort := a.IperfPort if iperfPort == 0 { iperfPort = 5201 } // Resolve the run's profile → agent-visible stage knobs. The agent // reads these to size CPUStress / Storage / Network work. An empty // profile (legacy runs seeded before Phase 1) falls back to "quick". 
profileName := run.Profile if profileName == "" { profileName = config.ProfileQuick } var stageCfg config.StageConfig if a.Profiles != nil { stageCfg = a.Profiles.ResolveStageConfig(profileName) } else { stageCfg = config.StageConfig{Profile: profileName} } writeJSON(w, http.StatusOK, map[string]any{ "ok": true, "run_id": runID, "stages": store.DefaultStageOrder, "expected_disks": expectedDisks, "iperf_port": iperfPort, "non_destructive": run.NonDestructive, "current_state": string(currentState), "stage_config": stageCfg, }) } // Heartbeat is the agent's periodic liveness ping. The response body // acts as a control channel: cmd=continue is the normal case; cmd=abort // once the run enters FailedHolding/Released; cmd=retry_stage when the // operator has overridden a failed stage (wipe-probe override). func (a *Agent) Heartbeat(w http.ResponseWriter, r *http.Request) { runID, ok := runIDFromURL(w, r) if !ok { return } run, ok := a.authenticate(w, r, runID) if !ok { return } a.Runner.TouchHeartbeat(runID) cmd := "continue" resp := map[string]any{"state": run.State} switch { case run.State == model.StateCompleted: // Pipeline succeeded — agent reboots so the host falls through // iPXE's no-active-run script to the next boot device (local // disk), landing back on the installed OS without operator // intervention. cmd = "reboot" case run.State == model.StateCancelled: // Operator clicked Cancel. Two sub-cases: // - FailedStage set → run was sitting in FailedHolding with no // in-flight stage subprocess; the agent is parked in // waitForOverride. Send cmd=reboot so the heartbeat loop // reboots the host, falls through iPXE's no-active-run // script and boots local disk. // - FailedStage empty → cancel mid-stage; kill the stage ctx // first so the running subprocess exits cleanly, then the // agent powers off via its existing cancel path. 
if run.FailedStage != "" { cmd = "reboot" } else { cmd = "cancel_stage" } case run.State == model.StateReleased: // Operator accepted the failure outcome. No further agent // action is possible — stop the heartbeat loop. cmd = "abort" // FailedHolding intentionally falls through to cmd=continue: the // agent is parked in waitForOverride awaiting operator action // (Cancel → reboot, Override → retry_stage). Keeping the // heartbeat loop alive is what lets those commands reach it. case run.FailedStage == "Storage" && overrideWipeSet(run.OverrideFlagsJSON): // Operator pressed "Override wipe & retry". Agent should // re-enter Storage with the wipe-probe bypass armed. cmd = "retry_stage" resp["stage"] = "Storage" resp["override_flags"] = json.RawMessage(run.OverrideFlagsJSON) } resp["cmd"] = cmd writeJSON(w, http.StatusOK, resp) } // overrideWipeSet inspects a Run.OverrideFlagsJSON blob for the wipe flag. // Malformed JSON is ignored — the operator has to reapply the override if // it didn't round-trip correctly. func overrideWipeSet(blob string) bool { if blob == "" { return false } var flags struct { Wipe bool `json:"wipe"` } _ = json.Unmarshal([]byte(blob), &flags) return flags.Wipe } // authenticate verifies the Bearer token against the run's stored hash // and returns the Run for downstream handlers. Responds 401/404 on // failure and returns ok=false so the caller can bail early. 
func (a *Agent) authenticate(w http.ResponseWriter, r *http.Request, runID int64) (*model.Run, bool) { run, err := a.Runs.Get(r.Context(), runID) if err != nil { if errors.Is(err, store.ErrNotFound) { http.Error(w, "run not found", http.StatusNotFound) return nil, false } http.Error(w, "internal error", http.StatusInternalServerError) return nil, false } token := bearerToken(r) if token == "" { http.Error(w, "missing bearer", http.StatusUnauthorized) return nil, false } presented := orchestrator.HashRunToken(token) if subtle.ConstantTimeCompare([]byte(presented), []byte(run.AgentTokenHash)) != 1 { http.Error(w, "bad token", http.StatusUnauthorized) return nil, false } return run, true } func bearerToken(r *http.Request) string { h := r.Header.Get("Authorization") if !strings.HasPrefix(h, "Bearer ") { return "" } return strings.TrimSpace(strings.TrimPrefix(h, "Bearer ")) } func runIDFromURL(w http.ResponseWriter, r *http.Request) (int64, bool) { idStr := chi.URLParam(r, "id") id, err := strconv.ParseInt(idStr, 10, 64) if err != nil || id <= 0 { http.Error(w, "bad run id", http.StatusBadRequest) return 0, false } return id, true } func writeJSON(w http.ResponseWriter, status int, body any) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) _ = json.NewEncoder(w).Encode(body) } // mustListStages is a small wrapper that hides the error path from // /claim — a DB read failure just pretends there are zero stages, and // the subsequent Seed will surface the real error. func mustListStages(s *store.Stages, r *http.Request, runID int64) []model.Stage { rows, err := s.ListForRun(r.Context(), runID) if err != nil { return nil } return rows } // ===== Phase 3 endpoints ================================================= // LogBatch is what the agent POSTs to /log: zero or more lines with // timestamp + level + text. Lines are written in order to the per-run // file and fanned out on the SSE hub. 
type LogBatch struct { Lines []LogLine `json:"lines"` } type LogLine struct { TS string `json:"ts,omitempty"` // RFC3339Nano; server clock used if empty Level string `json:"level,omitempty"` // info|warn|error|debug Stage string `json:"stage,omitempty"` // optional stage tag for per-stage log fan-out Text string `json:"text"` } // Log accepts a batch of log lines from the agent. Empty batches are // legal (useful for agent-side flush ping). func (a *Agent) Log(w http.ResponseWriter, r *http.Request) { runID, ok := runIDFromURL(w, r) if !ok { return } if _, ok := a.authenticate(w, r, runID); !ok { return } var batch LogBatch if err := json.NewDecoder(r.Body).Decode(&batch); err != nil { http.Error(w, "bad json", http.StatusBadRequest) return } writer, err := a.Logs.WriterFor(runID) if err != nil { http.Error(w, "open log: "+err.Error(), http.StatusInternalServerError) return } for _, l := range batch.Lines { ts, _ := time.Parse(time.RFC3339Nano, l.TS) writer.Append(logs.Line{TS: ts, Level: l.Level, Stage: l.Stage, Text: l.Text}) } writeJSON(w, http.StatusOK, map[string]any{"ok": true, "written": len(batch.Lines)}) } // StageResult is the body of /result. Kind is the stage name (from // DefaultStageOrder); Passed drives StageCompleted vs StageFailed. // Inventory is optional and only set when kind == "Inventory" — the // orchestrator persists it as an artifact and feeds it to spec.Diff. // // SubSteps is agent-authored granular rows (CPU/Memory pass, per-disk // SMART, per-device GPU, …). Empty for stages with no natural // breakdown. Persisted after the mismatch guard fires; per-row SSE is // emitted at the same time so the detail pane can surface them without // a full page reload. 
type StageResult struct {
	Stage     string              `json:"stage"`               // stage name; whitespace-trimmed and validated by Result
	Passed    bool                `json:"passed"`              // agent verdict; Result may flip it to false on a critical threshold breach
	Summary   json.RawMessage     `json:"summary,omitempty"`   // stored verbatim as the stage's summary JSON
	Inventory *spec.Inventory     `json:"inventory,omitempty"` // only consumed when Stage == "Inventory"
	Firmware  []FirmwareLine      `json:"firmware,omitempty"`  // only consumed when Stage == "Firmware"
	Message   string              `json:"message,omitempty"`   // failure detail used in the notification body
	SubSteps  []SubStepResultLine `json:"sub_steps,omitempty"` // persisted with ordinal = slice index
}

// FirmwareLine is a single firmware snapshot POSTed alongside the
// Firmware stage's /result body. Mirrors agent/probes.FirmwareSnapshot.
// The server converts each line to a store.FirmwareSnapshot and persists
// it under the run — SpecValidate reads these back to diff against the
// host's expected_firmware.
type FirmwareLine struct {
	Component  string            `json:"component"`
	Identifier string            `json:"identifier"`
	Version    string            `json:"version"`
	Vendor     string            `json:"vendor,omitempty"`
	Raw        map[string]string `json:"raw,omitempty"` // re-encoded to JSON for storage; "{}" when empty
}

// SubStepResultLine is one entry in StageResult.SubSteps. Ordinal is
// assigned from slice index server-side; the agent doesn't set it.
type SubStepResultLine struct {
	Name        string          `json:"name"`
	Passed      bool            `json:"passed"`
	Skipped     bool            `json:"skipped,omitempty"`      // takes precedence over Passed when deriving the stored state
	StartedAt   string          `json:"started_at,omitempty"`   // RFC3339 / RFC3339Nano; empty or unparseable is stored as nil
	CompletedAt string          `json:"completed_at,omitempty"` // RFC3339 / RFC3339Nano; empty or unparseable is stored as nil
	Summary     json.RawMessage `json:"summary,omitempty"`
}

// Result receives a stage's outcome. Flow:
//  1. Mark the stage row passed/failed + record summary JSON.
//  2. For Inventory: persist the inventory artifact.
//  3. For Inventory (on pass): run spec diff server-side, persist rows,
//     bump the run into SpecValidate and immediately resolve SpecValidate
//     from that diff — the agent isn't involved in SpecValidate at all.
//  4. Transition the run via StageCompleted/StageFailed.
func (a *Agent) Result(w http.ResponseWriter, r *http.Request) { runID, ok := runIDFromURL(w, r) if !ok { return } run, ok := a.authenticate(w, r, runID) if !ok { return } var body StageResult if err := json.NewDecoder(r.Body).Decode(&body); err != nil { http.Error(w, "bad json", http.StatusBadRequest) return } body.Stage = strings.TrimSpace(body.Stage) if _, ok := orchestrator.StateForStage(body.Stage); !ok { http.Error(w, "unknown stage: "+body.Stage, http.StatusBadRequest) return } // Silent-skip guard. Orchestrator advances the run state via // TriggerStageCompleted against the *current* state, not against // body.Stage — so an Inventory result posted while the run is in // StateCPUStress would silently advance CPUStress → Storage and mark // CPUStress as passed without it ever running. That's exactly what // happened on Orion when the agent OOM-crashed mid-CPUStress, // systemd restarted it, and the restarted agent (which hardcoded // "Inventory" as its first stage) re-ran Inventory and reported it. // Guard: if body.Stage doesn't match the stage the run is currently // in, park the run in FailedHolding so the operator can investigate // rather than trusting the claim and cascading silent passes. 
expectedStage := orchestrator.StageNameForState(run.State) if expectedStage != "" && body.Stage != expectedStage { failedLabel := fmt.Sprintf("%s (expected %s)", body.Stage, expectedStage) if err := a.Runs.SetFailedStage(r.Context(), runID, failedLabel); err != nil { log.Printf("result: set failed stage on mismatch run %d: %v", runID, err) } if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageMismatch); err != nil { log.Printf("result: stage-mismatch transition run %d: %v", runID, err) } hostName := a.hostNameFor(r.Context(), run.HostID) a.dispatchEvent(notify.Event{ Kind: notify.KindStageFailed, Severity: notify.SeverityCritical, RunID: runID, HostName: hostName, Title: fmt.Sprintf("[vetting] %s stage mismatch: %s", hostName, body.Stage), Body: fmt.Sprintf("Run %d reported stage %s while orchestrator expected %s — parked in FailedHolding to prevent silent skip.", runID, body.Stage, expectedStage), URL: a.runLinkURL(runID), }) log.Printf("result: stage mismatch run=%d got=%s expected=%s — parked", runID, body.Stage, expectedStage) http.Error(w, "stage mismatch: got "+body.Stage+", expected "+expectedStage, http.StatusConflict) return } // Aggregate threshold gate: flip Passed=false server-side when any // critical breach landed for this stage. The agent's verdict is // advisory — a stage-executor can miss a runaway sample that the // sidecar caught. We check this *before* writing the stage state // so the DB reflects the server-side decision. 
thresholdDetail := "" if body.Passed { if breached, detail := a.stageHadCriticalBreach(r.Context(), runID, body.Stage); breached { body.Passed = false thresholdDetail = detail a.appendLog(runID, "error", fmt.Sprintf("%s reported passed but %s — flipping to failed", body.Stage, detail)) } } stageState := model.StagePassed if !body.Passed { stageState = model.StageFailed } summaryJSON := "" if len(body.Summary) > 0 { summaryJSON = string(body.Summary) } if err := a.Runner.CompleteStage(r.Context(), runID, body.Stage, stageState, summaryJSON); err != nil { http.Error(w, "complete stage: "+err.Error(), http.StatusInternalServerError) return } if thresholdDetail != "" && body.Message == "" { body.Message = thresholdDetail } // Agent-authored sub-steps: persist in slice order (ordinal = index) // and fan out a per-row SSE event each so the detail pane shows them // without a reload. Best-effort — a persistence error is logged but // doesn't fail the whole /result. a.persistSubSteps(r.Context(), runID, body.Stage, body.SubSteps) // Inventory-specific: persist artifact + compute spec diff. if body.Stage == "Inventory" && body.Inventory != nil { if err := a.persistInventory(r, run, body.Inventory); err != nil { log.Printf("persist inventory run %d: %v", runID, err) } } // Firmware-specific: persist each snapshot into firmware_snapshots. // SpecValidate reads them back to diff against expected_firmware. 
if body.Stage == "Firmware" && len(body.Firmware) > 0 { if err := a.persistFirmware(r.Context(), runID, body.Firmware); err != nil { log.Printf("persist firmware run %d: %v", runID, err) } } if !body.Passed { if err := a.Runs.SetFailedStage(r.Context(), runID, body.Stage); err != nil { log.Printf("set failed stage: %v", err) } if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageFailed); err != nil { log.Printf("result: failed-transition run %d: %v", runID, err) http.Error(w, "transition", http.StatusConflict) return } hostName := a.hostNameFor(r.Context(), run.HostID) detail := body.Message if detail == "" { detail = "stage reported failure" } a.dispatchEvent(notify.Event{ Kind: notify.KindStageFailed, Severity: notify.SeverityCritical, RunID: runID, HostName: hostName, Title: fmt.Sprintf("[vetting] %s FAILED: %s", hostName, body.Stage), Body: fmt.Sprintf("Run %d on %s failed at stage %s.\n%s", runID, hostName, body.Stage, detail), URL: a.runLinkURL(runID), }) writeJSON(w, http.StatusOK, map[string]any{"ok": true, "next_state": "FailedHolding"}) return } // Passed: advance to the next stage in the pipeline. next, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageCompleted) if err != nil { http.Error(w, "advance: "+err.Error(), http.StatusConflict) return } log.Printf("result: run %d stage %s passed → %s", runID, body.Stage, next) // If the just-advanced-into state is SpecValidate or Reporting, the // orchestrator owns those stages entirely. The resolve function may // transition further (→ next stage on pass, → FailedHolding on fail, // → Completed for Reporting), so we re-read the run after each. 
if next == model.StateSpecValidate { a.resolveSpecValidate(r, runID) if after, err := a.Runs.Get(r.Context(), runID); err == nil { next = after.State } } if next == model.StateReporting { a.resolveReporting(r, runID) if after, err := a.Runs.Get(r.Context(), runID); err == nil { next = after.State } } writeJSON(w, http.StatusOK, map[string]any{"ok": true, "next_state": string(next)}) } // persistSubSteps writes each reported sub-step as a row keyed by // (runID, stage, ordinal) where ordinal is the slice index, then emits // a per-row SSE event so an open detail page updates without a reload. // Silently no-ops when SubSteps is unwired (tests that don't supply a // store) or the slice is empty. func (a *Agent) persistSubSteps(ctx context.Context, runID int64, stage string, lines []SubStepResultLine) { if a.SubSteps == nil || len(lines) == 0 { return } for i, line := range lines { state := model.StagePassed switch { case line.Skipped: state = model.StageSkipped case !line.Passed: state = model.StageFailed } started := parseResultTime(line.StartedAt) completed := parseResultTime(line.CompletedAt) summaryJSON := "" if len(line.Summary) > 0 { summaryJSON = string(line.Summary) } ss := model.SubStep{ RunID: runID, StageName: stage, Ordinal: i, Name: line.Name, State: state, StartedAt: started, CompletedAt: completed, SummaryJSON: summaryJSON, } if err := a.SubSteps.Upsert(ctx, ss); err != nil { log.Printf("substep upsert run=%d stage=%s ord=%d: %v", runID, stage, i, err) continue } if a.Runner != nil { a.Runner.PublishSubStepUpdate(ctx, ss) } } } // parseResultTime tolerates RFC3339 / RFC3339Nano and returns nil for // empty or unparseable values so a missing timestamp doesn't block the // persist path. func parseResultTime(s string) *time.Time { if s == "" { return nil } if t, err := time.Parse(time.RFC3339Nano, s); err == nil { return &t } if t, err := time.Parse(time.RFC3339, s); err == nil { return &t } return nil } // persistFirmware writes the reported snapshots. 
A nil/unset a.Firmware // store is a no-op so tests that don't wire it up stay green; a mid-run // persist error is logged but doesn't fail the stage (Firmware is // advisory — SpecValidate is the gate). func (a *Agent) persistFirmware(ctx context.Context, runID int64, lines []FirmwareLine) error { if a.Firmware == nil || len(lines) == 0 { return nil } rows := make([]store.FirmwareSnapshot, 0, len(lines)) for _, l := range lines { raw := "{}" if len(l.Raw) > 0 { if b, err := json.Marshal(l.Raw); err == nil { raw = string(b) } } rows = append(rows, store.FirmwareSnapshot{ RunID: runID, Component: l.Component, Identifier: l.Identifier, Version: l.Version, Vendor: l.Vendor, RawJSON: raw, }) } return a.Firmware.CreateBatch(ctx, rows) } func (a *Agent) persistInventory(r *http.Request, run *model.Run, inv *spec.Inventory) error { dir := filepath.Join(a.ArtifactsDir, fmt.Sprintf("run-%d", run.ID)) if err := os.MkdirAll(dir, 0o755); err != nil { return err } path := filepath.Join(dir, "inventory.json") buf, err := json.MarshalIndent(inv, "", " ") if err != nil { return err } if err := os.WriteFile(path, buf, 0o644); err != nil { return err } sum := sha256.Sum256(buf) _, err = a.Artifacts.Create(r.Context(), store.Artifact{ RunID: run.ID, Kind: "inventory", Path: path, SHA256: hex.EncodeToString(sum[:]), SizeBytes: int64(len(buf)), }) return err } // resolveSpecValidate runs the expected-vs-actual diff against the // just-stored inventory artifact, persists spec_diffs rows, and drives // the state machine — all on the server. The agent does nothing for // this stage. 
func (a *Agent) resolveSpecValidate(r *http.Request, runID int64) { run, err := a.Runs.Get(r.Context(), runID) if err != nil { log.Printf("specvalidate: get run: %v", err) return } host, err := a.Hosts.Get(r.Context(), run.HostID) if err != nil { log.Printf("specvalidate: get host: %v", err) return } expected, err := spec.Parse(host.ExpectedSpecYAML) if err != nil { log.Printf("specvalidate: parse expected yaml: %v", err) a.failStage(r, runID, "SpecValidate", "malformed expected spec: "+err.Error()) return } inv, err := a.readInventoryArtifact(r, runID) if err != nil { log.Printf("specvalidate: read inventory: %v", err) a.failStage(r, runID, "SpecValidate", "missing inventory artifact") return } diffs := spec.Diff(expected, inv) if a.Firmware != nil && len(expected.Firmware) > 0 { snaps, err := a.Firmware.ListForRun(r.Context(), runID) if err != nil { log.Printf("specvalidate: list firmware: %v", err) } else { observed := make([]spec.FirmwareObserved, 0, len(snaps)) for _, s := range snaps { observed = append(observed, spec.FirmwareObserved{ Component: s.Component, Identifier: s.Identifier, Version: s.Version, }) } diffs = append(diffs, spec.DiffFirmware(expected.Firmware, observed)...) 
} } if err := a.SpecDiffs.ReplaceForRun(r.Context(), runID, diffs); err != nil { log.Printf("specvalidate: write diffs: %v", err) } if err := a.Stages.StartByName(r.Context(), runID, "SpecValidate"); err != nil { log.Printf("specvalidate: start stage: %v", err) } critical := 0 for _, d := range diffs { if d.Severity == "critical" && !d.Ignored { critical++ } } summaryBuf, _ := json.Marshal(map[string]any{ "diffs": len(diffs), "critical": critical, }) if critical > 0 { _ = a.Runner.CompleteStage(r.Context(), runID, "SpecValidate", model.StageFailed, string(summaryBuf)) _ = a.Runs.SetFailedStage(r.Context(), runID, "SpecValidate") if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageFailed); err != nil { log.Printf("specvalidate: failed-transition: %v", err) } a.appendLog(runID, "error", fmt.Sprintf("SpecValidate: %d critical diff(s) — holding host", critical)) hostName := a.hostNameFor(r.Context(), run.HostID) a.dispatchEvent(notify.Event{ Kind: notify.KindSpecMismatch, Severity: notify.SeverityCritical, RunID: runID, HostName: hostName, Title: fmt.Sprintf("[vetting] %s spec mismatch (%d critical)", hostName, critical), Body: fmt.Sprintf("SpecValidate found %d critical diff(s) on %s. 
Host is held for inspection.", critical, hostName), URL: a.runLinkURL(runID), }) } else { _ = a.Runner.CompleteStage(r.Context(), runID, "SpecValidate", model.StagePassed, string(summaryBuf)) if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageCompleted); err != nil { log.Printf("specvalidate: advance: %v", err) } a.appendLog(runID, "info", "SpecValidate: all fields match expected spec") } } func (a *Agent) readInventoryArtifact(r *http.Request, runID int64) (*spec.Inventory, error) { arts, err := a.Artifacts.ListForRun(r.Context(), runID) if err != nil { return nil, err } for i := len(arts) - 1; i >= 0; i-- { if arts[i].Kind == "inventory" { buf, err := os.ReadFile(arts[i].Path) if err != nil { return nil, err } var inv spec.Inventory if err := json.Unmarshal(buf, &inv); err != nil { return nil, err } return &inv, nil } } return nil, errors.New("no inventory artifact") } func (a *Agent) failStage(r *http.Request, runID int64, stage, message string) { _ = a.Runner.CompleteStage(r.Context(), runID, stage, model.StageFailed, fmt.Sprintf(`{"error":%q}`, message)) _ = a.Runs.SetFailedStage(r.Context(), runID, stage) if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageFailed); err != nil { log.Printf("failStage: transition run %d: %v", runID, err) } a.appendLog(runID, "error", stage+": "+message) } func (a *Agent) appendLog(runID int64, level, text string) { if a.Logs == nil { return } w, err := a.Logs.WriterFor(runID) if err != nil { log.Printf("appendLog: %v", err) return } w.Append(logs.Line{Level: level, Text: text}) } // Hold issues the per-run ephemeral ed25519 keypair: the agent gets // the authorized_keys line, the orchestrator keeps the privkey on disk. // Hold also records the agent's reported IP so the tile can print the // ssh invocation. 
type HoldRequest struct { AgentIP string `json:"agent_ip"` } type HoldResponse struct { AuthorizedKey string `json:"authorized_key"` RunID int64 `json:"run_id"` } func (a *Agent) Hold(w http.ResponseWriter, r *http.Request) { runID, ok := runIDFromURL(w, r) if !ok { return } if _, ok := a.authenticate(w, r, runID); !ok { return } var body HoldRequest _ = json.NewDecoder(r.Body).Decode(&body) agentIP := strings.TrimSpace(body.AgentIP) if agentIP == "" { if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil { agentIP = host } } if agentIP != "" { if err := a.Runs.SetHoldIP(r.Context(), runID, agentIP); err != nil { log.Printf("hold: set hold_ip: %v", err) } } kp, err := hold.Issue(runID) if err != nil { http.Error(w, "generate key: "+err.Error(), http.StatusInternalServerError) return } keyPath := filepath.Join(a.ArtifactsDir, fmt.Sprintf("run-%d", runID), "hold.key") abs, err := kp.WritePrivateTo(keyPath) if err != nil { http.Error(w, "write key: "+err.Error(), http.StatusInternalServerError) return } sum := sha256.Sum256(kp.PrivatePEM) if _, err := a.Artifacts.Create(r.Context(), store.Artifact{ RunID: runID, Kind: "hold_key", Path: abs, SHA256: hex.EncodeToString(sum[:]), SizeBytes: int64(len(kp.PrivatePEM)), }); err != nil { log.Printf("hold: record artifact: %v", err) } a.appendLog(runID, "info", fmt.Sprintf("Hold key issued. SSH in with: ssh -i %s root@%s", abs, agentIP)) hostID := mustHostID(a, r, runID) if hostID != 0 { hostName := a.hostNameFor(r.Context(), hostID) a.dispatchEvent(notify.Event{ Kind: notify.KindHoldingOpened, Severity: notify.SeverityCritical, RunID: runID, HostName: hostName, Title: fmt.Sprintf("[vetting] %s holding — SSH ready", hostName), Body: fmt.Sprintf("Host %s is holding at %s.\nssh -i %s root@%s", hostName, agentIP, abs, agentIP), URL: a.runLinkURL(runID), }) } // Refresh the tile + all detail-page fragments so the operator // sees the ssh command and the hold banner without reloading. 
if id := mustHostID(a, r, runID); id != 0 && a.Runner != nil { a.Runner.PublishTileUpdate(r.Context(), id) } writeJSON(w, http.StatusOK, HoldResponse{AuthorizedKey: kp.AuthorizedKey, RunID: runID}) } // dispatchEvent hands an already-populated Event to the notify Registry // if one is wired. Handler code uses hostNameFor to resolve the host // name for the event payload; this keeps call sites terse. func (a *Agent) dispatchEvent(ev notify.Event) { if a.Notify == nil { return } a.Notify.Dispatch(ev) } // hostNameFor returns a human-readable host name for a run, or "host-N" // if the lookup fails — notifications should never fail silently over a // missing name. func (a *Agent) hostNameFor(ctx context.Context, hostID int64) string { if host, err := a.Hosts.Get(ctx, hostID); err == nil && host != nil { return host.Name } return fmt.Sprintf("host-%d", hostID) } func (a *Agent) runLinkURL(runID int64) string { if a.PublicURL == "" { return "" } return strings.TrimRight(a.PublicURL, "/") + "/reports/" + fmt.Sprintf("%d", runID) } func mustHostID(a *Agent, r *http.Request, runID int64) int64 { run, err := a.Runs.Get(r.Context(), runID) if err != nil || run == nil { return 0 } return run.HostID } // ===== Phase 4 endpoints ================================================= // SensorBatch is what the agent POSTs to /sensor: a stream of numeric // samples (temps, fan rpm, PSU rails, iperf throughput). Each sample is // (kind, key, value, unit). Timestamps default to server-now when empty // so the thermal sidecar doesn't have to carry a clock. type SensorBatch struct { Samples []SensorSample `json:"samples"` } type SensorSample struct { TS string `json:"ts,omitempty"` Kind string `json:"kind"` // temp|fan|psu_volt|iperf|fio|smart_attr Key string `json:"key"` Value float64 `json:"value"` Unit string `json:"unit,omitempty"` } // Sensor persists a batch of numeric samples. The thermal sidecar hits // this on a tick; stage executors (iperf, fio) also drop here. 
Each // sample is evaluated against the run's seeded thresholds — critical // breaches fail the run immediately (thermal runaway, EDAC UE, voltage // sag); warning breaches are recorded for the report only. func (a *Agent) Sensor(w http.ResponseWriter, r *http.Request) { runID, ok := runIDFromURL(w, r) if !ok { return } run, ok := a.authenticate(w, r, runID) if !ok { return } if a.Measurements == nil { http.Error(w, "measurements store not wired", http.StatusInternalServerError) return } var body SensorBatch if err := json.NewDecoder(r.Body).Decode(&body); err != nil { http.Error(w, "bad json", http.StatusBadRequest) return } rows := make([]model.Measurement, 0, len(body.Samples)) sampleStages := make([]string, 0, len(body.Samples)) for _, s := range body.Samples { ts, _ := time.Parse(time.RFC3339Nano, s.TS) if ts.IsZero() { ts = time.Now().UTC() } rows = append(rows, model.Measurement{ RunID: runID, TS: ts, Kind: s.Kind, Key: s.Key, Value: s.Value, Unit: s.Unit, }) // Stage the sample belongs to drives threshold selector // matching. We use the run's current state — the agent does // not tag samples with a stage. sampleStages = append(sampleStages, orchestrator.StageNameForState(run.State)) } if err := a.Measurements.CreateBatch(r.Context(), rows); err != nil { http.Error(w, "write samples: "+err.Error(), http.StatusInternalServerError) return } critical := a.evaluateSensorBatch(r.Context(), runID, rows, sampleStages) writeJSON(w, http.StatusOK, map[string]any{ "ok": true, "written": len(rows), "breach": critical != "", "breach_kind": critical, }) if critical != "" { a.failRunOnCriticalBreach(r, run, critical) } } // evaluateSensorBatch runs each sample through the run's thresholds, // persists evaluations, and returns a short human-readable label for // the first critical breach it sees (empty when all samples pass or // only hit warning-severity rules). 
func (a *Agent) evaluateSensorBatch(ctx context.Context, runID int64, rows []model.Measurement, sampleStages []string) string { if a.Thresholds == nil || len(rows) == 0 { return "" } rules, err := a.Thresholds.ListForRun(ctx, runID) if err != nil { log.Printf("sensor: list thresholds run %d: %v", runID, err) return "" } if len(rules) == 0 { return "" } evalRules := make([]orchestrator.Threshold, 0, len(rules)) for _, r := range rules { evalRules = append(evalRules, orchestrator.Threshold{ ID: r.ID, Stage: r.Stage, Kind: r.Kind, Key: r.Key, Op: orchestrator.ThresholdOp(r.Op), Value: r.Threshold, Nominal: r.Nominal, Severity: orchestrator.ThresholdSeverity(r.Severity), }) } evals := make([]store.ThresholdEvaluation, 0, len(rows)) critical := "" for i, m := range rows { sample := orchestrator.Sample{ Stage: sampleStages[i], Kind: m.Kind, Key: m.Key, Value: m.Value, } for _, res := range orchestrator.Evaluate(sample, evalRules) { evals = append(evals, store.ThresholdEvaluation{ RunID: runID, ThresholdID: res.Threshold.ID, Stage: sample.Stage, Kind: sample.Kind, Key: sample.Key, TS: m.TS, Observed: res.Observed, Passed: res.Passed, }) if critical == "" && res.CriticalBreach() { critical = fmt.Sprintf("%s %s=%g breached %s %g", res.Threshold.Kind, sample.Key, res.Observed, res.Threshold.Op, res.Threshold.Value) } } } if err := a.Thresholds.RecordBatch(ctx, evals); err != nil { log.Printf("sensor: record evals run %d: %v", runID, err) } return critical } // stageHadCriticalBreach returns true if any critical-severity // threshold evaluation for this run matched samples attributed to the // given stage (stage selector "*" or exact). Called at /result close // so even an agent that reports Passed=true gets overridden when the // aggregate view says the stage tripped a gate. 
func (a *Agent) stageHadCriticalBreach(ctx context.Context, runID int64, stage string) (bool, string) { if a.Thresholds == nil { return false, "" } breaches, err := a.Thresholds.CriticalBreaches(ctx, runID) if err != nil { log.Printf("result: list breaches run %d: %v", runID, err) return false, "" } for _, b := range breaches { if b.Stage == stage || b.Stage == "" || b.Stage == "*" { return true, fmt.Sprintf("critical threshold breach: %s %s=%g", b.Kind, b.Key, b.Observed) } } return false, "" } // failRunOnCriticalBreach flips the run to FailedHolding in response // to a live threshold breach (thermal runaway, EDAC UE, rail sag). // The agent's pending /result for the current stage may still arrive — // the silent-skip guard handles that by refusing to double-transition. func (a *Agent) failRunOnCriticalBreach(r *http.Request, run *model.Run, detail string) { stage := orchestrator.StageNameForState(run.State) if stage == "" { stage = "threshold" } if err := a.Runs.SetFailedStage(r.Context(), run.ID, stage+" (threshold)"); err != nil { log.Printf("sensor: set failed stage run %d: %v", run.ID, err) } if _, err := a.Runner.Transition(r.Context(), run.ID, orchestrator.TriggerStageFailed); err != nil { // If we're already in FailedHolding the transition errors — // that's fine, the first breach wins. log.Printf("sensor: fail-transition run %d: %v", run.ID, err) return } hostName := a.hostNameFor(r.Context(), run.HostID) a.dispatchEvent(notify.Event{ Kind: notify.KindStageFailed, Severity: notify.SeverityCritical, RunID: run.ID, HostName: hostName, Title: fmt.Sprintf("[vetting] %s FAILED: %s (threshold)", hostName, stage), Body: fmt.Sprintf("Run %d on %s tripped a critical threshold during %s: %s", run.ID, hostName, stage, detail), URL: a.runLinkURL(run.ID), }) a.appendLog(run.ID, "error", fmt.Sprintf("threshold breach during %s: %s — run parked in FailedHolding", stage, detail)) } // resolveReporting runs when the pipeline advances into StateReporting. 
// It's an orchestrator-owned stage like SpecValidate: no agent action. // Writes a JSON report bundling run + stages + diffs + measurements, // then advances the run to Completed. Heartbeat will then return abort // and the agent will power the host off in Phase 5. func (a *Agent) resolveReporting(r *http.Request, runID int64) { ctx := r.Context() if err := a.Stages.StartByName(ctx, runID, "Reporting"); err != nil { log.Printf("reporting: start stage: %v", err) } run, err := a.Runs.Get(ctx, runID) if err != nil { log.Printf("reporting: get run: %v", err) return } host, err := a.Hosts.Get(ctx, run.HostID) if err != nil { log.Printf("reporting: get host: %v", err) return } stages, err := a.Stages.ListForRun(ctx, runID) if err != nil { log.Printf("reporting: list stages: %v", err) } diffs, err := a.SpecDiffs.ListForRun(ctx, runID) if err != nil { log.Printf("reporting: list diffs: %v", err) } var measurements []model.Measurement if a.Measurements != nil { measurements, err = a.Measurements.ListForRun(ctx, runID) if err != nil { log.Printf("reporting: list measurements: %v", err) } } var firmware []store.FirmwareSnapshot if a.Firmware != nil { firmware, err = a.Firmware.ListForRun(ctx, runID) if err != nil { log.Printf("reporting: list firmware: %v", err) } } // Inventory is optional for reporting: runs cancelled before the // Inventory stage finishes won't have the artifact. Failure to load // is logged but never aborts the report. 
inventory, err := a.readInventoryArtifact(r, runID) if err != nil { log.Printf("reporting: read inventory: %v", err) inventory = nil } bundle := map[string]any{ "run": run, "host": host, "stages": stages, "spec_diffs": diffs, "measurements": measurements, "firmware": firmware, "generated_at": time.Now().UTC().Format(time.RFC3339), } buf, err := json.MarshalIndent(bundle, "", " ") if err != nil { log.Printf("reporting: marshal: %v", err) a.failStage(r, runID, "Reporting", "marshal report: "+err.Error()) return } dir := filepath.Join(a.ArtifactsDir, fmt.Sprintf("run-%d", runID)) if err := os.MkdirAll(dir, 0o755); err != nil { a.failStage(r, runID, "Reporting", "mkdir: "+err.Error()) return } path := filepath.Join(dir, "report.json") if err := os.WriteFile(path, buf, 0o644); err != nil { a.failStage(r, runID, "Reporting", "write: "+err.Error()) return } sum := sha256.Sum256(buf) if _, err := a.Artifacts.Create(ctx, store.Artifact{ RunID: runID, Kind: "report", Path: path, SHA256: hex.EncodeToString(sum[:]), SizeBytes: int64(len(buf)), }); err != nil { log.Printf("reporting: record artifact: %v", err) } // Also render the operator-facing HTML summary alongside the JSON. // Failures here are non-fatal — the JSON is the source of truth. 
if host != nil { fwRows := make([]report.FirmwareSnapshot, 0, len(firmware)) for _, f := range firmware { fwRows = append(fwRows, report.FirmwareSnapshot{ Component: f.Component, Identifier: f.Identifier, Version: f.Version, Vendor: f.Vendor, }) } htmlData := report.Data{ GeneratedAt: time.Now().UTC(), Run: *run, Host: *host, Stages: stages, SpecDiffs: diffs, Aggregates: report.AggregateMeasurements(measurements), Firmware: fwRows, Inventory: inventory, } if htmlBuf, err := report.RenderHTML(htmlData); err != nil { log.Printf("reporting: render html: %v", err) } else { htmlPath := filepath.Join(dir, "report.html") if err := os.WriteFile(htmlPath, htmlBuf, 0o644); err != nil { log.Printf("reporting: write html: %v", err) } else { htmlSum := sha256.Sum256(htmlBuf) if _, err := a.Artifacts.Create(ctx, store.Artifact{ RunID: runID, Kind: "report_html", Path: htmlPath, SHA256: hex.EncodeToString(htmlSum[:]), SizeBytes: int64(len(htmlBuf)), }); err != nil { log.Printf("reporting: record html artifact: %v", err) } } } } summaryBuf, _ := json.Marshal(map[string]any{ "report_path": path, "stages": len(stages), "diffs": len(diffs), }) if err := a.Runner.CompleteStage(ctx, runID, "Reporting", model.StagePassed, string(summaryBuf)); err != nil { log.Printf("reporting: complete stage: %v", err) } if err := a.Runs.MarkCompleted(ctx, runID, path); err != nil { log.Printf("reporting: mark completed: %v", err) } a.appendLog(runID, "info", "Reporting: wrote "+path+"; run completed.") // Publish a final tile + detail update so the dashboard flips to // pass mood and the detail page's summary/actions update without // the operator reloading. 
if host != nil && a.Runner != nil { a.Runner.PublishTileUpdate(ctx, host.ID) } hostName := "host" if host != nil { hostName = host.Name } a.dispatchEvent(notify.Event{ Kind: notify.KindRunCompleted, Severity: notify.SeverityInfo, RunID: runID, HostName: hostName, Title: fmt.Sprintf("[vetting] %s passed vetting", hostName), Body: fmt.Sprintf("Run %d on %s completed all stages. Report: %s", runID, hostName, path), URL: a.runLinkURL(runID), }) }