// Package agent implements the in-live-image control loop.
//
// Phase 4 scope: after /claim, the agent walks through every stage the
// orchestrator advertises, dispatching on the stage name to a function
// in agent/tests (Inventory and Firmware are probed inline instead).
// Each stage posts a /result; the response carries the orchestrator's
// next_state, which the loop uses to pick the next stage. Stages the
// orchestrator owns (SpecValidate, Reporting) resolve server-side inside
// /result so the agent never sees them as "its turn".
//
// Terminal states:
//   - FailedHolding → request hold key, install authorized_keys, wait
//     on heartbeats for a retry_stage directive.
//   - Completed → heartbeat carries cmd=shutdown; agent runs
//     `systemctl poweroff` and exits.
//
// Operator cancel: a heartbeat carrying cmd=cancel_stage cancels the
// in-flight stage's context; the agent then posts a cancellation result
// and powers the host off.
//
// Thermal sidecar runs from the moment the agent claims until ctx
// cancel; it posts a handful of /sys/class/hwmon samples every 5s.
package agent

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"vetting/agent/bootstate"
	"vetting/agent/probes"
	"vetting/agent/tests"
	"vetting/internal/spec"
)

// stageCancel holds the cancel func for the in-flight stage ctx so the
// heartbeat loop can fire it when the orchestrator returns
// cmd=cancel_stage. Stored as an atomic.Value so the heartbeat goroutine
// can read without locking; writes happen only on the main loop.
//
// atomic.Value panics if Store is called with values of different
// concrete types (or with an untyped nil). "No stage in flight" is
// therefore recorded as a nil context.CancelFunc, and readers must
// check the loaded func for nil before calling it.
var stageCancel atomic.Value // context.CancelFunc

// Run is the long-lived entry point. It blocks until ctx is cancelled
// or a fatal error makes progress impossible. After an operator cancel
// it powers the host off and returns nil.
func Run(ctx context.Context, p *bootstate.Params) error { c := NewClient(p.OrchestratorURL, p.RunID, p.Token, p.TLSCertFPR) fwd := newLogForwarder(ctx, c) defer fwd.close() ip := localIP() fwd.info(fmt.Sprintf("agent starting on %s (run=%d mac=%s)", ip, p.RunID, p.MAC)) if err := callWithBackoff(ctx, "hello", func(ctx context.Context) error { return c.Hello(ctx) }); err != nil { fwd.warn("hello never succeeded: " + err.Error()) } var claim *ClaimResponse if err := callWithBackoff(ctx, "claim", func(ctx context.Context) error { r, err := c.Claim(ctx, ip) if err != nil { return err } claim = r return nil }); err != nil { return err } fwd.info(fmt.Sprintf("claimed run; stages=%v current_state=%s", claim.Stages, claim.CurrentState)) mux := NewSensorMux(ctx, c) defer mux.Close() go thermalSidecar(ctx, mux, fwd) hbCh := make(chan HeartbeatResponse, 4) go heartbeatLoop(ctx, c, fwd, hbCh) // Run every stage the orchestrator advertises. Stages owned by the // orchestrator (SpecValidate, Reporting) resolve inside /result and // flip next_state forward past themselves, so they simply never match // our dispatch table. // // Start stage comes from claim.CurrentState so a re-claim after an // agent crash resumes at the stage the run was parked at, instead of // blindly replaying Inventory and letting the orchestrator silently // advance past the crashed stage (the Orion OOM bug). A fresh claim // naturally lands on InventoryCheck, which maps back to "Inventory". 
nextStage := stageForState(claim.CurrentState) if nextStage == "" { nextStage = "Inventory" } if nextStage != "Inventory" { fwd.warn(fmt.Sprintf("resuming mid-pipeline at %s (claim current_state=%s) — likely agent restart after crash", nextStage, claim.CurrentState)) } for nextStage != "" { select { case <-ctx.Done(): return ctx.Err() default: } fwd.info("stage: starting " + nextStage) outcome := runStageCancellable(ctx, nextStage, claim, fwd, c, mux, overrideFlags{}) if outcome.Cancelled { fwd.warn("stage cancelled by operator; posting result and exiting") _, _ = postResult(ctx, c, nextStage, outcome) return powerOffAndReturn(fwd) } resp, err := postResult(ctx, c, nextStage, outcome) if err != nil { fwd.error("submit result for " + nextStage + ": " + err.Error()) return err } fwd.info(fmt.Sprintf("stage %s → next_state=%s", nextStage, resp.NextState)) if resp.NextState == "FailedHolding" { if err := requestHold(ctx, c, fwd); err != nil { return err } // Park and wait for an override directive. return waitForOverride(ctx, c, fwd, mux, hbCh, claim) } if resp.NextState == "Completed" || resp.NextState == "" { fwd.info("pipeline complete") <-ctx.Done() return ctx.Err() } nextStage = stageForState(resp.NextState) if nextStage == "" { // next_state is something we don't map (e.g. SpecValidate — but // the orchestrator's /result already resolved it and handed us // back a further-along state). Defensive bail so we don't loop. fwd.warn("no stage maps to state " + resp.NextState + "; parking") <-ctx.Done() return ctx.Err() } } <-ctx.Done() return ctx.Err() } // runStage dispatches on stage name. The Inventory stage is special — // it runs the inventory probe and passes the result as the /result body // (the orchestrator persists it as an artifact). Every other stage // returns a tests.Outcome which postResult marshals generically. 
func runStage(ctx context.Context, stage string, claim *ClaimResponse, fwd *logForwarder, c *Client, mux *SensorMux, ovr overrideFlags) stageOutcome { fwd.SetStage(stage) defer fwd.ClearStage() deps := newDeps(ctx, c, fwd, mux, ovr, claim, stage) switch stage { case "Inventory": fwd.info("Inventory: probing host hardware") log := probes.Logger{Info: fwd.info, Warn: fwd.warn} inv := &spec.Inventory{} var subs []tests.SubStepReport runSub := func(name string, fn func()) { start := time.Now() fn() subs = append(subs, tests.SubStepReport{ Name: name, Passed: true, StartedAt: start, CompletedAt: time.Now(), }) } runSub("CPU", func() { inv.CPU = probes.CPU(log) }) runSub("Memory", func() { inv.Memory = probes.Memory(log) }) runSub("Disks", func() { inv.Disks = probes.Disks(log) }) runSub("NICs", func() { inv.NICs = probes.NICs(log) }) runSub("GPUs", func() { inv.GPUs = probes.GPUs(log) }) runSub("System", func() { inv.System = probes.System(log) }) runSub("Baseboard", func() { inv.Baseboard = probes.Baseboard(log) }) runSub("PSU", func() { inv.PSU = probes.PSU(log) }) runSub("OS", func() { inv.OS = probes.OS(log) }) summary := inventorySummary(inv) fwd.info("Inventory: " + summary) return stageOutcome{ Outcome: tests.Outcome{ Passed: true, Summary: summary, SubSteps: subs, }, Inventory: inv, } case "Firmware": fwd.info("Firmware: probing firmware versions") snaps, warns := probes.Firmware(ctx) for _, w := range warns { fwd.warn(w) } summary := firmwareSummary(snaps) fwd.info("Firmware: " + summary) return stageOutcome{ Outcome: tests.Outcome{ Passed: true, Summary: summary, Extras: map[string]any{ "warnings": warns, "snapshots": len(snaps), }, }, Firmware: snaps, } case "SMART": return stageOutcome{Outcome: tests.SMART(ctx, deps)} case "CPUStress": return stageOutcome{Outcome: tests.CPUStress(ctx, deps)} case "Storage": return stageOutcome{Outcome: tests.Storage(ctx, deps)} case "Network": duration := deps.NetworkKnobs.Duration if duration <= 0 { duration = 10 * 
time.Second } return stageOutcome{Outcome: tests.Network(ctx, deps, tests.NetworkConfig{ OrchestratorURL: c.BaseURL, IperfPort: claim.IperfPort, Duration: duration, })} case "Burn": return stageOutcome{Outcome: tests.Burn(ctx, deps, tests.BurnConfig{ OrchestratorURL: c.BaseURL, IperfPort: claim.IperfPort, })} case "GPU": return stageOutcome{Outcome: tests.GPU(ctx, deps)} case "PSU": return stageOutcome{Outcome: tests.PSU(ctx, deps)} } return stageOutcome{Outcome: tests.Outcome{ Passed: false, Message: "unknown stage " + stage, }} } type stageOutcome struct { Outcome tests.Outcome Inventory *spec.Inventory // only for Inventory stage Firmware []probes.FirmwareSnapshot // only for Firmware stage Cancelled bool // set when the stage was cut short by operator cancel } // runStageCancellable wraps runStage in a per-stage context so the // heartbeat loop's cancel_stage directive can kill whatever subprocess // is currently running. If the derived context was cancelled while the // stage executed, the outcome is rewritten as a cancellation record so // the orchestrator has something to persist. func runStageCancellable(parent context.Context, stage string, claim *ClaimResponse, fwd *logForwarder, c *Client, mux *SensorMux, ovr overrideFlags) stageOutcome { stageCtx, cancel := context.WithCancel(parent) stageCancel.Store(cancel) defer func() { cancel() stageCancel.Store(context.CancelFunc(nil)) }() out := runStage(stageCtx, stage, claim, fwd, c, mux, ovr) // If the parent is still live but the stage ctx was cancelled, the // operator fired a cancel — mark the outcome so the caller can exit // the pipeline cleanly. Plain ctx-cancel on ctx.Done (e.g. shutdown) // is handled elsewhere by the main loop's select. 
if parent.Err() == nil && stageCtx.Err() != nil { out.Cancelled = true out.Outcome.Passed = false if out.Outcome.Message == "" { out.Outcome.Message = "stage cancelled by operator" } out.Outcome.Summary = "cancelled" } return out } // powerOffAndReturn shuts the host down after an operator cancel. Same // best-effort poweroff path as the shutdown heartbeat cmd. func powerOffAndReturn(fwd *logForwarder) error { fwd.info("cancel: powering off host") if err := exec.Command("systemctl", "poweroff").Run(); err != nil { fwd.warn("systemctl poweroff failed: " + err.Error()) _ = exec.Command("shutdown", "-h", "now").Run() } return nil } type overrideFlags struct { Wipe bool `json:"wipe"` } func newDeps(ctx context.Context, c *Client, fwd *logForwarder, mux *SensorMux, ovr overrideFlags, claim *ClaimResponse, stage string) tests.Deps { var expected []tests.ExpectedDisk for _, e := range claim.ExpectedDisks { expected = append(expected, tests.ExpectedDisk{Serial: e.Serial, SizeGB: e.SizeGB}) } return tests.Deps{ Info: fwd.info, Warn: fwd.warn, Error: fwd.error, OverrideWipe: ovr.Wipe, NonDestructive: claim.NonDestructive, ExpectedDisks: expected, StageTimeout: stageTimeout(claim, stage), CPUStressKnobs: tests.CPUStressKnobs{ CPUPass: parseDur(claim.StageConfig.CPUStress.CPUPass), MemPass: parseDur(claim.StageConfig.CPUStress.MemPass), EDACPoll: parseDur(claim.StageConfig.CPUStress.EDACPoll), }, StorageKnobs: tests.StorageKnobs{ Mode: claim.StageConfig.Storage.Mode, FioSize: claim.StageConfig.Storage.FioSize, FioTime: parseDur(claim.StageConfig.Storage.FioTime), FioBS: claim.StageConfig.Storage.FioBS, FioRW: claim.StageConfig.Storage.FioRW, Verify: claim.StageConfig.Storage.Verify, }, NetworkKnobs: tests.NetworkKnobs{ Duration: parseDur(claim.StageConfig.Network.Duration), }, BurnKnobs: tests.BurnKnobs{ Duration: parseDur(claim.StageConfig.Burn.Duration), CPUWorkers: claim.StageConfig.Burn.CPUWorkers, MemPct: claim.StageConfig.Burn.MemPct, FioOnSpare: 
claim.StageConfig.Burn.FioOnSpare, IperfParallel: claim.StageConfig.Burn.IperfParallel, }, Sensor: func(_ context.Context, samples []tests.Sample) error { out := make([]SensorSample, 0, len(samples)) for _, s := range samples { out = append(out, SensorSample{Kind: s.Kind, Key: s.Key, Value: s.Value, Unit: s.Unit}) } mux.Send(out) return nil }, } } // stageTimeout reads claim.StageConfig.StageTimeouts[stage] and falls // back to 2 minutes (the pre-Phase-2 default). Malformed entries log and // fall back — we'd rather run the stage than refuse on a typo. func stageTimeout(claim *ClaimResponse, stage string) time.Duration { if claim == nil || claim.StageConfig.StageTimeouts == nil { return 2 * time.Minute } raw, ok := claim.StageConfig.StageTimeouts[stage] if !ok || raw == "" { return 2 * time.Minute } d, err := time.ParseDuration(raw) if err != nil || d <= 0 { return 2 * time.Minute } return d } // parseDur is the permissive duration parser for the knob wire shape. // Empty strings / parse failures yield 0 so callers can treat a zero // value as "use the compile-time default" without a nil-check dance. func parseDur(s string) time.Duration { if s == "" { return 0 } d, err := time.ParseDuration(s) if err != nil || d < 0 { return 0 } return d } // postResult marshals stageOutcome for the /result endpoint. The // Inventory shape is special-cased: it includes the inventory blob so // the orchestrator can persist it and run server-side spec diff. 
func postResult(ctx context.Context, c *Client, stage string, s stageOutcome) (*ResultResponse, error) { summary, _ := s.Outcome.MarshalSummary() body := map[string]any{ "stage": stage, "passed": s.Outcome.Passed, } if len(summary) > 2 { body["summary"] = json.RawMessage(summary) } if s.Outcome.Message != "" { body["message"] = s.Outcome.Message } if s.Inventory != nil { body["inventory"] = s.Inventory } if len(s.Firmware) > 0 { body["firmware"] = s.Firmware } if len(s.Outcome.SubSteps) > 0 { wire := make([]SubStepReport, 0, len(s.Outcome.SubSteps)) for _, ss := range s.Outcome.SubSteps { w := SubStepReport{ Name: ss.Name, Passed: ss.Passed, Skipped: ss.Skipped, Summary: ss.SummaryJSON, } if !ss.StartedAt.IsZero() { w.StartedAt = ss.StartedAt.UTC().Format(time.RFC3339Nano) } if !ss.CompletedAt.IsZero() { w.CompletedAt = ss.CompletedAt.UTC().Format(time.RFC3339Nano) } wire = append(wire, w) } body["sub_steps"] = wire } return c.Result(ctx, body) } // stageForState maps a RunState string back to the stage executor name. // Every stage-name is the same as its state except Inventory↔InventoryCheck. func stageForState(state string) string { switch state { case "InventoryCheck": return "Inventory" case "Firmware", "SMART", "CPUStress", "Storage", "Network", "Burn", "GPU", "PSU": return state } // SpecValidate and Reporting are orchestrator-owned; we never see // them as next_state because /result resolves past them. return "" } // waitForOverride parks the agent in FailedHolding. It listens for a // heartbeat directive that tells it to retry a stage (e.g. Storage // with wipe-override armed) and re-enters runStage from that point. 
func waitForOverride(ctx context.Context, c *Client, fwd *logForwarder, mux *SensorMux, hb <-chan HeartbeatResponse, claim *ClaimResponse) error { fwd.info("holding: awaiting operator decision (heartbeat directive or ctx cancel)") for { select { case <-ctx.Done(): return ctx.Err() case cmd, ok := <-hb: if !ok { return nil } if cmd.Cmd != "retry_stage" || cmd.Stage == "" { continue } fwd.info("operator override: retrying stage " + cmd.Stage) var ovr overrideFlags if len(cmd.OverrideFlags) > 0 { _ = json.Unmarshal(cmd.OverrideFlags, &ovr) } outcome := runStageCancellable(ctx, cmd.Stage, claim, fwd, c, mux, ovr) if outcome.Cancelled { fwd.warn("stage cancelled by operator; posting result and exiting") _, _ = postResult(ctx, c, cmd.Stage, outcome) return powerOffAndReturn(fwd) } resp, err := postResult(ctx, c, cmd.Stage, outcome) if err != nil { fwd.error("override: submit result: " + err.Error()) continue } fwd.info(fmt.Sprintf("override stage %s → next_state=%s", cmd.Stage, resp.NextState)) if resp.NextState == "FailedHolding" { // Still broken; keep holding. continue } if resp.NextState == "Completed" { return nil } // Successful retry — continue walking the pipeline from the // state the orchestrator advanced us into. 
if nextStage := stageForState(resp.NextState); nextStage != "" { for nextStage != "" { select { case <-ctx.Done(): return ctx.Err() default: } fwd.info("stage: starting " + nextStage) out := runStageCancellable(ctx, nextStage, claim, fwd, c, mux, overrideFlags{}) if out.Cancelled { fwd.warn("stage cancelled by operator; posting result and exiting") _, _ = postResult(ctx, c, nextStage, out) return powerOffAndReturn(fwd) } rr, err := postResult(ctx, c, nextStage, out) if err != nil { return err } if rr.NextState == "FailedHolding" || rr.NextState == "Completed" || rr.NextState == "" { return nil } nextStage = stageForState(rr.NextState) } } return nil } } } // requestHold fetches the per-run pubkey and installs it into // /root/.ssh/authorized_keys so the operator can SSH in. func requestHold(ctx context.Context, c *Client, fwd *logForwarder) error { fwd.warn("entering FailedHolding; requesting hold key") resp, err := c.Hold(ctx, localIP()) if err != nil { fwd.error("hold request failed: " + err.Error()) return err } authPath := "/root/.ssh/authorized_keys" if err := os.MkdirAll(filepath.Dir(authPath), 0o700); err != nil { fwd.error("mkdir .ssh: " + err.Error()) return err } f, err := os.OpenFile(authPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600) if err != nil { fwd.error("open authorized_keys: " + err.Error()) return err } defer func() { _ = f.Close() }() if _, err := fmt.Fprintln(f, resp.AuthorizedKey); err != nil { fwd.error("write authorized_keys: " + err.Error()) return err } fwd.info("hold key installed; SSH is available to root@" + localIP()) return nil } func inventorySummary(inv *spec.Inventory) string { populated := 0 for _, d := range inv.Memory.Modules { if d.Populated { populated++ } } slots := "" if len(inv.Memory.Modules) > 0 { slots = fmt.Sprintf(" (%d/%d slots)", populated, len(inv.Memory.Modules)) } return fmt.Sprintf("cpu=%q cores=%d ram=%dGiB%s disks=%d nics=%d gpus=%d psu=%d", inv.CPU.Model, inv.CPU.LogicalCores, inv.Memory.TotalGiB, slots, 
len(inv.Disks), len(inv.NICs), len(inv.GPUs), len(inv.PSU)) } // firmwareSummary renders the one-liner surfaced in the stage tile: // per-component counts so an operator can see "bios=1 nic=2 nvme_fw=1" // without opening the report. func firmwareSummary(snaps []probes.FirmwareSnapshot) string { counts := map[string]int{} for _, s := range snaps { counts[s.Component]++ } if len(counts) == 0 { return "no firmware readable" } keys := []string{"bios", "bmc", "nic", "hba", "nvme_fw", "microcode"} parts := make([]string, 0, len(keys)) for _, k := range keys { if n := counts[k]; n > 0 { parts = append(parts, fmt.Sprintf("%s=%d", k, n)) } } return strings.Join(parts, " ") } // thermalSidecar posts a batch of /sys/class/hwmon samples every 5s. // Idempotent: a dead sensor just drops out of the next batch. Errors // are logged but never fatal — we'd rather have a run with partial // thermal data than kill the agent over an I/O hiccup. func thermalSidecar(ctx context.Context, mux *SensorMux, fwd *logForwarder) { t := time.NewTicker(5 * time.Second) defer t.Stop() for { select { case <-ctx.Done(): return case <-t.C: samples := probes.Thermals() if len(samples) == 0 { continue } out := make([]SensorSample, 0, len(samples)) for _, s := range samples { out = append(out, SensorSample{Kind: s.Kind, Key: s.Key, Value: s.Value, Unit: s.Unit}) } mux.Send(out) } } } func heartbeatLoop(ctx context.Context, c *Client, fwd *logForwarder, out chan<- HeartbeatResponse) { t := time.NewTicker(10 * time.Second) defer t.Stop() for { select { case <-ctx.Done(): return case <-t.C: hbCtx, cancel := context.WithTimeout(ctx, 5*time.Second) resp, err := c.Heartbeat(hbCtx) cancel() if err != nil { fwd.warn("heartbeat error: " + err.Error()) continue } if resp.Cmd == "abort" { fwd.warn("orchestrator said abort; stopping loop") return } if resp.Cmd == "shutdown" { fwd.info("orchestrator said shutdown; powering off host") // Best effort: systemd then sysvinit fallback. 
Either way, // return so the agent process stops issuing heartbeats. if err := exec.Command("systemctl", "poweroff").Run(); err != nil { fwd.warn("systemctl poweroff failed: " + err.Error()) _ = exec.Command("shutdown", "-h", "now").Run() } return } if resp.Cmd == "cancel_stage" { fwd.warn("orchestrator said cancel_stage; cancelling in-flight stage ctx") if v := stageCancel.Load(); v != nil { if fn, ok := v.(context.CancelFunc); ok && fn != nil { fn() } } continue } if resp.Cmd == "retry_stage" { select { case out <- *resp: default: } } } } } func callWithBackoff(ctx context.Context, label string, f func(context.Context) error) error { backoff := 2 * time.Second for attempt := 1; ; attempt++ { callCtx, cancel := context.WithTimeout(ctx, 10*time.Second) err := f(callCtx) cancel() if err == nil { return nil } if attempt > 20 { return err } log.Printf("agent: %s attempt %d failed: %v (retry in %s)", label, attempt, err, backoff) select { case <-ctx.Done(): return ctx.Err() case <-time.After(backoff): } if backoff < 30*time.Second { backoff *= 2 } } } func localIP() string { addrs, err := net.InterfaceAddrs() if err != nil { return "" } for _, a := range addrs { ipnet, ok := a.(*net.IPNet) if !ok || ipnet.IP.IsLoopback() { continue } v4 := ipnet.IP.To4() if v4 != nil { return v4.String() } } return "" } // ----- log forwarder ----------------------------------------------------- type logForwarder struct { c *Client mu sync.Mutex buf []LogLine stage string // set via SetStage; empties via ClearStage wg sync.WaitGroup cancel context.CancelFunc } func newLogForwarder(parent context.Context, c *Client) *logForwarder { ctx, cancel := context.WithCancel(parent) f := &logForwarder{c: c, cancel: cancel} f.wg.Add(1) go f.loop(ctx) return f } func (f *logForwarder) loop(ctx context.Context) { defer f.wg.Done() t := time.NewTicker(2 * time.Second) defer t.Stop() for { select { case <-ctx.Done(): f.flush() return case <-t.C: f.flush() } } } func (f *logForwarder) push(level, text 
string) { stamp := time.Now().UTC().Format(time.RFC3339Nano) log.Printf("[%s] %s", level, text) f.mu.Lock() f.buf = append(f.buf, LogLine{TS: stamp, Level: level, Stage: f.stage, Text: text}) f.mu.Unlock() } func (f *logForwarder) info(s string) { f.push("info", s) } func (f *logForwarder) warn(s string) { f.push("warn", s) } func (f *logForwarder) error(s string) { f.push("error", s) } // SetStage tags subsequent log lines with a stage name so the orchestrator // can fan them out on a per-stage SSE event. Safe to call concurrently // with push — we take the same mutex. func (f *logForwarder) SetStage(stage string) { f.mu.Lock() f.stage = stage f.mu.Unlock() } // ClearStage reverts to untagged (framing-level) logging. Defer this // on entry to runStage so hold/override paths don't leak stage context. func (f *logForwarder) ClearStage() { f.mu.Lock() f.stage = "" f.mu.Unlock() } func (f *logForwarder) flush() { f.mu.Lock() if len(f.buf) == 0 { f.mu.Unlock() return } lines := f.buf f.buf = nil f.mu.Unlock() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := f.c.Log(ctx, lines); err != nil { log.Printf("log forward failed: %v", err) } } func (f *logForwarder) close() { f.cancel() f.wg.Wait() }