deep profile + threshold gating + firmware stage + Burn super-stage
CI / Lint + build + test (push) Failing after 1m57s
Release / release (push) Has been cancelled

Ships all five phases of the deep-profile overhaul together. Runs now
carry a profile (quick/deep/soak); every profile walks the same
11-stage order — Inventory → Firmware → SpecValidate → SMART →
CPUStress → Storage → Network → Burn → GPU → PSU → Reporting —
with only per-stage durations and concurrency scaled.

Phase 1: profiles.ProfileRegistry loaded from vetting.yaml; runs.profile
column + CreateWithProfile; threshold table + evaluator seeded per-run
from the shared vetting.thresholds block; breach flips result at
/sensor + /result.

Phase 2: upgraded CPUStress (stress-ng --cpu-method=all --verify +
EDAC/MCE poll), Storage (fio --verify=md5 + SMART start/end delta),
Network (sustained iperf + /proc/net/dev deltas) with per-profile
knobs from Deps.

Phase 3: Burn super-stage with goroutine fan-out for CPU + memory +
fio + iperf, PSU rails sampled across the Burn window, SensorMux
(2 s flush, 500-sample cap) to absorb backpressure.

Phase 4: Firmware stage + firmware_snapshots table; probes dmidecode
(BIOS), ipmitool (BMC), ethtool -i (NIC), nvme (sysfs + id-ctrl),
lspci (HBA), /proc/cpuinfo (microcode). spec.DiffFirmware folds into
SpecValidate with pin-by-identifier and fan-out-across-component
matching; mismatches park the run in FailedHolding.

Phase 5: profile radio on the host start form, profile chip on the
run header, Firmware section in the HTML report, coverage artifact
uploaded from CI, agent/tests/fakes/ scaffold with Deps.LookPath
seam + stress_ng and dmidecode example fakes.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-04-18 22:50:57 -04:00
parent fbb21cbafd
commit 23c689aa5b
60 changed files with 5911 additions and 527 deletions
+259 -3
View File
@@ -19,6 +19,7 @@ import (
"github.com/go-chi/chi/v5"
"vetting/internal/config"
"vetting/internal/events"
"vetting/internal/hold"
"vetting/internal/logs"
@@ -41,6 +42,9 @@ type Agent struct {
Artifacts *store.Artifacts
SpecDiffs *store.SpecDiffs
Measurements *store.Measurements
Thresholds *store.Thresholds // Phase 1: seeded per run; consulted on each /sensor batch
Firmware *store.Firmware // Phase 4: firmware snapshots (unused before then)
Profiles *config.ProfileRegistry // Phase 2: /claim resolves the run's profile → stage knobs
Runner *orchestrator.Runner
EventHub *events.Hub
Logs *logs.Hub
@@ -216,6 +220,21 @@ func (a *Agent) Claim(w http.ResponseWriter, r *http.Request) {
if iperfPort == 0 {
iperfPort = 5201
}
// Resolve the run's profile → agent-visible stage knobs. The agent
// reads these to size CPUStress / Storage / Network work. An empty
// profile (legacy runs seeded before Phase 1) falls back to "quick".
profileName := run.Profile
if profileName == "" {
profileName = config.ProfileQuick
}
var stageCfg config.StageConfig
if a.Profiles != nil {
stageCfg = a.Profiles.ResolveStageConfig(profileName)
} else {
stageCfg = config.StageConfig{Profile: profileName}
}
writeJSON(w, http.StatusOK, map[string]any{
"ok": true,
"run_id": runID,
@@ -224,6 +243,7 @@ func (a *Agent) Claim(w http.ResponseWriter, r *http.Request) {
"iperf_port": iperfPort,
"non_destructive": run.NonDestructive,
"current_state": string(currentState),
"stage_config": stageCfg,
})
}
@@ -398,10 +418,24 @@ type StageResult struct {
Passed bool `json:"passed"`
Summary json.RawMessage `json:"summary,omitempty"`
Inventory *spec.Inventory `json:"inventory,omitempty"`
Firmware []FirmwareLine `json:"firmware,omitempty"`
Message string `json:"message,omitempty"`
SubSteps []SubStepResultLine `json:"sub_steps,omitempty"`
}
// FirmwareLine is a single firmware snapshot POSTed alongside the
// Firmware stage's /result body. Mirrors agent/probes.FirmwareSnapshot.
// The server converts each line to a store.FirmwareSnapshot and persists
// it under the run — SpecValidate reads these back to diff against the
// host's expected_firmware.
type FirmwareLine struct {
Component string `json:"component"`
Identifier string `json:"identifier"`
Version string `json:"version"`
Vendor string `json:"vendor,omitempty"`
Raw map[string]string `json:"raw,omitempty"`
}
// SubStepResultLine is one entry in StageResult.SubSteps. Ordinal is
// assigned from slice index server-side; the agent doesn't set it.
type SubStepResultLine struct {
@@ -476,6 +510,20 @@ func (a *Agent) Result(w http.ResponseWriter, r *http.Request) {
return
}
// Aggregate threshold gate: flip Passed=false server-side when any
// critical breach landed for this stage. The agent's verdict is
// advisory — a stage-executor can miss a runaway sample that the
// sidecar caught. We check this *before* writing the stage state
// so the DB reflects the server-side decision.
thresholdDetail := ""
if body.Passed {
if breached, detail := a.stageHadCriticalBreach(r.Context(), runID, body.Stage); breached {
body.Passed = false
thresholdDetail = detail
a.appendLog(runID, "error", fmt.Sprintf("%s reported passed but %s — flipping to failed", body.Stage, detail))
}
}
stageState := model.StagePassed
if !body.Passed {
stageState = model.StageFailed
@@ -488,6 +536,9 @@ func (a *Agent) Result(w http.ResponseWriter, r *http.Request) {
http.Error(w, "complete stage: "+err.Error(), http.StatusInternalServerError)
return
}
if thresholdDetail != "" && body.Message == "" {
body.Message = thresholdDetail
}
// Agent-authored sub-steps: persist in slice order (ordinal = index)
// and fan out a per-row SSE event each so the detail pane shows them
@@ -502,6 +553,14 @@ func (a *Agent) Result(w http.ResponseWriter, r *http.Request) {
}
}
// Firmware-specific: persist each snapshot into firmware_snapshots.
// SpecValidate reads them back to diff against expected_firmware.
if body.Stage == "Firmware" && len(body.Firmware) > 0 {
if err := a.persistFirmware(r.Context(), runID, body.Firmware); err != nil {
log.Printf("persist firmware run %d: %v", runID, err)
}
}
if !body.Passed {
if err := a.Runs.SetFailedStage(r.Context(), runID, body.Stage); err != nil {
log.Printf("set failed stage: %v", err)
@@ -615,6 +674,34 @@ func parseResultTime(s string) *time.Time {
return nil
}
// persistFirmware writes the reported snapshots. A nil/unset a.Firmware
// store is a no-op so tests that don't wire it up stay green; a mid-run
// persist error is logged but doesn't fail the stage (Firmware is
// advisory — SpecValidate is the gate).
func (a *Agent) persistFirmware(ctx context.Context, runID int64, lines []FirmwareLine) error {
if a.Firmware == nil || len(lines) == 0 {
return nil
}
rows := make([]store.FirmwareSnapshot, 0, len(lines))
for _, l := range lines {
raw := "{}"
if len(l.Raw) > 0 {
if b, err := json.Marshal(l.Raw); err == nil {
raw = string(b)
}
}
rows = append(rows, store.FirmwareSnapshot{
RunID: runID,
Component: l.Component,
Identifier: l.Identifier,
Version: l.Version,
Vendor: l.Vendor,
RawJSON: raw,
})
}
return a.Firmware.CreateBatch(ctx, rows)
}
func (a *Agent) persistInventory(r *http.Request, run *model.Run, inv *spec.Inventory) error {
dir := filepath.Join(a.ArtifactsDir, fmt.Sprintf("run-%d", run.ID))
if err := os.MkdirAll(dir, 0o755); err != nil {
@@ -667,6 +754,22 @@ func (a *Agent) resolveSpecValidate(r *http.Request, runID int64) {
return
}
diffs := spec.Diff(expected, inv)
if a.Firmware != nil && len(expected.Firmware) > 0 {
snaps, err := a.Firmware.ListForRun(r.Context(), runID)
if err != nil {
log.Printf("specvalidate: list firmware: %v", err)
} else {
observed := make([]spec.FirmwareObserved, 0, len(snaps))
for _, s := range snaps {
observed = append(observed, spec.FirmwareObserved{
Component: s.Component,
Identifier: s.Identifier,
Version: s.Version,
})
}
diffs = append(diffs, spec.DiffFirmware(expected.Firmware, observed)...)
}
}
if err := a.SpecDiffs.ReplaceForRun(r.Context(), runID, diffs); err != nil {
log.Printf("specvalidate: write diffs: %v", err)
}
@@ -884,13 +987,17 @@ type SensorSample struct {
}
// Sensor persists a batch of numeric samples. The thermal sidecar hits
// this on a tick; stage executors (iperf, fio) also drop here.
// this on a tick; stage executors (iperf, fio) also drop here. Each
// sample is evaluated against the run's seeded thresholds — critical
// breaches fail the run immediately (thermal runaway, EDAC UE, voltage
// sag); warning breaches are recorded for the report only.
func (a *Agent) Sensor(w http.ResponseWriter, r *http.Request) {
runID, ok := runIDFromURL(w, r)
if !ok {
return
}
if _, ok := a.authenticate(w, r, runID); !ok {
run, ok := a.authenticate(w, r, runID)
if !ok {
return
}
if a.Measurements == nil {
@@ -903,8 +1010,12 @@ func (a *Agent) Sensor(w http.ResponseWriter, r *http.Request) {
return
}
rows := make([]model.Measurement, 0, len(body.Samples))
sampleStages := make([]string, 0, len(body.Samples))
for _, s := range body.Samples {
ts, _ := time.Parse(time.RFC3339Nano, s.TS)
if ts.IsZero() {
ts = time.Now().UTC()
}
rows = append(rows, model.Measurement{
RunID: runID,
TS: ts,
@@ -913,12 +1024,139 @@ func (a *Agent) Sensor(w http.ResponseWriter, r *http.Request) {
Value: s.Value,
Unit: s.Unit,
})
// Stage the sample belongs to drives threshold selector
// matching. We use the run's current state — the agent does
// not tag samples with a stage.
sampleStages = append(sampleStages, orchestrator.StageNameForState(run.State))
}
if err := a.Measurements.CreateBatch(r.Context(), rows); err != nil {
http.Error(w, "write samples: "+err.Error(), http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "written": len(rows)})
critical := a.evaluateSensorBatch(r.Context(), runID, rows, sampleStages)
writeJSON(w, http.StatusOK, map[string]any{
"ok": true,
"written": len(rows),
"breach": critical != "",
"breach_kind": critical,
})
if critical != "" {
a.failRunOnCriticalBreach(r, run, critical)
}
}
// evaluateSensorBatch runs each sample through the run's thresholds,
// persists evaluations, and returns a short human-readable label for
// the first critical breach it sees (empty when all samples pass or
// only hit warning-severity rules).
func (a *Agent) evaluateSensorBatch(ctx context.Context, runID int64, rows []model.Measurement, sampleStages []string) string {
if a.Thresholds == nil || len(rows) == 0 {
return ""
}
rules, err := a.Thresholds.ListForRun(ctx, runID)
if err != nil {
log.Printf("sensor: list thresholds run %d: %v", runID, err)
return ""
}
if len(rules) == 0 {
return ""
}
evalRules := make([]orchestrator.Threshold, 0, len(rules))
for _, r := range rules {
evalRules = append(evalRules, orchestrator.Threshold{
ID: r.ID,
Stage: r.Stage,
Kind: r.Kind,
Key: r.Key,
Op: orchestrator.ThresholdOp(r.Op),
Value: r.Threshold,
Nominal: r.Nominal,
Severity: orchestrator.ThresholdSeverity(r.Severity),
})
}
evals := make([]store.ThresholdEvaluation, 0, len(rows))
critical := ""
for i, m := range rows {
sample := orchestrator.Sample{
Stage: sampleStages[i],
Kind: m.Kind,
Key: m.Key,
Value: m.Value,
}
for _, res := range orchestrator.Evaluate(sample, evalRules) {
evals = append(evals, store.ThresholdEvaluation{
RunID: runID,
ThresholdID: res.Threshold.ID,
Stage: sample.Stage,
Kind: sample.Kind,
Key: sample.Key,
TS: m.TS,
Observed: res.Observed,
Passed: res.Passed,
})
if critical == "" && res.CriticalBreach() {
critical = fmt.Sprintf("%s %s=%g breached %s %g",
res.Threshold.Kind, sample.Key, res.Observed, res.Threshold.Op, res.Threshold.Value)
}
}
}
if err := a.Thresholds.RecordBatch(ctx, evals); err != nil {
log.Printf("sensor: record evals run %d: %v", runID, err)
}
return critical
}
// stageHadCriticalBreach returns true if any critical-severity
// threshold evaluation for this run matched samples attributed to the
// given stage (stage selector "*" or exact). Called at /result close
// so even an agent that reports Passed=true gets overridden when the
// aggregate view says the stage tripped a gate.
func (a *Agent) stageHadCriticalBreach(ctx context.Context, runID int64, stage string) (bool, string) {
if a.Thresholds == nil {
return false, ""
}
breaches, err := a.Thresholds.CriticalBreaches(ctx, runID)
if err != nil {
log.Printf("result: list breaches run %d: %v", runID, err)
return false, ""
}
for _, b := range breaches {
if b.Stage == stage || b.Stage == "" || b.Stage == "*" {
return true, fmt.Sprintf("critical threshold breach: %s %s=%g", b.Kind, b.Key, b.Observed)
}
}
return false, ""
}
// failRunOnCriticalBreach flips the run to FailedHolding in response
// to a live threshold breach (thermal runaway, EDAC UE, rail sag).
// The agent's pending /result for the current stage may still arrive —
// the silent-skip guard handles that by refusing to double-transition.
func (a *Agent) failRunOnCriticalBreach(r *http.Request, run *model.Run, detail string) {
stage := orchestrator.StageNameForState(run.State)
if stage == "" {
stage = "threshold"
}
if err := a.Runs.SetFailedStage(r.Context(), run.ID, stage+" (threshold)"); err != nil {
log.Printf("sensor: set failed stage run %d: %v", run.ID, err)
}
if _, err := a.Runner.Transition(r.Context(), run.ID, orchestrator.TriggerStageFailed); err != nil {
// If we're already in FailedHolding the transition errors —
// that's fine, the first breach wins.
log.Printf("sensor: fail-transition run %d: %v", run.ID, err)
return
}
hostName := a.hostNameFor(r.Context(), run.HostID)
a.dispatchEvent(notify.Event{
Kind: notify.KindStageFailed,
Severity: notify.SeverityCritical,
RunID: run.ID,
HostName: hostName,
Title: fmt.Sprintf("[vetting] %s FAILED: %s (threshold)", hostName, stage),
Body: fmt.Sprintf("Run %d on %s tripped a critical threshold during %s: %s", run.ID, hostName, stage, detail),
URL: a.runLinkURL(run.ID),
})
a.appendLog(run.ID, "error", fmt.Sprintf("threshold breach during %s: %s — run parked in FailedHolding", stage, detail))
}
// resolveReporting runs when the pipeline advances into StateReporting.
@@ -956,12 +1194,20 @@ func (a *Agent) resolveReporting(r *http.Request, runID int64) {
log.Printf("reporting: list measurements: %v", err)
}
}
var firmware []store.FirmwareSnapshot
if a.Firmware != nil {
firmware, err = a.Firmware.ListForRun(ctx, runID)
if err != nil {
log.Printf("reporting: list firmware: %v", err)
}
}
bundle := map[string]any{
"run": run,
"host": host,
"stages": stages,
"spec_diffs": diffs,
"measurements": measurements,
"firmware": firmware,
"generated_at": time.Now().UTC().Format(time.RFC3339),
}
buf, err := json.MarshalIndent(bundle, "", " ")
@@ -993,6 +1239,15 @@ func (a *Agent) resolveReporting(r *http.Request, runID int64) {
// Also render the operator-facing HTML summary alongside the JSON.
// Failures here are non-fatal — the JSON is the source of truth.
if host != nil {
fwRows := make([]report.FirmwareSnapshot, 0, len(firmware))
for _, f := range firmware {
fwRows = append(fwRows, report.FirmwareSnapshot{
Component: f.Component,
Identifier: f.Identifier,
Version: f.Version,
Vendor: f.Vendor,
})
}
htmlData := report.Data{
GeneratedAt: time.Now().UTC(),
Run: *run,
@@ -1000,6 +1255,7 @@ func (a *Agent) resolveReporting(r *http.Request, runID int64) {
Stages: stages,
SpecDiffs: diffs,
Aggregates: report.AggregateMeasurements(measurements),
Firmware: fwRows,
}
if htmlBuf, err := report.RenderHTML(htmlData); err != nil {
log.Printf("reporting: render html: %v", err)