Files
Vetting/internal/orchestrator/dispatcher.go
T
josh d0bfae14c8
CI / Lint + build + test (push) Has been cancelled
Heartbeat-first dispatch: retire WoL-as-default, add WaitingReboot
Every supported host runs vetting-reporter in-OS and heartbeats every
30s. WoL was never the thing that started vetting — the heartbeat
response's reboot_for_vetting command was. Firing WoL first only
crowded the run log with misleading diagnostics when the real failure
mode is "reporter isn't installed."

- StartRun 409s if the host hasn't heartbeated within 60s, pointing
  the operator at /register/quick.sh.
- Dispatcher re-checks LastSeenAt at dispatch time (run may sit in
  Queued long enough for the host to go offline); stale hosts mark
  the run Failed with failed_stage=dispatch instead of looping.
- New StateWaitingReboot + TriggerRebootCommanded capture the actual
  semantics. StateWaitingWoL kept as the hook point for a future
  manual-override button.
- Tile disables the Start button with a quick.sh tooltip when the
  host is offline, matching the server-side 409.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-18 01:10:34 -04:00

248 lines
7.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package orchestrator
import (
"context"
"fmt"
"log"
"sync"
"time"
"vetting/internal/logs"
"vetting/internal/model"
"vetting/internal/store"
)
// HostHeartbeatStaleAfter is the maximum age a host's last_seen_at may
// reach before the host is treated as offline. It is two default reporter
// heartbeat intervals (2 × 30s), so one dropped heartbeat on its own does
// not block dispatch. The StartRun preflight and the dispatcher both use
// this value; they must agree, otherwise the operator's click-time
// validation would not match the dispatch-time check.
const HostHeartbeatStaleAfter time.Duration = 2 * (30 * time.Second)
// Dispatcher picks Queued runs off the DB and drives them to
// WaitingReboot — the happy path is heartbeat-first: we transition and
// rely on the host-mode reporter's next heartbeat to fetch the
// reboot_for_vetting command. WoL is not fired in the default flow
// because every supported host already runs the reporter in-OS.
//
// Pre-stage log lines (picked, heartbeating, agent-claimed) are
// written into the per-run log via Logs so the detail page's log pane
// can show what's happening before the agent is alive.
//
// For Phase 2 the dispatcher's job ends at WaitingReboot; further
// transitions are driven by iPXE and agent callbacks. Phase 4+ will
// return here and shepherd each run through stage execution.
//
// A Dispatcher contains a sync.Mutex and must not be copied; use the
// pointer returned by NewDispatcher.
type Dispatcher struct {
// Max is the in-flight limit: pickNext dispatches nothing while at
// least Max runs are already in a post-Queued active state.
Max int
// Runs is the run store; the dispatcher calls Active and
// MarkDispatchFailed on it.
Runs *store.Runs
// Hosts is the host store; the dispatcher calls Get on it to read the
// host's Name, MAC and LastSeenAt.
Hosts *store.Hosts
// Runner performs the state transition (Transition) and tile refresh
// (PublishTileUpdate).
Runner *Runner
// Logs hands out per-run log writers. When nil, runLog falls back to
// the standard logger and heartbeatWaiting does nothing.
Logs *logs.Hub
// active is a buffered channel that pickNext sends to on entry
// (non-blocking) and receives from before returning.
active chan struct{}
// stop is closed by Stop to make the polling loop return.
stop chan struct{}
// hbMu guards lastBeat. lastBeat maps a run ID to the last time a
// "still waiting" line was emitted (or primed) for that run, and
// beatEvery is the minimum gap between two such lines, so the ticker
// doesn't spam the log.
hbMu sync.Mutex
lastBeat map[int64]time.Time
beatEvery time.Duration
}
// NewDispatcher builds a Dispatcher that allows at most max runs in
// flight; a max below 1 is raised to 1. The "still waiting" log throttle
// is set to 30 seconds. The returned Dispatcher does nothing until Start
// is called.
func NewDispatcher(max int, runs *store.Runs, hosts *store.Hosts, runner *Runner, logHub *logs.Hub) *Dispatcher {
	limit := max
	if limit < 1 {
		limit = 1
	}

	d := new(Dispatcher)
	d.Max = limit
	d.Runs = runs
	d.Hosts = hosts
	d.Runner = runner
	d.Logs = logHub
	d.active = make(chan struct{}, limit)
	d.stop = make(chan struct{})
	d.lastBeat = make(map[int64]time.Time)
	d.beatEvery = 30 * time.Second
	return d
}
// Start launches the polling loop in its own goroutine and returns
// immediately. The loop ends when ctx is cancelled or Stop is called.
func (d *Dispatcher) Start(ctx context.Context) {
	go func() {
		d.loop(ctx)
	}()
}
// Stop signals the polling loop to exit by closing the stop channel.
//
// Repeated sequential calls are safe: once the channel has been closed,
// later calls return without closing it again (closing a closed channel
// panics). A Dispatcher whose stop channel was never created is left
// untouched, since closing a nil channel also panics.
//
// Stop does not wait for the loop goroutine to finish, and simultaneous
// calls from several goroutines are not synchronized against each other.
func (d *Dispatcher) Stop() {
	if d.stop == nil {
		return
	}
	select {
	case <-d.stop:
		// Nothing is ever sent on stop, so a successful receive means an
		// earlier Stop already closed it.
	default:
		close(d.stop)
	}
}
// loop is the dispatcher's polling goroutine. On every two-second tick it
// first tries to dispatch one queued run and then emits any due "still
// waiting" log lines. It returns when ctx is cancelled or the stop
// channel is closed.
func (d *Dispatcher) loop(ctx context.Context) {
	const pollInterval = 2 * time.Second

	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			d.pickNext(ctx)
			d.heartbeatWaiting(ctx)
		case <-d.stop:
			return
		case <-ctx.Done():
			return
		}
	}
}
// pickNext dispatches at most one Queued run per call.
//
// It proceeds as follows:
//  1. Take a slot on d.active without blocking; return if none is free.
//  2. List the active runs, remember the first Queued one and count the
//     runs that are already in a post-Queued active state.
//  3. Return if the in-flight count has reached d.Max or nothing is queued.
//  4. Load the run's host and apply the heartbeat gate: a host that never
//     heartbeated, or whose last heartbeat is older than
//     HostHeartbeatStaleAfter, gets the run marked dispatch-failed.
//  5. Otherwise fire TriggerRebootCommanded and prime the "still waiting"
//     throttle for the run.
//
// pickNext returns nothing; every failure is logged. Unless the run was
// marked dispatch-failed it stays in its current state and is seen again
// on the next tick.
func (d *Dispatcher) pickNext(ctx context.Context) {
	select {
	case d.active <- struct{}{}:
	default:
		return // at capacity
	}
	// The slot is held only for the duration of this call. The in-flight
	// limit itself is enforced below from DB state; keeping the slot for
	// the lifetime of a run is Phase 4+ work.
	defer func() { <-d.active }()

	runs, err := d.Runs.Active(ctx)
	if err != nil {
		log.Printf("dispatcher: list active: %v", err)
		return
	}

	var queued *model.Run
	inFlight := 0
	for i := range runs {
		switch runs[i].State {
		case model.StateQueued:
			// Keep only the first queued run in the order Active returned.
			if queued == nil {
				queued = &runs[i]
			}
		case model.StateWaitingWoL, model.StateWaitingReboot, model.StateBooting,
			model.StateInventoryCheck, model.StateSpecValidate, model.StateSMART,
			model.StateCPUStress, model.StateStorage, model.StateNetwork,
			model.StateGPU, model.StatePSU, model.StateReporting:
			inFlight++
		}
	}
	if inFlight >= d.Max || queued == nil {
		return
	}

	host, err := d.Hosts.Get(ctx, queued.HostID)
	if err != nil {
		log.Printf("dispatcher: get host %d: %v", queued.HostID, err)
		return
	}

	// Sample the heartbeat age exactly once so the gate decision and the
	// age printed in the log lines always agree.
	heartbeated := host.LastSeenAt != nil
	var age time.Duration
	if heartbeated {
		age = time.Since(*host.LastSeenAt)
	}

	// Heartbeat gate: the StartRun preflight catches this at click time,
	// but a run can sit in Queued long enough for the host to go offline
	// between click and dispatch. Re-check here so we never fire a
	// reboot command at a host that can't receive it.
	if !heartbeated || age > HostHeartbeatStaleAfter {
		ageMsg := "never heartbeated"
		if heartbeated {
			ageMsg = fmt.Sprintf("last heartbeat %s ago", age.Truncate(time.Second))
		}
		d.runLog(queued.ID, "error", fmt.Sprintf(
			"dispatcher: host %s is offline (%s) — refusing to dispatch; install the reporter via /register/quick.sh on the target and retry",
			host.Name, ageMsg))
		if err := d.Runs.MarkDispatchFailed(ctx, queued.ID, "dispatch", "host stopped heartbeating before dispatch"); err != nil {
			log.Printf("dispatcher: mark run %d dispatch-failed: %v", queued.ID, err)
		}
		// Runner may be nil here; in that case only the tile refresh is
		// skipped.
		if d.Runner != nil {
			d.Runner.PublishTileUpdate(ctx, host.ID)
		}
		return
	}

	d.runLog(queued.ID, "info", fmt.Sprintf(
		"dispatcher: picked run for host %s (mac=%s, heartbeating, last seen %s ago)",
		host.Name, host.MAC, age.Truncate(time.Second)))

	// NOTE(review): unlike the stale-host branch above, this path does not
	// guard against a nil Runner.
	if _, err := d.Runner.Transition(ctx, queued.ID, TriggerRebootCommanded); err != nil {
		log.Printf("dispatcher: transition run %d: %v", queued.ID, err)
		d.runLog(queued.ID, "error", fmt.Sprintf("dispatcher: transition to WaitingReboot failed: %v", err))
		return
	}
	log.Printf("dispatcher: run %d host %s → WaitingReboot (heartbeat-driven)", queued.ID, host.Name)
	d.runLog(queued.ID, "info", fmt.Sprintf(
		"dispatcher: host %s heartbeating — waiting for next reporter heartbeat to deliver reboot_for_vetting",
		host.Name))

	// Prime the throttle so the first "still waiting" line fires one
	// beatEvery interval after dispatch, not immediately.
	d.hbMu.Lock()
	d.lastBeat[queued.ID] = time.Now()
	d.hbMu.Unlock()
}
// heartbeatWaiting emits a "still waiting" log line every beatEvery for
// each run still sitting in WaitingReboot (or legacy WaitingWoL). Helps
// the operator spot hangs without having to tail journalctl on the LXC.
//
// It does nothing when no log hub is configured. A failure to list the
// active runs is logged and the tick is skipped. Throttle entries are
// kept only for runs currently in a waiting state; entries for every
// other run ID are dropped at the end of each call.
func (d *Dispatcher) heartbeatWaiting(ctx context.Context) {
	if d.Logs == nil {
		return
	}
	runs, err := d.Runs.Active(ctx)
	if err != nil {
		log.Printf("dispatcher: heartbeat list active: %v", err)
		return
	}

	now := time.Now()
	d.hbMu.Lock()
	defer d.hbMu.Unlock()

	// waiting collects the IDs of runs that are in a waiting state right
	// now; it drives the garbage collection below.
	waiting := make(map[int64]struct{}, len(runs))
	for i := range runs {
		r := &runs[i]
		if r.State != model.StateWaitingReboot && r.State != model.StateWaitingWoL {
			continue
		}
		waiting[r.ID] = struct{}{}

		last, ok := d.lastBeat[r.ID]
		if !ok {
			// Run already waiting from a previous process lifetime — prime
			// so we don't spam immediately.
			d.lastBeat[r.ID] = now
			continue
		}
		if now.Sub(last) < d.beatEvery {
			continue
		}
		elapsed := now.Sub(r.StartedAt).Truncate(time.Second)
		d.runLog(r.ID, "info", fmt.Sprintf(
			"waiting for reporter to reboot + PXE-boot into live image (%s) — if this exceeds 2m, verify pxe.enabled in vetting.yaml and that the reporter actually invoked systemctl reboot",
			elapsed))
		d.lastBeat[r.ID] = now
	}

	// Garbage-collect entries for runs that have left the waiting states,
	// whether they advanced to a later stage or are no longer active.
	for id := range d.lastBeat {
		if _, ok := waiting[id]; !ok {
			delete(d.lastBeat, id)
		}
	}
}
// runLog appends one line with the given level and text to the per-run
// log of runID. With a nil hub (tests construct Dispatcher directly) the
// line goes to the standard logger instead, so nothing silently
// disappears. If the hub cannot provide a writer for the run, that error
// is logged and the line is dropped.
func (d *Dispatcher) runLog(runID int64, level, text string) {
	if d.Logs != nil {
		if w, err := d.Logs.WriterFor(runID); err != nil {
			log.Printf("dispatcher: open log for run %d: %v", runID, err)
		} else {
			w.Append(logs.Line{Level: level, Text: text})
		}
		return
	}
	log.Printf("run-%d %s: %s", runID, level, text)
}