package orchestrator

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"vetting/internal/logs"
	"vetting/internal/model"
	"vetting/internal/store"
)

// Dispatcher picks Queued runs off the DB and drives them through
// WaitingWoL (sending a WoL packet). Concurrency is capped at Max.
//
// Pre-stage log lines (picked, WoL-sent, heartbeat, agent-claimed)
// are written into the per-run log via Logs so the detail page's
// log pane can show what's happening before the agent is alive.
//
// For Phase 2 the dispatcher's job ends at WaitingWoL; further
// transitions are driven by iPXE and agent callbacks. Phase 4+ will
// return here and shepherd each run through stage execution.
type Dispatcher struct {
	Max    int
	Runs   *store.Runs
	Hosts  *store.Hosts
	Runner *Runner
	Logs   *logs.Hub

	// active is a counting semaphore with capacity Max; pickNext takes
	// a slot non-blockingly before doing any work.
	active chan struct{}
	// stop is closed (exactly once, via stopOnce) to end the loop.
	stop     chan struct{}
	stopOnce sync.Once

	// heartbeat tracks the last time we emitted a "still waiting"
	// line for a given run, so the ticker doesn't spam the log.
	hbMu      sync.Mutex
	lastBeat  map[int64]time.Time
	beatEvery time.Duration
}

// NewDispatcher returns a Dispatcher that allows at most maxConcurrent
// runs in flight (values below 1 are clamped to 1) and emits a
// "still waiting" heartbeat every 30s for runs parked in WaitingWoL.
//
// logHub may be nil: per-run lines then fall back to the standard
// logger (see runLog) and heartbeats are suppressed.
func NewDispatcher(maxConcurrent int, runs *store.Runs, hosts *store.Hosts, runner *Runner, logHub *logs.Hub) *Dispatcher {
	if maxConcurrent < 1 {
		maxConcurrent = 1
	}
	return &Dispatcher{
		Max:       maxConcurrent,
		Runs:      runs,
		Hosts:     hosts,
		Runner:    runner,
		Logs:      logHub,
		active:    make(chan struct{}, maxConcurrent),
		stop:      make(chan struct{}),
		lastBeat:  map[int64]time.Time{},
		beatEvery: 30 * time.Second,
	}
}

// Start launches the dispatch loop in a new goroutine. The loop exits
// when ctx is cancelled or Stop is called.
func (d *Dispatcher) Start(ctx context.Context) { go d.loop(ctx) }

// Stop signals the dispatch loop to exit. It does not wait for a tick
// that is already in progress. Calling Stop more than once is safe;
// only the first call closes the stop channel (a second close would
// panic).
func (d *Dispatcher) Stop() {
	d.stopOnce.Do(func() { close(d.stop) })
}

// loop runs one dispatch attempt followed by one heartbeat pass every
// 2 seconds until ctx is cancelled or Stop is called.
func (d *Dispatcher) loop(ctx context.Context) {
	t := time.NewTicker(2 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-d.stop:
			return
		case <-t.C:
			d.pickNext(ctx)
			d.heartbeatWaiting(ctx)
		}
	}
}

// pickNext dispatches at most one Queued run per call: it transitions
// the first Queued run returned by Runs.Active to WaitingWoL and sends
// a WoL packet to its host. Nothing is dispatched when the number of
// runs already in a dispatched state (WaitingWoL through Reporting)
// has reached Max. Failures are written both to the process log and to
// the run's own log.
func (d *Dispatcher) pickNext(ctx context.Context) {
	// Non-blocking acquire: if every slot is taken, skip this tick.
	select {
	case d.active <- struct{}{}:
	default:
		return // at capacity
	}
	// Phase 2: the slot is held only for the duration of this call; the
	// real in-flight cap is enforced below by counting runs in DB
	// states. Phase 4+ is expected to keep the slot until the run
	// leaves the active set.
	defer func() { <-d.active }()

	runs, err := d.Runs.Active(ctx)
	if err != nil {
		log.Printf("dispatcher: list active: %v", err)
		return
	}

	var queued *model.Run
	inFlight := 0
	for i := range runs {
		switch runs[i].State {
		case model.StateQueued:
			if queued == nil {
				queued = &runs[i]
			}
		case model.StateWaitingWoL, model.StateBooting, model.StateInventoryCheck,
			model.StateSpecValidate, model.StateSMART, model.StateCPUStress,
			model.StateStorage, model.StateNetwork, model.StateGPU,
			model.StatePSU, model.StateReporting:
			inFlight++
		}
	}
	if inFlight >= d.Max || queued == nil {
		return
	}

	host, err := d.Hosts.Get(ctx, queued.HostID)
	if err != nil {
		log.Printf("dispatcher: get host %d: %v", queued.HostID, err)
		return
	}
	d.runLog(queued.ID, "info", fmt.Sprintf("dispatcher: picked run for host %s (mac=%s wol=%s:%d)",
		host.Name, host.MAC, host.WoLBroadcastIP, host.WoLPort))

	if _, err := d.Runner.Transition(ctx, queued.ID, TriggerDispatched); err != nil {
		log.Printf("dispatcher: transition run %d: %v", queued.ID, err)
		d.runLog(queued.ID, "error", fmt.Sprintf("dispatcher: transition to WaitingWoL failed: %v", err))
		return
	}

	if err := SendWoL(host.MAC, host.WoLBroadcastIP, host.WoLPort); err != nil {
		log.Printf("dispatcher: WoL run %d host %s: %v", queued.ID, host.Name, err)
		d.runLog(queued.ID, "error", fmt.Sprintf("dispatcher: WoL send failed: %v — check broadcast %s:%d is reachable",
			err, host.WoLBroadcastIP, host.WoLPort))
		// Stay in WaitingWoL; operator can retry or investigate.
		return
	}
	log.Printf("dispatcher: WoL sent for run %d (host=%s mac=%s)", queued.ID, host.Name, host.MAC)
	d.runLog(queued.ID, "info", fmt.Sprintf("dispatcher: sent WoL packet to %s via %s:%d — waiting for agent claim",
		host.MAC, host.WoLBroadcastIP, host.WoLPort))

	// Prime the heartbeat so the first "still waiting" fires beatEvery
	// after dispatch, not immediately.
	d.hbMu.Lock()
	d.lastBeat[queued.ID] = time.Now()
	d.hbMu.Unlock()
}

// heartbeatWaiting emits a "still waiting" log line every beatEvery for
// each run still sitting in WaitingWoL. Helps the operator spot hangs
// without having to tail journalctl on the LXC.
//
// A WaitingWoL run with no recorded beat (e.g. one left over from a
// previous process lifetime) is primed silently rather than logged
// immediately. Entries for runs no longer in WaitingWoL are dropped.
// It does nothing when Logs is nil.
func (d *Dispatcher) heartbeatWaiting(ctx context.Context) {
	if d.Logs == nil {
		return
	}
	runs, err := d.Runs.Active(ctx)
	if err != nil {
		// Best-effort: pickNext normally issues the same query on the
		// same tick and already logs failures, so stay quiet here.
		return
	}

	type beat struct {
		runID int64
		text  string
	}
	var due []beat

	now := time.Now()
	d.hbMu.Lock()
	waiting := make(map[int64]bool, len(runs))
	for i := range runs {
		r := &runs[i]
		if r.State != model.StateWaitingWoL {
			continue
		}
		waiting[r.ID] = true

		last, ok := d.lastBeat[r.ID]
		if !ok {
			// Run already in WaitingWoL from a previous process lifetime
			// — prime so we don't spam immediately.
			d.lastBeat[r.ID] = now
			continue
		}
		if now.Sub(last) < d.beatEvery {
			continue
		}
		elapsed := now.Sub(r.StartedAt).Truncate(time.Second)
		due = append(due, beat{
			runID: r.ID,
			text: fmt.Sprintf(
				"still waiting for agent claim (%s) — check BIOS WoL, pxe.enabled, and live-image presence", elapsed),
		})
		d.lastBeat[r.ID] = now
	}
	// Garbage-collect entries for runs that have left WaitingWoL.
	for id := range d.lastBeat {
		if !waiting[id] {
			delete(d.lastBeat, id)
		}
	}
	d.hbMu.Unlock()

	// Write outside hbMu so log-hub I/O never blocks other users of the
	// heartbeat map.
	for _, b := range due {
		d.runLog(b.runID, "info", b.text)
	}
}

// runLog writes a single line into the per-run log. Safe to call with a
// nil hub (tests construct Dispatcher directly) — it degrades to a
// stderr log line so nothing silently disappears. If the per-run writer
// cannot be opened, the failure is reported via the standard logger and
// the line is dropped.
func (d *Dispatcher) runLog(runID int64, level, text string) {
	if d.Logs == nil {
		log.Printf("run-%d %s: %s", runID, level, text)
		return
	}
	w, err := d.Logs.WriterFor(runID)
	if err != nil {
		log.Printf("dispatcher: open log for run %d: %v", runID, err)
		return
	}
	w.Append(logs.Line{Level: level, Text: text})
}