4524ab8dc0
Non-destructive pre-declares "don't touch the disks" on Start: the Storage stage skips wipe-probe, badblocks -w, and write-mode fio, and reports a read-only summary. Runs gain a new non_destructive column, threaded through Claim → agent tests.Deps → Storage stage. Cancel halts an in-flight run. The orchestrator transitions to a new StateCancelled via TriggerOperatorCancelled (valid from any active state); the agent's next heartbeat returns cmd=cancel_stage, which fires a stored CancelFunc on the per-stage context. Stage subprocesses spawned with exec.CommandContext die with the context, the agent posts a cancelled outcome, then powers the host off. Destructive stages mid-run may leave the host in an intermediate state — the UI confirm dialog warns the operator; recovery is manual for now. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
129 lines
4.0 KiB
Go
129 lines
4.0 KiB
Go
package api_test
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"path/filepath"
|
|
"strconv"
|
|
"testing"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
|
|
"vetting/internal/api"
|
|
"vetting/internal/db"
|
|
"vetting/internal/model"
|
|
"vetting/internal/orchestrator"
|
|
"vetting/internal/store"
|
|
)
|
|
|
|
func setupAgent(t *testing.T) (*api.Agent, int64, string) {
|
|
t.Helper()
|
|
path := filepath.Join(t.TempDir(), "vetting.db")
|
|
conn, err := db.Open(path)
|
|
if err != nil {
|
|
t.Fatalf("open db: %v", err)
|
|
}
|
|
t.Cleanup(func() { _ = conn.Close() })
|
|
|
|
hosts := &store.Hosts{DB: conn}
|
|
runs := &store.Runs{DB: conn}
|
|
meas := &store.Measurements{DB: conn}
|
|
|
|
hostID, err := hosts.Create(context.Background(), model.Host{
|
|
Name: "t-host",
|
|
MAC: "aa:bb:cc:dd:ee:01",
|
|
WoLBroadcastIP: "10.0.0.255",
|
|
WoLPort: 9,
|
|
ExpectedSpecYAML: "memory:\n total_gib: 16\n",
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("create host: %v", err)
|
|
}
|
|
plain, hash, err := orchestrator.IssueRunToken()
|
|
if err != nil {
|
|
t.Fatalf("issue token: %v", err)
|
|
}
|
|
runID, err := runs.Create(context.Background(), hostID, hash, false)
|
|
if err != nil {
|
|
t.Fatalf("create run: %v", err)
|
|
}
|
|
return &api.Agent{
|
|
Hosts: hosts,
|
|
Runs: runs,
|
|
Measurements: meas,
|
|
}, runID, plain
|
|
}
|
|
|
|
func routedRequest(runID int64, method, path string, body []byte) *http.Request {
|
|
req := httptest.NewRequest(method, path, bytes.NewReader(body))
|
|
// chi.URLParam is read from chi's context routing; fake that here.
|
|
rctx := chi.NewRouteContext()
|
|
rctx.URLParams.Add("id", strconv.FormatInt(runID, 10))
|
|
return req.WithContext(context.WithValue(req.Context(), chi.RouteCtxKey, rctx))
|
|
}
|
|
|
|
func TestSensorPersistsBatch(t *testing.T) {
|
|
a, runID, token := setupAgent(t)
|
|
batch := api.SensorBatch{Samples: []api.SensorSample{
|
|
{Kind: "thermal", Key: "cpu", Value: 47.5, Unit: "C"},
|
|
{Kind: "iperf", Key: "throughput_mbps", Value: 938.2, Unit: "Mbps"},
|
|
}}
|
|
buf, _ := json.Marshal(batch)
|
|
req := routedRequest(runID, http.MethodPost, "/api/v1/runs/"+strconv.FormatInt(runID, 10)+"/sensor", buf)
|
|
req.Header.Set("Authorization", "Bearer "+token)
|
|
req.Header.Set("Content-Type", "application/json")
|
|
rr := httptest.NewRecorder()
|
|
a.Sensor(rr, req)
|
|
if rr.Code != http.StatusOK {
|
|
t.Fatalf("status = %d, body = %q", rr.Code, rr.Body.String())
|
|
}
|
|
rows, err := a.Measurements.ListForRun(context.Background(), runID)
|
|
if err != nil {
|
|
t.Fatalf("ListForRun: %v", err)
|
|
}
|
|
if len(rows) != 2 {
|
|
t.Fatalf("expected 2 measurements, got %d", len(rows))
|
|
}
|
|
}
|
|
|
|
func TestSensorRejectsBadToken(t *testing.T) {
|
|
a, runID, _ := setupAgent(t)
|
|
body, _ := json.Marshal(api.SensorBatch{})
|
|
req := routedRequest(runID, http.MethodPost, "/api/v1/runs/"+strconv.FormatInt(runID, 10)+"/sensor", body)
|
|
req.Header.Set("Authorization", "Bearer wrong-token")
|
|
rr := httptest.NewRecorder()
|
|
a.Sensor(rr, req)
|
|
if rr.Code != http.StatusUnauthorized {
|
|
t.Fatalf("status = %d, want 401", rr.Code)
|
|
}
|
|
}
|
|
|
|
// TestHeartbeatShutdownWhenCompleted: once the orchestrator has flipped
|
|
// the run into Completed, the next heartbeat response must carry
|
|
// cmd=shutdown so the agent powers the host down.
|
|
func TestHeartbeatShutdownWhenCompleted(t *testing.T) {
|
|
a, runID, token := setupAgent(t)
|
|
// Wire a runner so Heartbeat's TouchHeartbeat call doesn't nil-panic.
|
|
a.Runner = &orchestrator.Runner{Runs: a.Runs, Hosts: a.Hosts, Stages: &store.Stages{DB: a.Runs.DB}}
|
|
if err := a.Runs.SetState(context.Background(), runID, model.StateCompleted); err != nil {
|
|
t.Fatalf("set state: %v", err)
|
|
}
|
|
req := routedRequest(runID, http.MethodPost, "/api/v1/runs/"+strconv.FormatInt(runID, 10)+"/heartbeat", nil)
|
|
req.Header.Set("Authorization", "Bearer "+token)
|
|
rr := httptest.NewRecorder()
|
|
a.Heartbeat(rr, req)
|
|
if rr.Code != http.StatusOK {
|
|
t.Fatalf("status = %d, body = %s", rr.Code, rr.Body.String())
|
|
}
|
|
var resp map[string]any
|
|
if err := json.Unmarshal(rr.Body.Bytes(), &resp); err != nil {
|
|
t.Fatalf("decode: %v", err)
|
|
}
|
|
if resp["cmd"] != "shutdown" {
|
|
t.Fatalf("cmd = %v, want shutdown", resp["cmd"])
|
|
}
|
|
}
|