Files
Vetting/agent/client.go
T
josh 27098fc7ed
CI / Lint + build + test (push) Successful in 1m23s
Release / release (push) Successful in 6m2s
cpustress+orchestrator: serial CPU/RAM passes + silent-skip guard
Orion's run (log 20:49 → 20:54) shipped GREEN while silently skipping
CPUStress. Two compounding bugs:

1. CPUStress ran --cpu N AND --vm N --vm-bytes 90% concurrently.
   On a 4-core 8 GiB N95, that's 360% RAM overcommit; the OOM-killer
   fired, usually on the agent itself. Replaced with two sequential
   passes — CPU (all methods, --verify) for 3 min, then RAM (--vm 1,
   --vm-bytes capped to MemAvailable − 1.5 GiB, floor 256 MiB, --verify)
   for 3 min. Each pass now also asserts elapsed ≥ target − 2s so a
   premature clean exit counts as failure instead of a silent pass.

2. On systemd-restart after the OOM, the agent hardcoded nextStage :=
   "Inventory" and re-ran it. The orchestrator's /result handler
   advances run state via TriggerStageCompleted against the *current*
   RunState, not against body.Stage — so an Inventory result posted
   while the run was in StateCPUStress silently advanced CPUStress →
   Storage and marked CPUStress passed without it ever running.

Two-layer defense for #2:
- agent-side: /claim response now carries current_state; agent resumes
  at the matching stage on a re-claim (happy path).
- server-side: new TriggerStageMismatch + StageNameForState helper
  backstop. If body.Stage doesn't match the run's current stage, /result
  parks the run in FailedHolding with failed_stage labeled
  "<got> (expected <expected>)" and returns 409.

Other stages audited for similar unbounded concurrency — none found;
only CPUStress was unsafe.

Tests:
- cpustress_test.go — parseMemAvailable parses real meminfo, errors on
  missing/malformed; cap calc hits floor on tiny boxes, uses 1.5 GiB
  headroom on normal/huge boxes.
- statemachine_test.go — TriggerStageMismatch lands at FailedHolding
  from every stage state and is rejected from pre-stage/terminal
  states; StageNameForState round-trips the stageStates map.
- agent_handlers_test.go — TestResult_RejectsMismatchedStage proves
  the Orion scenario now 409s + FailedHolding; TestResult_AcceptsMatchingStage
  proves the guard doesn't break the happy path.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-18 17:29:13 -04:00

190 lines
5.3 KiB
Go

package agent
import (
"bytes"
"context"
"crypto/sha256"
"crypto/tls"
"crypto/x509"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
)
// Client talks to the orchestrator's /api/v1/runs/:id/* endpoints.
// Build it with NewClient so BaseURL is normalized and the TLS settings
// (including optional certificate pinning) are applied.
type Client struct {
	BaseURL    string // orchestrator root URL; NewClient strips any trailing "/"
	RunID      int64  // run ID interpolated into every request path
	Token      string // bearer token sent with every request
	TLSCertFPR string // optional sha256 hex fingerprint, exactly as passed to NewClient
	HTTP       *http.Client
}

// NewClient returns a Client for runID against the orchestrator at baseURL.
//
// tlsCertFPR, when non-empty, pins the server certificate. It is the
// hex-encoded SHA-256 of the server's leaf certificate (DER). Matching is
// case-insensitive, and ':' separators and surrounding whitespace are
// ignored, so "AB:CD:…" and "abcd…" are equivalent. With a pin set,
// CA-chain and hostname verification are skipped so a self-signed
// orchestrator cert on the LAN works; the pin becomes the sole trust
// check. With no pin, the standard Go TLS verification applies.
//
// The returned client uses a 30s overall request timeout and TLS >= 1.2.
func NewClient(baseURL string, runID int64, token, tlsCertFPR string) *Client {
	tlsCfg := &tls.Config{MinVersion: tls.VersionTLS12}

	want := strings.ToLower(strings.ReplaceAll(strings.TrimSpace(tlsCertFPR), ":", ""))
	if want != "" {
		// InsecureSkipVerify turns off chain and hostname checks, which
		// leaves VerifyPeerCertificate below as the only gate.
		tlsCfg.InsecureSkipVerify = true
		tlsCfg.VerifyPeerCertificate = func(rawCerts [][]byte, _ [][]*x509.Certificate) error {
			// Check ONLY the leaf. The leaf is the certificate whose
			// private key the server must prove it holds during the
			// handshake. Any other certificate in the chain is just bytes
			// the peer chose to send. Accepting a pin match there would
			// let a MITM append the (public) pinned cert after its own
			// leaf and pass verification.
			if len(rawCerts) == 0 {
				return fmt.Errorf("agent: server presented no certificate")
			}
			sum := sha256.Sum256(rawCerts[0])
			if got := hex.EncodeToString(sum[:]); got != want {
				return fmt.Errorf("agent: server cert sha256 %s does not match pinned fingerprint %s", got, want)
			}
			return nil
		}
	}

	return &Client{
		BaseURL:    strings.TrimRight(baseURL, "/"),
		RunID:      runID,
		Token:      token,
		TLSCertFPR: tlsCertFPR,
		HTTP: &http.Client{
			Timeout:   30 * time.Second,
			Transport: &http.Transport{TLSClientConfig: tlsCfg},
		},
	}
}
// Hello POSTs an empty request to /hello. Nothing is decoded from the
// reply; the returned error reports transport failures or a non-2xx status.
func (c *Client) Hello(ctx context.Context) error {
	const endpoint = "/hello"
	err := c.postJSON(ctx, endpoint, nil, nil)
	return err
}
// Claim POSTs {"agent_ip": agentIP} to /claim and returns the decoded
// ClaimResponse. The response's CurrentState lets a restarted agent resume
// at the stage the run is parked at. On any error the response is nil.
func (c *Client) Claim(ctx context.Context, agentIP string) (*ClaimResponse, error) {
	claim := new(ClaimResponse)
	payload := map[string]any{"agent_ip": agentIP}
	if err := c.postJSON(ctx, "/claim", payload, claim); err != nil {
		return nil, err
	}
	return claim, nil
}
// Heartbeat POSTs an empty request to /heartbeat and returns the decoded
// HeartbeatResponse. On any error the response is nil.
func (c *Client) Heartbeat(ctx context.Context) (*HeartbeatResponse, error) {
	hb := &HeartbeatResponse{}
	err := c.postJSON(ctx, "/heartbeat", nil, hb)
	if err != nil {
		return nil, err
	}
	return hb, nil
}
// Log ships a batch of log lines to /log, wrapped as {"lines": [...]}.
// A nil slice is sent as "lines": null.
func (c *Client) Log(ctx context.Context, lines []LogLine) error {
	envelope := map[string]any{"lines": lines}
	return c.postJSON(ctx, "/log", envelope, nil)
}
// Result POSTs a stage result (any JSON-encodable value, sent as-is) to
// /result and returns the decoded ResultResponse. A non-2xx reply, such as
// the orchestrator rejecting a result for the wrong stage, comes back as
// an error with a nil response.
func (c *Client) Result(ctx context.Context, result any) (*ResultResponse, error) {
	reply := new(ResultResponse)
	err := c.postJSON(ctx, "/result", result, reply)
	if err != nil {
		return nil, err
	}
	return reply, nil
}
// Hold POSTs {"agent_ip": agentIP} to /hold and returns the decoded
// HoldResponse. On any error the response is nil.
func (c *Client) Hold(ctx context.Context, agentIP string) (*HoldResponse, error) {
	payload := map[string]any{"agent_ip": agentIP}
	hold := &HoldResponse{}
	if err := c.postJSON(ctx, "/hold", payload, hold); err != nil {
		return nil, err
	}
	return hold, nil
}
// Sensor posts a batch of numeric samples (thermal readings, fio IOPS,
// iperf throughput, PSU voltages) to /sensor as {"samples": [...]}.
// Empty batches are allowed; no response body is decoded.
func (c *Client) Sensor(ctx context.Context, samples []SensorSample) error {
	const endpoint = "/sensor"
	batch := map[string]any{"samples": samples}
	return c.postJSON(ctx, endpoint, batch, nil)
}
// SensorSample is the on-wire shape; the server persists each row into
// the measurements table.
//
// Samples are sent in batches by Client.Sensor inside a {"samples": [...]}
// envelope. Field order here is the JSON output order.
type SensorSample struct {
	TS    string  `json:"ts,omitempty"` // omitted when empty; format not enforced here (presumably RFC 3339 — confirm with server)
	Kind  string  `json:"kind"`         // sample category (e.g. thermal, fio, iperf, PSU per Client.Sensor's doc) — TODO confirm vocabulary
	Key   string  `json:"key"`          // identifies the measured series within Kind — NOTE(review): naming scheme not visible here
	Value float64 `json:"value"`
	Unit  string  `json:"unit,omitempty"` // omitted when empty
}
// ClaimResponse is the decoded reply to POST /claim (see Client.Claim).
type ClaimResponse struct {
	OK    bool  `json:"ok"`
	RunID int64 `json:"run_id"`
	// Stages lists stage names for this run — presumably in execution
	// order; confirm against the orchestrator.
	Stages []string `json:"stages"`
	// ExpectedDisks describes the disks the orchestrator expects to find
	// on this machine.
	ExpectedDisks []ClaimExpectedDiskSpec `json:"expected_disks"`
	// IperfPort is presumably the port the network-throughput stage should
	// target — TODO confirm.
	IperfPort int `json:"iperf_port"`
	// NonDestructive presumably asks the agent to skip destructive work
	// (e.g. disk writes) — confirm semantics with the orchestrator.
	NonDestructive bool `json:"non_destructive"`
	// CurrentState is the run's current state at claim time. A fresh
	// claim yields "InventoryCheck"; a re-claim after an agent crash
	// yields whatever stage the run was parked at, so the agent resumes
	// at the right stage instead of silently replaying Inventory and
	// letting the orchestrator advance past the crashed stage.
	CurrentState string `json:"current_state"`
}
// ClaimExpectedDiskSpec is one entry of ClaimResponse.ExpectedDisks.
type ClaimExpectedDiskSpec struct {
	Serial string `json:"serial"`  // disk serial number
	SizeGB int    `json:"size_gb"` // NOTE(review): GB vs GiB not established here — confirm with the orchestrator
}
// HeartbeatResponse is the decoded reply to POST /heartbeat (see
// Client.Heartbeat).
type HeartbeatResponse struct {
	Cmd   string `json:"cmd"`             // instruction from the orchestrator; vocabulary defined server-side (not visible here)
	State string `json:"state"`           // run state as reported by the orchestrator
	Stage string `json:"stage,omitempty"` // may be absent/empty
	// OverrideFlags is kept as raw JSON; decoding is left to the caller.
	OverrideFlags json.RawMessage `json:"override_flags,omitempty"`
}
// LogLine is one agent log line, sent by Client.Log inside a
// {"lines": [...]} envelope. Only Text is always serialized.
type LogLine struct {
	TS    string `json:"ts,omitempty"`    // timestamp; format not enforced here — TODO confirm expected format
	Level string `json:"level,omitempty"` // severity label, free-form string here
	Stage string `json:"stage,omitempty"` // presumably the stage that emitted the line
	Text  string `json:"text"`
}
// ResultResponse is the decoded reply to POST /result (see Client.Result).
type ResultResponse struct {
	OK bool `json:"ok"`
	// NextState is presumably the run state after the orchestrator
	// applied this result — confirm against the /result handler.
	NextState string `json:"next_state"`
}
// HoldResponse is the decoded reply to POST /hold (see Client.Hold).
type HoldResponse struct {
	// AuthorizedKey is presumably an SSH public key line (authorized_keys
	// format) for operator access while held — TODO confirm.
	AuthorizedKey string `json:"authorized_key"`
	RunID         int64  `json:"run_id"`
}
// postJSON POSTs to {BaseURL}/api/v1/runs/{RunID}{path} with the client's
// bearer token.
//
// When in is non-nil it is JSON-encoded as the request body (with a JSON
// Content-Type); otherwise the request has no body. When out is non-nil,
// the JSON response body is decoded into it.
//
// Any status >= 300 is returned as "POST <path>: <status> <body>". The
// echoed body is truncated to a few KiB so a misbehaving server cannot
// balloon agent memory or logs.
func (c *Client) postJSON(ctx context.Context, path string, in, out any) error {
	const (
		maxErrBody = 4 << 10  // bytes of an error body echoed into the returned error
		maxDrain   = 64 << 10 // bytes we are willing to discard to keep the connection reusable
	)

	var body io.Reader
	if in != nil {
		buf, err := json.Marshal(in)
		if err != nil {
			return fmt.Errorf("%s: encode request: %w", path, err)
		}
		body = bytes.NewReader(buf)
	}

	endpoint := fmt.Sprintf("%s/api/v1/runs/%d%s", c.BaseURL, c.RunID, path)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, body)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+c.Token)
	if in != nil {
		req.Header.Set("Content-Type", "application/json")
	}

	resp, err := c.HTTP.Do(req)
	if err != nil {
		return err
	}
	defer func() {
		// Read the (bounded) remainder before closing. net/http only
		// returns a keep-alive connection to the pool once the body has
		// been consumed. Otherwise every heartbeat/log call would pay a
		// fresh TCP+TLS handshake. json.Decoder can also stop before EOF.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxDrain))
		_ = resp.Body.Close()
	}()

	if resp.StatusCode >= 300 {
		b, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrBody)) // best-effort: the status alone is still reported
		return fmt.Errorf("%s %s: %d %s", req.Method, path, resp.StatusCode, strings.TrimSpace(string(b)))
	}
	if out != nil {
		if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
			return fmt.Errorf("%s %s: decode response: %w", req.Method, path, err)
		}
	}
	return nil
}