// Package logs owns per-run flat-file logs and their live SSE fan-out. // A single Writer serialises writes for one run; a Hub keeps a cache // per run so handlers can open/close freely without stepping on each // other. Lines go to disk for persistence (reload + replay) and onto // the events.Hub so the UI tile can tail live. package logs import ( "fmt" "html" "log" "os" "path/filepath" "strings" "sync" "time" "vetting/internal/events" ) type Line struct { TS time.Time Level string // info|warn|error|debug Stage string // optional — one of store.DefaultStageOrder; empty = orchestrator/agent framing Text string } type Writer struct { runID int64 mu sync.Mutex f *os.File hub *events.Hub } // Hub owns the per-run Writers. The orchestrator creates one Hub at // startup and hands it to the api package. type Hub struct { dir string events *events.Hub mu sync.Mutex writers map[int64]*Writer } func NewHub(dir string, ev *events.Hub) (*Hub, error) { if err := os.MkdirAll(dir, 0o755); err != nil { return nil, fmt.Errorf("mkdir log dir: %w", err) } return &Hub{dir: dir, events: ev, writers: map[int64]*Writer{}}, nil } // WriterFor returns a cached Writer, opening the file lazily. The file // is append-only; if an existing run's log is reopened (e.g. after a // restart) we append rather than truncate so nothing is lost. func (h *Hub) WriterFor(runID int64) (*Writer, error) { h.mu.Lock() defer h.mu.Unlock() if w, ok := h.writers[runID]; ok { return w, nil } path := filepath.Join(h.dir, fmt.Sprintf("run-%d.log", runID)) f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) if err != nil { return nil, fmt.Errorf("open %s: %w", path, err) } w := &Writer{runID: runID, f: f, hub: h.events} h.writers[runID] = w return w, nil } // Close flushes and closes all open run files. Called from main on // shutdown so the logs aren't left with buffered data. func (h *Hub) Close() { h.mu.Lock() defer h.mu.Unlock() for id, w := range h.writers { if err := w.Close(); err != nil { log.Printf("logs: close run-%d: %v", id, err) } } h.writers = nil } // PathFor returns the on-disk path for a run's log; used by replay // handlers and the report generator. func (h *Hub) PathFor(runID int64) string { return filepath.Join(h.dir, fmt.Sprintf("run-%d.log", runID)) } // Replay reads the on-disk log for a run and returns one //
fragment per line, suitable for inlining into // the "All" log pane on initial page load. Missing file → empty string; // the pane just stays empty until live events arrive. Does not subscribe // to the SSE hub — callers are expected to pair this with a live // sse-swap target on the same element. func (h *Hub) Replay(runID int64) string { path := h.PathFor(runID) b, err := os.ReadFile(path) if err != nil { return "" } var out strings.Builder for _, raw := range strings.Split(string(b), "\n") { if raw == "" { continue } // Format from Append: " " // where LEVEL is right-padded to width 5 (e.g. " INFO", // "ERROR"). TrimLeft the pad before splitting off the level. tsEnd := strings.IndexByte(raw, ' ') if tsEnd < 0 { continue } ts, err := time.Parse(time.RFC3339Nano, raw[:tsEnd]) if err != nil { continue } rest := strings.TrimLeft(raw[tsEnd+1:], " ") lvEnd := strings.IndexByte(rest, ' ') if lvEnd < 0 { continue } level := strings.ToLower(rest[:lvEnd]) text := rest[lvEnd+1:] // Disk format prepends "[stage] " to text when stage was set. stage := "" if strings.HasPrefix(text, "[") { if end := strings.Index(text, "] "); end > 1 { stage = text[1:end] text = text[end+2:] } } out.WriteString(renderLogSSE(Line{TS: ts, Level: level, Stage: stage, Text: text})) } return out.String() } // Append writes a line to disk and publishes an SSE event. Failures // on disk log but don't block the SSE fan-out — the operator can still // see the live tail even if disk IO is degraded. func (w *Writer) Append(line Line) { w.mu.Lock() defer w.mu.Unlock() if line.TS.IsZero() { line.TS = time.Now().UTC() } if line.Level == "" { line.Level = "info" } diskText := line.Text if line.Stage != "" { diskText = "[" + line.Stage + "] " + diskText } stamped := fmt.Sprintf("%s %5s %s\n", line.TS.Format(time.RFC3339Nano), strings.ToUpper(line.Level), diskText) if _, err := w.f.WriteString(stamped); err != nil { log.Printf("logs: write run-%d: %v", w.runID, err) } if w.hub != nil { payload := renderLogSSE(line) w.hub.Publish(events.Event{ Name: fmt.Sprintf("log-%d", w.runID), Payload: payload, }) if line.Stage != "" { w.hub.Publish(events.Event{ Name: fmt.Sprintf("log-%d-%s", w.runID, line.Stage), Payload: payload, }) } } } func (w *Writer) Close() error { w.mu.Lock() defer w.mu.Unlock() if w.f == nil { return nil } err := w.f.Close() w.f = nil return err } // renderLogSSE returns an HTMX-compatible fragment. The detail-page // panes contain
: each event // appends one
to them. Stage, if set, // is rendered as a dim prefix so the "All" pane stays disambiguable // even with multiple stages interleaved. func renderLogSSE(l Line) string { level := strings.ToLower(l.Level) stagePrefix := "" if l.Stage != "" { stagePrefix = fmt.Sprintf(`[%s] `, html.EscapeString(l.Stage)) } return fmt.Sprintf( `
%s %s%s
`, html.EscapeString(level), html.EscapeString(l.TS.Format("15:04:05")), stagePrefix, html.EscapeString(l.Text), ) }