"""Tests for ``app.playoff_cache``.

Covers series-id parsing, bracket and series fetching (including the
cached / stale fallback paths), round start-date anchoring, the day-N
calculation, and creation / upsert behaviour of the cache table.
"""
import time
from contextlib import closing
from datetime import datetime
from zoneinfo import ZoneInfo

import pytest

from app import playoff_cache

EASTERN = ZoneInfo("America/New_York")


@pytest.fixture
def tmp_db(tmp_path, monkeypatch):
    """Point ``playoff_cache.DB_PATH`` at a per-test file and return its path."""
    db_path = tmp_path / "playoff_cache.db"
    monkeypatch.setattr(playoff_cache, "DB_PATH", str(db_path))
    return str(db_path)


class _Resp:
    """Minimal stand-in for the object returned by ``requests.get``."""

    def __init__(self, payload, status=200):
        self._payload = payload
        self.status_code = status

    def json(self):
        """Return the canned payload."""
        return self._payload

    def raise_for_status(self):
        """Raise ``requests.HTTPError`` for 4xx/5xx status codes."""
        if self.status_code >= 400:
            import requests

            raise requests.HTTPError(f"HTTP {self.status_code}")


def _backdate(key, seconds_ago):
    """Rewrite ``fetched_at`` of *key* so the row looks *seconds_ago* old.

    The connection is wrapped in ``closing`` so it is always released.
    NOTE(review): ``_connect()`` looks like it returns a plain
    ``sqlite3.Connection`` (TestSchema closes it by hand); if so, using it
    directly as a context manager would only scope a transaction and leave
    the handle open.
    """
    with closing(playoff_cache._connect()) as conn:
        conn.execute(
            "UPDATE playoff_cache SET fetched_at = ? WHERE cache_key = ?",
            (int(time.time()) - seconds_ago, key),
        )
        conn.commit()


class TestParseSeriesId:
    """``parse_series_id`` accepts ``"<year>-<LETTER>"`` and rejects the rest."""

    def test_valid(self):
        assert playoff_cache.parse_series_id("2026-A") == ("20252026", "A")

    def test_lowercase_rejected(self):
        assert playoff_cache.parse_series_id("2026-a") is None

    def test_invalid_letter(self):
        assert playoff_cache.parse_series_id("2026-Q") is None

    def test_malformed(self):
        assert playoff_cache.parse_series_id("abc") is None

    def test_none(self):
        assert playoff_cache.parse_series_id(None) is None


class TestBracket:
    """``refresh_bracket`` / ``get_bracket`` round-trip through the cache."""

    def test_refresh_success_stores_payload(self, tmp_db, monkeypatch):
        payload = {"series": [{"seriesLetter": "A"}], "year": 2026}
        monkeypatch.setattr(
            "app.playoff_cache.requests.get",
            lambda *a, **kw: _Resp(payload),
        )
        result = playoff_cache.refresh_bracket(2026)
        assert result == payload
        cached, fetched = playoff_cache.get_bracket(2026)
        assert cached == payload
        assert fetched is not None

    def test_refresh_failure_returns_none(self, tmp_db, monkeypatch):
        import requests

        def raiser(*a, **kw):
            raise requests.ConnectionError("boom")

        monkeypatch.setattr("app.playoff_cache.requests.get", raiser)
        assert playoff_cache.refresh_bracket(2026) is None

    def test_get_bracket_empty(self, tmp_db):
        payload, fetched = playoff_cache.get_bracket(2026)
        assert payload is None and fetched is None


class TestFetchSeries:
    """``fetch_series`` network, cache-hit and stale-fallback behaviour."""

    def test_success_stores_and_returns(self, tmp_db, monkeypatch):
        payload = {"seriesLetter": "A", "games": []}
        monkeypatch.setattr(
            "app.playoff_cache.requests.get",
            lambda *a, **kw: _Resp(payload),
        )
        result = playoff_cache.fetch_series("2026-A")
        assert result == payload

    def test_invalid_id_returns_none(self, tmp_db):
        assert playoff_cache.fetch_series("garbage") is None

    def test_cache_hit_skips_network(self, tmp_db, monkeypatch):
        payload_cached = {"from": "cache"}
        playoff_cache._put(
            playoff_cache.series_key("20252026", "A"), payload_cached
        )

        def should_not_be_called(*a, **kw):
            raise AssertionError("network should not be called within TTL")

        monkeypatch.setattr(
            "app.playoff_cache.requests.get", should_not_be_called
        )
        assert playoff_cache.fetch_series("2026-A") == payload_cached

    def test_stale_cache_falls_back_on_failure(self, tmp_db, monkeypatch):
        import requests

        key = playoff_cache.series_key("20252026", "A")
        playoff_cache._put(key, {"from": "stale"})
        # Force the cached row to look older than the TTL but within MAX_STALE
        _backdate(key, playoff_cache.SERIES_TTL + 60)

        def raiser(*a, **kw):
            raise requests.ConnectionError("network gone")

        monkeypatch.setattr("app.playoff_cache.requests.get", raiser)
        assert playoff_cache.fetch_series("2026-A") == {"from": "stale"}

    def test_ancient_stale_cache_returns_none(self, tmp_db, monkeypatch):
        import requests

        key = playoff_cache.series_key("20252026", "A")
        playoff_cache._put(key, {"from": "ancient"})
        # Push the row past MAX_STALE_SECONDS so it must not be served.
        _backdate(key, playoff_cache.MAX_STALE_SECONDS + 60)

        def raiser(*a, **kw):
            raise requests.ConnectionError("x")

        monkeypatch.setattr("app.playoff_cache.requests.get", raiser)
        assert playoff_cache.fetch_series("2026-A") is None


class TestRefreshRoundStartDates:
    """``refresh_round_start_dates`` anchors each round to its earliest game."""

    def test_no_bracket_returns_none(self, tmp_db):
        assert playoff_cache.refresh_round_start_dates(2026) is None

    def test_anchors_round_to_earliest_game(self, tmp_db, monkeypatch):
        playoff_cache._put(
            playoff_cache.bracket_key(2026),
            {"series": [{"seriesLetter": "A", "playoffRound": 1}]},
        )
        # Deliberately out of order: the middle entry is the earliest.
        series_payload = {
            "games": [
                {"startTimeUTC": "2026-04-19T23:00:00Z"},
                {"startTimeUTC": "2026-04-18T23:00:00Z"},
                {"startTimeUTC": "2026-04-21T23:00:00Z"},
            ]
        }
        monkeypatch.setattr(
            "app.playoff_cache.requests.get",
            lambda *a, **kw: _Resp(series_payload),
        )
        merged = playoff_cache.refresh_round_start_dates(2026)
        assert merged == {"1": "2026-04-18"}
        assert playoff_cache.get_round_start_date(1).isoformat() == "2026-04-18"

    def test_merges_multiple_rounds_min_per_round(self, tmp_db, monkeypatch):
        playoff_cache._put(
            playoff_cache.bracket_key(2026),
            {
                "series": [
                    {"seriesLetter": "A", "playoffRound": 1},
                    {"seriesLetter": "B", "playoffRound": 1},
                    {"seriesLetter": "I", "playoffRound": 2},
                ]
            },
        )
        payloads = {
            "A": {"games": [{"startTimeUTC": "2026-04-19T23:00:00Z"}]},
            "B": {"games": [{"startTimeUTC": "2026-04-18T23:00:00Z"}]},
            "I": {"games": [{"startTimeUTC": "2026-04-29T23:00:00Z"}]},
        }

        def fake_get(url, *a, **kw):
            # The series letter is taken from the last path segment of the URL.
            letter = url.rstrip("/").rsplit("/", 1)[-1].upper()
            return _Resp(payloads[letter])

        monkeypatch.setattr("app.playoff_cache.requests.get", fake_get)
        merged = playoff_cache.refresh_round_start_dates(2026)
        assert merged == {"1": "2026-04-18", "2": "2026-04-29"}

    def test_preserves_existing_rounds_on_merge(self, tmp_db, monkeypatch):
        playoff_cache._put(
            playoff_cache.ROUND_DATES_KEY,
            {"1": "2026-04-18", "2": "2026-04-29"},
        )
        playoff_cache._put(
            playoff_cache.bracket_key(2026),
            {"series": [{"seriesLetter": "M", "playoffRound": 3}]},
        )
        monkeypatch.setattr(
            "app.playoff_cache.requests.get",
            lambda *a, **kw: _Resp(
                {"games": [{"startTimeUTC": "2026-05-15T23:00:00Z"}]}
            ),
        )
        merged = playoff_cache.refresh_round_start_dates(2026)
        assert merged["1"] == "2026-04-18"
        assert merged["2"] == "2026-04-29"
        assert merged["3"] == "2026-05-15"


class TestDayNForRound:
    """``day_n_for_round`` counts days from a round's stored start date."""

    def test_no_round_num(self, tmp_db):
        assert playoff_cache.day_n_for_round(None) is None

    def test_round_not_anchored(self, tmp_db):
        assert playoff_cache.day_n_for_round(1) is None

    def test_day_one(self, tmp_db):
        playoff_cache._put(playoff_cache.ROUND_DATES_KEY, {"1": "2026-04-18"})
        now = datetime(2026, 4, 18, 20, 0, tzinfo=EASTERN)
        assert playoff_cache.day_n_for_round(1, now=now) == 1

    def test_day_two(self, tmp_db):
        playoff_cache._put(playoff_cache.ROUND_DATES_KEY, {"1": "2026-04-18"})
        now = datetime(2026, 4, 19, 10, 0, tzinfo=EASTERN)
        assert playoff_cache.day_n_for_round(1, now=now) == 2

    def test_day_five(self, tmp_db):
        playoff_cache._put(playoff_cache.ROUND_DATES_KEY, {"1": "2026-04-18"})
        now = datetime(2026, 4, 22, 20, 0, tzinfo=EASTERN)
        assert playoff_cache.day_n_for_round(1, now=now) == 5

    def test_round_two_resets_to_day_one(self, tmp_db):
        playoff_cache._put(
            playoff_cache.ROUND_DATES_KEY,
            {"1": "2026-04-18", "2": "2026-04-29"},
        )
        now = datetime(2026, 4, 29, 20, 0, tzinfo=EASTERN)
        assert playoff_cache.day_n_for_round(2, now=now) == 1

    def test_before_start_returns_none(self, tmp_db):
        playoff_cache._put(playoff_cache.ROUND_DATES_KEY, {"2": "2026-04-29"})
        now = datetime(2026, 4, 20, 20, 0, tzinfo=EASTERN)
        assert playoff_cache.day_n_for_round(2, now=now) is None


class TestSchema:
    """The cache table is created on demand and ``_put`` upserts."""

    def test_table_created_on_first_use(self, tmp_db):
        # Accessing _get triggers create_cache_table
        payload, _ = playoff_cache._get("missing")
        assert payload is None
        with closing(playoff_cache._connect()) as conn:
            cur = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table' "
                "AND name='playoff_cache'"
            )
            assert cur.fetchone() is not None

    def test_put_upserts(self, tmp_db):
        playoff_cache._put("k", {"v": 1})
        playoff_cache._put("k", {"v": 2})
        cached, _ = playoff_cache._get("k")
        assert cached == {"v": 2}