package orchestrator

import (
	"context"
	"fmt"
	"log"
	"time"

	"vetting/internal/events"
	"vetting/internal/model"
	"vetting/internal/store"
)

// Runner is the authoritative mutator for run state. State transitions
// go through (*Runner).Transition — or (*Runner).Override when an
// operator re-enters a held stage — so the DB update and the event
// publication happen together.
type Runner struct {
	Runs     *store.Runs
	Hosts    *store.Hosts
	Stages   *store.Stages
	EventHub *events.Hub
}

// Transition applies trigger to the run's current state, persists the
// resulting state, logs the edge, and publishes a tile refresh for the
// run's host.
//
// The next state is computed by Next from the state read from the store.
// An error from Next is returned as-is; store failures are wrapped with
// the step that failed. On success the newly persisted state is returned.
func (r *Runner) Transition(ctx context.Context, runID int64, trigger Trigger) (model.RunState, error) {
	run, err := r.Runs.Get(ctx, runID)
	if err != nil {
		return "", fmt.Errorf("get run: %w", err)
	}
	next, err := Next(run.State, trigger)
	if err != nil {
		return "", err
	}
	if err := r.Runs.SetState(ctx, runID, next); err != nil {
		return "", fmt.Errorf("persist transition: %w", err)
	}
	log.Printf("run %d: %s -> %s (%s)", runID, run.State, next, trigger)
	r.publishTileUpdate(ctx, run.HostID)
	return next, nil
}

// StartStage marks a stage row running and publishes a tile refresh.
//
// The stage update is authoritative and its error is returned. The
// refresh is best-effort: if the run cannot be re-read to find its host,
// the failure is logged and StartStage still reports success.
func (r *Runner) StartStage(ctx context.Context, runID int64, name string) error {
	if err := r.Stages.StartByName(ctx, runID, name); err != nil {
		return err
	}
	run, err := r.Runs.Get(ctx, runID)
	if err != nil {
		// The stage row is already persisted; only the SSE refresh is lost,
		// so log rather than fail — but don't hide it.
		log.Printf("StartStage: get run %d for tile refresh: %v", runID, err)
		return nil
	}
	r.publishTileUpdate(ctx, run.HostID)
	return nil
}

// CompleteStage marks a stage row passed/failed/skipped and publishes a
// tile + pipeline refresh. It wraps Stages.CompleteByName so every stage
// completion triggers an SSE update — without this, stage dots on the
// pipeline wouldn't advance until the next run-state transition.
//
// The stage update's error is returned; the refresh is best-effort and
// never turns a persisted completion into an error.
func (r *Runner) CompleteStage(ctx context.Context, runID int64, name string, state model.StageState, summaryJSON string) error {
	if err := r.Stages.CompleteByName(ctx, runID, name, state, summaryJSON); err != nil {
		return err
	}
	run, err := r.Runs.Get(ctx, runID)
	if err != nil {
		// The stage row is already persisted, so a failed lookup only costs
		// the SSE refresh; log it instead of failing the completion.
		log.Printf("CompleteStage: get run %d for tile refresh: %v", runID, err)
		return nil
	}
	r.publishTileUpdate(ctx, run.HostID)
	return nil
}

// PublishTileUpdate is the exported entry point for non-orchestrator
// callers (the UI heartbeat handler) that change tile-visible state
// without going through Transition. It delegates to the same internal
// fan-out that Transition uses.
func (r *Runner) PublishTileUpdate(ctx context.Context, hostID int64) {
	r.publishTileUpdate(ctx, hostID)
}

// PublishHostPage broadcasts fresh HTML fragments for every host-keyed
// region on /hosts/{id}: summary card, primary-actions row, and the
// in-flight banner. It also publishes a runrow-{runID} swap for every
// entry the renderer returns in RunRows (typically the active run plus
// any run that just completed).
//
// publishTileUpdate already calls this, so sites that refresh the tile
// (Transition, Override, StartStage, CompleteStage, PublishTileUpdate)
// get the host-page refresh without a second call. Call it directly only
// when host-page state changes without a tile refresh.
//
// Safe to call when no renderer or event hub has been registered, or
// when the host has been deleted (the renderer reports ok=false); the
// call is silently dropped.
func (r *Runner) PublishHostPage(ctx context.Context, hostID int64) {
	hub := r.EventHub
	if hub == nil || HostPageRenderer == nil {
		return
	}
	frags, ok := HostPageRenderer(ctx, hostID)
	if !ok {
		return
	}

	// Fixed host-keyed regions, always in summary → actions → in-flight order.
	regions := []struct {
		prefix  string
		payload string
	}{
		{"detail-summary", frags.Summary},
		{"detail-actions", frags.Actions},
		{"detail-inflight", frags.InFlightBanner},
	}
	for _, reg := range regions {
		hub.Publish(events.Event{
			Name:    fmt.Sprintf("%s-%d", reg.prefix, hostID),
			Payload: reg.payload,
		})
	}

	// One row swap per affected run. Each event targets its own runrow-N
	// element, so the (unspecified) map iteration order does not matter.
	for id, row := range frags.RunRows {
		hub.Publish(events.Event{Name: fmt.Sprintf("runrow-%d", id), Payload: row})
	}
}

// PublishRunPage broadcasts fresh HTML fragments for every run-keyed
// region on /runs/{runID}: header (with Cancel / Start-new-run /
// View-report), hold banner, and spec diffs. The pipeline fragment is
// published separately, by publishTileUpdate, which also calls this for
// the host's latest run. Callers: any site that transitions run state or
// writes a spec-diff / hold row.
//
// Safe to call when no renderer or event hub has been registered, or the
// run has been deleted; the call is silently dropped.
func (r *Runner) PublishRunPage(ctx context.Context, runID int64) { if RunPageRenderer == nil || r.EventHub == nil { return } f, ok := RunPageRenderer(ctx, runID) if !ok { return } r.EventHub.Publish(events.Event{ Name: fmt.Sprintf("run-header-%d", runID), Payload: f.Header, }) r.EventHub.Publish(events.Event{ Name: fmt.Sprintf("detail-hold-%d", runID), Payload: f.Hold, }) r.EventHub.Publish(events.Event{ Name: fmt.Sprintf("detail-specdiffs-%d", runID), Payload: f.SpecDiffs, }) } func (r *Runner) publishTileUpdate(ctx context.Context, hostID int64) { host, err := r.Hosts.Get(ctx, hostID) if err != nil { log.Printf("publishTileUpdate: get host %d: %v", hostID, err) return } latest, err := r.Runs.LatestForHost(ctx, hostID) if err != nil { log.Printf("publishTileUpdate: latest run: %v", err) return } payload := renderTileSSE(ctx, *host, latest) r.EventHub.Publish(events.Event{Name: fmt.Sprintf("tile-%d", hostID), Payload: payload}) // Pipeline fragment — same call sites as the tile refresh, keyed by // run ID so the detail page's
picks // it up. Silently skips when no renderer is wired or no run exists. if latest != nil && PipelineRenderer != nil && r.Stages != nil { stages, err := r.Stages.ListForRun(ctx, latest.ID) if err != nil { log.Printf("publishTileUpdate: list stages run %d: %v", latest.ID, err) } else { pipePayload := PipelineRenderer(latest, stages) r.EventHub.Publish(events.Event{Name: fmt.Sprintf("pipeline-%d", latest.ID), Payload: pipePayload}) } } // Host-page fragments — everything on /hosts/{id} that isn't the // pipeline or the log pane: summary card, primary actions, in-flight // banner, and per-run row swaps in the runs table. Co-located here // so every tile-refresh site also refreshes the host page without // the caller having to remember a second call. r.PublishHostPage(ctx, hostID) // Run-page fragments — header (cancel button visibility), hold // banner, spec diffs. Fires alongside the tile + pipeline refreshes // so any state-change site covers both /hosts/{id} and /runs/{runID}. if latest != nil { r.PublishRunPage(ctx, latest.ID) } } // TileRenderer renders a single tile fragment. Registered at startup // so the orchestrator package stays free of template / store-enrichment // imports. The closure is expected to do any DB lookups itself (spec- // diff count, hold-key path, …) before handing the data to the // template package. var TileRenderer func(ctx context.Context, host model.Host, latest *model.Run) string // PipelineRenderer renders the detail-page pipeline fragment for the // given run + its stage rows. Registered alongside TileRenderer so // orchestrator stays free of template imports. var PipelineRenderer func(run *model.Run, stages []model.Stage) string // SubStepRenderer renders a single sub-step row fragment. Fires on // every sub-step state transition (running → passed/failed) so the // detail page's `
` // target updates without reloading. var SubStepRenderer func(ss model.SubStep) string // PublishSubStepUpdate broadcasts a single sub-step row. Callers give // the just-persisted SubStep; we render + fan out. Safe to call when // no renderer is wired; drops silently. func (r *Runner) PublishSubStepUpdate(ctx context.Context, ss model.SubStep) { if SubStepRenderer == nil || r.EventHub == nil { return } r.EventHub.Publish(events.Event{ Name: fmt.Sprintf("substep-%d-%s-%d", ss.RunID, ss.StageName, ss.Ordinal), Payload: SubStepRenderer(ss), }) } // HostPageFragments is the pre-rendered bundle a single PublishHostPage // call broadcasts over SSE. Summary, Actions, and InFlightBanner are // always set. RunRows is a runID → pre-rendered map so every row // whose state just changed refreshes atomically (typically only the // active run, plus whichever run just became terminal). type HostPageFragments struct { Summary string Actions string InFlightBanner string RunRows map[int64]string } // HostPageRenderer produces the fragments for a given host. Registered // at startup by main so the orchestrator doesn't import the template // or store-enrichment layers. Returns ok=false when the host cannot be // loaded (deleted, DB error); caller skips publish in that case. var HostPageRenderer func(ctx context.Context, hostID int64) (HostPageFragments, bool) // RunPageFragments is the pre-rendered bundle a single PublishRunPage // call broadcasts over SSE. Header is always set; Hold and SpecDiffs // are always set too (they emit an empty placeholder when no hold / // diffs exist, so the first real event has a DOM target). type RunPageFragments struct { Header string Hold string SpecDiffs string } // RunPageRenderer produces the fragments for a given run. Registered at // startup by main so the orchestrator doesn't import the template or // store-enrichment layers. Returns ok=false when the run cannot be // loaded. 
var RunPageRenderer func(ctx context.Context, runID int64) (RunPageFragments, bool) func renderTileSSE(ctx context.Context, host model.Host, latest *model.Run) string { if TileRenderer == nil { return fmt.Sprintf(`
state change
`, host.ID) } return TileRenderer(ctx, host, latest) } // TouchHeartbeat is called on every agent heartbeat so the orchestrator // can record last-seen; Phase 2 just logs, Phase 3+ will update a // last_seen_at column. func (r *Runner) TouchHeartbeat(runID int64) { _ = runID _ = time.Now() } // Override re-enters a held stage after the operator has acknowledged // the failure condition (e.g. wipe-probe override). It jumps // FailedHolding → StateFor(failed_stage), clears the failed marker, and // publishes a tile refresh so the UI drops the hold banner. func (r *Runner) Override(ctx context.Context, runID int64, flagsJSON string) (model.RunState, error) { run, err := r.Runs.Get(ctx, runID) if err != nil { return "", fmt.Errorf("get run: %w", err) } if run.FailedStage == "" { return "", fmt.Errorf("override: run has no failed_stage") } next, err := NextForOverride(run.State, run.FailedStage) if err != nil { return "", err } if err := r.Runs.SetOverrideFlags(ctx, runID, flagsJSON); err != nil { return "", fmt.Errorf("persist override flags: %w", err) } if err := r.Runs.SetState(ctx, runID, next); err != nil { return "", fmt.Errorf("override transition: %w", err) } if err := r.Runs.ClearFailedStage(ctx, runID); err != nil { log.Printf("override: clear failed_stage: %v", err) } log.Printf("run %d: %s -> %s (OperatorOverride stage=%s flags=%s)", runID, run.State, next, run.FailedStage, flagsJSON) r.publishTileUpdate(ctx, run.HostID) return next, nil }