Files
Vetting/agent/sensor_mux.go
T
josh 23c689aa5b
CI / Lint + build + test (push) Failing after 1m57s
Release / release (push) Has been cancelled
deep profile + threshold gating + firmware stage + Burn super-stage
Ships all five phases of the deep-profile overhaul together. Runs now
carry a profile (quick/deep/soak); every profile walks the same
11-stage order — Inventory → Firmware → SpecValidate → SMART →
CPUStress → Storage → Network → Burn → GPU → PSU → Reporting —
with only per-stage durations and concurrency scaled.

Phase 1: profiles.ProfileRegistry loaded from vetting.yaml; runs.profile
column + CreateWithProfile; threshold table + evaluator seeded per-run
from the shared vetting.thresholds block; breach flips result at
/sensor + /result.

Phase 2: upgraded CPUStress (stress-ng --cpu-method=all --verify +
EDAC/MCE poll), Storage (fio --verify=md5 + SMART start/end delta),
Network (sustained iperf + /proc/net/dev deltas) with per-profile
knobs from Deps.

Phase 3: Burn super-stage with goroutine fan-out for CPU + memory +
fio + iperf, PSU rails sampled across the Burn window, SensorMux
(2 s flush, 500-sample cap) to absorb backpressure.

Phase 4: Firmware stage + firmware_snapshots table; probes dmidecode
(BIOS), ipmitool (BMC), ethtool -i (NIC), nvme (sysfs + id-ctrl),
lspci (HBA), /proc/cpuinfo (microcode). spec.DiffFirmware folds into
SpecValidate with pin-by-identifier and fan-out-across-component
matching; mismatches park the run in FailedHolding.

Phase 5: profile radio on the host start form, profile chip on the
run header, Firmware section in the HTML report, coverage artifact
uploaded from CI, agent/tests/fakes/ scaffold with Deps.LookPath
seam + stress_ng and dmidecode example fakes.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-18 22:50:57 -04:00

140 lines
3.8 KiB
Go

package agent
import (
"context"
"log"
"sync"
"time"
)
// SensorMux coalesces sensor samples from every stage + sidecar into a
// single batched HTTP POST stream. Without it, a Burn run that fans out
// four concurrent workloads + thermal + PSU + EDAC sidecars can push ~50
// samples/sec, each as a separate /sensor request — enough to either
// saturate the orchestrator's request budget or stall a stage on its
// own sensor-forwarding path.
//
// Contract:
//   - Send is non-blocking; a full input channel drops a batch on the
//     floor and logs a warning. That's preferred over back-pressuring
//     a workload goroutine and skewing its timing.
//   - Flush happens every flushInterval *or* as soon as the pending
//     buffer reaches maxBatch samples. Chunk-at-flush keeps each HTTP
//     request bounded regardless of the incoming rate.
//   - Close flushes whatever is in the buffer. Callers that need the
//     final flush to reach the server should defer Close before other
//     deferred shutdown work.
type SensorMux struct {
	// c is the client whose Sensor method receives each flushed chunk.
	c *Client
	// in carries batches from Send to the flush loop. It is buffered
	// (NewSensorMux sizes it at 32 batches) so Send never has to block.
	in chan []SensorSample
	// flushInterval is the period of the flush loop's ticker.
	flushInterval time.Duration
	// maxBatch is both the pending-buffer size that triggers an early
	// flush and the largest number of samples sent in one Sensor call.
	maxBatch int
	// ctx is derived from the parent context given to NewSensorMux; the
	// flush loop shuts down when it is done.
	ctx context.Context
	// cancel cancels ctx. Close calls it to stop the flush loop.
	cancel context.CancelFunc
	// wg tracks the flush-loop goroutine so Close can wait for it to exit.
	wg sync.WaitGroup
}
// NewSensorMux builds a SensorMux that forwards to c and launches its
// background flush goroutine before returning. The mux runs under a
// context derived from parent, so either cancelling parent or calling
// Close stops the loop. Callers are expected to hand the returned mux to
// the code paths that would otherwise call Client.Sensor directly and to
// keep it alive for the duration of the agent run.
//
// Defaults: a 2-second flush interval, a 500-sample batch cap, and room
// for 32 queued batches on the input channel.
func NewSensorMux(parent context.Context, c *Client) *SensorMux {
	const (
		queueDepth = 32
		interval   = 2 * time.Second
		batchCap   = 500
	)

	mux := new(SensorMux)
	mux.c = c
	mux.in = make(chan []SensorSample, queueDepth)
	mux.flushInterval = interval
	mux.maxBatch = batchCap
	mux.ctx, mux.cancel = context.WithCancel(parent)

	// Register the goroutine before starting it so Close's Wait can
	// never observe a zero counter while the loop is still running.
	mux.wg.Add(1)
	go mux.loop()
	return mux
}
// Send hands a batch of samples to the flush loop without ever blocking
// the caller. A nil receiver or an empty batch is a silent no-op, so call
// sites with conditional sample lists need no guard of their own. When
// the input channel is full the batch is discarded and a warning is
// logged instead of stalling the sender.
func (m *SensorMux) Send(samples []SensorSample) {
	if m == nil {
		return
	}
	if len(samples) == 0 {
		return
	}

	// Detach from the caller's backing array: the caller may reuse or
	// mutate its slice while the flush loop still holds this batch.
	batch := append([]SensorSample(nil), samples...)

	select {
	case m.in <- batch:
		return
	default:
	}
	log.Printf("sensor mux: input channel full, dropping %d samples", len(batch))
}
// Close cancels the mux's internal context and then blocks until the
// flush goroutine has exited; that goroutine performs the final flush of
// any residual samples on its way out. A nil receiver is a no-op.
//
// Calling Close more than once is harmless: cancelling an already
// cancelled context does nothing, and the wait returns immediately once
// the loop has finished.
func (m *SensorMux) Close() {
	if m != nil {
		m.cancel()
		m.wg.Wait()
	}
}
// loop is the mux's background goroutine. It accumulates batches arriving
// on m.in into a pending buffer and flushes that buffer when any of the
// following happens:
//   - the buffer reaches maxBatch samples,
//   - the flushInterval ticker fires while the buffer is non-empty,
//   - m.ctx is cancelled (shutdown).
//
// On shutdown, everything still queued on m.in is folded into the pending
// buffer before flushing, so the final samples leave in maxBatch-sized
// requests rather than one request per queued batch. The drain-and-flush
// step repeats until the queue is observed empty, so batches that arrive
// while a shutdown flush is in flight are still delivered.
func (m *SensorMux) loop() {
	defer m.wg.Done()
	buf := make([]SensorSample, 0, m.maxBatch)
	t := time.NewTicker(m.flushInterval)
	defer t.Stop()
	for {
		select {
		case <-m.ctx.Done():
			for {
				buf = m.drainPending(buf)
				if len(buf) == 0 {
					return
				}
				m.flushChunks(buf)
				buf = buf[:0]
			}
		case batch := <-m.in:
			buf = append(buf, batch...)
			if len(buf) >= m.maxBatch {
				m.flushChunks(buf)
				// Keep the backing array for the next accumulation round.
				buf = buf[:0]
			}
		case <-t.C:
			if len(buf) > 0 {
				m.flushChunks(buf)
				buf = buf[:0]
			}
		}
	}
}

// drainPending appends every batch currently queued on m.in to buf,
// preserving arrival order, and returns the extended slice. It never
// blocks: it stops as soon as the channel has nothing ready.
func (m *SensorMux) drainPending(buf []SensorSample) []SensorSample {
	for {
		select {
		case batch := <-m.in:
			buf = append(buf, batch...)
		default:
			return buf
		}
	}
}
// flushChunks sends all to the server via m.c.Sensor, splitting it into
// consecutive chunks of at most maxBatch samples so no single request
// carries more than the configured cap. Each chunk gets its own
// 10-second timeout, derived from context.Background rather than the
// mux's context so the final flush still runs after the mux has been
// cancelled. A failed chunk is logged and skipped; later chunks are
// still attempted.
//
// A non-positive maxBatch is treated as "no cap" and sends all in one
// request. Without that guard a zero cap would produce empty chunks
// forever and a negative cap would panic on the slice expression.
func (m *SensorMux) flushChunks(all []SensorSample) {
	limit := m.maxBatch
	if limit <= 0 {
		limit = len(all)
	}
	for len(all) > 0 {
		n := len(all)
		if n > limit {
			n = limit
		}
		chunk := all[:n]
		all = all[n:]
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		if err := m.c.Sensor(ctx, chunk); err != nil {
			log.Printf("sensor mux: flush of %d samples failed: %v", len(chunk), err)
		}
		// Release the timer immediately rather than deferring, so timers
		// do not pile up across iterations of a long flush.
		cancel()
	}
}