package main import ( "context" "crypto/tls" "errors" "flag" "log" "net/http" "os" "os/signal" "path/filepath" "syscall" "time" "vetting/internal/api" "vetting/internal/config" "vetting/internal/db" "vetting/internal/events" "vetting/internal/httpserver" "vetting/internal/janitor" "vetting/internal/logs" "vetting/internal/model" "vetting/internal/notify" "vetting/internal/orchestrator" "vetting/internal/pxe" "vetting/internal/store" "vetting/internal/web/templates" ) func main() { configPath := flag.String("config", "deploy/vetting.example.yaml", "path to vetting.yaml") flag.Parse() cfg, err := config.Load(*configPath) if err != nil { log.Fatalf("load config: %v", err) } for _, dir := range []string{ filepath.Dir(cfg.Database.Path), cfg.Artifacts.Dir, cfg.Logs.Dir, } { if err := os.MkdirAll(dir, 0o755); err != nil { log.Fatalf("mkdir %s: %v", dir, err) } } conn, err := db.Open(cfg.Database.Path) if err != nil { log.Fatalf("open db: %v", err) } defer func() { _ = conn.Close() }() hostStore := &store.Hosts{DB: conn} runStore := &store.Runs{DB: conn} stageStore := &store.Stages{DB: conn} artifactStore := &store.Artifacts{DB: conn} specDiffStore := &store.SpecDiffs{DB: conn} measurementStore := &store.Measurements{DB: conn} hub := events.NewHub() logHub, err := logs.NewHub(cfg.Logs.Dir, hub) if err != nil { log.Fatalf("logs hub: %v", err) } defer logHub.Close() runner := &orchestrator.Runner{ Runs: runStore, Hosts: hostStore, Stages: stageStore, EventHub: hub, } tiles := &api.TileEnricher{ Runs: runStore, Artifacts: artifactStore, SpecDiffs: specDiffStore, } // Inject a templ renderer so the Runner can publish tile-refresh // fragments via SSE without pulling web/templates into the // orchestrator package. The closure enriches the tile with spec- // diff count and hold-key path so every tile render shows the // same data, whether it came from /events or an initial page load. orchestrator.TileRenderer = func(ctx context.Context, host model.Host, latest *model.Run) string { return templates.RenderTileString(tiles.Build(ctx, host, latest)) } orchestrator.PipelineRenderer = templates.RenderPipelineString notifyReg, err := notify.BuildRegistry(cfg.Notifiers, cfg.Routes) if err != nil { log.Fatalf("notify: %v", err) } ui := &api.UI{ Hosts: hostStore, Runs: runStore, Stages: stageStore, SpecDiffs: specDiffStore, Artifacts: artifactStore, EventHub: hub, Logs: logHub, Runner: runner, Tiles: tiles, PublicURL: cfg.Server.PublicURL, } agentAPI := &api.Agent{ Hosts: hostStore, Runs: runStore, Stages: stageStore, Artifacts: artifactStore, SpecDiffs: specDiffStore, Measurements: measurementStore, Runner: runner, EventHub: hub, Logs: logHub, Notify: notifyReg, ArtifactsDir: cfg.Artifacts.Dir, OrchestratorURL: cfg.PXE.OrchestratorURL, PublicURL: cfg.Server.PublicURL, IperfPort: cfg.Network.IperfPort, } agentAPI.LiveKernelURL, agentAPI.LiveInitrdURL = pxe.BuildLiveURLs(cfg.PXE.OrchestratorURL) dispatcher := orchestrator.NewDispatcher(cfg.Dispatcher.MaxConcurrentRuns, runStore, hostStore, runner, logHub) iperfSup := orchestrator.NewIperfSupervisor(cfg.Network.IperfPort) janitorSvc := janitor.New(janitor.Config{ ArtifactRetention: time.Duration(cfg.Artifacts.RetentionDays) * 24 * time.Hour, LogRetention: time.Duration(cfg.Logs.RetentionDays) * 24 * time.Hour, Interval: time.Duration(cfg.Janitor.IntervalMinutes) * time.Minute, }, &janitor.StoreAdapter{Runs: runStore, Artifacts: artifactStore, Logs: logHub}) // Anchor tftp_root and the pxe runtime dir under artifacts.dir's // parent (typically /var/lib/vetting), not logs.dir's parent. The // production systemd unit's ReadWritePaths=/var/lib/vetting /var/log/vetting // sandbox forbids writing outside those trees, so deriving from // /var/log/vetting would land us at /var/log/{tftp,pxe} — unwritable. stateRoot := filepath.Dir(cfg.Artifacts.Dir) tftpRoot := cfg.PXE.TFTPRoot if tftpRoot == "" { tftpRoot = filepath.Join(stateRoot, "tftp") } var supervisor *pxe.Supervisor if cfg.PXE.Enabled { supervisor = pxe.NewSupervisor(pxe.SupervisorConfig{ Enabled: true, Interface: cfg.PXE.Interface, Subnet: cfg.PXE.Subnet, OrchestratorURL: cfg.PXE.OrchestratorURL, RuntimeDir: filepath.Join(stateRoot, "pxe"), TFTPRoot: tftpRoot, LiveDir: cfg.PXE.LiveDir, }) ui.PXE = supervisor } router := httpserver.NewRouter(httpserver.Deps{ UI: ui, Agent: agentAPI, LiveDir: cfg.PXE.LiveDir, AgentAssetDir: cfg.Agent.AssetDir, }) srv := &http.Server{ Addr: cfg.Server.Bind, Handler: router, ReadHeaderTimeout: 10 * time.Second, } if cfg.Server.TLS.Enabled { srv.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12} } shutdown := make(chan os.Signal, 1) signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM) rootCtx, cancelRoot := context.WithCancel(context.Background()) defer cancelRoot() dispatcher.Start(rootCtx) janitorSvc.Start(rootCtx) if err := iperfSup.Start(rootCtx); err != nil { log.Fatalf("start iperf3: %v", err) } if supervisor != nil { hosts, err := hostStore.List(rootCtx) if err != nil { log.Fatalf("list hosts for dnsmasq: %v", err) } if err := supervisor.Start(rootCtx, hosts); err != nil { log.Fatalf("start dnsmasq: %v", err) } } go func() { log.Printf("vetting listening on %s (tls=%v, db=%s)", cfg.Server.Bind, cfg.Server.TLS.Enabled, cfg.Database.Path) var err error if cfg.Server.TLS.Enabled { err = srv.ListenAndServeTLS(cfg.Server.TLS.CertFile, cfg.Server.TLS.KeyFile) } else { err = srv.ListenAndServe() } if err != nil && !errors.Is(err, http.ErrServerClosed) { log.Fatalf("server: %v", err) } }() <-shutdown log.Printf("shutting down") dispatcher.Stop() janitorSvc.Stop() _ = iperfSup.Shutdown(3 * time.Second) if supervisor != nil { _ = supervisor.Shutdown(5 * time.Second) } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if err := srv.Shutdown(ctx); err != nil { log.Printf("server shutdown: %v", err) } _ = hub.Shutdown(ctx) }