diff --git a/agent/client.go b/agent/client.go index e1195ad..313368e 100644 --- a/agent/client.go +++ b/agent/client.go @@ -118,6 +118,12 @@ type ClaimResponse struct { ExpectedDisks []ClaimExpectedDiskSpec `json:"expected_disks"` IperfPort int `json:"iperf_port"` NonDestructive bool `json:"non_destructive"` + // CurrentState is the run's current state at claim time. A fresh + // claim yields "InventoryCheck"; a re-claim after an agent crash + // yields whatever stage the run was parked at, so the agent resumes + // at the right stage instead of silently replaying Inventory and + // letting the orchestrator advance past the crashed stage. + CurrentState string `json:"current_state"` } type ClaimExpectedDiskSpec struct { diff --git a/agent/runner.go b/agent/runner.go index 7b939f8..0c482e2 100644 --- a/agent/runner.go +++ b/agent/runner.go @@ -69,7 +69,7 @@ func Run(ctx context.Context, p *bootstate.Params) error { }); err != nil { return err } - fwd.info(fmt.Sprintf("claimed run; stages=%v", claim.Stages)) + fwd.info(fmt.Sprintf("claimed run; stages=%v current_state=%s", claim.Stages, claim.CurrentState)) go thermalSidecar(ctx, c, fwd) @@ -80,7 +80,20 @@ func Run(ctx context.Context, p *bootstate.Params) error { // orchestrator (SpecValidate, Reporting) resolve inside /result and // flip next_state forward past themselves, so they simply never match // our dispatch table. - nextStage := "Inventory" + // + // Start stage comes from claim.CurrentState so a re-claim after an + // agent crash resumes at the stage the run was parked at, instead of + // blindly replaying Inventory and letting the orchestrator silently + // advance past the crashed stage (the Orion OOM bug). A fresh claim + // naturally lands on InventoryCheck, which maps back to "Inventory". + nextStage := stageForState(claim.CurrentState) + if nextStage == "" { + nextStage = "Inventory" + } + if nextStage != "Inventory" { + fwd.warn(fmt.Sprintf("resuming mid-pipeline at %s (claim current_state=%s) — likely agent restart after crash", + nextStage, claim.CurrentState)) + } for nextStage != "" { select { case <-ctx.Done(): diff --git a/agent/tests/cpustress.go b/agent/tests/cpustress.go index a68dc0f..88ff1bc 100644 --- a/agent/tests/cpustress.go +++ b/agent/tests/cpustress.go @@ -1,8 +1,11 @@ package tests import ( + "bufio" "context" "fmt" + "io" + "os" "os/exec" "runtime" "strconv" @@ -10,19 +13,34 @@ import ( "time" ) -// CPUStress runs stress-ng with CPU workers AND memory stressors. The -// memory stressors take the place of a Memtest86+ pass — per the plan, -// running under Linux gives us exit-code-based pass/fail and log -// capture we can't get from Memtest without IPMI serial redirection. +// CPUStress runs stress-ng as two serial passes. The previous shape +// (--cpu N AND --vm N --vm-bytes 90% concurrently) OOM-killed the +// agent itself on small hosts: 4 workers × 90% of an 8GiB box is 360% +// overcommit, and the kernel killed stress-ng / agent / whatever the +// OOM scorer picked. We flip it serial so only one stressor is live +// at a time and the RAM cap is computed from MemAvailable with a +// 1.5GiB headroom reserve, keeping the kernel + agent + log buffers +// alive. // -// Non-zero exit = stress-ng aborted due to a failure (bit flip, OOM -// kill, etc.) → stage fails. Exit 0 means the kernel returned sane -// pages for the full duration, which is the Phase 4 health bar. +// Other stages were audited at the same time (SMART, Storage, +// Network, GPU, PSU, Inventory, SpecValidate, Reporting) — none had +// the CPUStress pattern of unbounded concurrency, so they're +// unchanged. +// +// Pass 1 — CPU only, all methods, 3min. --verify re-runs the ALU +// work and diffs against known-good outputs so a silent miscomputation +// (rowhammered register, flaky bus) still fails the stage. +// +// Pass 2 — RAM only, single worker, 3min. --vm-bytes is +// MemAvailable − 1.5GiB, floor 256MiB. --vm-keep reuses the same +// mapping across iterations so we hit every page repeatedly within the +// window. +// +// Each pass also asserts elapsed ≥ (target − 2s). A premature clean +// exit (stress-ng killed by a signal, workload bailed quietly) now +// counts as a failure instead of falsely passing on exit-0. func CPUStress(ctx context.Context, d Deps) Outcome { if _, err := exec.LookPath("stress-ng"); err != nil { - // The live image ships stress-ng; absence is a packaging defect, - // not a benign local-dev scenario. Fail loudly so a regression - // in the image doesn't silently pass runs. d.Error("CPUStress: stress-ng not found in PATH — live image is missing required tool") return Outcome{ Passed: false, @@ -32,55 +50,172 @@ func CPUStress(ctx context.Context, d Deps) Outcome { } } - // Timeout: Deps.StageTimeout may be zero in tests; default 2 min. - timeout := d.StageTimeout - if timeout <= 0 { - timeout = 2 * time.Minute - } - cores := runtime.NumCPU() - // --vm N allocates N worker processes each touching 90% of RAM. On - // an 8-core host with 32GiB this is 8 × ~28GiB sliding windows — - // enough to exercise every DIMM row within a minute. - args := []string{ + extras := map[string]any{"cores": cores} + + // Pass 1: CPU + cpu := runStressPass(ctx, d, "CPU", cpuPassDuration, []string{ "--cpu", strconv.Itoa(cores), "--cpu-method", "all", - "--vm", strconv.Itoa(cores), - "--vm-bytes", "90%", - "--timeout", durationSeconds(timeout), + "--timeout", durationSeconds(cpuPassDuration), "--metrics-brief", "--verify", + }) + extras["cpu_pass"] = cpu + if !cpu.Passed { + return Outcome{ + Passed: false, + Message: "CPU pass failed: " + cpu.Err, + Summary: fmt.Sprintf("CPU pass failed after %ds", cpu.ElapsedSecs), + Extras: extras, + } } - d.Info(fmt.Sprintf("CPUStress: stress-ng --cpu %d --vm %d --vm-bytes 90%% --timeout %s", - cores, cores, durationSeconds(timeout))) - runCtx, cancel := context.WithTimeout(ctx, timeout+30*time.Second) + // Pass 2: memory — only after CPU has demonstrated the box is + // sane. Cap derived from /proc/meminfo so we never overcommit. + avail, err := memAvailableBytes() + if err != nil { + d.Error("CPUStress: read MemAvailable: " + err.Error()) + return Outcome{ + Passed: false, + Message: "read MemAvailable: " + err.Error(), + Summary: "failed (meminfo unreadable)", + Extras: extras, + } + } + cap := avail - memHeadroomBytes + extras["mem_available_bytes"] = avail + extras["mem_bytes_cap"] = cap + extras["mem_headroom_bytes"] = int64(memHeadroomBytes) + if cap < memFloorBytes { + msg := fmt.Sprintf("MemAvailable=%d, below %d floor after %d headroom — refusing to run memory pass", + avail, memFloorBytes, memHeadroomBytes) + d.Error("CPUStress: " + msg) + return Outcome{ + Passed: false, + Message: msg, + Summary: "failed (insufficient free RAM for memory pass)", + Extras: extras, + } + } + mem := runStressPass(ctx, d, "memory", memPassDuration, []string{ + "--vm", "1", + "--vm-bytes", strconv.FormatInt(cap, 10), + "--vm-keep", + "--timeout", durationSeconds(memPassDuration), + "--metrics-brief", + "--verify", + }) + extras["mem_pass"] = mem + if !mem.Passed { + return Outcome{ + Passed: false, + Message: "memory pass failed: " + mem.Err, + Summary: fmt.Sprintf("memory pass failed after %ds", mem.ElapsedSecs), + Extras: extras, + } + } + + return Outcome{ + Passed: true, + Summary: fmt.Sprintf("CPU+RAM PASSED (%d cores, %s cap)", + cores, humanBytes(cap)), + Extras: extras, + } +} + +const ( + cpuPassDuration = 3 * time.Minute + memPassDuration = 3 * time.Minute + // memHeadroomBytes = 1.5 GiB reserved for kernel, agent, log + // buffers, and whatever page cache is still live when the stage + // starts. Conservative but keeps us off the OOM scorer. + memHeadroomBytes int64 = 1610612736 + // memFloorBytes — if MemAvailable − headroom drops below this, + // we refuse to run the memory pass rather than stressing a tiny + // window that tells us nothing. + memFloorBytes int64 = 268435456 + passSlack = 2 * time.Second +) + +// stressPass is the per-pass result embedded in CPUStress's Extras. +// Passed==true and Elapsed close to target is the only happy path. +type stressPass struct { + Passed bool `json:"passed"` + Err string `json:"err,omitempty"` + ElapsedSecs int `json:"elapsed_secs"` + TargetSecs int `json:"target_secs"` + OutputTail string `json:"output_tail,omitempty"` +} + +// runStressPass invokes stress-ng and validates both exit code and +// elapsed time. Target is the intended --timeout; we require +// elapsed ≥ target − passSlack so a premature-but-clean exit still +// counts as failure. +func runStressPass(ctx context.Context, d Deps, label string, target time.Duration, args []string) stressPass { + d.Info(fmt.Sprintf("CPUStress: %s pass starting — stress-ng %s", label, strings.Join(args, " "))) + runCtx, cancel := context.WithTimeout(ctx, target+30*time.Second) defer cancel() cmd := exec.CommandContext(runCtx, "stress-ng", args...) start := time.Now() out, err := cmd.CombinedOutput() - elapsed := time.Since(start).Round(time.Second) + elapsed := time.Since(start) - extras := map[string]any{ - "cores": cores, - "elapsed_secs": elapsed.Seconds(), - "output_tail": tailLines(string(out), 20), + res := stressPass{ + ElapsedSecs: int(elapsed.Round(time.Second).Seconds()), + TargetSecs: int(target.Round(time.Second).Seconds()), + OutputTail: tailLines(string(out), 20), } if err != nil { - d.Error("CPUStress: stress-ng failed: " + err.Error()) - return Outcome{ - Passed: false, - Message: "stress-ng returned non-zero: " + err.Error(), - Summary: fmt.Sprintf("failed after %s", elapsed), - Extras: extras, + res.Err = err.Error() + d.Error(fmt.Sprintf("CPUStress: %s pass failed after %s: %s", + label, elapsed.Round(time.Second), err.Error())) + return res + } + if elapsed < target-passSlack { + res.Err = fmt.Sprintf("stress-ng exited cleanly after %s; expected ≥ %s (premature exit — signal or broken workload)", + elapsed.Round(time.Second), target-passSlack) + d.Error("CPUStress: " + label + " pass " + res.Err) + return res + } + res.Passed = true + d.Info(fmt.Sprintf("CPUStress: %s pass PASSED in %s", label, elapsed.Round(time.Second))) + return res +} + +// memAvailableBytes reads /proc/meminfo and returns MemAvailable in +// bytes. Split from parseMemAvailable so the parse step is testable +// without touching the real filesystem. +func memAvailableBytes() (int64, error) { + f, err := os.Open("/proc/meminfo") + if err != nil { + return 0, err + } + defer func() { _ = f.Close() }() + return parseMemAvailable(f) +} + +func parseMemAvailable(r io.Reader) (int64, error) { + sc := bufio.NewScanner(r) + for sc.Scan() { + line := sc.Text() + if !strings.HasPrefix(line, "MemAvailable:") { + continue } + fields := strings.Fields(line) + if len(fields) < 2 { + return 0, fmt.Errorf("malformed MemAvailable line: %q", line) + } + kb, err := strconv.ParseInt(fields[1], 10, 64) + if err != nil { + return 0, fmt.Errorf("parse MemAvailable: %w", err) + } + return kb * 1024, nil } - d.Info(fmt.Sprintf("CPUStress: stress-ng completed cleanly in %s", elapsed)) - return Outcome{ - Passed: true, - Summary: fmt.Sprintf("stress-ng PASSED after %s (%d cores + 90%% RAM)", elapsed, cores), - Extras: extras, + if err := sc.Err(); err != nil { + return 0, err } + return 0, fmt.Errorf("MemAvailable not found in /proc/meminfo") } func durationSeconds(d time.Duration) string { @@ -99,3 +234,19 @@ func tailLines(s string, n int) string { } return strings.Join(lines, "\n") } + +func humanBytes(b int64) string { + const ( + kib = 1024 + mib = 1024 * kib + gib = 1024 * mib + ) + switch { + case b >= gib: + return fmt.Sprintf("%.1f GiB", float64(b)/float64(gib)) + case b >= mib: + return fmt.Sprintf("%d MiB", b/mib) + default: + return fmt.Sprintf("%d B", b) + } +} diff --git a/agent/tests/cpustress_test.go b/agent/tests/cpustress_test.go new file mode 100644 index 0000000..93ac06d --- /dev/null +++ b/agent/tests/cpustress_test.go @@ -0,0 +1,88 @@ +package tests + +import ( + "strings" + "testing" +) + +// TestParseMemAvailable_RealSample exercises parseMemAvailable on a +// real /proc/meminfo snippet. Units are always kB and always the +// second field; we just want to confirm we strip it correctly. +func TestParseMemAvailable_RealSample(t *testing.T) { + sample := `MemTotal: 8053292 kB +MemFree: 3205104 kB +MemAvailable: 6742180 kB +Buffers: 145332 kB +Cached: 2934064 kB +` + got, err := parseMemAvailable(strings.NewReader(sample)) + if err != nil { + t.Fatalf("parseMemAvailable: %v", err) + } + want := int64(6742180) * 1024 + if got != want { + t.Errorf("MemAvailable = %d bytes, want %d", got, want) + } +} + +func TestParseMemAvailable_Missing(t *testing.T) { + sample := "MemTotal: 8053292 kB\nMemFree: 3205104 kB\n" + if _, err := parseMemAvailable(strings.NewReader(sample)); err == nil { + t.Errorf("expected error when MemAvailable absent") + } +} + +func TestParseMemAvailable_Malformed(t *testing.T) { + sample := "MemAvailable:\n" + if _, err := parseMemAvailable(strings.NewReader(sample)); err == nil { + t.Errorf("expected error on single-field MemAvailable line") + } +} + +// TestMemCap_Normal: on a healthy 8GiB box with ~6.4GiB available, +// cap lands at ~4.9GiB — well above floor, well below total. +func TestMemCap_Normal(t *testing.T) { + avail := int64(6742180) * 1024 // ~6.4 GiB + cap := avail - memHeadroomBytes + if cap < memFloorBytes { + t.Errorf("cap=%d should be ≥ floor=%d on 6.4GiB available", cap, memFloorBytes) + } + // Sanity: headroom carved off 1.5 GiB. + if got := avail - cap; got != memHeadroomBytes { + t.Errorf("headroom = %d, want %d", got, memHeadroomBytes) + } +} + +// TestMemCap_FloorHit: a box with <1.75 GiB available should fall +// below the floor so CPUStress refuses the memory pass rather than +// running a window too small to be meaningful. +func TestMemCap_FloorHit(t *testing.T) { + avail := int64(1_500_000_000) // 1.4 GiB + cap := avail - memHeadroomBytes + if cap >= memFloorBytes { + t.Errorf("cap=%d should be < floor=%d on 1.4GiB available (cap pre-clamp)", cap, memFloorBytes) + } +} + +// TestMemCap_HugeBox: a 128 GiB box still honors the 1.5 GiB +// headroom (no weird upper clamp that would cap us at a tiny +// fraction of the RAM). +func TestMemCap_HugeBox(t *testing.T) { + avail := int64(128) * 1024 * 1024 * 1024 + cap := avail - memHeadroomBytes + if cap < avail-2*memHeadroomBytes { + t.Errorf("cap=%d unexpectedly below avail=%d − 2×headroom", cap, avail) + } + // Should be comfortably above 100 GiB. + if cap < 100*1024*1024*1024 { + t.Errorf("cap=%d should exceed 100 GiB on 128 GiB box", cap) + } +} + +// TestDurationSeconds_BelowOne floors at "1s"; stress-ng rejects 0. +func TestDurationSeconds_BelowOne(t *testing.T) { + got := durationSeconds(0) + if got != "1s" { + t.Errorf("durationSeconds(0) = %q, want 1s", got) + } +} diff --git a/internal/api/agent_handlers.go b/internal/api/agent_handlers.go index e57969a..66263b1 100644 --- a/internal/api/agent_handlers.go +++ b/internal/api/agent_handlers.go @@ -180,6 +180,15 @@ func (a *Agent) Claim(w http.ResponseWriter, r *http.Request) { } } + // Re-fetch run state: the Transition above may have advanced us from + // Booting → InventoryCheck, and we want to hand that fresh state to + // the agent so a re-claim after a crash resumes at the stored state + // instead of silently replaying Inventory. + currentState := run.State + if fresh, err := a.Runs.Get(r.Context(), runID); err == nil && fresh != nil { + currentState = fresh.State + } + log.Printf("agent claimed: run=%d agent_ip=%s", runID, agentIP) if a.Logs != nil { if w, err := a.Logs.WriterFor(runID); err == nil { @@ -213,6 +222,7 @@ func (a *Agent) Claim(w http.ResponseWriter, r *http.Request) { "expected_disks": expectedDisks, "iperf_port": iperfPort, "non_destructive": run.NonDestructive, + "current_state": string(currentState), }) } @@ -411,6 +421,42 @@ func (a *Agent) Result(w http.ResponseWriter, r *http.Request) { return } + // Silent-skip guard. Orchestrator advances the run state via + // TriggerStageCompleted against the *current* state, not against + // body.Stage — so an Inventory result posted while the run is in + // StateCPUStress would silently advance CPUStress → Storage and mark + // CPUStress as passed without it ever running. That's exactly what + // happened on Orion when the agent OOM-crashed mid-CPUStress, + // systemd restarted it, and the restarted agent (which hardcoded + // "Inventory" as its first stage) re-ran Inventory and reported it. + // Guard: if body.Stage doesn't match the stage the run is currently + // in, park the run in FailedHolding so the operator can investigate + // rather than trusting the claim and cascading silent passes. + expectedStage := orchestrator.StageNameForState(run.State) + if expectedStage != "" && body.Stage != expectedStage { + failedLabel := fmt.Sprintf("%s (expected %s)", body.Stage, expectedStage) + if err := a.Runs.SetFailedStage(r.Context(), runID, failedLabel); err != nil { + log.Printf("result: set failed stage on mismatch run %d: %v", runID, err) + } + if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageMismatch); err != nil { + log.Printf("result: stage-mismatch transition run %d: %v", runID, err) + } + hostName := a.hostNameFor(r.Context(), run.HostID) + a.dispatchEvent(notify.Event{ + Kind: notify.KindStageFailed, + Severity: notify.SeverityCritical, + RunID: runID, + HostName: hostName, + Title: fmt.Sprintf("[vetting] %s stage mismatch: %s", hostName, body.Stage), + Body: fmt.Sprintf("Run %d reported stage %s while orchestrator expected %s — parked in FailedHolding to prevent silent skip.", + runID, body.Stage, expectedStage), + URL: a.runLinkURL(runID), + }) + log.Printf("result: stage mismatch run=%d got=%s expected=%s — parked", runID, body.Stage, expectedStage) + http.Error(w, "stage mismatch: got "+body.Stage+", expected "+expectedStage, http.StatusConflict) + return + } + stageState := model.StagePassed if !body.Passed { stageState = model.StageFailed diff --git a/internal/api/agent_handlers_test.go b/internal/api/agent_handlers_test.go index 67ee41f..c705919 100644 --- a/internal/api/agent_handlers_test.go +++ b/internal/api/agent_handlers_test.go @@ -14,6 +14,7 @@ import ( "vetting/internal/api" "vetting/internal/db" + "vetting/internal/events" "vetting/internal/model" "vetting/internal/orchestrator" "vetting/internal/store" @@ -107,7 +108,7 @@ func TestSensorRejectsBadToken(t *testing.T) { func TestHeartbeatShutdownWhenCompleted(t *testing.T) { a, runID, token := setupAgent(t) // Wire a runner so Heartbeat's TouchHeartbeat call doesn't nil-panic. - a.Runner = &orchestrator.Runner{Runs: a.Runs, Hosts: a.Hosts, Stages: &store.Stages{DB: a.Runs.DB}} + a.Runner = &orchestrator.Runner{Runs: a.Runs, Hosts: a.Hosts, Stages: &store.Stages{DB: a.Runs.DB}, EventHub: events.NewHub()} if err := a.Runs.SetState(context.Background(), runID, model.StateCompleted); err != nil { t.Fatalf("set state: %v", err) } @@ -126,3 +127,91 @@ func TestHeartbeatShutdownWhenCompleted(t *testing.T) { t.Fatalf("cmd = %v, want shutdown", resp["cmd"]) } } + +// TestResult_RejectsMismatchedStage is the silent-skip guard's unit +// test. The Orion failure mode: agent crashes mid-CPUStress, systemd +// restarts it, restarted agent replays Inventory and /results it. +// Before the guard, the orchestrator advanced StateCPUStress → Storage +// on TriggerStageCompleted; CPUStress got marked passed without ever +// running. Guard's contract: if body.Stage doesn't match the stage the +// run is in, reject with 409 and park the run in FailedHolding with a +// failed_stage that names *what* was reported vs. what was expected. +func TestResult_RejectsMismatchedStage(t *testing.T) { + a, runID, token := setupAgent(t) + a.Runner = &orchestrator.Runner{Runs: a.Runs, Hosts: a.Hosts, Stages: &store.Stages{DB: a.Runs.DB}, EventHub: events.NewHub()} + // Park the run in CPUStress — the state Orion was in when its + // agent crashed. + if err := a.Runs.SetState(context.Background(), runID, model.StateCPUStress); err != nil { + t.Fatalf("set state: %v", err) + } + + // Restarted agent's hardcoded-Inventory-first behavior: it replays + // Inventory and posts a passed result for it. + body, _ := json.Marshal(map[string]any{ + "stage": "Inventory", + "passed": true, + }) + req := routedRequest(runID, http.MethodPost, "/api/v1/runs/"+strconv.FormatInt(runID, 10)+"/result", body) + req.Header.Set("Authorization", "Bearer "+token) + req.Header.Set("Content-Type", "application/json") + rr := httptest.NewRecorder() + a.Result(rr, req) + + if rr.Code != http.StatusConflict { + t.Fatalf("status = %d, want 409; body = %s", rr.Code, rr.Body.String()) + } + after, err := a.Runs.Get(context.Background(), runID) + if err != nil { + t.Fatalf("get run: %v", err) + } + if after.State != model.StateFailedHolding { + t.Fatalf("run state = %q, want FailedHolding", after.State) + } + if after.FailedStage == "" { + t.Fatalf("failed_stage is empty; expected mismatch label") + } + // The label must name both sides so the operator can see the + // skew without digging through logs. + for _, want := range []string{"Inventory", "CPUStress"} { + if !bytes.Contains([]byte(after.FailedStage), []byte(want)) { + t.Errorf("failed_stage %q missing %q", after.FailedStage, want) + } + } +} + +// TestResult_AcceptsMatchingStage confirms the guard's complement: when +// the agent reports the stage the run is actually in, /result advances +// the pipeline normally. Without this, a too-strict guard could reject +// every result and freeze all runs. +func TestResult_AcceptsMatchingStage(t *testing.T) { + a, runID, token := setupAgent(t) + a.Runner = &orchestrator.Runner{Runs: a.Runs, Hosts: a.Hosts, Stages: &store.Stages{DB: a.Runs.DB}, EventHub: events.NewHub()} + stages := &store.Stages{DB: a.Runs.DB} + if err := stages.Seed(context.Background(), runID); err != nil { + t.Fatalf("seed stages: %v", err) + } + if err := a.Runs.SetState(context.Background(), runID, model.StateSMART); err != nil { + t.Fatalf("set state: %v", err) + } + + body, _ := json.Marshal(map[string]any{ + "stage": "SMART", + "passed": true, + }) + req := routedRequest(runID, http.MethodPost, "/api/v1/runs/"+strconv.FormatInt(runID, 10)+"/result", body) + req.Header.Set("Authorization", "Bearer "+token) + req.Header.Set("Content-Type", "application/json") + rr := httptest.NewRecorder() + a.Result(rr, req) + + if rr.Code != http.StatusOK { + t.Fatalf("status = %d, want 200; body = %s", rr.Code, rr.Body.String()) + } + after, err := a.Runs.Get(context.Background(), runID) + if err != nil { + t.Fatalf("get run: %v", err) + } + if after.State != model.StateCPUStress { + t.Fatalf("run state = %q, want CPUStress after SMART pass", after.State) + } +} diff --git a/internal/orchestrator/statemachine.go b/internal/orchestrator/statemachine.go index 233b5c8..5e3e57b 100644 --- a/internal/orchestrator/statemachine.go +++ b/internal/orchestrator/statemachine.go @@ -16,6 +16,7 @@ const ( TriggerPXEObserved Trigger = "PXEObserved" // iPXE fetched cmdline for MAC TriggerAgentClaimed Trigger = "AgentClaimed" // agent POSTed /claim with valid token TriggerStageFailed Trigger = "StageFailed" // a stage reported failure + TriggerStageMismatch Trigger = "StageMismatch" // agent reported a stage that doesn't match current run state (silent-skip guard) TriggerStageCompleted Trigger = "StageCompleted" // a stage reported success → advance TriggerAllStagesPassed Trigger = "AllStagesPassed" // final stage passed TriggerOperatorReleased Trigger = "OperatorReleased" // user clicked Release on a held run @@ -65,6 +66,7 @@ var table = map[Trigger]transition{ TriggerPXEObserved: {from: []model.RunState{model.StateWaitingReboot, model.StateWaitingWoL, model.StateBooting}, to: model.StateBooting}, TriggerAgentClaimed: {from: []model.RunState{model.StateBooting, model.StateWaitingReboot, model.StateWaitingWoL}, to: model.StateInventoryCheck}, TriggerStageFailed: {from: allActiveStates(), to: model.StateFailedHolding}, + TriggerStageMismatch: {from: stageExecutionStates(), to: model.StateFailedHolding}, TriggerAllStagesPassed: {from: []model.RunState{model.StateReporting}, to: model.StateCompleted}, TriggerOperatorReleased: {from: []model.RunState{model.StateFailedHolding}, to: model.StateReleased}, TriggerOperatorCancelled: {from: allActiveStates(), to: model.StateCancelled}, @@ -111,6 +113,21 @@ func StateForStage(name string) (model.RunState, bool) { return s, ok } +// StageNameForState is the inverse of StateForStage: given a run state +// that maps to a stage, returns the stage name (e.g. StateCPUStress → +// "CPUStress"). Empty string when the state isn't a stage-execution +// state (Queued, Booting, FailedHolding, etc.). Used by /result to +// detect when an agent submitted a stage name that doesn't match where +// the orchestrator thinks the run is — the silent-skip guard. +func StageNameForState(s model.RunState) string { + for name, state := range stageStates { + if state == s { + return name + } + } + return "" +} + func nextStageState(current model.RunState) (model.RunState, error) { for i, s := range stageOrder { if s == current { @@ -131,3 +148,11 @@ func allActiveStates() []model.RunState { model.StateGPU, model.StatePSU, model.StateReporting, } } + +// stageExecutionStates returns only the stage-execution states — no +// pre-stages, no terminals. Used as the valid "from" set for +// TriggerStageMismatch: it's nonsensical to fire a stage-mismatch from +// Queued or Booting because no stage result should arrive then. +func stageExecutionStates() []model.RunState { + return append([]model.RunState(nil), stageOrder...) +} diff --git a/internal/orchestrator/statemachine_test.go b/internal/orchestrator/statemachine_test.go index b9ad812..50ecf0b 100644 --- a/internal/orchestrator/statemachine_test.go +++ b/internal/orchestrator/statemachine_test.go @@ -74,6 +74,70 @@ func TestTriggerAgentClaimedFromWaitingReboot(t *testing.T) { } } +// TestTriggerStageMismatch asserts the silent-skip guard: from every +// stage-execution state, a mismatch lands the run in FailedHolding, and +// from non-stage states (pre-stages, terminals) the trigger is rejected. +func TestTriggerStageMismatch(t *testing.T) { + stageStates := []model.RunState{ + model.StateInventoryCheck, + model.StateSpecValidate, + model.StateSMART, + model.StateCPUStress, + model.StateStorage, + model.StateNetwork, + model.StateGPU, + model.StatePSU, + model.StateReporting, + } + for _, from := range stageStates { + got, err := orchestrator.Next(from, orchestrator.TriggerStageMismatch) + if err != nil { + t.Fatalf("StageMismatch from %q: %v", from, err) + } + if got != model.StateFailedHolding { + t.Fatalf("StageMismatch from %q = %q, want FailedHolding", from, got) + } + } + for _, bad := range []model.RunState{ + model.StateRegistered, model.StateQueued, model.StateBooting, + model.StateWaitingReboot, model.StateCompleted, model.StateFailedHolding, + } { + if _, err := orchestrator.Next(bad, orchestrator.TriggerStageMismatch); err == nil { + t.Fatalf("StageMismatch from %q: expected error", bad) + } + } +} + +// TestStageNameForState round-trips the stageStates map: every name in +// StateForStage must come back from StageNameForState, and non-stage +// run states return empty. +func TestStageNameForState(t *testing.T) { + pairs := map[string]model.RunState{ + "Inventory": model.StateInventoryCheck, + "SpecValidate": model.StateSpecValidate, + "SMART": model.StateSMART, + "CPUStress": model.StateCPUStress, + "Storage": model.StateStorage, + "Network": model.StateNetwork, + "GPU": model.StateGPU, + "PSU": model.StatePSU, + "Reporting": model.StateReporting, + } + for name, state := range pairs { + if got := orchestrator.StageNameForState(state); got != name { + t.Errorf("StageNameForState(%q) = %q, want %q", state, got, name) + } + } + for _, s := range []model.RunState{ + model.StateRegistered, model.StateQueued, model.StateBooting, + model.StateWaitingReboot, model.StateCompleted, model.StateFailedHolding, + } { + if got := orchestrator.StageNameForState(s); got != "" { + t.Errorf("StageNameForState(%q) = %q, want empty", s, got) + } + } +} + func TestNextStageWalk(t *testing.T) { // Walking StageCompleted from each stage should land on the next // one in the canonical order, and from Reporting onto Completed.