23c689aa5b
Ships all five phases of the deep-profile overhaul together. Runs now carry a profile (quick/deep/soak); every profile walks the same 11-stage order — Inventory → Firmware → SpecValidate → SMART → CPUStress → Storage → Network → Burn → GPU → PSU → Reporting — with only per-stage durations and concurrency scaled. Phase 1: profiles.ProfileRegistry loaded from vetting.yaml; runs.profile column + CreateWithProfile; threshold table + evaluator seeded per-run from the shared vetting.thresholds block; breach flips result at /sensor + /result. Phase 2: upgraded CPUStress (stress-ng --cpu-method=all --verify + EDAC/MCE poll), Storage (fio --verify=md5 + SMART start/end delta), Network (sustained iperf + /proc/net/dev deltas) with per-profile knobs from Deps. Phase 3: Burn super-stage with goroutine fan-out for CPU + memory + fio + iperf, PSU rails sampled across the Burn window, SensorMux (2 s flush, 500-sample cap) to absorb backpressure. Phase 4: Firmware stage + firmware_snapshots table; probes dmidecode (BIOS), ipmitool (BMC), ethtool -i (NIC), nvme (sysfs + id-ctrl), lspci (HBA), /proc/cpuinfo (microcode). spec.DiffFirmware folds into SpecValidate with pin-by-identifier and fan-out-across-component matching; mismatches park the run in FailedHolding. Phase 5: profile radio on the host start form, profile chip on the run header, Firmware section in the HTML report, coverage artifact uploaded from CI, agent/tests/fakes/ scaffold with Deps.LookPath seam + stress_ng and dmidecode example fakes. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
248 lines
7.3 KiB
Go
248 lines
7.3 KiB
Go
package orchestrator
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"log"
|
||
"sync"
|
||
"time"
|
||
|
||
"vetting/internal/logs"
|
||
"vetting/internal/model"
|
||
"vetting/internal/store"
|
||
)
|
||
|
||
// HostHeartbeatStaleAfter bounds how old a host's last_seen_at may be
// before the host counts as offline. It is twice the default reporter
// heartbeat interval of 30s, so one missed heartbeat does not block a
// dispatch.
//
// Both the StartRun preflight (click time) and the dispatcher (dispatch
// time) read this value. Keep them on the same constant so the two checks
// cannot disagree.
const HostHeartbeatStaleAfter = 2 * 30 * time.Second
|
||
|
||
// Dispatcher picks Queued runs off the DB and drives them to
|
||
// WaitingReboot — the happy path is heartbeat-first: we transition and
|
||
// rely on the host-mode reporter's next heartbeat to fetch the
|
||
// reboot_for_vetting command. WoL is not fired in the default flow
|
||
// because every supported host already runs the reporter in-OS.
|
||
//
|
||
// Pre-stage log lines (picked, heartbeating, agent-claimed) are
|
||
// written into the per-run log via Logs so the detail page's log pane
|
||
// can show what's happening before the agent is alive.
|
||
//
|
||
// For Phase 2 the dispatcher's job ends at WaitingReboot; further
|
||
// transitions are driven by iPXE and agent callbacks. Phase 4+ will
|
||
// return here and shepherd each run through stage execution.
|
||
type Dispatcher struct {
|
||
Max int
|
||
Runs *store.Runs
|
||
Hosts *store.Hosts
|
||
Runner *Runner
|
||
Logs *logs.Hub
|
||
|
||
active chan struct{}
|
||
stop chan struct{}
|
||
|
||
// heartbeat tracks the last time we emitted a "still waiting"
|
||
// line for a given run, so the ticker doesn't spam the log.
|
||
hbMu sync.Mutex
|
||
lastBeat map[int64]time.Time
|
||
beatEvery time.Duration
|
||
}
|
||
|
||
func NewDispatcher(max int, runs *store.Runs, hosts *store.Hosts, runner *Runner, logHub *logs.Hub) *Dispatcher {
|
||
if max < 1 {
|
||
max = 1
|
||
}
|
||
return &Dispatcher{
|
||
Max: max,
|
||
Runs: runs,
|
||
Hosts: hosts,
|
||
Runner: runner,
|
||
Logs: logHub,
|
||
active: make(chan struct{}, max),
|
||
stop: make(chan struct{}),
|
||
lastBeat: map[int64]time.Time{},
|
||
beatEvery: 30 * time.Second,
|
||
}
|
||
}
|
||
|
||
func (d *Dispatcher) Start(ctx context.Context) {
|
||
go d.loop(ctx)
|
||
}
|
||
|
||
func (d *Dispatcher) Stop() {
|
||
close(d.stop)
|
||
}
|
||
|
||
func (d *Dispatcher) loop(ctx context.Context) {
|
||
t := time.NewTicker(2 * time.Second)
|
||
defer t.Stop()
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case <-d.stop:
|
||
return
|
||
case <-t.C:
|
||
d.pickNext(ctx)
|
||
d.heartbeatWaiting(ctx)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (d *Dispatcher) pickNext(ctx context.Context) {
|
||
select {
|
||
case d.active <- struct{}{}:
|
||
default:
|
||
return // at capacity
|
||
}
|
||
released := false
|
||
defer func() {
|
||
if !released {
|
||
<-d.active
|
||
}
|
||
}()
|
||
|
||
runs, err := d.Runs.Active(ctx)
|
||
if err != nil {
|
||
log.Printf("dispatcher: list active: %v", err)
|
||
return
|
||
}
|
||
|
||
var queued *model.Run
|
||
inFlight := 0
|
||
for i := range runs {
|
||
switch runs[i].State {
|
||
case model.StateQueued:
|
||
if queued == nil {
|
||
queued = &runs[i]
|
||
}
|
||
case model.StateWaitingWoL, model.StateWaitingReboot, model.StateBooting,
|
||
model.StateInventoryCheck, model.StateFirmware, model.StateSpecValidate, model.StateSMART,
|
||
model.StateCPUStress, model.StateStorage, model.StateNetwork,
|
||
model.StateBurn, model.StateGPU, model.StatePSU, model.StateReporting:
|
||
inFlight++
|
||
}
|
||
}
|
||
|
||
if inFlight >= d.Max || queued == nil {
|
||
return
|
||
}
|
||
|
||
host, err := d.Hosts.Get(ctx, queued.HostID)
|
||
if err != nil {
|
||
log.Printf("dispatcher: get host %d: %v", queued.HostID, err)
|
||
return
|
||
}
|
||
|
||
// Heartbeat gate: the StartRun preflight catches this at click time,
|
||
// but a run can sit in Queued long enough for the host to go offline
|
||
// between click and dispatch. Re-check here so we never fire a
|
||
// reboot command at a host that can't receive it.
|
||
if host.LastSeenAt == nil || time.Since(*host.LastSeenAt) > HostHeartbeatStaleAfter {
|
||
var ageMsg string
|
||
if host.LastSeenAt == nil {
|
||
ageMsg = "never heartbeated"
|
||
} else {
|
||
ageMsg = fmt.Sprintf("last heartbeat %s ago", time.Since(*host.LastSeenAt).Truncate(time.Second))
|
||
}
|
||
d.runLog(queued.ID, "error", fmt.Sprintf(
|
||
"dispatcher: host %s is offline (%s) — refusing to dispatch; install the reporter via /register/quick.sh on the target and retry",
|
||
host.Name, ageMsg))
|
||
if err := d.Runs.MarkDispatchFailed(ctx, queued.ID, "dispatch", "host stopped heartbeating before dispatch"); err != nil {
|
||
log.Printf("dispatcher: mark run %d dispatch-failed: %v", queued.ID, err)
|
||
}
|
||
if d.Runner != nil {
|
||
d.Runner.PublishTileUpdate(ctx, host.ID)
|
||
}
|
||
return
|
||
}
|
||
|
||
age := time.Since(*host.LastSeenAt).Truncate(time.Second)
|
||
d.runLog(queued.ID, "info", fmt.Sprintf(
|
||
"dispatcher: picked run for host %s (mac=%s, heartbeating, last seen %s ago)",
|
||
host.Name, host.MAC, age))
|
||
if _, err := d.Runner.Transition(ctx, queued.ID, TriggerRebootCommanded); err != nil {
|
||
log.Printf("dispatcher: transition run %d: %v", queued.ID, err)
|
||
d.runLog(queued.ID, "error", fmt.Sprintf("dispatcher: transition to WaitingReboot failed: %v", err))
|
||
return
|
||
}
|
||
log.Printf("dispatcher: run %d host %s → WaitingReboot (heartbeat-driven)", queued.ID, host.Name)
|
||
d.runLog(queued.ID, "info", fmt.Sprintf(
|
||
"dispatcher: host %s heartbeating — waiting for next reporter heartbeat to deliver reboot_for_vetting",
|
||
host.Name))
|
||
|
||
// Prime the heartbeat so the first "still waiting" fires 30s after
|
||
// dispatch, not immediately.
|
||
d.hbMu.Lock()
|
||
d.lastBeat[queued.ID] = time.Now()
|
||
d.hbMu.Unlock()
|
||
|
||
// Slot stays reserved until the run leaves active (Phase 4+).
|
||
// Phase 2 lets the loop observe inFlight via DB state.
|
||
released = true
|
||
<-d.active
|
||
}
|
||
|
||
// heartbeatWaiting emits a "still waiting" log line every beatEvery for
|
||
// each run still sitting in WaitingReboot (or legacy WaitingWoL). Helps
|
||
// the operator spot hangs without having to tail journalctl on the LXC.
|
||
func (d *Dispatcher) heartbeatWaiting(ctx context.Context) {
|
||
if d.Logs == nil {
|
||
return
|
||
}
|
||
runs, err := d.Runs.Active(ctx)
|
||
if err != nil {
|
||
return
|
||
}
|
||
now := time.Now()
|
||
d.hbMu.Lock()
|
||
defer d.hbMu.Unlock()
|
||
seen := map[int64]bool{}
|
||
for i := range runs {
|
||
r := &runs[i]
|
||
seen[r.ID] = true
|
||
if r.State != model.StateWaitingReboot && r.State != model.StateWaitingWoL {
|
||
continue
|
||
}
|
||
last, ok := d.lastBeat[r.ID]
|
||
if !ok {
|
||
// Run already waiting from a previous process lifetime — prime
|
||
// so we don't spam immediately.
|
||
d.lastBeat[r.ID] = now
|
||
continue
|
||
}
|
||
if now.Sub(last) < d.beatEvery {
|
||
continue
|
||
}
|
||
elapsed := now.Sub(r.StartedAt).Truncate(time.Second)
|
||
d.runLog(r.ID, "info", fmt.Sprintf(
|
||
"waiting for reporter to reboot + PXE-boot into live image (%s) — if this exceeds 2m, verify pxe.enabled in vetting.yaml and that the reporter actually invoked systemctl reboot",
|
||
elapsed))
|
||
d.lastBeat[r.ID] = now
|
||
}
|
||
// Garbage-collect entries for runs that have left the waiting states.
|
||
for id := range d.lastBeat {
|
||
if !seen[id] {
|
||
delete(d.lastBeat, id)
|
||
}
|
||
}
|
||
}
|
||
|
||
// runLog writes a single line into the per-run log. Safe to call with a
|
||
// nil hub (tests construct Dispatcher directly) — it degrades to a
|
||
// stderr log line so nothing silently disappears.
|
||
func (d *Dispatcher) runLog(runID int64, level, text string) {
|
||
if d.Logs == nil {
|
||
log.Printf("run-%d %s: %s", runID, level, text)
|
||
return
|
||
}
|
||
w, err := d.Logs.WriterFor(runID)
|
||
if err != nil {
|
||
log.Printf("dispatcher: open log for run %d: %v", runID, err)
|
||
return
|
||
}
|
||
w.Append(logs.Line{Level: level, Text: text})
|
||
}
|