"""Reconcile follow-up statements against their originating rows in ``warntracker``.

Running the module as a script performs one reconciliation pass
(see ``run_warning_checks``).
"""

import json
import re
from contextlib import closing
from datetime import datetime, timezone
from typing import List, Tuple

import psycopg2

# Compiled once at import time; decode_vtec() anchors it at the start of the
# input via ``.match``.
_PVTEC_PATTERN = re.compile(
    r"/([A-Z]+)\.([A-Z]+)\.([A-Z]+)\.([A-Z]+)\.([A-Z])\.([0-9]+)\."
    r"([0-9A-Z]+)T([0-9A-Z]+)-([0-9A-Z]+)T([0-9A-Z]+)/"
)

# Rows with svstype EXP/CAN that have not been flagged as worked on.
_SELECT_EXPIRED_OR_CANCELLED = """
    SELECT endtime, etin, ugc, warntype, year, office
    FROM warntracker
    WHERE svstype IN ('EXP', 'CAN')
      AND workedon IS NOT TRUE
"""

# Merges the follow-up's UGC list into the matching NEW row's canxugc
# (UNION removes duplicates) and flags the row expired when the follow-up's
# end time is already in the past.
_APPLY_EXPIRED_OR_CANCELLED = """
    UPDATE warntracker
    SET canxugc = array(
            SELECT unnest(canxugc)
            UNION
            SELECT unnest(%s)
        ),
        warnexpired = CASE WHEN %s < %s THEN TRUE ELSE warnexpired END,
        workedon = TRUE
    WHERE etin = %s
      AND svstype = 'NEW'
      AND year = %s
      AND warntype = %s
      AND office = %s
"""

# CON rows already issued, whose end time is less than 120 minutes in the
# past (or still in the future), not cancelled/expired and not worked on.
_SELECT_CONTINUING = """
    SELECT endtime, etin, ugc, warntype, office
    FROM warntracker
    WHERE svstype = 'CON'
      AND current_timestamp > issue
      AND (EXTRACT(EPOCH FROM (current_timestamp - endtime))/60) < 120
      AND canexp IS NOT TRUE
      AND workedon IS NOT TRUE
"""

_MARK_FOLLOWUP = """
    UPDATE warntracker
    SET followup = TRUE,
        followups = COALESCE(followups, 0) + 1,
        workedon = TRUE
    WHERE etin = %s
      AND svstype = 'NEW'
      AND warntype = %s
      AND office = %s
"""

_MARK_FULLY_CANCELLED = """
    UPDATE warntracker
    SET canexp = TRUE,
        warnexpired = CASE WHEN %s < %s THEN TRUE ELSE warnexpired END,
        workedon = TRUE
    WHERE canxugc @> %s
      AND etin = %s
"""

_FINAL_CLEANUP = "UPDATE warntracker SET canexp = TRUE WHERE canxugc @> ugc"


def get_db_connection():
    """Open and return a new psycopg2 connection to the local ``nws`` database.

    The caller owns the connection and is responsible for closing it.
    """
    return psycopg2.connect(
        host='localhost',
        database='nws',
        user='nws',
        password='nws',
    )


def decode_vtec(pvtec_string: str) -> List[str]:
    """Extract selected fields from a P-VTEC string.

    Args:
        pvtec_string: Text that must start with a ``/``-delimited P-VTEC
            group in the shape accepted by ``_PVTEC_PATTERN``.

    Returns:
        Capture groups 1, 2, 4, 5 and 6 of the pattern, in that order, or an
        empty list (after printing a notice) when the string does not match.
    """
    match = _PVTEC_PATTERN.match(pvtec_string)
    if match is None:
        print("P-VTEC string format not recognized.")
        return []
    # NOTE(review): capture group 3 is skipped on purpose here to preserve the
    # existing return shape -- confirm callers do not need that field.
    return list(match.group(1, 2, 4, 5, 6))


def _apply_expired_and_cancelled(cursor, current_time: datetime) -> None:
    """Fold every pending EXP/CAN row into its matching NEW row."""
    cursor.execute(_SELECT_EXPIRED_OR_CANCELLED)
    # Materialise the result set first: the same cursor is reused for the
    # UPDATE statements below, which would discard a pending result set.
    for endtime, etin, ugc, warntype, year, office in cursor.fetchall():
        # NOTE(review): workedon is set on the NEW row only; the EXP/CAN row
        # itself is never flagged, so it is selected again on the next run.
        # Confirm whether the follow-up row should also be marked.
        cursor.execute(
            _APPLY_EXPIRED_OR_CANCELLED,
            (ugc, endtime, current_time, etin, year, warntype, office),
        )


def _apply_continuing(cursor, current_time: datetime) -> None:
    """Record every pending CON row against its matching NEW row."""
    cursor.execute(_SELECT_CONTINUING)
    for endtime, etin, ugc, warntype, office in cursor.fetchall():
        # NOTE(review): unlike the EXP/CAN pass, this match does not filter on
        # year -- confirm that etin/warntype/office is unique enough here.
        cursor.execute(_MARK_FOLLOWUP, (etin, warntype, office))
        # Rows whose accumulated cancelled UGCs already cover this
        # statement's UGC list are flagged as cancelled/expired.
        cursor.execute(_MARK_FULLY_CANCELLED, (endtime, current_time, ugc, etin))


def run_warning_checks():
    """Run one reconciliation pass over ``warntracker`` and commit it.

    Steps, all inside a single transaction:
      1. Apply EXP/CAN rows to their NEW rows.
      2. Apply CON rows to their NEW rows.
      3. Set ``canexp`` on every row whose ``canxugc`` contains its ``ugc``.

    The transaction is committed on success and rolled back if any statement
    raises; the connection is closed in both cases.
    """
    # Naive UTC timestamp, the same value datetime.utcnow() produced, obtained
    # without the call that is deprecated since Python 3.12.
    # NOTE(review): presumably endtime is stored as naive UTC -- confirm.
    current_time = datetime.now(timezone.utc).replace(tzinfo=None)

    # psycopg2's ``with conn:`` only scopes a transaction (commit/rollback);
    # it does not close the connection, hence the explicit closing() wrapper.
    with closing(get_db_connection()) as conn:
        with conn, conn.cursor() as cursor:
            _apply_expired_and_cancelled(cursor, current_time)
            _apply_continuing(cursor, current_time)
            cursor.execute(_FINAL_CLEANUP)


if __name__ == "__main__":
    run_warning_checks()