// Package logs owns per-run flat-file logs and their live SSE fan-out. // A single Writer serialises writes for one run; a Hub keeps a cache // per run so handlers can open/close freely without stepping on each // other. Lines go to disk for persistence (reload + replay) and onto // the events.Hub so the UI tile can tail live. package logs import ( "fmt" "html" "log" "os" "path/filepath" "strings" "sync" "time" "vetting/internal/events" ) type Line struct { TS time.Time Level string // info|warn|error|debug Stage string // optional — one of store.DefaultStageOrder; empty = orchestrator/agent framing Text string } type Writer struct { runID int64 mu sync.Mutex f *os.File hub *events.Hub // counters keyed by Stage (empty key = orphan/framing lines) so each // stage's rendered output numbers from 1. Shared with Replay via // seedCounters on Writer open so restarts don't reset to 1 mid-run. counters map[string]int } // Hub owns the per-run Writers. The orchestrator creates one Hub at // startup and hands it to the api package. type Hub struct { dir string events *events.Hub mu sync.Mutex writers map[int64]*Writer } func NewHub(dir string, ev *events.Hub) (*Hub, error) { if err := os.MkdirAll(dir, 0o755); err != nil { return nil, fmt.Errorf("mkdir log dir: %w", err) } return &Hub{dir: dir, events: ev, writers: map[int64]*Writer{}}, nil } // WriterFor returns a cached Writer, opening the file lazily. The file // is append-only; if an existing run's log is reopened (e.g. after a // restart) we append rather than truncate so nothing is lost. 
func (h *Hub) WriterFor(runID int64) (*Writer, error) { h.mu.Lock() defer h.mu.Unlock() if w, ok := h.writers[runID]; ok { return w, nil } path := filepath.Join(h.dir, fmt.Sprintf("run-%d.log", runID)) f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) if err != nil { return nil, fmt.Errorf("open %s: %w", path, err) } w := &Writer{runID: runID, f: f, hub: h.events, counters: seedCounters(path)} h.writers[runID] = w return w, nil } // seedCounters scans the on-disk log and returns the per-stage line // counts that were already rendered for this run. Called on Writer open // so a mid-run process restart continues numbering where it left off // — otherwise Append(line) would emit "1" but Replay already shows // "42" on the reload, and anchor permalinks would collide. func seedCounters(path string) map[string]int { counters := map[string]int{} b, err := os.ReadFile(path) if err != nil { return counters } for _, raw := range strings.Split(string(b), "\n") { if raw == "" { continue } tsEnd := strings.IndexByte(raw, ' ') if tsEnd < 0 { continue } rest := strings.TrimLeft(raw[tsEnd+1:], " ") lvEnd := strings.IndexByte(rest, ' ') if lvEnd < 0 { continue } text := rest[lvEnd+1:] stage := "" if strings.HasPrefix(text, "[") { if end := strings.Index(text, "] "); end > 1 { stage = text[1:end] } } counters[stage]++ } return counters } // Close flushes and closes all open run files. Called from main on // shutdown so the logs aren't left with buffered data. func (h *Hub) Close() { h.mu.Lock() defer h.mu.Unlock() for id, w := range h.writers { if err := w.Close(); err != nil { log.Printf("logs: close run-%d: %v", id, err) } } h.writers = nil } // PathFor returns the on-disk path for a run's log; used by replay // handlers and the report generator. func (h *Hub) PathFor(runID int64) string { return filepath.Join(h.dir, fmt.Sprintf("run-%d.log", runID)) } // Replay reads the on-disk log for a run and returns one //
fragment per line, suitable for inlining into // the "All" log pane on initial page load. Missing file → empty string; // the pane just stays empty until live events arrive. Does not subscribe // to the SSE hub — callers are expected to pair this with a live // sse-swap target on the same element. // // Line numbers are rebuilt from scratch here so a page reload shows // stable IDs that match what Append will emit for future lines (the // Writer's counters are seeded from disk on open). func (h *Hub) Replay(runID int64) string { path := h.PathFor(runID) b, err := os.ReadFile(path) if err != nil { return "" } var out strings.Builder counters := map[string]int{} for _, raw := range strings.Split(string(b), "\n") { if raw == "" { continue } // Format from Append: " " // where LEVEL is right-padded to width 5 (e.g. " INFO", // "ERROR"). TrimLeft the pad before splitting off the level. tsEnd := strings.IndexByte(raw, ' ') if tsEnd < 0 { continue } ts, err := time.Parse(time.RFC3339Nano, raw[:tsEnd]) if err != nil { continue } rest := strings.TrimLeft(raw[tsEnd+1:], " ") lvEnd := strings.IndexByte(rest, ' ') if lvEnd < 0 { continue } level := strings.ToLower(rest[:lvEnd]) text := rest[lvEnd+1:] // Disk format prepends "[stage] " to text when stage was set. stage := "" if strings.HasPrefix(text, "[") { if end := strings.Index(text, "] "); end > 1 { stage = text[1:end] text = text[end+2:] } } counters[stage]++ out.WriteString(renderLogSSE(runID, counters[stage], Line{TS: ts, Level: level, Stage: stage, Text: text})) } return out.String() } // ReplayByStage scans the on-disk log once and returns a per-stage map // of pre-rendered HTML strings, keyed by stage name (orphan/framing // lines are keyed under ""). This is the one-pass alternative to // calling ReplayForStage per stage: the detail-page renders nine stage // panels, and doing nine file scans per page load is wasteful. Line // numbers are per-stage so they agree with the counters Append uses // for the same run. 
func (h *Hub) ReplayByStage(runID int64) map[string]string { path := h.PathFor(runID) b, err := os.ReadFile(path) if err != nil { return map[string]string{} } bufs := map[string]*strings.Builder{} counters := map[string]int{} for _, raw := range strings.Split(string(b), "\n") { if raw == "" { continue } tsEnd := strings.IndexByte(raw, ' ') if tsEnd < 0 { continue } ts, err := time.Parse(time.RFC3339Nano, raw[:tsEnd]) if err != nil { continue } rest := strings.TrimLeft(raw[tsEnd+1:], " ") lvEnd := strings.IndexByte(rest, ' ') if lvEnd < 0 { continue } level := strings.ToLower(rest[:lvEnd]) text := rest[lvEnd+1:] stage := "" if strings.HasPrefix(text, "[") { if end := strings.Index(text, "] "); end > 1 { stage = text[1:end] text = text[end+2:] } } counters[stage]++ sb, ok := bufs[stage] if !ok { sb = &strings.Builder{} bufs[stage] = sb } sb.WriteString(renderLogSSE(runID, counters[stage], Line{TS: ts, Level: level, Stage: stage, Text: text})) } out := make(map[string]string, len(bufs)) for k, sb := range bufs { out[k] = sb.String() } return out } // ReplayForStage returns only the log lines whose Stage matches stageName // (pass "" for orphan/framing lines). Used by the detail-page ActiveStep // renderer so each expanded step shows only its own log history. Line // numbers here are per-stage so they agree with what Append emits live. 
func (h *Hub) ReplayForStage(runID int64, stageName string) string { path := h.PathFor(runID) b, err := os.ReadFile(path) if err != nil { return "" } var out strings.Builder ord := 0 for _, raw := range strings.Split(string(b), "\n") { if raw == "" { continue } tsEnd := strings.IndexByte(raw, ' ') if tsEnd < 0 { continue } ts, err := time.Parse(time.RFC3339Nano, raw[:tsEnd]) if err != nil { continue } rest := strings.TrimLeft(raw[tsEnd+1:], " ") lvEnd := strings.IndexByte(rest, ' ') if lvEnd < 0 { continue } level := strings.ToLower(rest[:lvEnd]) text := rest[lvEnd+1:] stage := "" if strings.HasPrefix(text, "[") { if end := strings.Index(text, "] "); end > 1 { stage = text[1:end] text = text[end+2:] } } if stage != stageName { continue } ord++ out.WriteString(renderLogSSE(runID, ord, Line{TS: ts, Level: level, Stage: stage, Text: text})) } return out.String() } // Append writes a line to disk and publishes an SSE event. Failures // on disk log but don't block the SSE fan-out — the operator can still // see the live tail even if disk IO is degraded. 
func (w *Writer) Append(line Line) { w.mu.Lock() defer w.mu.Unlock() if line.TS.IsZero() { line.TS = time.Now().UTC() } if line.Level == "" { line.Level = "info" } diskText := line.Text if line.Stage != "" { diskText = "[" + line.Stage + "] " + diskText } stamped := fmt.Sprintf("%s %5s %s\n", line.TS.Format(time.RFC3339Nano), strings.ToUpper(line.Level), diskText) if _, err := w.f.WriteString(stamped); err != nil { log.Printf("logs: write run-%d: %v", w.runID, err) } if w.counters == nil { w.counters = map[string]int{} } w.counters[line.Stage]++ ord := w.counters[line.Stage] if w.hub != nil { payload := renderLogSSE(w.runID, ord, line) w.hub.Publish(events.Event{ Name: fmt.Sprintf("log-%d", w.runID), Payload: payload, }) if line.Stage != "" { w.hub.Publish(events.Event{ Name: fmt.Sprintf("log-%d-%s", w.runID, line.Stage), Payload: payload, }) } } } func (w *Writer) Close() error { w.mu.Lock() defer w.mu.Unlock() if w.f == nil { return nil } err := w.f.Close() w.f = nil return err } // renderLogSSE returns an HTMX-compatible fragment. The detail-page // panes contain
: each event // appends one
to them. ord is the // per-(run, stage) 1-based line number; combined with runID + stage it // forms a stable permalink id of the form L{run}-{stage}-{ord} (stage // defaults to "all" when the line has no stage, so orphan/framing lines // still anchor uniquely). // // Shape: // //
// # // {ord} // {LEVEL} // 15:04:05 // (optional) [{stage}] // {text} //
func renderLogSSE(runID int64, ord int, l Line) string {
	// Builds the fragment for one log line from its run id, per-stage
	// ordinal and Line fields, HTML-escaping the values taken from l.
	level := strings.ToLower(l.Level)
	// Lines without a stage are anchored under the key "all".
	stageKey := l.Stage
	if stageKey == "" {
		stageKey = "all"
	}
	// Permalink id of the form L{run}-{stage}-{ord}.
	anchorID := fmt.Sprintf("L%d-%s-%d", runID, html.EscapeString(stageKey), ord)
	// The stage label is emitted only when the line carries a stage.
	stagePrefix := ""
	if l.Stage != "" {
		stagePrefix = fmt.Sprintf(`[%s] `, html.EscapeString(l.Stage))
	}
	// NOTE(review): as it appears here the format literal carries five
	// verbs (%d followed by four %s) while nine arguments are passed, and
	// the leading %d lines up with a string argument. The element markup
	// that the doc comment's Shape section refers to looks to be missing
	// from the literal — confirm against the original file before relying
	// on this output.
	return fmt.Sprintf(
		`
#%d%s%s%s%s
`,
		html.EscapeString(level),
		anchorID,
		html.EscapeString(l.TS.Format(time.RFC3339Nano)), // full-precision timestamp
		anchorID,
		ord,
		html.EscapeString(strings.ToUpper(level)),
		html.EscapeString(l.TS.Format("15:04:05")), // clock time only
		stagePrefix,
		html.EscapeString(l.Text),
	)
}