23c689aa5b
Ships all five phases of the deep-profile overhaul together. Runs now carry a profile (quick/deep/soak); every profile walks the same 11-stage order — Inventory → Firmware → SpecValidate → SMART → CPUStress → Storage → Network → Burn → GPU → PSU → Reporting — with only per-stage durations and concurrency scaled. Phase 1: profiles.ProfileRegistry loaded from vetting.yaml; runs.profile column + CreateWithProfile; threshold table + evaluator seeded per-run from the shared vetting.thresholds block; breach flips result at /sensor + /result. Phase 2: upgraded CPUStress (stress-ng --cpu-method=all --verify + EDAC/MCE poll), Storage (fio --verify=md5 + SMART start/end delta), Network (sustained iperf + /proc/net/dev deltas) with per-profile knobs from Deps. Phase 3: Burn super-stage with goroutine fan-out for CPU + memory + fio + iperf, PSU rails sampled across the Burn window, SensorMux (2 s flush, 500-sample cap) to absorb backpressure. Phase 4: Firmware stage + firmware_snapshots table; probes dmidecode (BIOS), ipmitool (BMC), ethtool -i (NIC), nvme (sysfs + id-ctrl), lspci (HBA), /proc/cpuinfo (microcode). spec.DiffFirmware folds into SpecValidate with pin-by-identifier and fan-out-across-component matching; mismatches park the run in FailedHolding. Phase 5: profile radio on the host start form, profile chip on the run header, Firmware section in the HTML report, coverage artifact uploaded from CI, agent/tests/fakes/ scaffold with Deps.LookPath seam + stress_ng and dmidecode example fakes. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
140 lines
3.8 KiB
Go
140 lines
3.8 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// SensorMux coalesces sensor samples from every stage + sidecar into a
|
|
// single batched HTTP POST stream. Without it, a Burn run that fans out
|
|
// four concurrent workloads + thermal + PSU + EDAC sidecars can push ~50
|
|
// samples/sec, each as a separate /sensor request — enough to either
|
|
// saturate the orchestrator's request budget or stall a stage on its
|
|
// own sensor-forwarding path.
|
|
//
|
|
// Contract:
|
|
// - Send is non-blocking; a full input channel drops a batch on the
|
|
// floor and logs a warning. That's preferred over back-pressuring
|
|
// a workload goroutine and skewing its timing.
|
|
// - Flush happens every flushInterval *or* whenever the pending buffer
|
|
// exceeds maxBatch samples. Chunk-at-flush keeps each HTTP request
|
|
// bounded regardless of the incoming rate.
|
|
// - Close flushes whatever is in the buffer. Callers that need the
|
|
// final flush to reach the server should defer Close before other
|
|
// deferred shutdown work.
|
|
type SensorMux struct {
|
|
c *Client
|
|
in chan []SensorSample
|
|
flushInterval time.Duration
|
|
maxBatch int
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// NewSensorMux starts the flush loop. Callers hand the returned mux to
|
|
// every code path that previously called Client.Sensor directly (stage
|
|
// Deps.Sensor, thermal sidecar, EDAC sidecar). The mux lives for the
|
|
// duration of the agent run.
|
|
func NewSensorMux(parent context.Context, c *Client) *SensorMux {
|
|
ctx, cancel := context.WithCancel(parent)
|
|
m := &SensorMux{
|
|
c: c,
|
|
in: make(chan []SensorSample, 32),
|
|
flushInterval: 2 * time.Second,
|
|
maxBatch: 500,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
m.wg.Add(1)
|
|
go m.loop()
|
|
return m
|
|
}
|
|
|
|
// Send enqueues a batch for the next flush tick. Empty batches are
|
|
// silently ignored so callers with conditional sample lists don't need
|
|
// to guard the call site.
|
|
func (m *SensorMux) Send(samples []SensorSample) {
|
|
if m == nil || len(samples) == 0 {
|
|
return
|
|
}
|
|
// Copy so caller mutations don't race with the flush loop.
|
|
out := make([]SensorSample, len(samples))
|
|
copy(out, samples)
|
|
select {
|
|
case m.in <- out:
|
|
default:
|
|
log.Printf("sensor mux: input channel full, dropping %d samples", len(out))
|
|
}
|
|
}
|
|
|
|
// Close stops the flush loop and flushes the residual buffer. Safe to
|
|
// call twice (the second is a no-op because the internal context is
|
|
// already cancelled).
|
|
func (m *SensorMux) Close() {
|
|
if m == nil {
|
|
return
|
|
}
|
|
m.cancel()
|
|
m.wg.Wait()
|
|
}
|
|
|
|
func (m *SensorMux) loop() {
|
|
defer m.wg.Done()
|
|
buf := make([]SensorSample, 0, m.maxBatch)
|
|
t := time.NewTicker(m.flushInterval)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-m.ctx.Done():
|
|
m.flushChunks(buf)
|
|
buf = nil
|
|
// Drain whatever is still sitting in the channel so a
|
|
// workload that pushed right before Close doesn't lose
|
|
// those final samples.
|
|
for {
|
|
select {
|
|
case batch := <-m.in:
|
|
m.flushChunks(batch)
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
case batch := <-m.in:
|
|
buf = append(buf, batch...)
|
|
if len(buf) >= m.maxBatch {
|
|
m.flushChunks(buf)
|
|
buf = buf[:0]
|
|
}
|
|
case <-t.C:
|
|
if len(buf) > 0 {
|
|
m.flushChunks(buf)
|
|
buf = buf[:0]
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// flushChunks splits a potentially-large slice into maxBatch-sized
|
|
// HTTP requests so no single POST carries more than the configured cap.
|
|
// A 10-second per-chunk timeout keeps a stalled orchestrator from
|
|
// freezing the flush loop.
|
|
func (m *SensorMux) flushChunks(all []SensorSample) {
|
|
for len(all) > 0 {
|
|
n := len(all)
|
|
if n > m.maxBatch {
|
|
n = m.maxBatch
|
|
}
|
|
chunk := all[:n]
|
|
all = all[n:]
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
if err := m.c.Sensor(ctx, chunk); err != nil {
|
|
log.Printf("sensor mux: flush of %d samples failed: %v", len(chunk), err)
|
|
}
|
|
cancel()
|
|
}
|
|
}
|