8367ec2a9f
Add 4 new doc files (configuration reference, development guide, API reference with full request/response schemas, database schema), expand the README with a feature list and how-it-works walkthrough, fix missing Firmware and Burn stages in architecture.md and test-suite.md, add threshold engine and host-mode agent sections, and add godoc comments to 11 packages and 6 model types. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
172 lines
3.6 KiB
Go
172 lines
3.6 KiB
Go
// Package events provides an in-process SSE fan-out hub. Browser
// clients subscribe via GET /events; the orchestrator publishes
// pre-rendered HTML fragments that HTMX swaps into the DOM.
|
|
package events
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// Event is the envelope carried on the internal bus. In Phase 1 the
// payload is HTML that has already been rendered; later phases are
// expected to wrap structured run state in this same envelope.
type Event struct {
	// Name is the SSE event name, e.g. "heartbeat", "tile-update" or
	// "log-line".
	Name string
	// Payload is pre-rendered HTML, ready to be written as SSE data.
	Payload string
}
|
|
|
|
type subscriber struct {
|
|
id int64
|
|
ch chan Event
|
|
}
|
|
|
|
const (
	// defaultSubscriberBuffer is the per-subscriber channel capacity
	// that NewHub configures.
	defaultSubscriberBuffer = 32

	// heartbeatInterval is the period between heartbeat events that
	// NewHub configures.
	heartbeatInterval = 15 * time.Second
)
|
|
|
|
// Hub is an in-process fan-out for SSE subscribers.
|
|
type Hub struct {
|
|
mu sync.RWMutex
|
|
nextID int64
|
|
subs map[int64]*subscriber
|
|
buffer int
|
|
heartbeat time.Duration
|
|
done chan struct{}
|
|
closeOnce sync.Once
|
|
}
|
|
|
|
func NewHub() *Hub {
|
|
h := &Hub{
|
|
subs: map[int64]*subscriber{},
|
|
buffer: defaultSubscriberBuffer,
|
|
heartbeat: heartbeatInterval,
|
|
done: make(chan struct{}),
|
|
}
|
|
go h.heartbeatLoop()
|
|
return h
|
|
}
|
|
|
|
func (h *Hub) Publish(ev Event) {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
for _, s := range h.subs {
|
|
select {
|
|
case s.ch <- ev:
|
|
default:
|
|
// Slow subscriber: drop the event rather than stall other clients.
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *Hub) Subscribe() (id int64, ch <-chan Event, cancel func()) {
|
|
id = atomic.AddInt64(&h.nextID, 1)
|
|
s := &subscriber{id: id, ch: make(chan Event, h.buffer)}
|
|
h.mu.Lock()
|
|
h.subs[id] = s
|
|
h.mu.Unlock()
|
|
return id, s.ch, func() {
|
|
h.mu.Lock()
|
|
delete(h.subs, id)
|
|
h.mu.Unlock()
|
|
close(s.ch)
|
|
}
|
|
}
|
|
|
|
func (h *Hub) heartbeatLoop() {
|
|
t := time.NewTicker(h.heartbeat)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-h.done:
|
|
return
|
|
case <-t.C:
|
|
h.Publish(Event{
|
|
Name: "heartbeat",
|
|
Payload: fmt.Sprintf(`<span data-heartbeat="%d"></span>`, time.Now().Unix()),
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
// ServeSSE writes server-sent events for a single subscriber for the
|
|
// lifetime of the request. Each Event becomes one SSE message.
|
|
func (h *Hub) ServeSSE(w http.ResponseWriter, r *http.Request) {
|
|
flusher, ok := w.(http.Flusher)
|
|
if !ok {
|
|
http.Error(w, "streaming not supported", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
w.Header().Set("Connection", "keep-alive")
|
|
w.Header().Set("X-Accel-Buffering", "no")
|
|
|
|
_, eventsCh, cancel := h.Subscribe()
|
|
defer cancel()
|
|
|
|
fmt.Fprintf(w, "event: hello\ndata: ok\n\n")
|
|
flusher.Flush()
|
|
|
|
ctx := r.Context()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case ev, ok := <-eventsCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
writeSSE(w, ev)
|
|
flusher.Flush()
|
|
}
|
|
}
|
|
}
|
|
|
|
func writeSSE(w http.ResponseWriter, ev Event) {
|
|
if ev.Name != "" {
|
|
fmt.Fprintf(w, "event: %s\n", ev.Name)
|
|
}
|
|
for _, line := range splitLines(ev.Payload) {
|
|
fmt.Fprintf(w, "data: %s\n", line)
|
|
}
|
|
fmt.Fprint(w, "\n")
|
|
}
|
|
|
|
// splitLines breaks s into the lines that must each be sent as a
// separate SSE "data:" field. It splits on LF, CR and CRLF, because an
// event-stream parser treats all three as line terminators; leaving a
// bare CR inside a data line would end the field early on the client
// and corrupt the message framing.
//
// The result always has at least one element: the empty string yields
// a single empty line, and a trailing terminator yields a trailing
// empty line so the payload's final newline is preserved.
func splitLines(s string) []string {
	var out []string
	start := 0
	for i := 0; i < len(s); i++ {
		switch s[i] {
		case '\n':
			out = append(out, s[start:i])
			start = i + 1
		case '\r':
			out = append(out, s[start:i])
			// Treat CRLF as a single terminator.
			if i+1 < len(s) && s[i+1] == '\n' {
				i++
			}
			start = i + 1
		}
	}
	// Whatever follows the last terminator (possibly "") is the final line.
	return append(out, s[start:])
}
|
|
|
|
// Shutdown stops the heartbeat goroutine and closes all subscriber channels.
|
|
func (h *Hub) Shutdown(_ context.Context) error {
|
|
h.closeOnce.Do(func() {
|
|
close(h.done)
|
|
h.mu.Lock()
|
|
for id, s := range h.subs {
|
|
close(s.ch)
|
|
delete(h.subs, id)
|
|
}
|
|
h.mu.Unlock()
|
|
})
|
|
return nil
|
|
}
|