Files
Vetting/internal/orchestrator/dispatcher_test.go
T
josh 4524ab8dc0
CI / Lint + build + test (push) Successful in 2m5s
Release / release (push) Successful in 3m5s
runs: add non-destructive flag + operator Cancel button
Non-destructive pre-declares "don't touch the disks" on Start: the
Storage stage skips wipe-probe, badblocks -w, and write-mode fio,
and reports a read-only summary. The runs table gains a new
non_destructive column; the flag is threaded through Claim → agent
tests.Deps → Storage stage.

Cancel halts an in-flight run. The orchestrator transitions to a
new StateCancelled via TriggerOperatorCancelled (valid from any
active state); the agent's next heartbeat returns cmd=cancel_stage,
which fires a stored CancelFunc on the per-stage context. Stage
subprocesses spawned with exec.CommandContext die with the context,
the agent posts a cancelled outcome, then powers the host off.

Cancelling a run while a destructive stage is executing may leave the
host in an intermediate state — the UI confirm dialog warns the
operator; recovery is manual for now.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-18 13:01:42 -04:00

199 lines
6.0 KiB
Go

package orchestrator
import (
"context"
"os"
"path/filepath"
"strings"
"testing"
"time"
"vetting/internal/db"
"vetting/internal/events"
"vetting/internal/logs"
"vetting/internal/model"
"vetting/internal/store"
)
// setupPickNext wires a real SQLite DB so pickNext can exercise the
// full Hosts/Runs/Runner path. Returns the dispatcher, the Hosts and Runs
// stores, the seeded host ID, and a cleanup that closes the log hub and
// the DB connection. The host starts with a fresh heartbeat stamp so the
// default is "dispatch would succeed"; callers stale it out as needed.
//
// If any setup step fails, whatever was already opened is closed before
// the test is aborted, so a failed setup does not leak the DB handle or
// log hub (which would also block t.TempDir removal on some platforms).
func setupPickNext(t *testing.T) (*Dispatcher, *store.Hosts, *store.Runs, int64, func()) {
	t.Helper()
	// Single source for the seeded host's MAC: it is used both to create
	// the host and to stamp its heartbeat, and the two must agree.
	const mac = "aa:bb:cc:dd:ee:50"

	conn, err := db.Open(filepath.Join(t.TempDir(), "vetting.db"))
	if err != nil {
		t.Fatalf("open db: %v", err)
	}
	hosts := &store.Hosts{DB: conn}
	runs := &store.Runs{DB: conn}
	stages := &store.Stages{DB: conn}
	hub := events.NewHub()
	runner := &Runner{Runs: runs, Hosts: hosts, Stages: stages, EventHub: hub}
	logDir := t.TempDir()
	lh, err := logs.NewHub(logDir, hub)
	if err != nil {
		_ = conn.Close()
		t.Fatalf("NewHub: %v", err)
	}
	cleanup := func() {
		lh.Close()
		_ = conn.Close()
	}

	d := NewDispatcher(3, runs, hosts, runner, lh)
	ctx := context.Background()
	hostID, err := hosts.Create(ctx, model.Host{
		Name:             "pn-host",
		MAC:              mac,
		WoLBroadcastIP:   "10.0.0.255",
		WoLPort:          9,
		ExpectedSpecYAML: "memory:\n total_gib: 16\n",
	})
	if err != nil {
		cleanup()
		t.Fatalf("create host: %v", err)
	}
	// Default: heartbeating now.
	if err := hosts.UpdateLastSeen(ctx, mac, time.Now().UTC()); err != nil {
		cleanup()
		t.Fatalf("stamp: %v", err)
	}
	return d, hosts, runs, hostID, cleanup
}
// TestDispatcher_RunLogWritesToHub verifies the plumbing between the
// dispatcher and the per-run log hub: runLog must persist to the on-disk
// file so the detail page's replay + SSE fan-out see the same
// pre-stage diagnostics (picked / sent WoL / heartbeat).
func TestDispatcher_RunLogWritesToHub(t *testing.T) {
dir := t.TempDir()
ev := events.NewHub()
lh, err := logs.NewHub(dir, ev)
if err != nil {
t.Fatalf("NewHub: %v", err)
}
defer lh.Close()
d := &Dispatcher{Logs: lh}
d.runLog(7, "info", "dispatcher: sent WoL packet to aa:bb:cc:dd:ee:ff via 10.0.0.255:9")
body, err := os.ReadFile(filepath.Join(dir, "run-7.log"))
if err != nil {
t.Fatalf("read run log: %v", err)
}
if !strings.Contains(string(body), "dispatcher: sent WoL packet") {
t.Fatalf("run log missing dispatcher line: %q", body)
}
if !strings.Contains(string(body), "INFO") {
t.Fatalf("run log missing level: %q", body)
}
}
// TestDispatcher_RunLogNilHubDoesNotPanic covers a Dispatcher built as a
// bare zero value with no Logs hub attached, as tests commonly construct
// it. runLog is expected to fall back to stderr rather than panic, so the
// dispatcher loop stays alive. A panic here fails the test.
func TestDispatcher_RunLogNilHubDoesNotPanic(t *testing.T) {
	var bare Dispatcher
	bare.runLog(1, "info", "fallback path")
}
// TestDispatcher_TransitionsToWaitingRebootNoWoL: happy path. The host is
// heartbeating and the run is Queued, so one pickNext tick must move the
// run to WaitingReboot (via the RebootCommanded trigger). The run log must
// say the host is heartbeating and must not contain a "sent WoL packet"
// line.
func TestDispatcher_TransitionsToWaitingRebootNoWoL(t *testing.T) {
	d, _, runs, hostID, cleanup := setupPickNext(t)
	defer cleanup()
	ctx := context.Background()
	// NOTE(review): the trailing bool appears to be the non_destructive
	// flag; confirm against store.Runs.Create.
	runID, err := runs.Create(ctx, hostID, "deadbeef", false)
	if err != nil {
		t.Fatalf("create run: %v", err)
	}
	d.pickNext(ctx)
	got, err := runs.Get(ctx, runID)
	if err != nil {
		t.Fatalf("get run: %v", err)
	}
	if got.State != model.StateWaitingReboot {
		t.Fatalf("state = %s, want WaitingReboot", got.State)
	}
	// PathFor yields the run's log path directly; no Join is needed.
	body, err := os.ReadFile(d.Logs.PathFor(runID))
	if err != nil {
		t.Fatalf("read log: %v", err)
	}
	text := string(body)
	if strings.Contains(text, "sent WoL packet") {
		t.Fatalf("dispatcher should not fire WoL on heartbeating host: %s", text)
	}
	if !strings.Contains(text, "heartbeating") {
		t.Fatalf("missing heartbeating log line: %s", text)
	}
}
// TestDispatcher_FailsStaleHeartbeat: the host has not heartbeated for
// well over 60s. The dispatcher must refuse to dispatch and mark the run
// Failed with failed_stage=dispatch rather than loop forever on an
// unreachable box. The run log must carry a quick.sh hint for the operator.
func TestDispatcher_FailsStaleHeartbeat(t *testing.T) {
	d, hosts, runs, hostID, cleanup := setupPickNext(t)
	defer cleanup()
	ctx := context.Background()
	// Stale: 5m ago is well past the 60s cutoff.
	if err := hosts.UpdateLastSeen(ctx, "aa:bb:cc:dd:ee:50", time.Now().UTC().Add(-5*time.Minute)); err != nil {
		t.Fatalf("stamp stale: %v", err)
	}
	runID, err := runs.Create(ctx, hostID, "deadbeef", false)
	if err != nil {
		t.Fatalf("create run: %v", err)
	}
	d.pickNext(ctx)
	got, err := runs.Get(ctx, runID)
	if err != nil {
		t.Fatalf("get run: %v", err)
	}
	if got.State != model.StateFailed {
		t.Fatalf("state = %s, want Failed", got.State)
	}
	if got.FailedStage != "dispatch" {
		t.Fatalf("failed_stage = %q, want dispatch", got.FailedStage)
	}
	// Report a missing/unreadable log explicitly instead of letting it show
	// up as a confusing "missing hint" failure with an empty body.
	body, err := os.ReadFile(d.Logs.PathFor(runID))
	if err != nil {
		t.Fatalf("read log: %v", err)
	}
	if !strings.Contains(string(body), "quick.sh") {
		t.Fatalf("expected quick.sh hint in run log: %s", body)
	}
}
// TestDispatcher_FailsNeverSeenHost mirrors the stale-heartbeat test for
// a host that has never heartbeated at all (LastSeenAt is NULL). The run
// must end Failed with failed_stage=dispatch, as in the stale case.
func TestDispatcher_FailsNeverSeenHost(t *testing.T) {
	d, hosts, runs, _, cleanup := setupPickNext(t)
	defer cleanup()
	ctx := context.Background()
	// Create a fresh host with no heartbeat stamp.
	neverID, err := hosts.Create(ctx, model.Host{
		Name:             "pn-never",
		MAC:              "aa:bb:cc:dd:ee:51",
		WoLBroadcastIP:   "10.0.0.255",
		WoLPort:          9,
		ExpectedSpecYAML: "memory:\n total_gib: 16\n",
	})
	if err != nil {
		t.Fatalf("create host: %v", err)
	}
	runID, err := runs.Create(ctx, neverID, "deadbeef", false)
	if err != nil {
		t.Fatalf("create run: %v", err)
	}
	d.pickNext(ctx)
	got, err := runs.Get(ctx, runID)
	if err != nil {
		t.Fatalf("get run: %v", err)
	}
	if got.State != model.StateFailed {
		t.Fatalf("state = %s, want Failed", got.State)
	}
	if got.FailedStage != "dispatch" {
		t.Fatalf("failed_stage = %q, want dispatch", got.FailedStage)
	}
}