package events import ( "context" "fmt" "net/http" "sync" "sync/atomic" "time" ) type Event struct { Name string Payload string } type subscriber struct { id int64 ch chan Event } const ( defaultBuffer = 32 heartbeatInterval = 15 * time.Second ) type Hub struct { mu sync.RWMutex nextID int64 subs map[int64]*subscriber buffer int heartbeat time.Duration done chan struct{} closeOnce sync.Once } func NewHub() *Hub { h := &Hub{ subs: map[int64]*subscriber{}, buffer: defaultBuffer, heartbeat: heartbeatInterval, done: make(chan struct{}), } go h.heartbeatLoop() return h } func (h *Hub) Publish(ev Event) { h.mu.RLock() defer h.mu.RUnlock() for _, s := range h.subs { select { case s.ch <- ev: default: } } } func (h *Hub) Subscribe() (id int64, ch <-chan Event, cancel func()) { id = atomic.AddInt64(&h.nextID, 1) s := &subscriber{id: id, ch: make(chan Event, h.buffer)} h.mu.Lock() h.subs[id] = s h.mu.Unlock() return id, s.ch, func() { h.mu.Lock() delete(h.subs, id) h.mu.Unlock() close(s.ch) } } func (h *Hub) heartbeatLoop() { t := time.NewTicker(h.heartbeat) defer t.Stop() for { select { case <-h.done: return case <-t.C: h.Publish(Event{ Name: "heartbeat", Payload: fmt.Sprintf(``, time.Now().Unix()), }) } } } func (h *Hub) ServeSSE(w http.ResponseWriter, r *http.Request) { flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "streaming not supported", http.StatusInternalServerError) return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") _, eventsCh, cancel := h.Subscribe() defer cancel() fmt.Fprintf(w, "event: hello\ndata: ok\n\n") flusher.Flush() ctx := r.Context() for { select { case <-ctx.Done(): return case ev, ok := <-eventsCh: if !ok { return } writeSSE(w, ev) flusher.Flush() } } } func writeSSE(w http.ResponseWriter, ev Event) { if ev.Name != "" { fmt.Fprintf(w, "event: %s\n", ev.Name) } for _, line := range splitLines(ev.Payload) { fmt.Fprintf(w, "data: %s\n", line) } fmt.Fprint(w, "\n") } func splitLines(s string) []string { if s == "" { return []string{""} } out := []string{} start := 0 for i := 0; i < len(s); i++ { if s[i] == '\n' { out = append(out, s[start:i]) start = i + 1 } } if start <= len(s) { out = append(out, s[start:]) } return out } func (h *Hub) Shutdown(_ context.Context) error { h.closeOnce.Do(func() { close(h.done) h.mu.Lock() for id, s := range h.subs { close(s.ch) delete(h.subs, id) } h.mu.Unlock() }) return nil }