f79fe0f0db
Reshapes the detail page into a run-view: hybrid horizontal pipeline
+ expanded active-step pane with sub-steps, a per-step log pane with
line-numbered permalinks and client-side search, and a runs-history
sidebar that navigates via ?run=N. Default step is server-picked
(running → failed → Reporting) so the operator lands on the thing
that's moving.
Adds a sub_steps table + SSE topic (substep-{run}-{stage}-{ordinal})
so per-disk and per-pass work (SMART, CPUStress CPU/RAM, Storage,
GPU) is visible in the UI instead of buried in stage summary JSON.
Agent emits sub-step reports from existing per-iteration loops.
Dashboard tiles become a mini run-view with a 9-dot step strip so
the operator reads run health across the whole grid at a glance.
Register page gets the same card shell + button styling.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
239 lines
8.5 KiB
Go
239 lines
8.5 KiB
Go
package orchestrator
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"time"
|
|
|
|
"vetting/internal/events"
|
|
"vetting/internal/model"
|
|
"vetting/internal/store"
|
|
)
|
|
|
|
// Runner is the authoritative mutator for run state. All state
|
|
// transitions go through (*Runner).Transition so the DB update and
|
|
// the event publication happen together.
|
|
type Runner struct {
|
|
Runs *store.Runs
|
|
Hosts *store.Hosts
|
|
Stages *store.Stages
|
|
EventHub *events.Hub
|
|
}
|
|
|
|
func (r *Runner) Transition(ctx context.Context, runID int64, trigger Trigger) (model.RunState, error) {
|
|
run, err := r.Runs.Get(ctx, runID)
|
|
if err != nil {
|
|
return "", fmt.Errorf("get run: %w", err)
|
|
}
|
|
next, err := Next(run.State, trigger)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if err := r.Runs.SetState(ctx, runID, next); err != nil {
|
|
return "", fmt.Errorf("persist transition: %w", err)
|
|
}
|
|
log.Printf("run %d: %s -> %s (%s)", runID, run.State, next, trigger)
|
|
r.publishTileUpdate(ctx, run.HostID)
|
|
return next, nil
|
|
}
|
|
|
|
// StartStage marks a stage row running and publishes a tile refresh.
|
|
func (r *Runner) StartStage(ctx context.Context, runID int64, name string) error {
|
|
if err := r.Stages.StartByName(ctx, runID, name); err != nil {
|
|
return err
|
|
}
|
|
run, err := r.Runs.Get(ctx, runID)
|
|
if err == nil {
|
|
r.publishTileUpdate(ctx, run.HostID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// CompleteStage marks a stage row passed/failed/skipped and publishes a
|
|
// tile + pipeline refresh. Wrapper around Stages.CompleteByName so every
|
|
// stage completion triggers an SSE update — without this, stage dots on
|
|
// the pipeline wouldn't advance until the next run-state transition.
|
|
func (r *Runner) CompleteStage(ctx context.Context, runID int64, name string, state model.StageState, summaryJSON string) error {
|
|
if err := r.Stages.CompleteByName(ctx, runID, name, state, summaryJSON); err != nil {
|
|
return err
|
|
}
|
|
run, err := r.Runs.Get(ctx, runID)
|
|
if err == nil {
|
|
r.publishTileUpdate(ctx, run.HostID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// PublishTileUpdate is the exported entry point for non-orchestrator
|
|
// callers (the UI heartbeat handler) that change tile-visible state
|
|
// without going through Transition.
|
|
func (r *Runner) PublishTileUpdate(ctx context.Context, hostID int64) {
|
|
r.publishTileUpdate(ctx, hostID)
|
|
}
|
|
|
|
// PublishHostDetail broadcasts fresh HTML fragments for every non-log,
|
|
// non-pipeline region of the host detail page: summary header, actions
|
|
// row, spec-diffs list, and the hold-key SSH block. Callers should
|
|
// invoke this alongside PublishTileUpdate from any site that mutates
|
|
// state visible on the detail page.
|
|
//
|
|
// Safe to call when no renderer has been registered or the host has
|
|
// been deleted; the call is silently dropped.
|
|
func (r *Runner) PublishHostDetail(ctx context.Context, hostID int64) {
|
|
if HostDetailRenderer == nil || r.EventHub == nil {
|
|
return
|
|
}
|
|
f, ok := HostDetailRenderer(ctx, hostID)
|
|
if !ok {
|
|
return
|
|
}
|
|
r.EventHub.Publish(events.Event{
|
|
Name: fmt.Sprintf("detail-summary-%d", hostID),
|
|
Payload: f.Summary,
|
|
})
|
|
r.EventHub.Publish(events.Event{
|
|
Name: fmt.Sprintf("detail-actions-%d", hostID),
|
|
Payload: f.Actions,
|
|
})
|
|
if f.LatestRunID != 0 {
|
|
r.EventHub.Publish(events.Event{
|
|
Name: fmt.Sprintf("detail-specdiffs-%d", f.LatestRunID),
|
|
Payload: f.SpecDiffs,
|
|
})
|
|
r.EventHub.Publish(events.Event{
|
|
Name: fmt.Sprintf("detail-hold-%d", f.LatestRunID),
|
|
Payload: f.Hold,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (r *Runner) publishTileUpdate(ctx context.Context, hostID int64) {
|
|
host, err := r.Hosts.Get(ctx, hostID)
|
|
if err != nil {
|
|
log.Printf("publishTileUpdate: get host %d: %v", hostID, err)
|
|
return
|
|
}
|
|
latest, err := r.Runs.LatestForHost(ctx, hostID)
|
|
if err != nil {
|
|
log.Printf("publishTileUpdate: latest run: %v", err)
|
|
return
|
|
}
|
|
payload := renderTileSSE(ctx, *host, latest)
|
|
r.EventHub.Publish(events.Event{Name: fmt.Sprintf("tile-%d", hostID), Payload: payload})
|
|
|
|
// Pipeline fragment — same call sites as the tile refresh, keyed by
|
|
// run ID so the detail page's <section sse-swap="pipeline-N"> picks
|
|
// it up. Silently skips when no renderer is wired or no run exists.
|
|
if latest != nil && PipelineRenderer != nil && r.Stages != nil {
|
|
stages, err := r.Stages.ListForRun(ctx, latest.ID)
|
|
if err != nil {
|
|
log.Printf("publishTileUpdate: list stages run %d: %v", latest.ID, err)
|
|
} else {
|
|
pipePayload := PipelineRenderer(latest, stages)
|
|
r.EventHub.Publish(events.Event{Name: fmt.Sprintf("pipeline-%d", latest.ID), Payload: pipePayload})
|
|
}
|
|
}
|
|
|
|
// Detail-page fragments — everything on /hosts/{id} that isn't the
|
|
// pipeline or the log pane. Co-located here so every site that
|
|
// already publishes a tile refresh also refreshes the detail page
|
|
// without the caller having to remember a second call.
|
|
r.PublishHostDetail(ctx, hostID)
|
|
}
|
|
|
|
// TileRenderer renders a single tile fragment. Registered at startup
|
|
// so the orchestrator package stays free of template / store-enrichment
|
|
// imports. The closure is expected to do any DB lookups itself (spec-
|
|
// diff count, hold-key path, …) before handing the data to the
|
|
// template package.
|
|
var TileRenderer func(ctx context.Context, host model.Host, latest *model.Run) string
|
|
|
|
// PipelineRenderer renders the detail-page pipeline fragment for the
|
|
// given run + its stage rows. Registered alongside TileRenderer so
|
|
// orchestrator stays free of template imports.
|
|
var PipelineRenderer func(run *model.Run, stages []model.Stage) string
|
|
|
|
// SubStepRenderer renders a single sub-step row fragment. Fires on
|
|
// every sub-step state transition (running → passed/failed) so the
|
|
// detail page's `<div sse-swap="substep-{runID}-{stage}-{ordinal}">`
|
|
// target updates without reloading.
|
|
var SubStepRenderer func(ss model.SubStep) string
|
|
|
|
// PublishSubStepUpdate broadcasts a single sub-step row. Callers give
|
|
// the just-persisted SubStep; we render + fan out. Safe to call when
|
|
// no renderer is wired; drops silently.
|
|
func (r *Runner) PublishSubStepUpdate(ctx context.Context, ss model.SubStep) {
|
|
if SubStepRenderer == nil || r.EventHub == nil {
|
|
return
|
|
}
|
|
r.EventHub.Publish(events.Event{
|
|
Name: fmt.Sprintf("substep-%d-%s-%d", ss.RunID, ss.StageName, ss.Ordinal),
|
|
Payload: SubStepRenderer(ss),
|
|
})
|
|
}
|
|
|
|
// HostDetailFragments bundles the pre-rendered HTML fragments that one
// PublishHostDetail call sends over SSE. Summary and Actions are always
// published. SpecDiffs and Hold are keyed by run and are published only
// when LatestRunID is non-zero; they are expected to be empty strings
// otherwise.
type HostDetailFragments struct {
	// The four fragments, in the order PublishHostDetail publishes them:
	// header summary, actions row, spec-diffs list, hold-key SSH block.
	Summary, Actions, SpecDiffs, Hold string

	// LatestRunID is the host's most recent run, or 0 if it has none yet.
	LatestRunID int64
}
|
|
|
|
// HostDetailRenderer produces the four fragments for a given host.
|
|
// Registered at startup by main so the orchestrator doesn't import the
|
|
// template or store-enrichment layers. Returns ok=false when the host
|
|
// cannot be loaded (deleted, DB error); caller skips publish in that
|
|
// case.
|
|
var HostDetailRenderer func(ctx context.Context, hostID int64) (HostDetailFragments, bool)
|
|
|
|
func renderTileSSE(ctx context.Context, host model.Host, latest *model.Run) string {
|
|
if TileRenderer == nil {
|
|
return fmt.Sprintf(`<article id="host-%d">state change</article>`, host.ID)
|
|
}
|
|
return TileRenderer(ctx, host, latest)
|
|
}
|
|
|
|
// TouchHeartbeat is called on every agent heartbeat so the orchestrator
|
|
// can record last-seen; Phase 2 just logs, Phase 3+ will update a
|
|
// last_seen_at column.
|
|
func (r *Runner) TouchHeartbeat(runID int64) {
|
|
_ = runID
|
|
_ = time.Now()
|
|
}
|
|
|
|
// Override re-enters a held stage after the operator has acknowledged
|
|
// the failure condition (e.g. wipe-probe override). It jumps
|
|
// FailedHolding → StateFor(failed_stage), clears the failed marker, and
|
|
// publishes a tile refresh so the UI drops the hold banner.
|
|
func (r *Runner) Override(ctx context.Context, runID int64, flagsJSON string) (model.RunState, error) {
|
|
run, err := r.Runs.Get(ctx, runID)
|
|
if err != nil {
|
|
return "", fmt.Errorf("get run: %w", err)
|
|
}
|
|
if run.FailedStage == "" {
|
|
return "", fmt.Errorf("override: run has no failed_stage")
|
|
}
|
|
next, err := NextForOverride(run.State, run.FailedStage)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if err := r.Runs.SetOverrideFlags(ctx, runID, flagsJSON); err != nil {
|
|
return "", fmt.Errorf("persist override flags: %w", err)
|
|
}
|
|
if err := r.Runs.SetState(ctx, runID, next); err != nil {
|
|
return "", fmt.Errorf("override transition: %w", err)
|
|
}
|
|
if err := r.Runs.ClearFailedStage(ctx, runID); err != nil {
|
|
log.Printf("override: clear failed_stage: %v", err)
|
|
}
|
|
log.Printf("run %d: %s -> %s (OperatorOverride stage=%s flags=%s)", runID, run.State, next, run.FailedStage, flagsJSON)
|
|
r.publishTileUpdate(ctx, run.HostID)
|
|
return next, nil
|
|
}
|