Files
Vetting/internal/store/measurements.go
T
josh 9bb4b09a04
CI / Lint + build + test (push) Has been cancelled
Initial commit: full Phases 1-6 implementation
Post-repair hardware validation pipeline for Proxmox cluster hosts.
Go orchestrator + in-image agent + mkosi live image + bundled dnsmasq
PXE + SQLite + HTMX/SSE UI + notify registry + janitor + full docs.
2026-04-17 21:32:10 -04:00

86 lines
2.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package store
import (
"context"
"database/sql"
"fmt"
"time"
"vetting/internal/model"
)
// Measurements persists timestamped numeric samples: temps, fan speeds,
// PSU voltages, fio IOPS, iperf throughput, SMART attributes. The schema
// stores (kind, key, value, unit) so Phase 5 reports can group freely
// without new tables per source.
//
// Every method on this type runs its statements through DB against the
// measurements table.
type Measurements struct {
// DB is the database handle used for all inserts and queries; it must be
// set before any method is called, since the methods dereference it
// without a nil check.
DB *sql.DB
}
// Create stores a single measurement and reports the row id assigned by
// the database.
//
// When in.TS is the zero time the row is stamped with the current UTC
// time instead. Because in is passed by value, the caller's struct is
// left untouched. A failed insert is returned wrapped with
// "insert measurement"; an error from LastInsertId is passed through as-is.
func (m *Measurements) Create(ctx context.Context, in model.Measurement) (int64, error) {
	const insertSQL = `
INSERT INTO measurements(run_id, stage_id, ts, kind, key, value, unit)
VALUES(?,?,?,?,?,?,?)
`
	// Fall back to "now" only when the caller did not supply a timestamp.
	stamp := in.TS
	if stamp.IsZero() {
		stamp = time.Now().UTC()
	}

	result, err := m.DB.ExecContext(ctx, insertSQL,
		in.RunID,
		nullInt64(in.StageID),
		stamp,
		in.Kind,
		in.Key,
		in.Value,
		in.Unit,
	)
	if err != nil {
		return 0, fmt.Errorf("insert measurement: %w", err)
	}
	return result.LastInsertId()
}
// CreateBatch inserts every element of rows inside one transaction, so the
// whole batch is committed once and either all rows are stored or none are.
// The sensor endpoint hands us ~520 samples per tick; a single commit keeps
// SQLite happy.
//
// An empty or nil rows slice is a no-op that returns nil without opening a
// transaction. Rows whose TS is the zero time are stamped with one shared
// UTC timestamp taken when the batch starts; the caller's slice is never
// modified. The first failure aborts the batch, the transaction is rolled
// back, and the error is returned wrapped with the step that failed.
func (m *Measurements) CreateBatch(ctx context.Context, rows []model.Measurement) error {
	if len(rows) == 0 {
		return nil
	}

	tx, err := m.DB.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin measurement batch: %w", err)
	}
	// Safety net for every early return. After a successful Commit the
	// rollback only reports that the transaction is already done, so its
	// result is deliberately ignored.
	defer func() { _ = tx.Rollback() }()

	// The statement text is identical for every row, so prepare it once on
	// the transaction instead of handing the SQL to the driver per sample.
	stmt, err := tx.PrepareContext(ctx, `
INSERT INTO measurements(run_id, stage_id, ts, kind, key, value, unit)
VALUES(?,?,?,?,?,?,?)
`)
	if err != nil {
		return fmt.Errorf("prepare measurement insert: %w", err)
	}
	// The transaction closes the statement on commit/rollback as well; an
	// extra Close is harmless, so its result is ignored.
	defer func() { _ = stmt.Close() }()

	now := time.Now().UTC()
	for i := range rows {
		// Index instead of ranging by value to avoid copying each struct;
		// the fallback timestamp lives in a local so rows stays read-only.
		r := &rows[i]
		ts := r.TS
		if ts.IsZero() {
			ts = now
		}
		if _, err := stmt.ExecContext(ctx, r.RunID, nullInt64(r.StageID), ts, r.Kind, r.Key, r.Value, r.Unit); err != nil {
			return fmt.Errorf("insert measurement: %w", err)
		}
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit measurement batch: %w", err)
	}
	return nil
}
// ListForRun loads every measurement stored for runID, ordered by
// timestamp and then by id. No filtering by kind happens here: callers do
// that in memory, which is acceptable because a run holds on the order of
// thousands of rows.
//
// A NULL stage_id leaves StageID nil, and a NULL unit is read back as the
// empty string. If the query or a row scan fails, nil and the error are
// returned. An error surfaced after iteration is returned together with
// the rows collected up to that point.
func (m *Measurements) ListForRun(ctx context.Context, runID int64) ([]model.Measurement, error) {
	const selectSQL = `
SELECT id, run_id, stage_id, ts, kind, key, value, COALESCE(unit,'')
FROM measurements WHERE run_id = ? ORDER BY ts, id
`
	cursor, err := m.DB.QueryContext(ctx, selectSQL, runID)
	if err != nil {
		return nil, err
	}
	defer cursor.Close()

	var result []model.Measurement
	for cursor.Next() {
		var (
			row   model.Measurement
			stage sql.NullInt64
		)
		scanErr := cursor.Scan(
			&row.ID, &row.RunID, &stage, &row.TS,
			&row.Kind, &row.Key, &row.Value, &row.Unit,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		// stage_id is nullable: only hand out a pointer when a value exists.
		if stage.Valid {
			id := stage.Int64
			row.StageID = &id
		}
		result = append(result, row)
	}
	return result, cursor.Err()
}