Files
NHL-Scoreboard/tests/test_playoff_cache.py
T
josh a88e2edef0
CI / Lint (push) Successful in 5s
CI / Test (push) Successful in 9s
CI / Build & Push (push) Successful in 19s
fix: anchor Day N to each round's first game instead of lazy first sighting
The banner read "Day 1 of ~60" on day 2 of the playoffs because the old
anchor recorded whatever date we first polled a playoff game as Day 1.
Now round start dates come from /v1/schedule/playoff-series, so Day N
is authoritative and resets at each round boundary. Drops the noisy
"of ~60" denominator.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-19 13:03:08 -04:00

276 lines
9.4 KiB
Python

import time
from datetime import datetime
from zoneinfo import ZoneInfo
import pytest
from app import playoff_cache
EASTERN = ZoneInfo("America/New_York")
@pytest.fixture
def tmp_db(tmp_path, monkeypatch):
db_path = tmp_path / "playoff_cache.db"
monkeypatch.setattr(playoff_cache, "DB_PATH", str(db_path))
return str(db_path)
class _Resp:
def __init__(self, payload, status=200):
self._payload = payload
self.status_code = status
def json(self):
return self._payload
def raise_for_status(self):
if self.status_code >= 400:
import requests
raise requests.HTTPError(f"HTTP {self.status_code}")
class TestParseSeriesId:
def test_valid(self):
assert playoff_cache.parse_series_id("2026-A") == ("20252026", "A")
def test_lowercase_rejected(self):
assert playoff_cache.parse_series_id("2026-a") is None
def test_invalid_letter(self):
assert playoff_cache.parse_series_id("2026-Q") is None
def test_malformed(self):
assert playoff_cache.parse_series_id("abc") is None
def test_none(self):
assert playoff_cache.parse_series_id(None) is None
class TestBracket:
def test_refresh_success_stores_payload(self, tmp_db, monkeypatch):
payload = {"series": [{"seriesLetter": "A"}], "year": 2026}
monkeypatch.setattr(
"app.playoff_cache.requests.get",
lambda *a, **kw: _Resp(payload),
)
result = playoff_cache.refresh_bracket(2026)
assert result == payload
cached, fetched = playoff_cache.get_bracket(2026)
assert cached == payload
assert fetched is not None
def test_refresh_failure_returns_none(self, tmp_db, monkeypatch):
import requests
def raiser(*a, **kw):
raise requests.ConnectionError("boom")
monkeypatch.setattr("app.playoff_cache.requests.get", raiser)
assert playoff_cache.refresh_bracket(2026) is None
def test_get_bracket_empty(self, tmp_db):
payload, fetched = playoff_cache.get_bracket(2026)
assert payload is None and fetched is None
class TestFetchSeries:
def test_success_stores_and_returns(self, tmp_db, monkeypatch):
payload = {"seriesLetter": "A", "games": []}
monkeypatch.setattr(
"app.playoff_cache.requests.get",
lambda *a, **kw: _Resp(payload),
)
result = playoff_cache.fetch_series("2026-A")
assert result == payload
def test_invalid_id_returns_none(self, tmp_db):
assert playoff_cache.fetch_series("garbage") is None
def test_cache_hit_skips_network(self, tmp_db, monkeypatch):
payload_cached = {"from": "cache"}
playoff_cache._put(playoff_cache.series_key("20252026", "A"), payload_cached)
def should_not_be_called(*a, **kw):
raise AssertionError("network should not be called within TTL")
monkeypatch.setattr("app.playoff_cache.requests.get", should_not_be_called)
assert playoff_cache.fetch_series("2026-A") == payload_cached
def test_stale_cache_falls_back_on_failure(self, tmp_db, monkeypatch):
import requests
key = playoff_cache.series_key("20252026", "A")
playoff_cache._put(key, {"from": "stale"})
# Force the cached row to look older than the TTL but within MAX_STALE
with playoff_cache._connect() as conn:
old_ts = int(time.time()) - (playoff_cache.SERIES_TTL + 60)
conn.execute(
"UPDATE playoff_cache SET fetched_at = ? WHERE cache_key = ?",
(old_ts, key),
)
conn.commit()
def raiser(*a, **kw):
raise requests.ConnectionError("network gone")
monkeypatch.setattr("app.playoff_cache.requests.get", raiser)
assert playoff_cache.fetch_series("2026-A") == {"from": "stale"}
def test_ancient_stale_cache_returns_none(self, tmp_db, monkeypatch):
import requests
key = playoff_cache.series_key("20252026", "A")
playoff_cache._put(key, {"from": "ancient"})
with playoff_cache._connect() as conn:
ancient_ts = int(time.time()) - (playoff_cache.MAX_STALE_SECONDS + 60)
conn.execute(
"UPDATE playoff_cache SET fetched_at = ? WHERE cache_key = ?",
(ancient_ts, key),
)
conn.commit()
monkeypatch.setattr(
"app.playoff_cache.requests.get",
lambda *a, **kw: (_ for _ in ()).throw(requests.ConnectionError("x")),
)
assert playoff_cache.fetch_series("2026-A") is None
class TestRefreshRoundStartDates:
def test_no_bracket_returns_none(self, tmp_db):
assert playoff_cache.refresh_round_start_dates(2026) is None
def test_anchors_round_to_earliest_game(self, tmp_db, monkeypatch):
playoff_cache._put(
playoff_cache.bracket_key(2026),
{"series": [{"seriesLetter": "A", "playoffRound": 1}]},
)
series_payload = {
"games": [
{"startTimeUTC": "2026-04-19T23:00:00Z"},
{"startTimeUTC": "2026-04-18T23:00:00Z"},
{"startTimeUTC": "2026-04-21T23:00:00Z"},
]
}
monkeypatch.setattr(
"app.playoff_cache.requests.get",
lambda *a, **kw: _Resp(series_payload),
)
merged = playoff_cache.refresh_round_start_dates(2026)
assert merged == {"1": "2026-04-18"}
assert playoff_cache.get_round_start_date(1).isoformat() == "2026-04-18"
def test_merges_multiple_rounds_min_per_round(self, tmp_db, monkeypatch):
playoff_cache._put(
playoff_cache.bracket_key(2026),
{
"series": [
{"seriesLetter": "A", "playoffRound": 1},
{"seriesLetter": "B", "playoffRound": 1},
{"seriesLetter": "I", "playoffRound": 2},
]
},
)
payloads = {
"A": {"games": [{"startTimeUTC": "2026-04-19T23:00:00Z"}]},
"B": {"games": [{"startTimeUTC": "2026-04-18T23:00:00Z"}]},
"I": {"games": [{"startTimeUTC": "2026-04-29T23:00:00Z"}]},
}
def fake_get(url, *a, **kw):
letter = url.rstrip("/").rsplit("/", 1)[-1].upper()
return _Resp(payloads[letter])
monkeypatch.setattr("app.playoff_cache.requests.get", fake_get)
merged = playoff_cache.refresh_round_start_dates(2026)
assert merged == {"1": "2026-04-18", "2": "2026-04-29"}
def test_preserves_existing_rounds_on_merge(self, tmp_db, monkeypatch):
playoff_cache._put(
playoff_cache.ROUND_DATES_KEY, {"1": "2026-04-18", "2": "2026-04-29"}
)
playoff_cache._put(
playoff_cache.bracket_key(2026),
{"series": [{"seriesLetter": "M", "playoffRound": 3}]},
)
monkeypatch.setattr(
"app.playoff_cache.requests.get",
lambda *a, **kw: _Resp(
{"games": [{"startTimeUTC": "2026-05-15T23:00:00Z"}]}
),
)
merged = playoff_cache.refresh_round_start_dates(2026)
assert merged["1"] == "2026-04-18"
assert merged["2"] == "2026-04-29"
assert merged["3"] == "2026-05-15"
class TestDayNForRound:
def test_no_round_num(self, tmp_db):
assert playoff_cache.day_n_for_round(None) is None
def test_round_not_anchored(self, tmp_db):
assert playoff_cache.day_n_for_round(1) is None
def test_day_one(self, tmp_db):
playoff_cache._put(playoff_cache.ROUND_DATES_KEY, {"1": "2026-04-18"})
now = datetime(2026, 4, 18, 20, 0, tzinfo=EASTERN)
assert playoff_cache.day_n_for_round(1, now=now) == 1
def test_day_two(self, tmp_db):
playoff_cache._put(playoff_cache.ROUND_DATES_KEY, {"1": "2026-04-18"})
now = datetime(2026, 4, 19, 10, 0, tzinfo=EASTERN)
assert playoff_cache.day_n_for_round(1, now=now) == 2
def test_day_five(self, tmp_db):
playoff_cache._put(playoff_cache.ROUND_DATES_KEY, {"1": "2026-04-18"})
now = datetime(2026, 4, 22, 20, 0, tzinfo=EASTERN)
assert playoff_cache.day_n_for_round(1, now=now) == 5
def test_round_two_resets_to_day_one(self, tmp_db):
playoff_cache._put(
playoff_cache.ROUND_DATES_KEY,
{"1": "2026-04-18", "2": "2026-04-29"},
)
now = datetime(2026, 4, 29, 20, 0, tzinfo=EASTERN)
assert playoff_cache.day_n_for_round(2, now=now) == 1
def test_before_start_returns_none(self, tmp_db):
playoff_cache._put(playoff_cache.ROUND_DATES_KEY, {"2": "2026-04-29"})
now = datetime(2026, 4, 20, 20, 0, tzinfo=EASTERN)
assert playoff_cache.day_n_for_round(2, now=now) is None
class TestSchema:
def test_table_created_on_first_use(self, tmp_db):
# Accessing _get triggers create_cache_table
payload, fetched = playoff_cache._get("missing")
assert payload is None
conn = playoff_cache._connect()
try:
cur = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' "
"AND name='playoff_cache'"
)
assert cur.fetchone() is not None
finally:
conn.close()
def test_put_upserts(self, tmp_db):
playoff_cache._put("k", {"v": 1})
playoff_cache._put("k", {"v": 2})
cached, _ = playoff_cache._get("k")
assert cached == {"v": 2}