Files
Vetting/internal/orchestrator/runner.go
T
josh 19608bef1b
CI / Lint + build + test (push) Successful in 1m35s
Release / release (push) Successful in 23m47s
ui: split /hosts/{id} into host page + /runs/{runID} run page
Host page owns host metadata, full runs table with per-row stage strip,
in-flight banner, and empty-state CTA. Run page owns pipeline, active
step, logs, sub-steps, spec diffs, and hold banner with a breadcrumb
back to the host. Dashboard tile reverts to host-only.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-18 20:37:57 -04:00

294 lines
10 KiB
Go

package orchestrator
import (
"context"
"fmt"
"log"
"time"
"vetting/internal/events"
"vetting/internal/model"
"vetting/internal/store"
)
// Runner is the authoritative mutator for run state. All state
// transitions go through (*Runner).Transition so the DB update and
// the event publication happen together. (Override is the one other
// path in this file that calls Runs.SetState directly.)
//
// "Together" means sequentially in the same call: the DB write is made
// first and the SSE publish follows; they are not one atomic operation.
type Runner struct {
// Runs reads and writes run rows (Get, LatestForHost, SetState,
// SetOverrideFlags, ClearFailedStage).
Runs *store.Runs
// Hosts loads the host row that a tile refresh is rendered for.
Hosts *store.Hosts
// Stages reads and writes per-run stage rows. The pipeline refresh in
// publishTileUpdate is skipped when Stages is nil.
Stages *store.Stages
// EventHub fans SSE events out to connected pages. PublishHostPage,
// PublishRunPage and PublishSubStepUpdate drop events silently when it
// is nil.
EventHub *events.Hub
}
// Transition loads run runID, asks Next for the state reached by firing
// trigger from its current state, persists that state, and then publishes
// the tile/page refresh events for the run's host.
//
// It returns the new state. An error is returned, and nothing is
// published, when the run cannot be loaded, when Next rejects the
// trigger (its error is passed through unwrapped), or when the new state
// cannot be stored.
func (r *Runner) Transition(ctx context.Context, runID int64, trigger Trigger) (model.RunState, error) {
	current, err := r.Runs.Get(ctx, runID)
	if err != nil {
		return "", fmt.Errorf("get run: %w", err)
	}

	from := current.State
	to, err := Next(from, trigger)
	if err != nil {
		return "", err
	}

	if err = r.Runs.SetState(ctx, runID, to); err != nil {
		return "", fmt.Errorf("persist transition: %w", err)
	}

	log.Printf("run %d: %s -> %s (%s)", runID, from, to, trigger)
	r.publishTileUpdate(ctx, current.HostID)
	return to, nil
}
// StartStage marks the named stage row of runID as running and then
// publishes a tile/pipeline/page refresh for the run's host.
//
// The stage update is authoritative, and its error is returned unchanged.
// The refresh is best-effort. If the run cannot be re-read to find its
// host, the failure is logged and the refresh is skipped. The call still
// returns nil because the stage row has already been written.
func (r *Runner) StartStage(ctx context.Context, runID int64, name string) error {
	if err := r.Stages.StartByName(ctx, runID, name); err != nil {
		return err
	}
	run, err := r.Runs.Get(ctx, runID)
	if err != nil {
		// Don't fail the caller: the stage row is persisted, only the UI
		// refresh is lost. Log it so a stale pipeline is diagnosable.
		log.Printf("StartStage: run %d stage %q: get run for refresh: %v", runID, name, err)
		return nil
	}
	r.publishTileUpdate(ctx, run.HostID)
	return nil
}
// CompleteStage marks the named stage row of runID with its terminal
// state (passed/failed/skipped) and summary JSON, then publishes a
// tile + pipeline refresh. Without this wrapper around
// Stages.CompleteByName, stage dots on the pipeline would not advance
// until the next run-state transition.
//
// The stage update is authoritative, and its error is returned unchanged.
// The refresh is best-effort. If the run cannot be re-read to find its
// host, the failure is logged and the refresh is skipped. The call still
// returns nil because the stage row has already been written.
func (r *Runner) CompleteStage(ctx context.Context, runID int64, name string, state model.StageState, summaryJSON string) error {
	if err := r.Stages.CompleteByName(ctx, runID, name, state, summaryJSON); err != nil {
		return err
	}
	run, err := r.Runs.Get(ctx, runID)
	if err != nil {
		// Don't fail the caller: the stage row is persisted, only the UI
		// refresh is lost. Log it so a stale pipeline is diagnosable.
		log.Printf("CompleteStage: run %d stage %q: get run for refresh: %v", runID, name, err)
		return nil
	}
	r.publishTileUpdate(ctx, run.HostID)
	return nil
}
// PublishTileUpdate lets code outside the orchestrator, such as the UI
// heartbeat handler, refresh a host's tile and the fragments published
// with it after changing tile-visible state without going through
// Transition.
func (r *Runner) PublishTileUpdate(ctx context.Context, hostID int64) {
	r.publishTileUpdate(ctx, hostID)
}
// PublishHostPage pushes freshly rendered fragments for each host-keyed
// region of /hosts/{id} over SSE. It sends:
//   - the summary card as detail-summary-{hostID}
//   - the primary-actions row as detail-actions-{hostID}
//   - the in-flight banner as detail-inflight-{hostID}
//
// It then sends one runrow-{runID} event per entry in the renderer's
// RunRows map, so every runs-table row affected by the change is swapped.
// Call it next to PublishTileUpdate wherever state visible on the host
// page or its runs table changes.
//
// It is a no-op when HostPageRenderer is unset, the Runner has no
// EventHub, or the renderer reports ok=false (e.g. the host was deleted).
// RunRows is a map, so the order of the runrow events is unspecified.
func (r *Runner) PublishHostPage(ctx context.Context, hostID int64) {
	if HostPageRenderer == nil || r.EventHub == nil {
		return
	}
	frags, ok := HostPageRenderer(ctx, hostID)
	if !ok {
		return
	}

	hostEvents := []events.Event{
		{Name: fmt.Sprintf("detail-summary-%d", hostID), Payload: frags.Summary},
		{Name: fmt.Sprintf("detail-actions-%d", hostID), Payload: frags.Actions},
		{Name: fmt.Sprintf("detail-inflight-%d", hostID), Payload: frags.InFlightBanner},
	}
	for _, ev := range hostEvents {
		r.EventHub.Publish(ev)
	}

	for id, row := range frags.RunRows {
		r.EventHub.Publish(events.Event{Name: fmt.Sprintf("runrow-%d", id), Payload: row})
	}
}
// PublishRunPage pushes freshly rendered fragments for each run-keyed
// region of /runs/{runID} over SSE. It sends:
//   - the header, with its Cancel / Start-new-run / View-report
//     controls, as run-header-{runID}
//   - the hold banner as detail-hold-{runID}
//   - the spec-diff list as detail-specdiffs-{runID}
//
// The pipeline fragment is not sent here; publishTileUpdate emits it
// separately. Call this wherever run state changes or a spec-diff / hold
// row is written.
//
// It is a no-op when RunPageRenderer is unset, the Runner has no
// EventHub, or the renderer reports ok=false (e.g. the run was deleted).
func (r *Runner) PublishRunPage(ctx context.Context, runID int64) {
	if RunPageRenderer == nil || r.EventHub == nil {
		return
	}
	frags, ok := RunPageRenderer(ctx, runID)
	if !ok {
		return
	}

	for _, ev := range []events.Event{
		{Name: fmt.Sprintf("run-header-%d", runID), Payload: frags.Header},
		{Name: fmt.Sprintf("detail-hold-%d", runID), Payload: frags.Hold},
		{Name: fmt.Sprintf("detail-specdiffs-%d", runID), Payload: frags.SpecDiffs},
	} {
		r.EventHub.Publish(ev)
	}
}
// publishTileUpdate re-renders everything that reflects a host's latest
// run and broadcasts it over SSE. In order, it publishes:
//   - tile-{hostID}: the dashboard tile (via renderTileSSE)
//   - pipeline-{runID}: the pipeline for the latest run, only when a run
//     exists, PipelineRenderer is registered and r.Stages is set
//   - the /hosts/{id} fragments, via PublishHostPage
//   - the /runs/{runID} fragments for the latest run, via PublishRunPage
//
// It is best-effort and returns nothing. A failed host or latest-run
// lookup is logged and aborts the refresh. A failed stage listing is
// logged and drops only the pipeline fragment.
//
// A Runner without an EventHub publishes nothing, matching
// PublishHostPage, PublishRunPage and PublishSubStepUpdate. This matters
// because callers such as Transition and Override invoke this after they
// have already persisted their state change.
func (r *Runner) publishTileUpdate(ctx context.Context, hostID int64) {
	if r.EventHub == nil {
		// No hub to publish to; skip the DB lookups as well.
		return
	}
	host, err := r.Hosts.Get(ctx, hostID)
	if err != nil {
		log.Printf("publishTileUpdate: get host %d: %v", hostID, err)
		return
	}
	latest, err := r.Runs.LatestForHost(ctx, hostID)
	if err != nil {
		log.Printf("publishTileUpdate: latest run for host %d: %v", hostID, err)
		return
	}
	payload := renderTileSSE(ctx, *host, latest)
	r.EventHub.Publish(events.Event{Name: fmt.Sprintf("tile-%d", hostID), Payload: payload})

	// Pipeline fragment, keyed by run ID so the run page's
	// <section sse-swap="pipeline-N"> picks it up. Skipped when no
	// renderer is wired, no stage store is set, or no run exists.
	if latest != nil && PipelineRenderer != nil && r.Stages != nil {
		stages, err := r.Stages.ListForRun(ctx, latest.ID)
		if err != nil {
			log.Printf("publishTileUpdate: list stages run %d: %v", latest.ID, err)
		} else {
			pipePayload := PipelineRenderer(latest, stages)
			r.EventHub.Publish(events.Event{Name: fmt.Sprintf("pipeline-%d", latest.ID), Payload: pipePayload})
		}
	}

	// Host-page fragments (summary card, primary actions, in-flight
	// banner, per-run row swaps). They are published here so every
	// tile-refresh site also refreshes the host page without a second
	// call at the caller.
	r.PublishHostPage(ctx, hostID)

	// Run-page fragments (header / cancel-button visibility, hold banner,
	// spec diffs). They are published alongside the tile and pipeline so
	// any state-change site covers both /hosts/{id} and /runs/{runID}.
	if latest != nil {
		r.PublishRunPage(ctx, latest.ID)
	}
}
// TileRenderer renders a single dashboard tile fragment for host, given
// its latest run. It is registered at startup so the orchestrator package
// stays free of template / store-enrichment imports. The closure is
// expected to do any DB lookups itself (spec-diff count, hold-key path,
// …) before handing the data to the template package.
//
// latest may be nil. publishTileUpdate passes it through unchecked while
// guarding nil elsewhere, presumably for hosts with no runs yet; verify
// this before relying on it. When TileRenderer is unset, renderTileSSE
// falls back to a minimal placeholder <article>.
var TileRenderer func(ctx context.Context, host model.Host, latest *model.Run) string
// PipelineRenderer renders the detail-page pipeline fragment for the
// given run and its stage rows. It is registered alongside TileRenderer
// so orchestrator stays free of template imports.
//
// publishTileUpdate calls it only with a non-nil run and the rows from
// Stages.ListForRun, and publishes the result as the "pipeline-{runID}"
// SSE event. When it is nil, no pipeline event is sent.
var PipelineRenderer func(run *model.Run, stages []model.Stage) string
// SubStepRenderer renders a single sub-step row fragment. It is invoked
// by PublishSubStepUpdate. Callers are expected to call that on every
// sub-step state transition (running → passed/failed), so the detail
// page's `<div sse-swap="substep-{runID}-{stage}-{ordinal}">` target
// updates without reloading. When it is nil, PublishSubStepUpdate drops
// the update.
var SubStepRenderer func(ss model.SubStep) string
// PublishSubStepUpdate renders ss with SubStepRenderer and broadcasts it
// as the "substep-{runID}-{stage}-{ordinal}" SSE event, the swap target
// of that sub-step's row on the detail page. Pass the SubStep exactly as
// it was just persisted. The update is dropped silently when no renderer
// is registered or the Runner has no EventHub.
func (r *Runner) PublishSubStepUpdate(ctx context.Context, ss model.SubStep) {
	render := SubStepRenderer
	if render == nil {
		return
	}
	hub := r.EventHub
	if hub == nil {
		return
	}
	key := fmt.Sprintf("substep-%d-%s-%d", ss.RunID, ss.StageName, ss.Ordinal)
	hub.Publish(events.Event{Name: key, Payload: render(ss)})
}
// HostPageFragments is the pre-rendered bundle that a single
// PublishHostPage call broadcasts over SSE. Summary, Actions, and
// InFlightBanner are always set by the renderer. RunRows maps runID to a
// pre-rendered <tr>, so every row whose state just changed refreshes
// atomically (typically only the active run, plus whichever run just
// became terminal).
type HostPageFragments struct {
// Summary is published as "detail-summary-{hostID}".
Summary string
// Actions is published as "detail-actions-{hostID}".
Actions string
// InFlightBanner is published as "detail-inflight-{hostID}".
InFlightBanner string
// RunRows entries are each published as "runrow-{runID}".
RunRows map[int64]string
}
// HostPageRenderer produces the fragments for a given host. It is
// registered at startup by main so the orchestrator doesn't import the
// template or store-enrichment layers. It returns ok=false when the host
// cannot be loaded (deleted, DB error), and PublishHostPage then
// publishes nothing. When HostPageRenderer itself is nil,
// PublishHostPage is a no-op.
var HostPageRenderer func(ctx context.Context, hostID int64) (HostPageFragments, bool)
// RunPageFragments is the pre-rendered bundle that a single
// PublishRunPage call broadcasts over SSE. All three fields are always
// set. When no hold or spec diffs exist, Hold and SpecDiffs carry an
// empty placeholder, so the page already has a DOM target when the first
// real event arrives.
type RunPageFragments struct {
// Header is published as "run-header-{runID}".
Header string
// Hold is published as "detail-hold-{runID}".
Hold string
// SpecDiffs is published as "detail-specdiffs-{runID}".
SpecDiffs string
}
// RunPageRenderer produces the fragments for a given run. It is
// registered at startup by main so the orchestrator doesn't import the
// template or store-enrichment layers. It returns ok=false when the run
// cannot be loaded, and PublishRunPage then publishes nothing. When
// RunPageRenderer itself is nil, PublishRunPage is a no-op.
var RunPageRenderer func(ctx context.Context, runID int64) (RunPageFragments, bool)
// renderTileSSE produces the HTML payload for a host's "tile-{hostID}"
// SSE event. It delegates to the registered TileRenderer. If none is
// registered, it returns a minimal placeholder <article> carrying the
// host's element ID.
func renderTileSSE(ctx context.Context, host model.Host, latest *model.Run) string {
	if render := TileRenderer; render != nil {
		return render(ctx, host, latest)
	}
	return fmt.Sprintf(`<article id="host-%d">state change</article>`, host.ID)
}
// TouchHeartbeat is intended to be called on every agent heartbeat so
// the orchestrator can record a last-seen time for the run.
//
// It is currently a no-op: it neither logs nor writes anything. A later
// phase is expected to update a last_seen_at column here.
func (r *Runner) TouchHeartbeat(runID int64) {
	// Placeholders until last-seen tracking exists. The time.Now call
	// also keeps the "time" import referenced; it appears to be this
	// file's only use of that package.
	_ = runID
	_ = time.Now()
}
// Override resumes a run that is holding on a failed stage, after the
// operator has acknowledged the failure condition (e.g. a wipe-probe
// override). It jumps FailedHolding → StateFor(failed_stage), as
// computed by NextForOverride. In order, it:
//   - records flagsJSON as the run's override flags
//   - persists the new state
//   - clears the failed-stage marker
//   - publishes a tile refresh so the UI drops the hold banner
//
// It returns the new state. Errors come from loading the run, a run with
// no failed stage, NextForOverride rejecting the move (passed through
// unwrapped), or persisting the flags or the state.
//
// The flags are written before the state, so a failed state write can
// leave the flags set on an otherwise unchanged run. A failure to clear
// the failed-stage marker is only logged; the override still succeeds.
func (r *Runner) Override(ctx context.Context, runID int64, flagsJSON string) (model.RunState, error) {
	held, err := r.Runs.Get(ctx, runID)
	if err != nil {
		return "", fmt.Errorf("get run: %w", err)
	}
	stage := held.FailedStage
	if stage == "" {
		return "", fmt.Errorf("override: run has no failed_stage")
	}

	resumed, err := NextForOverride(held.State, stage)
	if err != nil {
		return "", err
	}

	if err = r.Runs.SetOverrideFlags(ctx, runID, flagsJSON); err != nil {
		return "", fmt.Errorf("persist override flags: %w", err)
	}
	if err = r.Runs.SetState(ctx, runID, resumed); err != nil {
		return "", fmt.Errorf("override transition: %w", err)
	}
	if err = r.Runs.ClearFailedStage(ctx, runID); err != nil {
		log.Printf("override: clear failed_stage: %v", err)
	}

	log.Printf("run %d: %s -> %s (OperatorOverride stage=%s flags=%s)", runID, held.State, resumed, held.FailedStage, flagsJSON)
	r.publishTileUpdate(ctx, held.HostID)
	return resumed, nil
}