package orchestrator import ( "context" "fmt" "log" "time" "vetting/internal/events" "vetting/internal/model" "vetting/internal/store" ) // Runner is the authoritative mutator for run state. All state // transitions go through (*Runner).Transition so the DB update and // the event publication happen together. type Runner struct { Runs *store.Runs Hosts *store.Hosts Stages *store.Stages EventHub *events.Hub } func (r *Runner) Transition(ctx context.Context, runID int64, trigger Trigger) (model.RunState, error) { run, err := r.Runs.Get(ctx, runID) if err != nil { return "", fmt.Errorf("get run: %w", err) } next, err := Next(run.State, trigger) if err != nil { return "", err } if err := r.Runs.SetState(ctx, runID, next); err != nil { return "", fmt.Errorf("persist transition: %w", err) } log.Printf("run %d: %s -> %s (%s)", runID, run.State, next, trigger) r.publishTileUpdate(ctx, run.HostID) return next, nil } // StartStage marks a stage row running and publishes a tile refresh. func (r *Runner) StartStage(ctx context.Context, runID int64, name string) error { if err := r.Stages.StartByName(ctx, runID, name); err != nil { return err } run, err := r.Runs.Get(ctx, runID) if err == nil { r.publishTileUpdate(ctx, run.HostID) } return nil } func (r *Runner) publishTileUpdate(ctx context.Context, hostID int64) { host, err := r.Hosts.Get(ctx, hostID) if err != nil { log.Printf("publishTileUpdate: get host %d: %v", hostID, err) return } latest, err := r.Runs.LatestForHost(ctx, hostID) if err != nil { log.Printf("publishTileUpdate: latest run: %v", err) return } payload := renderTileSSE(ctx, *host, latest) r.EventHub.Publish(events.Event{Name: fmt.Sprintf("tile-%d", hostID), Payload: payload}) } // TileRenderer renders a single tile fragment. Registered at startup // so the orchestrator package stays free of template / store-enrichment // imports. The closure is expected to do any DB lookups itself (spec- // diff count, hold-key path, …) before handing the data to the // template package. var TileRenderer func(ctx context.Context, host model.Host, latest *model.Run) string func renderTileSSE(ctx context.Context, host model.Host, latest *model.Run) string { if TileRenderer == nil { return fmt.Sprintf(`
state change
`, host.ID) } return TileRenderer(ctx, host, latest) } // TouchHeartbeat is called on every agent heartbeat so the orchestrator // can record last-seen; Phase 2 just logs, Phase 3+ will update a // last_seen_at column. func (r *Runner) TouchHeartbeat(runID int64) { _ = runID _ = time.Now() } // Override re-enters a held stage after the operator has acknowledged // the failure condition (e.g. wipe-probe override). It jumps // FailedHolding → StateFor(failed_stage), clears the failed marker, and // publishes a tile refresh so the UI drops the hold banner. func (r *Runner) Override(ctx context.Context, runID int64, flagsJSON string) (model.RunState, error) { run, err := r.Runs.Get(ctx, runID) if err != nil { return "", fmt.Errorf("get run: %w", err) } if run.FailedStage == "" { return "", fmt.Errorf("override: run has no failed_stage") } next, err := NextForOverride(run.State, run.FailedStage) if err != nil { return "", err } if err := r.Runs.SetOverrideFlags(ctx, runID, flagsJSON); err != nil { return "", fmt.Errorf("persist override flags: %w", err) } if err := r.Runs.SetState(ctx, runID, next); err != nil { return "", fmt.Errorf("override transition: %w", err) } if err := r.Runs.ClearFailedStage(ctx, runID); err != nil { log.Printf("override: clear failed_stage: %v", err) } log.Printf("run %d: %s -> %s (OperatorOverride stage=%s flags=%s)", runID, run.State, next, run.FailedStage, flagsJSON) r.publishTileUpdate(ctx, run.HostID) return next, nil }