Heartbeat command channel: reboot_for_vetting skips WoL
CI / Lint + build + test (push) Failing after 5m13s
CI / Lint + build + test (push) Failing after 5m13s
When the operator clicks Start vetting and the host is heartbeating, the heartbeat response now carries cmd=reboot_for_vetting + run_id. The handler drives the Queued → WaitingWoL transition via the existing state machine, so a benign race with the 2s dispatcher poll is refused by the state machine (not double-dispatched). WaitingWoL retries for 10 minutes to cover a crashed-mid-reboot case, then falls back to operator action. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -6,6 +6,7 @@ import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -13,14 +14,17 @@ import (
|
||||
|
||||
"vetting/internal/api"
|
||||
"vetting/internal/db"
|
||||
"vetting/internal/events"
|
||||
"vetting/internal/model"
|
||||
"vetting/internal/orchestrator"
|
||||
"vetting/internal/store"
|
||||
)
|
||||
|
||||
// setupHeartbeat wires just enough of UI to exercise the heartbeat
|
||||
// handler. Runner is left nil — the handler no-ops the SSE publish in
|
||||
// that case, which matches "tests don't assert on SSE" (covered by
|
||||
// integration-style runner tests).
|
||||
// handler. Runner is left nil by default — the Phase-2 command path
|
||||
// short-circuits to idle when Runner is absent, which is fine for the
|
||||
// "no run yet" happy path. Callers that want to drive the Phase-2
|
||||
// transition use setupHeartbeatWithRunner.
|
||||
func setupHeartbeat(t *testing.T) (*api.UI, *store.Hosts) {
|
||||
t.Helper()
|
||||
conn, err := db.Open(filepath.Join(t.TempDir(), "vetting.db"))
|
||||
@@ -32,6 +36,25 @@ func setupHeartbeat(t *testing.T) (*api.UI, *store.Hosts) {
|
||||
return &api.UI{Hosts: hosts}, hosts
|
||||
}
|
||||
|
||||
// setupHeartbeatWithRunner also wires a Runs store + Runner so
|
||||
// Phase-2 tests can exercise the Queued → WaitingWoL transition and
|
||||
// the 10-minute WaitingWoL re-issue window.
|
||||
func setupHeartbeatWithRunner(t *testing.T) (*api.UI, *store.Hosts, *store.Runs) {
|
||||
t.Helper()
|
||||
conn, err := db.Open(filepath.Join(t.TempDir(), "vetting.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("open db: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { _ = conn.Close() })
|
||||
hosts := &store.Hosts{DB: conn}
|
||||
runs := &store.Runs{DB: conn}
|
||||
stages := &store.Stages{DB: conn}
|
||||
hub := events.NewHub()
|
||||
runner := &orchestrator.Runner{Runs: runs, Hosts: hosts, Stages: stages, EventHub: hub}
|
||||
ui := &api.UI{Hosts: hosts, Runs: runs, Runner: runner}
|
||||
return ui, hosts, runs
|
||||
}
|
||||
|
||||
func heartbeatReq(mac string) *http.Request {
|
||||
req := httptest.NewRequest(http.MethodPost, "/api/v1/hosts/"+mac+"/heartbeat", nil)
|
||||
rctx := chi.NewRouteContext()
|
||||
@@ -100,3 +123,134 @@ func TestUIHeartbeat_BadMAC(t *testing.T) {
|
||||
t.Fatalf("status = %d, want 400", rr.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUIHeartbeat_QueuedDispatches(t *testing.T) {
|
||||
ui, hosts, runs := setupHeartbeatWithRunner(t)
|
||||
ctx := context.Background()
|
||||
hostID, err := hosts.Create(ctx, model.Host{
|
||||
Name: "hb-dispatch",
|
||||
MAC: "aa:bb:cc:dd:ee:20",
|
||||
WoLBroadcastIP: "10.0.0.255",
|
||||
WoLPort: 9,
|
||||
ExpectedSpecYAML: "memory:\n total_gib: 16\n",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("create host: %v", err)
|
||||
}
|
||||
runID, err := runs.Create(ctx, hostID, "deadbeef")
|
||||
if err != nil {
|
||||
t.Fatalf("create run: %v", err)
|
||||
}
|
||||
|
||||
rr := httptest.NewRecorder()
|
||||
ui.Heartbeat(rr, heartbeatReq("aa:bb:cc:dd:ee:20"))
|
||||
if rr.Code != http.StatusOK {
|
||||
t.Fatalf("status = %d, body = %q", rr.Code, rr.Body.String())
|
||||
}
|
||||
var resp struct {
|
||||
OK bool `json:"ok"`
|
||||
Cmd string `json:"cmd"`
|
||||
RunID int64 `json:"run_id"`
|
||||
}
|
||||
if err := json.Unmarshal(rr.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("decode: %v", err)
|
||||
}
|
||||
if resp.Cmd != "reboot_for_vetting" || resp.RunID != runID {
|
||||
t.Fatalf("response = %+v, want cmd=reboot_for_vetting run_id=%d", resp, runID)
|
||||
}
|
||||
// Run advanced Queued → WaitingWoL via the state machine.
|
||||
got, err := runs.Get(ctx, runID)
|
||||
if err != nil {
|
||||
t.Fatalf("get run: %v", err)
|
||||
}
|
||||
if got.State != model.StateWaitingWoL {
|
||||
t.Fatalf("state = %s, want WaitingWoL", got.State)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUIHeartbeat_WaitingWoLRetries(t *testing.T) {
|
||||
ui, hosts, runs := setupHeartbeatWithRunner(t)
|
||||
ctx := context.Background()
|
||||
hostID, err := hosts.Create(ctx, model.Host{
|
||||
Name: "hb-retry",
|
||||
MAC: "aa:bb:cc:dd:ee:21",
|
||||
WoLBroadcastIP: "10.0.0.255",
|
||||
WoLPort: 9,
|
||||
ExpectedSpecYAML: "memory:\n total_gib: 16\n",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("create host: %v", err)
|
||||
}
|
||||
runID, err := runs.Create(ctx, hostID, "deadbeef")
|
||||
if err != nil {
|
||||
t.Fatalf("create run: %v", err)
|
||||
}
|
||||
// Simulate: dispatcher already moved the run to WaitingWoL, now
|
||||
// the host's reporter comes back from a crashed reboot.
|
||||
if err := runs.SetState(ctx, runID, model.StateWaitingWoL); err != nil {
|
||||
t.Fatalf("set state: %v", err)
|
||||
}
|
||||
|
||||
rr := httptest.NewRecorder()
|
||||
ui.Heartbeat(rr, heartbeatReq("aa:bb:cc:dd:ee:21"))
|
||||
var resp struct {
|
||||
Cmd string `json:"cmd"`
|
||||
RunID int64 `json:"run_id"`
|
||||
}
|
||||
_ = json.Unmarshal(rr.Body.Bytes(), &resp)
|
||||
if resp.Cmd != "reboot_for_vetting" || resp.RunID != runID {
|
||||
t.Fatalf("response = %+v, want reboot_for_vetting retry", resp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUIHeartbeat_NoRunIsIdle(t *testing.T) {
|
||||
ui, hosts, _ := setupHeartbeatWithRunner(t)
|
||||
if _, err := hosts.Create(context.Background(), model.Host{
|
||||
Name: "hb-idle",
|
||||
MAC: "aa:bb:cc:dd:ee:22",
|
||||
WoLBroadcastIP: "10.0.0.255",
|
||||
WoLPort: 9,
|
||||
ExpectedSpecYAML: "memory:\n total_gib: 16\n",
|
||||
}); err != nil {
|
||||
t.Fatalf("create host: %v", err)
|
||||
}
|
||||
rr := httptest.NewRecorder()
|
||||
ui.Heartbeat(rr, heartbeatReq("aa:bb:cc:dd:ee:22"))
|
||||
// Idle = cmd omitted entirely; the agent's heartbeatResponse
|
||||
// decodes that as "", and handleResponse bails early.
|
||||
body := rr.Body.String()
|
||||
if strings.Contains(body, "reboot_for_vetting") {
|
||||
t.Fatalf("idle host got reboot cmd: %s", body)
|
||||
}
|
||||
if strings.Contains(body, `"cmd"`) {
|
||||
t.Fatalf("idle response should omit cmd, got: %s", body)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUIHeartbeat_CompletedRunIsIdle(t *testing.T) {
|
||||
ui, hosts, runs := setupHeartbeatWithRunner(t)
|
||||
ctx := context.Background()
|
||||
hostID, err := hosts.Create(ctx, model.Host{
|
||||
Name: "hb-done",
|
||||
MAC: "aa:bb:cc:dd:ee:23",
|
||||
WoLBroadcastIP: "10.0.0.255",
|
||||
WoLPort: 9,
|
||||
ExpectedSpecYAML: "memory:\n total_gib: 16\n",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("create host: %v", err)
|
||||
}
|
||||
runID, err := runs.Create(ctx, hostID, "deadbeef")
|
||||
if err != nil {
|
||||
t.Fatalf("create run: %v", err)
|
||||
}
|
||||
if err := runs.SetState(ctx, runID, model.StateCompleted); err != nil {
|
||||
t.Fatalf("set state: %v", err)
|
||||
}
|
||||
rr := httptest.NewRecorder()
|
||||
ui.Heartbeat(rr, heartbeatReq("aa:bb:cc:dd:ee:23"))
|
||||
body := rr.Body.String()
|
||||
if strings.Contains(body, "reboot_for_vetting") {
|
||||
t.Fatalf("completed run returned reboot cmd: %s", body)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"log"
|
||||
@@ -243,8 +244,10 @@ func (u *UI) CreateHostJSON(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// Heartbeat is called every ~30s by a host-mode vetting-agent running
|
||||
// as a systemd service on the registered host. LAN-trusted, no auth —
|
||||
// same threat model as the browser UI and quick-register. Phase 1
|
||||
// just stamps last_seen_at and flips the dashboard tile to "online".
|
||||
// same threat model as the browser UI and quick-register. Stamps
|
||||
// last_seen_at, flips the dashboard tile to "online", and — if the
|
||||
// operator has clicked Start vetting since the last heartbeat — replies
|
||||
// with cmd=reboot_for_vetting so the host boots into PXE without WoL.
|
||||
func (u *UI) Heartbeat(w http.ResponseWriter, r *http.Request) {
|
||||
mac := strings.ToLower(strings.TrimSpace(chi.URLParam(r, "mac")))
|
||||
if !macRe.MatchString(mac) {
|
||||
@@ -268,10 +271,69 @@ func (u *UI) Heartbeat(w http.ResponseWriter, r *http.Request) {
|
||||
if u.Runner != nil {
|
||||
u.Runner.PublishTileUpdate(r.Context(), host.ID)
|
||||
}
|
||||
cmd, runID := u.pickHostCommand(r.Context(), host.ID)
|
||||
resp := heartbeatResponse{OK: true, Cmd: cmd, RunID: runID}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
|
||||
_ = json.NewEncoder(w).Encode(resp)
|
||||
}
|
||||
|
||||
// heartbeatResponse is the JSON the host-mode agent decodes on every
|
||||
// heartbeat. `cmd` is "" (omitted) in the idle case so the wire shape
|
||||
// stays `{"ok": true}` when nothing is happening.
|
||||
type heartbeatResponse struct {
|
||||
OK bool `json:"ok"`
|
||||
Cmd string `json:"cmd,omitempty"`
|
||||
RunID int64 `json:"run_id,omitempty"`
|
||||
}
|
||||
|
||||
// pickHostCommand decides what the host-mode agent should do on the
|
||||
// back of this heartbeat. Returns ("", 0) when there's nothing to do.
|
||||
//
|
||||
// - Queued run → Transition(Dispatched) and tell the agent to reboot.
|
||||
// The dispatcher would have WoL'd it anyway; we beat it to the
|
||||
// punch so the host skips the WoL dance.
|
||||
// - WaitingWoL run created <10min ago → also return reboot, covering
|
||||
// "host crashed mid-reboot, systemd brought the reporter back".
|
||||
// - anything else → idle.
|
||||
func (u *UI) pickHostCommand(ctx context.Context, hostID int64) (string, int64) {
|
||||
if u.Runs == nil || u.Runner == nil {
|
||||
return "", 0
|
||||
}
|
||||
run, err := u.Runs.LatestForHost(ctx, hostID)
|
||||
if err != nil {
|
||||
log.Printf("heartbeat: latest run for host %d: %v", hostID, err)
|
||||
return "", 0
|
||||
}
|
||||
if run == nil {
|
||||
return "", 0
|
||||
}
|
||||
switch run.State {
|
||||
case model.StateQueued:
|
||||
if _, err := u.Runner.Transition(ctx, run.ID, orchestrator.TriggerDispatched); err != nil {
|
||||
// Benign race with the dispatcher's own 2s poll — the
|
||||
// state machine refuses the second transition; we just
|
||||
// log and return idle so the agent doesn't reboot on a
|
||||
// run that another path is already driving.
|
||||
log.Printf("heartbeat: transition run %d: %v", run.ID, err)
|
||||
return "", 0
|
||||
}
|
||||
log.Printf("heartbeat: dispatched run %d for host %d via heartbeat (no WoL)", run.ID, hostID)
|
||||
return cmdRebootForVetting, run.ID
|
||||
case model.StateWaitingWoL:
|
||||
// Tolerate a crashed-mid-reboot retry: the reporter is the
|
||||
// only thing that could be telling us about this host right
|
||||
// now, and WoL is only the fallback anyway. Bound it so a
|
||||
// perpetually-broken PXE doesn't reboot-loop the box.
|
||||
if time.Since(run.StartedAt) < 10*time.Minute {
|
||||
return cmdRebootForVetting, run.ID
|
||||
}
|
||||
return "", 0
|
||||
}
|
||||
return "", 0
|
||||
}
|
||||
|
||||
const cmdRebootForVetting = "reboot_for_vetting"
|
||||
|
||||
func writeJSONError(w http.ResponseWriter, status int, msg string) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(status)
|
||||
|
||||
Reference in New Issue
Block a user