cpustress+orchestrator: serial CPU/RAM passes + silent-skip guard
Orion's run (log 20:49 → 20:54) shipped GREEN while silently skipping CPUStress. Two compounding bugs: 1. CPUStress ran --cpu N AND --vm N --vm-bytes 90% concurrently. On a 4-core 8 GiB N95, that's 360% RAM overcommit; the OOM-killer fired, usually on the agent itself. Replaced with two sequential passes — CPU (all methods, --verify) for 3 min, then RAM (--vm 1, --vm-bytes capped to MemAvailable − 1.5 GiB, floor 256 MiB, --verify) for 3 min. Each pass now also asserts elapsed ≥ target − 2s so a premature clean exit counts as failure instead of a silent pass. 2. On systemd-restart after the OOM, the agent hardcoded nextStage := "Inventory" and re-ran it. The orchestrator's /result handler advances run state via TriggerStageCompleted against the *current* RunState, not against body.Stage — so an Inventory result posted while the run was in StateCPUStress silently advanced CPUStress → Storage and marked CPUStress passed without it ever running. Two-layer defense for #2: - agent-side: /claim response now carries current_state; agent resumes at the matching stage on a re-claim (happy path). - server-side: new TriggerStageMismatch + StageNameForState helper backstop. If body.Stage doesn't match the run's current stage, /result parks the run in FailedHolding with failed_stage labeled "<got> (expected <expected>)" and returns 409. Other stages audited for similar unbounded concurrency — none found; only CPUStress was unsafe. Tests: - cpustress_test.go — parseMemAvailable parses real meminfo, errors on missing/malformed; cap calc hits floor on tiny boxes, uses 1.5 GiB headroom on normal/huge boxes. - statemachine_test.go — TriggerStageMismatch lands at FailedHolding from every stage state and is rejected from pre-stage/terminal states; StageNameForState round-trips the stageStates map. - agent_handlers_test.go — TestResult_RejectsMismatchedStage proves the Orion scenario now 409s + FailedHolding; TestResult_AcceptsMatchingStage proves the guard doesn't break the happy path. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -180,6 +180,15 @@ func (a *Agent) Claim(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
// Re-fetch run state: the Transition above may have advanced us from
|
||||
// Booting → InventoryCheck, and we want to hand that fresh state to
|
||||
// the agent so a re-claim after a crash resumes at the stored state
|
||||
// instead of silently replaying Inventory.
|
||||
currentState := run.State
|
||||
if fresh, err := a.Runs.Get(r.Context(), runID); err == nil && fresh != nil {
|
||||
currentState = fresh.State
|
||||
}
|
||||
|
||||
log.Printf("agent claimed: run=%d agent_ip=%s", runID, agentIP)
|
||||
if a.Logs != nil {
|
||||
if w, err := a.Logs.WriterFor(runID); err == nil {
|
||||
@@ -213,6 +222,7 @@ func (a *Agent) Claim(w http.ResponseWriter, r *http.Request) {
|
||||
"expected_disks": expectedDisks,
|
||||
"iperf_port": iperfPort,
|
||||
"non_destructive": run.NonDestructive,
|
||||
"current_state": string(currentState),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -411,6 +421,42 @@ func (a *Agent) Result(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// Silent-skip guard. Orchestrator advances the run state via
|
||||
// TriggerStageCompleted against the *current* state, not against
|
||||
// body.Stage — so an Inventory result posted while the run is in
|
||||
// StateCPUStress would silently advance CPUStress → Storage and mark
|
||||
// CPUStress as passed without it ever running. That's exactly what
|
||||
// happened on Orion when the agent OOM-crashed mid-CPUStress,
|
||||
// systemd restarted it, and the restarted agent (which hardcoded
|
||||
// "Inventory" as its first stage) re-ran Inventory and reported it.
|
||||
// Guard: if body.Stage doesn't match the stage the run is currently
|
||||
// in, park the run in FailedHolding so the operator can investigate
|
||||
// rather than trusting the claim and cascading silent passes.
|
||||
expectedStage := orchestrator.StageNameForState(run.State)
|
||||
if expectedStage != "" && body.Stage != expectedStage {
|
||||
failedLabel := fmt.Sprintf("%s (expected %s)", body.Stage, expectedStage)
|
||||
if err := a.Runs.SetFailedStage(r.Context(), runID, failedLabel); err != nil {
|
||||
log.Printf("result: set failed stage on mismatch run %d: %v", runID, err)
|
||||
}
|
||||
if _, err := a.Runner.Transition(r.Context(), runID, orchestrator.TriggerStageMismatch); err != nil {
|
||||
log.Printf("result: stage-mismatch transition run %d: %v", runID, err)
|
||||
}
|
||||
hostName := a.hostNameFor(r.Context(), run.HostID)
|
||||
a.dispatchEvent(notify.Event{
|
||||
Kind: notify.KindStageFailed,
|
||||
Severity: notify.SeverityCritical,
|
||||
RunID: runID,
|
||||
HostName: hostName,
|
||||
Title: fmt.Sprintf("[vetting] %s stage mismatch: %s", hostName, body.Stage),
|
||||
Body: fmt.Sprintf("Run %d reported stage %s while orchestrator expected %s — parked in FailedHolding to prevent silent skip.",
|
||||
runID, body.Stage, expectedStage),
|
||||
URL: a.runLinkURL(runID),
|
||||
})
|
||||
log.Printf("result: stage mismatch run=%d got=%s expected=%s — parked", runID, body.Stage, expectedStage)
|
||||
http.Error(w, "stage mismatch: got "+body.Stage+", expected "+expectedStage, http.StatusConflict)
|
||||
return
|
||||
}
|
||||
|
||||
stageState := model.StagePassed
|
||||
if !body.Passed {
|
||||
stageState = model.StageFailed
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
|
||||
"vetting/internal/api"
|
||||
"vetting/internal/db"
|
||||
"vetting/internal/events"
|
||||
"vetting/internal/model"
|
||||
"vetting/internal/orchestrator"
|
||||
"vetting/internal/store"
|
||||
@@ -107,7 +108,7 @@ func TestSensorRejectsBadToken(t *testing.T) {
|
||||
func TestHeartbeatShutdownWhenCompleted(t *testing.T) {
|
||||
a, runID, token := setupAgent(t)
|
||||
// Wire a runner so Heartbeat's TouchHeartbeat call doesn't nil-panic.
|
||||
a.Runner = &orchestrator.Runner{Runs: a.Runs, Hosts: a.Hosts, Stages: &store.Stages{DB: a.Runs.DB}}
|
||||
a.Runner = &orchestrator.Runner{Runs: a.Runs, Hosts: a.Hosts, Stages: &store.Stages{DB: a.Runs.DB}, EventHub: events.NewHub()}
|
||||
if err := a.Runs.SetState(context.Background(), runID, model.StateCompleted); err != nil {
|
||||
t.Fatalf("set state: %v", err)
|
||||
}
|
||||
@@ -126,3 +127,91 @@ func TestHeartbeatShutdownWhenCompleted(t *testing.T) {
|
||||
t.Fatalf("cmd = %v, want shutdown", resp["cmd"])
|
||||
}
|
||||
}
|
||||
|
||||
// TestResult_RejectsMismatchedStage is the silent-skip guard's unit
|
||||
// test. The Orion failure mode: agent crashes mid-CPUStress, systemd
|
||||
// restarts it, restarted agent replays Inventory and /results it.
|
||||
// Before the guard, the orchestrator advanced StateCPUStress → Storage
|
||||
// on TriggerStageCompleted; CPUStress got marked passed without ever
|
||||
// running. Guard's contract: if body.Stage doesn't match the stage the
|
||||
// run is in, reject with 409 and park the run in FailedHolding with a
|
||||
// failed_stage that names *what* was reported vs. what was expected.
|
||||
func TestResult_RejectsMismatchedStage(t *testing.T) {
|
||||
a, runID, token := setupAgent(t)
|
||||
a.Runner = &orchestrator.Runner{Runs: a.Runs, Hosts: a.Hosts, Stages: &store.Stages{DB: a.Runs.DB}, EventHub: events.NewHub()}
|
||||
// Park the run in CPUStress — the state Orion was in when its
|
||||
// agent crashed.
|
||||
if err := a.Runs.SetState(context.Background(), runID, model.StateCPUStress); err != nil {
|
||||
t.Fatalf("set state: %v", err)
|
||||
}
|
||||
|
||||
// Restarted agent's hardcoded-Inventory-first behavior: it replays
|
||||
// Inventory and posts a passed result for it.
|
||||
body, _ := json.Marshal(map[string]any{
|
||||
"stage": "Inventory",
|
||||
"passed": true,
|
||||
})
|
||||
req := routedRequest(runID, http.MethodPost, "/api/v1/runs/"+strconv.FormatInt(runID, 10)+"/result", body)
|
||||
req.Header.Set("Authorization", "Bearer "+token)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
rr := httptest.NewRecorder()
|
||||
a.Result(rr, req)
|
||||
|
||||
if rr.Code != http.StatusConflict {
|
||||
t.Fatalf("status = %d, want 409; body = %s", rr.Code, rr.Body.String())
|
||||
}
|
||||
after, err := a.Runs.Get(context.Background(), runID)
|
||||
if err != nil {
|
||||
t.Fatalf("get run: %v", err)
|
||||
}
|
||||
if after.State != model.StateFailedHolding {
|
||||
t.Fatalf("run state = %q, want FailedHolding", after.State)
|
||||
}
|
||||
if after.FailedStage == "" {
|
||||
t.Fatalf("failed_stage is empty; expected mismatch label")
|
||||
}
|
||||
// The label must name both sides so the operator can see the
|
||||
// skew without digging through logs.
|
||||
for _, want := range []string{"Inventory", "CPUStress"} {
|
||||
if !bytes.Contains([]byte(after.FailedStage), []byte(want)) {
|
||||
t.Errorf("failed_stage %q missing %q", after.FailedStage, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestResult_AcceptsMatchingStage confirms the guard's complement: when
|
||||
// the agent reports the stage the run is actually in, /result advances
|
||||
// the pipeline normally. Without this, a too-strict guard could reject
|
||||
// every result and freeze all runs.
|
||||
func TestResult_AcceptsMatchingStage(t *testing.T) {
|
||||
a, runID, token := setupAgent(t)
|
||||
a.Runner = &orchestrator.Runner{Runs: a.Runs, Hosts: a.Hosts, Stages: &store.Stages{DB: a.Runs.DB}, EventHub: events.NewHub()}
|
||||
stages := &store.Stages{DB: a.Runs.DB}
|
||||
if err := stages.Seed(context.Background(), runID); err != nil {
|
||||
t.Fatalf("seed stages: %v", err)
|
||||
}
|
||||
if err := a.Runs.SetState(context.Background(), runID, model.StateSMART); err != nil {
|
||||
t.Fatalf("set state: %v", err)
|
||||
}
|
||||
|
||||
body, _ := json.Marshal(map[string]any{
|
||||
"stage": "SMART",
|
||||
"passed": true,
|
||||
})
|
||||
req := routedRequest(runID, http.MethodPost, "/api/v1/runs/"+strconv.FormatInt(runID, 10)+"/result", body)
|
||||
req.Header.Set("Authorization", "Bearer "+token)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
rr := httptest.NewRecorder()
|
||||
a.Result(rr, req)
|
||||
|
||||
if rr.Code != http.StatusOK {
|
||||
t.Fatalf("status = %d, want 200; body = %s", rr.Code, rr.Body.String())
|
||||
}
|
||||
after, err := a.Runs.Get(context.Background(), runID)
|
||||
if err != nil {
|
||||
t.Fatalf("get run: %v", err)
|
||||
}
|
||||
if after.State != model.StateCPUStress {
|
||||
t.Fatalf("run state = %q, want CPUStress after SMART pass", after.State)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user