From 9b16ed80e6b9a1c03623338a1c732b30268b3f49 Mon Sep 17 00:00:00 2001 From: josh Date: Fri, 17 Apr 2026 23:37:01 -0400 Subject: [PATCH] Heartbeat command channel: reboot_for_vetting skips WoL MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the operator clicks Start vetting and the host is heartbeating, the heartbeat response now carries cmd=reboot_for_vetting + run_id. The handler drives the Queued → WaitingWoL transition via the existing state machine, so if the heartbeat races the dispatcher's 2s poll, the state machine refuses the second transition instead of double-dispatching the run. While a run is in WaitingWoL, the reboot command is re-issued on each heartbeat for up to 10 minutes to cover a host that crashed mid-reboot; after that the heartbeat goes idle and recovery falls back to operator action. Co-Authored-By: Claude Opus 4.7 --- internal/api/heartbeat_test.go | 160 ++++++++++++++++++++++++++++++++- internal/api/ui_handlers.go | 68 +++++++++++++- 2 files changed, 222 insertions(+), 6 deletions(-) diff --git a/internal/api/heartbeat_test.go b/internal/api/heartbeat_test.go index 1ca79ba..08257eb 100644 --- a/internal/api/heartbeat_test.go +++ b/internal/api/heartbeat_test.go @@ -6,6 +6,7 @@ import ( "net/http" "net/http/httptest" "path/filepath" + "strings" "testing" "time" @@ -13,14 +14,17 @@ import ( "vetting/internal/api" "vetting/internal/db" + "vetting/internal/events" "vetting/internal/model" + "vetting/internal/orchestrator" "vetting/internal/store" ) // setupHeartbeat wires just enough of UI to exercise the heartbeat -// handler. Runner is left nil — the handler no-ops the SSE publish in -// that case, which matches "tests don't assert on SSE" (covered by -// integration-style runner tests). +// handler. Runner is left nil by default — the Phase-2 command path +// short-circuits to idle when Runner is absent, which is fine for the +// "no run yet" happy path. Callers that want to drive the Phase-2 +// transition use setupHeartbeatWithRunner. 
func setupHeartbeat(t *testing.T) (*api.UI, *store.Hosts) { t.Helper() conn, err := db.Open(filepath.Join(t.TempDir(), "vetting.db")) @@ -32,6 +36,25 @@ func setupHeartbeat(t *testing.T) (*api.UI, *store.Hosts) { return &api.UI{Hosts: hosts}, hosts } +// setupHeartbeatWithRunner also wires a Runs store + Runner so +// Phase-2 tests can exercise the Queued → WaitingWoL transition and +// the 10-minute WaitingWoL re-issue window. +func setupHeartbeatWithRunner(t *testing.T) (*api.UI, *store.Hosts, *store.Runs) { + t.Helper() + conn, err := db.Open(filepath.Join(t.TempDir(), "vetting.db")) + if err != nil { + t.Fatalf("open db: %v", err) + } + t.Cleanup(func() { _ = conn.Close() }) + hosts := &store.Hosts{DB: conn} + runs := &store.Runs{DB: conn} + stages := &store.Stages{DB: conn} + hub := events.NewHub() + runner := &orchestrator.Runner{Runs: runs, Hosts: hosts, Stages: stages, EventHub: hub} + ui := &api.UI{Hosts: hosts, Runs: runs, Runner: runner} + return ui, hosts, runs +} + func heartbeatReq(mac string) *http.Request { req := httptest.NewRequest(http.MethodPost, "/api/v1/hosts/"+mac+"/heartbeat", nil) rctx := chi.NewRouteContext() @@ -100,3 +123,134 @@ func TestUIHeartbeat_BadMAC(t *testing.T) { t.Fatalf("status = %d, want 400", rr.Code) } } + +func TestUIHeartbeat_QueuedDispatches(t *testing.T) { + ui, hosts, runs := setupHeartbeatWithRunner(t) + ctx := context.Background() + hostID, err := hosts.Create(ctx, model.Host{ + Name: "hb-dispatch", + MAC: "aa:bb:cc:dd:ee:20", + WoLBroadcastIP: "10.0.0.255", + WoLPort: 9, + ExpectedSpecYAML: "memory:\n total_gib: 16\n", + }) + if err != nil { + t.Fatalf("create host: %v", err) + } + runID, err := runs.Create(ctx, hostID, "deadbeef") + if err != nil { + t.Fatalf("create run: %v", err) + } + + rr := httptest.NewRecorder() + ui.Heartbeat(rr, heartbeatReq("aa:bb:cc:dd:ee:20")) + if rr.Code != http.StatusOK { + t.Fatalf("status = %d, body = %q", rr.Code, rr.Body.String()) + } + var resp struct { + OK bool `json:"ok"` + 
Cmd string `json:"cmd"` + RunID int64 `json:"run_id"` + } + if err := json.Unmarshal(rr.Body.Bytes(), &resp); err != nil { + t.Fatalf("decode: %v", err) + } + if resp.Cmd != "reboot_for_vetting" || resp.RunID != runID { + t.Fatalf("response = %+v, want cmd=reboot_for_vetting run_id=%d", resp, runID) + } + // Run advanced Queued → WaitingWoL via the state machine. + got, err := runs.Get(ctx, runID) + if err != nil { + t.Fatalf("get run: %v", err) + } + if got.State != model.StateWaitingWoL { + t.Fatalf("state = %s, want WaitingWoL", got.State) + } +} + +func TestUIHeartbeat_WaitingWoLRetries(t *testing.T) { + ui, hosts, runs := setupHeartbeatWithRunner(t) + ctx := context.Background() + hostID, err := hosts.Create(ctx, model.Host{ + Name: "hb-retry", + MAC: "aa:bb:cc:dd:ee:21", + WoLBroadcastIP: "10.0.0.255", + WoLPort: 9, + ExpectedSpecYAML: "memory:\n total_gib: 16\n", + }) + if err != nil { + t.Fatalf("create host: %v", err) + } + runID, err := runs.Create(ctx, hostID, "deadbeef") + if err != nil { + t.Fatalf("create run: %v", err) + } + // Simulate: dispatcher already moved the run to WaitingWoL, now + // the host's reporter comes back from a crashed reboot. 
+ if err := runs.SetState(ctx, runID, model.StateWaitingWoL); err != nil { + t.Fatalf("set state: %v", err) + } + + rr := httptest.NewRecorder() + ui.Heartbeat(rr, heartbeatReq("aa:bb:cc:dd:ee:21")) + var resp struct { + Cmd string `json:"cmd"` + RunID int64 `json:"run_id"` + } + _ = json.Unmarshal(rr.Body.Bytes(), &resp) + if resp.Cmd != "reboot_for_vetting" || resp.RunID != runID { + t.Fatalf("response = %+v, want reboot_for_vetting retry", resp) + } +} + +func TestUIHeartbeat_NoRunIsIdle(t *testing.T) { + ui, hosts, _ := setupHeartbeatWithRunner(t) + if _, err := hosts.Create(context.Background(), model.Host{ + Name: "hb-idle", + MAC: "aa:bb:cc:dd:ee:22", + WoLBroadcastIP: "10.0.0.255", + WoLPort: 9, + ExpectedSpecYAML: "memory:\n total_gib: 16\n", + }); err != nil { + t.Fatalf("create host: %v", err) + } + rr := httptest.NewRecorder() + ui.Heartbeat(rr, heartbeatReq("aa:bb:cc:dd:ee:22")) + // Idle = cmd omitted entirely; the agent's heartbeatResponse + // decodes that as "", and handleResponse bails early. 
+ body := rr.Body.String() + if strings.Contains(body, "reboot_for_vetting") { + t.Fatalf("idle host got reboot cmd: %s", body) + } + if strings.Contains(body, `"cmd"`) { + t.Fatalf("idle response should omit cmd, got: %s", body) + } +} + +func TestUIHeartbeat_CompletedRunIsIdle(t *testing.T) { + ui, hosts, runs := setupHeartbeatWithRunner(t) + ctx := context.Background() + hostID, err := hosts.Create(ctx, model.Host{ + Name: "hb-done", + MAC: "aa:bb:cc:dd:ee:23", + WoLBroadcastIP: "10.0.0.255", + WoLPort: 9, + ExpectedSpecYAML: "memory:\n total_gib: 16\n", + }) + if err != nil { + t.Fatalf("create host: %v", err) + } + runID, err := runs.Create(ctx, hostID, "deadbeef") + if err != nil { + t.Fatalf("create run: %v", err) + } + if err := runs.SetState(ctx, runID, model.StateCompleted); err != nil { + t.Fatalf("set state: %v", err) + } + rr := httptest.NewRecorder() + ui.Heartbeat(rr, heartbeatReq("aa:bb:cc:dd:ee:23")) + body := rr.Body.String() + if strings.Contains(body, "reboot_for_vetting") { + t.Fatalf("completed run returned reboot cmd: %s", body) + } +} diff --git a/internal/api/ui_handlers.go b/internal/api/ui_handlers.go index d8c0687..74ccbc8 100644 --- a/internal/api/ui_handlers.go +++ b/internal/api/ui_handlers.go @@ -1,6 +1,7 @@ package api import ( + "context" "encoding/json" "errors" "log" @@ -243,8 +244,10 @@ func (u *UI) CreateHostJSON(w http.ResponseWriter, r *http.Request) { // Heartbeat is called every ~30s by a host-mode vetting-agent running // as a systemd service on the registered host. LAN-trusted, no auth — -// same threat model as the browser UI and quick-register. Phase 1 -// just stamps last_seen_at and flips the dashboard tile to "online". +// same threat model as the browser UI and quick-register. Stamps +// last_seen_at, flips the dashboard tile to "online", and — if the +// operator has clicked Start vetting since the last heartbeat — replies +// with cmd=reboot_for_vetting so the host boots into PXE without WoL. 
func (u *UI) Heartbeat(w http.ResponseWriter, r *http.Request) { mac := strings.ToLower(strings.TrimSpace(chi.URLParam(r, "mac"))) if !macRe.MatchString(mac) { @@ -268,10 +271,69 @@ func (u *UI) Heartbeat(w http.ResponseWriter, r *http.Request) { if u.Runner != nil { u.Runner.PublishTileUpdate(r.Context(), host.ID) } + cmd, runID := u.pickHostCommand(r.Context(), host.ID) + resp := heartbeatResponse{OK: true, Cmd: cmd, RunID: runID} w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(map[string]any{"ok": true}) + _ = json.NewEncoder(w).Encode(resp) } +// heartbeatResponse is the JSON the host-mode agent decodes on every +// heartbeat. `cmd` is "" (omitted) in the idle case so the wire shape +// stays `{"ok": true}` when nothing is happening. +type heartbeatResponse struct { + OK bool `json:"ok"` + Cmd string `json:"cmd,omitempty"` + RunID int64 `json:"run_id,omitempty"` +} + +// pickHostCommand decides what the host-mode agent should do on the +// back of this heartbeat. Returns ("", 0) when there's nothing to do. +// +// - Queued run → Transition(Dispatched) and tell the agent to reboot. +// The dispatcher would have WoL'd it anyway; we beat it to the +// punch so the host skips the WoL dance. +// - WaitingWoL run created <10min ago → also return reboot, covering +// "host crashed mid-reboot, systemd brought the reporter back". +// - anything else → idle. 
+func (u *UI) pickHostCommand(ctx context.Context, hostID int64) (string, int64) { + if u.Runs == nil || u.Runner == nil { + return "", 0 + } + run, err := u.Runs.LatestForHost(ctx, hostID) + if err != nil { + log.Printf("heartbeat: latest run for host %d: %v", hostID, err) + return "", 0 + } + if run == nil { + return "", 0 + } + switch run.State { + case model.StateQueued: + if _, err := u.Runner.Transition(ctx, run.ID, orchestrator.TriggerDispatched); err != nil { + // Benign race with the dispatcher's own 2s poll — the + // state machine refuses the second transition; we just + // log and return idle so the agent doesn't reboot on a + // run that another path is already driving. + log.Printf("heartbeat: transition run %d: %v", run.ID, err) + return "", 0 + } + log.Printf("heartbeat: dispatched run %d for host %d via heartbeat (no WoL)", run.ID, hostID) + return cmdRebootForVetting, run.ID + case model.StateWaitingWoL: + // Tolerate a crashed-mid-reboot retry: the reporter is the + // only thing that could be telling us about this host right + // now, and WoL is only the fallback anyway. Bound it so a + // perpetually-broken PXE doesn't reboot-loop the box. + if time.Since(run.StartedAt) < 10*time.Minute { + return cmdRebootForVetting, run.ID + } + return "", 0 + } + return "", 0 +} + +const cmdRebootForVetting = "reboot_for_vetting" + func writeJSONError(w http.ResponseWriter, status int, msg string) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status)