good luck
Some checks failed
CI / Lint (push) Successful in 58s
CI / Test (push) Successful in 8s
CI / Build & Push (push) Failing after 1m33s

This commit is contained in:
2026-03-29 09:20:21 -04:00
parent b10736d43c
commit 3994943757
23 changed files with 579 additions and 161 deletions

View File

@@ -1,34 +1,40 @@
import requests
from datetime import datetime
import json
import logging
from datetime import datetime
from zoneinfo import ZoneInfo
import requests
from app.config import SCOREBOARD_DATA_FILE
logger = logging.getLogger(__name__)
EASTERN = ZoneInfo("America/New_York")
SCOREBOARD_DATA_FILE = 'app/data/scoreboard_data.json'
def get_scoreboard_data():
now = datetime.now()
start_time_evening = now.replace(hour=19, minute=00, second=0, microsecond=0) # 7:00 PM EST
end_time_evening = now.replace(hour=3, minute=00, second=0, microsecond=0) # 3:00 AM EST
now = datetime.now(EASTERN)
start_time_evening = now.replace(hour=19, minute=0, second=0, microsecond=0)
end_time_morning = now.replace(hour=3, minute=0, second=0, microsecond=0)
if now >= start_time_evening or now < end_time_evening:
# Use now URL
if now >= start_time_evening or now < end_time_morning:
nhle_api_url = "https://api-web.nhle.com/v1/score/now"
else:
# Use current data URL
nhle_api_url = f"https://api-web.nhle.com/v1/score/{now.strftime('%Y-%m-%d')}"
response = requests.get(nhle_api_url)
if response.status_code == 200:
return response.json()
else:
print("Error:", response.status_code)
# Store scoreboard data locally
try:
response = requests.get(nhle_api_url, timeout=10)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
logger.error("Failed to fetch scoreboard data from %s: %s", nhle_api_url, e)
return None
def store_scoreboard_data():
scoreboard_data = get_scoreboard_data()
scoreboard_data = get_scoreboard_data()
if scoreboard_data:
with open(SCOREBOARD_DATA_FILE, 'w') as json_file:
with open(SCOREBOARD_DATA_FILE, "w") as json_file:
json.dump(scoreboard_data, json_file)
return scoreboard_data
else:
return None
return scoreboard_data
return None

View File

@@ -1,6 +1,12 @@
import logging
import sqlite3
from datetime import datetime, timedelta
from app.config import DB_PATH
logger = logging.getLogger(__name__)
def process_record(record):
if record == "N/A":
return "N/A"
@@ -9,6 +15,7 @@ def process_record(record):
formatted_parts = [part.zfill(2) for part in parts]
return "-".join(formatted_parts)
def extract_game_info(scoreboard_data):
if not scoreboard_data:
return []
@@ -16,36 +23,60 @@ def extract_game_info(scoreboard_data):
extracted_info = []
for game in scoreboard_data.get("games", []):
game_state = convert_game_state(game["gameState"])
extracted_info.append({
"Home Team": game["homeTeam"]["name"]["default"],
"Home Score": game["homeTeam"]["score"] if game_state != "PRE" else "N/A",
"Away Team": game["awayTeam"]["name"]["default"],
"Away Score": game["awayTeam"]["score"] if game_state != "PRE" else "N/A",
"Home Logo": game["homeTeam"]["logo"],
"Away Logo": game["awayTeam"]["logo"],
"Game State": game_state,
"Period": process_period(game),
"Time Remaining": process_time_remaining(game),
"Time Running": game["clock"]["running"] if game_state == "LIVE" else "N/A",
"Intermission": game["clock"]["inIntermission"] if game_state == "LIVE" else "N/A",
"Priority": calculate_game_priority(game),
"Start Time": process_start_time(game),
"Home Record": process_record(game["homeTeam"]["record"]) if game["gameState"] in ["PRE", "FUT"] else "N/A",
"Away Record": process_record(game["awayTeam"]["record"]) if game["gameState"] in ["PRE", "FUT"] else "N/A",
"Home Shots": game["homeTeam"]["sog"] if game["gameState"] not in ["PRE", "FUT"] else 0,
"Away Shots": game["awayTeam"]["sog"] if game["gameState"] not in ["PRE", "FUT"] else 0,
"Home Power Play": get_power_play_info(game, game["homeTeam"]["name"]["default"]),
"Away Power Play": get_power_play_info(game, game["awayTeam"]["name"]["default"]),
"Last Period Type": get_game_outcome(game, game_state)
})
extracted_info.append(
{
"Home Team": game["homeTeam"]["name"]["default"],
"Home Score": game["homeTeam"]["score"]
if game_state != "PRE"
else "N/A",
"Away Team": game["awayTeam"]["name"]["default"],
"Away Score": game["awayTeam"]["score"]
if game_state != "PRE"
else "N/A",
"Home Logo": game["homeTeam"]["logo"],
"Away Logo": game["awayTeam"]["logo"],
"Game State": game_state,
"Period": process_period(game),
"Time Remaining": process_time_remaining(game),
"Time Running": game["clock"]["running"]
if game_state == "LIVE"
else "N/A",
"Intermission": game["clock"]["inIntermission"]
if game_state == "LIVE"
else "N/A",
"Priority": calculate_game_priority(game),
"Start Time": process_start_time(game),
"Home Record": process_record(game["homeTeam"]["record"])
if game["gameState"] in ["PRE", "FUT"]
else "N/A",
"Away Record": process_record(game["awayTeam"]["record"])
if game["gameState"] in ["PRE", "FUT"]
else "N/A",
"Home Shots": game["homeTeam"]["sog"]
if game["gameState"] not in ["PRE", "FUT"]
else 0,
"Away Shots": game["awayTeam"]["sog"]
if game["gameState"] not in ["PRE", "FUT"]
else 0,
"Home Power Play": get_power_play_info(
game, game["homeTeam"]["name"]["default"]
),
"Away Power Play": get_power_play_info(
game, game["awayTeam"]["name"]["default"]
),
"Last Period Type": get_game_outcome(game, game_state),
}
)
# Sort games based on priority
return sorted(extracted_info, key=lambda x: x["Priority"], reverse=True)
def convert_game_state(game_state):
state_mapping = {"OFF": "FINAL", "CRIT": "LIVE", "FUT": "PRE"}
return state_mapping.get(game_state, game_state)
def process_period(game):
if game["gameState"] in ["PRE", "FUT"]:
return 0
@@ -54,6 +85,7 @@ def process_period(game):
else:
return game["periodDescriptor"]["number"]
def process_time_remaining(game):
if game["gameState"] in ["PRE", "FUT"]:
return "20:00"
@@ -63,17 +95,16 @@ def process_time_remaining(game):
time_remaining = game["clock"]["timeRemaining"]
return "END" if time_remaining == "00:00" else time_remaining
def process_start_time(game):
if game["gameState"] in ["PRE", "FUT"]:
utc_time = game["startTimeUTC"]
est_time = utc_to_est_time(utc_time)
# Check if the hour starts with a zero
if est_time.startswith("0"):
est_time = est_time[1:] # Drop the leading zero
return est_time
return est_time.lstrip("0")
else:
return "N/A"
def get_power_play_info(game, team_name):
if "situation" in game and "situationDescriptions" in game["situation"]:
for situation in game["situation"]["situationDescriptions"]:
@@ -83,14 +114,16 @@ def get_power_play_info(game, team_name):
return f"PP {game['situation']['timeRemaining']}"
return ""
def get_game_outcome(game, game_state):
return game["gameOutcome"]["lastPeriodType"] if game_state == "FINAL" else "N/A"
def calculate_game_priority(game):
# Return 0 if game is in certain states
if game["gameState"] in ["FINAL", "OFF", "PRE", "FUT"]:
return 0
# Get period, time remaining, scores, and other relevant data
period = game.get("periodDescriptor", {}).get("number", 0)
time_remaining = game.get("clock", {}).get("secondsRemaining", 0)
@@ -104,8 +137,14 @@ def calculate_game_priority(game):
away_team_standings = get_team_standings(game["awayTeam"]["name"]["default"])
# Calculate total values of leagueSequence + leagueL10Sequence for each team
home_total = home_team_standings["league_sequence"] + home_team_standings["league_l10_sequence"]
away_total = away_team_standings["league_sequence"] + away_team_standings["league_l10_sequence"]
home_total = (
home_team_standings["league_sequence"]
+ home_team_standings["league_l10_sequence"]
)
away_total = (
away_team_standings["league_sequence"]
+ away_team_standings["league_l10_sequence"]
)
# Calculate the matchup adjustment factor
matchup_multiplier = {5: 1, 4: 1, 3: 1.50, 2: 1.65, 1: 2}.get(period)
@@ -123,11 +162,11 @@ def calculate_game_priority(game):
score_differential_adjustment += 350
elif score_difference > 1:
score_differential_adjustment += 100
if period == 3 and time_remaining <= 300:
score_differential_adjustment = score_differential_adjustment * 2
base_priority -= score_differential_adjustment
base_priority -= score_differential_adjustment
# Adjust base priority based on certain conditions
if period == 3 and time_remaining <= 720:
@@ -142,40 +181,51 @@ def calculate_game_priority(game):
elif score_difference == 1:
base_priority += 30
# Calculate time priority
time_multiplier = {4: 2, 3: 2, 2: 1.5}.get(period, 0.75)
time_priority = ((1200 - time_remaining) / 20) * time_multiplier
print(base_priority)
print(time_priority)
print(matchup_adjustment)
print(score_total)
logger.debug(
"priority components — base: %s, time: %s, matchup: %s, score_total: %s",
base_priority,
time_priority,
matchup_adjustment,
score_total,
)
# Calculate the final priority
final_priority = int(base_priority + time_priority - matchup_adjustment + score_total)
final_priority = int(
base_priority + time_priority - matchup_adjustment + score_total
)
# Pushes the games that are in intermission to the bottom, but retains their sort
if game["clock"]["inIntermission"]:
return (-2000 - time_remaining)
return -2000 - time_remaining
return final_priority
def get_team_standings(team_name):
conn = sqlite3.connect("app/data/nhl_standings.db")
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
SELECT league_sequence, league_l10_sequence
FROM standings
cursor.execute(
"""
SELECT league_sequence, league_l10_sequence
FROM standings
WHERE team_common_name = ?
""", (team_name,))
""",
(team_name,),
)
result = cursor.fetchone()
conn.close()
return {"league_sequence": result[0] if result else 0, "league_l10_sequence": result[1] if result else 0}
return {
"league_sequence": result[0] if result else 0,
"league_l10_sequence": result[1] if result else 0,
}
def utc_to_est_time(utc_time):
utc_datetime = datetime.strptime(utc_time, "%Y-%m-%dT%H:%M:%SZ")
est_offset = timedelta(hours=-4)
est_datetime = utc_datetime + est_offset
return est_datetime.strftime("%#I:%M %p")
return est_datetime.strftime("%I:%M %p")

View File

@@ -1,13 +1,18 @@
import schedule
import logging
import time
from app.scoreboard.update_nhl_standings_db import update_nhl_standings
import schedule
from app.scoreboard.get_data import store_scoreboard_data
from app.scoreboard.update_nhl_standings_db import update_nhl_standings
logger = logging.getLogger(__name__)
def schedule_tasks():
schedule.every(600).seconds.do(update_nhl_standings)
schedule.every(10).seconds.do(store_scoreboard_data)
logger.info("Background scheduler started")
while True:
schedule.run_pending()
time.sleep(1)

View File

@@ -1,6 +1,13 @@
import logging
import sqlite3
import requests
from app.config import DB_PATH
logger = logging.getLogger(__name__)
def create_standings_table(conn):
cursor = conn.cursor()
cursor.execute("""
@@ -12,54 +19,55 @@ def create_standings_table(conn):
""")
conn.commit()
def truncate_standings_table(conn):
cursor = conn.cursor()
cursor.execute("DELETE FROM standings")
conn.commit()
def insert_standings_info(conn, standings_info):
cursor = conn.cursor()
for team in standings_info:
cursor.execute("""
cursor.execute(
"""
INSERT INTO standings (team_common_name, league_sequence, league_l10_sequence)
VALUES (?, ?, ?)
""", (team["team_common_name"], team["league_sequence"], team["league_l10_sequence"]))
""",
(
team["team_common_name"],
team["league_sequence"],
team["league_l10_sequence"],
),
)
conn.commit()
def extract_standings_info():
url = "https://api-web.nhle.com/v1/standings/now"
response = requests.get(url)
if response.status_code == 200:
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
standings_data = response.json()
standings_info = []
for team in standings_data.get("standings", []):
team_info = {
"team_common_name": team["teamCommonName"]["default"],
"league_sequence": team["leagueSequence"],
"league_l10_sequence": team["leagueL10Sequence"]
"league_l10_sequence": team["leagueL10Sequence"],
}
standings_info.append(team_info)
return standings_info
else:
print("Error:", response.status_code)
except requests.RequestException as e:
logger.error("Failed to fetch standings: %s", e)
return None
def update_nhl_standings():
# Connect to SQLite database
conn = sqlite3.connect("app/data/nhl_standings.db")
# Create standings table if it doesn't exist
conn = sqlite3.connect(DB_PATH)
create_standings_table(conn)
# Truncate standings table before inserting new data
truncate_standings_table(conn)
# Extract standings info
standings_info = extract_standings_info()
# Insert standings info into the database
if standings_info:
insert_standings_info(conn, standings_info)
# Close database connection
conn.close()