package orchestrator_test import ( "context" "fmt" "path/filepath" "testing" "time" "vetting/internal/db" "vetting/internal/events" "vetting/internal/model" "vetting/internal/orchestrator" "vetting/internal/store" ) // setupRunner wires a real DB + stores + hub and registers a minimal // TileRenderer/PipelineRenderer so publishTileUpdate emits something // recognisable. Returns Runner plus helpers to drain events. func setupRunner(t *testing.T) (*orchestrator.Runner, *store.Hosts, *store.Runs, *events.Hub, func()) { t.Helper() conn, err := db.Open(filepath.Join(t.TempDir(), "vetting.db")) if err != nil { t.Fatalf("open db: %v", err) } hub := events.NewHub() hosts := &store.Hosts{DB: conn} runs := &store.Runs{DB: conn} stages := &store.Stages{DB: conn} runner := &orchestrator.Runner{Runs: runs, Hosts: hosts, Stages: stages, EventHub: hub} // Deterministic renderer stubs — use known substrings so tests can // grep the published fragments without parsing HTML. prevTile := orchestrator.TileRenderer prevPipe := orchestrator.PipelineRenderer orchestrator.TileRenderer = func(_ context.Context, host model.Host, _ *model.Run) string { return fmt.Sprintf(`
tile
`, host.ID) } orchestrator.PipelineRenderer = func(run *model.Run, _ []model.Stage) string { return fmt.Sprintf(`
pipeline
`, run.ID) } cleanup := func() { orchestrator.TileRenderer = prevTile orchestrator.PipelineRenderer = prevPipe _ = conn.Close() } return runner, hosts, runs, hub, cleanup } // TestPublishesTileAndPipelineOnTransition asserts that a single // Transition call publishes both the tile-{hostID} and pipeline-{runID} // fragments — the detail-page timeline needs this to advance on every // state change without its own call site. func TestPublishesTileAndPipelineOnTransition(t *testing.T) { runner, hosts, runs, hub, cleanup := setupRunner(t) defer cleanup() ctx := context.Background() hostID, err := hosts.Create(ctx, model.Host{ Name: "runner-tile", MAC: "aa:bb:cc:dd:ee:40", WoLBroadcastIP: "10.0.0.255", WoLPort: 9, ExpectedSpecYAML: "memory:\n total_gib: 16\n", }) if err != nil { t.Fatalf("create host: %v", err) } runID, err := runs.Create(ctx, hostID, "deadbeef", false) if err != nil { t.Fatalf("create run: %v", err) } _, _, cancel := hub.Subscribe() defer cancel() // Subscribe one more time so we have a channel we can drain. _, ch, cancel2 := hub.Subscribe() defer cancel2() // Queued → WaitingWoL on Dispatched. if _, err := runner.Transition(ctx, runID, orchestrator.TriggerDispatched); err != nil { t.Fatalf("transition: %v", err) } // Collect events with a short deadline; we expect tile + pipeline // from this one Transition call. wantTile := fmt.Sprintf("tile-%d", hostID) wantPipeline := fmt.Sprintf("pipeline-%d", runID) sawTile, sawPipeline := false, false deadline := time.After(500 * time.Millisecond) loop: for { select { case ev := <-ch: if ev.Name == wantTile { sawTile = true } if ev.Name == wantPipeline { sawPipeline = true } if sawTile && sawPipeline { break loop } case <-deadline: break loop } } if !sawTile { t.Errorf("no %s event published", wantTile) } if !sawPipeline { t.Errorf("no %s event published", wantPipeline) } } // TestCompleteStagePublishesPipeline covers the stage-completion path // that used to go direct-to-Stages, bypassing the SSE refresh. The // Runner.CompleteStage wrapper exists so stage-dot advancements show up // on the detail page without waiting for the next run-state transition. func TestCompleteStagePublishesPipeline(t *testing.T) { runner, hosts, runs, hub, cleanup := setupRunner(t) defer cleanup() ctx := context.Background() hostID, err := hosts.Create(ctx, model.Host{ Name: "runner-cs", MAC: "aa:bb:cc:dd:ee:41", WoLBroadcastIP: "10.0.0.255", WoLPort: 9, ExpectedSpecYAML: "memory:\n total_gib: 8\n", }) if err != nil { t.Fatalf("create host: %v", err) } runID, err := runs.Create(ctx, hostID, "deadbeef", false) if err != nil { t.Fatalf("create run: %v", err) } // CompleteStage needs stage rows to exist first — Seed them. stages := &store.Stages{DB: runs.DB} if err := stages.Seed(ctx, runID); err != nil { t.Fatalf("seed stages: %v", err) } _, ch, cancel := hub.Subscribe() defer cancel() if err := runner.CompleteStage(ctx, runID, "Inventory", model.StagePassed, `{}`); err != nil { t.Fatalf("CompleteStage: %v", err) } wantPipeline := fmt.Sprintf("pipeline-%d", runID) deadline := time.After(500 * time.Millisecond) for { select { case ev := <-ch: if ev.Name == wantPipeline { return } case <-deadline: t.Fatalf("no %s event published by CompleteStage", wantPipeline) } } }