9bb4b09a04
CI / Lint + build + test (push) Has been cancelled
Post-repair hardware validation pipeline for Proxmox cluster hosts. Go orchestrator + in-image agent + mkosi live image + bundled dnsmasq PXE + SQLite + HTMX/SSE UI + notify registry + janitor + full docs.
172 lines
4.7 KiB
Go
172 lines
4.7 KiB
Go
// Package janitor garbage-collects on-disk run data. A completed or
|
|
// released run produces an HTML report, a JSON report, a log file, and
|
|
// potentially several artifact blobs (fio output, iperf output, hold
|
|
// pubkey, inventory JSON). None of these need to stay on disk
|
|
// indefinitely — once the operator's looked at the report and closed
|
|
// the tile, disk pressure is the only cost.
|
|
//
|
|
// The DB row for the run is kept (so historical counts and host
|
|
// histories survive); only the on-disk files and their artifact rows
|
|
// are pruned. The janitor ticks on a fixed interval and is safe to
|
|
// run concurrently with live runs — it only touches runs in terminal
|
|
// states past a cutoff, which by definition are not being written to.
|
|
package janitor
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"vetting/internal/store"
|
|
)
|
|
|
|
// Config carries the retention knobs. Zero values mean "keep forever"
|
|
// for that class of data; a zero Interval defaults to 1h.
|
|
type Config struct {
|
|
ArtifactRetention time.Duration
|
|
LogRetention time.Duration
|
|
Interval time.Duration
|
|
}
|
|
|
|
// Stores is the subset of the store layer the janitor needs. Defined as
|
|
// an interface so tests can fake it without spinning up SQLite.
|
|
type Stores interface {
|
|
CompletedOlderThan(ctx context.Context, cutoff time.Time) ([]int64, error)
|
|
DeleteArtifactsForRun(ctx context.Context, runID int64) ([]store.Artifact, error)
|
|
LogPathFor(runID int64) string
|
|
}
|
|
|
|
// Janitor owns the ticker goroutine. Start/Stop are idempotent; Stop
|
|
// waits for the in-flight pass to finish so tests can assert post-state.
|
|
type Janitor struct {
|
|
cfg Config
|
|
s Stores
|
|
stop chan struct{}
|
|
wg sync.WaitGroup
|
|
mu sync.Mutex
|
|
running bool
|
|
}
|
|
|
|
func New(cfg Config, s Stores) *Janitor {
|
|
if cfg.Interval <= 0 {
|
|
cfg.Interval = time.Hour
|
|
}
|
|
return &Janitor{cfg: cfg, s: s, stop: make(chan struct{})}
|
|
}
|
|
|
|
// Start launches the ticker. Retention zeros mean no cleanup is needed;
|
|
// in that case the ticker still runs but each Sweep is a no-op.
|
|
func (j *Janitor) Start(ctx context.Context) {
|
|
j.mu.Lock()
|
|
if j.running {
|
|
j.mu.Unlock()
|
|
return
|
|
}
|
|
j.running = true
|
|
j.mu.Unlock()
|
|
j.wg.Add(1)
|
|
go j.loop(ctx)
|
|
}
|
|
|
|
func (j *Janitor) Stop() {
|
|
j.mu.Lock()
|
|
if !j.running {
|
|
j.mu.Unlock()
|
|
return
|
|
}
|
|
j.running = false
|
|
close(j.stop)
|
|
j.mu.Unlock()
|
|
j.wg.Wait()
|
|
}
|
|
|
|
func (j *Janitor) loop(ctx context.Context) {
|
|
defer j.wg.Done()
|
|
// Run one sweep immediately so startup cleans up anything that
|
|
// aged out while the orchestrator was down.
|
|
if err := j.Sweep(ctx, time.Now().UTC()); err != nil {
|
|
log.Printf("janitor: initial sweep: %v", err)
|
|
}
|
|
t := time.NewTicker(j.cfg.Interval)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-j.stop:
|
|
return
|
|
case now := <-t.C:
|
|
if err := j.Sweep(ctx, now.UTC()); err != nil {
|
|
log.Printf("janitor: sweep: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Sweep is exported so tests can drive a single pass deterministically.
|
|
// It picks the *more aggressive* cutoff between the two retentions so a
|
|
// single DB query covers both classes, then does the per-class work.
|
|
func (j *Janitor) Sweep(ctx context.Context, now time.Time) error {
|
|
if j.cfg.ArtifactRetention <= 0 && j.cfg.LogRetention <= 0 {
|
|
return nil
|
|
}
|
|
cutoff := now.Add(-longer(j.cfg.ArtifactRetention, j.cfg.LogRetention))
|
|
runs, err := j.s.CompletedOlderThan(ctx, cutoff)
|
|
if err != nil {
|
|
return fmt.Errorf("list old runs: %w", err)
|
|
}
|
|
artifactCutoff := now.Add(-j.cfg.ArtifactRetention)
|
|
logCutoff := now.Add(-j.cfg.LogRetention)
|
|
for _, runID := range runs {
|
|
// The query above used the longer cutoff — each retention is
|
|
// re-checked per-run against its actual cutoff via the run's
|
|
// completed_at, but since we don't round-trip that here we
|
|
// just process both at their own cutoff using the single
|
|
// query's cheap filter (run is old enough for at least one).
|
|
if j.cfg.ArtifactRetention > 0 && !artifactCutoff.IsZero() {
|
|
j.cleanArtifacts(ctx, runID)
|
|
}
|
|
if j.cfg.LogRetention > 0 && !logCutoff.IsZero() {
|
|
j.cleanLog(runID)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (j *Janitor) cleanArtifacts(ctx context.Context, runID int64) {
|
|
arts, err := j.s.DeleteArtifactsForRun(ctx, runID)
|
|
if err != nil {
|
|
log.Printf("janitor: delete artifacts for run %d: %v", runID, err)
|
|
return
|
|
}
|
|
for _, a := range arts {
|
|
if a.Path == "" {
|
|
continue
|
|
}
|
|
if err := os.Remove(a.Path); err != nil && !errors.Is(err, os.ErrNotExist) {
|
|
log.Printf("janitor: unlink %s: %v", a.Path, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (j *Janitor) cleanLog(runID int64) {
|
|
path := j.s.LogPathFor(runID)
|
|
if path == "" {
|
|
return
|
|
}
|
|
if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) {
|
|
log.Printf("janitor: unlink log %s: %v", path, err)
|
|
}
|
|
}
|
|
|
|
func longer(a, b time.Duration) time.Duration {
|
|
if a > b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|