Files
Vetting/internal/orchestrator/runner.go
T
josh 0db790ae3e
CI / Lint + build + test (push) Successful in 1m29s
Release / release (push) Has been cancelled
ui: stream host-detail fragments over SSE so the page updates live
The detail page was only partly live: Pipeline + LogTabs subscribed to
SSE, but the summary header, actions row, spec-diffs list and hold-key
block all froze at page-load and required a manual refresh to catch up
with state changes.

Extract each of those four regions into its own named templ component
with a stable id and sse-swap target, add Render*String helpers so the
orchestrator can publish pre-rendered fragments, and register a
HostDetailRenderer alongside the existing Tile/Pipeline renderers.
PublishHostDetail is folded into publishTileUpdate so every call site
that already refreshes a tile now also refreshes the detail page —
keeps the fan-out honest without scattering new publish calls.

The empty-state wrappers for spec-diffs and hold are load-bearing:
without the <section id=... sse-swap=...> present at initial GET, the
first live event after SpecValidate or Hold writes would have no DOM
node to swap into.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-18 16:36:13 -04:00

220 lines
7.7 KiB
Go

package orchestrator
import (
"context"
"fmt"
"log"
"time"
"vetting/internal/events"
"vetting/internal/model"
"vetting/internal/store"
)
// Runner is the authoritative mutator for run state. Run-state changes
// go through (*Runner).Transition (trigger-driven) or (*Runner).Override
// (operator acknowledgement of a held failure), so the DB update and the
// SSE event publication happen together.
//
// NOTE(review): publishTileUpdate tolerates a nil Stages (it skips the
// pipeline fragment), and PublishHostDetail tolerates a nil EventHub;
// other methods dereference these fields directly.
type Runner struct {
	Runs     *store.Runs   // run rows: state, failed stage, override flags
	Hosts    *store.Hosts  // host rows, read when rendering tile fragments
	Stages   *store.Stages // per-run stage rows (start/complete/list)
	EventHub *events.Hub   // destination for published SSE fragments
}
// Transition applies trigger to the run's current state, persists the
// resulting state and publishes a tile/pipeline/detail refresh for the
// run's host.
//
// It returns the new state. It fails when the run cannot be loaded, when
// Next rejects the trigger for the current state (that error is returned
// unwrapped), or when the new state cannot be written.
func (r *Runner) Transition(ctx context.Context, runID int64, trigger Trigger) (model.RunState, error) {
	current, err := r.Runs.Get(ctx, runID)
	if err != nil {
		return "", fmt.Errorf("get run: %w", err)
	}

	target, err := Next(current.State, trigger)
	if err != nil {
		return "", err
	}

	if err = r.Runs.SetState(ctx, runID, target); err != nil {
		return "", fmt.Errorf("persist transition: %w", err)
	}

	log.Printf("run %d: %s -> %s (%s)", runID, current.State, target, trigger)
	r.publishTileUpdate(ctx, current.HostID)
	return target, nil
}
// StartStage marks the named stage of runID as running, then publishes a
// tile/pipeline/detail refresh for the run's host.
//
// Only the stage write can fail the call. Once it has succeeded, a failure
// to reload the run just skips the refresh. That failure is logged rather
// than returned, because the stage change itself was persisted.
func (r *Runner) StartStage(ctx context.Context, runID int64, name string) error {
	if err := r.Stages.StartByName(ctx, runID, name); err != nil {
		return err
	}
	run, err := r.Runs.Get(ctx, runID)
	if err != nil {
		// Best-effort publish: previously this error was dropped silently,
		// leaving the UI stale with no trace of why.
		log.Printf("StartStage: run %d stage %q: get run for publish: %v", runID, name, err)
		return nil
	}
	r.publishTileUpdate(ctx, run.HostID)
	return nil
}
// CompleteStage records the named stage of runID as finished with the
// given state (passed/failed/skipped) and summaryJSON, then publishes a
// tile/pipeline/detail refresh for the run's host. It wraps
// Stages.CompleteByName so every stage completion triggers an SSE update.
// Without this, stage dots on the pipeline wouldn't advance until the next
// run-state transition.
//
// Only the stage write can fail the call. Once it has succeeded, a failure
// to reload the run just skips the refresh. That failure is logged rather
// than returned, because the stage change itself was persisted.
func (r *Runner) CompleteStage(ctx context.Context, runID int64, name string, state model.StageState, summaryJSON string) error {
	if err := r.Stages.CompleteByName(ctx, runID, name, state, summaryJSON); err != nil {
		return err
	}
	run, err := r.Runs.Get(ctx, runID)
	if err != nil {
		// Best-effort publish: previously this error was dropped silently,
		// leaving the pipeline stale with no trace of why.
		log.Printf("CompleteStage: run %d stage %q: get run for publish: %v", runID, name, err)
		return nil
	}
	r.publishTileUpdate(ctx, run.HostID)
	return nil
}
// PublishTileUpdate re-broadcasts the tile, pipeline and host-detail
// fragments for hostID. It is the exported hook for callers outside the
// orchestrator (e.g. the UI heartbeat handler) that change tile-visible
// state without going through Transition.
func (r *Runner) PublishTileUpdate(ctx context.Context, hostID int64) { r.publishTileUpdate(ctx, hostID) }
// PublishHostDetail broadcasts fresh HTML fragments for the host detail
// page regions other than the log pane and the pipeline. Those regions are
// the summary header, the actions row, the spec-diffs list and the
// hold-key SSH block.
//
// Event names:
//   - detail-summary-<hostID> and detail-actions-<hostID> are always sent.
//   - detail-specdiffs-<runID> and detail-hold-<runID> are sent only when
//     the renderer reports a non-zero LatestRunID.
//
// The call is a silent no-op in three cases: no HostDetailRenderer is
// registered, the Runner has no EventHub, or the renderer returns ok=false
// (e.g. the host was deleted).
func (r *Runner) PublishHostDetail(ctx context.Context, hostID int64) {
	if r.EventHub == nil || HostDetailRenderer == nil {
		return
	}
	frags, found := HostDetailRenderer(ctx, hostID)
	if !found {
		return
	}

	// Host-keyed regions first, then the run-keyed ones (same order as
	// they have always been published).
	outbox := []events.Event{
		{Name: fmt.Sprintf("detail-summary-%d", hostID), Payload: frags.Summary},
		{Name: fmt.Sprintf("detail-actions-%d", hostID), Payload: frags.Actions},
	}
	if runID := frags.LatestRunID; runID != 0 {
		outbox = append(outbox,
			events.Event{Name: fmt.Sprintf("detail-specdiffs-%d", runID), Payload: frags.SpecDiffs},
			events.Event{Name: fmt.Sprintf("detail-hold-%d", runID), Payload: frags.Hold},
		)
	}

	for _, ev := range outbox {
		r.EventHub.Publish(ev)
	}
}
// publishTileUpdate re-renders and broadcasts every SSE fragment derived
// from hostID's current state:
//
//   - "tile-<hostID>": the host's dashboard tile, via renderTileSSE.
//   - "pipeline-<runID>": the latest run's pipeline. It is keyed by run ID
//     so the detail page's <section sse-swap="pipeline-N"> picks it up. It
//     is skipped when there is no run, no PipelineRenderer, or no Stages
//     store.
//   - the host-detail fragments, via PublishHostDetail.
//
// Lookup failures are logged, never returned. A failed host or run lookup
// ends the publish; a failed stage listing skips only the pipeline
// fragment. Callers such as Transition and Override invoke this after
// their own state change has been persisted.
func (r *Runner) publishTileUpdate(ctx context.Context, hostID int64) {
	// Consistent with PublishHostDetail: a Runner without an EventHub has
	// nobody to notify. Skip the DB reads and rendering instead of calling
	// Publish on a nil hub.
	if r.EventHub == nil {
		return
	}
	host, err := r.Hosts.Get(ctx, hostID)
	if err != nil {
		log.Printf("publishTileUpdate: get host %d: %v", hostID, err)
		return
	}
	latest, err := r.Runs.LatestForHost(ctx, hostID)
	if err != nil {
		log.Printf("publishTileUpdate: latest run for host %d: %v", hostID, err)
		return
	}
	payload := renderTileSSE(ctx, *host, latest)
	r.EventHub.Publish(events.Event{Name: fmt.Sprintf("tile-%d", hostID), Payload: payload})

	// Pipeline fragment: same trigger points as the tile, but keyed by run ID.
	if latest != nil && PipelineRenderer != nil && r.Stages != nil {
		stages, err := r.Stages.ListForRun(ctx, latest.ID)
		if err != nil {
			log.Printf("publishTileUpdate: list stages run %d: %v", latest.ID, err)
		} else {
			pipePayload := PipelineRenderer(latest, stages)
			r.EventHub.Publish(events.Event{Name: fmt.Sprintf("pipeline-%d", latest.ID), Payload: pipePayload})
		}
	}

	// Detail-page fragments: everything on /hosts/{id} except the pipeline
	// and the log pane. They are published here so every site that already
	// refreshes a tile also refreshes the detail page, without the caller
	// having to remember a second call.
	r.PublishHostDetail(ctx, hostID)
}
// Renderer hooks registered at startup. They keep the orchestrator
// package free of template and store-enrichment imports.
var (
	// TileRenderer renders one host's tile fragment. The closure is
	// expected to do any DB lookups itself (spec-diff count, hold-key
	// path, …) before handing the data to the template package.
	TileRenderer func(ctx context.Context, host model.Host, latest *model.Run) string

	// PipelineRenderer renders the detail-page pipeline fragment for run
	// and its stage rows.
	PipelineRenderer func(run *model.Run, stages []model.Stage) string
)
// HostDetailFragments bundles the pre-rendered HTML that a single
// PublishHostDetail call broadcasts over SSE.
type HostDetailFragments struct {
	// Summary header and actions row; always set by the renderer.
	Summary, Actions string
	// Spec-diffs list and hold-key block. These are empty strings when the
	// host has no latest run, and their events are not published then.
	SpecDiffs, Hold string
	// ID of the host's latest run, or 0 when the host has no runs yet.
	LatestRunID int64
}
// HostDetailRenderer builds the four detail-page fragments for hostID.
// main registers it at startup so the orchestrator never imports the
// template or store-enrichment layers. It reports ok=false when the host
// cannot be loaded (deleted, DB error), in which case PublishHostDetail
// publishes nothing.
var HostDetailRenderer func(ctx context.Context, hostID int64) (frags HostDetailFragments, ok bool)
// renderTileSSE returns the HTML payload for a host's tile event. It
// delegates to the registered TileRenderer. When none is registered, it
// falls back to a minimal placeholder <article id="host-N">.
func renderTileSSE(ctx context.Context, host model.Host, latest *model.Run) string {
	if TileRenderer != nil {
		return TileRenderer(ctx, host, latest)
	}
	return fmt.Sprintf(`<article id="host-%d">state change</article>`, host.ID)
}
// TouchHeartbeat is called on every agent heartbeat so the orchestrator
// can eventually record last-seen. It is currently a no-op: nothing is
// logged or persisted. A last_seen_at column is planned for Phase 3+.
func (r *Runner) TouchHeartbeat(runID int64) {
	// Placeholder references until last-seen tracking lands. runID is the
	// row to update, time.Now() the value to store. The time reference also
	// keeps the file's "time" import in use.
	_ = runID
	_ = time.Now()
}
// Override resumes a run that is held on a failed stage, after the
// operator has acknowledged the failure condition (e.g. a wipe-probe
// override). NextForOverride chooses the state to re-enter from the run's
// current state and recorded failed stage; by design that is
// FailedHolding → StateFor(failed_stage).
//
// Steps, in order:
//  1. Persist flagsJSON.
//  2. Persist the new state.
//  3. Clear the failed_stage marker. This is best-effort: a failure is
//     logged, not returned.
//  4. Publish a tile/detail refresh so the UI drops the hold banner.
//
// It fails if the run cannot be loaded, has no failed stage,
// NextForOverride rejects it, or the flags/state write fails.
//
// NOTE(review): the flags are written before the state. If the state write
// fails, the flags remain persisted.
func (r *Runner) Override(ctx context.Context, runID int64, flagsJSON string) (model.RunState, error) {
	run, err := r.Runs.Get(ctx, runID)
	if err != nil {
		return "", fmt.Errorf("get run: %w", err)
	}
	if run.FailedStage == "" {
		return "", fmt.Errorf("override: run has no failed_stage")
	}

	resumeAt, err := NextForOverride(run.State, run.FailedStage)
	if err != nil {
		return "", err
	}

	if err = r.Runs.SetOverrideFlags(ctx, runID, flagsJSON); err != nil {
		return "", fmt.Errorf("persist override flags: %w", err)
	}
	if err = r.Runs.SetState(ctx, runID, resumeAt); err != nil {
		return "", fmt.Errorf("override transition: %w", err)
	}
	if clearErr := r.Runs.ClearFailedStage(ctx, runID); clearErr != nil {
		log.Printf("override: clear failed_stage: %v", clearErr)
	}

	log.Printf("run %d: %s -> %s (OperatorOverride stage=%s flags=%s)", runID, run.State, resumeAt, run.FailedStage, flagsJSON)
	r.publishTileUpdate(ctx, run.HostID)
	return resumeAt, nil
}