package store import ( "context" "database/sql" "errors" "fmt" "time" "vetting/internal/model" ) type Runs struct { DB *sql.DB } func (r *Runs) Create(ctx context.Context, hostID int64, tokenHash string, nonDestructive bool) (int64, error) { now := time.Now().UTC() nd := 0 if nonDestructive { nd = 1 } res, err := r.DB.ExecContext(ctx, ` INSERT INTO runs(host_id, state, agent_token_hash, next_boot_target, started_at, non_destructive) VALUES(?,?,?,?,?,?) `, hostID, string(model.StateQueued), tokenHash, "linux", now, nd) if err != nil { return 0, fmt.Errorf("insert run: %w", err) } return res.LastInsertId() } func (r *Runs) SetState(ctx context.Context, runID int64, state model.RunState) error { _, err := r.DB.ExecContext(ctx, `UPDATE runs SET state = ? WHERE id = ?`, string(state), runID) return err } // RotateTokenHash replaces the stored token hash. Called on each iPXE // fetch so only the most-recently-booted agent can claim the run. func (r *Runs) RotateTokenHash(ctx context.Context, runID int64, hash string) error { _, err := r.DB.ExecContext(ctx, `UPDATE runs SET agent_token_hash = ? WHERE id = ?`, hash, runID) return err } // SetHoldIP records the agent's LAN IP so the UI can show the ssh // command. Called when the agent POSTs /hold. func (r *Runs) SetHoldIP(ctx context.Context, runID int64, ip string) error { _, err := r.DB.ExecContext(ctx, `UPDATE runs SET hold_ip = ? WHERE id = ?`, ip, runID) return err } // SetFailedStage records which stage tripped the run; used by the tile // and by reports. Does not change state. func (r *Runs) SetFailedStage(ctx context.Context, runID int64, stage string) error { _, err := r.DB.ExecContext(ctx, `UPDATE runs SET failed_stage = ? WHERE id = ?`, stage, runID) return err } // ClearFailedStage wipes the failed_stage marker. Called when the // operator overrides a stage and the run re-enters the pipeline. func (r *Runs) ClearFailedStage(ctx context.Context, runID int64) error { _, err := r.DB.ExecContext(ctx, `UPDATE runs SET failed_stage = NULL WHERE id = ?`, runID) return err } // SetOverrideFlags persists the operator's override decisions (JSON blob // like `{"wipe":true}`). Passed back to the agent on the next heartbeat // so it can resume the held stage with the gate bypassed. func (r *Runs) SetOverrideFlags(ctx context.Context, runID int64, flagsJSON string) error { _, err := r.DB.ExecContext(ctx, `UPDATE runs SET override_flags_json = ? WHERE id = ?`, flagsJSON, runID) return err } func (r *Runs) MarkFailed(ctx context.Context, runID int64, failedStage, holdIP string) error { now := time.Now().UTC() _, err := r.DB.ExecContext(ctx, ` UPDATE runs SET state = ?, result = 'fail', failed_stage = ?, hold_ip = ?, completed_at = ? WHERE id = ? `, string(model.StateFailedHolding), failedStage, holdIP, now, runID) return err } // MarkDispatchFailed records a terminal failure discovered before the run // ever reached a live image, e.g. the dispatcher refused to start because // the host isn't heartbeating. Goes to StateFailed (not FailedHolding) // because there's no live image to ssh into. func (r *Runs) MarkDispatchFailed(ctx context.Context, runID int64, failedStage, result string) error { now := time.Now().UTC() _, err := r.DB.ExecContext(ctx, ` UPDATE runs SET state = ?, result = ?, failed_stage = ?, completed_at = ? WHERE id = ? `, string(model.StateFailed), result, failedStage, now, runID) return err } func (r *Runs) MarkCompleted(ctx context.Context, runID int64, reportPath string) error { now := time.Now().UTC() _, err := r.DB.ExecContext(ctx, ` UPDATE runs SET state = ?, result = 'pass', report_path = ?, completed_at = ? WHERE id = ? `, string(model.StateCompleted), reportPath, now, runID) return err } func (r *Runs) Get(ctx context.Context, id int64) (*model.Run, error) { row := r.DB.QueryRowContext(ctx, ` SELECT id, host_id, state, COALESCE(result,''), COALESCE(failed_stage,''), COALESCE(next_boot_target,''), agent_token_hash, started_at, completed_at, COALESCE(report_path,''), COALESCE(hold_ip,''), COALESCE(override_flags_json,''), COALESCE(non_destructive,0) FROM runs WHERE id = ? `, id) var run model.Run var completedAt sql.NullTime err := row.Scan(&run.ID, &run.HostID, &run.State, &run.Result, &run.FailedStage, &run.NextBootTarget, &run.AgentTokenHash, &run.StartedAt, &completedAt, &run.ReportPath, &run.HoldIP, &run.OverrideFlagsJSON, &run.NonDestructive) if errors.Is(err, sql.ErrNoRows) { return nil, ErrNotFound } if err != nil { return nil, fmt.Errorf("get run: %w", err) } if completedAt.Valid { run.CompletedAt = &completedAt.Time } return &run, nil } // LatestForHost returns the most recent run for a host, or nil if none. func (r *Runs) LatestForHost(ctx context.Context, hostID int64) (*model.Run, error) { row := r.DB.QueryRowContext(ctx, ` SELECT id, host_id, state, COALESCE(result,''), COALESCE(failed_stage,''), COALESCE(next_boot_target,''), agent_token_hash, started_at, completed_at, COALESCE(report_path,''), COALESCE(hold_ip,''), COALESCE(override_flags_json,''), COALESCE(non_destructive,0) FROM runs WHERE host_id = ? ORDER BY id DESC LIMIT 1 `, hostID) var run model.Run var completedAt sql.NullTime err := row.Scan(&run.ID, &run.HostID, &run.State, &run.Result, &run.FailedStage, &run.NextBootTarget, &run.AgentTokenHash, &run.StartedAt, &completedAt, &run.ReportPath, &run.HoldIP, &run.OverrideFlagsJSON, &run.NonDestructive) if errors.Is(err, sql.ErrNoRows) { return nil, nil } if err != nil { return nil, fmt.Errorf("latest run: %w", err) } if completedAt.Valid { run.CompletedAt = &completedAt.Time } return &run, nil } // ListForHost returns the most recent `limit` runs for a host, newest // first. Zero/negative limit falls back to a safe cap so a mistaken call // can't scan the whole history into memory. func (r *Runs) ListForHost(ctx context.Context, hostID int64, limit int) ([]model.Run, error) { if limit <= 0 { limit = 20 } rows, err := r.DB.QueryContext(ctx, ` SELECT id, host_id, state, COALESCE(result,''), COALESCE(failed_stage,''), COALESCE(next_boot_target,''), agent_token_hash, started_at, completed_at, COALESCE(report_path,''), COALESCE(hold_ip,''), COALESCE(override_flags_json,''), COALESCE(non_destructive,0) FROM runs WHERE host_id = ? ORDER BY id DESC LIMIT ? `, hostID, limit) if err != nil { return nil, err } defer rows.Close() var out []model.Run for rows.Next() { var run model.Run var completedAt sql.NullTime if err := rows.Scan(&run.ID, &run.HostID, &run.State, &run.Result, &run.FailedStage, &run.NextBootTarget, &run.AgentTokenHash, &run.StartedAt, &completedAt, &run.ReportPath, &run.HoldIP, &run.OverrideFlagsJSON, &run.NonDestructive); err != nil { return nil, err } if completedAt.Valid { run.CompletedAt = &completedAt.Time } out = append(out, run) } return out, rows.Err() } // ListForHostAll returns every run for a host, newest first. Caps at a // defensive 1000 rows so a runaway host that somehow accumulated tens // of thousands of runs doesn't blow up the page load; typical hosts // finish with < 50. func (r *Runs) ListForHostAll(ctx context.Context, hostID int64) ([]model.Run, error) { return r.ListForHost(ctx, hostID, 1000) } // Active returns all runs in non-terminal states. func (r *Runs) Active(ctx context.Context) ([]model.Run, error) { rows, err := r.DB.QueryContext(ctx, ` SELECT id, host_id, state, COALESCE(result,''), COALESCE(failed_stage,''), COALESCE(next_boot_target,''), agent_token_hash, started_at, completed_at, COALESCE(report_path,''), COALESCE(hold_ip,''), COALESCE(override_flags_json,''), COALESCE(non_destructive,0) FROM runs WHERE state NOT IN ('Completed','Released','Cancelled') ORDER BY id `) if err != nil { return nil, err } defer rows.Close() var out []model.Run for rows.Next() { var run model.Run var completedAt sql.NullTime if err := rows.Scan(&run.ID, &run.HostID, &run.State, &run.Result, &run.FailedStage, &run.NextBootTarget, &run.AgentTokenHash, &run.StartedAt, &completedAt, &run.ReportPath, &run.HoldIP, &run.OverrideFlagsJSON, &run.NonDestructive); err != nil { return nil, err } if completedAt.Valid { run.CompletedAt = &completedAt.Time } out = append(out, run) } return out, rows.Err() } // CompletedOlderThan returns run IDs for terminal (Completed/Released/ // FailedHolding) runs whose completed_at is older than cutoff. Runs with // a NULL completed_at fall back to started_at so a stuck run doesn't get // garbage-collected out from under its own logs. Used by the janitor. func (r *Runs) CompletedOlderThan(ctx context.Context, cutoff time.Time) ([]int64, error) { rows, err := r.DB.QueryContext(ctx, ` SELECT id FROM runs WHERE state IN ('Completed','Released','FailedHolding','Cancelled') AND COALESCE(completed_at, started_at) < ? ORDER BY id `, cutoff) if err != nil { return nil, err } defer rows.Close() var out []int64 for rows.Next() { var id int64 if err := rows.Scan(&id); err != nil { return nil, err } out = append(out, id) } return out, rows.Err() } // FindByMAC returns the current active run for the host with the given MAC, // or nil if the MAC is unknown or has no active run. func (r *Runs) FindActiveByMAC(ctx context.Context, mac string) (*model.Run, error) { row := r.DB.QueryRowContext(ctx, ` SELECT r.id, r.host_id, r.state, COALESCE(r.result,''), COALESCE(r.failed_stage,''), COALESCE(r.next_boot_target,''), r.agent_token_hash, r.started_at, r.completed_at, COALESCE(r.report_path,''), COALESCE(r.hold_ip,''), COALESCE(r.override_flags_json,''), COALESCE(r.non_destructive,0) FROM runs r JOIN hosts h ON h.id = r.host_id WHERE h.mac = ? AND r.state NOT IN ('Completed','Released','Cancelled') ORDER BY r.id DESC LIMIT 1 `, mac) var run model.Run var completedAt sql.NullTime err := row.Scan(&run.ID, &run.HostID, &run.State, &run.Result, &run.FailedStage, &run.NextBootTarget, &run.AgentTokenHash, &run.StartedAt, &completedAt, &run.ReportPath, &run.HoldIP, &run.OverrideFlagsJSON, &run.NonDestructive) if errors.Is(err, sql.ErrNoRows) { return nil, nil } if err != nil { return nil, err } if completedAt.Valid { run.CompletedAt = &completedAt.Time } return &run, nil }