Host detail v2: full pipeline + per-stage logs + WoL diagnostics
CI / Lint + build + test (push) Has been cancelled
CI / Lint + build + test (push) Has been cancelled
Pipeline now always renders all 13 nodes (3 pre-stage + 9 stage +
Completed), synthesising ghosts from run state when stage rows
aren't seeded yet. Makes a WaitingWoL host show the full timeline
ahead of it instead of just 4 dots.
Agent tags each log line with its stage; logs.Hub fans out to both
log-{runID} and log-{runID}-{stage} SSE events so the detail page
can show per-stage tabs with a pure-CSS radio-sibling switch. Flat
run log prepends [stage] so grep still works.
Dispatcher writes picked/sent-WoL/heartbeat lines into the per-run
log — the operator opens the detail page, sees WaitingWoL stuck,
and reads exactly what the dispatcher did and why nothing's
progressing, instead of having to tail journalctl on the LXC.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -2,9 +2,12 @@ package orchestrator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"vetting/internal/logs"
|
||||
"vetting/internal/model"
|
||||
"vetting/internal/store"
|
||||
)
|
||||
@@ -12,6 +15,10 @@ import (
|
||||
// Dispatcher picks Queued runs off the DB and drives them through
|
||||
// WaitingWoL (sending a WoL packet). Concurrency is capped at Max.
|
||||
//
|
||||
// Pre-stage log lines (picked, WoL-sent, heartbeat, agent-claimed)
|
||||
// are written into the per-run log via Logs so the detail page's
|
||||
// log pane can show what's happening before the agent is alive.
|
||||
//
|
||||
// For Phase 2 the dispatcher's job ends at WaitingWoL; further
|
||||
// transitions are driven by iPXE and agent callbacks. Phase 4+ will
|
||||
// return here and shepherd each run through stage execution.
|
||||
@@ -20,22 +27,32 @@ type Dispatcher struct {
|
||||
Runs *store.Runs
|
||||
Hosts *store.Hosts
|
||||
Runner *Runner
|
||||
Logs *logs.Hub
|
||||
|
||||
active chan struct{}
|
||||
stop chan struct{}
|
||||
|
||||
// heartbeat tracks the last time we emitted a "still waiting"
|
||||
// line for a given run, so the ticker doesn't spam the log.
|
||||
hbMu sync.Mutex
|
||||
lastBeat map[int64]time.Time
|
||||
beatEvery time.Duration
|
||||
}
|
||||
|
||||
func NewDispatcher(max int, runs *store.Runs, hosts *store.Hosts, runner *Runner) *Dispatcher {
|
||||
func NewDispatcher(max int, runs *store.Runs, hosts *store.Hosts, runner *Runner, logHub *logs.Hub) *Dispatcher {
|
||||
if max < 1 {
|
||||
max = 1
|
||||
}
|
||||
return &Dispatcher{
|
||||
Max: max,
|
||||
Runs: runs,
|
||||
Hosts: hosts,
|
||||
Runner: runner,
|
||||
active: make(chan struct{}, max),
|
||||
stop: make(chan struct{}),
|
||||
Max: max,
|
||||
Runs: runs,
|
||||
Hosts: hosts,
|
||||
Runner: runner,
|
||||
Logs: logHub,
|
||||
active: make(chan struct{}, max),
|
||||
stop: make(chan struct{}),
|
||||
lastBeat: map[int64]time.Time{},
|
||||
beatEvery: 30 * time.Second,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,6 +75,7 @@ func (d *Dispatcher) loop(ctx context.Context) {
|
||||
return
|
||||
case <-t.C:
|
||||
d.pickNext(ctx)
|
||||
d.heartbeatWaiting(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -106,19 +124,93 @@ func (d *Dispatcher) pickNext(ctx context.Context) {
|
||||
log.Printf("dispatcher: get host %d: %v", queued.HostID, err)
|
||||
return
|
||||
}
|
||||
d.runLog(queued.ID, "info", fmt.Sprintf("dispatcher: picked run for host %s (mac=%s wol=%s:%d)",
|
||||
host.Name, host.MAC, host.WoLBroadcastIP, host.WoLPort))
|
||||
if _, err := d.Runner.Transition(ctx, queued.ID, TriggerDispatched); err != nil {
|
||||
log.Printf("dispatcher: transition run %d: %v", queued.ID, err)
|
||||
d.runLog(queued.ID, "error", fmt.Sprintf("dispatcher: transition to WaitingWoL failed: %v", err))
|
||||
return
|
||||
}
|
||||
if err := SendWoL(host.MAC, host.WoLBroadcastIP, host.WoLPort); err != nil {
|
||||
log.Printf("dispatcher: WoL run %d host %s: %v", queued.ID, host.Name, err)
|
||||
d.runLog(queued.ID, "error", fmt.Sprintf("dispatcher: WoL send failed: %v — check broadcast %s:%d is reachable",
|
||||
err, host.WoLBroadcastIP, host.WoLPort))
|
||||
// Stay in WaitingWoL; operator can retry or investigate.
|
||||
return
|
||||
}
|
||||
log.Printf("dispatcher: WoL sent for run %d (host=%s mac=%s)", queued.ID, host.Name, host.MAC)
|
||||
d.runLog(queued.ID, "info", fmt.Sprintf("dispatcher: sent WoL packet to %s via %s:%d — waiting for agent claim",
|
||||
host.MAC, host.WoLBroadcastIP, host.WoLPort))
|
||||
|
||||
// Prime the heartbeat so the first "still waiting" fires 30s after
|
||||
// dispatch, not immediately.
|
||||
d.hbMu.Lock()
|
||||
d.lastBeat[queued.ID] = time.Now()
|
||||
d.hbMu.Unlock()
|
||||
|
||||
// Slot stays reserved until the run leaves active (Phase 4+).
|
||||
// Phase 2 lets the loop observe inFlight via DB state.
|
||||
released = true
|
||||
<-d.active
|
||||
}
|
||||
|
||||
// heartbeatWaiting emits a "still waiting" log line every beatEvery for
|
||||
// each run still sitting in WaitingWoL. Helps the operator spot hangs
|
||||
// without having to tail journalctl on the LXC.
|
||||
func (d *Dispatcher) heartbeatWaiting(ctx context.Context) {
|
||||
if d.Logs == nil {
|
||||
return
|
||||
}
|
||||
runs, err := d.Runs.Active(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
now := time.Now()
|
||||
d.hbMu.Lock()
|
||||
defer d.hbMu.Unlock()
|
||||
seen := map[int64]bool{}
|
||||
for i := range runs {
|
||||
r := &runs[i]
|
||||
seen[r.ID] = true
|
||||
if r.State != model.StateWaitingWoL {
|
||||
continue
|
||||
}
|
||||
last, ok := d.lastBeat[r.ID]
|
||||
if !ok {
|
||||
// Run already in WaitingWoL from a previous process lifetime
|
||||
// — prime so we don't spam immediately.
|
||||
d.lastBeat[r.ID] = now
|
||||
continue
|
||||
}
|
||||
if now.Sub(last) < d.beatEvery {
|
||||
continue
|
||||
}
|
||||
elapsed := now.Sub(r.StartedAt).Truncate(time.Second)
|
||||
d.runLog(r.ID, "info", fmt.Sprintf(
|
||||
"still waiting for agent claim (%s) — check BIOS WoL, pxe.enabled, and live-image presence",
|
||||
elapsed))
|
||||
d.lastBeat[r.ID] = now
|
||||
}
|
||||
// Garbage-collect entries for runs that have left WaitingWoL.
|
||||
for id := range d.lastBeat {
|
||||
if !seen[id] {
|
||||
delete(d.lastBeat, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// runLog writes a single line into the per-run log. Safe to call with a
|
||||
// nil hub (tests construct Dispatcher directly) — it degrades to a
|
||||
// stderr log line so nothing silently disappears.
|
||||
func (d *Dispatcher) runLog(runID int64, level, text string) {
|
||||
if d.Logs == nil {
|
||||
log.Printf("run-%d %s: %s", runID, level, text)
|
||||
return
|
||||
}
|
||||
w, err := d.Logs.WriterFor(runID)
|
||||
if err != nil {
|
||||
log.Printf("dispatcher: open log for run %d: %v", runID, err)
|
||||
return
|
||||
}
|
||||
w.Append(logs.Line{Level: level, Text: text})
|
||||
}
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
package orchestrator
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"vetting/internal/events"
|
||||
"vetting/internal/logs"
|
||||
)
|
||||
|
||||
// TestDispatcher_RunLogWritesToHub verifies the plumbing between the
|
||||
// dispatcher and the per-run log hub: runLog must persist to the on-disk
|
||||
// file so the detail page's replay + SSE fan-out see the same
|
||||
// pre-stage diagnostics (picked / sent WoL / heartbeat).
|
||||
func TestDispatcher_RunLogWritesToHub(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
ev := events.NewHub()
|
||||
lh, err := logs.NewHub(dir, ev)
|
||||
if err != nil {
|
||||
t.Fatalf("NewHub: %v", err)
|
||||
}
|
||||
defer lh.Close()
|
||||
|
||||
d := &Dispatcher{Logs: lh}
|
||||
d.runLog(7, "info", "dispatcher: sent WoL packet to aa:bb:cc:dd:ee:ff via 10.0.0.255:9")
|
||||
|
||||
body, err := os.ReadFile(filepath.Join(dir, "run-7.log"))
|
||||
if err != nil {
|
||||
t.Fatalf("read run log: %v", err)
|
||||
}
|
||||
if !strings.Contains(string(body), "dispatcher: sent WoL packet") {
|
||||
t.Fatalf("run log missing dispatcher line: %q", body)
|
||||
}
|
||||
if !strings.Contains(string(body), "INFO") {
|
||||
t.Fatalf("run log missing level: %q", body)
|
||||
}
|
||||
}
|
||||
|
||||
// TestDispatcher_RunLogNilHubDoesNotPanic: tests construct Dispatcher
|
||||
// directly without a hub. runLog must degrade to stderr rather than
|
||||
// panicking so the dispatcher loop stays alive.
|
||||
func TestDispatcher_RunLogNilHubDoesNotPanic(t *testing.T) {
|
||||
d := &Dispatcher{}
|
||||
d.runLog(1, "info", "fallback path")
|
||||
}
|
||||
Reference in New Issue
Block a user