Files
josh f79fe0f0db
CI / Lint + build + test (push) Successful in 1m26s
Release / release (push) Successful in 6m47s
ui: GitHub-Actions-style detail page, sub-steps, mini-tile run-view
Reshapes the detail page into a run-view: hybrid horizontal pipeline
+ expanded active-step pane with sub-steps, a per-step log pane with
line-numbered permalinks and client-side search, and a runs-history
sidebar that navigates via ?run=N. Default step is server-picked
(running → failed → Reporting) so the operator lands on the thing
that's moving.

Adds a sub_steps table + SSE topic (substep-{run}-{stage}-{ordinal})
so per-disk and per-pass work (SMART, CPUStress CPU/RAM, Storage,
GPU) is visible in the UI instead of buried in stage summary JSON.
Agent emits sub-step reports from existing per-iteration loops.

Dashboard tiles become a mini run-view with a 9-dot step strip so
the operator reads run health across the whole grid at a glance.
Register page gets the same card shell + button styling.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-18 19:00:11 -04:00

379 lines
11 KiB
Go

// Package logs owns per-run flat-file logs and their live SSE fan-out.
// A single Writer serialises writes for one run; a Hub keeps a cache
// per run so handlers can open/close freely without stepping on each
// other. Lines go to disk for persistence (reload + replay) and onto
// the events.Hub so the UI tile can tail live.
package logs
import (
"fmt"
"html"
"log"
"os"
"path/filepath"
"strings"
"sync"
"time"
"vetting/internal/events"
)
// Line is a single log record for a run. Writer.Append defaults a zero TS
// to time.Now().UTC() and an empty Level to "info" before writing it; when
// Stage is set it is written to disk as a "[stage] " prefix on the text and
// the line is additionally published on the per-stage SSE topic.
type Line struct {
	TS    time.Time
	Level string // info|warn|error|debug
	Stage string // optional — one of store.DefaultStageOrder; empty = orchestrator/agent framing
	Text  string
}
// Writer appends Lines for a single run to that run's log file and
// publishes each one on the events hub. mu serialises Append and Close,
// which both touch f and counters.
type Writer struct {
	runID int64
	mu    sync.Mutex
	f     *os.File    // set to nil by Close
	hub   *events.Hub // may be nil; Append then skips publishing
	// counters keyed by Stage (empty key = orphan/framing lines) so each
	// stage's rendered output numbers from 1. Seeded from the on-disk log
	// by seedCounters when WriterFor opens the Writer, so a restart mid-run
	// continues numbering where the file left off instead of resetting to 1.
	// Replay does not share this map; it recounts from the file on its own.
	counters map[string]int
}
// Hub owns the per-run Writers. The orchestrator creates one Hub at
// startup and hands it to the api package. mu guards the writers cache,
// which Close resets to nil.
type Hub struct {
	dir     string      // directory holding the run-<id>.log files
	events  *events.Hub // handed to every Writer for live fan-out
	mu      sync.Mutex
	writers map[int64]*Writer
}
// NewHub ensures the log directory exists and returns a Hub that writes
// run logs under dir and publishes live lines to ev. ev may be nil, in
// which case Writers only persist to disk.
func NewHub(dir string, ev *events.Hub) (*Hub, error) {
	err := os.MkdirAll(dir, 0o755)
	if err != nil {
		return nil, fmt.Errorf("mkdir log dir: %w", err)
	}
	h := &Hub{
		dir:     dir,
		events:  ev,
		writers: make(map[int64]*Writer),
	}
	return h, nil
}
// WriterFor returns the cached Writer for runID, opening the run's log
// file lazily on first use. The file is opened append-only: if an existing
// run's log is reopened (e.g. after a restart) new lines are appended
// rather than truncating history, and the Writer's per-stage counters are
// seeded from what is already on disk.
//
// Close sets the cache to nil; after that WriterFor returns an error
// instead of panicking on the assignment into a nil map.
func (h *Hub) WriterFor(runID int64) (*Writer, error) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.writers == nil {
		return nil, fmt.Errorf("open run-%d log: hub closed", runID)
	}
	if w, ok := h.writers[runID]; ok {
		return w, nil
	}
	path := h.PathFor(runID)
	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return nil, fmt.Errorf("open %s: %w", path, err)
	}
	w := &Writer{runID: runID, f: f, hub: h.events, counters: seedCounters(path)}
	h.writers[runID] = w
	return w, nil
}
// seedCounters scans the on-disk log at path and returns, per stage, how
// many records it already holds (orphan/framing records are keyed "").
// WriterFor calls it when opening a Writer so that a mid-run process
// restart continues numbering where the file left off — otherwise Append
// would emit "1" while a reload's Replay already shows "42", and the
// L{run}-{stage}-{ord} permalink ids would collide.
//
// A line counts only if Replay would render it:
//   - the first field must parse as an RFC3339Nano timestamp;
//   - it must be followed by a level field.
//
// Other lines (e.g. the tail of a message that contained a raw newline)
// are skipped exactly as Replay skips them, so both numberings agree.
// A missing or unreadable file yields an empty map.
func seedCounters(path string) map[string]int {
	counters := map[string]int{}
	b, err := os.ReadFile(path)
	if err != nil {
		return counters
	}
	for _, raw := range strings.Split(string(b), "\n") {
		if raw == "" {
			continue
		}
		tsEnd := strings.IndexByte(raw, ' ')
		if tsEnd < 0 {
			continue
		}
		// Mirror Replay: without a valid timestamp this is not a record.
		if _, err := time.Parse(time.RFC3339Nano, raw[:tsEnd]); err != nil {
			continue
		}
		// LEVEL is right-padded to width 5 on disk; drop the padding.
		rest := strings.TrimLeft(raw[tsEnd+1:], " ")
		lvEnd := strings.IndexByte(rest, ' ')
		if lvEnd < 0 {
			continue
		}
		text := rest[lvEnd+1:]
		stage := ""
		if strings.HasPrefix(text, "[") {
			if end := strings.Index(text, "] "); end > 1 {
				stage = text[1:end]
			}
		}
		counters[stage]++
	}
	return counters
}
// Close closes every open run log file; main calls it on shutdown. Writers
// write straight to their *os.File, so there is no user-space buffer to
// flush; closing releases the descriptors. Per-file close errors are logged
// rather than returned so one bad file does not stop the rest from closing.
// The writers cache is set to nil, so the Hub must not be used to open new
// Writers afterwards.
func (h *Hub) Close() {
	h.mu.Lock()
	defer h.mu.Unlock()
	open := h.writers
	h.writers = nil
	for runID, wr := range open {
		if cerr := wr.Close(); cerr != nil {
			log.Printf("logs: close run-%d: %v", runID, cerr)
		}
	}
}
// PathFor returns where runID's log lives on disk: <dir>/run-<runID>.log.
// The replay handlers and the report generator read this file.
func (h *Hub) PathFor(runID int64) string {
	name := fmt.Sprintf("run-%d.log", runID)
	return filepath.Join(h.dir, name)
}
// Replay reads the on-disk log for a run and returns one
// <div class="log-line"> fragment per record, in file order, suitable for
// inlining into the "All" log pane on initial page load.
//
// A missing or unreadable file yields "", and the pane stays empty until
// live events arrive. Replay does not subscribe to the SSE hub — callers
// pair it with a live sse-swap target on the same element.
//
// Line numbers are per-stage and rebuilt from scratch here. A reload
// therefore shows stable IDs that match what Append emits for later lines,
// because the Writer's counters are seeded from the same file on open.
func (h *Hub) Replay(runID int64) string {
	var out strings.Builder
	ord := map[string]int{}
	h.forEachRecord(runID, func(l Line) {
		ord[l.Stage]++
		out.WriteString(renderLogSSE(runID, ord[l.Stage], l))
	})
	return out.String()
}

// ReplayByStage scans the on-disk log once and returns pre-rendered HTML
// keyed by stage name (orphan/framing lines are keyed under "").
//
// It is the one-pass alternative to calling ReplayForStage per stage: the
// detail page renders nine stage panels, and nine file scans per page load
// would be wasteful. Line numbers are per-stage, so they agree with the
// counters Append uses for the same run. A missing or unreadable file
// yields an empty, non-nil map.
func (h *Hub) ReplayByStage(runID int64) map[string]string {
	bufs := map[string]*strings.Builder{}
	ord := map[string]int{}
	h.forEachRecord(runID, func(l Line) {
		ord[l.Stage]++
		sb := bufs[l.Stage]
		if sb == nil {
			sb = &strings.Builder{}
			bufs[l.Stage] = sb
		}
		sb.WriteString(renderLogSSE(runID, ord[l.Stage], l))
	})
	out := make(map[string]string, len(bufs))
	for stage, sb := range bufs {
		out[stage] = sb.String()
	}
	return out
}

// ReplayForStage returns only the log lines whose Stage equals stageName
// (pass "" for orphan/framing lines). The detail-page ActiveStep renderer
// uses it so each expanded step shows only its own log history. Line
// numbers are per-stage, so they agree with what Append emits live.
func (h *Hub) ReplayForStage(runID int64, stageName string) string {
	var out strings.Builder
	ord := 0
	h.forEachRecord(runID, func(l Line) {
		if l.Stage != stageName {
			return
		}
		ord++
		out.WriteString(renderLogSSE(runID, ord, l))
	})
	return out.String()
}

// forEachRecord reads runID's log file and calls fn once for every
// well-formed record, in file order. Blank and malformed lines are skipped
// (see parseRecord). A missing or unreadable file calls fn zero times.
func (h *Hub) forEachRecord(runID int64, fn func(Line)) {
	b, err := os.ReadFile(h.PathFor(runID))
	if err != nil {
		return
	}
	for _, raw := range strings.Split(string(b), "\n") {
		if l, ok := parseRecord(raw); ok {
			fn(l)
		}
	}
}

// parseRecord decodes one on-disk record in the format Append writes:
//
//	<RFC3339Nano> <LEVEL> [<stage>] <text>
//
// LEVEL is right-padded to width 5 (e.g. " INFO", "ERROR"), so the padding
// is trimmed before the level is split off; the level is returned in
// lower case. The "[stage] " prefix is present only when the line had a
// stage. ok is false for blank lines, lines without timestamp and level
// fields, and lines whose timestamp does not parse.
func parseRecord(raw string) (Line, bool) {
	if raw == "" {
		return Line{}, false
	}
	tsEnd := strings.IndexByte(raw, ' ')
	if tsEnd < 0 {
		return Line{}, false
	}
	ts, err := time.Parse(time.RFC3339Nano, raw[:tsEnd])
	if err != nil {
		return Line{}, false
	}
	rest := strings.TrimLeft(raw[tsEnd+1:], " ")
	lvEnd := strings.IndexByte(rest, ' ')
	if lvEnd < 0 {
		return Line{}, false
	}
	l := Line{TS: ts, Level: strings.ToLower(rest[:lvEnd]), Text: rest[lvEnd+1:]}
	if strings.HasPrefix(l.Text, "[") {
		// end > 1 rejects an empty "[] " prefix, which stays part of the text.
		if end := strings.Index(l.Text, "] "); end > 1 {
			l.Stage = l.Text[1:end]
			l.Text = l.Text[end+2:]
		}
	}
	return l, true
}
// Append writes line to the run's log file and publishes it on the events
// hub. A zero TS defaults to time.Now().UTC() and an empty Level to "info".
//
// The on-disk record (one per physical line) is:
//
//	<RFC3339Nano> <LEVEL right-aligned to width 5> [<stage>] <text>
//
// Replay and seedCounters split the file on "\n". A raw line break inside
// the text would therefore turn the rest of the message into a bogus
// record that Replay silently drops. To prevent that, CR/LF in the disk
// record are escaped as literal `\r` / `\n`; the live SSE payload keeps
// the original text.
//
// A disk write failure is logged but does not block the SSE fan-out, so
// the operator can still see the live tail if disk IO is degraded.
//
// Each line gets a 1-based ordinal from its stage's counter. It is
// published on "log-{run}" and, when Stage is set, also on
// "log-{run}-{stage}".
func (w *Writer) Append(line Line) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if line.TS.IsZero() {
		line.TS = time.Now().UTC()
	}
	if line.Level == "" {
		line.Level = "info"
	}
	diskText := line.Text
	if line.Stage != "" {
		diskText = "[" + line.Stage + "] " + diskText
	}
	// Keep the record on a single physical line (see doc comment).
	if strings.ContainsAny(diskText, "\r\n") {
		diskText = strings.NewReplacer("\r\n", `\n`, "\n", `\n`, "\r", `\r`).Replace(diskText)
	}
	stamped := fmt.Sprintf("%s %5s %s\n", line.TS.Format(time.RFC3339Nano), strings.ToUpper(line.Level), diskText)
	if _, err := w.f.WriteString(stamped); err != nil {
		log.Printf("logs: write run-%d: %v", w.runID, err)
	}
	if w.counters == nil {
		w.counters = map[string]int{}
	}
	w.counters[line.Stage]++
	ord := w.counters[line.Stage]
	if w.hub == nil {
		return
	}
	payload := renderLogSSE(w.runID, ord, line)
	w.hub.Publish(events.Event{
		Name:    fmt.Sprintf("log-%d", w.runID),
		Payload: payload,
	})
	if line.Stage != "" {
		w.hub.Publish(events.Event{
			Name:    fmt.Sprintf("log-%d-%s", w.runID, line.Stage),
			Payload: payload,
		})
	}
}
// Close closes the Writer's log file and clears the handle, so a second
// call is a no-op that returns nil.
func (w *Writer) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	f := w.f
	if f == nil {
		return nil
	}
	w.f = nil
	return f.Close()
}
// renderLogSSE builds the HTMX-compatible HTML fragment for one log line.
// It is used for live SSE events (Append) and for replayed history
// (Replay, ReplayByStage, ReplayForStage). The detail-page panes contain
// <div id="log-N-..." hx-swap="beforeend">, so each fragment is appended
// as one <div class="log-line log-LEVEL">.
//
// ord is the 1-based line number within (run, stage). Together with runID
// and the stage it forms the permalink id L{run}-{stage}-{ord}. Lines with
// no stage use "all" in the stage slot, so orphan/framing lines still
// anchor uniquely. Every interpolated value is HTML-escaped.
//
// Shape (whitespace added here for readability; the real fragment is a
// single line):
//
//	<div class="log-line log-{level}" id="L{run}-{stage}-{ord}" data-ts="RFC3339Nano">
//	  <a class="log-anchor" href="#L{run}-{stage}-{ord}">#</a>
//	  <span class="ln">{ord}</span>
//	  <span class="lvl">{LEVEL}</span>
//	  <span class="log-ts">15:04:05</span>
//	  (optional) <span class="log-stage">[{stage}]</span>
//	  <span class="log-text">{text}</span>
//	</div>
func renderLogSSE(runID int64, ord int, l Line) string {
	var (
		lvl       = strings.ToLower(l.Level)
		anchorKey = "all"
		stageSpan string
	)
	if l.Stage != "" {
		anchorKey = l.Stage
		stageSpan = fmt.Sprintf(`<span class="log-stage">[%s]</span> `, html.EscapeString(l.Stage))
	}
	id := fmt.Sprintf("L%d-%s-%d", runID, html.EscapeString(anchorKey), ord)
	stamp := html.EscapeString(l.TS.Format(time.RFC3339Nano))
	clock := html.EscapeString(l.TS.Format("15:04:05"))
	return fmt.Sprintf(
		`<div class="log-line log-%s" id="%s" data-ts="%s"><a class="log-anchor" href="#%s">#</a><span class="ln">%d</span><span class="lvl">%s</span><span class="log-ts">%s</span>%s<span class="log-text">%s</span></div>`,
		html.EscapeString(lvl),
		id,
		stamp,
		id,
		ord,
		html.EscapeString(strings.ToUpper(lvl)),
		clock,
		stageSpan,
		html.EscapeString(l.Text),
	)
}