Files
Vetting/internal/events/events.go
T
josh 17ec55cb85
CI / Lint + build + test (push) Successful in 1m34s
Release / detect (push) Successful in 4s
Release / build-live-image (push) Has been skipped
Release / bundle (push) Successful in 1m5s
chore: cleanup sprint — dead CSS, dedup helpers, handler refactor
Remove ~126 lines of orphaned CSS from tile slim-down and old detail
layout. Consolidate 4 duplicate duration formatters into shared
elapsed()/fmtElapsed() helpers. Break 160-line Result handler into
focused sub-functions. Implement real Hub.Shutdown() (was a no-op).
Standardize agent error responses to JSON. Replace panic() in router
init with error return. Extract magic numbers as named constants.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-21 20:39:38 -04:00

169 lines
3.4 KiB
Go

package events
import (
"context"
"fmt"
"net/http"
"sync"
"sync/atomic"
"time"
)
// Event is a typed event published on the internal bus. In Phase 1 the
// payload is an already-rendered HTML fragment; later phases will wrap
// structured run state in this same Event envelope.
//
// Events are passed by value through subscriber channels and serialised to
// the SSE wire format by writeSSE.
type Event struct {
// Name is written as the SSE "event:" field; writeSSE omits that field when
// Name is empty.
Name string // SSE event name (e.g. "heartbeat", "tile-update", "log-line")
// Payload is written as one "data:" field per line of text.
Payload string // pre-rendered HTML, ready to write as SSE data
}
// subscriber is one registered receiver of hub events.
type subscriber struct {
// id is the key under which the subscriber is stored in Hub.subs.
id int64
// ch delivers events to the subscriber. It is created buffered by Subscribe,
// and Publish sends to it without blocking.
ch chan Event
}
// Defaults applied by NewHub.
const (
// defaultSubscriberBuffer is the per-subscriber channel capacity. Once a
// subscriber's buffer is full, Publish drops further events for it.
defaultSubscriberBuffer = 32
// heartbeatInterval is the period between "heartbeat" events.
heartbeatInterval = 15 * time.Second
)
// Hub is an in-process fan-out for SSE subscribers.
//
// Construct one with NewHub, which initialises the subscriber map and the
// done channel and starts the heartbeat goroutine. Because Hub contains a
// mutex, its methods use pointer receivers.
type Hub struct {
// mu guards subs.
mu sync.RWMutex
// nextID is the last subscriber id handed out; Subscribe increments it
// atomically.
nextID int64
// subs holds the currently registered subscribers, keyed by id.
subs map[int64]*subscriber
// buffer is the channel capacity given to each new subscriber.
buffer int
// heartbeat is the ticker period used by heartbeatLoop.
heartbeat time.Duration
// done is closed by Shutdown to stop heartbeatLoop.
done chan struct{}
// closeOnce makes the body of Shutdown run at most once.
closeOnce sync.Once
}
// NewHub returns a Hub with an empty subscriber set, the default per-subscriber
// buffer size and the default heartbeat interval. It also starts the heartbeat
// goroutine, which runs until the hub's done channel is closed by Shutdown.
func NewHub() *Hub {
	hub := new(Hub)
	hub.subs = make(map[int64]*subscriber)
	hub.buffer = defaultSubscriberBuffer
	hub.heartbeat = heartbeatInterval
	hub.done = make(chan struct{})

	go hub.heartbeatLoop()
	return hub
}
// Publish offers ev to every registered subscriber without blocking.
//
// The subscriber set is read under the read lock. A subscriber whose buffer is
// full does not receive the event: it is dropped for that subscriber so that
// one slow client cannot stall delivery to the others.
func (h *Hub) Publish(ev Event) {
	h.mu.RLock()
	defer h.mu.RUnlock()

	for id := range h.subs {
		sub := h.subs[id]
		select {
		case sub.ch <- ev:
			// Queued for this subscriber.
		default:
			// Buffer full: skip this subscriber for this event.
		}
	}
}
// Subscribe registers a new subscriber and returns its id, a receive-only
// channel carrying published events (buffered to h.buffer), and a cancel
// function that unregisters the subscriber and closes the channel.
//
// cancel is idempotent and safe to call after Shutdown. The channel is closed
// only by the call that actually removes the subscriber from h.subs, so a
// channel already closed by Shutdown (which deletes the entry as it closes it)
// or by an earlier cancel is never closed a second time.
//
// If the hub has already been shut down (h.done is closed), nothing is
// registered: the returned channel is already closed and cancel is a no-op, so
// a receiver observes end-of-stream immediately instead of waiting on a
// channel that would never be fed or closed.
func (h *Hub) Subscribe() (id int64, ch <-chan Event, cancel func()) {
	id = atomic.AddInt64(&h.nextID, 1)
	s := &subscriber{id: id, ch: make(chan Event, h.buffer)}

	h.mu.Lock()
	// Shutdown closes h.done before it takes h.mu, so checking done while
	// holding the lock guarantees that any subscriber registered below is
	// still seen (and closed) by a concurrent or later Shutdown.
	select {
	case <-h.done:
		h.mu.Unlock()
		close(s.ch)
		return id, s.ch, func() {}
	default:
	}
	h.subs[id] = s
	h.mu.Unlock()

	return id, s.ch, func() {
		h.mu.Lock()
		defer h.mu.Unlock()
		if h.subs[id] != s {
			// Already removed, and therefore already closed, elsewhere.
			return
		}
		delete(h.subs, id)
		// Closing under the write lock is safe: Publish sends only while
		// holding the read lock, so no send can race with this close.
		close(s.ch)
	}
}
// heartbeatLoop publishes a "heartbeat" event every h.heartbeat until h.done is
// closed. The payload is an empty <span> whose data-heartbeat attribute holds
// the current Unix time in seconds. NewHub runs it in its own goroutine.
func (h *Hub) heartbeatLoop() {
	ticker := time.NewTicker(h.heartbeat)
	defer ticker.Stop()

	for {
		select {
		case <-h.done:
			return
		case <-ticker.C:
			stamp := time.Now().Unix()
			beat := Event{Name: "heartbeat"}
			beat.Payload = fmt.Sprintf(`<span data-heartbeat="%d"></span>`, stamp)
			h.Publish(beat)
		}
	}
}
// ServeSSE streams hub events to one HTTP client as server-sent events for the
// lifetime of the request. Each Event becomes one SSE message.
//
// It responds with 500 if the ResponseWriter cannot flush. Otherwise it sets
// the streaming response headers, subscribes to the hub, sends an initial
// "hello" message, and then forwards events, flushing after each one, until
// the request context is done or the subscription channel is closed. The
// subscription is cancelled on return.
func (h *Hub) ServeSSE(w http.ResponseWriter, r *http.Request) {
	flusher, canFlush := w.(http.Flusher)
	if !canFlush {
		http.Error(w, "streaming not supported", http.StatusInternalServerError)
		return
	}

	hdr := w.Header()
	hdr.Set("Content-Type", "text/event-stream")
	hdr.Set("Cache-Control", "no-cache")
	hdr.Set("Connection", "keep-alive")
	hdr.Set("X-Accel-Buffering", "no")

	_, stream, unsubscribe := h.Subscribe()
	defer unsubscribe()

	// Send something right away so the client sees the stream open before the
	// first real event arrives.
	fmt.Fprint(w, "event: hello\ndata: ok\n\n")
	flusher.Flush()

	gone := r.Context().Done()
	for {
		select {
		case <-gone:
			return
		case ev, open := <-stream:
			if !open {
				return
			}
			writeSSE(w, ev)
			flusher.Flush()
		}
	}
}
// writeSSE serialises ev onto w as a single SSE message: an optional
// "event:" line (skipped when ev.Name is empty), one "data:" line per line of
// ev.Payload as returned by splitLines, and a terminating blank line.
// Write errors are not checked.
func writeSSE(w http.ResponseWriter, ev Event) {
	if name := ev.Name; name != "" {
		fmt.Fprintf(w, "event: %s\n", name)
	}

	lines := splitLines(ev.Payload)
	for i := range lines {
		fmt.Fprintf(w, "data: %s\n", lines[i])
	}

	// The blank line ends the message.
	fmt.Fprint(w, "\n")
}
// splitLines breaks s into lines for emission as SSE "data:" fields.
//
// LF, CRLF and a lone CR are each treated as a single line terminator, because
// an SSE client treats all three as end-of-line. A CR left inside a data line
// would make the client cut the line there and parse the remainder as a
// separate (bogus) field. Terminators are not included in the returned lines.
//
// The result always has at least one element: an empty s yields [""], and a
// trailing terminator yields a final empty line. Inputs without CR are split
// exactly as a plain split on "\n" would split them.
func splitLines(s string) []string {
	lines := make([]string, 0, 1)
	start := 0
	for i := 0; i < len(s); i++ {
		switch s[i] {
		case '\n':
			lines = append(lines, s[start:i])
			start = i + 1
		case '\r':
			lines = append(lines, s[start:i])
			if i+1 < len(s) && s[i+1] == '\n' {
				i++ // CRLF is one terminator, not two.
			}
			start = i + 1
		}
	}
	// Whatever follows the last terminator (possibly nothing) is the last line.
	return append(lines, s[start:])
}
// Shutdown stops the heartbeat goroutine and closes all subscriber channels.
//
// It first closes h.done, then, under the write lock, closes each registered
// subscriber's channel and removes it from the hub. The work runs at most once;
// later calls do nothing. The context argument is ignored and the returned
// error is always nil.
func (h *Hub) Shutdown(_ context.Context) error {
	stop := func() {
		close(h.done)

		h.mu.Lock()
		defer h.mu.Unlock()
		for id := range h.subs {
			close(h.subs[id].ch)
			delete(h.subs, id)
		}
	}
	h.closeOnce.Do(stop)
	return nil
}