Click a tile to open /hosts/{id} — the canonical control surface per
host. Timeline renders every pre-stage, stage, and terminal node in
order, with the current one pulsing, failed ones flagged, and
downstream ones dimmed as skipped. Detail page shows summary, hold
card (when holding), all action buttons, spec diffs, a full-height
log pane, and a collapsed expected-spec YAML.
Tile slims to name, last-seen, status, and one primary action; a
CSS-overlay <a> makes the whole card clickable while buttons stay
receptive via z-index.
Runner.publishTileUpdate now also emits pipeline-{runID} fragments,
and CompleteStage wraps Stages.CompleteByName so stage completions
advance the timeline live — without this the dots only moved on
state transitions.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -50,6 +50,21 @@ func (r *Runner) StartStage(ctx context.Context, runID int64, name string) error
|
||||
return nil
|
||||
}
|
||||
|
||||
// CompleteStage marks a stage row passed/failed/skipped and publishes a
|
||||
// tile + pipeline refresh. Wrapper around Stages.CompleteByName so every
|
||||
// stage completion triggers an SSE update — without this, stage dots on
|
||||
// the pipeline wouldn't advance until the next run-state transition.
|
||||
func (r *Runner) CompleteStage(ctx context.Context, runID int64, name string, state model.StageState, summaryJSON string) error {
|
||||
if err := r.Stages.CompleteByName(ctx, runID, name, state, summaryJSON); err != nil {
|
||||
return err
|
||||
}
|
||||
run, err := r.Runs.Get(ctx, runID)
|
||||
if err == nil {
|
||||
r.publishTileUpdate(ctx, run.HostID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// PublishTileUpdate is the exported entry point for non-orchestrator
|
||||
// callers (the UI heartbeat handler) that change tile-visible state
|
||||
// without going through Transition.
|
||||
@@ -70,6 +85,19 @@ func (r *Runner) publishTileUpdate(ctx context.Context, hostID int64) {
|
||||
}
|
||||
payload := renderTileSSE(ctx, *host, latest)
|
||||
r.EventHub.Publish(events.Event{Name: fmt.Sprintf("tile-%d", hostID), Payload: payload})
|
||||
|
||||
// Pipeline fragment — same call sites as the tile refresh, keyed by
|
||||
// run ID so the detail page's <section sse-swap="pipeline-N"> picks
|
||||
// it up. Silently skips when no renderer is wired or no run exists.
|
||||
if latest != nil && PipelineRenderer != nil && r.Stages != nil {
|
||||
stages, err := r.Stages.ListForRun(ctx, latest.ID)
|
||||
if err != nil {
|
||||
log.Printf("publishTileUpdate: list stages run %d: %v", latest.ID, err)
|
||||
return
|
||||
}
|
||||
pipePayload := PipelineRenderer(latest, stages)
|
||||
r.EventHub.Publish(events.Event{Name: fmt.Sprintf("pipeline-%d", latest.ID), Payload: pipePayload})
|
||||
}
|
||||
}
|
||||
|
||||
// TileRenderer renders a single tile fragment. Registered at startup
|
||||
@@ -79,6 +107,11 @@ func (r *Runner) publishTileUpdate(ctx context.Context, hostID int64) {
|
||||
// template package.
|
||||
var TileRenderer func(ctx context.Context, host model.Host, latest *model.Run) string
|
||||
|
||||
// PipelineRenderer renders the detail-page pipeline fragment for the
|
||||
// given run + its stage rows. Registered alongside TileRenderer so
|
||||
// orchestrator stays free of template imports.
|
||||
var PipelineRenderer func(run *model.Run, stages []model.Stage) string
|
||||
|
||||
func renderTileSSE(ctx context.Context, host model.Host, latest *model.Run) string {
|
||||
if TileRenderer == nil {
|
||||
return fmt.Sprintf(`<article id="host-%d">state change</article>`, host.ID)
|
||||
|
||||
@@ -0,0 +1,164 @@
|
||||
package orchestrator_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"vetting/internal/db"
|
||||
"vetting/internal/events"
|
||||
"vetting/internal/model"
|
||||
"vetting/internal/orchestrator"
|
||||
"vetting/internal/store"
|
||||
)
|
||||
|
||||
// setupRunner wires a real DB + stores + hub and registers a minimal
|
||||
// TileRenderer/PipelineRenderer so publishTileUpdate emits something
|
||||
// recognisable. Returns Runner plus helpers to drain events.
|
||||
func setupRunner(t *testing.T) (*orchestrator.Runner, *store.Hosts, *store.Runs, *events.Hub, func()) {
|
||||
t.Helper()
|
||||
conn, err := db.Open(filepath.Join(t.TempDir(), "vetting.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("open db: %v", err)
|
||||
}
|
||||
hub := events.NewHub()
|
||||
hosts := &store.Hosts{DB: conn}
|
||||
runs := &store.Runs{DB: conn}
|
||||
stages := &store.Stages{DB: conn}
|
||||
runner := &orchestrator.Runner{Runs: runs, Hosts: hosts, Stages: stages, EventHub: hub}
|
||||
|
||||
// Deterministic renderer stubs — use known substrings so tests can
|
||||
// grep the published fragments without parsing HTML.
|
||||
prevTile := orchestrator.TileRenderer
|
||||
prevPipe := orchestrator.PipelineRenderer
|
||||
orchestrator.TileRenderer = func(_ context.Context, host model.Host, _ *model.Run) string {
|
||||
return fmt.Sprintf(`<article id="host-%d">tile</article>`, host.ID)
|
||||
}
|
||||
orchestrator.PipelineRenderer = func(run *model.Run, _ []model.Stage) string {
|
||||
return fmt.Sprintf(`<section id="pipeline-%d">pipeline</section>`, run.ID)
|
||||
}
|
||||
cleanup := func() {
|
||||
orchestrator.TileRenderer = prevTile
|
||||
orchestrator.PipelineRenderer = prevPipe
|
||||
_ = conn.Close()
|
||||
}
|
||||
return runner, hosts, runs, hub, cleanup
|
||||
}
|
||||
|
||||
// TestPublishesTileAndPipelineOnTransition asserts that a single
|
||||
// Transition call publishes both the tile-{hostID} and pipeline-{runID}
|
||||
// fragments — the detail-page timeline needs this to advance on every
|
||||
// state change without its own call site.
|
||||
func TestPublishesTileAndPipelineOnTransition(t *testing.T) {
|
||||
runner, hosts, runs, hub, cleanup := setupRunner(t)
|
||||
defer cleanup()
|
||||
ctx := context.Background()
|
||||
|
||||
hostID, err := hosts.Create(ctx, model.Host{
|
||||
Name: "runner-tile",
|
||||
MAC: "aa:bb:cc:dd:ee:40",
|
||||
WoLBroadcastIP: "10.0.0.255",
|
||||
WoLPort: 9,
|
||||
ExpectedSpecYAML: "memory:\n total_gib: 16\n",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("create host: %v", err)
|
||||
}
|
||||
runID, err := runs.Create(ctx, hostID, "deadbeef")
|
||||
if err != nil {
|
||||
t.Fatalf("create run: %v", err)
|
||||
}
|
||||
|
||||
_, _, cancel := hub.Subscribe()
|
||||
defer cancel()
|
||||
// Subscribe one more time so we have a channel we can drain.
|
||||
_, ch, cancel2 := hub.Subscribe()
|
||||
defer cancel2()
|
||||
|
||||
// Queued → WaitingWoL on Dispatched.
|
||||
if _, err := runner.Transition(ctx, runID, orchestrator.TriggerDispatched); err != nil {
|
||||
t.Fatalf("transition: %v", err)
|
||||
}
|
||||
|
||||
// Collect events with a short deadline; we expect tile + pipeline
|
||||
// from this one Transition call.
|
||||
wantTile := fmt.Sprintf("tile-%d", hostID)
|
||||
wantPipeline := fmt.Sprintf("pipeline-%d", runID)
|
||||
sawTile, sawPipeline := false, false
|
||||
deadline := time.After(500 * time.Millisecond)
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case ev := <-ch:
|
||||
if ev.Name == wantTile {
|
||||
sawTile = true
|
||||
}
|
||||
if ev.Name == wantPipeline {
|
||||
sawPipeline = true
|
||||
}
|
||||
if sawTile && sawPipeline {
|
||||
break loop
|
||||
}
|
||||
case <-deadline:
|
||||
break loop
|
||||
}
|
||||
}
|
||||
if !sawTile {
|
||||
t.Errorf("no %s event published", wantTile)
|
||||
}
|
||||
if !sawPipeline {
|
||||
t.Errorf("no %s event published", wantPipeline)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCompleteStagePublishesPipeline covers the stage-completion path
|
||||
// that used to go direct-to-Stages, bypassing the SSE refresh. The
|
||||
// Runner.CompleteStage wrapper exists so stage-dot advancements show up
|
||||
// on the detail page without waiting for the next run-state transition.
|
||||
func TestCompleteStagePublishesPipeline(t *testing.T) {
|
||||
runner, hosts, runs, hub, cleanup := setupRunner(t)
|
||||
defer cleanup()
|
||||
ctx := context.Background()
|
||||
|
||||
hostID, err := hosts.Create(ctx, model.Host{
|
||||
Name: "runner-cs",
|
||||
MAC: "aa:bb:cc:dd:ee:41",
|
||||
WoLBroadcastIP: "10.0.0.255",
|
||||
WoLPort: 9,
|
||||
ExpectedSpecYAML: "memory:\n total_gib: 8\n",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("create host: %v", err)
|
||||
}
|
||||
runID, err := runs.Create(ctx, hostID, "deadbeef")
|
||||
if err != nil {
|
||||
t.Fatalf("create run: %v", err)
|
||||
}
|
||||
// CompleteStage needs stage rows to exist first — Seed them.
|
||||
stages := &store.Stages{DB: runs.DB}
|
||||
if err := stages.Seed(ctx, runID); err != nil {
|
||||
t.Fatalf("seed stages: %v", err)
|
||||
}
|
||||
|
||||
_, ch, cancel := hub.Subscribe()
|
||||
defer cancel()
|
||||
|
||||
if err := runner.CompleteStage(ctx, runID, "Inventory", model.StagePassed, `{}`); err != nil {
|
||||
t.Fatalf("CompleteStage: %v", err)
|
||||
}
|
||||
|
||||
wantPipeline := fmt.Sprintf("pipeline-%d", runID)
|
||||
deadline := time.After(500 * time.Millisecond)
|
||||
for {
|
||||
select {
|
||||
case ev := <-ch:
|
||||
if ev.Name == wantPipeline {
|
||||
return
|
||||
}
|
||||
case <-deadline:
|
||||
t.Fatalf("no %s event published by CompleteStage", wantPipeline)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user