Files
josh 3656af9823
CI / Lint + build + test (push) Successful in 1m47s
Release / release (push) Successful in 10m8s
feat(end-of-run): reboot to local disk instead of powering off
Completed runs now reboot the host and fall through iPXE to the next
boot device (local disk) instead of powering off. Three coordinated
changes:

- pxe/ipxe: NoActiveRunScript exits iPXE (drops to next boot entry)
  instead of `sleep 10; poweroff`. Without this, a Completed reboot
  just loops through PXE and gets told to poweroff.
- api/agent_handlers: heartbeat returns cmd=reboot (was cmd=shutdown)
  when the run reaches Completed.
- agent/runner: runs `systemctl reboot` (with `shutdown -r now`
  fallback) in response to cmd=reboot.

Operator cancel still powers off — powerOffAndReturn is unchanged
because a cancel means the operator wants the host idle so they can
walk up to it, not back in rotation.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-19 22:45:11 -04:00

763 lines
23 KiB
Go

// Package agent implements the in-live-image control loop.
//
// Phase 4 scope: after /claim, the agent walks through every stage the
// orchestrator advertises, dispatching on the stage name to a function
// in agent/tests. Each stage posts a /result; the response carries the
// orchestrator's next_state, which the loop uses to pick the next
// stage. Stages the orchestrator owns (SpecValidate, Reporting) resolve
// server-side inside /result so the agent never sees them as "its turn".
//
// Terminal states:
// - FailedHolding → request hold key, install authorized_keys, wait
// on heartbeats for a retry_stage directive.
// - Completed → heartbeat carries cmd=reboot; agent runs
// `systemctl reboot` and exits. The host comes back through iPXE,
// finds no active run, and exits iPXE into the next boot device.
//
// Thermal sidecar runs from the moment the agent claims until ctx
// cancel; it posts a handful of /sys/class/hwmon samples every 5s.
package agent
import (
"context"
"encoding/json"
"fmt"
"log"
"net"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"vetting/agent/bootstate"
"vetting/agent/probes"
"vetting/agent/tests"
"vetting/internal/spec"
)
// stageCancel holds the cancel func for the in-flight stage ctx so the
// heartbeat loop can fire it when the orchestrator returns
// cmd=cancel_stage. Stored as an atomic.Value so the heartbeat goroutine
// can read without locking; in this file the only writer is
// runStageCancellable, which runs on the main loop.
//
// Between stages the slot holds a typed-nil context.CancelFunc rather
// than being left empty, so a reader has to type-assert the loaded value
// and nil-check the resulting func before calling it.
var stageCancel atomic.Value // context.CancelFunc
// Run is the agent's long-lived entry point. It blocks until ctx is
// cancelled or a fatal error makes progress impossible.
//
// Sequence:
//  1. Build the orchestrator client and log forwarder from p.
//  2. Say hello (best effort — a failure is only logged) and claim the
//     run (a failure is returned).
//  3. Start the thermal sidecar and the heartbeat loop.
//  4. Execute stages one at a time, letting the next_state returned by
//     each /result pick the following stage.
//
// The returned error is ctx.Err() once the context ends, the claim or
// /result error when one of those calls fails, or whatever
// waitForOverride / powerOffAndReturn return on the hold and cancel
// paths.
func Run(ctx context.Context, p *bootstate.Params) error {
	client := NewClient(p.OrchestratorURL, p.RunID, p.Token, p.TLSCertFPR)
	logs := newLogForwarder(ctx, client)
	defer logs.close()

	addr := localIP()
	logs.info(fmt.Sprintf("agent starting on %s (run=%d mac=%s)", addr, p.RunID, p.MAC))

	// hello is advisory: keep going to /claim even if it never lands.
	sayHello := func(callCtx context.Context) error {
		return client.Hello(callCtx)
	}
	if helloErr := callWithBackoff(ctx, "hello", sayHello); helloErr != nil {
		logs.warn("hello never succeeded: " + helloErr.Error())
	}

	var claim *ClaimResponse
	takeClaim := func(callCtx context.Context) error {
		got, err := client.Claim(callCtx, addr)
		if err != nil {
			return err
		}
		claim = got
		return nil
	}
	if claimErr := callWithBackoff(ctx, "claim", takeClaim); claimErr != nil {
		return claimErr
	}
	logs.info(fmt.Sprintf("claimed run; stages=%v current_state=%s", claim.Stages, claim.CurrentState))

	mux := NewSensorMux(ctx, client)
	defer mux.Close()
	go thermalSidecar(ctx, mux, logs)

	directives := make(chan HeartbeatResponse, 4)
	go heartbeatLoop(ctx, client, logs, directives)

	// The starting stage is derived from claim.CurrentState so that a
	// re-claim after an agent crash picks up at the stage the run was
	// parked at rather than replaying Inventory (the Orion OOM bug). A
	// fresh claim reports InventoryCheck, which maps to "Inventory"; any
	// state without a stage mapping also falls back to "Inventory".
	//
	// Orchestrator-owned stages (SpecValidate, Reporting) are resolved
	// inside /result, so they never appear here as a stage to run.
	stage := stageForState(claim.CurrentState)
	if stage == "" {
		stage = "Inventory"
	}
	if stage != "Inventory" {
		logs.warn(fmt.Sprintf("resuming mid-pipeline at %s (claim current_state=%s) — likely agent restart after crash",
			stage, claim.CurrentState))
	}

	for stage != "" {
		// Non-blocking shutdown check between stages.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		logs.info("stage: starting " + stage)
		outcome := runStageCancellable(ctx, stage, claim, logs, client, mux, overrideFlags{})
		if outcome.Cancelled {
			// Operator cancel: record what we have (best effort) and
			// power the host off.
			logs.warn("stage cancelled by operator; posting result and exiting")
			_, _ = postResult(ctx, client, stage, outcome)
			return powerOffAndReturn(logs)
		}

		resp, err := postResult(ctx, client, stage, outcome)
		if err != nil {
			logs.error("submit result for " + stage + ": " + err.Error())
			return err
		}
		logs.info(fmt.Sprintf("stage %s → next_state=%s", stage, resp.NextState))

		switch resp.NextState {
		case "FailedHolding":
			if holdErr := requestHold(ctx, client, logs); holdErr != nil {
				return holdErr
			}
			// Park and wait for an override directive.
			return waitForOverride(ctx, client, logs, mux, directives, claim)
		case "Completed", "":
			// Stay alive until ctx ends; the heartbeat goroutine is the
			// one that acts on the orchestrator's reboot directive.
			logs.info("pipeline complete")
			<-ctx.Done()
			return ctx.Err()
		}

		stage = stageForState(resp.NextState)
		if stage == "" {
			// A state with no stage mapping (e.g. an orchestrator-owned
			// one that /result should already have moved past). Park
			// rather than spin.
			logs.warn("no stage maps to state " + resp.NextState + "; parking")
			<-ctx.Done()
			return ctx.Err()
		}
	}

	<-ctx.Done()
	return ctx.Err()
}
// runStage executes the single stage named by stage and returns its
// outcome. Log lines emitted while it runs are tagged with the stage
// name; the tag is cleared again on return.
//
// Two stages are handled inline because their probe results ride along
// in the outcome (and from there into the /result body):
//   - "Inventory" runs every hardware probe as a timed sub-step and
//     attaches the collected spec.Inventory.
//   - "Firmware" attaches the firmware snapshots and reports the probe
//     warnings in Extras.
//
// Every other known stage delegates to the matching function in
// agent/tests. An unrecognised stage name yields a failed outcome whose
// Message names the stage.
func runStage(ctx context.Context, stage string, claim *ClaimResponse, fwd *logForwarder, c *Client, mux *SensorMux, ovr overrideFlags) stageOutcome {
	fwd.SetStage(stage)
	defer fwd.ClearStage()

	deps := newDeps(ctx, c, fwd, mux, ovr, claim, stage)

	switch stage {
	case "Inventory":
		fwd.info("Inventory: probing host hardware")
		probeLog := probes.Logger{Info: fwd.info, Warn: fwd.warn}
		inv := &spec.Inventory{}

		// Probes run in this fixed order; each becomes one sub-step that
		// is always reported as passed, with its own start/end stamps.
		probeSteps := []struct {
			name string
			run  func()
		}{
			{"CPU", func() { inv.CPU = probes.CPU(probeLog) }},
			{"Memory", func() { inv.Memory = probes.Memory(probeLog) }},
			{"Disks", func() { inv.Disks = probes.Disks(probeLog) }},
			{"NICs", func() { inv.NICs = probes.NICs(probeLog) }},
			{"GPUs", func() { inv.GPUs = probes.GPUs(probeLog) }},
			{"System", func() { inv.System = probes.System(probeLog) }},
			{"Baseboard", func() { inv.Baseboard = probes.Baseboard(probeLog) }},
			{"PSU", func() { inv.PSU = probes.PSU(probeLog) }},
			{"OS", func() { inv.OS = probes.OS(probeLog) }},
		}
		reports := make([]tests.SubStepReport, 0, len(probeSteps))
		for _, step := range probeSteps {
			began := time.Now()
			step.run()
			reports = append(reports, tests.SubStepReport{
				Name:        step.name,
				Passed:      true,
				StartedAt:   began,
				CompletedAt: time.Now(),
			})
		}

		line := inventorySummary(inv)
		fwd.info("Inventory: " + line)
		return stageOutcome{
			Outcome: tests.Outcome{
				Passed:   true,
				Summary:  line,
				SubSteps: reports,
			},
			Inventory: inv,
		}

	case "Firmware":
		fwd.info("Firmware: probing firmware versions")
		snapshots, warnings := probes.Firmware(ctx)
		for _, msg := range warnings {
			fwd.warn(msg)
		}
		line := firmwareSummary(snapshots)
		fwd.info("Firmware: " + line)
		return stageOutcome{
			Outcome: tests.Outcome{
				Passed:  true,
				Summary: line,
				Extras: map[string]any{
					"warnings":  warnings,
					"snapshots": len(snapshots),
				},
			},
			Firmware: snapshots,
		}

	case "SMART":
		return stageOutcome{Outcome: tests.SMART(ctx, deps)}

	case "CPUStress":
		return stageOutcome{Outcome: tests.CPUStress(ctx, deps)}

	case "Storage":
		return stageOutcome{Outcome: tests.Storage(ctx, deps)}

	case "Network":
		// A missing or non-positive configured duration means "use 10s".
		runFor := deps.NetworkKnobs.Duration
		if runFor <= 0 {
			runFor = 10 * time.Second
		}
		netCfg := tests.NetworkConfig{
			OrchestratorURL: c.BaseURL,
			IperfPort:       claim.IperfPort,
			Duration:        runFor,
		}
		return stageOutcome{Outcome: tests.Network(ctx, deps, netCfg)}

	case "Burn":
		burnCfg := tests.BurnConfig{
			OrchestratorURL: c.BaseURL,
			IperfPort:       claim.IperfPort,
		}
		return stageOutcome{Outcome: tests.Burn(ctx, deps, burnCfg)}

	case "GPU":
		return stageOutcome{Outcome: tests.GPU(ctx, deps)}

	case "PSU":
		return stageOutcome{Outcome: tests.PSU(ctx, deps)}

	default:
		return stageOutcome{Outcome: tests.Outcome{
			Passed:  false,
			Message: "unknown stage " + stage,
		}}
	}
}
// stageOutcome is what runStage hands back to the pipeline loop: the
// generic test outcome plus the stage-specific payloads that postResult
// attaches to the /result body.
type stageOutcome struct {
// Outcome is the generic record; postResult sends its Passed flag,
// Message, marshalled summary and SubSteps.
Outcome tests.Outcome
Inventory *spec.Inventory // only for Inventory stage
Firmware []probes.FirmwareSnapshot // only for Firmware stage
Cancelled bool // set when the stage was cut short by operator cancel
}
// runStageCancellable runs one stage under its own cancellable context.
// The cancel func is published through stageCancel for the duration of
// the stage so the heartbeat loop's cancel_stage directive can stop
// whatever the stage is doing; on return the context is cancelled and
// the published slot is reset to a typed-nil func.
//
// If the stage context ended while parent is still live, the stop is
// attributed to the operator: the outcome is flagged Cancelled, forced to
// failed, summarised as "cancelled", and given a default message when the
// stage left none. A cancelled parent (e.g. shutdown) is left for the
// caller's own ctx check to handle.
func runStageCancellable(parent context.Context, stage string, claim *ClaimResponse, fwd *logForwarder, c *Client, mux *SensorMux, ovr overrideFlags) stageOutcome {
	stageCtx, cancelStage := context.WithCancel(parent)
	stageCancel.Store(cancelStage)
	// Deferred calls run last-in-first-out: cancel first, then clear the slot.
	defer stageCancel.Store(context.CancelFunc(nil))
	defer cancelStage()

	result := runStage(stageCtx, stage, claim, fwd, c, mux, ovr)

	cancelledByOperator := parent.Err() == nil && stageCtx.Err() != nil
	if !cancelledByOperator {
		return result
	}

	result.Cancelled = true
	result.Outcome.Passed = false
	if result.Outcome.Message == "" {
		result.Outcome.Message = "stage cancelled by operator"
	}
	result.Outcome.Summary = "cancelled"
	return result
}
// powerOffAndReturn shuts the host down after an operator cancel. It is
// best effort: `systemctl poweroff` is tried first and, if that command
// fails, `shutdown -h now` is run with its result ignored. The function
// always returns nil so callers can use it directly as their return
// value.
func powerOffAndReturn(fwd *logForwarder) error {
	fwd.info("cancel: powering off host")

	err := exec.Command("systemctl", "poweroff").Run()
	if err == nil {
		return nil
	}

	fwd.warn("systemctl poweroff failed: " + err.Error())
	// Last resort; nothing useful can be done if this fails as well.
	_ = exec.Command("shutdown", "-h", "now").Run()
	return nil
}
// overrideFlags carries the operator-supplied switches that accompany a
// retry_stage directive; waitForOverride decodes them from the heartbeat
// response's OverrideFlags JSON. The zero value means "no overrides".
type overrideFlags struct {
// Wipe is passed through to the stage as tests.Deps.OverrideWipe.
Wipe bool `json:"wipe"`
}
// newDeps assembles the tests.Deps handed to a stage function: the log
// callbacks, the destructive-operation switches, the expected-disk list,
// the per-stage timeout, the tuning knobs from claim.StageConfig, and a
// Sensor callback that forwards samples to mux.
//
// Duration knobs arrive as strings and are decoded with parseDur, so an
// empty or unparseable value becomes 0. The ctx and c parameters are not
// used by the current implementation.
func newDeps(ctx context.Context, c *Client, fwd *logForwarder, mux *SensorMux, ovr overrideFlags, claim *ClaimResponse, stage string) tests.Deps {
	// Left nil when the claim lists no disks.
	var wanted []tests.ExpectedDisk
	for _, disk := range claim.ExpectedDisks {
		wanted = append(wanted, tests.ExpectedDisk{Serial: disk.Serial, SizeGB: disk.SizeGB})
	}

	// Convert test-package samples to the wire type and hand them to the
	// mux; this callback never reports an error.
	forwardSamples := func(_ context.Context, samples []tests.Sample) error {
		batch := make([]SensorSample, len(samples))
		for i, s := range samples {
			batch[i] = SensorSample{Kind: s.Kind, Key: s.Key, Value: s.Value, Unit: s.Unit}
		}
		mux.Send(batch)
		return nil
	}

	cfg := claim.StageConfig

	deps := tests.Deps{
		Info:           fwd.info,
		Warn:           fwd.warn,
		Error:          fwd.error,
		OverrideWipe:   ovr.Wipe,
		NonDestructive: claim.NonDestructive,
		ExpectedDisks:  wanted,
		StageTimeout:   stageTimeout(claim, stage),
		Sensor:         forwardSamples,
	}
	deps.CPUStressKnobs = tests.CPUStressKnobs{
		CPUPass:  parseDur(cfg.CPUStress.CPUPass),
		MemPass:  parseDur(cfg.CPUStress.MemPass),
		EDACPoll: parseDur(cfg.CPUStress.EDACPoll),
	}
	deps.StorageKnobs = tests.StorageKnobs{
		Mode:    cfg.Storage.Mode,
		FioSize: cfg.Storage.FioSize,
		FioTime: parseDur(cfg.Storage.FioTime),
		FioBS:   cfg.Storage.FioBS,
		FioRW:   cfg.Storage.FioRW,
		Verify:  cfg.Storage.Verify,
	}
	deps.NetworkKnobs = tests.NetworkKnobs{
		Duration: parseDur(cfg.Network.Duration),
	}
	deps.BurnKnobs = tests.BurnKnobs{
		Duration:      parseDur(cfg.Burn.Duration),
		CPUWorkers:    cfg.Burn.CPUWorkers,
		MemPct:        cfg.Burn.MemPct,
		FioOnSpare:    cfg.Burn.FioOnSpare,
		IperfParallel: cfg.Burn.IperfParallel,
	}
	return deps
}
// stageTimeout reads claim.StageConfig.StageTimeouts[stage] and falls
// back to 2 minutes (the pre-Phase-2 default) when the claim is nil, the
// map is nil, or the stage has no (or an empty) entry.
//
// Malformed entries — unparseable strings and non-positive durations —
// are logged and then fall back to the same default: we'd rather run the
// stage than refuse on a typo, but the typo should still be visible.
func stageTimeout(claim *ClaimResponse, stage string) time.Duration {
	const fallback = 2 * time.Minute

	if claim == nil || claim.StageConfig.StageTimeouts == nil {
		return fallback
	}
	raw, ok := claim.StageConfig.StageTimeouts[stage]
	if !ok || raw == "" {
		return fallback
	}

	d, err := time.ParseDuration(raw)
	if err != nil {
		log.Printf("agent: stage %s: unparseable stage timeout %q (%v); using %s", stage, raw, err, fallback)
		return fallback
	}
	if d <= 0 {
		log.Printf("agent: stage %s: non-positive stage timeout %q; using %s", stage, raw, fallback)
		return fallback
	}
	return d
}
// parseDur is the lenient duration decoder for the knob wire format. It
// returns the parsed duration when s is a valid, non-negative Go duration
// string and 0 in every other case (empty input, parse failure, negative
// value), so callers can read 0 as "use the compile-time default".
func parseDur(s string) time.Duration {
	if s == "" {
		return 0
	}
	switch parsed, err := time.ParseDuration(s); {
	case err != nil, parsed < 0:
		return 0
	default:
		return parsed
	}
}
// postResult builds the /result request body for stage from s and sends
// it through c.Result.
//
// "stage" and "passed" are always present. The remaining keys are added
// only when there is something to send: "summary" (when the marshalled
// summary is longer than two bytes, i.e. more than an empty JSON value),
// "message", "inventory", "firmware", and "sub_steps". Sub-step
// timestamps are rendered as RFC 3339 (nanosecond) UTC strings and left
// unset when the time is zero. An error from MarshalSummary is ignored.
func postResult(ctx context.Context, c *Client, stage string, s stageOutcome) (*ResultResponse, error) {
	rawSummary, _ := s.Outcome.MarshalSummary()

	payload := map[string]any{
		"stage":  stage,
		"passed": s.Outcome.Passed,
	}
	if len(rawSummary) > 2 {
		payload["summary"] = json.RawMessage(rawSummary)
	}
	if msg := s.Outcome.Message; msg != "" {
		payload["message"] = msg
	}
	if s.Inventory != nil {
		payload["inventory"] = s.Inventory
	}
	if len(s.Firmware) > 0 {
		payload["firmware"] = s.Firmware
	}

	if count := len(s.Outcome.SubSteps); count > 0 {
		wire := make([]SubStepReport, count)
		for i, step := range s.Outcome.SubSteps {
			wire[i] = SubStepReport{
				Name:    step.Name,
				Passed:  step.Passed,
				Skipped: step.Skipped,
				Summary: step.SummaryJSON,
			}
			if !step.StartedAt.IsZero() {
				wire[i].StartedAt = step.StartedAt.UTC().Format(time.RFC3339Nano)
			}
			if !step.CompletedAt.IsZero() {
				wire[i].CompletedAt = step.CompletedAt.UTC().Format(time.RFC3339Nano)
			}
		}
		payload["sub_steps"] = wire
	}

	return c.Result(ctx, payload)
}
// stageForState translates a run-state string into the name of the stage
// executor that handles it. The names are identical except that the
// "InventoryCheck" state is served by the "Inventory" stage. Any other
// state — including the orchestrator-owned SpecValidate and Reporting,
// which /result resolves past — maps to the empty string.
func stageForState(state string) string {
	if state == "InventoryCheck" {
		return "Inventory"
	}
	agentStages := [...]string{"Firmware", "SMART", "CPUStress", "Storage", "Network", "Burn", "GPU", "PSU"}
	for _, name := range agentStages {
		if name == state {
			return name
		}
	}
	return ""
}
// waitForOverride parks the agent in FailedHolding. It listens on hb for
// a retry_stage directive (e.g. Storage with wipe-override armed), re-runs
// that stage with the directive's override flags, and — when the retry
// succeeds — keeps walking the pipeline from the state the orchestrator
// advanced to.
//
// How each situation is handled:
//   - ctx ends: returns ctx.Err().
//   - hb is closed: returns nil.
//   - Any other directive, or a retry_stage with no stage name: ignored.
//   - Malformed override flags: logged, and the retry runs with no
//     overrides armed.
//   - The operator cancels a running stage: the result is posted (best
//     effort) and the host is powered off.
//   - Posting the retried stage's result fails: logged, keep holding so
//     the operator can try again.
//   - Posting a later stage's result fails: logged and returned.
//   - A stage lands back in FailedHolding: keep holding, whether it was
//     the retried stage or a later one.
//   - The run reaches Completed (or an empty next_state): block until ctx
//     ends and return ctx.Err(). Run parks the same way, because the
//     heartbeat goroutine is what acts on the orchestrator's reboot
//     directive and returning early would end the agent first.
//   - next_state has no stage mapping: warn and park until ctx ends.
func waitForOverride(ctx context.Context, c *Client, fwd *logForwarder, mux *SensorMux, hb <-chan HeartbeatResponse, claim *ClaimResponse) error {
	fwd.info("holding: awaiting operator decision (heartbeat directive or ctx cancel)")
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case cmd, ok := <-hb:
			if !ok {
				return nil
			}
			if cmd.Cmd != "retry_stage" || cmd.Stage == "" {
				continue
			}
			fwd.info("operator override: retrying stage " + cmd.Stage)

			var ovr overrideFlags
			if len(cmd.OverrideFlags) > 0 {
				if err := json.Unmarshal(cmd.OverrideFlags, &ovr); err != nil {
					// Never arm a destructive override from a payload we
					// could not fully decode.
					fwd.warn("override: ignoring malformed override flags: " + err.Error())
					ovr = overrideFlags{}
				}
			}

			outcome := runStageCancellable(ctx, cmd.Stage, claim, fwd, c, mux, ovr)
			if outcome.Cancelled {
				fwd.warn("stage cancelled by operator; posting result and exiting")
				_, _ = postResult(ctx, c, cmd.Stage, outcome)
				return powerOffAndReturn(fwd)
			}
			resp, err := postResult(ctx, c, cmd.Stage, outcome)
			if err != nil {
				fwd.error("override: submit result: " + err.Error())
				continue
			}
			fwd.info(fmt.Sprintf("override stage %s → next_state=%s", cmd.Stage, resp.NextState))

			// Walk forward from wherever the orchestrator advanced us,
			// until the run completes or falls back into FailedHolding.
			state := resp.NextState
			for state != "FailedHolding" {
				if state == "Completed" || state == "" {
					fwd.info("pipeline complete")
					<-ctx.Done()
					return ctx.Err()
				}
				nextStage := stageForState(state)
				if nextStage == "" {
					fwd.warn("no stage maps to state " + state + "; parking")
					<-ctx.Done()
					return ctx.Err()
				}

				// Non-blocking shutdown check between stages.
				select {
				case <-ctx.Done():
					return ctx.Err()
				default:
				}

				fwd.info("stage: starting " + nextStage)
				out := runStageCancellable(ctx, nextStage, claim, fwd, c, mux, overrideFlags{})
				if out.Cancelled {
					fwd.warn("stage cancelled by operator; posting result and exiting")
					_, _ = postResult(ctx, c, nextStage, out)
					return powerOffAndReturn(fwd)
				}
				rr, err := postResult(ctx, c, nextStage, out)
				if err != nil {
					fwd.error("submit result for " + nextStage + ": " + err.Error())
					return err
				}
				fwd.info(fmt.Sprintf("stage %s → next_state=%s", nextStage, rr.NextState))
				state = rr.NextState
			}

			// Still (or again) failing: stay parked for the next directive.
			fwd.warn("holding: run is in FailedHolding; awaiting next operator decision")
		}
	}
}
// requestHold asks the orchestrator for the run's hold key and appends it
// to /root/.ssh/authorized_keys so the operator can SSH in. The .ssh
// directory is created with mode 0700 if missing and the key file with
// mode 0600 if missing. Every failure is forwarded as an error log line
// and returned to the caller.
func requestHold(ctx context.Context, c *Client, fwd *logForwarder) error {
	const authPath = "/root/.ssh/authorized_keys"

	fwd.warn("entering FailedHolding; requesting hold key")

	hold, err := c.Hold(ctx, localIP())
	if err != nil {
		fwd.error("hold request failed: " + err.Error())
		return err
	}

	if err = os.MkdirAll(filepath.Dir(authPath), 0o700); err != nil {
		fwd.error("mkdir .ssh: " + err.Error())
		return err
	}

	// Append rather than truncate so keys already present are kept.
	keyFile, err := os.OpenFile(authPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
	if err != nil {
		fwd.error("open authorized_keys: " + err.Error())
		return err
	}
	defer func() { _ = keyFile.Close() }()

	if _, err = fmt.Fprintln(keyFile, hold.AuthorizedKey); err != nil {
		fwd.error("write authorized_keys: " + err.Error())
		return err
	}

	fwd.info("hold key installed; SSH is available to root@" + localIP())
	return nil
}
// inventorySummary renders the one-line inventory digest used as the
// Inventory stage summary: CPU model and logical core count, total RAM,
// and the number of disks, NICs, GPUs and PSUs. When memory modules were
// reported, the RAM figure is followed by "(populated/total slots)".
func inventorySummary(inv *spec.Inventory) string {
	modules := inv.Memory.Modules

	slotNote := ""
	if total := len(modules); total > 0 {
		filled := 0
		for _, m := range modules {
			if m.Populated {
				filled++
			}
		}
		slotNote = fmt.Sprintf(" (%d/%d slots)", filled, total)
	}

	return fmt.Sprintf(
		"cpu=%q cores=%d ram=%dGiB%s disks=%d nics=%d gpus=%d psu=%d",
		inv.CPU.Model,
		inv.CPU.LogicalCores,
		inv.Memory.TotalGiB,
		slotNote,
		len(inv.Disks),
		len(inv.NICs),
		len(inv.GPUs),
		len(inv.PSU),
	)
}
// firmwareSummary renders the one-liner surfaced in the stage tile:
// space-separated per-component snapshot counts such as
// "bios=1 nic=2 nvme_fw=1", so an operator can see what was read without
// opening the report.
//
// Only the components bios, bmc, nic, hba, nvme_fw and microcode are
// listed, always in that order, and a component with no snapshots is
// omitted. With no snapshots at all the result is "no firmware readable".
func firmwareSummary(snaps []probes.FirmwareSnapshot) string {
	if len(snaps) == 0 {
		return "no firmware readable"
	}

	tally := make(map[string]int, len(snaps))
	for i := range snaps {
		tally[snaps[i].Component]++
	}

	var line strings.Builder
	for _, component := range [...]string{"bios", "bmc", "nic", "hba", "nvme_fw", "microcode"} {
		n := tally[component]
		if n <= 0 {
			continue
		}
		if line.Len() > 0 {
			line.WriteByte(' ')
		}
		fmt.Fprintf(&line, "%s=%d", component, n)
	}
	return line.String()
}
// thermalSidecar samples probes.Thermals every 5 seconds and forwards
// each non-empty batch to mux. It returns when ctx is cancelled.
//
// A tick that yields no samples is simply skipped, so a sensor that stops
// reporting just drops out of later batches. The fwd parameter is not
// used by the current implementation.
func thermalSidecar(ctx context.Context, mux *SensorMux, fwd *logForwarder) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		readings := probes.Thermals()
		if len(readings) == 0 {
			continue
		}
		batch := make([]SensorSample, len(readings))
		for i, r := range readings {
			batch[i] = SensorSample{Kind: r.Kind, Key: r.Key, Value: r.Value, Unit: r.Unit}
		}
		mux.Send(batch)
	}
}
// heartbeatLoop sends a heartbeat to the orchestrator every 10 seconds
// (each call limited to 5 seconds) and acts on the command carried in the
// response. It returns when ctx is cancelled, on "abort", or after
// issuing a reboot.
//
// Commands:
//   - "abort": log and stop heartbeating.
//   - "reboot": run `systemctl reboot`, falling back to `shutdown -r now`
//     if that fails, then stop heartbeating.
//   - "cancel_stage": invoke the cancel func published in stageCancel, if
//     a stage is in flight.
//   - "retry_stage": hand the response to out without blocking; it is
//     dropped if out's buffer is full.
//
// A failed heartbeat is logged and the loop carries on with the next
// tick. Any other command is ignored.
func heartbeatLoop(ctx context.Context, c *Client, fwd *logForwarder, out chan<- HeartbeatResponse) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		callCtx, done := context.WithTimeout(ctx, 5*time.Second)
		resp, err := c.Heartbeat(callCtx)
		done()
		if err != nil {
			fwd.warn("heartbeat error: " + err.Error())
			continue
		}

		switch resp.Cmd {
		case "abort":
			fwd.warn("orchestrator said abort; stopping loop")
			return

		case "reboot":
			fwd.info("orchestrator said reboot; rebooting host")
			// Best effort: systemd first, sysvinit as the fallback.
			// Either way stop here so no further heartbeats go out.
			if rebootErr := exec.Command("systemctl", "reboot").Run(); rebootErr != nil {
				fwd.warn("systemctl reboot failed: " + rebootErr.Error())
				_ = exec.Command("shutdown", "-r", "now").Run()
			}
			return

		case "cancel_stage":
			fwd.warn("orchestrator said cancel_stage; cancelling in-flight stage ctx")
			// The slot may be empty or hold a typed-nil func between
			// stages; the comma-ok assertion plus nil check covers both.
			if cancelFn, isFn := stageCancel.Load().(context.CancelFunc); isFn && cancelFn != nil {
				cancelFn()
			}

		case "retry_stage":
			select {
			case out <- *resp:
			default:
			}
		}
	}
}
// callWithBackoff invokes f until it succeeds, giving each attempt its
// own 10-second timeout derived from ctx.
//
// After a failed attempt it logs the failure under label and waits before
// retrying. The wait starts at 2 seconds and doubles after every wait
// while it is still below 30 seconds. It gives up after the 21st failed
// attempt and returns that attempt's error. If ctx is cancelled during a
// wait it returns ctx.Err(). f is always called at least once, even when
// ctx is already cancelled.
func callWithBackoff(ctx context.Context, label string, f func(context.Context) error) error {
	wait := 2 * time.Second
	attempt := 0
	for {
		attempt++

		attemptCtx, release := context.WithTimeout(ctx, 10*time.Second)
		err := f(attemptCtx)
		release()

		switch {
		case err == nil:
			return nil
		case attempt > 20:
			return err
		}

		log.Printf("agent: %s attempt %d failed: %v (retry in %s)", label, attempt, err, wait)

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(wait):
		}

		if wait < 30*time.Second {
			wait *= 2
		}
	}
}
// localIP returns the first non-loopback IPv4 address reported by
// net.InterfaceAddrs, in dotted-quad form. It returns the empty string
// when the addresses cannot be listed or none qualifies.
func localIP() string {
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return ""
	}
	for _, addr := range addrs {
		network, isIPNet := addr.(*net.IPNet)
		if !isIPNet || network.IP.IsLoopback() {
			continue
		}
		// To4 is nil for addresses that are not IPv4.
		if four := network.IP.To4(); four != nil {
			return four.String()
		}
	}
	return ""
}
// ----- log forwarder -----------------------------------------------------
// logForwarder buffers agent log lines and ships them to the orchestrator
// in batches. push appends to the buffer; a background goroutine started
// by newLogForwarder flushes it every 2 seconds and once more on
// shutdown. It must be used through a pointer because it embeds a mutex.
type logForwarder struct {
// c is the orchestrator client used by flush to send batches.
c *Client
// mu guards buf and stage.
mu sync.Mutex
// buf holds the lines queued since the last flush.
buf []LogLine
stage string // set via SetStage; empties via ClearStage
// wg tracks the background flush goroutine so close can wait for it.
wg sync.WaitGroup
// cancel stops the background flush goroutine.
cancel context.CancelFunc
}
// newLogForwarder creates a forwarder that sends through c and starts its
// background flush goroutine. The goroutine runs on a context derived
// from parent, so it stops when parent is cancelled or when close is
// called, whichever comes first.
func newLogForwarder(parent context.Context, c *Client) *logForwarder {
	loopCtx, stop := context.WithCancel(parent)

	fw := new(logForwarder)
	fw.c = c
	fw.cancel = stop

	fw.wg.Add(1)
	go fw.loop(loopCtx)
	return fw
}
// loop is the background flush goroutine. It flushes the buffer every 2
// seconds and, once ctx is cancelled, flushes one final time before
// returning. It marks the forwarder's WaitGroup done on exit.
func (f *logForwarder) loop(ctx context.Context) {
	defer f.wg.Done()

	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for {
		stopping := false
		select {
		case <-ctx.Done():
			stopping = true
		case <-ticker.C:
		}

		f.flush()
		if stopping {
			return
		}
	}
}
// push records one log line: it is written to the local logger right away
// and queued, stamped with the current UTC time and tagged with the
// current stage, for the next flush to the orchestrator.
func (f *logForwarder) push(level, text string) {
	when := time.Now().UTC().Format(time.RFC3339Nano)
	log.Printf("[%s] %s", level, text)

	f.mu.Lock()
	defer f.mu.Unlock()
	entry := LogLine{TS: when, Level: level, Stage: f.stage, Text: text}
	f.buf = append(f.buf, entry)
}
// info queues s as an "info"-level log line.
func (f *logForwarder) info(s string) {
	f.push("info", s)
}

// warn queues s as a "warn"-level log line.
func (f *logForwarder) warn(s string) {
	f.push("warn", s)
}

// error queues s as an "error"-level log line.
func (f *logForwarder) error(s string) {
	f.push("error", s)
}
// SetStage tags every subsequently pushed log line with stage, which lets
// the orchestrator fan lines out on a per-stage SSE event. It takes the
// same mutex as push, so calling it concurrently with push is safe.
func (f *logForwarder) SetStage(stage string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.stage = stage
}
// ClearStage removes the stage tag so later log lines go out untagged
// (framing-level). runStage defers it on entry so the hold and override
// paths do not inherit a stale stage name.
func (f *logForwarder) ClearStage() {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.stage = ""
}
// flush sends every queued log line to the orchestrator in one call and
// empties the queue. It does nothing when the queue is empty.
//
// The send runs on a fresh 5-second context rather than the forwarder's
// own, so the final flush during shutdown can still go out. A failed send
// is reported on the local logger only and the lines are not re-queued.
func (f *logForwarder) flush() {
	// Take ownership of the queued lines while holding the lock, then
	// send without it so push is never blocked on the network.
	f.mu.Lock()
	pending := f.buf
	f.buf = nil
	f.mu.Unlock()

	if len(pending) == 0 {
		return
	}

	sendCtx, done := context.WithTimeout(context.Background(), 5*time.Second)
	defer done()

	err := f.c.Log(sendCtx, pending)
	if err != nil {
		log.Printf("log forward failed: %v", err)
	}
}
// close stops the background flush goroutine and blocks until it has
// finished, which includes the goroutine's final flush of queued lines.
func (f *logForwarder) close() {
	stop := f.cancel
	stop()
	f.wg.Wait()
}