Files
Vetting/internal/orchestrator/runner.go
T
josh f79fe0f0db
CI / Lint + build + test (push) Successful in 1m26s
Release / release (push) Successful in 6m47s
ui: GitHub-Actions-style detail page, sub-steps, mini-tile run-view
Reshapes the detail page into a run-view: hybrid horizontal pipeline
+ expanded active-step pane with sub-steps, a per-step log pane with
line-numbered permalinks and client-side search, and a runs-history
sidebar that navigates via ?run=N. Default step is server-picked
(running → failed → Reporting) so the operator lands on the thing
that's moving.

Adds a sub_steps table + SSE topic (substep-{run}-{stage}-{ordinal})
so per-disk and per-pass work (SMART, CPUStress CPU/RAM, Storage,
GPU) is visible in the UI instead of buried in stage summary JSON.
Agent emits sub-step reports from existing per-iteration loops.

Dashboard tiles become a mini run-view with a 9-dot step strip so
the operator reads run health across the whole grid at a glance.
Register page gets the same card shell + button styling.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-18 19:00:11 -04:00

239 lines
8.5 KiB
Go

package orchestrator
import (
"context"
"fmt"
"log"
"time"
"vetting/internal/events"
"vetting/internal/model"
"vetting/internal/store"
)
// Runner is the authoritative mutator for run state. Run-state changes
// go through (*Runner).Transition (or (*Runner).Override for operator
// re-entry) so the DB update and the event publication happen together.
type Runner struct {
	// Runs is the run store: read for the current state and host ID,
	// written for state, override flags and the failed-stage marker.
	Runs *store.Runs
	// Hosts is read by publishTileUpdate to load the host being rendered.
	Hosts *store.Hosts
	// Stages holds the per-run stage rows. publishTileUpdate skips the
	// pipeline fragment when this is nil; StartStage and CompleteStage
	// call it unconditionally.
	Stages *store.Stages
	// EventHub receives every rendered fragment. PublishHostDetail and
	// PublishSubStepUpdate drop their events when this is nil.
	EventHub *events.Hub
}
// Transition loads run runID, asks Next for the state that trigger leads
// to from the run's current state, persists that state, and then
// publishes a refresh for the run's host.
//
// It returns the new state on success. A failed lookup or a failed
// persist is returned wrapped; an error from Next is returned as-is and
// nothing is written or published in that case.
func (r *Runner) Transition(ctx context.Context, runID int64, trigger Trigger) (model.RunState, error) {
	current, err := r.Runs.Get(ctx, runID)
	if err != nil {
		return "", fmt.Errorf("get run: %w", err)
	}

	target, err := Next(current.State, trigger)
	if err != nil {
		return "", err
	}

	if persistErr := r.Runs.SetState(ctx, runID, target); persistErr != nil {
		return "", fmt.Errorf("persist transition: %w", persistErr)
	}

	// Log and publish only once the new state is durable.
	log.Printf("run %d: %s -> %s (%s)", runID, current.State, target, trigger)
	r.publishTileUpdate(ctx, current.HostID)
	return target, nil
}
// StartStage marks the stage row called name on run runID as running
// (via Stages.StartByName) and then publishes a refresh for the run's
// host.
//
// Only a failure to update the stage row is returned. The refresh is
// best-effort: if the run cannot be loaded afterwards, the failure is
// logged and nil is still returned, because the stage row itself was
// already persisted.
func (r *Runner) StartStage(ctx context.Context, runID int64, name string) error {
	if err := r.Stages.StartByName(ctx, runID, name); err != nil {
		return err
	}

	run, err := r.Runs.Get(ctx, runID)
	if err != nil {
		// Without the run we don't know which host to refresh. Leave a
		// trace rather than dropping the UI update silently.
		log.Printf("StartStage: get run %d: %v", runID, err)
		return nil
	}

	r.publishTileUpdate(ctx, run.HostID)
	return nil
}
// CompleteStage records the final state (passed/failed/skipped) and
// summary JSON for the stage called name on run runID, then publishes a
// tile + pipeline refresh. It wraps Stages.CompleteByName so every stage
// completion triggers an SSE update — without this, stage dots on the
// pipeline wouldn't advance until the next run-state transition.
//
// Only a failure to update the stage row is returned. The refresh is
// best-effort: if the run cannot be loaded afterwards, the failure is
// logged and nil is still returned, because the stage row itself was
// already persisted.
func (r *Runner) CompleteStage(ctx context.Context, runID int64, name string, state model.StageState, summaryJSON string) error {
	if err := r.Stages.CompleteByName(ctx, runID, name, state, summaryJSON); err != nil {
		return err
	}

	run, err := r.Runs.Get(ctx, runID)
	if err != nil {
		// Without the run we don't know which host to refresh. Leave a
		// trace rather than dropping the UI update silently.
		log.Printf("CompleteStage: get run %d: %v", runID, err)
		return nil
	}

	r.publishTileUpdate(ctx, run.HostID)
	return nil
}
// PublishTileUpdate is the exported entry point for non-orchestrator
// callers (the UI heartbeat handler) that change tile-visible state
// without going through Transition.
//
// It delegates straight to publishTileUpdate, so one call publishes the
// host's tile, the latest run's pipeline fragment (when a renderer is
// registered and a run exists), and the host detail fragments.
func (r *Runner) PublishTileUpdate(ctx context.Context, hostID int64) {
	r.publishTileUpdate(ctx, hostID)
}
// PublishHostDetail renders and broadcasts the host detail page regions
// that are neither the log pane nor the pipeline: the summary header,
// the actions row, the spec-diffs list and the hold-key SSH block. Call
// it alongside PublishTileUpdate from any site that mutates state shown
// on the detail page.
//
// Topics:
//
//	detail-summary-{hostID}   always
//	detail-actions-{hostID}   always
//	detail-specdiffs-{runID}  only when the host has a latest run
//	detail-hold-{runID}       only when the host has a latest run
//
// The call is a silent no-op when no HostDetailRenderer is registered,
// when the Runner has no EventHub, or when the renderer reports ok=false.
func (r *Runner) PublishHostDetail(ctx context.Context, hostID int64) {
	if r.EventHub == nil || HostDetailRenderer == nil {
		return
	}

	frags, ok := HostDetailRenderer(ctx, hostID)
	if !ok {
		return
	}

	// Host-keyed fragments go out unconditionally.
	batch := []events.Event{
		{Name: fmt.Sprintf("detail-summary-%d", hostID), Payload: frags.Summary},
		{Name: fmt.Sprintf("detail-actions-%d", hostID), Payload: frags.Actions},
	}

	// Run-keyed fragments need a run ID to build their topic names.
	if runID := frags.LatestRunID; runID != 0 {
		batch = append(batch,
			events.Event{Name: fmt.Sprintf("detail-specdiffs-%d", runID), Payload: frags.SpecDiffs},
			events.Event{Name: fmt.Sprintf("detail-hold-%d", runID), Payload: frags.Hold},
		)
	}

	for _, ev := range batch {
		r.EventHub.Publish(ev)
	}
}
// publishTileUpdate is the single fan-out point for UI refreshes about a
// host. In order, it publishes:
//
//  1. tile-{hostID}     the dashboard tile, via renderTileSSE
//  2. pipeline-{runID}  the detail-page pipeline for the latest run
//  3. the detail-page fragments, via PublishHostDetail
//
// The call is best-effort and returns nothing. A failed host or
// latest-run lookup is logged and aborts the whole refresh; a failed
// stage listing is logged and skips only the pipeline fragment. With no
// EventHub configured the call is a no-op, matching PublishHostDetail
// and PublishSubStepUpdate.
func (r *Runner) publishTileUpdate(ctx context.Context, hostID int64) {
	// Nothing can be delivered without a hub, so skip the DB work too
	// instead of calling Publish on a nil hub further down.
	if r.EventHub == nil {
		return
	}

	host, err := r.Hosts.Get(ctx, hostID)
	if err != nil {
		log.Printf("publishTileUpdate: get host %d: %v", hostID, err)
		return
	}
	// latest is nil-checked below, so the tile renderer may receive a nil
	// run here.
	latest, err := r.Runs.LatestForHost(ctx, hostID)
	if err != nil {
		log.Printf("publishTileUpdate: latest run: %v", err)
		return
	}

	r.EventHub.Publish(events.Event{
		Name:    fmt.Sprintf("tile-%d", hostID),
		Payload: renderTileSSE(ctx, *host, latest),
	})

	// Pipeline fragment — same call sites as the tile refresh, keyed by
	// run ID so the detail page's <section sse-swap="pipeline-N"> picks
	// it up. Silently skips when no renderer is wired or no run exists.
	if latest != nil && PipelineRenderer != nil && r.Stages != nil {
		stages, err := r.Stages.ListForRun(ctx, latest.ID)
		if err != nil {
			log.Printf("publishTileUpdate: list stages run %d: %v", latest.ID, err)
		} else {
			r.EventHub.Publish(events.Event{
				Name:    fmt.Sprintf("pipeline-%d", latest.ID),
				Payload: PipelineRenderer(latest, stages),
			})
		}
	}

	// Detail-page fragments — everything on /hosts/{id} that isn't the
	// pipeline or the log pane. Co-located here so every site that
	// already publishes a tile refresh also refreshes the detail page
	// without the caller having to remember a second call.
	r.PublishHostDetail(ctx, hostID)
}
// TileRenderer renders a single tile fragment. Registered at startup
// so the orchestrator package stays free of template / store-enrichment
// imports. The closure is expected to do any DB lookups itself (spec-
// diff count, hold-key path, …) before handing the data to the
// template package.
//
// latest is whatever Runs.LatestForHost returned. publishTileUpdate
// nil-checks it before using it elsewhere, so implementations should
// be prepared for a nil run. While this variable is nil, renderTileSSE
// emits a minimal placeholder <article> instead.
var TileRenderer func(ctx context.Context, host model.Host, latest *model.Run) string
// PipelineRenderer renders the detail-page pipeline fragment for the
// given run + its stage rows. Registered alongside TileRenderer so
// orchestrator stays free of template imports.
//
// publishTileUpdate only calls it with a non-nil run, and skips the
// pipeline-{runID} event entirely while this variable is nil.
var PipelineRenderer func(run *model.Run, stages []model.Stage) string
// SubStepRenderer renders a single sub-step row fragment. Fires on
// every sub-step state transition (running → passed/failed) so the
// detail page's `<div sse-swap="substep-{runID}-{stage}-{ordinal}">`
// target updates without reloading.
//
// Invoked by PublishSubStepUpdate, which publishes nothing while this
// variable is nil.
var SubStepRenderer func(ss model.SubStep) string
// PublishSubStepUpdate renders one sub-step row with SubStepRenderer and
// publishes it on the topic substep-{RunID}-{StageName}-{Ordinal}.
// Callers pass the SubStep they have just persisted.
//
// The call is a silent no-op when no SubStepRenderer is registered or
// the Runner has no EventHub. ctx is accepted but not used by the body.
func (r *Runner) PublishSubStepUpdate(ctx context.Context, ss model.SubStep) {
	render, hub := SubStepRenderer, r.EventHub
	if render == nil || hub == nil {
		return
	}

	topic := fmt.Sprintf("substep-%d-%s-%d", ss.RunID, ss.StageName, ss.Ordinal)
	hub.Publish(events.Event{
		Name:    topic,
		Payload: render(ss),
	})
}
// HostDetailFragments is the pre-rendered bundle of HTML fragments a
// single PublishHostDetail call broadcasts over SSE. Summary and Actions
// are always set; SpecDiffs and Hold are empty strings when there is no
// latest run (the corresponding events are not published in that case).
type HostDetailFragments struct {
	// Summary is published on detail-summary-{hostID}.
	Summary string
	// Actions is published on detail-actions-{hostID}.
	Actions string
	// SpecDiffs is published on detail-specdiffs-{LatestRunID}.
	SpecDiffs string
	// Hold is published on detail-hold-{LatestRunID}.
	Hold string
	// LatestRunID keys the SpecDiffs and Hold topics; PublishHostDetail
	// skips both events when it is zero.
	LatestRunID int64 // 0 when the host has no runs yet
}
// HostDetailRenderer produces the four fragments for a given host.
// Registered at startup by main so the orchestrator doesn't import the
// template or store-enrichment layers. Returns ok=false when the host
// cannot be loaded (deleted, DB error); caller skips publish in that
// case.
//
// PublishHostDetail is the caller, and it publishes nothing while this
// variable is nil.
var HostDetailRenderer func(ctx context.Context, hostID int64) (HostDetailFragments, bool)
// renderTileSSE produces the payload for a tile-{hostID} event. It
// defers to the registered TileRenderer when there is one. Otherwise it
// returns a minimal placeholder <article> carrying the host's element
// ID, so a refresh can still be published before a renderer is wired.
func renderTileSSE(ctx context.Context, host model.Host, latest *model.Run) string {
	if render := TileRenderer; render != nil {
		return render(ctx, host, latest)
	}
	return fmt.Sprintf(`<article id="host-%d">state change</article>`, host.ID)
}
// TouchHeartbeat is the hook called on every agent heartbeat so the
// orchestrator can eventually record when a run was last seen (the
// stated plan is a last_seen_at column in Phase 3+).
//
// At present it is a placeholder with no observable effect: it neither
// logs nor persists anything. runID and the current time are evaluated
// and discarded.
//
// NOTE(review): the time.Now() reference looks like the only use of the
// time import in this file — confirm before removing it.
func (r *Runner) TouchHeartbeat(runID int64) {
	_, _ = runID, time.Now()
}
// Override re-enters a held stage after the operator has acknowledged
// the failure condition (e.g. wipe-probe override). The target state
// comes from NextForOverride, given the run's current state and its
// failed stage (per the original author's note: FailedHolding →
// StateFor(failed_stage)).
//
// Steps, in order:
//  1. load the run; reject it when it has no failed stage recorded
//  2. persist flagsJSON as the run's override flags
//  3. persist the new state
//  4. clear the failed-stage marker (best-effort: a failure here is
//     logged and does not fail the override)
//  5. publish a refresh so the UI drops the hold banner
//
// It returns the state the run was moved to. Lookup and persist
// failures are returned wrapped; an error from NextForOverride is
// returned as-is.
func (r *Runner) Override(ctx context.Context, runID int64, flagsJSON string) (model.RunState, error) {
	held, err := r.Runs.Get(ctx, runID)
	if err != nil {
		return "", fmt.Errorf("get run: %w", err)
	}
	if held.FailedStage == "" {
		return "", fmt.Errorf("override: run has no failed_stage")
	}

	resume, err := NextForOverride(held.State, held.FailedStage)
	if err != nil {
		return "", err
	}

	// Flags are written before the state change.
	if flagsErr := r.Runs.SetOverrideFlags(ctx, runID, flagsJSON); flagsErr != nil {
		return "", fmt.Errorf("persist override flags: %w", flagsErr)
	}
	if stateErr := r.Runs.SetState(ctx, runID, resume); stateErr != nil {
		return "", fmt.Errorf("override transition: %w", stateErr)
	}

	// The run has already moved on; a stale failed-stage marker is only
	// logged and does not fail the override.
	if clearErr := r.Runs.ClearFailedStage(ctx, runID); clearErr != nil {
		log.Printf("override: clear failed_stage: %v", clearErr)
	}

	log.Printf("run %d: %s -> %s (OperatorOverride stage=%s flags=%s)", runID, held.State, resume, held.FailedStage, flagsJSON)
	r.publishTileUpdate(ctx, held.HostID)
	return resume, nil
}