Heartbeat-first dispatch: retire WoL-as-default, add WaitingReboot
CI / Lint + build + test (push) Has been cancelled
CI / Lint + build + test (push) Has been cancelled
Every supported host runs vetting-reporter in-OS and heartbeats every 30s. WoL was never the thing that started vetting — the heartbeat response's reboot_for_vetting command was. Firing WoL first only crowded the run log with misleading diagnostics when the real failure mode is "reporter isn't installed." - StartRun 409s if the host hasn't heartbeated within 60s, pointing the operator at /register/quick.sh. - Dispatcher re-checks LastSeenAt at dispatch time (run may sit in Queued long enough for the host to go offline); stale hosts mark the run Failed with failed_stage=dispatch instead of looping. - New StateWaitingReboot + TriggerRebootCommanded capture the actual semantics. StateWaitingWoL kept as the hook point for a future manual-override button. - Tile disables the Start button with a quick.sh tooltip when the host is offline, matching the server-side 409. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -12,14 +12,25 @@ import (
|
||||
"vetting/internal/store"
|
||||
)
|
||||
|
||||
// Dispatcher picks Queued runs off the DB and drives them through
|
||||
// WaitingWoL (sending a WoL packet). Concurrency is capped at Max.
|
||||
// HostHeartbeatStaleAfter is how long we tolerate a host's last_seen_at
|
||||
// being in the past before treating the host as offline. Set to 2× the
|
||||
// default reporter heartbeat interval (30s) so a single dropped heartbeat
|
||||
// doesn't block dispatch. Used by the StartRun preflight and the
|
||||
// dispatcher itself — both must agree or the operator's click-time
|
||||
// validation wouldn't match the dispatch-time check.
|
||||
const HostHeartbeatStaleAfter = 60 * time.Second
|
||||
|
||||
// Dispatcher picks Queued runs off the DB and drives them to
|
||||
// WaitingReboot — the happy path is heartbeat-first: we transition and
|
||||
// rely on the host-mode reporter's next heartbeat to fetch the
|
||||
// reboot_for_vetting command. WoL is not fired in the default flow
|
||||
// because every supported host already runs the reporter in-OS.
|
||||
//
|
||||
// Pre-stage log lines (picked, WoL-sent, heartbeat, agent-claimed)
|
||||
// are written into the per-run log via Logs so the detail page's
|
||||
// log pane can show what's happening before the agent is alive.
|
||||
// Pre-stage log lines (picked, heartbeating, agent-claimed) are
|
||||
// written into the per-run log via Logs so the detail page's log pane
|
||||
// can show what's happening before the agent is alive.
|
||||
//
|
||||
// For Phase 2 the dispatcher's job ends at WaitingWoL; further
|
||||
// For Phase 2 the dispatcher's job ends at WaitingReboot; further
|
||||
// transitions are driven by iPXE and agent callbacks. Phase 4+ will
|
||||
// return here and shepherd each run through stage execution.
|
||||
type Dispatcher struct {
|
||||
@@ -107,10 +118,10 @@ func (d *Dispatcher) pickNext(ctx context.Context) {
|
||||
if queued == nil {
|
||||
queued = &runs[i]
|
||||
}
|
||||
case model.StateWaitingWoL, model.StateBooting, model.StateInventoryCheck,
|
||||
model.StateSpecValidate, model.StateSMART, model.StateCPUStress,
|
||||
model.StateStorage, model.StateNetwork, model.StateGPU,
|
||||
model.StatePSU, model.StateReporting:
|
||||
case model.StateWaitingWoL, model.StateWaitingReboot, model.StateBooting,
|
||||
model.StateInventoryCheck, model.StateSpecValidate, model.StateSMART,
|
||||
model.StateCPUStress, model.StateStorage, model.StateNetwork,
|
||||
model.StateGPU, model.StatePSU, model.StateReporting:
|
||||
inFlight++
|
||||
}
|
||||
}
|
||||
@@ -124,23 +135,43 @@ func (d *Dispatcher) pickNext(ctx context.Context) {
|
||||
log.Printf("dispatcher: get host %d: %v", queued.HostID, err)
|
||||
return
|
||||
}
|
||||
d.runLog(queued.ID, "info", fmt.Sprintf("dispatcher: picked run for host %s (mac=%s wol=%s:%d)",
|
||||
host.Name, host.MAC, host.WoLBroadcastIP, host.WoLPort))
|
||||
if _, err := d.Runner.Transition(ctx, queued.ID, TriggerDispatched); err != nil {
|
||||
|
||||
// Heartbeat gate: the StartRun preflight catches this at click time,
|
||||
// but a run can sit in Queued long enough for the host to go offline
|
||||
// between click and dispatch. Re-check here so we never fire a
|
||||
// reboot command at a host that can't receive it.
|
||||
if host.LastSeenAt == nil || time.Since(*host.LastSeenAt) > HostHeartbeatStaleAfter {
|
||||
var ageMsg string
|
||||
if host.LastSeenAt == nil {
|
||||
ageMsg = "never heartbeated"
|
||||
} else {
|
||||
ageMsg = fmt.Sprintf("last heartbeat %s ago", time.Since(*host.LastSeenAt).Truncate(time.Second))
|
||||
}
|
||||
d.runLog(queued.ID, "error", fmt.Sprintf(
|
||||
"dispatcher: host %s is offline (%s) — refusing to dispatch; install the reporter via /register/quick.sh on the target and retry",
|
||||
host.Name, ageMsg))
|
||||
if err := d.Runs.MarkDispatchFailed(ctx, queued.ID, "dispatch", "host stopped heartbeating before dispatch"); err != nil {
|
||||
log.Printf("dispatcher: mark run %d dispatch-failed: %v", queued.ID, err)
|
||||
}
|
||||
if d.Runner != nil {
|
||||
d.Runner.PublishTileUpdate(ctx, host.ID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
age := time.Since(*host.LastSeenAt).Truncate(time.Second)
|
||||
d.runLog(queued.ID, "info", fmt.Sprintf(
|
||||
"dispatcher: picked run for host %s (mac=%s, heartbeating, last seen %s ago)",
|
||||
host.Name, host.MAC, age))
|
||||
if _, err := d.Runner.Transition(ctx, queued.ID, TriggerRebootCommanded); err != nil {
|
||||
log.Printf("dispatcher: transition run %d: %v", queued.ID, err)
|
||||
d.runLog(queued.ID, "error", fmt.Sprintf("dispatcher: transition to WaitingWoL failed: %v", err))
|
||||
d.runLog(queued.ID, "error", fmt.Sprintf("dispatcher: transition to WaitingReboot failed: %v", err))
|
||||
return
|
||||
}
|
||||
if err := SendWoL(host.MAC, host.WoLBroadcastIP, host.WoLPort); err != nil {
|
||||
log.Printf("dispatcher: WoL run %d host %s: %v", queued.ID, host.Name, err)
|
||||
d.runLog(queued.ID, "error", fmt.Sprintf("dispatcher: WoL send failed: %v — check broadcast %s:%d is reachable",
|
||||
err, host.WoLBroadcastIP, host.WoLPort))
|
||||
// Stay in WaitingWoL; operator can retry or investigate.
|
||||
return
|
||||
}
|
||||
log.Printf("dispatcher: WoL sent for run %d (host=%s mac=%s)", queued.ID, host.Name, host.MAC)
|
||||
d.runLog(queued.ID, "info", fmt.Sprintf("dispatcher: sent WoL packet to %s via %s:%d — waiting for agent claim",
|
||||
host.MAC, host.WoLBroadcastIP, host.WoLPort))
|
||||
log.Printf("dispatcher: run %d host %s → WaitingReboot (heartbeat-driven)", queued.ID, host.Name)
|
||||
d.runLog(queued.ID, "info", fmt.Sprintf(
|
||||
"dispatcher: host %s heartbeating — waiting for next reporter heartbeat to deliver reboot_for_vetting",
|
||||
host.Name))
|
||||
|
||||
// Prime the heartbeat so the first "still waiting" fires 30s after
|
||||
// dispatch, not immediately.
|
||||
@@ -155,8 +186,8 @@ func (d *Dispatcher) pickNext(ctx context.Context) {
|
||||
}
|
||||
|
||||
// heartbeatWaiting emits a "still waiting" log line every beatEvery for
|
||||
// each run still sitting in WaitingWoL. Helps the operator spot hangs
|
||||
// without having to tail journalctl on the LXC.
|
||||
// each run still sitting in WaitingReboot (or legacy WaitingWoL). Helps
|
||||
// the operator spot hangs without having to tail journalctl on the LXC.
|
||||
func (d *Dispatcher) heartbeatWaiting(ctx context.Context) {
|
||||
if d.Logs == nil {
|
||||
return
|
||||
@@ -172,13 +203,13 @@ func (d *Dispatcher) heartbeatWaiting(ctx context.Context) {
|
||||
for i := range runs {
|
||||
r := &runs[i]
|
||||
seen[r.ID] = true
|
||||
if r.State != model.StateWaitingWoL {
|
||||
if r.State != model.StateWaitingReboot && r.State != model.StateWaitingWoL {
|
||||
continue
|
||||
}
|
||||
last, ok := d.lastBeat[r.ID]
|
||||
if !ok {
|
||||
// Run already in WaitingWoL from a previous process lifetime
|
||||
// — prime so we don't spam immediately.
|
||||
// Run already waiting from a previous process lifetime — prime
|
||||
// so we don't spam immediately.
|
||||
d.lastBeat[r.ID] = now
|
||||
continue
|
||||
}
|
||||
@@ -187,11 +218,11 @@ func (d *Dispatcher) heartbeatWaiting(ctx context.Context) {
|
||||
}
|
||||
elapsed := now.Sub(r.StartedAt).Truncate(time.Second)
|
||||
d.runLog(r.ID, "info", fmt.Sprintf(
|
||||
"still waiting for agent claim (%s) — check BIOS WoL, pxe.enabled, and live-image presence",
|
||||
"waiting for reporter to reboot + PXE-boot into live image (%s) — if this exceeds 2m, verify pxe.enabled in vetting.yaml and that the reporter actually invoked systemctl reboot",
|
||||
elapsed))
|
||||
d.lastBeat[r.ID] = now
|
||||
}
|
||||
// Garbage-collect entries for runs that have left WaitingWoL.
|
||||
// Garbage-collect entries for runs that have left the waiting states.
|
||||
for id := range d.lastBeat {
|
||||
if !seen[id] {
|
||||
delete(d.lastBeat, id)
|
||||
|
||||
@@ -1,15 +1,64 @@
|
||||
package orchestrator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"vetting/internal/db"
|
||||
"vetting/internal/events"
|
||||
"vetting/internal/logs"
|
||||
"vetting/internal/model"
|
||||
"vetting/internal/store"
|
||||
)
|
||||
|
||||
// setupPickNext wires a real SQLite DB so pickNext can exercise the
|
||||
// full Hosts/Runs/Runner path. Returns the dispatcher + seeded host ID +
|
||||
// a cleanup. Host starts with a fresh heartbeat stamp so the default is
|
||||
// "dispatch would succeed"; callers stale it out as needed.
|
||||
func setupPickNext(t *testing.T) (*Dispatcher, *store.Hosts, *store.Runs, int64, func()) {
|
||||
t.Helper()
|
||||
conn, err := db.Open(filepath.Join(t.TempDir(), "vetting.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("open db: %v", err)
|
||||
}
|
||||
hosts := &store.Hosts{DB: conn}
|
||||
runs := &store.Runs{DB: conn}
|
||||
stages := &store.Stages{DB: conn}
|
||||
hub := events.NewHub()
|
||||
runner := &Runner{Runs: runs, Hosts: hosts, Stages: stages, EventHub: hub}
|
||||
logDir := t.TempDir()
|
||||
lh, err := logs.NewHub(logDir, hub)
|
||||
if err != nil {
|
||||
t.Fatalf("NewHub: %v", err)
|
||||
}
|
||||
d := NewDispatcher(3, runs, hosts, runner, lh)
|
||||
|
||||
ctx := context.Background()
|
||||
hostID, err := hosts.Create(ctx, model.Host{
|
||||
Name: "pn-host",
|
||||
MAC: "aa:bb:cc:dd:ee:50",
|
||||
WoLBroadcastIP: "10.0.0.255",
|
||||
WoLPort: 9,
|
||||
ExpectedSpecYAML: "memory:\n total_gib: 16\n",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("create host: %v", err)
|
||||
}
|
||||
// Default: heartbeating now.
|
||||
if err := hosts.UpdateLastSeen(ctx, "aa:bb:cc:dd:ee:50", time.Now().UTC()); err != nil {
|
||||
t.Fatalf("stamp: %v", err)
|
||||
}
|
||||
cleanup := func() {
|
||||
lh.Close()
|
||||
_ = conn.Close()
|
||||
}
|
||||
return d, hosts, runs, hostID, cleanup
|
||||
}
|
||||
|
||||
// TestDispatcher_RunLogWritesToHub verifies the plumbing between the
|
||||
// dispatcher and the per-run log hub: runLog must persist to the on-disk
|
||||
// file so the detail page's replay + SSE fan-out see the same
|
||||
@@ -45,3 +94,105 @@ func TestDispatcher_RunLogNilHubDoesNotPanic(t *testing.T) {
|
||||
d := &Dispatcher{}
|
||||
d.runLog(1, "info", "fallback path")
|
||||
}
|
||||
|
||||
// TestDispatcher_TransitionsToWaitingRebootNoWoL: happy path. Host is
|
||||
// heartbeating, run is Queued — one pickNext tick must transition to
|
||||
// WaitingReboot via the new RebootCommanded trigger and log that the
|
||||
// host is heartbeating. No "sent WoL packet" line allowed.
|
||||
func TestDispatcher_TransitionsToWaitingRebootNoWoL(t *testing.T) {
|
||||
d, _, runs, hostID, cleanup := setupPickNext(t)
|
||||
defer cleanup()
|
||||
ctx := context.Background()
|
||||
runID, err := runs.Create(ctx, hostID, "deadbeef")
|
||||
if err != nil {
|
||||
t.Fatalf("create run: %v", err)
|
||||
}
|
||||
|
||||
d.pickNext(ctx)
|
||||
|
||||
got, err := runs.Get(ctx, runID)
|
||||
if err != nil {
|
||||
t.Fatalf("get run: %v", err)
|
||||
}
|
||||
if got.State != model.StateWaitingReboot {
|
||||
t.Fatalf("state = %s, want WaitingReboot", got.State)
|
||||
}
|
||||
body, err := os.ReadFile(filepath.Join(d.Logs.PathFor(runID))) //nolint:staticcheck
|
||||
if err != nil {
|
||||
t.Fatalf("read log: %v", err)
|
||||
}
|
||||
text := string(body)
|
||||
if strings.Contains(text, "sent WoL packet") {
|
||||
t.Fatalf("dispatcher should not fire WoL on heartbeating host: %s", text)
|
||||
}
|
||||
if !strings.Contains(text, "heartbeating") {
|
||||
t.Fatalf("missing heartbeating log line: %s", text)
|
||||
}
|
||||
}
|
||||
|
||||
// TestDispatcher_FailsStaleHeartbeat: host hasn't heartbeat for >60s.
|
||||
// Dispatcher must refuse, mark the run Failed with failed_stage=dispatch,
|
||||
// and log at error level — not loop forever on an unreachable box.
|
||||
func TestDispatcher_FailsStaleHeartbeat(t *testing.T) {
|
||||
d, hosts, runs, hostID, cleanup := setupPickNext(t)
|
||||
defer cleanup()
|
||||
ctx := context.Background()
|
||||
// Stale: 5m ago is well past the 60s cutoff.
|
||||
if err := hosts.UpdateLastSeen(ctx, "aa:bb:cc:dd:ee:50", time.Now().UTC().Add(-5*time.Minute)); err != nil {
|
||||
t.Fatalf("stamp stale: %v", err)
|
||||
}
|
||||
runID, err := runs.Create(ctx, hostID, "deadbeef")
|
||||
if err != nil {
|
||||
t.Fatalf("create run: %v", err)
|
||||
}
|
||||
|
||||
d.pickNext(ctx)
|
||||
|
||||
got, err := runs.Get(ctx, runID)
|
||||
if err != nil {
|
||||
t.Fatalf("get run: %v", err)
|
||||
}
|
||||
if got.State != model.StateFailed {
|
||||
t.Fatalf("state = %s, want Failed", got.State)
|
||||
}
|
||||
if got.FailedStage != "dispatch" {
|
||||
t.Fatalf("failed_stage = %q, want dispatch", got.FailedStage)
|
||||
}
|
||||
body, _ := os.ReadFile(d.Logs.PathFor(runID))
|
||||
if !strings.Contains(string(body), "quick.sh") {
|
||||
t.Fatalf("expected quick.sh hint in run log: %s", body)
|
||||
}
|
||||
}
|
||||
|
||||
// TestDispatcher_FailsNeverSeenHost mirrors the stale-heartbeat test for
|
||||
// a host that has never heartbeated at all — LastSeenAt is NULL.
|
||||
func TestDispatcher_FailsNeverSeenHost(t *testing.T) {
|
||||
d, hosts, runs, _, cleanup := setupPickNext(t)
|
||||
defer cleanup()
|
||||
ctx := context.Background()
|
||||
// Create a fresh host with no heartbeat stamp.
|
||||
neverID, err := hosts.Create(ctx, model.Host{
|
||||
Name: "pn-never",
|
||||
MAC: "aa:bb:cc:dd:ee:51",
|
||||
WoLBroadcastIP: "10.0.0.255",
|
||||
WoLPort: 9,
|
||||
ExpectedSpecYAML: "memory:\n total_gib: 16\n",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("create host: %v", err)
|
||||
}
|
||||
runID, err := runs.Create(ctx, neverID, "deadbeef")
|
||||
if err != nil {
|
||||
t.Fatalf("create run: %v", err)
|
||||
}
|
||||
|
||||
d.pickNext(ctx)
|
||||
|
||||
got, err := runs.Get(ctx, runID)
|
||||
if err != nil {
|
||||
t.Fatalf("get run: %v", err)
|
||||
}
|
||||
if got.State != model.StateFailed {
|
||||
t.Fatalf("state = %s, want Failed", got.State)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,7 +11,8 @@ type Trigger string
|
||||
|
||||
const (
|
||||
TriggerStartRequested Trigger = "StartRequested" // user clicks Start Vetting
|
||||
TriggerDispatched Trigger = "Dispatched" // dispatcher picked this run
|
||||
TriggerDispatched Trigger = "Dispatched" // dispatcher picked this run (manual-WoL override path; dormant in happy path)
|
||||
TriggerRebootCommanded Trigger = "RebootCommanded" // dispatcher (or heartbeat race) told the reporter to reboot
|
||||
TriggerPXEObserved Trigger = "PXEObserved" // iPXE fetched cmdline for MAC
|
||||
TriggerAgentClaimed Trigger = "AgentClaimed" // agent POSTed /claim with valid token
|
||||
TriggerStageFailed Trigger = "StageFailed" // a stage reported failure
|
||||
@@ -59,8 +60,9 @@ type transition struct {
|
||||
var table = map[Trigger]transition{
|
||||
TriggerStartRequested: {from: []model.RunState{model.StateRegistered}, to: model.StateQueued},
|
||||
TriggerDispatched: {from: []model.RunState{model.StateQueued}, to: model.StateWaitingWoL},
|
||||
TriggerPXEObserved: {from: []model.RunState{model.StateWaitingWoL, model.StateBooting}, to: model.StateBooting},
|
||||
TriggerAgentClaimed: {from: []model.RunState{model.StateBooting, model.StateWaitingWoL}, to: model.StateInventoryCheck},
|
||||
TriggerRebootCommanded: {from: []model.RunState{model.StateQueued}, to: model.StateWaitingReboot},
|
||||
TriggerPXEObserved: {from: []model.RunState{model.StateWaitingReboot, model.StateWaitingWoL, model.StateBooting}, to: model.StateBooting},
|
||||
TriggerAgentClaimed: {from: []model.RunState{model.StateBooting, model.StateWaitingReboot, model.StateWaitingWoL}, to: model.StateInventoryCheck},
|
||||
TriggerStageFailed: {from: allActiveStates(), to: model.StateFailedHolding},
|
||||
TriggerAllStagesPassed: {from: []model.RunState{model.StateReporting}, to: model.StateCompleted},
|
||||
TriggerOperatorReleased: {from: []model.RunState{model.StateFailedHolding}, to: model.StateReleased},
|
||||
@@ -121,7 +123,7 @@ func nextStageState(current model.RunState) (model.RunState, error) {
|
||||
|
||||
func allActiveStates() []model.RunState {
|
||||
return []model.RunState{
|
||||
model.StateQueued, model.StateWaitingWoL, model.StateBooting,
|
||||
model.StateQueued, model.StateWaitingWoL, model.StateWaitingReboot, model.StateBooting,
|
||||
model.StateInventoryCheck, model.StateSpecValidate, model.StateSMART,
|
||||
model.StateCPUStress, model.StateStorage, model.StateNetwork,
|
||||
model.StateGPU, model.StatePSU, model.StateReporting,
|
||||
|
||||
@@ -40,6 +40,40 @@ func TestNextForOverride(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestTriggerRebootCommanded exercises the new heartbeat-first trigger:
|
||||
// Queued → WaitingReboot, and any other current state is an error.
|
||||
func TestTriggerRebootCommanded(t *testing.T) {
|
||||
got, err := orchestrator.Next(model.StateQueued, orchestrator.TriggerRebootCommanded)
|
||||
if err != nil {
|
||||
t.Fatalf("Queued + RebootCommanded: %v", err)
|
||||
}
|
||||
if got != model.StateWaitingReboot {
|
||||
t.Fatalf("got %q, want %q", got, model.StateWaitingReboot)
|
||||
}
|
||||
for _, bad := range []model.RunState{
|
||||
model.StateRegistered, model.StateBooting, model.StateInventoryCheck, model.StateCompleted,
|
||||
} {
|
||||
if _, err := orchestrator.Next(bad, orchestrator.TriggerRebootCommanded); err == nil {
|
||||
t.Fatalf("RebootCommanded from %q: expected error", bad)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestTriggerAgentClaimedFromWaitingReboot: the agent's /claim must
|
||||
// advance the run out of WaitingReboot (new happy path) AND out of
|
||||
// legacy WaitingWoL, otherwise live boots wouldn't be recognised.
|
||||
func TestTriggerAgentClaimedFromWaitingReboot(t *testing.T) {
|
||||
for _, from := range []model.RunState{model.StateWaitingReboot, model.StateWaitingWoL, model.StateBooting} {
|
||||
got, err := orchestrator.Next(from, orchestrator.TriggerAgentClaimed)
|
||||
if err != nil {
|
||||
t.Fatalf("AgentClaimed from %q: %v", from, err)
|
||||
}
|
||||
if got != model.StateInventoryCheck {
|
||||
t.Fatalf("AgentClaimed from %q = %q, want InventoryCheck", from, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestNextStageWalk(t *testing.T) {
|
||||
// Walking StageCompleted from each stage should land on the next
|
||||
// one in the canonical order, and from Reporting onto Completed.
|
||||
|
||||
Reference in New Issue
Block a user