Initial implementation: host lifecycle + PXE + admin dashboard
Go service for Proxmox homelab cluster provisioning. Handles PXE boot, Proxmox autoinstall (answer file generation), cluster join via SSH, and Infrastructure API registration. - Host state machine (registered → pxe_ready → installing → ready) - dnsmasq supervisor with MAC-based allowlist - iPXE script and Proxmox answer file generation - First-boot phone-home → cluster join → infra registration - Operation locking with expiry (409 on conflict) - SSE event hub for real-time dashboard updates - Admin dashboard (host grid, detail, registration form) - Config-driven server types with hot-reload - Docker deployment (multi-stage fat image) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,160 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Event struct {
|
||||
Name string
|
||||
Payload string
|
||||
}
|
||||
|
||||
type subscriber struct {
|
||||
id int64
|
||||
ch chan Event
|
||||
}
|
||||
|
||||
const (
|
||||
defaultBuffer = 32
|
||||
heartbeatInterval = 15 * time.Second
|
||||
)
|
||||
|
||||
type Hub struct {
|
||||
mu sync.RWMutex
|
||||
nextID int64
|
||||
subs map[int64]*subscriber
|
||||
buffer int
|
||||
heartbeat time.Duration
|
||||
done chan struct{}
|
||||
closeOnce sync.Once
|
||||
}
|
||||
|
||||
func NewHub() *Hub {
|
||||
h := &Hub{
|
||||
subs: map[int64]*subscriber{},
|
||||
buffer: defaultBuffer,
|
||||
heartbeat: heartbeatInterval,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
go h.heartbeatLoop()
|
||||
return h
|
||||
}
|
||||
|
||||
func (h *Hub) Publish(ev Event) {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
for _, s := range h.subs {
|
||||
select {
|
||||
case s.ch <- ev:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Hub) Subscribe() (id int64, ch <-chan Event, cancel func()) {
|
||||
id = atomic.AddInt64(&h.nextID, 1)
|
||||
s := &subscriber{id: id, ch: make(chan Event, h.buffer)}
|
||||
h.mu.Lock()
|
||||
h.subs[id] = s
|
||||
h.mu.Unlock()
|
||||
return id, s.ch, func() {
|
||||
h.mu.Lock()
|
||||
delete(h.subs, id)
|
||||
h.mu.Unlock()
|
||||
close(s.ch)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Hub) heartbeatLoop() {
|
||||
t := time.NewTicker(h.heartbeat)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-h.done:
|
||||
return
|
||||
case <-t.C:
|
||||
h.Publish(Event{
|
||||
Name: "heartbeat",
|
||||
Payload: fmt.Sprintf(`<span data-heartbeat="%d"></span>`, time.Now().Unix()),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Hub) ServeSSE(w http.ResponseWriter, r *http.Request) {
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
http.Error(w, "streaming not supported", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
w.Header().Set("X-Accel-Buffering", "no")
|
||||
|
||||
_, eventsCh, cancel := h.Subscribe()
|
||||
defer cancel()
|
||||
|
||||
fmt.Fprintf(w, "event: hello\ndata: ok\n\n")
|
||||
flusher.Flush()
|
||||
|
||||
ctx := r.Context()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case ev, ok := <-eventsCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
writeSSE(w, ev)
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func writeSSE(w http.ResponseWriter, ev Event) {
|
||||
if ev.Name != "" {
|
||||
fmt.Fprintf(w, "event: %s\n", ev.Name)
|
||||
}
|
||||
for _, line := range splitLines(ev.Payload) {
|
||||
fmt.Fprintf(w, "data: %s\n", line)
|
||||
}
|
||||
fmt.Fprint(w, "\n")
|
||||
}
|
||||
|
||||
func splitLines(s string) []string {
|
||||
if s == "" {
|
||||
return []string{""}
|
||||
}
|
||||
out := []string{}
|
||||
start := 0
|
||||
for i := 0; i < len(s); i++ {
|
||||
if s[i] == '\n' {
|
||||
out = append(out, s[start:i])
|
||||
start = i + 1
|
||||
}
|
||||
}
|
||||
if start <= len(s) {
|
||||
out = append(out, s[start:])
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (h *Hub) Shutdown(_ context.Context) error {
|
||||
h.closeOnce.Do(func() {
|
||||
close(h.done)
|
||||
h.mu.Lock()
|
||||
for id, s := range h.subs {
|
||||
close(s.ch)
|
||||
delete(h.subs, id)
|
||||
}
|
||||
h.mu.Unlock()
|
||||
})
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user