package tests import ( "bufio" "context" "encoding/json" "fmt" "io" "os" "os/exec" "runtime" "strconv" "strings" "sync" "time" "vetting/agent/probes" ) // CPUStress runs stress-ng as two serial passes. The previous shape // (--cpu N AND --vm N --vm-bytes 90% concurrently) OOM-killed the // agent itself on small hosts: 4 workers × 90% of an 8GiB box is 360% // overcommit, and the kernel killed stress-ng / agent / whatever the // OOM scorer picked. We flip it serial so only one stressor is live // at a time and the RAM cap is computed from MemAvailable with a // 1.5GiB headroom reserve, keeping the kernel + agent + log buffers // alive. // // Other stages were audited at the same time (SMART, Storage, // Network, GPU, PSU, Inventory, SpecValidate, Reporting) — none had // the CPUStress pattern of unbounded concurrency, so they're // unchanged. // // Pass 1 — CPU only, all methods, 3min. --verify re-runs the ALU // work and diffs against known-good outputs so a silent miscomputation // (rowhammered register, flaky bus) still fails the stage. // // Pass 2 — RAM only, single worker, 3min. --vm-bytes is // MemAvailable − 1.5GiB, floor 256MiB. --vm-keep reuses the same // mapping across iterations so we hit every page repeatedly within the // window. // // Each pass also asserts elapsed ≥ (target − 2s). A premature clean // exit (stress-ng killed by a signal, workload bailed quietly) now // counts as a failure instead of falsely passing on exit-0. 
func CPUStress(ctx context.Context, d Deps) Outcome { if _, err := exec.LookPath("stress-ng"); err != nil { d.Error("CPUStress: stress-ng not found in PATH — live image is missing required tool") return Outcome{ Passed: false, Message: "stress-ng binary missing from live image", Summary: "failed (stress-ng missing)", Extras: map[string]any{"reason": "stress_ng_missing"}, } } cores := runtime.NumCPU() extras := map[string]any{"cores": cores} var subs []SubStepReport // EDAC sidecar runs for the lifetime of the stage; cancelled on // return. It polls /sys/devices/system/edac/mc/*/{ce,ue}_count and // posts the current counters so the server-side threshold evaluator // can gate edac_ue > 0 → fail the run. Zero-valued poll falls back // to 10s — the same cadence rasdaemon uses by default. sideCtx, sideCancel := context.WithCancel(ctx) defer sideCancel() var sideWG sync.WaitGroup sideWG.Add(1) go runEDACSidecar(sideCtx, &sideWG, d) // Per-profile durations come from Deps; zero values (missing knobs // or legacy orchestrator) fall back to the package default so the // stage always has a defined budget. cpuDur := nonzeroDur(d.CPUStressKnobs.CPUPass, cpuPassDuration) memDur := nonzeroDur(d.CPUStressKnobs.MemPass, memPassDuration) // Pass 1: CPU cpu := runStressPass(ctx, d, "CPU", cpuDur, []string{ "--cpu", strconv.Itoa(cores), "--cpu-method", "all", "--timeout", durationSeconds(cpuDur), "--metrics-brief", "--verify", }) extras["cpu_pass"] = cpu subs = append(subs, subStepFromPass("CPU pass", cpu)) if !cpu.Passed { return Outcome{ Passed: false, Message: "CPU pass failed: " + cpu.Err, Summary: fmt.Sprintf("CPU pass failed after %ds", cpu.ElapsedSecs), Extras: extras, SubSteps: subs, } } // Pass 2: memory — only after CPU has demonstrated the box is // sane. Cap derived from /proc/meminfo so we never overcommit. 
avail, err := memAvailableBytes() if err != nil { d.Error("CPUStress: read MemAvailable: " + err.Error()) return Outcome{ Passed: false, Message: "read MemAvailable: " + err.Error(), Summary: "failed (meminfo unreadable)", Extras: extras, SubSteps: subs, } } cap := avail - memHeadroomBytes extras["mem_available_bytes"] = avail extras["mem_bytes_cap"] = cap extras["mem_headroom_bytes"] = int64(memHeadroomBytes) if cap < memFloorBytes { msg := fmt.Sprintf("MemAvailable=%d, below %d floor after %d headroom — refusing to run memory pass", avail, memFloorBytes, memHeadroomBytes) d.Error("CPUStress: " + msg) return Outcome{ Passed: false, Message: msg, Summary: "failed (insufficient free RAM for memory pass)", Extras: extras, SubSteps: subs, } } mem := runStressPass(ctx, d, "memory", memDur, []string{ "--vm", "1", "--vm-bytes", strconv.FormatInt(cap, 10), "--vm-keep", "--timeout", durationSeconds(memDur), "--metrics-brief", "--verify", }) extras["mem_pass"] = mem subs = append(subs, subStepFromPass(fmt.Sprintf("Memory pass (cap %s)", humanBytes(cap)), mem)) if !mem.Passed { return Outcome{ Passed: false, Message: "memory pass failed: " + mem.Err, Summary: fmt.Sprintf("memory pass failed after %ds", mem.ElapsedSecs), Extras: extras, SubSteps: subs, } } return Outcome{ Passed: true, Summary: fmt.Sprintf("CPU+RAM PASSED (%d cores, %s cap)", cores, humanBytes(cap)), Extras: extras, SubSteps: subs, } } // runEDACSidecar polls /sys EDAC counters on d.CPUStressKnobs.EDACPoll // cadence (or 10s fallback) for the lifetime of the stage ctx, emitting // one sample per (memory-controller × {ce,ue}) pair on each tick. A // single failing read is tolerated: the next tick picks up the counter. // // This is where the critical edac_ue threshold becomes a hard-fail: as // soon as a UE counter advances past 0, the server-side evaluator trips // and flips the run into FailedHolding. 
The sidecar emits whether or // not stress-ng is still running; that keeps the signal live during // inter-pass gaps. // // MCE counts are intentionally not sampled here — they require // rasdaemon or mcelog and vary by live-image packaging. The threshold // rule for mce stays seeded (so the DB shape is stable) but only fires // once a matching kind lands, which is a follow-up. func runEDACSidecar(ctx context.Context, wg *sync.WaitGroup, d Deps) { defer wg.Done() if d.Sensor == nil { return } poll := d.CPUStressKnobs.EDACPoll if poll <= 0 { poll = 10 * time.Second } t := time.NewTicker(poll) defer t.Stop() for { select { case <-ctx.Done(): return case <-t.C: edac := probes.EDAC() if len(edac) == 0 { continue } batch := make([]Sample, 0, len(edac)) for _, s := range edac { batch = append(batch, Sample{Kind: s.Kind, Key: s.Key, Value: s.Value, Unit: s.Unit}) } sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second) if err := d.Sensor(sendCtx, batch); err != nil { d.Warn("CPUStress: edac sample post: " + err.Error()) } cancel() } } } // nonzeroDur picks override over fallback, but only when override is // strictly positive. Lets callers pass a zero-value duration to mean // "no override; use fallback" without a separate ok return. func nonzeroDur(override, fallback time.Duration) time.Duration { if override > 0 { return override } return fallback } // subStepFromPass projects a stressPass into a SubStepReport — shared by // both passes and by the mid-stage early-return paths so the UI always // sees exactly one row per pass, even on failure. 
func subStepFromPass(name string, p stressPass) SubStepReport { summary, _ := json.Marshal(map[string]any{ "elapsed_secs": p.ElapsedSecs, "target_secs": p.TargetSecs, "err": p.Err, }) return SubStepReport{ Name: name, Passed: p.Passed, StartedAt: p.StartedAt, CompletedAt: p.CompletedAt, SummaryJSON: summary, } } const ( cpuPassDuration = 3 * time.Minute memPassDuration = 3 * time.Minute // memHeadroomBytes = 1.5 GiB reserved for kernel, agent, log // buffers, and whatever page cache is still live when the stage // starts. Conservative but keeps us off the OOM scorer. memHeadroomBytes int64 = 1610612736 // memFloorBytes — if MemAvailable − headroom drops below this, // we refuse to run the memory pass rather than stressing a tiny // window that tells us nothing. memFloorBytes int64 = 268435456 passSlack = 2 * time.Second ) // stressPass is the per-pass result embedded in CPUStress's Extras. // Passed==true and Elapsed close to target is the only happy path. // StartedAt/CompletedAt are not serialized (the summary already has // ElapsedSecs) but are used by the caller to emit SubStepReport rows. type stressPass struct { Passed bool `json:"passed"` Err string `json:"err,omitempty"` ElapsedSecs int `json:"elapsed_secs"` TargetSecs int `json:"target_secs"` OutputTail string `json:"output_tail,omitempty"` StartedAt time.Time `json:"-"` CompletedAt time.Time `json:"-"` } // runStressPass invokes stress-ng and validates both exit code and // elapsed time. Target is the intended --timeout; we require // elapsed ≥ target − passSlack so a premature-but-clean exit still // counts as failure. func runStressPass(ctx context.Context, d Deps, label string, target time.Duration, args []string) stressPass { d.Info(fmt.Sprintf("CPUStress: %s pass starting — stress-ng %s", label, strings.Join(args, " "))) runCtx, cancel := context.WithTimeout(ctx, target+30*time.Second) defer cancel() cmd := exec.CommandContext(runCtx, "stress-ng", args...) 
start := time.Now() out, err := cmd.CombinedOutput() end := time.Now() elapsed := end.Sub(start) res := stressPass{ ElapsedSecs: int(elapsed.Round(time.Second).Seconds()), TargetSecs: int(target.Round(time.Second).Seconds()), OutputTail: tailLines(string(out), 20), StartedAt: start, CompletedAt: end, } if err != nil { res.Err = err.Error() d.Error(fmt.Sprintf("CPUStress: %s pass failed after %s: %s", label, elapsed.Round(time.Second), err.Error())) return res } if elapsed < target-passSlack { res.Err = fmt.Sprintf("stress-ng exited cleanly after %s; expected ≥ %s (premature exit — signal or broken workload)", elapsed.Round(time.Second), target-passSlack) d.Error("CPUStress: " + label + " pass " + res.Err) return res } res.Passed = true d.Info(fmt.Sprintf("CPUStress: %s pass PASSED in %s", label, elapsed.Round(time.Second))) return res } // memAvailableBytes reads /proc/meminfo and returns MemAvailable in // bytes. Split from parseMemAvailable so the parse step is testable // without touching the real filesystem. func memAvailableBytes() (int64, error) { f, err := os.Open("/proc/meminfo") if err != nil { return 0, err } defer func() { _ = f.Close() }() return parseMemAvailable(f) } func parseMemAvailable(r io.Reader) (int64, error) { sc := bufio.NewScanner(r) for sc.Scan() { line := sc.Text() if !strings.HasPrefix(line, "MemAvailable:") { continue } fields := strings.Fields(line) if len(fields) < 2 { return 0, fmt.Errorf("malformed MemAvailable line: %q", line) } kb, err := strconv.ParseInt(fields[1], 10, 64) if err != nil { return 0, fmt.Errorf("parse MemAvailable: %w", err) } return kb * 1024, nil } if err := sc.Err(); err != nil { return 0, err } return 0, fmt.Errorf("MemAvailable not found in /proc/meminfo") } func durationSeconds(d time.Duration) string { s := int(d.Seconds()) if s < 1 { s = 1 } return strconv.Itoa(s) + "s" } // tailLines returns the last n non-empty lines of s, for the summary. 
// Blank lines (empty or whitespace-only) are dropped before the tail is
// taken, and the surviving lines are joined with "\n" in their original
// order. n <= 0 yields "" rather than panicking on a negative slice
// bound.
func tailLines(s string, n int) string {
	if n <= 0 {
		return ""
	}

	var kept []string
	for _, line := range strings.Split(s, "\n") {
		// Only used to decide blankness; the line itself is kept as-is.
		if strings.TrimSpace(line) == "" {
			continue
		}
		kept = append(kept, line)
	}
	if len(kept) > n {
		kept = kept[len(kept)-n:]
	}
	return strings.Join(kept, "\n")
}

// humanBytes formats a byte count using binary units: one decimal place
// of GiB at or above 1 GiB, whole MiB (integer division, so truncated)
// at or above 1 MiB, and the raw byte count otherwise.
func humanBytes(b int64) string {
	const (
		kib = 1024
		mib = 1024 * kib
		gib = 1024 * mib
	)
	switch {
	case b >= gib:
		return fmt.Sprintf("%.1f GiB", float64(b)/float64(gib))
	case b >= mib:
		return fmt.Sprintf("%d MiB", b/mib)
	default:
		return fmt.Sprintf("%d B", b)
	}
}