package agent

import (
	"context"
	"log"
	"sync"
	"time"
)

// SensorMux coalesces sensor samples from every stage + sidecar into a
// single batched HTTP POST stream. Without it, a Burn run that fans out
// four concurrent workloads + thermal + PSU + EDAC sidecars can push ~50
// samples/sec, each as a separate /sensor request — enough to either
// saturate the orchestrator's request budget or stall a stage on its
// own sensor-forwarding path.
//
// Contract:
//   - Send is non-blocking; a full input channel drops a batch on the
//     floor and logs a warning. That's preferred over back-pressuring
//     a workload goroutine and skewing its timing.
//   - Flush happens every flushInterval *or* as soon as the pending
//     buffer reaches maxBatch samples. Chunk-at-flush keeps each HTTP
//     request bounded regardless of the incoming rate.
//   - Close flushes whatever is in the buffer and whatever is still
//     queued in the input channel. Callers that need the final flush to
//     reach the server should defer Close before other deferred
//     shutdown work.
type SensorMux struct {
	c  *Client
	in chan []SensorSample

	flushInterval time.Duration
	maxBatch      int

	// ctx/cancel control the lifetime of the flush loop; wg lets Close
	// wait for the loop (including its final flush) to finish.
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

// NewSensorMux starts the flush loop. Callers hand the returned mux to
// every code path that previously called Client.Sensor directly (stage
// Deps.Sensor, thermal sidecar, EDAC sidecar). The mux lives for the
// duration of the agent run.
//
// The flush loop stops when parent is cancelled or Close is called,
// whichever happens first.
func NewSensorMux(parent context.Context, c *Client) *SensorMux {
	ctx, cancel := context.WithCancel(parent)
	m := &SensorMux{
		c:             c,
		in:            make(chan []SensorSample, 32),
		flushInterval: 2 * time.Second,
		maxBatch:      500,
		ctx:           ctx,
		cancel:        cancel,
	}
	m.wg.Add(1)
	go m.loop()
	return m
}

// Send enqueues a batch for the next flush tick. Empty batches are
// silently ignored so callers with conditional sample lists don't need
// to guard the call site. A nil receiver is a no-op.
//
// Send never blocks: if the input channel is full the batch is dropped
// and a warning is logged. Batches enqueued after the flush loop has
// exited (Close was called or the parent context was cancelled) are
// never delivered, because nothing reads the channel any more.
func (m *SensorMux) Send(samples []SensorSample) {
	if m == nil || len(samples) == 0 {
		return
	}
	// Copy so caller mutations don't race with the flush loop.
	out := make([]SensorSample, len(samples))
	copy(out, samples)
	select {
	case m.in <- out:
	default:
		log.Printf("sensor mux: input channel full, dropping %d samples", len(out))
	}
}

// Close stops the flush loop and flushes the residual buffer. It blocks
// until the loop has finished its final flush. Safe to call twice (the
// second is a no-op because the internal context is already cancelled)
// and safe to call on a nil receiver.
func (m *SensorMux) Close() {
	if m == nil {
		return
	}
	m.cancel()
	m.wg.Wait()
}

// loop is the single goroutine that owns the pending buffer. It appends
// incoming batches, flushes on the ticker or when the buffer reaches
// maxBatch, and performs a final drain + flush on shutdown.
func (m *SensorMux) loop() {
	defer m.wg.Done()

	buf := make([]SensorSample, 0, m.maxBatch)
	t := time.NewTicker(m.flushInterval)
	defer t.Stop()

	for {
		select {
		case <-m.ctx.Done():
			// Fold everything still queued into the residual buffer
			// before flushing, so shutdown sends maxBatch-sized POSTs
			// instead of one request per queued batch. Repeat until
			// both the channel and the buffer are empty so a workload
			// that pushed right before Close (or while the final flush
			// was in flight) doesn't lose those final samples.
			for {
				buf = m.drainInto(buf)
				if len(buf) == 0 {
					return
				}
				m.flushChunks(buf)
				buf = buf[:0]
			}
		case batch := <-m.in:
			buf = append(buf, batch...)
			if len(buf) >= m.maxBatch {
				m.flushChunks(buf)
				buf = buf[:0]
			}
		case <-t.C:
			if len(buf) > 0 {
				m.flushChunks(buf)
				buf = buf[:0]
			}
		}
	}
}

// drainInto appends every batch currently queued on the input channel
// to buf without blocking and returns the extended slice.
func (m *SensorMux) drainInto(buf []SensorSample) []SensorSample {
	for {
		select {
		case batch := <-m.in:
			buf = append(buf, batch...)
		default:
			return buf
		}
	}
}

// flushChunks splits a potentially-large slice into maxBatch-sized
// HTTP requests so no single POST carries more than the configured cap.
// A 10-second per-chunk timeout keeps a stalled orchestrator from
// freezing the flush loop. Failures are logged and the chunk is
// dropped; there is no retry.
func (m *SensorMux) flushChunks(all []SensorSample) {
	for len(all) > 0 {
		n := len(all)
		if n > m.maxBatch {
			n = m.maxBatch
		}
		chunk := all[:n]
		all = all[n:]

		// Derived from Background rather than m.ctx: the final flush
		// runs after m.ctx has already been cancelled.
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		if err := m.c.Sensor(ctx, chunk); err != nil {
			log.Printf("sensor mux: flush of %d samples failed: %v", len(chunk), err)
		}
		cancel()
	}
}