package orchestrator

import (
	"context"
	"os"
	"path/filepath"
	"strings"
	"testing"
	"time"

	"vetting/internal/db"
	"vetting/internal/events"
	"vetting/internal/logs"
	"vetting/internal/model"
	"vetting/internal/store"
)

// setupPickNext builds a Dispatcher on top of a real database file opened
// in a per-test temp directory, so pickNext runs against the actual
// Hosts/Runs/Runner stores. It seeds a single host and stamps its
// last-seen time to "now", which makes the default scenario one where
// dispatch is expected to succeed; tests that need a stale host overwrite
// the stamp themselves.
//
// It returns the dispatcher, the host and run stores, the seeded host's
// ID, and a cleanup func that closes the log hub and the DB connection.
// Any setup failure aborts the calling test via t.Fatalf.
func setupPickNext(t *testing.T) (*Dispatcher, *store.Hosts, *store.Runs, int64, func()) {
	t.Helper()

	// MAC of the seeded host; used both to create it and to stamp its
	// heartbeat.
	const seedMAC = "aa:bb:cc:dd:ee:50"

	dbPath := filepath.Join(t.TempDir(), "vetting.db")
	sqlDB, err := db.Open(dbPath)
	if err != nil {
		t.Fatalf("open db: %v", err)
	}

	hostStore := &store.Hosts{DB: sqlDB}
	runStore := &store.Runs{DB: sqlDB}
	eventHub := events.NewHub()
	stageRunner := &Runner{
		Runs:     runStore,
		Hosts:    hostStore,
		Stages:   &store.Stages{DB: sqlDB},
		EventHub: eventHub,
	}

	// Per-run logs live in their own temp dir, separate from the DB file.
	logHub, err := logs.NewHub(t.TempDir(), eventHub)
	if err != nil {
		t.Fatalf("NewHub: %v", err)
	}

	dispatcher := NewDispatcher(3, runStore, hostStore, stageRunner, logHub)

	ctx := context.Background()
	seedHost := model.Host{
		Name:             "pn-host",
		MAC:              seedMAC,
		WoLBroadcastIP:   "10.0.0.255",
		WoLPort:          9,
		ExpectedSpecYAML: "memory:\n total_gib: 16\n",
	}
	hostID, err := hostStore.Create(ctx, seedHost)
	if err != nil {
		t.Fatalf("create host: %v", err)
	}

	// Default: the host is heartbeating right now.
	if err := hostStore.UpdateLastSeen(ctx, seedMAC, time.Now().UTC()); err != nil {
		t.Fatalf("stamp: %v", err)
	}

	return dispatcher, hostStore, runStore, hostID, func() {
		logHub.Close()
		_ = sqlDB.Close()
	}
}

// TestDispatcher_RunLogWritesToHub verifies the plumbing between the
// dispatcher and the per-run log hub: runLog must persist to the on-disk
// file so the detail page's replay + SSE fan-out see the same
// pre-stage diagnostics (picked / sent WoL / heartbeat).
func TestDispatcher_RunLogWritesToHub(t *testing.T) { dir := t.TempDir() ev := events.NewHub() lh, err := logs.NewHub(dir, ev) if err != nil { t.Fatalf("NewHub: %v", err) } defer lh.Close() d := &Dispatcher{Logs: lh} d.runLog(7, "info", "dispatcher: sent WoL packet to aa:bb:cc:dd:ee:ff via 10.0.0.255:9") body, err := os.ReadFile(filepath.Join(dir, "run-7.log")) if err != nil { t.Fatalf("read run log: %v", err) } if !strings.Contains(string(body), "dispatcher: sent WoL packet") { t.Fatalf("run log missing dispatcher line: %q", body) } if !strings.Contains(string(body), "INFO") { t.Fatalf("run log missing level: %q", body) } } // TestDispatcher_RunLogNilHubDoesNotPanic: tests construct Dispatcher // directly without a hub. runLog must degrade to stderr rather than // panicking so the dispatcher loop stays alive. func TestDispatcher_RunLogNilHubDoesNotPanic(t *testing.T) { d := &Dispatcher{} d.runLog(1, "info", "fallback path") } // TestDispatcher_TransitionsToWaitingRebootNoWoL: happy path. Host is // heartbeating, run is Queued — one pickNext tick must transition to // WaitingReboot via the new RebootCommanded trigger and log that the // host is heartbeating. No "sent WoL packet" line allowed. 
func TestDispatcher_TransitionsToWaitingRebootNoWoL(t *testing.T) { d, _, runs, hostID, cleanup := setupPickNext(t) defer cleanup() ctx := context.Background() runID, err := runs.Create(ctx, hostID, "deadbeef", false) if err != nil { t.Fatalf("create run: %v", err) } d.pickNext(ctx) got, err := runs.Get(ctx, runID) if err != nil { t.Fatalf("get run: %v", err) } if got.State != model.StateWaitingReboot { t.Fatalf("state = %s, want WaitingReboot", got.State) } body, err := os.ReadFile(filepath.Join(d.Logs.PathFor(runID))) //nolint:staticcheck if err != nil { t.Fatalf("read log: %v", err) } text := string(body) if strings.Contains(text, "sent WoL packet") { t.Fatalf("dispatcher should not fire WoL on heartbeating host: %s", text) } if !strings.Contains(text, "heartbeating") { t.Fatalf("missing heartbeating log line: %s", text) } } // TestDispatcher_FailsStaleHeartbeat: host hasn't heartbeat for >60s. // Dispatcher must refuse, mark the run Failed with failed_stage=dispatch, // and log at error level — not loop forever on an unreachable box. func TestDispatcher_FailsStaleHeartbeat(t *testing.T) { d, hosts, runs, hostID, cleanup := setupPickNext(t) defer cleanup() ctx := context.Background() // Stale: 5m ago is well past the 60s cutoff. 
if err := hosts.UpdateLastSeen(ctx, "aa:bb:cc:dd:ee:50", time.Now().UTC().Add(-5*time.Minute)); err != nil { t.Fatalf("stamp stale: %v", err) } runID, err := runs.Create(ctx, hostID, "deadbeef", false) if err != nil { t.Fatalf("create run: %v", err) } d.pickNext(ctx) got, err := runs.Get(ctx, runID) if err != nil { t.Fatalf("get run: %v", err) } if got.State != model.StateFailed { t.Fatalf("state = %s, want Failed", got.State) } if got.FailedStage != "dispatch" { t.Fatalf("failed_stage = %q, want dispatch", got.FailedStage) } body, _ := os.ReadFile(d.Logs.PathFor(runID)) if !strings.Contains(string(body), "quick.sh") { t.Fatalf("expected quick.sh hint in run log: %s", body) } } // TestDispatcher_FailsNeverSeenHost mirrors the stale-heartbeat test for // a host that has never heartbeated at all — LastSeenAt is NULL. func TestDispatcher_FailsNeverSeenHost(t *testing.T) { d, hosts, runs, _, cleanup := setupPickNext(t) defer cleanup() ctx := context.Background() // Create a fresh host with no heartbeat stamp. neverID, err := hosts.Create(ctx, model.Host{ Name: "pn-never", MAC: "aa:bb:cc:dd:ee:51", WoLBroadcastIP: "10.0.0.255", WoLPort: 9, ExpectedSpecYAML: "memory:\n total_gib: 16\n", }) if err != nil { t.Fatalf("create host: %v", err) } runID, err := runs.Create(ctx, neverID, "deadbeef", false) if err != nil { t.Fatalf("create run: %v", err) } d.pickNext(ctx) got, err := runs.Get(ctx, runID) if err != nil { t.Fatalf("get run: %v", err) } if got.State != model.StateFailed { t.Fatalf("state = %s, want Failed", got.State) } }