Post-repair hardware validation pipeline for Proxmox cluster hosts. Go orchestrator + in-image agent + mkosi live image + bundled dnsmasq PXE + SQLite + HTMX/SSE UI + notify registry + janitor + full docs.
This commit is contained in:
+498
@@ -0,0 +1,498 @@
|
||||
// Package agent implements the in-live-image control loop.
|
||||
//
|
||||
// Phase 4 scope: after /claim, the agent walks through every stage the
|
||||
// orchestrator advertises, dispatching on the stage name to a function
|
||||
// in agent/tests. Each stage posts a /result; the response carries the
|
||||
// orchestrator's next_state, which the loop uses to pick the next
|
||||
// stage. Stages the orchestrator owns (SpecValidate, Reporting) resolve
|
||||
// server-side inside /result so the agent never sees them as "its turn".
|
||||
//
|
||||
// Terminal states:
|
||||
// - FailedHolding → request hold key, install authorized_keys, wait
|
||||
// on heartbeats for a retry_stage directive.
|
||||
// - Completed → heartbeat carries cmd=shutdown; agent runs
|
||||
// `systemctl poweroff` and exits.
|
||||
//
|
||||
// Thermal sidecar runs from the moment the agent claims until ctx
|
||||
// cancel; it posts a handful of /sys/class/hwmon samples every 5s.
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"vetting/agent/bootstate"
|
||||
"vetting/agent/probes"
|
||||
"vetting/agent/tests"
|
||||
"vetting/internal/spec"
|
||||
)
|
||||
|
||||
// Run is the long-lived entry point. It blocks until ctx is cancelled
|
||||
// or a fatal error makes progress impossible.
|
||||
func Run(ctx context.Context, p *bootstate.Params) error {
|
||||
c := NewClient(p.OrchestratorURL, p.RunID, p.Token, p.TLSCertFPR)
|
||||
fwd := newLogForwarder(ctx, c)
|
||||
defer fwd.close()
|
||||
|
||||
ip := localIP()
|
||||
fwd.info(fmt.Sprintf("agent starting on %s (run=%d mac=%s)", ip, p.RunID, p.MAC))
|
||||
|
||||
if err := callWithBackoff(ctx, "hello", func(ctx context.Context) error {
|
||||
return c.Hello(ctx)
|
||||
}); err != nil {
|
||||
fwd.warn("hello never succeeded: " + err.Error())
|
||||
}
|
||||
|
||||
var claim *ClaimResponse
|
||||
if err := callWithBackoff(ctx, "claim", func(ctx context.Context) error {
|
||||
r, err := c.Claim(ctx, ip)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
claim = r
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
fwd.info(fmt.Sprintf("claimed run; stages=%v", claim.Stages))
|
||||
|
||||
go thermalSidecar(ctx, c, fwd)
|
||||
|
||||
hbCh := make(chan HeartbeatResponse, 4)
|
||||
go heartbeatLoop(ctx, c, fwd, hbCh)
|
||||
|
||||
// Run every stage the orchestrator advertises. Stages owned by the
|
||||
// orchestrator (SpecValidate, Reporting) resolve inside /result and
|
||||
// flip next_state forward past themselves, so they simply never match
|
||||
// our dispatch table.
|
||||
nextStage := "Inventory"
|
||||
for nextStage != "" {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
fwd.info("stage: starting " + nextStage)
|
||||
outcome := runStage(ctx, nextStage, claim, fwd, c, overrideFlags{})
|
||||
resp, err := postResult(ctx, c, nextStage, outcome)
|
||||
if err != nil {
|
||||
fwd.error("submit result for " + nextStage + ": " + err.Error())
|
||||
return err
|
||||
}
|
||||
fwd.info(fmt.Sprintf("stage %s → next_state=%s", nextStage, resp.NextState))
|
||||
|
||||
if resp.NextState == "FailedHolding" {
|
||||
if err := requestHold(ctx, c, fwd); err != nil {
|
||||
return err
|
||||
}
|
||||
// Park and wait for an override directive.
|
||||
return waitForOverride(ctx, c, fwd, hbCh, claim)
|
||||
}
|
||||
if resp.NextState == "Completed" || resp.NextState == "" {
|
||||
fwd.info("pipeline complete")
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
}
|
||||
nextStage = stageForState(resp.NextState)
|
||||
if nextStage == "" {
|
||||
// next_state is something we don't map (e.g. SpecValidate — but
|
||||
// the orchestrator's /result already resolved it and handed us
|
||||
// back a further-along state). Defensive bail so we don't loop.
|
||||
fwd.warn("no stage maps to state " + resp.NextState + "; parking")
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
// runStage dispatches on stage name. The Inventory stage is special —
|
||||
// it runs the inventory probe and passes the result as the /result body
|
||||
// (the orchestrator persists it as an artifact). Every other stage
|
||||
// returns a tests.Outcome which postResult marshals generically.
|
||||
func runStage(ctx context.Context, stage string, claim *ClaimResponse, fwd *logForwarder, c *Client, ovr overrideFlags) stageOutcome {
|
||||
deps := newDeps(ctx, c, fwd, ovr, claim)
|
||||
switch stage {
|
||||
case "Inventory":
|
||||
fwd.info("Inventory: probing host hardware")
|
||||
inv, err := probes.Collect()
|
||||
if err != nil {
|
||||
return stageOutcome{Outcome: tests.Outcome{Passed: false, Message: err.Error(), Summary: "probe error"}}
|
||||
}
|
||||
fwd.info("Inventory: " + inventorySummary(inv))
|
||||
return stageOutcome{
|
||||
Outcome: tests.Outcome{
|
||||
Passed: true,
|
||||
Summary: inventorySummary(inv),
|
||||
},
|
||||
Inventory: inv,
|
||||
}
|
||||
case "SMART":
|
||||
return stageOutcome{Outcome: tests.SMART(ctx, deps)}
|
||||
case "CPUStress":
|
||||
return stageOutcome{Outcome: tests.CPUStress(ctx, deps)}
|
||||
case "Storage":
|
||||
return stageOutcome{Outcome: tests.Storage(ctx, deps)}
|
||||
case "Network":
|
||||
return stageOutcome{Outcome: tests.Network(ctx, deps, tests.NetworkConfig{
|
||||
OrchestratorURL: c.BaseURL,
|
||||
IperfPort: claim.IperfPort,
|
||||
Duration: 10 * time.Second,
|
||||
})}
|
||||
case "GPU":
|
||||
return stageOutcome{Outcome: tests.GPU(ctx, deps)}
|
||||
case "PSU":
|
||||
return stageOutcome{Outcome: tests.PSU(ctx, deps)}
|
||||
}
|
||||
return stageOutcome{Outcome: tests.Outcome{
|
||||
Passed: false,
|
||||
Message: "unknown stage " + stage,
|
||||
}}
|
||||
}
|
||||
|
||||
type stageOutcome struct {
|
||||
Outcome tests.Outcome
|
||||
Inventory *spec.Inventory // only for Inventory stage
|
||||
}
|
||||
|
||||
type overrideFlags struct {
|
||||
Wipe bool `json:"wipe"`
|
||||
}
|
||||
|
||||
func newDeps(ctx context.Context, c *Client, fwd *logForwarder, ovr overrideFlags, claim *ClaimResponse) tests.Deps {
|
||||
var expected []tests.ExpectedDisk
|
||||
for _, e := range claim.ExpectedDisks {
|
||||
expected = append(expected, tests.ExpectedDisk{Serial: e.Serial, SizeGB: e.SizeGB})
|
||||
}
|
||||
return tests.Deps{
|
||||
Info: fwd.info,
|
||||
Warn: fwd.warn,
|
||||
Error: fwd.error,
|
||||
OverrideWipe: ovr.Wipe,
|
||||
ExpectedDisks: expected,
|
||||
StageTimeout: 2 * time.Minute,
|
||||
Sensor: func(ctx context.Context, samples []tests.Sample) error {
|
||||
out := make([]SensorSample, 0, len(samples))
|
||||
for _, s := range samples {
|
||||
out = append(out, SensorSample{Kind: s.Kind, Key: s.Key, Value: s.Value, Unit: s.Unit})
|
||||
}
|
||||
return c.Sensor(ctx, out)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// postResult marshals stageOutcome for the /result endpoint. The
|
||||
// Inventory shape is special-cased: it includes the inventory blob so
|
||||
// the orchestrator can persist it and run server-side spec diff.
|
||||
func postResult(ctx context.Context, c *Client, stage string, s stageOutcome) (*ResultResponse, error) {
|
||||
summary, _ := s.Outcome.MarshalSummary()
|
||||
body := map[string]any{
|
||||
"stage": stage,
|
||||
"passed": s.Outcome.Passed,
|
||||
}
|
||||
if len(summary) > 2 {
|
||||
body["summary"] = json.RawMessage(summary)
|
||||
}
|
||||
if s.Outcome.Message != "" {
|
||||
body["message"] = s.Outcome.Message
|
||||
}
|
||||
if s.Inventory != nil {
|
||||
body["inventory"] = s.Inventory
|
||||
}
|
||||
return c.Result(ctx, body)
|
||||
}
|
||||
|
||||
// stageForState maps a RunState string back to the stage executor name.
|
||||
// Every stage-name is the same as its state except Inventory↔InventoryCheck.
|
||||
func stageForState(state string) string {
|
||||
switch state {
|
||||
case "InventoryCheck":
|
||||
return "Inventory"
|
||||
case "SMART", "CPUStress", "Storage", "Network", "GPU", "PSU":
|
||||
return state
|
||||
}
|
||||
// SpecValidate and Reporting are orchestrator-owned; we never see
|
||||
// them as next_state because /result resolves past them.
|
||||
return ""
|
||||
}
|
||||
|
||||
// waitForOverride parks the agent in FailedHolding. It listens for a
|
||||
// heartbeat directive that tells it to retry a stage (e.g. Storage
|
||||
// with wipe-override armed) and re-enters runStage from that point.
|
||||
func waitForOverride(ctx context.Context, c *Client, fwd *logForwarder, hb <-chan HeartbeatResponse, claim *ClaimResponse) error {
|
||||
fwd.info("holding: awaiting operator decision (heartbeat directive or ctx cancel)")
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case cmd, ok := <-hb:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if cmd.Cmd != "retry_stage" || cmd.Stage == "" {
|
||||
continue
|
||||
}
|
||||
fwd.info("operator override: retrying stage " + cmd.Stage)
|
||||
var ovr overrideFlags
|
||||
if len(cmd.OverrideFlags) > 0 {
|
||||
_ = json.Unmarshal(cmd.OverrideFlags, &ovr)
|
||||
}
|
||||
outcome := runStage(ctx, cmd.Stage, claim, fwd, c, ovr)
|
||||
resp, err := postResult(ctx, c, cmd.Stage, outcome)
|
||||
if err != nil {
|
||||
fwd.error("override: submit result: " + err.Error())
|
||||
continue
|
||||
}
|
||||
fwd.info(fmt.Sprintf("override stage %s → next_state=%s", cmd.Stage, resp.NextState))
|
||||
if resp.NextState == "FailedHolding" {
|
||||
// Still broken; keep holding.
|
||||
continue
|
||||
}
|
||||
if resp.NextState == "Completed" {
|
||||
return nil
|
||||
}
|
||||
// Successful retry — continue walking the pipeline from the
|
||||
// state the orchestrator advanced us into.
|
||||
if nextStage := stageForState(resp.NextState); nextStage != "" {
|
||||
for nextStage != "" {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
fwd.info("stage: starting " + nextStage)
|
||||
out := runStage(ctx, nextStage, claim, fwd, c, overrideFlags{})
|
||||
rr, err := postResult(ctx, c, nextStage, out)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if rr.NextState == "FailedHolding" || rr.NextState == "Completed" || rr.NextState == "" {
|
||||
return nil
|
||||
}
|
||||
nextStage = stageForState(rr.NextState)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// requestHold fetches the per-run pubkey and installs it into
|
||||
// /root/.ssh/authorized_keys so the operator can SSH in.
|
||||
func requestHold(ctx context.Context, c *Client, fwd *logForwarder) error {
|
||||
fwd.warn("entering FailedHolding; requesting hold key")
|
||||
resp, err := c.Hold(ctx, localIP())
|
||||
if err != nil {
|
||||
fwd.error("hold request failed: " + err.Error())
|
||||
return err
|
||||
}
|
||||
authPath := "/root/.ssh/authorized_keys"
|
||||
if err := os.MkdirAll(filepath.Dir(authPath), 0o700); err != nil {
|
||||
fwd.error("mkdir .ssh: " + err.Error())
|
||||
return err
|
||||
}
|
||||
f, err := os.OpenFile(authPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
|
||||
if err != nil {
|
||||
fwd.error("open authorized_keys: " + err.Error())
|
||||
return err
|
||||
}
|
||||
defer func() { _ = f.Close() }()
|
||||
if _, err := fmt.Fprintln(f, resp.AuthorizedKey); err != nil {
|
||||
fwd.error("write authorized_keys: " + err.Error())
|
||||
return err
|
||||
}
|
||||
fwd.info("hold key installed; SSH is available to root@" + localIP())
|
||||
return nil
|
||||
}
|
||||
|
||||
func inventorySummary(inv *spec.Inventory) string {
|
||||
return fmt.Sprintf("cpu=%q cores=%d ram=%dGiB disks=%d nics=%d gpus=%d",
|
||||
inv.CPU.Model, inv.CPU.LogicalCores, inv.Memory.TotalGiB,
|
||||
len(inv.Disks), len(inv.NICs), len(inv.GPUs))
|
||||
}
|
||||
|
||||
// thermalSidecar posts a batch of /sys/class/hwmon samples every 5s.
|
||||
// Idempotent: a dead sensor just drops out of the next batch. Errors
|
||||
// are logged but never fatal — we'd rather have a run with partial
|
||||
// thermal data than kill the agent over an I/O hiccup.
|
||||
func thermalSidecar(ctx context.Context, c *Client, fwd *logForwarder) {
|
||||
t := time.NewTicker(5 * time.Second)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
samples := probes.Thermals()
|
||||
if len(samples) == 0 {
|
||||
continue
|
||||
}
|
||||
out := make([]SensorSample, 0, len(samples))
|
||||
for _, s := range samples {
|
||||
out = append(out, SensorSample{Kind: s.Kind, Key: s.Key, Value: s.Value, Unit: s.Unit})
|
||||
}
|
||||
sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
if err := c.Sensor(sendCtx, out); err != nil {
|
||||
fwd.warn("thermal sidecar: " + err.Error())
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func heartbeatLoop(ctx context.Context, c *Client, fwd *logForwarder, out chan<- HeartbeatResponse) {
|
||||
t := time.NewTicker(10 * time.Second)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
hbCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
resp, err := c.Heartbeat(hbCtx)
|
||||
cancel()
|
||||
if err != nil {
|
||||
fwd.warn("heartbeat error: " + err.Error())
|
||||
continue
|
||||
}
|
||||
if resp.Cmd == "abort" {
|
||||
fwd.warn("orchestrator said abort; stopping loop")
|
||||
return
|
||||
}
|
||||
if resp.Cmd == "shutdown" {
|
||||
fwd.info("orchestrator said shutdown; powering off host")
|
||||
// Best effort: systemd then sysvinit fallback. Either way,
|
||||
// return so the agent process stops issuing heartbeats.
|
||||
if err := exec.Command("systemctl", "poweroff").Run(); err != nil {
|
||||
fwd.warn("systemctl poweroff failed: " + err.Error())
|
||||
_ = exec.Command("shutdown", "-h", "now").Run()
|
||||
}
|
||||
return
|
||||
}
|
||||
if resp.Cmd == "retry_stage" {
|
||||
select {
|
||||
case out <- *resp:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func callWithBackoff(ctx context.Context, label string, f func(context.Context) error) error {
|
||||
backoff := 2 * time.Second
|
||||
for attempt := 1; ; attempt++ {
|
||||
callCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
err := f(callCtx)
|
||||
cancel()
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if attempt > 20 {
|
||||
return err
|
||||
}
|
||||
log.Printf("agent: %s attempt %d failed: %v (retry in %s)", label, attempt, err, backoff)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-time.After(backoff):
|
||||
}
|
||||
if backoff < 30*time.Second {
|
||||
backoff *= 2
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func localIP() string {
|
||||
addrs, err := net.InterfaceAddrs()
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
for _, a := range addrs {
|
||||
ipnet, ok := a.(*net.IPNet)
|
||||
if !ok || ipnet.IP.IsLoopback() {
|
||||
continue
|
||||
}
|
||||
v4 := ipnet.IP.To4()
|
||||
if v4 != nil {
|
||||
return v4.String()
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// ----- log forwarder -----------------------------------------------------
|
||||
|
||||
type logForwarder struct {
|
||||
c *Client
|
||||
mu sync.Mutex
|
||||
buf []LogLine
|
||||
wg sync.WaitGroup
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func newLogForwarder(parent context.Context, c *Client) *logForwarder {
|
||||
ctx, cancel := context.WithCancel(parent)
|
||||
f := &logForwarder{c: c, cancel: cancel}
|
||||
f.wg.Add(1)
|
||||
go f.loop(ctx)
|
||||
return f
|
||||
}
|
||||
|
||||
func (f *logForwarder) loop(ctx context.Context) {
|
||||
defer f.wg.Done()
|
||||
t := time.NewTicker(2 * time.Second)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
f.flush()
|
||||
return
|
||||
case <-t.C:
|
||||
f.flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (f *logForwarder) push(level, text string) {
|
||||
stamp := time.Now().UTC().Format(time.RFC3339Nano)
|
||||
log.Printf("[%s] %s", level, text)
|
||||
f.mu.Lock()
|
||||
f.buf = append(f.buf, LogLine{TS: stamp, Level: level, Text: text})
|
||||
f.mu.Unlock()
|
||||
}
|
||||
|
||||
func (f *logForwarder) info(s string) { f.push("info", s) }
|
||||
func (f *logForwarder) warn(s string) { f.push("warn", s) }
|
||||
func (f *logForwarder) error(s string) { f.push("error", s) }
|
||||
|
||||
func (f *logForwarder) flush() {
|
||||
f.mu.Lock()
|
||||
if len(f.buf) == 0 {
|
||||
f.mu.Unlock()
|
||||
return
|
||||
}
|
||||
lines := f.buf
|
||||
f.buf = nil
|
||||
f.mu.Unlock()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := f.c.Log(ctx, lines); err != nil {
|
||||
log.Printf("log forward failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (f *logForwarder) close() {
|
||||
f.cancel()
|
||||
f.wg.Wait()
|
||||
}
|
||||
Reference in New Issue
Block a user