Files
Vetting/internal/orchestrator/dispatcher.go
T
josh 17ec55cb85
CI / Lint + build + test (push) Successful in 1m34s
Release / detect (push) Successful in 4s
Release / build-live-image (push) Has been skipped
Release / bundle (push) Successful in 1m5s
chore: cleanup sprint — dead CSS, dedup helpers, handler refactor
Remove ~126 lines of orphaned CSS from tile slim-down and old detail
layout. Consolidate 4 duplicate duration formatters into shared
elapsed()/fmtElapsed() helpers. Break 160-line Result handler into
focused sub-functions. Implement real Hub.Shutdown() (was a no-op).
Standardize agent error responses to JSON. Replace panic() in router
init with error return. Extract magic numbers as named constants.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-21 20:39:38 -04:00

251 lines
7.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package orchestrator
import (
"context"
"fmt"
"log"
"sync"
"time"
"vetting/internal/logs"
"vetting/internal/model"
"vetting/internal/store"
)
// Timing constants for the dispatcher.
const (
// HostHeartbeatStaleAfter is how long we tolerate a host's last_seen_at
// being in the past before treating the host as offline. Set to 2× the
// default reporter heartbeat interval (30s) so a single dropped heartbeat
// doesn't block dispatch. Used by the StartRun preflight and the
// dispatcher itself — both must agree, otherwise the operator's
// click-time validation would not match the dispatch-time check.
HostHeartbeatStaleAfter = 60 * time.Second
// dispatchTickInterval is the period of the dispatcher loop's ticker:
// on every tick the loop tries to pick one queued run and then emits
// any due "still waiting" log lines.
dispatchTickInterval = 2 * time.Second
)
// Dispatcher picks Queued runs off the DB and drives them to
// WaitingReboot — the happy path is heartbeat-first: we transition and
// rely on the host-mode reporter's next heartbeat to fetch the
// reboot_for_vetting command. WoL is not fired in the default flow
// because every supported host already runs the reporter in-OS.
//
// Pre-stage log lines (picked, heartbeating, agent-claimed) are
// written into the per-run log via Logs so the detail page's log pane
// can show what's happening before the agent is alive.
//
// For Phase 2 the dispatcher's job ends at WaitingReboot; further
// transitions are driven by iPXE and agent callbacks. Phase 4+ will
// return here and shepherd each run through stage execution.
type Dispatcher struct {
// Max is the in-flight run limit: pickNext refuses to dispatch while
// the number of runs in an in-flight state is >= Max. NewDispatcher
// clamps it to at least 1.
Max int
// Runs is the run store, used to list active runs and to mark a run
// as dispatch-failed.
Runs *store.Runs
// Hosts is the host store, used to look up the host of a queued run.
Hosts *store.Hosts
// Runner performs the run state transition and publishes tile
// updates for a host.
Runner *Runner
// Logs is the per-run log hub. It may be nil, in which case runLog
// falls back to the process logger and heartbeatWaiting is a no-op.
Logs *logs.Hub
// active is a buffered channel used by pickNext as a slot semaphore;
// NewDispatcher sizes it to Max.
active chan struct{}
// stop is closed by Stop to make the loop goroutine return.
stop chan struct{}
// hbMu guards lastBeat.
hbMu sync.Mutex
// lastBeat records, per run ID, the last time a "still waiting" line
// was emitted (or primed) so the ticker doesn't spam the log.
lastBeat map[int64]time.Time
// beatEvery is the minimum spacing between "still waiting" lines for
// one run; NewDispatcher sets it to 30s.
beatEvery time.Duration
}
// NewDispatcher builds a Dispatcher wired to the given stores, runner and
// log hub. max is the in-flight run limit; values below 1 are raised to 1
// so the slot channel always has capacity for at least one run. The
// returned Dispatcher is idle until Start is called.
func NewDispatcher(max int, runs *store.Runs, hosts *store.Hosts, runner *Runner, logHub *logs.Hub) *Dispatcher {
	slots := max
	if slots < 1 {
		slots = 1
	}

	d := &Dispatcher{
		Max:    slots,
		Runs:   runs,
		Hosts:  hosts,
		Runner: runner,
		Logs:   logHub,
	}
	// Internal state: slot semaphore, stop signal, and the bookkeeping
	// for the "still waiting" log cadence.
	d.active = make(chan struct{}, slots)
	d.stop = make(chan struct{})
	d.lastBeat = make(map[int64]time.Time)
	d.beatEvery = 30 * time.Second
	return d
}
// Start launches the dispatch loop on its own goroutine and returns
// immediately. The loop ends when ctx is cancelled or Stop is called.
func (d *Dispatcher) Start(ctx context.Context) {
	go func() {
		d.loop(ctx)
	}()
}
// Stop signals the dispatch loop to exit by closing the stop channel.
//
// It is safe to call Stop more than once, and on a Dispatcher whose stop
// channel was never created (e.g. a struct literal built in a test):
// closing a closed or nil channel panics, so both cases are filtered out
// first. Stop does not wait for the loop goroutine to finish.
//
// Stop is not synchronised against itself; concurrent calls from
// multiple goroutines must be serialised by the caller.
func (d *Dispatcher) Stop() {
	if d.stop == nil {
		return
	}
	select {
	case <-d.stop:
		// Already closed by an earlier Stop; nothing left to do.
	default:
		close(d.stop)
	}
}
// loop is the dispatcher's main goroutine body. Every
// dispatchTickInterval it tries to dispatch one queued run and then
// emits any due "still waiting" lines. It returns as soon as ctx is
// cancelled or the stop channel is closed.
func (d *Dispatcher) loop(ctx context.Context) {
	ticker := time.NewTicker(dispatchTickInterval)
	defer ticker.Stop()

	// One unit of periodic work: dispatch first, then heartbeat logging.
	tick := func() {
		d.pickNext(ctx)
		d.heartbeatWaiting(ctx)
	}

	for {
		select {
		case <-ticker.C:
			tick()
		case <-d.stop:
			return
		case <-ctx.Done():
			return
		}
	}
}
// pickNext dispatches at most one Queued run.
//
// It lists the active runs, counts those already in an in-flight state,
// and — if the count is below d.Max — takes the first Queued run in the
// returned order. The run's host must have heartbeated within
// HostHeartbeatStaleAfter; otherwise the run is marked dispatch-failed
// and the host's tile is refreshed. For a live host the run is
// transitioned with TriggerRebootCommanded and the "still waiting"
// timer is primed.
//
// All failures are logged and swallowed; the next tick retries.
func (d *Dispatcher) pickNext(ctx context.Context) {
	// Hold a slot for the duration of this call only. Concurrency across
	// ticks is enforced by the inFlight count derived from DB state
	// below, not by keeping the slot reserved.
	select {
	case d.active <- struct{}{}:
	default:
		return // at capacity
	}
	defer func() { <-d.active }()

	runs, err := d.Runs.Active(ctx)
	if err != nil {
		log.Printf("dispatcher: list active: %v", err)
		return
	}

	// Single pass: remember the first queued run and count everything
	// that already occupies a dispatch slot.
	var queued *model.Run
	inFlight := 0
	for i := range runs {
		switch runs[i].State {
		case model.StateQueued:
			if queued == nil {
				queued = &runs[i]
			}
		case model.StateWaitingWoL, model.StateWaitingReboot, model.StateBooting,
			model.StateInventoryCheck, model.StateFirmware, model.StateSpecValidate, model.StateSMART,
			model.StateCPUStress, model.StateStorage, model.StateNetwork,
			model.StateBurn, model.StateGPU, model.StatePSU, model.StateReporting:
			inFlight++
		}
	}
	if inFlight >= d.Max || queued == nil {
		return
	}

	host, err := d.Hosts.Get(ctx, queued.HostID)
	if err != nil {
		log.Printf("dispatcher: get host %d: %v", queued.HostID, err)
		return
	}

	// Heartbeat gate: the StartRun preflight catches this at click time,
	// but a run can sit in Queued long enough for the host to go offline
	// between click and dispatch. Re-check here so we never fire a
	// reboot command at a host that can't receive it. The age is sampled
	// once so the gate and the log lines agree.
	var age time.Duration
	offline := host.LastSeenAt == nil
	if !offline {
		age = time.Since(*host.LastSeenAt)
		offline = age > HostHeartbeatStaleAfter
	}
	if offline {
		ageMsg := "never heartbeated"
		if host.LastSeenAt != nil {
			ageMsg = fmt.Sprintf("last heartbeat %s ago", age.Truncate(time.Second))
		}
		d.runLog(queued.ID, "error", fmt.Sprintf(
			"dispatcher: host %s is offline (%s) — refusing to dispatch; install the reporter via /register/quick.sh on the target and retry",
			host.Name, ageMsg))
		if err := d.Runs.MarkDispatchFailed(ctx, queued.ID, "dispatch", "host stopped heartbeating before dispatch"); err != nil {
			log.Printf("dispatcher: mark run %d dispatch-failed: %v", queued.ID, err)
		}
		if d.Runner != nil {
			d.Runner.PublishTileUpdate(ctx, host.ID)
		}
		return
	}

	// The offline path above tolerates a nil Runner, so be consistent
	// here: without a Runner the transition cannot happen, and calling
	// through a nil pointer would likely panic.
	if d.Runner == nil {
		log.Printf("dispatcher: run %d: no runner configured, cannot transition", queued.ID)
		return
	}

	d.runLog(queued.ID, "info", fmt.Sprintf(
		"dispatcher: picked run for host %s (mac=%s, heartbeating, last seen %s ago)",
		host.Name, host.MAC, age.Truncate(time.Second)))

	if _, err := d.Runner.Transition(ctx, queued.ID, TriggerRebootCommanded); err != nil {
		log.Printf("dispatcher: transition run %d: %v", queued.ID, err)
		d.runLog(queued.ID, "error", fmt.Sprintf("dispatcher: transition to WaitingReboot failed: %v", err))
		return
	}
	log.Printf("dispatcher: run %d host %s → WaitingReboot (heartbeat-driven)", queued.ID, host.Name)
	d.runLog(queued.ID, "info", fmt.Sprintf(
		"dispatcher: host %s heartbeating — waiting for next reporter heartbeat to deliver reboot_for_vetting",
		host.Name))

	// Prime the heartbeat so the first "still waiting" line fires one
	// beatEvery after dispatch, not immediately.
	d.hbMu.Lock()
	if d.lastBeat == nil {
		// Dispatcher built without NewDispatcher; a nil map write panics.
		d.lastBeat = make(map[int64]time.Time)
	}
	d.lastBeat[queued.ID] = time.Now()
	d.hbMu.Unlock()
}
// heartbeatWaiting emits a "still waiting" log line every beatEvery for
// each run still sitting in WaitingReboot (or legacy WaitingWoL). Helps
// the operator spot hangs without having to tail journalctl on the LXC.
//
// It is a no-op when no log hub is configured. A run seen waiting for
// the first time (e.g. after a process restart) is only primed, so its
// first line appears one beatEvery later. Bookkeeping for runs that are
// no longer in a waiting state is dropped at the end of each call.
func (d *Dispatcher) heartbeatWaiting(ctx context.Context) {
	if d.Logs == nil {
		return
	}
	runs, err := d.Runs.Active(ctx)
	if err != nil {
		// Best-effort: report it and try again on the next tick.
		log.Printf("dispatcher: list active for heartbeat: %v", err)
		return
	}

	now := time.Now()

	d.hbMu.Lock()
	defer d.hbMu.Unlock()
	if d.lastBeat == nil {
		// Dispatcher built without NewDispatcher; a nil map write panics.
		d.lastBeat = make(map[int64]time.Time)
	}

	// waiting holds the IDs of runs currently in a waiting state. Only
	// those keep their lastBeat entry; a run that has moved on to a
	// later active state is collected as well.
	waiting := make(map[int64]struct{}, len(runs))
	for i := range runs {
		r := &runs[i]
		if r.State != model.StateWaitingReboot && r.State != model.StateWaitingWoL {
			continue
		}
		waiting[r.ID] = struct{}{}

		last, ok := d.lastBeat[r.ID]
		if !ok {
			// Run already waiting from a previous process lifetime — prime
			// so we don't spam immediately.
			d.lastBeat[r.ID] = now
			continue
		}
		if now.Sub(last) < d.beatEvery {
			continue
		}
		elapsed := now.Sub(r.StartedAt).Truncate(time.Second)
		d.runLog(r.ID, "info", fmt.Sprintf(
			"waiting for reporter to reboot + PXE-boot into live image (%s) — if this exceeds 2m, verify pxe.enabled in vetting.yaml and that the reporter actually invoked systemctl reboot",
			elapsed))
		d.lastBeat[r.ID] = now
	}

	// Garbage-collect entries for runs that have left the waiting states.
	for id := range d.lastBeat {
		if _, ok := waiting[id]; !ok {
			delete(d.lastBeat, id)
		}
	}
}
// runLog appends one line with the given level and text to the log of
// run runID. When no hub is configured (tests build a Dispatcher
// directly) the line goes to the process logger instead, so nothing is
// silently lost. Failure to open the run's writer is logged and the line
// is dropped.
func (d *Dispatcher) runLog(runID int64, level, text string) {
	hub := d.Logs
	if hub == nil {
		log.Printf("run-%d %s: %s", runID, level, text)
		return
	}

	if w, err := hub.WriterFor(runID); err != nil {
		log.Printf("dispatcher: open log for run %d: %v", runID, err)
	} else {
		w.Append(logs.Line{Level: level, Text: text})
	}
}