package agent import ( "bytes" "context" "crypto/sha256" "crypto/tls" "crypto/x509" "encoding/hex" "encoding/json" "fmt" "io" "net/http" "strings" "time" ) // Client talks to the orchestrator's /api/v1/runs/:id/* endpoints. type Client struct { BaseURL string RunID int64 Token string TLSCertFPR string // optional sha256 hex fingerprint HTTP *http.Client } func NewClient(baseURL string, runID int64, token, tlsCertFPR string) *Client { tlsCfg := &tls.Config{MinVersion: tls.VersionTLS12} // Cert pinning: if fingerprint provided, accept any cert whose DER // sha256 matches. The orchestrator may be using a self-signed cert // inside the LAN. if tlsCertFPR != "" { want := strings.ToLower(strings.ReplaceAll(tlsCertFPR, ":", "")) tlsCfg.InsecureSkipVerify = true tlsCfg.VerifyPeerCertificate = func(rawCerts [][]byte, _ [][]*x509.Certificate) error { for _, c := range rawCerts { sum := sha256.Sum256(c) if hex.EncodeToString(sum[:]) == want { return nil } } return fmt.Errorf("agent: no presented cert matched pinned fingerprint") } } return &Client{ BaseURL: strings.TrimRight(baseURL, "/"), RunID: runID, Token: token, TLSCertFPR: tlsCertFPR, HTTP: &http.Client{ Timeout: 30 * time.Second, Transport: &http.Transport{TLSClientConfig: tlsCfg}, }, } } func (c *Client) Hello(ctx context.Context) error { return c.postJSON(ctx, "/hello", nil, nil) } func (c *Client) Claim(ctx context.Context, agentIP string) (*ClaimResponse, error) { body := map[string]any{"agent_ip": agentIP} var out ClaimResponse if err := c.postJSON(ctx, "/claim", body, &out); err != nil { return nil, err } return &out, nil } func (c *Client) Heartbeat(ctx context.Context) (*HeartbeatResponse, error) { var out HeartbeatResponse if err := c.postJSON(ctx, "/heartbeat", nil, &out); err != nil { return nil, err } return &out, nil } func (c *Client) Log(ctx context.Context, lines []LogLine) error { return c.postJSON(ctx, "/log", map[string]any{"lines": lines}, nil) } func (c *Client) Result(ctx context.Context, result 
any) (*ResultResponse, error) { var out ResultResponse if err := c.postJSON(ctx, "/result", result, &out); err != nil { return nil, err } return &out, nil } func (c *Client) Hold(ctx context.Context, agentIP string) (*HoldResponse, error) { var out HoldResponse if err := c.postJSON(ctx, "/hold", map[string]any{"agent_ip": agentIP}, &out); err != nil { return nil, err } return &out, nil } // Sensor posts a batch of numeric samples (thermal readings, fio IOPS, // iperf throughput, PSU voltages). Empty batches are allowed. func (c *Client) Sensor(ctx context.Context, samples []SensorSample) error { return c.postJSON(ctx, "/sensor", map[string]any{"samples": samples}, nil) } // SensorSample is the on-wire shape; the server persists each row into // the measurements table. type SensorSample struct { TS string `json:"ts,omitempty"` Kind string `json:"kind"` Key string `json:"key"` Value float64 `json:"value"` Unit string `json:"unit,omitempty"` } type ClaimResponse struct { OK bool `json:"ok"` RunID int64 `json:"run_id"` Stages []string `json:"stages"` ExpectedDisks []ClaimExpectedDiskSpec `json:"expected_disks"` IperfPort int `json:"iperf_port"` NonDestructive bool `json:"non_destructive"` // CurrentState is the run's current state at claim time. A fresh // claim yields "InventoryCheck"; a re-claim after an agent crash // yields whatever stage the run was parked at, so the agent resumes // at the right stage instead of silently replaying Inventory and // letting the orchestrator advance past the crashed stage. CurrentState string `json:"current_state"` // StageConfig carries per-profile stage knobs (Phase 2): stage-level // timeouts and probe-level durations/modes. Empty when the agent // talks to a pre-Phase-2 orchestrator; the agent applies compile- // time defaults in that case. StageConfig ClaimStageConfig `json:"stage_config"` } // ClaimStageConfig mirrors config.StageConfig server-side — duplicated so // the agent doesn't need to import internal/config. 
Durations arrive as // strings ("2m", "2h") and are parsed by the tests package at the point // of use. An empty field means "use the agent-side default" so a missing // knob doesn't silently turn CPUStress / Storage into a no-op. type ClaimStageConfig struct { Profile string `json:"profile"` StageTimeouts map[string]string `json:"stage_timeouts,omitempty"` CPUStress ClaimCPUStressKnobs `json:"cpustress"` Storage ClaimStorageKnobs `json:"storage"` Network ClaimNetworkKnobs `json:"network"` Burn ClaimBurnKnobs `json:"burn"` } type ClaimCPUStressKnobs struct { CPUPass string `json:"cpu_pass,omitempty"` MemPass string `json:"mem_pass,omitempty"` EDACPoll string `json:"edac_poll,omitempty"` } type ClaimStorageKnobs struct { Mode string `json:"mode,omitempty"` FioSize string `json:"fio_size,omitempty"` FioTime string `json:"fio_time,omitempty"` FioBS string `json:"fio_bs,omitempty"` FioRW string `json:"fio_rw,omitempty"` Verify string `json:"verify,omitempty"` } type ClaimNetworkKnobs struct { Duration string `json:"duration,omitempty"` } // ClaimBurnKnobs mirrors config.BurnKnobs. Duration/CPUWorkers arrive as // strings so the agent can treat empty as "use compile-time default". // MemPct is a percentage (0-100); IperfParallel is the parallel stream // count fed to iperf3 -P. FioOnSpare gates whether fio runs inside Burn. 
type ClaimBurnKnobs struct {
	// Duration and CPUWorkers are strings; empty values are omitted.
	Duration   string `json:"duration,omitempty"`
	CPUWorkers string `json:"cpu_workers,omitempty"`

	// Numeric/boolean knobs; zero values are omitted from the JSON.
	MemPct        int  `json:"mem_pct,omitempty"`
	FioOnSpare    bool `json:"fio_on_spare,omitempty"`
	IperfParallel int  `json:"iperf_parallel,omitempty"`
}

// ClaimExpectedDiskSpec is one element of ClaimResponse.ExpectedDisks.
type ClaimExpectedDiskSpec struct {
	Serial string `json:"serial"`
	SizeGB int    `json:"size_gb"`
}

// HeartbeatResponse is the JSON body decoded from the /heartbeat reply.
type HeartbeatResponse struct {
	Cmd   string `json:"cmd"`
	State string `json:"state"`
	Stage string `json:"stage,omitempty"`

	// OverrideFlags is kept as raw, undecoded JSON.
	OverrideFlags json.RawMessage `json:"override_flags,omitempty"`
}

// LogLine is a single entry in the batch posted to /log. Only Text is
// always serialized; the other fields are omitted when empty.
type LogLine struct {
	TS    string `json:"ts,omitempty"`
	Level string `json:"level,omitempty"`
	Stage string `json:"stage,omitempty"`
	Text  string `json:"text"`
}

// ResultResponse is the JSON body decoded from the /result reply.
type ResultResponse struct {
	OK        bool   `json:"ok"`
	NextState string `json:"next_state"`
}

// SubStepReport is the wire shape the agent POSTs inside /result for
// each granular sub-step (CPU/Memory pass, per-disk SMART, per-device
// GPU, …). Ordinal is assigned by the server in slice order; the agent
// doesn't set it. Summary is opaque JSON the UI may render later.
type SubStepReport struct { Name string `json:"name"` Passed bool `json:"passed"` Skipped bool `json:"skipped,omitempty"` StartedAt string `json:"started_at,omitempty"` CompletedAt string `json:"completed_at,omitempty"` Summary json.RawMessage `json:"summary,omitempty"` } type HoldResponse struct { AuthorizedKey string `json:"authorized_key"` RunID int64 `json:"run_id"` } func (c *Client) postJSON(ctx context.Context, path string, in, out any) error { var body io.Reader if in != nil { buf, err := json.Marshal(in) if err != nil { return err } body = bytes.NewReader(buf) } url := fmt.Sprintf("%s/api/v1/runs/%d%s", c.BaseURL, c.RunID, path) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body) if err != nil { return err } req.Header.Set("Authorization", "Bearer "+c.Token) if in != nil { req.Header.Set("Content-Type", "application/json") } resp, err := c.HTTP.Do(req) if err != nil { return err } defer func() { _ = resp.Body.Close() }() if resp.StatusCode >= 300 { b, _ := io.ReadAll(resp.Body) return fmt.Errorf("%s %s: %d %s", req.Method, path, resp.StatusCode, strings.TrimSpace(string(b))) } if out != nil { return json.NewDecoder(resp.Body).Decode(out) } return nil }