Files
Vetting/internal/api/agent_handlers.go
T
josh 4524ab8dc0
CI / Lint + build + test (push) Successful in 2m5s
Release / release (push) Successful in 3m5s
runs: add non-destructive flag + operator Cancel button
Non-destructive pre-declares "don't touch the disks" on Start: the
Storage stage skips wipe-probe, badblocks -w, and write-mode fio,
and reports a read-only summary. Runs gains a new non_destructive column;
threaded through Claim → agent tests.Deps → Storage stage.

Cancel halts an in-flight run. The orchestrator transitions to a
new StateCancelled via TriggerOperatorCancelled (valid from any
active state); the agent's next heartbeat returns cmd=cancel_stage,
which fires a stored CancelFunc on the per-stage context. Stage
subprocesses spawned with exec.CommandContext die with the context,
the agent posts a cancelled outcome, then powers the host off.

Destructive stages mid-run may leave the host in an intermediate
state — the UI confirm dialog warns the operator; recovery is
manual for now.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-18 13:01:42 -04:00

930 lines
30 KiB
Go

package api
import (
"context"
"crypto/sha256"
"crypto/subtle"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"log"
"net"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/go-chi/chi/v5"
"vetting/internal/events"
"vetting/internal/hold"
"vetting/internal/logs"
"vetting/internal/model"
"vetting/internal/notify"
"vetting/internal/orchestrator"
"vetting/internal/pxe"
"vetting/internal/report"
"vetting/internal/spec"
"vetting/internal/store"
)
// Agent collects the collaborators used by agent-facing HTTP routes:
// the iPXE chainload endpoint and the /api/v1/runs/:id/* endpoints.
//
// Notify and Measurements are nil-checked before use (dispatchEvent,
// Sensor, resolveReporting); Logs is nil-checked by Claim and appendLog.
type Agent struct {
// Hosts looks up host records (name, expected-spec YAML).
Hosts *store.Hosts
// Runs reads and updates run rows (state, token hash, hold IP, failed stage).
Runs *store.Runs
// Stages seeds, starts and lists the per-run stage rows.
Stages *store.Stages
// Artifacts indexes files written under ArtifactsDir (path, sha256, size).
Artifacts *store.Artifacts
// SpecDiffs stores the expected-vs-actual diff rows for a run.
SpecDiffs *store.SpecDiffs
// Measurements stores numeric sensor samples posted to /sensor.
Measurements *store.Measurements
// Runner drives state transitions, stage completion and heartbeat tracking.
Runner *orchestrator.Runner
// EventHub receives the "tile-<hostID>" events published after Hold and Reporting.
EventHub *events.Hub
// Logs hands out per-run log writers.
Logs *logs.Hub
// Notify dispatches operator notifications.
Notify *notify.Registry
ArtifactsDir string // ./var/artifacts
OrchestratorURL string // baked into iPXE cmdline
PublicURL string // user-visible URL base for notification click-throughs
// LiveKernelURL and LiveInitrdURL are passed through to the iPXE script.
LiveKernelURL string
LiveInitrdURL string
TLSCertFPR string // optional; empty = skip pinning
IperfPort int // orchestrator-supervised iperf3 port; 0 = 5201
}
// IPXEScript serves a per-MAC iPXE script. Called by iPXE itself after
// dnsmasq hands it the chainload URL.
//
// Responses (always text/plain, never cached):
//   - malformed MAC          → pxe.NotRegisteredScript
//   - no active run for MAC  → pxe.NoActiveRunScript
//   - active run             → the real boot script with a freshly issued
//     run token; the fetch also fires TriggerPXEObserved.
//
// Store or token failures answer 500 and are logged.
func (a *Agent) IPXEScript(w http.ResponseWriter, r *http.Request) {
	mac := strings.ToLower(strings.TrimSpace(chi.URLParam(r, "mac")))
	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	w.Header().Set("Cache-Control", "no-store")
	if !macRe.MatchString(mac) {
		log.Printf("ipxe: rejected malformed mac %q from %s", mac, r.RemoteAddr)
		_, _ = w.Write([]byte(pxe.NotRegisteredScript(mac)))
		return
	}
	run, err := a.Runs.FindActiveByMAC(r.Context(), mac)
	if err != nil {
		log.Printf("ipxe: find run by mac %s: %v", mac, err)
		http.Error(w, "internal error", http.StatusInternalServerError)
		return
	}
	if run == nil {
		_, _ = w.Write([]byte(pxe.NoActiveRunScript(mac)))
		return
	}
	// The token hash in the DB is the sha256 of the plaintext. The
	// plaintext itself cannot be recovered from the hash — we issued it
	// once when the run was created. For iPXE we re-issue a fresh token
	// on every PXE fetch: this is safe because the hash in the DB is
	// rewritten to match and only the most recent PXE can be claimed.
	plain, hash, err := orchestrator.IssueRunToken()
	if err != nil {
		// Logged like every other failure in this handler so a 500 here
		// leaves a trace for the operator.
		log.Printf("ipxe: issue token run %d: %v", run.ID, err)
		http.Error(w, "token", http.StatusInternalServerError)
		return
	}
	if err := a.Runs.RotateTokenHash(r.Context(), run.ID, hash); err != nil {
		log.Printf("ipxe: rotate token run %d: %v", run.ID, err)
		http.Error(w, "token", http.StatusInternalServerError)
		return
	}
	script := pxe.BuildScript(pxe.IPXEParams{
		OrchestratorURL: a.OrchestratorURL,
		LiveKernelURL:   a.LiveKernelURL,
		LiveInitrdURL:   a.LiveInitrdURL,
		TLSCertFPR:      a.TLSCertFPR,
		RunID:           run.ID,
		MAC:             mac,
		Token:           plain,
	})
	// Write errors are ignored: the client is iPXE firmware and there is
	// nothing useful to do if it hung up mid-response.
	_, _ = w.Write([]byte(script))
	// iPXE has now fetched the script — treat this as PXEObserved. If we
	// were already in Booting the transition table allows staying.
	if _, err := a.Runner.Transition(r.Context(), run.ID, orchestrator.TriggerPXEObserved); err != nil {
		// Non-fatal: the agent may still claim via /claim.
		log.Printf("ipxe: PXEObserved for run %d: %v", run.ID, err)
	}
}
// Hello is the agent's first call once userspace is up. It validates the
// run ID and bearer token, writes one log line and answers
// {"ok": true, "run_id": N}; it changes no run state — the authoritative
// transition comes from /claim. Sending it early gives operators a
// signal before the claim happens.
func (a *Agent) Hello(w http.ResponseWriter, r *http.Request) {
	id, valid := runIDFromURL(w, r)
	if !valid {
		return
	}
	_, authed := a.authenticate(w, r, id)
	if !authed {
		return
	}
	log.Printf("agent hello: run=%d remote=%s", id, r.RemoteAddr)
	reply := map[string]any{"ok": true, "run_id": id}
	writeJSON(w, http.StatusOK, reply)
}
// Claim is the binding call: the agent proves it holds the plaintext
// token for this run, and in return the orchestrator transitions to
// InventoryCheck and seeds the stage rows. All destructive actions the
// agent takes later require a prior successful claim.
//
// The optional JSON body may carry "agent_ip"; when absent the peer
// address of the request is used. The response carries the per-run
// configuration the stage-driven agent needs: stage order, expected
// disks (serial + size), iperf3 port and the non_destructive flag.
//
// Status codes: 500 when seeding stage rows fails, 409 when the
// AgentClaimed transition is rejected, 200 otherwise (including repeat
// claims on a run that is already past Booting).
func (a *Agent) Claim(w http.ResponseWriter, r *http.Request) {
	runID, ok := runIDFromURL(w, r)
	if !ok {
		return
	}
	run, ok := a.authenticate(w, r, runID)
	if !ok {
		return
	}
	var body struct {
		AgentIP string `json:"agent_ip"`
	}
	if r.Body != nil {
		// agent_ip is informational; if missing fall back to RemoteAddr.
		// A decode error is therefore deliberately ignored.
		_ = json.NewDecoder(r.Body).Decode(&body)
	}
	agentIP := strings.TrimSpace(body.AgentIP)
	if agentIP == "" {
		if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil {
			agentIP = host
		} else {
			agentIP = r.RemoteAddr
		}
	}
	// First claim seeds the stage rows; subsequent claims are a no-op
	// so agent retries after transient network failures stay safe.
	if len(mustListStages(a.Stages, r, runID)) == 0 {
		if err := a.Stages.Seed(r.Context(), runID); err != nil {
			log.Printf("claim: seed stages run %d: %v", runID, err)
			http.Error(w, "seed stages", http.StatusInternalServerError)
			return
		}
	}
	// Only drive the transition from the pre-claim states. A run that is
	// already past Booting is treated as "already claimed" and reported
	// OK rather than failing the retry.
	if run.State == model.StateWaitingWoL || run.State == model.StateBooting {
		if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerAgentClaimed); err != nil {
			log.Printf("claim: transition run %d: %v", runID, err)
			http.Error(w, "transition", http.StatusConflict)
			return
		}
	}
	log.Printf("agent claimed: run=%d agent_ip=%s", runID, agentIP)
	// appendLog already handles an unwired log hub and a failing writer,
	// and avoids shadowing the ResponseWriter with a local named w.
	a.appendLog(runID, "info", fmt.Sprintf("agent claimed from %s — entering Inventory", agentIP))
	// Stage-driven agent needs a bit of per-run config: the device
	// allowlist (serial + expected size) for Storage, and the iperf3
	// server port for Network. Parse the host's expected spec here so
	// the agent doesn't need to read YAML.
	// Kept as a non-nil slice so it encodes as [] rather than null.
	expectedDisks := []map[string]any{}
	if host, err := a.Hosts.Get(r.Context(), run.HostID); err == nil && host != nil {
		if parsed, err := spec.Parse(host.ExpectedSpecYAML); err == nil && parsed != nil {
			for _, dd := range parsed.Disks {
				expectedDisks = append(expectedDisks, map[string]any{
					"serial":  dd.Serial,
					"size_gb": dd.SizeGB,
				})
			}
		}
	}
	iperfPort := a.IperfPort
	if iperfPort == 0 {
		iperfPort = 5201
	}
	writeJSON(w, http.StatusOK, map[string]any{
		"ok":              true,
		"run_id":          runID,
		"stages":          store.DefaultStageOrder,
		"expected_disks":  expectedDisks,
		"iperf_port":      iperfPort,
		"non_destructive": run.NonDestructive,
	})
}
// Heartbeat is the agent's periodic liveness ping. Every authenticated
// call refreshes the runner's heartbeat timestamp, and the response
// doubles as a control channel. The "cmd" field is one of:
//
//	continue     — normal case
//	shutdown     — run is Completed; agent should power the host down
//	cancel_stage — run is Cancelled; agent cancels the active stage
//	               context, posts a cancelled outcome and powers off
//	abort        — run is FailedHolding or Released
//	retry_stage  — failed stage is Storage and the operator's wipe
//	               override is set; "stage" and "override_flags" are
//	               included in the response
//
// The current run state is always echoed back under "state".
func (a *Agent) Heartbeat(w http.ResponseWriter, r *http.Request) {
	runID, ok := runIDFromURL(w, r)
	if !ok {
		return
	}
	run, ok := a.authenticate(w, r, runID)
	if !ok {
		return
	}
	a.Runner.TouchHeartbeat(runID)

	reply := map[string]any{"state": run.State}
	command := "continue"
	switch run.State {
	case model.StateCompleted:
		command = "shutdown"
	case model.StateCancelled:
		command = "cancel_stage"
	case model.StateFailedHolding, model.StateReleased:
		command = "abort"
	default:
		// Terminal states above take precedence; the override is only
		// consulted for every other state.
		if run.FailedStage == "Storage" && overrideWipeSet(run.OverrideFlagsJSON) {
			// Operator pressed "Override wipe & retry": re-enter
			// Storage with the wipe-probe bypass armed.
			command = "retry_stage"
			reply["stage"] = "Storage"
			reply["override_flags"] = json.RawMessage(run.OverrideFlagsJSON)
		}
	}
	reply["cmd"] = command
	writeJSON(w, http.StatusOK, reply)
}
// overrideWipeSet reports whether the "wipe" flag is true in a
// Run.OverrideFlagsJSON blob. An empty blob is false. Decode errors are
// deliberately dropped: a blob that did not round-trip correctly reads
// as whatever was decoded before the error (normally false), and the
// operator has to reapply the override.
func overrideWipeSet(blob string) bool {
	var parsed struct {
		Wipe bool `json:"wipe"`
	}
	if len(blob) > 0 {
		_ = json.Unmarshal([]byte(blob), &parsed)
	}
	return parsed.Wipe
}
// authenticate verifies the Bearer token against the run's stored hash
// and returns the Run for downstream handlers. On failure it writes the
// response itself and returns ok=false so the caller can bail early:
//
//	404 — run does not exist
//	500 — the run lookup failed for any other reason (logged)
//	401 — bearer token missing or not matching the stored hash
//
// The hash comparison is constant-time.
func (a *Agent) authenticate(w http.ResponseWriter, r *http.Request, runID int64) (*model.Run, bool) {
	run, err := a.Runs.Get(r.Context(), runID)
	switch {
	case errors.Is(err, store.ErrNotFound), err == nil && run == nil:
		// A nil run without an error is treated as not-found rather
		// than dereferenced below.
		http.Error(w, "run not found", http.StatusNotFound)
		return nil, false
	case err != nil:
		// The client only sees a generic message; keep the cause in the
		// server log so the 500 can be diagnosed.
		log.Printf("authenticate: get run %d: %v", runID, err)
		http.Error(w, "internal error", http.StatusInternalServerError)
		return nil, false
	}
	token := bearerToken(r)
	if token == "" {
		http.Error(w, "missing bearer", http.StatusUnauthorized)
		return nil, false
	}
	presented := orchestrator.HashRunToken(token)
	if subtle.ConstantTimeCompare([]byte(presented), []byte(run.AgentTokenHash)) != 1 {
		http.Error(w, "bad token", http.StatusUnauthorized)
		return nil, false
	}
	return run, true
}
// bearerToken extracts the credential from an "Authorization: Bearer
// <token>" header. It returns "" when the header is absent, uses a
// different scheme, or carries no token. The scheme name is matched
// case-insensitively, as HTTP authentication schemes are defined to be;
// surrounding whitespace on the token is trimmed.
func bearerToken(r *http.Request) string {
	const scheme = "Bearer "
	h := r.Header.Get("Authorization")
	if len(h) < len(scheme) || !strings.EqualFold(h[:len(scheme)], scheme) {
		return ""
	}
	return strings.TrimSpace(h[len(scheme):])
}
// runIDFromURL parses the "id" route parameter as a positive base-10
// int64. On anything else it answers 400 "bad run id" and returns
// ok=false so the handler can return immediately.
func runIDFromURL(w http.ResponseWriter, r *http.Request) (int64, bool) {
	raw := chi.URLParam(r, "id")
	if parsed, err := strconv.ParseInt(raw, 10, 64); err == nil && parsed > 0 {
		return parsed, true
	}
	http.Error(w, "bad run id", http.StatusBadRequest)
	return 0, false
}
// writeJSON sends body as a JSON response with the given status code.
// The Content-Type header is set before the status line is written.
// An encoding/write error is ignored: the status has already gone out,
// so there is no way left to report it to the client.
func writeJSON(w http.ResponseWriter, status int, body any) {
	hdr := w.Header()
	hdr.Set("Content-Type", "application/json")
	w.WriteHeader(status)
	enc := json.NewEncoder(w)
	_ = enc.Encode(body)
}
// mustListStages returns the stage rows for a run, or nil when the read
// fails. Despite the name it never panics: /claim uses it purely as an
// "are there any stages yet?" probe, so a failed read looks like zero
// stages and the caller goes on to attempt a Seed.
func mustListStages(s *store.Stages, r *http.Request, runID int64) []model.Stage {
	if rows, err := s.ListForRun(r.Context(), runID); err == nil {
		return rows
	}
	return nil
}
// ===== Phase 3 endpoints =================================================
// LogBatch is the JSON body the agent POSTs to /log: zero or more lines
// with timestamp + level + text. The Log handler appends the lines, in
// order, to the per-run log writer.
type LogBatch struct {
Lines []LogLine `json:"lines"`
}
// LogLine is a single entry of a LogBatch. Only Text is always encoded;
// the other fields are omitted from JSON when empty.
type LogLine struct {
TS string `json:"ts,omitempty"` // RFC3339Nano; server clock used if empty
Level string `json:"level,omitempty"` // info|warn|error|debug
Stage string `json:"stage,omitempty"` // optional stage tag for per-stage log fan-out
Text string `json:"text"`
}
// Log accepts a batch of log lines from the agent and appends them, in
// order, to the run's log writer. Empty batches are legal (useful for
// agent-side flush ping).
//
// Status codes: 400 for an undecodable body, 500 when the log hub is
// not wired or the run's writer cannot be opened, 200 with the number
// of lines written otherwise.
func (a *Agent) Log(w http.ResponseWriter, r *http.Request) {
	runID, ok := runIDFromURL(w, r)
	if !ok {
		return
	}
	if _, ok := a.authenticate(w, r, runID); !ok {
		return
	}
	// Claim and appendLog both tolerate an unwired log hub; answer with
	// a clean 500 here instead of dereferencing nil, mirroring how
	// Sensor treats a missing measurements store.
	if a.Logs == nil {
		http.Error(w, "log hub not wired", http.StatusInternalServerError)
		return
	}
	var batch LogBatch
	if err := json.NewDecoder(r.Body).Decode(&batch); err != nil {
		http.Error(w, "bad json", http.StatusBadRequest)
		return
	}
	writer, err := a.Logs.WriterFor(runID)
	if err != nil {
		http.Error(w, "open log: "+err.Error(), http.StatusInternalServerError)
		return
	}
	for _, l := range batch.Lines {
		// A missing or unparsable timestamp yields the zero time.
		// NOTE(review): presumably the writer substitutes the server
		// clock for a zero TS, per the LogLine.TS field comment — confirm.
		ts, _ := time.Parse(time.RFC3339Nano, l.TS)
		writer.Append(logs.Line{TS: ts, Level: l.Level, Stage: l.Stage, Text: l.Text})
	}
	writeJSON(w, http.StatusOK, map[string]any{"ok": true, "written": len(batch.Lines)})
}
// StageResult is the body of /result. Stage is the stage name (it must
// be one that orchestrator.StateForStage recognises); Passed drives
// StageCompleted vs StageFailed. Inventory is optional and only acted
// on when Stage == "Inventory" — the orchestrator persists it as an
// artifact, and SpecValidate later reads that artifact for spec.Diff.
type StageResult struct {
Stage string `json:"stage"`
Passed bool `json:"passed"`
// Summary is stored verbatim as the stage's summary JSON.
Summary json.RawMessage `json:"summary,omitempty"`
Inventory *spec.Inventory `json:"inventory,omitempty"`
// Message is used as the detail text of the failure notification.
Message string `json:"message,omitempty"`
}
// Result receives a stage's outcome. Flow:
// 1. Mark the stage row passed/failed + record summary JSON.
// 2. For Inventory (pass or fail, when an inventory is attached):
// persist the inventory artifact. A persist failure is only logged.
// 3. On failure: record the failed stage, fire StageFailed, notify, and
// answer with next_state "FailedHolding".
// 4. On pass: fire StageCompleted. If that lands in SpecValidate or
// Reporting — stages the orchestrator owns entirely — resolve them
// right here and report the state the run ends up in.
//
// Status codes: 400 for bad JSON or an unknown stage, 500 when the stage
// row cannot be completed, 409 when the state machine rejects the
// transition, 200 otherwise.
func (a *Agent) Result(w http.ResponseWriter, r *http.Request) {
runID, ok := runIDFromURL(w, r)
if !ok {
return
}
run, ok := a.authenticate(w, r, runID)
if !ok {
return
}
var body StageResult
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "bad json", http.StatusBadRequest)
return
}
body.Stage = strings.TrimSpace(body.Stage)
// Reject stage names the orchestrator has no state for.
if _, ok := orchestrator.StateForStage(body.Stage); !ok {
http.Error(w, "unknown stage: "+body.Stage, http.StatusBadRequest)
return
}
stageState := model.StagePassed
if !body.Passed {
stageState = model.StageFailed
}
// An absent summary is recorded as the empty string.
summaryJSON := ""
if len(body.Summary) > 0 {
summaryJSON = string(body.Summary)
}
if err := a.Runner.CompleteStage(r.Context(), runID, body.Stage, stageState, summaryJSON); err != nil {
http.Error(w, "complete stage: "+err.Error(), http.StatusInternalServerError)
return
}
// Inventory-specific: persist the artifact so resolveSpecValidate can
// read it back. Best-effort: a failure here surfaces later as a
// "missing inventory artifact" SpecValidate failure.
if body.Stage == "Inventory" && body.Inventory != nil {
if err := a.persistInventory(r, run, body.Inventory); err != nil {
log.Printf("persist inventory run %d: %v", runID, err)
}
}
if !body.Passed {
if err := a.Runs.SetFailedStage(r.Context(), runID, body.Stage); err != nil {
log.Printf("set failed stage: %v", err)
}
if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageFailed); err != nil {
log.Printf("result: failed-transition run %d: %v", runID, err)
http.Error(w, "transition", http.StatusConflict)
return
}
hostName := a.hostNameFor(r.Context(), run.HostID)
detail := body.Message
if detail == "" {
detail = "stage reported failure"
}
a.dispatchEvent(notify.Event{
Kind: notify.KindStageFailed,
Severity: notify.SeverityCritical,
RunID: runID,
HostName: hostName,
Title: fmt.Sprintf("[vetting] %s FAILED: %s", hostName, body.Stage),
Body: fmt.Sprintf("Run %d on %s failed at stage %s.\n%s", runID, hostName, body.Stage, detail),
URL: a.runLinkURL(runID),
})
// The state name is reported as a literal; the value returned by
// Transition above is not consulted.
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "next_state": "FailedHolding"})
return
}
// Passed: advance to the next stage in the pipeline.
next, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageCompleted)
if err != nil {
http.Error(w, "advance: "+err.Error(), http.StatusConflict)
return
}
log.Printf("result: run %d stage %s passed → %s", runID, body.Stage, next)
// If the just-advanced-into state is SpecValidate or Reporting, the
// orchestrator owns those stages entirely. The resolve function may
// transition further (→ next stage on pass, → FailedHolding on fail,
// → Completed for Reporting), so we re-read the run after each.
// The two checks are sequential on purpose: resolving SpecValidate may
// itself land the run in Reporting. If a re-read fails, next keeps its
// previous value.
if next == model.StateSpecValidate {
a.resolveSpecValidate(r, runID)
if after, err := a.Runs.Get(r.Context(), runID); err == nil {
next = after.State
}
}
if next == model.StateReporting {
a.resolveReporting(r, runID)
if after, err := a.Runs.Get(r.Context(), runID); err == nil {
next = after.State
}
}
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "next_state": string(next)})
}
// persistInventory writes the agent-reported inventory to
// <ArtifactsDir>/run-<id>/inventory.json (creating the directory if
// needed) and records an "inventory" artifact row carrying the file's
// path, SHA-256 and size. The first error encountered is returned.
func (a *Agent) persistInventory(r *http.Request, run *model.Run, inv *spec.Inventory) error {
	runDir := filepath.Join(a.ArtifactsDir, fmt.Sprintf("run-%d", run.ID))
	if mkErr := os.MkdirAll(runDir, 0o755); mkErr != nil {
		return mkErr
	}

	payload, err := json.MarshalIndent(inv, "", " ")
	if err != nil {
		return err
	}

	dest := filepath.Join(runDir, "inventory.json")
	if err = os.WriteFile(dest, payload, 0o644); err != nil {
		return err
	}

	// The digest covers exactly the bytes that were written to disk.
	digest := sha256.Sum256(payload)
	record := store.Artifact{
		RunID:     run.ID,
		Kind:      "inventory",
		Path:      dest,
		SHA256:    hex.EncodeToString(digest[:]),
		SizeBytes: int64(len(payload)),
	}
	_, err = a.Artifacts.Create(r.Context(), record)
	return err
}
// resolveSpecValidate runs the expected-vs-actual diff against the
// just-stored inventory artifact, persists spec_diffs rows, and drives
// the state machine — all on the server. The agent does nothing for
// this stage.
//
// Outcomes:
//   - run or host cannot be loaded: logged, no transition is driven here
//   - expected spec unparsable or inventory artifact unreadable: the
//     stage is failed via failStage
//   - one or more non-ignored critical diffs: stage failed, StageFailed
//     fired, spec-mismatch notification dispatched
//   - otherwise: stage passed and StageCompleted fired
//
// Store errors while recording the outcome are logged, not returned.
func (a *Agent) resolveSpecValidate(r *http.Request, runID int64) {
	ctx := r.Context()
	run, err := a.Runs.Get(ctx, runID)
	if err != nil {
		log.Printf("specvalidate: get run: %v", err)
		return
	}
	if run == nil {
		log.Printf("specvalidate: run %d not found", runID)
		return
	}
	host, err := a.Hosts.Get(ctx, run.HostID)
	if err != nil {
		log.Printf("specvalidate: get host: %v", err)
		return
	}
	// Other call sites treat a nil host with a nil error as possible;
	// guard it here too rather than dereferencing below.
	if host == nil {
		log.Printf("specvalidate: run %d: host %d not found", runID, run.HostID)
		return
	}
	expected, err := spec.Parse(host.ExpectedSpecYAML)
	if err != nil {
		log.Printf("specvalidate: parse expected yaml: %v", err)
		a.failStage(r, runID, "SpecValidate", "malformed expected spec: "+err.Error())
		return
	}
	inv, err := a.readInventoryArtifact(r, runID)
	if err != nil {
		log.Printf("specvalidate: read inventory: %v", err)
		a.failStage(r, runID, "SpecValidate", "missing inventory artifact")
		return
	}
	diffs := spec.Diff(expected, inv)
	if err := a.SpecDiffs.ReplaceForRun(ctx, runID, diffs); err != nil {
		log.Printf("specvalidate: write diffs: %v", err)
	}
	if err := a.Stages.StartByName(ctx, runID, "SpecValidate"); err != nil {
		log.Printf("specvalidate: start stage: %v", err)
	}
	// Only critical diffs the operator has not marked ignored hold the host.
	critical := 0
	for _, d := range diffs {
		if d.Severity == "critical" && !d.Ignored {
			critical++
		}
	}
	// Marshalling a map of two ints cannot fail.
	summaryBuf, _ := json.Marshal(map[string]any{
		"diffs":    len(diffs),
		"critical": critical,
	})
	summary := string(summaryBuf)

	if critical > 0 {
		if err := a.Runner.CompleteStage(ctx, runID, "SpecValidate", model.StageFailed, summary); err != nil {
			log.Printf("specvalidate: complete stage: %v", err)
		}
		if err := a.Runs.SetFailedStage(ctx, runID, "SpecValidate"); err != nil {
			log.Printf("specvalidate: set failed stage: %v", err)
		}
		if _, err := a.Runner.Transition(ctx, runID, orchestrator.TriggerStageFailed); err != nil {
			log.Printf("specvalidate: failed-transition: %v", err)
		}
		a.appendLog(runID, "error", fmt.Sprintf("SpecValidate: %d critical diff(s) — holding host", critical))
		hostName := a.hostNameFor(ctx, run.HostID)
		a.dispatchEvent(notify.Event{
			Kind:     notify.KindSpecMismatch,
			Severity: notify.SeverityCritical,
			RunID:    runID,
			HostName: hostName,
			Title:    fmt.Sprintf("[vetting] %s spec mismatch (%d critical)", hostName, critical),
			Body:     fmt.Sprintf("SpecValidate found %d critical diff(s) on %s. Host is held for inspection.", critical, hostName),
			URL:      a.runLinkURL(runID),
		})
		return
	}

	if err := a.Runner.CompleteStage(ctx, runID, "SpecValidate", model.StagePassed, summary); err != nil {
		log.Printf("specvalidate: complete stage: %v", err)
	}
	if _, err := a.Runner.Transition(ctx, runID, orchestrator.TriggerStageCompleted); err != nil {
		log.Printf("specvalidate: advance: %v", err)
	}
	a.appendLog(runID, "info", "SpecValidate: all fields match expected spec")
}
// readInventoryArtifact loads the inventory for a run from disk. It
// scans the run's artifact rows from the end of the list backwards and
// decodes the first "inventory" entry it meets, i.e. the last one in
// list order. Listing, read and decode failures are returned as-is; a
// run with no inventory row yields "no inventory artifact".
func (a *Agent) readInventoryArtifact(r *http.Request, runID int64) (*spec.Inventory, error) {
	arts, err := a.Artifacts.ListForRun(r.Context(), runID)
	if err != nil {
		return nil, err
	}
	for idx := len(arts); idx > 0; idx-- {
		if arts[idx-1].Kind != "inventory" {
			continue
		}
		raw, readErr := os.ReadFile(arts[idx-1].Path)
		if readErr != nil {
			return nil, readErr
		}
		inv := new(spec.Inventory)
		if decodeErr := json.Unmarshal(raw, inv); decodeErr != nil {
			return nil, decodeErr
		}
		return inv, nil
	}
	return nil, errors.New("no inventory artifact")
}
// failStage marks an orchestrator-owned stage as failed: it completes
// the stage row with a {"error": message} summary, records the failed
// stage on the run, fires StageFailed and appends an error line to the
// run log. Store and transition errors are logged, not returned — the
// callers are already on a failure path.
func (a *Agent) failStage(r *http.Request, runID int64, stage, message string) {
	ctx := r.Context()
	// Encode with encoding/json rather than fmt's %q: %q produces Go
	// string syntax, whose escapes for control and invalid bytes (\x1b,
	// \a, \v) are not valid JSON. Messages here embed arbitrary error
	// text, so that would store an unparsable summary. Marshalling a
	// map[string]string cannot fail.
	summary, _ := json.Marshal(map[string]string{"error": message})
	if err := a.Runner.CompleteStage(ctx, runID, stage, model.StageFailed, string(summary)); err != nil {
		log.Printf("failStage: complete stage %s run %d: %v", stage, runID, err)
	}
	if err := a.Runs.SetFailedStage(ctx, runID, stage); err != nil {
		log.Printf("failStage: set failed stage run %d: %v", runID, err)
	}
	if _, err := a.Runner.Transition(ctx, runID, orchestrator.TriggerStageFailed); err != nil {
		log.Printf("failStage: transition run %d: %v", runID, err)
	}
	a.appendLog(runID, "error", stage+": "+message)
}
// appendLog writes one server-originated line to the run's log. It is a
// no-op when no log hub is wired; a failure to obtain the writer is
// reported on the process log and otherwise swallowed.
func (a *Agent) appendLog(runID int64, level, text string) {
	if a.Logs != nil {
		sink, err := a.Logs.WriterFor(runID)
		if err == nil {
			sink.Append(logs.Line{Level: level, Text: text})
			return
		}
		log.Printf("appendLog: %v", err)
	}
}
// HoldRequest is the optional JSON body of the /hold call.
//
// Hold issues the per-run ephemeral ed25519 keypair: the agent gets
// the authorized_keys line, the orchestrator keeps the privkey on disk.
// Hold also records the agent's reported IP so the tile can print the
// ssh invocation.
type HoldRequest struct {
// AgentIP is the address the agent reports for itself; when empty the
// handler falls back to the request's remote address.
AgentIP string `json:"agent_ip"`
}
// HoldResponse is the JSON reply of the /hold call.
type HoldResponse struct {
// AuthorizedKey is the public half of the issued keypair (kp.AuthorizedKey).
AuthorizedKey string `json:"authorized_key"`
RunID int64 `json:"run_id"`
}
// Hold issues a keypair for a run that is being held for inspection.
// It records the agent's IP on the run, writes the private key to
// <ArtifactsDir>/run-<id>/hold.key, records a "hold_key" artifact, logs
// the ssh invocation, notifies operators, refreshes the host's tile and
// returns the authorized_keys line to the agent.
//
// Status codes: 500 when the key cannot be generated or written, 200
// otherwise. Failures to record the hold IP or the artifact row are
// logged and do not fail the request.
func (a *Agent) Hold(w http.ResponseWriter, r *http.Request) {
	runID, ok := runIDFromURL(w, r)
	if !ok {
		return
	}
	if _, ok := a.authenticate(w, r, runID); !ok {
		return
	}
	ctx := r.Context()
	var body HoldRequest
	// agent_ip is optional; a missing or malformed body just falls back
	// to the peer address below.
	_ = json.NewDecoder(r.Body).Decode(&body)
	agentIP := strings.TrimSpace(body.AgentIP)
	if agentIP == "" {
		if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil {
			agentIP = host
		}
	}
	if agentIP != "" {
		if err := a.Runs.SetHoldIP(ctx, runID, agentIP); err != nil {
			log.Printf("hold: set hold_ip: %v", err)
		}
	}
	kp, err := hold.Issue(runID)
	if err != nil {
		http.Error(w, "generate key: "+err.Error(), http.StatusInternalServerError)
		return
	}
	keyPath := filepath.Join(a.ArtifactsDir, fmt.Sprintf("run-%d", runID), "hold.key")
	abs, err := kp.WritePrivateTo(keyPath)
	if err != nil {
		http.Error(w, "write key: "+err.Error(), http.StatusInternalServerError)
		return
	}
	sum := sha256.Sum256(kp.PrivatePEM)
	if _, err := a.Artifacts.Create(ctx, store.Artifact{
		RunID:     runID,
		Kind:      "hold_key",
		Path:      abs,
		SHA256:    hex.EncodeToString(sum[:]),
		SizeBytes: int64(len(kp.PrivatePEM)),
	}); err != nil {
		log.Printf("hold: record artifact: %v", err)
	}
	a.appendLog(runID, "info", fmt.Sprintf("Hold key issued. SSH in with: ssh -i %s root@%s", abs, agentIP))

	// Resolve the host once and reuse it for both the notification and
	// the tile refresh, instead of repeating the same run lookup. A zero
	// ID means the lookup failed; both steps are then skipped.
	hostID := mustHostID(a, r, runID)
	if hostID != 0 {
		hostName := a.hostNameFor(ctx, hostID)
		a.dispatchEvent(notify.Event{
			Kind:     notify.KindHoldingOpened,
			Severity: notify.SeverityCritical,
			RunID:    runID,
			HostName: hostName,
			Title:    fmt.Sprintf("[vetting] %s holding — SSH ready", hostName),
			Body:     fmt.Sprintf("Host %s is holding at %s.\nssh -i %s root@%s", hostName, agentIP, abs, agentIP),
			URL:      a.runLinkURL(runID),
		})
		// Refresh the tile so the operator sees the ssh command.
		if host, _ := a.Hosts.Get(ctx, hostID); host != nil {
			latest, _ := a.Runs.Get(ctx, runID)
			if orchestrator.TileRenderer != nil {
				payload := orchestrator.TileRenderer(ctx, *host, latest)
				a.EventHub.Publish(events.Event{Name: fmt.Sprintf("tile-%d", host.ID), Payload: payload})
			}
		}
	}
	writeJSON(w, http.StatusOK, HoldResponse{AuthorizedKey: kp.AuthorizedKey, RunID: runID})
}
// dispatchEvent forwards a fully populated Event to the notify Registry.
// With no Registry wired the event is silently dropped, which keeps the
// call sites free of nil checks.
func (a *Agent) dispatchEvent(ev notify.Event) {
	if reg := a.Notify; reg != nil {
		reg.Dispatch(ev)
	}
}
// hostNameFor resolves a host ID to its human-readable name. When the
// lookup errors or finds nothing it falls back to "host-<id>", so a
// notification always has something to show.
func (a *Agent) hostNameFor(ctx context.Context, hostID int64) string {
	host, err := a.Hosts.Get(ctx, hostID)
	if err != nil || host == nil {
		return fmt.Sprintf("host-%d", hostID)
	}
	return host.Name
}
// runLinkURL builds the click-through link used in notifications:
// <PublicURL>/reports/<runID>, with trailing slashes on PublicURL
// trimmed first. It returns "" when no PublicURL is configured.
func (a *Agent) runLinkURL(runID int64) string {
	base := a.PublicURL
	if base == "" {
		return ""
	}
	return strings.TrimRight(base, "/") + "/reports/" + strconv.FormatInt(runID, 10)
}
// mustHostID returns the host ID of a run, or 0 when the run cannot be
// loaded. Despite the name it never panics; callers treat 0 as "unknown".
func mustHostID(a *Agent, r *http.Request, runID int64) int64 {
	if run, err := a.Runs.Get(r.Context(), runID); err == nil && run != nil {
		return run.HostID
	}
	return 0
}
// ===== Phase 4 endpoints =================================================
// SensorBatch is what the agent POSTs to /sensor: a stream of numeric
// samples (temps, fan rpm, PSU rails, iperf throughput). Each sample is
// (kind, key, value, unit). Timestamps default to server-now when empty
// so the thermal sidecar doesn't have to carry a clock.
type SensorBatch struct {
Samples []SensorSample `json:"samples"`
}
// SensorSample is one numeric reading inside a SensorBatch. The Sensor
// handler copies every field unchanged into a model.Measurement row.
type SensorSample struct {
// TS is parsed as RFC3339Nano by the Sensor handler; optional.
TS string `json:"ts,omitempty"`
Kind string `json:"kind"` // temp|fan|psu_volt|iperf|fio|smart_attr
// Key names the individual reading within its Kind.
// NOTE(review): presumably a sensor or device label — confirm against the agent.
Key string `json:"key"`
Value float64 `json:"value"`
Unit string `json:"unit,omitempty"`
}
// Sensor stores a batch of numeric samples for a run. The thermal
// sidecar posts here on a tick, and stage executors (iperf, fio) report
// through the same endpoint.
//
// Status codes: 500 when no measurements store is wired or the write
// fails, 400 for an undecodable body, 200 with the number of rows
// written otherwise.
func (a *Agent) Sensor(w http.ResponseWriter, r *http.Request) {
	runID, ok := runIDFromURL(w, r)
	if !ok {
		return
	}
	if _, authed := a.authenticate(w, r, runID); !authed {
		return
	}
	if a.Measurements == nil {
		http.Error(w, "measurements store not wired", http.StatusInternalServerError)
		return
	}

	var batch SensorBatch
	if decodeErr := json.NewDecoder(r.Body).Decode(&batch); decodeErr != nil {
		http.Error(w, "bad json", http.StatusBadRequest)
		return
	}

	rows := make([]model.Measurement, len(batch.Samples))
	for i := range batch.Samples {
		sample := &batch.Samples[i]
		// An empty or unparsable timestamp is passed on as the zero time.
		stamp, _ := time.Parse(time.RFC3339Nano, sample.TS)
		rows[i] = model.Measurement{
			RunID: runID,
			TS:    stamp,
			Kind:  sample.Kind,
			Key:   sample.Key,
			Value: sample.Value,
			Unit:  sample.Unit,
		}
	}

	if writeErr := a.Measurements.CreateBatch(r.Context(), rows); writeErr != nil {
		http.Error(w, "write samples: "+writeErr.Error(), http.StatusInternalServerError)
		return
	}
	writeJSON(w, http.StatusOK, map[string]any{"ok": true, "written": len(rows)})
}
// resolveReporting runs when the pipeline advances into StateReporting.
// It's an orchestrator-owned stage like SpecValidate: no agent action.
// Writes a JSON report bundling run + stages + diffs + measurements,
// renders an HTML summary next to it, then marks the run completed.
// Once the run is Completed, Heartbeat answers cmd=shutdown and the
// agent powers the host off.
//
// Failure handling: if the run or host cannot be loaded the function
// logs and returns without driving any transition; if the JSON report
// cannot be marshalled or written the stage is failed via failStage.
// Everything else (list reads, artifact rows, HTML rendering, stage and
// run completion) is best-effort and only logged.
func (a *Agent) resolveReporting(r *http.Request, runID int64) {
ctx := r.Context()
if err := a.Stages.StartByName(ctx, runID, "Reporting"); err != nil {
log.Printf("reporting: start stage: %v", err)
}
run, err := a.Runs.Get(ctx, runID)
if err != nil {
log.Printf("reporting: get run: %v", err)
return
}
host, err := a.Hosts.Get(ctx, run.HostID)
if err != nil {
log.Printf("reporting: get host: %v", err)
return
}
// The list reads below are best-effort: on error the report is still
// written, just with that section empty.
stages, err := a.Stages.ListForRun(ctx, runID)
if err != nil {
log.Printf("reporting: list stages: %v", err)
}
diffs, err := a.SpecDiffs.ListForRun(ctx, runID)
if err != nil {
log.Printf("reporting: list diffs: %v", err)
}
var measurements []model.Measurement
if a.Measurements != nil {
measurements, err = a.Measurements.ListForRun(ctx, runID)
if err != nil {
log.Printf("reporting: list measurements: %v", err)
}
}
bundle := map[string]any{
"run": run,
"host": host,
"stages": stages,
"spec_diffs": diffs,
"measurements": measurements,
"generated_at": time.Now().UTC().Format(time.RFC3339),
}
buf, err := json.MarshalIndent(bundle, "", " ")
if err != nil {
log.Printf("reporting: marshal: %v", err)
a.failStage(r, runID, "Reporting", "marshal report: "+err.Error())
return
}
dir := filepath.Join(a.ArtifactsDir, fmt.Sprintf("run-%d", runID))
if err := os.MkdirAll(dir, 0o755); err != nil {
a.failStage(r, runID, "Reporting", "mkdir: "+err.Error())
return
}
path := filepath.Join(dir, "report.json")
if err := os.WriteFile(path, buf, 0o644); err != nil {
a.failStage(r, runID, "Reporting", "write: "+err.Error())
return
}
// Index the JSON report; the digest covers exactly the bytes written.
sum := sha256.Sum256(buf)
if _, err := a.Artifacts.Create(ctx, store.Artifact{
RunID: runID,
Kind: "report",
Path: path,
SHA256: hex.EncodeToString(sum[:]),
SizeBytes: int64(len(buf)),
}); err != nil {
log.Printf("reporting: record artifact: %v", err)
}
// Also render the operator-facing HTML summary alongside the JSON.
// Failures here are non-fatal — the JSON is the source of truth.
// GeneratedAt is sampled separately from the JSON's generated_at, so
// the two timestamps can differ slightly.
if host != nil {
htmlData := report.Data{
GeneratedAt: time.Now().UTC(),
Run: *run,
Host: *host,
Stages: stages,
SpecDiffs: diffs,
Aggregates: report.AggregateMeasurements(measurements),
}
if htmlBuf, err := report.RenderHTML(htmlData); err != nil {
log.Printf("reporting: render html: %v", err)
} else {
htmlPath := filepath.Join(dir, "report.html")
if err := os.WriteFile(htmlPath, htmlBuf, 0o644); err != nil {
log.Printf("reporting: write html: %v", err)
} else {
htmlSum := sha256.Sum256(htmlBuf)
if _, err := a.Artifacts.Create(ctx, store.Artifact{
RunID: runID,
Kind: "report_html",
Path: htmlPath,
SHA256: hex.EncodeToString(htmlSum[:]),
SizeBytes: int64(len(htmlBuf)),
}); err != nil {
log.Printf("reporting: record html artifact: %v", err)
}
}
}
}
// Stage summary: where the report lives plus two headline counts.
summaryBuf, _ := json.Marshal(map[string]any{
"report_path": path,
"stages": len(stages),
"diffs": len(diffs),
})
if err := a.Runner.CompleteStage(ctx, runID, "Reporting", model.StagePassed, string(summaryBuf)); err != nil {
log.Printf("reporting: complete stage: %v", err)
}
// NOTE(review): a MarkCompleted failure is only logged; the "run
// completed" log line and the pass notification below are still sent.
if err := a.Runs.MarkCompleted(ctx, runID, path); err != nil {
log.Printf("reporting: mark completed: %v", err)
}
a.appendLog(runID, "info", "Reporting: wrote "+path+"; run completed.")
// Publish a final tile update so the dashboard flips to pass mood.
if host != nil && orchestrator.TileRenderer != nil {
latest, _ := a.Runs.Get(ctx, runID)
payload := orchestrator.TileRenderer(ctx, *host, latest)
a.EventHub.Publish(events.Event{Name: fmt.Sprintf("tile-%d", host.ID), Payload: payload})
}
// Fall back to a generic name if the host record was nil.
hostName := "host"
if host != nil {
hostName = host.Name
}
a.dispatchEvent(notify.Event{
Kind: notify.KindRunCompleted,
Severity: notify.SeverityInfo,
RunID: runID,
HostName: hostName,
Title: fmt.Sprintf("[vetting] %s passed vetting", hostName),
Body: fmt.Sprintf("Run %d on %s completed all stages. Report: %s", runID, hostName, path),
URL: a.runLinkURL(runID),
})
}