"""Small Flask app that polls the NHL score API and renders a scoreboard.

A background thread refreshes a module-level cache of the scoreboard JSON
every 10 seconds; the single route turns that cache into a list of
per-game dictionaries, sorted by a computed "priority", and renders it.
"""

import json
import sqlite3
import threading
import time
from contextlib import closing
from datetime import datetime, timedelta

import requests
import schedule
from flask import Flask, render_template
from waitress import serve

app = Flask(__name__)

# Configuration
# Most recent scoreboard payload fetched from the API (None until the first
# successful fetch). Written by the scheduler thread, read by request handlers.
scoreboard_data = None

# Upper bound (seconds) for a single API request so that a stalled connection
# cannot block the refresh loop indefinitely.
REQUEST_TIMEOUT_SECONDS = 10


# Data Retrieval
def get_nhle_scoreboard():
    """Fetch the scoreboard JSON from the NHL API.

    Between 23:00 and 08:00 (server-local clock) the ``/score/now`` endpoint
    is used; otherwise the endpoint for today's date is used.

    Returns:
        The decoded JSON payload, or ``None`` when the request fails, the
        response status is not 200, or the body is not valid JSON.
    """
    now = datetime.now()
    # NOTE(review): the original comments labelled these bounds "7:00 PM EST"
    # and "3:00 AM EST"; that only holds for a particular server timezone.
    # datetime.now() is naive local time -- confirm the deployment timezone.
    start_time_evening = now.replace(hour=23, minute=0, second=0, microsecond=0)
    end_time_evening = now.replace(hour=8, minute=0, second=0, microsecond=0)

    if now >= start_time_evening or now < end_time_evening:
        # Use now URL
        nhle_api_url = "https://api-web.nhle.com/v1/score/now"
    else:
        # Use current date URL
        nhle_api_url = f"https://api-web.nhle.com/v1/score/{now.strftime('%Y-%m-%d')}"

    try:
        response = requests.get(nhle_api_url, timeout=REQUEST_TIMEOUT_SECONDS)
    except requests.RequestException as exc:
        # A network failure must not propagate: this runs inside the scheduler
        # thread, and an uncaught exception there would stop all refreshes.
        print("Error:", exc)
        return None

    if response.status_code != 200:
        print("Error:", response.status_code)
        return None

    try:
        return response.json()
    except ValueError as exc:
        # Body was not valid JSON.
        print("Error:", exc)
        return None


# Store scoreboard data locally
def store_scoreboard_data():
    """Refresh the module-level ``scoreboard_data`` cache.

    The cache is only replaced when the fetch succeeded, so a transient API
    failure keeps serving the last good scoreboard instead of wiping it.
    """
    global scoreboard_data
    latest = get_nhle_scoreboard()
    if latest is not None:
        scoreboard_data = latest


# Schedule the task to run every 10 seconds
def schedule_task():
    """Run ``store_scoreboard_data`` every 10 seconds, forever (blocking)."""
    schedule.every(10).seconds.do(store_scoreboard_data)
    while True:
        schedule.run_pending()
        time.sleep(1)


# Data Processing
def extract_game_info():
    """Build the list of per-game dictionaries used by the template.

    Returns:
        A list of dicts (one per entry in the payload's ``"games"`` list),
        sorted by ``"Priority"`` descending, or ``None`` when no scoreboard
        data is cached.
    """
    # Take one snapshot of the global: the scheduler thread may rebind it
    # while this function is running.
    data = scoreboard_data
    if not data:
        return None

    extracted_info = []
    for game in data.get("games", []):
        home_team = game["homeTeam"]["name"]["default"]
        away_team = game["awayTeam"]["name"]["default"]
        home_logo = game["homeTeam"]["logo"]
        away_logo = game["awayTeam"]["logo"]
        game_state = convert_game_state(game["gameState"])

        period, time_remaining, time_running, is_intermission = process_time_and_period(game_state, game)
        home_score, away_score, home_shots, away_shots = process_scores_and_shots(game_state, game)
        start_time_str, home_record, away_record = process_start_time_and_records(game_state, game)
        game_priority = calculate_game_priority(game)

        # Get power play information
        home_power_play = get_power_play_info(game, home_team)
        away_power_play = get_power_play_info(game, away_team)

        # Get game outcome
        last_period_type = get_game_outcome(game_state, game)

        extracted_info.append({
            "Home Team": home_team,
            "Home Score": home_score,
            "Away Team": away_team,
            "Away Score": away_score,
            "Home Logo": home_logo,
            "Away Logo": away_logo,
            "Game State": game_state,
            "Period": period,
            "Time Remaining": time_remaining,
            "Time Running": time_running,
            "Intermission": is_intermission,
            "Priority": game_priority,
            "Start Time": start_time_str,
            "Home Record": home_record,
            "Away Record": away_record,
            "Home Shots": home_shots,
            "Away Shots": away_shots,
            "Home Power Play": home_power_play,
            "Away Power Play": away_power_play,
            "Last Period Type": last_period_type,
        })

    # Sort games based on priority (highest first)
    return sorted(extracted_info, key=lambda x: x["Priority"], reverse=True)


def convert_game_state(game_state):
    """Map API game states onto the display states used by this app.

    ``"OFF"`` -> ``"FINAL"``, ``"CRIT"`` -> ``"LIVE"``, ``"FUT"`` -> ``"PRE"``;
    any other value is returned unchanged.
    """
    if game_state == "OFF":
        return "FINAL"
    elif game_state == "CRIT":
        return "LIVE"
    elif game_state == "FUT":
        return "PRE"
    else:
        return game_state


def process_time_and_period(game_state, game):
    """Return ``(period, time_remaining, time_running, is_intermission)``.

    Not-yet-started games report ``(0, "20:00", False, False)`` and finished
    games ``("N/A", "00:00", False, False)``. For any other state the values
    are read from ``game["periodDescriptor"]`` and ``game["clock"]``, with a
    remaining time of ``"00:00"`` shown as ``"END"``.
    """
    if game_state in ["PRE", "FUT"]:
        return 0, "20:00", False, False
    elif game_state in ["FINAL", "OFF"]:
        return "N/A", "00:00", False, False
    else:
        period = game["periodDescriptor"]["number"]
        time_remaining = game["clock"]["timeRemaining"]
        if time_remaining == "00:00":
            time_remaining = "END"
        time_running = game["clock"]["running"]
        is_intermission = game["clock"]["inIntermission"]
        return period, time_remaining, time_running, is_intermission


def process_scores_and_shots(game_state, game):
    """Return ``(home_score, away_score, home_shots, away_shots)``.

    All four values are 0 for games that have not started; otherwise they are
    read from the ``"score"`` and ``"sog"`` fields of each team.
    """
    if game_state in ["PRE", "FUT"]:
        return 0, 0, 0, 0
    else:
        return (
            game["homeTeam"]["score"],
            game["awayTeam"]["score"],
            game["homeTeam"]["sog"],
            game["awayTeam"]["sog"],
        )


def process_start_time_and_records(game_state, game):
    """Return ``(start_time_str, home_record, away_record)``.

    Only populated for games that have not started; every other state yields
    ``"N/A"`` for all three values.
    """
    if game_state in ["PRE", "FUT"]:
        start_time_utc = game["startTimeUTC"]
        start_time_str = utc_to_est_time(start_time_utc)
        home_record = game["homeTeam"]["record"]
        away_record = game["awayTeam"]["record"]
    else:
        start_time_str = "N/A"
        home_record = "N/A"
        away_record = "N/A"
    return start_time_str, home_record, away_record


def get_power_play_info(game, team_name):
    """Return a ``"PP <time>"`` label, or ``""`` when no power play is listed.

    Args:
        game: One game entry from the scoreboard payload.
        team_name: Default name of the team being asked about.

    NOTE(review): the original code had two identical branches (away team /
    home team), so whenever ``"PP"`` appears in
    ``game["situation"]["situationDescriptions"]`` the label is returned for
    *both* teams of the game. That behaviour is preserved here because this
    file does not show how the payload attributes the power play to one side
    -- confirm against the API schema and narrow this down.
    """
    if "situation" not in game or "situationDescriptions" not in game["situation"]:
        return ""

    has_power_play = any(
        description == "PP"
        for description in game["situation"]["situationDescriptions"]
    )
    if has_power_play and team_name in (
        game["awayTeam"]["name"]["default"],
        game["homeTeam"]["name"]["default"],
    ):
        return f"PP {game['situation']['timeRemaining']}"

    return ""  # Return empty string if team is not on power play


def get_game_outcome(game_state, game):
    """Return the last period type for finished games, else ``"N/A"``."""
    if game_state == "FINAL":
        last_period_type = game["gameOutcome"]["lastPeriodType"]
    else:
        last_period_type = "N/A"
    return last_period_type


def calculate_game_priority(game):
    """Compute the sort priority of a game (higher sorts first).

    Games that are finished, not started, or in intermission get 0. For games
    in play the priority is::

        base (by period, reduced for lopsided scores, boosted for a tied
        third period with <= 600 seconds left)
        + (1200 - seconds_remaining) / 20
        - (sum of both teams' league_sequence + league_l10_sequence)
        + 20 * total goals

    Periods beyond the fourth use the same base as period 4; previously they
    fell through to the lowest base.

    Returns:
        The priority as an ``int``.
    """
    if game["gameState"] in ["FINAL", "OFF", "PRE", "FUT"] or game["clock"]["inIntermission"]:
        return 0

    # Get standings information from the database for home and away teams
    home_team_standings = get_team_standings(game["homeTeam"]["name"]["default"])
    away_team_standings = get_team_standings(game["awayTeam"]["name"]["default"])

    # Total of league_sequence + league_l10_sequence for each team
    home_team_total = home_team_standings["league_sequence"] + home_team_standings["league_l10_sequence"]
    away_team_total = away_team_standings["league_sequence"] + away_team_standings["league_l10_sequence"]

    # The adjustment is the SUM of both totals; it is subtracted from the
    # priority below, so a larger combined total lowers the priority.
    matchup_adjustment = home_team_total + away_team_total

    period = game.get("periodDescriptor", {}).get("number", 0)
    time_remaining = game.get("clock", {}).get("secondsRemaining", 0)
    home_score = game["homeTeam"]["score"]
    away_score = game["awayTeam"]["score"]
    score_difference = abs(home_score - away_score)
    score_total = (home_score + away_score) * 20

    if period >= 4:
        # Period 4 and anything after it share the highest base.
        base_priority = 400
    elif period == 3:
        base_priority = 300
    elif period == 2:
        base_priority = 200
    else:
        base_priority = 100

    # Penalise lopsided games.
    if score_difference > 3:
        base_priority -= 500
    elif score_difference > 2:
        base_priority -= 350
    elif score_difference > 1:
        base_priority -= 100

    # Tied third period with 600 seconds or less remaining gets a boost.
    if score_difference == 0 and period == 3 and time_remaining <= 600:
        base_priority += 100

    # Less time left in the period -> higher priority.
    time_priority = (1200 - time_remaining) / 20

    return int(base_priority + time_priority - matchup_adjustment + score_total)


def get_team_standings(team_name):
    """Look up a team's standings values in ``nhl_standings.db``.

    Args:
        team_name: Value matched against ``standings.team_common_name``.

    Returns:
        ``{"league_sequence": ..., "league_l10_sequence": ...}``; both values
        are 0 when the team has no row.

    Raises:
        sqlite3.Error: if the database or table cannot be queried.
    """
    # closing() guarantees the connection is released even if the query
    # raises; sqlite3's own context manager only manages transactions.
    with closing(sqlite3.connect("nhl_standings.db")) as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT league_sequence, league_l10_sequence
            FROM standings
            WHERE team_common_name = ?
            """,
            (team_name,),
        )
        result = cursor.fetchone()

    if result:
        return {"league_sequence": result[0], "league_l10_sequence": result[1]}
    else:
        return {"league_sequence": 0, "league_l10_sequence": 0}


def utc_to_est_time(utc_time):
    """Convert a ``YYYY-MM-DDTHH:MM:SSZ`` UTC string to a ``"HH:MM AM/PM"`` time.

    A fixed offset of -5 hours is applied.

    NOTE(review): a fixed -5h offset does not follow daylight saving time --
    confirm whether that is intended.
    """
    utc_datetime = datetime.strptime(utc_time, "%Y-%m-%dT%H:%M:%SZ")
    est_offset = timedelta(hours=-5)
    est_datetime = utc_datetime + est_offset
    est_time_str = est_datetime.strftime("%I:%M %p")
    return est_time_str


# Routes
@app.route('/')
def index():
    """Render the scoreboard page, or a plain error string when no games are available."""
    game_info = extract_game_info()
    # NOTE(review): an empty game list is also falsy, so a payload with no
    # games produces the same failure message as a missing payload.
    if game_info:
        live_games_exist = any(game["Game State"] == "LIVE" for game in game_info)
        pre_games_exist = any(game["Game State"] == "PRE" for game in game_info)
        final_games_exist = any(game["Game State"] == "FINAL" for game in game_info)
        return render_template(
            'index.html',
            games=game_info,
            live_games_exist=live_games_exist,
            pre_games_exist=pre_games_exist,
            final_games_exist=final_games_exist,
        )
    else:
        print("Failed to retrieve scoreboard data")
        return "Failed to retrieve scoreboard data"


if __name__ == '__main__':
    # Prime the cache once so the first request has data to show.
    store_scoreboard_data()
    # Daemon thread: the endless refresh loop must not keep the process alive
    # once the server has stopped.
    threading.Thread(target=schedule_task, daemon=True).start()
    serve(app, host="0.0.0.0", port=2897)