bb658a8435
CI / Lint + build + test (push) Has been cancelled
Click a tile to open /hosts/{id} — the canonical control surface per
host. Timeline renders every pre-stage, stage, and terminal node in
order, with the current one pulsing, failed ones flagged, and
downstream ones dimmed as skipped. Detail page shows summary, hold
card (when holding), all action buttons, spec diffs, a full-height
log pane, and a collapsed expected-spec YAML.
The tile slims down to name, last-seen, status, and one primary action; a
CSS-overlay <a> makes the whole card clickable while the buttons remain
interactive via z-index.
Runner.publishTileUpdate now also emits pipeline-{runID} fragments,
and CompleteStage wraps Stages.CompleteByName so stage completions
advance the timeline live — without this the dots only moved on
state transitions.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
159 lines
5.5 KiB
Go
159 lines
5.5 KiB
Go
package orchestrator
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"time"
|
|
|
|
"vetting/internal/events"
|
|
"vetting/internal/model"
|
|
"vetting/internal/store"
|
|
)
|
|
|
|
// Runner is the authoritative mutator for run state. All state
|
|
// transitions go through (*Runner).Transition so the DB update and
|
|
// the event publication happen together.
|
|
type Runner struct {
|
|
Runs *store.Runs
|
|
Hosts *store.Hosts
|
|
Stages *store.Stages
|
|
EventHub *events.Hub
|
|
}
|
|
|
|
func (r *Runner) Transition(ctx context.Context, runID int64, trigger Trigger) (model.RunState, error) {
|
|
run, err := r.Runs.Get(ctx, runID)
|
|
if err != nil {
|
|
return "", fmt.Errorf("get run: %w", err)
|
|
}
|
|
next, err := Next(run.State, trigger)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if err := r.Runs.SetState(ctx, runID, next); err != nil {
|
|
return "", fmt.Errorf("persist transition: %w", err)
|
|
}
|
|
log.Printf("run %d: %s -> %s (%s)", runID, run.State, next, trigger)
|
|
r.publishTileUpdate(ctx, run.HostID)
|
|
return next, nil
|
|
}
|
|
|
|
// StartStage marks a stage row running and publishes a tile refresh.
|
|
func (r *Runner) StartStage(ctx context.Context, runID int64, name string) error {
|
|
if err := r.Stages.StartByName(ctx, runID, name); err != nil {
|
|
return err
|
|
}
|
|
run, err := r.Runs.Get(ctx, runID)
|
|
if err == nil {
|
|
r.publishTileUpdate(ctx, run.HostID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// CompleteStage marks a stage row passed/failed/skipped and publishes a
|
|
// tile + pipeline refresh. Wrapper around Stages.CompleteByName so every
|
|
// stage completion triggers an SSE update — without this, stage dots on
|
|
// the pipeline wouldn't advance until the next run-state transition.
|
|
func (r *Runner) CompleteStage(ctx context.Context, runID int64, name string, state model.StageState, summaryJSON string) error {
|
|
if err := r.Stages.CompleteByName(ctx, runID, name, state, summaryJSON); err != nil {
|
|
return err
|
|
}
|
|
run, err := r.Runs.Get(ctx, runID)
|
|
if err == nil {
|
|
r.publishTileUpdate(ctx, run.HostID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// PublishTileUpdate is the exported entry point for non-orchestrator
|
|
// callers (the UI heartbeat handler) that change tile-visible state
|
|
// without going through Transition.
|
|
func (r *Runner) PublishTileUpdate(ctx context.Context, hostID int64) {
|
|
r.publishTileUpdate(ctx, hostID)
|
|
}
|
|
|
|
func (r *Runner) publishTileUpdate(ctx context.Context, hostID int64) {
|
|
host, err := r.Hosts.Get(ctx, hostID)
|
|
if err != nil {
|
|
log.Printf("publishTileUpdate: get host %d: %v", hostID, err)
|
|
return
|
|
}
|
|
latest, err := r.Runs.LatestForHost(ctx, hostID)
|
|
if err != nil {
|
|
log.Printf("publishTileUpdate: latest run: %v", err)
|
|
return
|
|
}
|
|
payload := renderTileSSE(ctx, *host, latest)
|
|
r.EventHub.Publish(events.Event{Name: fmt.Sprintf("tile-%d", hostID), Payload: payload})
|
|
|
|
// Pipeline fragment — same call sites as the tile refresh, keyed by
|
|
// run ID so the detail page's <section sse-swap="pipeline-N"> picks
|
|
// it up. Silently skips when no renderer is wired or no run exists.
|
|
if latest != nil && PipelineRenderer != nil && r.Stages != nil {
|
|
stages, err := r.Stages.ListForRun(ctx, latest.ID)
|
|
if err != nil {
|
|
log.Printf("publishTileUpdate: list stages run %d: %v", latest.ID, err)
|
|
return
|
|
}
|
|
pipePayload := PipelineRenderer(latest, stages)
|
|
r.EventHub.Publish(events.Event{Name: fmt.Sprintf("pipeline-%d", latest.ID), Payload: pipePayload})
|
|
}
|
|
}
|
|
|
|
// TileRenderer renders a single tile fragment. Registered at startup
|
|
// so the orchestrator package stays free of template / store-enrichment
|
|
// imports. The closure is expected to do any DB lookups itself (spec-
|
|
// diff count, hold-key path, …) before handing the data to the
|
|
// template package.
|
|
var TileRenderer func(ctx context.Context, host model.Host, latest *model.Run) string
|
|
|
|
// PipelineRenderer renders the detail-page pipeline fragment for the
|
|
// given run + its stage rows. Registered alongside TileRenderer so
|
|
// orchestrator stays free of template imports.
|
|
var PipelineRenderer func(run *model.Run, stages []model.Stage) string
|
|
|
|
func renderTileSSE(ctx context.Context, host model.Host, latest *model.Run) string {
|
|
if TileRenderer == nil {
|
|
return fmt.Sprintf(`<article id="host-%d">state change</article>`, host.ID)
|
|
}
|
|
return TileRenderer(ctx, host, latest)
|
|
}
|
|
|
|
// TouchHeartbeat is called on every agent heartbeat so the orchestrator
|
|
// can record last-seen; Phase 2 just logs, Phase 3+ will update a
|
|
// last_seen_at column.
|
|
func (r *Runner) TouchHeartbeat(runID int64) {
|
|
_ = runID
|
|
_ = time.Now()
|
|
}
|
|
|
|
// Override re-enters a held stage after the operator has acknowledged
|
|
// the failure condition (e.g. wipe-probe override). It jumps
|
|
// FailedHolding → StateFor(failed_stage), clears the failed marker, and
|
|
// publishes a tile refresh so the UI drops the hold banner.
|
|
func (r *Runner) Override(ctx context.Context, runID int64, flagsJSON string) (model.RunState, error) {
|
|
run, err := r.Runs.Get(ctx, runID)
|
|
if err != nil {
|
|
return "", fmt.Errorf("get run: %w", err)
|
|
}
|
|
if run.FailedStage == "" {
|
|
return "", fmt.Errorf("override: run has no failed_stage")
|
|
}
|
|
next, err := NextForOverride(run.State, run.FailedStage)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if err := r.Runs.SetOverrideFlags(ctx, runID, flagsJSON); err != nil {
|
|
return "", fmt.Errorf("persist override flags: %w", err)
|
|
}
|
|
if err := r.Runs.SetState(ctx, runID, next); err != nil {
|
|
return "", fmt.Errorf("override transition: %w", err)
|
|
}
|
|
if err := r.Runs.ClearFailedStage(ctx, runID); err != nil {
|
|
log.Printf("override: clear failed_stage: %v", err)
|
|
}
|
|
log.Printf("run %d: %s -> %s (OperatorOverride stage=%s flags=%s)", runID, run.State, next, run.FailedStage, flagsJSON)
|
|
r.publishTileUpdate(ctx, run.HostID)
|
|
return next, nil
|
|
}
|