package tests

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/url"
	"os/exec"
	"sort"
	"strconv"
	"strings"
	"time"

	"vetting/agent/probes"
)

// NetworkConfig is what the agent passes to Network: the orchestrator's
// iperf3 server address, port, and the per-profile duration.
type NetworkConfig struct {
	// OrchestratorURL is the base URL whose hostname is used as the
	// iperf3 server address.
	OrchestratorURL string
	// IperfPort is the iperf3 server port; 0 selects the default 5201.
	IperfPort int // 0 = 5201
	// Duration is how long the iperf3 client transmits. Values <= 0
	// select 10 seconds; values below one second are raised to one
	// second because iperf3 only accepts whole seconds.
	Duration time.Duration
}

// Network runs iperf3 against the orchestrator's bundled server for
// the profile-configured duration. Records throughput as a measurement;
// records per-interface rx/tx error-rate deltas as nic_retrans samples
// so the server-side threshold gate (`nic_retrans rate < 0.001`) fires
// on a flaky PHY or a wire that drops half its packets under load.
//
// Failure cases: iperf3 missing, server unreachable, unparseable
// output, zero throughput. Zero throughput is treated as a hard
// failure — an iperf that finished cleanly but pushed zero bytes is
// indistinguishable from a bad run. A missing or underivable
// orchestrator host is not a failure: the stage is skipped and
// reported as passed.
func Network(ctx context.Context, d Deps, cfg NetworkConfig) Outcome {
	if _, err := exec.LookPath("iperf3"); err != nil {
		// Live image ships iperf3; absence means packaging regression.
		d.Error("Network: iperf3 not found — live image is missing required tool")
		return Outcome{
			Passed:  false,
			Message: "iperf3 binary missing from live image",
			Summary: "failed (iperf3 missing)",
			Extras:  map[string]any{"reason": "iperf3_missing"},
		}
	}

	host, err := deriveHost(cfg.OrchestratorURL)
	if err != nil || host == "" {
		d.Warn("Network: can't derive orchestrator host from URL — skipping stage")
		return Outcome{
			Passed:  true,
			Summary: "skipped (no orchestrator host)",
			Extras:  map[string]any{"skipped": true, "reason": "no_host"},
		}
	}

	port := cfg.IperfPort
	if port == 0 {
		port = 5201
	}
	duration := cfg.Duration
	if duration <= 0 {
		duration = 10 * time.Second
	}
	// iperf3 -t takes whole seconds, and 0 means "transmit until
	// killed". Truncating a sub-second duration to 0 would therefore
	// hang the stage until the context timeout, so floor at 1 second.
	secs := int(duration / time.Second)
	if secs < 1 {
		secs = 1
	}
	runFor := time.Duration(secs) * time.Second

	// Snapshot /proc/net/dev before the test so we can attribute any
	// error-count growth to *this stage's* traffic. The same snapshot
	// taken after iperf returns is the end of the window.
	netStart := indexNetDev(probes.NetDev())

	args := []string{
		"-c", host,
		"-p", strconv.Itoa(port),
		"-t", strconv.Itoa(secs),
		"-J", // JSON output
	}
	d.Info(fmt.Sprintf("Network: iperf3 -c %s -p %d -t %s", host, port, runFor))

	// Allow 30 seconds on top of the transmit time for connection
	// setup and result exchange before the client is killed.
	runCtx, cancel := context.WithTimeout(ctx, runFor+30*time.Second)
	defer cancel()
	cmd := exec.CommandContext(runCtx, "iperf3", args...)
	out, err := cmd.Output()
	if err != nil {
		// Output only returns stdout; whatever the client wrote to
		// stderr is carried on the ExitError. Append it after stdout
		// so the tail actually contains the stderr text.
		diag := string(out)
		var exitErr *exec.ExitError
		if errors.As(err, &exitErr) && len(exitErr.Stderr) > 0 {
			diag += "\n" + string(exitErr.Stderr)
		}
		d.Error("Network: iperf3 client failed: " + err.Error())
		return Outcome{
			Passed:  false,
			Message: "iperf3 client error: " + err.Error(),
			Summary: "iperf3 failed",
			Extras:  map[string]any{"stderr_tail": tailLines(diag, 20)},
		}
	}

	mbps, retrans, bytesSent, parsed, err := parseIperfJSON(out)
	if err != nil {
		d.Error("Network: parse iperf3 output: " + err.Error())
		return Outcome{
			Passed:  false,
			Message: "parse iperf3 json: " + err.Error(),
			Summary: "parse error",
			Extras:  map[string]any{"raw": string(out)},
		}
	}

	netEnd := indexNetDev(probes.NetDev())
	netDelta := diffNetDev(netStart, netEnd)

	samples := []Sample{{Kind: "iperf", Key: "throughput_mbps", Value: mbps, Unit: "Mbps"}}

	// iperf-derived retrans rate: retrans_count / packet_count_estimate.
	// TCP typical MTU 1500; payload ~1460. We divide bytes by 1460 to
	// approximate packets. The result is an estimate and is not
	// clamped; it is skipped entirely when no bytes were reported.
	if bytesSent > 0 {
		packets := float64(bytesSent) / 1460.0
		samples = append(samples, Sample{
			Kind:  "nic_retrans",
			Key:   "iperf/rate",
			Value: float64(retrans) / packets,
			Unit:  "rate",
		})
	}

	// Walk interfaces in name order so the emitted samples are
	// deterministic; map iteration order is randomized.
	ifaces := make([]string, 0, len(netDelta))
	for iface := range netDelta {
		ifaces = append(ifaces, iface)
	}
	sort.Strings(ifaces)

	// Per-interface error-rate deltas. A flaky cable typically surfaces
	// as tx_errs or tx_drop on the originating interface, not inside
	// iperf's own tally.
	for _, iface := range ifaces {
		delta := netDelta[iface]
		if delta.TxBytes > 0 {
			packets := float64(delta.TxBytes) / 1460.0
			samples = append(samples, Sample{
				Kind:  "nic_retrans",
				Key:   iface + "/rate",
				Value: float64(delta.TxErrs+delta.TxDrop) / packets,
				Unit:  "rate",
			})
		}
		// Diagnostic raw counts so the report can show which interface
		// bled. These don't fire a threshold today but are useful for
		// post-mortem.
		samples = append(samples,
			Sample{Kind: "nic_errs", Key: iface + "/rx", Value: float64(delta.RxErrs + delta.RxDrop), Unit: "count"},
			Sample{Kind: "nic_errs", Key: iface + "/tx", Value: float64(delta.TxErrs + delta.TxDrop), Unit: "count"},
		)
	}

	if d.Sensor != nil {
		// Best effort: the sensor result is deliberately ignored so a
		// reporting problem does not change the stage outcome.
		_ = d.Sensor(ctx, samples)
	}

	extras := map[string]any{
		"throughput_mbps": mbps,
		"retransmits":     retrans,
		"bytes_sent":      bytesSent,
		"net_delta":       netDelta,
		"iperf_end":       parsed,
	}

	if mbps <= 0 {
		return Outcome{
			Passed:  false,
			Message: "iperf3 reported zero throughput",
			Summary: "zero throughput",
			Extras:  extras,
		}
	}

	d.Info(fmt.Sprintf("Network: iperf3 PASSED: %.1f Mbps (retransmits=%d)", mbps, retrans))
	return Outcome{
		Passed:  true,
		Summary: fmt.Sprintf("%.1f Mbps to %s (retransmits=%d)", mbps, host, retrans),
		Extras:  extras,
	}
}

// indexNetDev flattens a NetDev slice into a map keyed by interface
// name so diffNetDev can pair start/end by name without O(n²) scans.
// If the same interface name appears more than once, the last entry
// wins.
func indexNetDev(snaps []probes.NetDevSnapshot) map[string]probes.NetDevSnapshot {
	out := make(map[string]probes.NetDevSnapshot, len(snaps))
	for _, s := range snaps {
		out[s.Iface] = s
	}
	return out
}

// diffNetDev computes end − start for each interface present in both
// snapshots. An interface that dropped away mid-run is dropped from
// the result (can't compute a delta). Underflow (end < start, rare
// after a counter reset) is clamped to 0.
func diffNetDev(start, end map[string]probes.NetDevSnapshot) map[string]probes.NetDevSnapshot { out := map[string]probes.NetDevSnapshot{} for iface, e := range end { s, ok := start[iface] if !ok { continue } out[iface] = probes.NetDevSnapshot{ Iface: iface, RxBytes: subU64(e.RxBytes, s.RxBytes), RxErrs: subU64(e.RxErrs, s.RxErrs), RxDrop: subU64(e.RxDrop, s.RxDrop), TxBytes: subU64(e.TxBytes, s.TxBytes), TxErrs: subU64(e.TxErrs, s.TxErrs), TxDrop: subU64(e.TxDrop, s.TxDrop), } } return out } func subU64(a, b uint64) uint64 { if a < b { return 0 } return a - b } // deriveHost pulls the hostname out of an https://host:port base URL. func deriveHost(raw string) (string, error) { if raw == "" { return "", fmt.Errorf("empty url") } u, err := url.Parse(raw) if err != nil { return "", err } h := u.Hostname() return strings.TrimSpace(h), nil } // parseIperfJSON pulls end.sum_sent.bits_per_second and retransmits out // of iperf3 -J. Returns (Mbps, retransmits, bytes_sent, full-end-map, err). func parseIperfJSON(b []byte) (float64, int64, int64, map[string]any, error) { var top map[string]any if err := json.Unmarshal(b, &top); err != nil { return 0, 0, 0, nil, err } end, ok := top["end"].(map[string]any) if !ok { return 0, 0, 0, nil, fmt.Errorf("missing end") } // Pull the first sum that carries bits_per_second; retransmits + // bytes live there too for TCP. var mbps float64 var retrans int64 var bytesSent int64 for _, key := range []string{"sum_sent", "sum_received", "sum"} { sum, ok := end[key].(map[string]any) if !ok { continue } bps, ok := sum["bits_per_second"].(float64) if !ok { continue } mbps = bps / 1_000_000 if r, ok := sum["retransmits"].(float64); ok { retrans = int64(r) } if bs, ok := sum["bytes"].(float64); ok { bytesSent = int64(bs) } break } if mbps == 0 { return 0, 0, 0, end, fmt.Errorf("no bits_per_second in end.sum_*") } return mbps, retrans, bytesSent, end, nil }