package orchestrator import ( "context" "fmt" "log" "sync" "time" "vetting/internal/logs" "vetting/internal/model" "vetting/internal/store" ) // HostHeartbeatStaleAfter is how long we tolerate a host's last_seen_at // being in the past before treating the host as offline. Set to 2× the // default reporter heartbeat interval (30s) so a single dropped heartbeat // doesn't block dispatch. Used by the StartRun preflight and the // dispatcher itself — both must agree or the operator's click-time // validation wouldn't match the dispatch-time check. const HostHeartbeatStaleAfter = 60 * time.Second // Dispatcher picks Queued runs off the DB and drives them to // WaitingReboot — the happy path is heartbeat-first: we transition and // rely on the host-mode reporter's next heartbeat to fetch the // reboot_for_vetting command. WoL is not fired in the default flow // because every supported host already runs the reporter in-OS. // // Pre-stage log lines (picked, heartbeating, agent-claimed) are // written into the per-run log via Logs so the detail page's log pane // can show what's happening before the agent is alive. // // For Phase 2 the dispatcher's job ends at WaitingReboot; further // transitions are driven by iPXE and agent callbacks. Phase 4+ will // return here and shepherd each run through stage execution. type Dispatcher struct { Max int Runs *store.Runs Hosts *store.Hosts Runner *Runner Logs *logs.Hub active chan struct{} stop chan struct{} // heartbeat tracks the last time we emitted a "still waiting" // line for a given run, so the ticker doesn't spam the log. hbMu sync.Mutex lastBeat map[int64]time.Time beatEvery time.Duration } func NewDispatcher(max int, runs *store.Runs, hosts *store.Hosts, runner *Runner, logHub *logs.Hub) *Dispatcher { if max < 1 { max = 1 } return &Dispatcher{ Max: max, Runs: runs, Hosts: hosts, Runner: runner, Logs: logHub, active: make(chan struct{}, max), stop: make(chan struct{}), lastBeat: map[int64]time.Time{}, beatEvery: 30 * time.Second, } } func (d *Dispatcher) Start(ctx context.Context) { go d.loop(ctx) } func (d *Dispatcher) Stop() { close(d.stop) } func (d *Dispatcher) loop(ctx context.Context) { t := time.NewTicker(2 * time.Second) defer t.Stop() for { select { case <-ctx.Done(): return case <-d.stop: return case <-t.C: d.pickNext(ctx) d.heartbeatWaiting(ctx) } } } func (d *Dispatcher) pickNext(ctx context.Context) { select { case d.active <- struct{}{}: default: return // at capacity } released := false defer func() { if !released { <-d.active } }() runs, err := d.Runs.Active(ctx) if err != nil { log.Printf("dispatcher: list active: %v", err) return } var queued *model.Run inFlight := 0 for i := range runs { switch runs[i].State { case model.StateQueued: if queued == nil { queued = &runs[i] } case model.StateWaitingWoL, model.StateWaitingReboot, model.StateBooting, model.StateInventoryCheck, model.StateSpecValidate, model.StateSMART, model.StateCPUStress, model.StateStorage, model.StateNetwork, model.StateGPU, model.StatePSU, model.StateReporting: inFlight++ } } if inFlight >= d.Max || queued == nil { return } host, err := d.Hosts.Get(ctx, queued.HostID) if err != nil { log.Printf("dispatcher: get host %d: %v", queued.HostID, err) return } // Heartbeat gate: the StartRun preflight catches this at click time, // but a run can sit in Queued long enough for the host to go offline // between click and dispatch. Re-check here so we never fire a // reboot command at a host that can't receive it. if host.LastSeenAt == nil || time.Since(*host.LastSeenAt) > HostHeartbeatStaleAfter { var ageMsg string if host.LastSeenAt == nil { ageMsg = "never heartbeated" } else { ageMsg = fmt.Sprintf("last heartbeat %s ago", time.Since(*host.LastSeenAt).Truncate(time.Second)) } d.runLog(queued.ID, "error", fmt.Sprintf( "dispatcher: host %s is offline (%s) — refusing to dispatch; install the reporter via /register/quick.sh on the target and retry", host.Name, ageMsg)) if err := d.Runs.MarkDispatchFailed(ctx, queued.ID, "dispatch", "host stopped heartbeating before dispatch"); err != nil { log.Printf("dispatcher: mark run %d dispatch-failed: %v", queued.ID, err) } if d.Runner != nil { d.Runner.PublishTileUpdate(ctx, host.ID) } return } age := time.Since(*host.LastSeenAt).Truncate(time.Second) d.runLog(queued.ID, "info", fmt.Sprintf( "dispatcher: picked run for host %s (mac=%s, heartbeating, last seen %s ago)", host.Name, host.MAC, age)) if _, err := d.Runner.Transition(ctx, queued.ID, TriggerRebootCommanded); err != nil { log.Printf("dispatcher: transition run %d: %v", queued.ID, err) d.runLog(queued.ID, "error", fmt.Sprintf("dispatcher: transition to WaitingReboot failed: %v", err)) return } log.Printf("dispatcher: run %d host %s → WaitingReboot (heartbeat-driven)", queued.ID, host.Name) d.runLog(queued.ID, "info", fmt.Sprintf( "dispatcher: host %s heartbeating — waiting for next reporter heartbeat to deliver reboot_for_vetting", host.Name)) // Prime the heartbeat so the first "still waiting" fires 30s after // dispatch, not immediately. d.hbMu.Lock() d.lastBeat[queued.ID] = time.Now() d.hbMu.Unlock() // Slot stays reserved until the run leaves active (Phase 4+). // Phase 2 lets the loop observe inFlight via DB state. released = true <-d.active } // heartbeatWaiting emits a "still waiting" log line every beatEvery for // each run still sitting in WaitingReboot (or legacy WaitingWoL). Helps // the operator spot hangs without having to tail journalctl on the LXC. func (d *Dispatcher) heartbeatWaiting(ctx context.Context) { if d.Logs == nil { return } runs, err := d.Runs.Active(ctx) if err != nil { return } now := time.Now() d.hbMu.Lock() defer d.hbMu.Unlock() seen := map[int64]bool{} for i := range runs { r := &runs[i] seen[r.ID] = true if r.State != model.StateWaitingReboot && r.State != model.StateWaitingWoL { continue } last, ok := d.lastBeat[r.ID] if !ok { // Run already waiting from a previous process lifetime — prime // so we don't spam immediately. d.lastBeat[r.ID] = now continue } if now.Sub(last) < d.beatEvery { continue } elapsed := now.Sub(r.StartedAt).Truncate(time.Second) d.runLog(r.ID, "info", fmt.Sprintf( "waiting for reporter to reboot + PXE-boot into live image (%s) — if this exceeds 2m, verify pxe.enabled in vetting.yaml and that the reporter actually invoked systemctl reboot", elapsed)) d.lastBeat[r.ID] = now } // Garbage-collect entries for runs that have left the waiting states. for id := range d.lastBeat { if !seen[id] { delete(d.lastBeat, id) } } } // runLog writes a single line into the per-run log. Safe to call with a // nil hub (tests construct Dispatcher directly) — it degrades to a // stderr log line so nothing silently disappears. func (d *Dispatcher) runLog(runID int64, level, text string) { if d.Logs == nil { log.Printf("run-%d %s: %s", runID, level, text) return } w, err := d.Logs.WriterFor(runID) if err != nil { log.Printf("dispatcher: open log for run %d: %v", runID, err) return } w.Append(logs.Line{Level: level, Text: text}) }