package api

import (
	"context"
	"crypto/sha256"
	"crypto/subtle"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/go-chi/chi/v5"

	"vetting/internal/events"
	"vetting/internal/hold"
	"vetting/internal/logs"
	"vetting/internal/model"
	"vetting/internal/notify"
	"vetting/internal/orchestrator"
	"vetting/internal/pxe"
	"vetting/internal/report"
	"vetting/internal/spec"
	"vetting/internal/store"
)

// Agent collects the collaborators used by agent-facing HTTP routes:
// the iPXE chainload endpoint and the /api/v1/runs/:id/* endpoints.
//
// Logs, Notify and Measurements are treated as optional by parts of this
// file (appendLog, dispatchEvent and Sensor check them for nil before
// use); the remaining collaborators are used without a nil check.
type Agent struct {
	// Stores backing the handlers.
	Hosts        *store.Hosts
	Runs         *store.Runs
	Stages       *store.Stages
	Artifacts    *store.Artifacts
	SpecDiffs    *store.SpecDiffs
	Measurements *store.Measurements

	// Runner drives run state transitions and stage completion.
	Runner *orchestrator.Runner
	// EventHub receives the "tile-<hostID>" events published by handlers.
	EventHub *events.Hub
	// Logs hands out the per-run log writers.
	Logs *logs.Hub
	// Notify receives operator notifications.
	Notify *notify.Registry

	ArtifactsDir    string // ./var/artifacts; per-run files live under run-<id>/
	OrchestratorURL string // baked into iPXE cmdline
	PublicURL       string // user-visible URL base for notification click-throughs; empty = no link
	LiveKernelURL   string // passed through to pxe.BuildScript
	LiveInitrdURL   string // passed through to pxe.BuildScript
	TLSCertFPR      string // optional; empty = skip pinning
	IperfPort       int    // orchestrator-supervised iperf3 port; 0 = 5201
}

// IPXEScript serves a per-MAC iPXE script. Called by iPXE itself after
// dnsmasq hands it the chainload URL. Unknown MAC → halt script.
// Known MAC with no active run → poweroff script. Known MAC with active
// run → real boot script; the fetch triggers PXEObserved.
//
// Note that the only check made before the run lookup is the macRe format
// check: a MAC that fails it receives pxe.NotRegisteredScript, and any
// well-formed MAC without an active run receives pxe.NoActiveRunScript.
func (a *Agent) IPXEScript(w http.ResponseWriter, r *http.Request) { mac := strings.ToLower(strings.TrimSpace(chi.URLParam(r, "mac"))) w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.Header().Set("Cache-Control", "no-store") if !macRe.MatchString(mac) { log.Printf("ipxe: rejected malformed mac %q from %s", mac, r.RemoteAddr) _, _ = w.Write([]byte(pxe.NotRegisteredScript(mac))) return } run, err := a.Runs.FindActiveByMAC(r.Context(), mac) if err != nil { log.Printf("ipxe: find run by mac %s: %v", mac, err) http.Error(w, "internal error", http.StatusInternalServerError) return } if run == nil { _, _ = w.Write([]byte(pxe.NoActiveRunScript(mac))) return } // The token hash in the DB is the sha256 of the plaintext. The // plaintext itself cannot be recovered from the hash — we issued it // once when the run was created. For iPXE we re-issue a fresh token // on every PXE fetch: this is safe because the hash in the DB is // rewritten to match and only the most recent PXE can be claimed. plain, hash, err := orchestrator.IssueRunToken() if err != nil { http.Error(w, "token", http.StatusInternalServerError) return } if err := a.Runs.RotateTokenHash(r.Context(), run.ID, hash); err != nil { log.Printf("ipxe: rotate token run %d: %v", run.ID, err) http.Error(w, "token", http.StatusInternalServerError) return } script := pxe.BuildScript(pxe.IPXEParams{ OrchestratorURL: a.OrchestratorURL, LiveKernelURL: a.LiveKernelURL, LiveInitrdURL: a.LiveInitrdURL, TLSCertFPR: a.TLSCertFPR, RunID: run.ID, MAC: mac, Token: plain, }) _, _ = w.Write([]byte(script)) // iPXE has now fetched the script — treat this as PXEObserved. If we // were already in Booting the transition table allows staying. if _, err := a.Runner.Transition(r.Context(), run.ID, orchestrator.TriggerPXEObserved); err != nil { // Non-fatal: the agent may still claim via /claim. log.Printf("ipxe: PXEObserved for run %d: %v", run.ID, err) } } // Hello is the first call an agent makes once userspace is up. 
It's // idempotent and only writes a log line; the authoritative transition // comes from /claim. The agent sends Hello early so operators see a // signal in the tile even before the token is validated. func (a *Agent) Hello(w http.ResponseWriter, r *http.Request) { runID, ok := runIDFromURL(w, r) if !ok { return } if _, ok := a.authenticate(w, r, runID); !ok { return } log.Printf("agent hello: run=%d remote=%s", runID, r.RemoteAddr) writeJSON(w, http.StatusOK, map[string]any{"ok": true, "run_id": runID}) } // Claim is the binding call: the agent proves it holds the plaintext // token for this run, and in return the orchestrator transitions to // InventoryCheck and seeds the stage rows. All destructive actions the // agent takes later require a prior successful claim. func (a *Agent) Claim(w http.ResponseWriter, r *http.Request) { runID, ok := runIDFromURL(w, r) if !ok { return } run, ok := a.authenticate(w, r, runID) if !ok { return } var body struct { AgentIP string `json:"agent_ip"` } if r.Body != nil { // agent_ip is informational; if missing fall back to RemoteAddr. _ = json.NewDecoder(r.Body).Decode(&body) } agentIP := strings.TrimSpace(body.AgentIP) if agentIP == "" { if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil { agentIP = host } else { agentIP = r.RemoteAddr } } // First claim seeds the stage rows; subsequent claims are a no-op // so agent retries after transient network failures stay safe. if len(mustListStages(a.Stages, r, runID)) == 0 { if err := a.Stages.Seed(r.Context(), runID); err != nil { log.Printf("claim: seed stages run %d: %v", runID, err) http.Error(w, "seed stages", http.StatusInternalServerError) return } } // Drive the transition. If we're already past Booting this returns // an error — treat as "already claimed" and report OK, don't 500. 
if run.State == model.StateWaitingWoL || run.State == model.StateBooting { if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerAgentClaimed); err != nil { log.Printf("claim: transition run %d: %v", runID, err) http.Error(w, "transition", http.StatusConflict) return } } log.Printf("agent claimed: run=%d agent_ip=%s", runID, agentIP) // Stage-driven agent needs a bit of per-run config: the device // allowlist (serial + expected size) for Storage, and the iperf3 // server port for Network. Parse the host's expected spec here so // the agent doesn't need to read YAML. expectedDisks := []map[string]any{} if host, err := a.Hosts.Get(r.Context(), run.HostID); err == nil && host != nil { if parsed, err := spec.Parse(host.ExpectedSpecYAML); err == nil && parsed != nil { for _, dd := range parsed.Disks { expectedDisks = append(expectedDisks, map[string]any{ "serial": dd.Serial, "size_gb": dd.SizeGB, }) } } } iperfPort := a.IperfPort if iperfPort == 0 { iperfPort = 5201 } writeJSON(w, http.StatusOK, map[string]any{ "ok": true, "run_id": runID, "stages": store.DefaultStageOrder, "expected_disks": expectedDisks, "iperf_port": iperfPort, }) } // Heartbeat is the agent's periodic liveness ping. The response body // acts as a control channel: cmd=continue is the normal case; cmd=abort // once the run enters FailedHolding/Released; cmd=retry_stage when the // operator has overridden a failed stage (wipe-probe override). func (a *Agent) Heartbeat(w http.ResponseWriter, r *http.Request) { runID, ok := runIDFromURL(w, r) if !ok { return } run, ok := a.authenticate(w, r, runID) if !ok { return } a.Runner.TouchHeartbeat(runID) cmd := "continue" resp := map[string]any{"state": run.State} switch { case run.State == model.StateCompleted: // Pipeline succeeded — agent should power the host down. 
cmd = "shutdown" case run.State == model.StateFailedHolding || run.State == model.StateReleased: cmd = "abort" case run.FailedStage == "Storage" && overrideWipeSet(run.OverrideFlagsJSON): // Operator pressed "Override wipe & retry". Agent should // re-enter Storage with the wipe-probe bypass armed. cmd = "retry_stage" resp["stage"] = "Storage" resp["override_flags"] = json.RawMessage(run.OverrideFlagsJSON) } resp["cmd"] = cmd writeJSON(w, http.StatusOK, resp) } // overrideWipeSet inspects a Run.OverrideFlagsJSON blob for the wipe flag. // Malformed JSON is ignored — the operator has to reapply the override if // it didn't round-trip correctly. func overrideWipeSet(blob string) bool { if blob == "" { return false } var flags struct { Wipe bool `json:"wipe"` } _ = json.Unmarshal([]byte(blob), &flags) return flags.Wipe } // authenticate verifies the Bearer token against the run's stored hash // and returns the Run for downstream handlers. Responds 401/404 on // failure and returns ok=false so the caller can bail early. 
func (a *Agent) authenticate(w http.ResponseWriter, r *http.Request, runID int64) (*model.Run, bool) { run, err := a.Runs.Get(r.Context(), runID) if err != nil { if errors.Is(err, store.ErrNotFound) { http.Error(w, "run not found", http.StatusNotFound) return nil, false } http.Error(w, "internal error", http.StatusInternalServerError) return nil, false } token := bearerToken(r) if token == "" { http.Error(w, "missing bearer", http.StatusUnauthorized) return nil, false } presented := orchestrator.HashRunToken(token) if subtle.ConstantTimeCompare([]byte(presented), []byte(run.AgentTokenHash)) != 1 { http.Error(w, "bad token", http.StatusUnauthorized) return nil, false } return run, true } func bearerToken(r *http.Request) string { h := r.Header.Get("Authorization") if !strings.HasPrefix(h, "Bearer ") { return "" } return strings.TrimSpace(strings.TrimPrefix(h, "Bearer ")) } func runIDFromURL(w http.ResponseWriter, r *http.Request) (int64, bool) { idStr := chi.URLParam(r, "id") id, err := strconv.ParseInt(idStr, 10, 64) if err != nil || id <= 0 { http.Error(w, "bad run id", http.StatusBadRequest) return 0, false } return id, true } func writeJSON(w http.ResponseWriter, status int, body any) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) _ = json.NewEncoder(w).Encode(body) } // mustListStages is a small wrapper that hides the error path from // /claim — a DB read failure just pretends there are zero stages, and // the subsequent Seed will surface the real error. func mustListStages(s *store.Stages, r *http.Request, runID int64) []model.Stage { rows, err := s.ListForRun(r.Context(), runID) if err != nil { return nil } return rows } // ===== Phase 3 endpoints ================================================= // LogBatch is what the agent POSTs to /log: zero or more lines with // timestamp + level + text. Lines are written in order to the per-run // file and fanned out on the SSE hub. 
type LogBatch struct { Lines []LogLine `json:"lines"` } type LogLine struct { TS string `json:"ts,omitempty"` // RFC3339Nano; server clock used if empty Level string `json:"level,omitempty"` // info|warn|error|debug Text string `json:"text"` } // Log accepts a batch of log lines from the agent. Empty batches are // legal (useful for agent-side flush ping). func (a *Agent) Log(w http.ResponseWriter, r *http.Request) { runID, ok := runIDFromURL(w, r) if !ok { return } if _, ok := a.authenticate(w, r, runID); !ok { return } var batch LogBatch if err := json.NewDecoder(r.Body).Decode(&batch); err != nil { http.Error(w, "bad json", http.StatusBadRequest) return } writer, err := a.Logs.WriterFor(runID) if err != nil { http.Error(w, "open log: "+err.Error(), http.StatusInternalServerError) return } for _, l := range batch.Lines { ts, _ := time.Parse(time.RFC3339Nano, l.TS) writer.Append(logs.Line{TS: ts, Level: l.Level, Text: l.Text}) } writeJSON(w, http.StatusOK, map[string]any{"ok": true, "written": len(batch.Lines)}) } // StageResult is the body of /result. Kind is the stage name (from // DefaultStageOrder); Passed drives StageCompleted vs StageFailed. // Inventory is optional and only set when kind == "Inventory" — the // orchestrator persists it as an artifact and feeds it to spec.Diff. type StageResult struct { Stage string `json:"stage"` Passed bool `json:"passed"` Summary json.RawMessage `json:"summary,omitempty"` Inventory *spec.Inventory `json:"inventory,omitempty"` Message string `json:"message,omitempty"` } // Result receives a stage's outcome. Flow: // 1. Mark the stage row passed/failed + record summary JSON. // 2. For Inventory: persist the inventory artifact. // 3. For Inventory (on pass): run spec diff server-side, persist rows, // bump the run into SpecValidate and immediately resolve SpecValidate // from that diff — the agent isn't involved in SpecValidate at all. // 4. Transition the run via StageCompleted/StageFailed. 
func (a *Agent) Result(w http.ResponseWriter, r *http.Request) { runID, ok := runIDFromURL(w, r) if !ok { return } run, ok := a.authenticate(w, r, runID) if !ok { return } var body StageResult if err := json.NewDecoder(r.Body).Decode(&body); err != nil { http.Error(w, "bad json", http.StatusBadRequest) return } body.Stage = strings.TrimSpace(body.Stage) if _, ok := orchestrator.StateForStage(body.Stage); !ok { http.Error(w, "unknown stage: "+body.Stage, http.StatusBadRequest) return } stageState := model.StagePassed if !body.Passed { stageState = model.StageFailed } summaryJSON := "" if len(body.Summary) > 0 { summaryJSON = string(body.Summary) } if err := a.Runner.CompleteStage(r.Context(), runID, body.Stage, stageState, summaryJSON); err != nil { http.Error(w, "complete stage: "+err.Error(), http.StatusInternalServerError) return } // Inventory-specific: persist artifact + compute spec diff. if body.Stage == "Inventory" && body.Inventory != nil { if err := a.persistInventory(r, run, body.Inventory); err != nil { log.Printf("persist inventory run %d: %v", runID, err) } } if !body.Passed { if err := a.Runs.SetFailedStage(r.Context(), runID, body.Stage); err != nil { log.Printf("set failed stage: %v", err) } if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageFailed); err != nil { log.Printf("result: failed-transition run %d: %v", runID, err) http.Error(w, "transition", http.StatusConflict) return } hostName := a.hostNameFor(r.Context(), run.HostID) detail := body.Message if detail == "" { detail = "stage reported failure" } a.dispatchEvent(notify.Event{ Kind: notify.KindStageFailed, Severity: notify.SeverityCritical, RunID: runID, HostName: hostName, Title: fmt.Sprintf("[vetting] %s FAILED: %s", hostName, body.Stage), Body: fmt.Sprintf("Run %d on %s failed at stage %s.\n%s", runID, hostName, body.Stage, detail), URL: a.runLinkURL(runID), }) writeJSON(w, http.StatusOK, map[string]any{"ok": true, "next_state": "FailedHolding"}) return } // 
Passed: advance to the next stage in the pipeline. next, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageCompleted) if err != nil { http.Error(w, "advance: "+err.Error(), http.StatusConflict) return } log.Printf("result: run %d stage %s passed → %s", runID, body.Stage, next) // If the just-advanced-into state is SpecValidate or Reporting, the // orchestrator owns those stages entirely. The resolve function may // transition further (→ next stage on pass, → FailedHolding on fail, // → Completed for Reporting), so we re-read the run after each. if next == model.StateSpecValidate { a.resolveSpecValidate(r, runID) if after, err := a.Runs.Get(r.Context(), runID); err == nil { next = after.State } } if next == model.StateReporting { a.resolveReporting(r, runID) if after, err := a.Runs.Get(r.Context(), runID); err == nil { next = after.State } } writeJSON(w, http.StatusOK, map[string]any{"ok": true, "next_state": string(next)}) } func (a *Agent) persistInventory(r *http.Request, run *model.Run, inv *spec.Inventory) error { dir := filepath.Join(a.ArtifactsDir, fmt.Sprintf("run-%d", run.ID)) if err := os.MkdirAll(dir, 0o755); err != nil { return err } path := filepath.Join(dir, "inventory.json") buf, err := json.MarshalIndent(inv, "", " ") if err != nil { return err } if err := os.WriteFile(path, buf, 0o644); err != nil { return err } sum := sha256.Sum256(buf) _, err = a.Artifacts.Create(r.Context(), store.Artifact{ RunID: run.ID, Kind: "inventory", Path: path, SHA256: hex.EncodeToString(sum[:]), SizeBytes: int64(len(buf)), }) return err } // resolveSpecValidate runs the expected-vs-actual diff against the // just-stored inventory artifact, persists spec_diffs rows, and drives // the state machine — all on the server. The agent does nothing for // this stage. 
func (a *Agent) resolveSpecValidate(r *http.Request, runID int64) { run, err := a.Runs.Get(r.Context(), runID) if err != nil { log.Printf("specvalidate: get run: %v", err) return } host, err := a.Hosts.Get(r.Context(), run.HostID) if err != nil { log.Printf("specvalidate: get host: %v", err) return } expected, err := spec.Parse(host.ExpectedSpecYAML) if err != nil { log.Printf("specvalidate: parse expected yaml: %v", err) a.failStage(r, runID, "SpecValidate", "malformed expected spec: "+err.Error()) return } inv, err := a.readInventoryArtifact(r, runID) if err != nil { log.Printf("specvalidate: read inventory: %v", err) a.failStage(r, runID, "SpecValidate", "missing inventory artifact") return } diffs := spec.Diff(expected, inv) if err := a.SpecDiffs.ReplaceForRun(r.Context(), runID, diffs); err != nil { log.Printf("specvalidate: write diffs: %v", err) } if err := a.Stages.StartByName(r.Context(), runID, "SpecValidate"); err != nil { log.Printf("specvalidate: start stage: %v", err) } critical := 0 for _, d := range diffs { if d.Severity == "critical" && !d.Ignored { critical++ } } summaryBuf, _ := json.Marshal(map[string]any{ "diffs": len(diffs), "critical": critical, }) if critical > 0 { _ = a.Runner.CompleteStage(r.Context(), runID, "SpecValidate", model.StageFailed, string(summaryBuf)) _ = a.Runs.SetFailedStage(r.Context(), runID, "SpecValidate") if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageFailed); err != nil { log.Printf("specvalidate: failed-transition: %v", err) } a.appendLog(runID, "error", fmt.Sprintf("SpecValidate: %d critical diff(s) — holding host", critical)) hostName := a.hostNameFor(r.Context(), run.HostID) a.dispatchEvent(notify.Event{ Kind: notify.KindSpecMismatch, Severity: notify.SeverityCritical, RunID: runID, HostName: hostName, Title: fmt.Sprintf("[vetting] %s spec mismatch (%d critical)", hostName, critical), Body: fmt.Sprintf("SpecValidate found %d critical diff(s) on %s. 
Host is held for inspection.", critical, hostName), URL: a.runLinkURL(runID), }) } else { _ = a.Runner.CompleteStage(r.Context(), runID, "SpecValidate", model.StagePassed, string(summaryBuf)) if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageCompleted); err != nil { log.Printf("specvalidate: advance: %v", err) } a.appendLog(runID, "info", "SpecValidate: all fields match expected spec") } } func (a *Agent) readInventoryArtifact(r *http.Request, runID int64) (*spec.Inventory, error) { arts, err := a.Artifacts.ListForRun(r.Context(), runID) if err != nil { return nil, err } for i := len(arts) - 1; i >= 0; i-- { if arts[i].Kind == "inventory" { buf, err := os.ReadFile(arts[i].Path) if err != nil { return nil, err } var inv spec.Inventory if err := json.Unmarshal(buf, &inv); err != nil { return nil, err } return &inv, nil } } return nil, errors.New("no inventory artifact") } func (a *Agent) failStage(r *http.Request, runID int64, stage, message string) { _ = a.Runner.CompleteStage(r.Context(), runID, stage, model.StageFailed, fmt.Sprintf(`{"error":%q}`, message)) _ = a.Runs.SetFailedStage(r.Context(), runID, stage) if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageFailed); err != nil { log.Printf("failStage: transition run %d: %v", runID, err) } a.appendLog(runID, "error", stage+": "+message) } func (a *Agent) appendLog(runID int64, level, text string) { if a.Logs == nil { return } w, err := a.Logs.WriterFor(runID) if err != nil { log.Printf("appendLog: %v", err) return } w.Append(logs.Line{Level: level, Text: text}) } // Hold issues the per-run ephemeral ed25519 keypair: the agent gets // the authorized_keys line, the orchestrator keeps the privkey on disk. // Hold also records the agent's reported IP so the tile can print the // ssh invocation. 
type HoldRequest struct { AgentIP string `json:"agent_ip"` } type HoldResponse struct { AuthorizedKey string `json:"authorized_key"` RunID int64 `json:"run_id"` } func (a *Agent) Hold(w http.ResponseWriter, r *http.Request) { runID, ok := runIDFromURL(w, r) if !ok { return } if _, ok := a.authenticate(w, r, runID); !ok { return } var body HoldRequest _ = json.NewDecoder(r.Body).Decode(&body) agentIP := strings.TrimSpace(body.AgentIP) if agentIP == "" { if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil { agentIP = host } } if agentIP != "" { if err := a.Runs.SetHoldIP(r.Context(), runID, agentIP); err != nil { log.Printf("hold: set hold_ip: %v", err) } } kp, err := hold.Issue(runID) if err != nil { http.Error(w, "generate key: "+err.Error(), http.StatusInternalServerError) return } keyPath := filepath.Join(a.ArtifactsDir, fmt.Sprintf("run-%d", runID), "hold.key") abs, err := kp.WritePrivateTo(keyPath) if err != nil { http.Error(w, "write key: "+err.Error(), http.StatusInternalServerError) return } sum := sha256.Sum256(kp.PrivatePEM) if _, err := a.Artifacts.Create(r.Context(), store.Artifact{ RunID: runID, Kind: "hold_key", Path: abs, SHA256: hex.EncodeToString(sum[:]), SizeBytes: int64(len(kp.PrivatePEM)), }); err != nil { log.Printf("hold: record artifact: %v", err) } a.appendLog(runID, "info", fmt.Sprintf("Hold key issued. SSH in with: ssh -i %s root@%s", abs, agentIP)) hostID := mustHostID(a, r, runID) if hostID != 0 { hostName := a.hostNameFor(r.Context(), hostID) a.dispatchEvent(notify.Event{ Kind: notify.KindHoldingOpened, Severity: notify.SeverityCritical, RunID: runID, HostName: hostName, Title: fmt.Sprintf("[vetting] %s holding — SSH ready", hostName), Body: fmt.Sprintf("Host %s is holding at %s.\nssh -i %s root@%s", hostName, agentIP, abs, agentIP), URL: a.runLinkURL(runID), }) } // Refresh the tile so the operator sees the ssh command. 
host, _ := a.Hosts.Get(r.Context(), mustHostID(a, r, runID)) if host != nil { latest, _ := a.Runs.Get(r.Context(), runID) if orchestrator.TileRenderer != nil { payload := orchestrator.TileRenderer(r.Context(), *host, latest) a.EventHub.Publish(events.Event{Name: fmt.Sprintf("tile-%d", host.ID), Payload: payload}) } } writeJSON(w, http.StatusOK, HoldResponse{AuthorizedKey: kp.AuthorizedKey, RunID: runID}) } // dispatchEvent hands an already-populated Event to the notify Registry // if one is wired. Handler code uses hostNameFor to resolve the host // name for the event payload; this keeps call sites terse. func (a *Agent) dispatchEvent(ev notify.Event) { if a.Notify == nil { return } a.Notify.Dispatch(ev) } // hostNameFor returns a human-readable host name for a run, or "host-N" // if the lookup fails — notifications should never fail silently over a // missing name. func (a *Agent) hostNameFor(ctx context.Context, hostID int64) string { if host, err := a.Hosts.Get(ctx, hostID); err == nil && host != nil { return host.Name } return fmt.Sprintf("host-%d", hostID) } func (a *Agent) runLinkURL(runID int64) string { if a.PublicURL == "" { return "" } return strings.TrimRight(a.PublicURL, "/") + "/reports/" + fmt.Sprintf("%d", runID) } func mustHostID(a *Agent, r *http.Request, runID int64) int64 { run, err := a.Runs.Get(r.Context(), runID) if err != nil || run == nil { return 0 } return run.HostID } // ===== Phase 4 endpoints ================================================= // SensorBatch is what the agent POSTs to /sensor: a stream of numeric // samples (temps, fan rpm, PSU rails, iperf throughput). Each sample is // (kind, key, value, unit). Timestamps default to server-now when empty // so the thermal sidecar doesn't have to carry a clock. 
type SensorBatch struct { Samples []SensorSample `json:"samples"` } type SensorSample struct { TS string `json:"ts,omitempty"` Kind string `json:"kind"` // temp|fan|psu_volt|iperf|fio|smart_attr Key string `json:"key"` Value float64 `json:"value"` Unit string `json:"unit,omitempty"` } // Sensor persists a batch of numeric samples. The thermal sidecar hits // this on a tick; stage executors (iperf, fio) also drop here. func (a *Agent) Sensor(w http.ResponseWriter, r *http.Request) { runID, ok := runIDFromURL(w, r) if !ok { return } if _, ok := a.authenticate(w, r, runID); !ok { return } if a.Measurements == nil { http.Error(w, "measurements store not wired", http.StatusInternalServerError) return } var body SensorBatch if err := json.NewDecoder(r.Body).Decode(&body); err != nil { http.Error(w, "bad json", http.StatusBadRequest) return } rows := make([]model.Measurement, 0, len(body.Samples)) for _, s := range body.Samples { ts, _ := time.Parse(time.RFC3339Nano, s.TS) rows = append(rows, model.Measurement{ RunID: runID, TS: ts, Kind: s.Kind, Key: s.Key, Value: s.Value, Unit: s.Unit, }) } if err := a.Measurements.CreateBatch(r.Context(), rows); err != nil { http.Error(w, "write samples: "+err.Error(), http.StatusInternalServerError) return } writeJSON(w, http.StatusOK, map[string]any{"ok": true, "written": len(rows)}) } // resolveReporting runs when the pipeline advances into StateReporting. // It's an orchestrator-owned stage like SpecValidate: no agent action. // Writes a JSON report bundling run + stages + diffs + measurements, // then advances the run to Completed. Heartbeat will then return abort // and the agent will power the host off in Phase 5. 
func (a *Agent) resolveReporting(r *http.Request, runID int64) { ctx := r.Context() if err := a.Stages.StartByName(ctx, runID, "Reporting"); err != nil { log.Printf("reporting: start stage: %v", err) } run, err := a.Runs.Get(ctx, runID) if err != nil { log.Printf("reporting: get run: %v", err) return } host, err := a.Hosts.Get(ctx, run.HostID) if err != nil { log.Printf("reporting: get host: %v", err) return } stages, err := a.Stages.ListForRun(ctx, runID) if err != nil { log.Printf("reporting: list stages: %v", err) } diffs, err := a.SpecDiffs.ListForRun(ctx, runID) if err != nil { log.Printf("reporting: list diffs: %v", err) } var measurements []model.Measurement if a.Measurements != nil { measurements, err = a.Measurements.ListForRun(ctx, runID) if err != nil { log.Printf("reporting: list measurements: %v", err) } } bundle := map[string]any{ "run": run, "host": host, "stages": stages, "spec_diffs": diffs, "measurements": measurements, "generated_at": time.Now().UTC().Format(time.RFC3339), } buf, err := json.MarshalIndent(bundle, "", " ") if err != nil { log.Printf("reporting: marshal: %v", err) a.failStage(r, runID, "Reporting", "marshal report: "+err.Error()) return } dir := filepath.Join(a.ArtifactsDir, fmt.Sprintf("run-%d", runID)) if err := os.MkdirAll(dir, 0o755); err != nil { a.failStage(r, runID, "Reporting", "mkdir: "+err.Error()) return } path := filepath.Join(dir, "report.json") if err := os.WriteFile(path, buf, 0o644); err != nil { a.failStage(r, runID, "Reporting", "write: "+err.Error()) return } sum := sha256.Sum256(buf) if _, err := a.Artifacts.Create(ctx, store.Artifact{ RunID: runID, Kind: "report", Path: path, SHA256: hex.EncodeToString(sum[:]), SizeBytes: int64(len(buf)), }); err != nil { log.Printf("reporting: record artifact: %v", err) } // Also render the operator-facing HTML summary alongside the JSON. // Failures here are non-fatal — the JSON is the source of truth. 
if host != nil { htmlData := report.Data{ GeneratedAt: time.Now().UTC(), Run: *run, Host: *host, Stages: stages, SpecDiffs: diffs, Aggregates: report.AggregateMeasurements(measurements), } if htmlBuf, err := report.RenderHTML(htmlData); err != nil { log.Printf("reporting: render html: %v", err) } else { htmlPath := filepath.Join(dir, "report.html") if err := os.WriteFile(htmlPath, htmlBuf, 0o644); err != nil { log.Printf("reporting: write html: %v", err) } else { htmlSum := sha256.Sum256(htmlBuf) if _, err := a.Artifacts.Create(ctx, store.Artifact{ RunID: runID, Kind: "report_html", Path: htmlPath, SHA256: hex.EncodeToString(htmlSum[:]), SizeBytes: int64(len(htmlBuf)), }); err != nil { log.Printf("reporting: record html artifact: %v", err) } } } } summaryBuf, _ := json.Marshal(map[string]any{ "report_path": path, "stages": len(stages), "diffs": len(diffs), }) if err := a.Runner.CompleteStage(ctx, runID, "Reporting", model.StagePassed, string(summaryBuf)); err != nil { log.Printf("reporting: complete stage: %v", err) } if err := a.Runs.MarkCompleted(ctx, runID, path); err != nil { log.Printf("reporting: mark completed: %v", err) } a.appendLog(runID, "info", "Reporting: wrote "+path+"; run completed.") // Publish a final tile update so the dashboard flips to pass mood. if host != nil && orchestrator.TileRenderer != nil { latest, _ := a.Runs.Get(ctx, runID) payload := orchestrator.TileRenderer(ctx, *host, latest) a.EventHub.Publish(events.Event{Name: fmt.Sprintf("tile-%d", host.ID), Payload: payload}) } hostName := "host" if host != nil { hostName = host.Name } a.dispatchEvent(notify.Event{ Kind: notify.KindRunCompleted, Severity: notify.SeverityInfo, RunID: runID, HostName: hostName, Title: fmt.Sprintf("[vetting] %s passed vetting", hostName), Body: fmt.Sprintf("Run %d on %s completed all stages. Report: %s", runID, hostName, path), URL: a.runLinkURL(runID), }) }