import requests
import json
import psycopg2
import psycopg2.extensions
from datetime import datetime, timezone, timedelta
import re   # (unused here; kept in case other parts of the project expect it)
import time
import sys  # (unused here; kept intentionally)
import traceback

# --- Configuration ---
DRY_RUN = False  # Set to False for actual DB writes

# --- Database Connection ---
DB_CONN_INFO = {
    'host': 'localhost',
    'database': 'nws',
    'user': 'nws',
    'password': 'nws'
}

conn = None
cursor = None
try:
    conn = psycopg2.connect(**DB_CONN_INFO)
    cursor = conn.cursor()
    print("Database connection successful.")
except psycopg2.OperationalError as e:
    # Deliberately non-fatal: the script can still run (notably in DRY_RUN
    # mode); existence checks and inserts will simply fail gracefully below.
    print(f"WARNING: Database connection failed: {e}. Backfill checks for existing VTEC will not work.")
    # exit()

# --- Requests Session ---
S = requests.Session()
headers = {
    'user-agent': 'wx.stoat.org Backfill Script v3, john.peck@noaa.gov',
    'Accept': 'application/json',
    'Cache-Control': 'no-cache',
}

# --- Configuration ---
BACKFILL_DAYS = 2
WFOS_FOR_BACKFILL = ['RLX']
COW_API_BASE_URL = "https://mesonet.agron.iastate.edu/api/1/cow.json"
# (phenomena, significance) pairs to backfill — all warnings ('W'):
# severe thunderstorm, tornado, flash flood, marine, snow squall, dust storm.
PHENOMENA_SIG_PAIRS = [
    ('SV', 'W'), ('TO', 'W'), ('FF', 'W'),
    ('MA', 'W'), ('SQ', 'W'), ('DS', 'W')
]


# --- IEM COW API Backfill (Refactored to construct full VTEC) ---
def backfill_from_cow(session, db_cursor, db_conn, days_back, wfo_list):
    """Queries IEM COW API, constructs the full VTEC string, and inserts if missing.

    For each WFO and each (phenomena, significance) pair, fetches event
    features from the IEM COW endpoint, reconstructs the full VTEC key
    (/O.ACTION.KXXX.PH.S.ETN.yymmddThhmmZ-yymmddThhmmZ/) from the feature's
    'link' and issue/expire times, and inserts missing rows into the 'svr'
    (SV/TO warnings only) and 'warntracker' tables.

    Args:
        session: requests.Session used for the COW API calls.
        db_cursor: psycopg2 cursor, or None if the DB connection failed
            (existence checks are then skipped and live inserts fail softly).
        db_conn: psycopg2 connection used for commit/rollback, or None.
        days_back: number of days before "now" (UTC) to request.
        wfo_list: list of WFO identifiers (3-letter, e.g. 'RLX') to query.
    """
    print(f"\nStarting IEM COW API backfill simulation for the last {days_back} days...")
    end_time_utc = datetime.now(timezone.utc)
    start_time_utc = end_time_utc - timedelta(days=days_back)
    start_ts_str = start_time_utc.strftime('%Y-%m-%dT%H:%MZ')
    end_ts_str = end_time_utc.strftime('%Y-%m-%dT%H:%MZ')

    cow_processed = 0
    cow_checked_exists = 0
    cow_would_insert_svr = 0
    cow_would_insert_tracker = 0
    cow_skipped_no_link = 0
    cow_skipped_bad_vtec_parse = 0
    cow_skipped_missing_times = 0  # Counter for missing times needed for VTEC key

    for wfo in wfo_list:
        print(f" Querying COW for WFO: {wfo}")
        for phenom, sig in PHENOMENA_SIG_PAIRS:
            params = {
                'wfo': wfo,
                'phenomena': phenom,
                'significance': sig,
                'begints': start_ts_str,
                'endts': end_ts_str,
                'limit': 1000
            }
            try:
                response = session.get(COW_API_BASE_URL, params=params, headers=headers, timeout=90)
                response.raise_for_status()
                data = response.json()
            except requests.exceptions.RequestException as e:
                print(f" ERROR: Failed fetch for {wfo}/{phenom}.{sig}: {e}")
                continue
            except json.JSONDecodeError as e:
                print(f" ERROR: Failed JSON parse for {wfo}/{phenom}.{sig}: {e}")
                continue

            if 'events' not in data or 'features' not in data['events']:
                continue
            event_features = data['events']['features']
            if not event_features:
                continue
            print(f" Found {len(event_features)} {phenom}.{sig} event features for WFO {wfo}.")

            for feature in event_features:
                try:
                    cow_processed += 1
                    properties = feature.get('properties')
                    geometry = feature.get('geometry')
                    feature_id = feature.get('id')  # For logging
                    if not properties or not geometry:
                        print(f" DEBUG: Skipping COW feature - missing properties or geometry. ID: {feature_id}")
                        continue

                    # --- Extract VTEC components from Link ---
                    link = properties.get('link')
                    if not link or not isinstance(link, str):
                        print(f" WARNING: Skipping COW feature - Missing or invalid 'link'. ID: {feature_id}, ProdID: {properties.get('product_id')}")
                        cow_skipped_no_link += 1
                        continue
                    try:
                        # Link tail is the short VTEC form, expected as 7
                        # dash-separated parts: YYYY-O-ACTION-KXXX-PH-S-ETN
                        vtec_short_form = link.strip('/').split('/')[-1]
                        vtec_parts = vtec_short_form.split('-')
                        if len(vtec_parts) != 7:
                            raise ValueError("Incorrect number of VTEC parts")
                        # Assign parsed components
                        v_action = vtec_parts[2].upper()
                        v_office_raw = vtec_parts[3].upper()
                        v_phenomena = vtec_parts[4].upper()
                        v_significance = vtec_parts[5].upper()
                        v_etn = vtec_parts[6]
                        v_office_k = v_office_raw  # Keep KXXX format for VTEC string
                        # 3-letter form for the dedicated 'office' column
                        v_office_3letter = v_office_raw[1:] if v_office_raw.startswith('K') and len(v_office_raw) == 4 else v_office_raw
                    except Exception as e:
                        print(f" WARNING: Skipping COW feature - Failed VTEC parse from link '{link}': {e}. ID: {feature_id}")
                        cow_skipped_bad_vtec_parse += 1
                        continue

                    # --- Extract and Parse Timestamps ---
                    issue_str = properties.get('issue')
                    expire_str = properties.get('expire')
                    if not issue_str or not expire_str:
                        print(f" WARNING: Skipping COW feature - Missing issue or expire time needed for full VTEC key. VTEC(short): {vtec_short_form}, ID: {feature_id}")
                        cow_skipped_missing_times += 1
                        continue  # Cannot construct full VTEC without both times
                    try:
                        # COW returns ISO8601 with 'Z'; normalize for fromisoformat.
                        issuetime = datetime.fromisoformat(issue_str.replace('Z', '+00:00'))
                        endtime = datetime.fromisoformat(expire_str.replace('Z', '+00:00'))
                    except ValueError as e:
                        print(f" WARNING: Could not parse timestamp for COW VTEC(short) {vtec_short_form}. Issue: '{issue_str}', Expire: '{expire_str}'. Error: {e}. ID: {feature_id}")
                        cow_skipped_missing_times += 1
                        continue

                    # --- Construct Full VTEC String for Database ---
                    # Format: /O.ACTION.KXXX.PHENOM.SIG.ETN.YYMMDDTHHMMZ-YYMMDDTHHMMZ/
                    # Note: NWS VTEC uses 2-digit year (%y)
                    try:
                        issue_vtec_time = issuetime.strftime('%y%m%dT%H%MZ')
                        expire_vtec_time = endtime.strftime('%y%m%dT%H%MZ')
                        # Ensure K is present for office ID in VTEC string
                        if not v_office_k.startswith('K') and len(v_office_k) == 3:
                            v_office_k_formatted = 'K' + v_office_k
                        else:
                            v_office_k_formatted = v_office_k  # Assume it's already KXXX or something else
                        # Assemble the string; 'O' = operational mode, matching the DB example
                        full_vtec_key = f"/O.{v_action}.{v_office_k_formatted}.{v_phenomena}.{v_significance}.{v_etn}.{issue_vtec_time}-{expire_vtec_time}/"
                    except Exception as e:
                        print(f" ERROR: Failed to format VTEC time strings for {vtec_short_form}. Error: {e}. ID: {feature_id}")
                        cow_skipped_missing_times += 1  # Count as time-related issue
                        continue

                    # --- Extract other needed properties ---
                    product_id = properties.get('product_id')
                    prop_wfo = properties.get('wfo')
                    # Office for the dedicated 'office' column: prefer the API's
                    # 'wfo' property, fall back to the office parsed from the link.
                    office_col_val = prop_wfo if prop_wfo else v_office_3letter
                    ugc_list = properties.get('ar_ugc')
                    prop_year = properties.get('year')  # Use property year if available, else derive

                    # --- Further Validation ---
                    if not product_id or not office_col_val:
                        print(f" DEBUG: Skipping COW feature {full_vtec_key} - missing product_id or office. ID: {feature_id}")
                        continue
                    if geometry['type'] not in ('Polygon', 'MultiPolygon'):
                        print(f" DEBUG: Skipping COW feature {full_vtec_key} - non-polygon geometry. ID: {feature_id}")
                        continue

                    # --- Check if Full VTEC exists in DB ---
                    exists_in_svr = False
                    exists_in_tracker = False
                    if db_cursor:
                        try:
                            db_cursor.execute("SELECT 1 FROM svr WHERE vtec = %s", (full_vtec_key,))
                            exists_in_svr = db_cursor.fetchone() is not None
                            db_cursor.execute("SELECT 1 FROM warntracker WHERE vtectext = %s", (full_vtec_key,))
                            exists_in_tracker = db_cursor.fetchone() is not None
                            cow_checked_exists += 1
                        except Exception as e:
                            print(f" ERROR: DB check failed for VTEC {full_vtec_key}: {e}")
                            continue
                    # else: Assume missing if no DB conn/cursor

                    # --- Skip if already exists ---
                    # Only SV.W / TO.W go into the 'svr' table; everything goes
                    # into 'warntracker'. Skip only if all applicable rows exist.
                    is_svr_tor_type = v_phenomena in ('SV', 'TO') and v_significance == 'W'
                    if exists_in_tracker and (not is_svr_tor_type or exists_in_svr):
                        continue

                    # --- Prepare final data for insertion ---
                    year = prop_year if prop_year else issuetime.year  # Prefer property year, fallback to issue time
                    gisgeom_str = json.dumps(geometry)
                    pil_parts = product_id.split('-')
                    pil = pil_parts[-1] if len(pil_parts) > 1 else product_id
                    ugc_param = list(ugc_list) if isinstance(ugc_list, (list, tuple)) else []

                    # --- Insert into 'svr' table if missing ---
                    if is_svr_tor_type and not exists_in_svr:
                        svr_warntype = f"{v_phenomena}{office_col_val}"  # e.g., TORRLX
                        sql_svr = """
                            INSERT INTO svr (nwspoly, issue, endtime, warntype, vtec)
                            VALUES (ST_SetSRID(ST_GeomFromGeoJSON(%s), 4326), %s, %s, %s, %s)
                            ON CONFLICT (issue,warntype) DO NOTHING
                        """
                        # Use the constructed full_vtec_key for the 'vtec' column
                        params_svr = (gisgeom_str, issuetime, endtime, svr_warntype, full_vtec_key)
                        if DRY_RUN:
                            print(f" [DRY RUN] Would execute COW svr backfill:")
                            print(f" SQL: INSERT INTO svr ... VTEC='{full_vtec_key}'")
                            cow_would_insert_svr += 1
                        else:
                            # Actual Insert
                            try:
                                db_cursor.execute(sql_svr, params_svr)
                            except Exception as e:
                                print(f" ERROR: Insert failed for svr table, VTEC {full_vtec_key}: {e}")
                                if db_conn:
                                    db_conn.rollback()

                    # --- Insert into 'warntracker' table if missing ---
                    if not exists_in_tracker:
                        # This check might now be slightly redundant if ON CONFLICT works, but keep it for logic flow
                        sql_tracker = """
                            INSERT INTO warntracker (
                                nwspoly, issue, endtime, warntype, vtectext, etin,
                                svstype, pil, ugc, year, office, sig
                            ) VALUES (
                                ST_SetSRID(ST_GeomFromGeoJSON(%s), 4326),
                                %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
                            ) ON CONFLICT (issue, vtectext) DO NOTHING -- **** MODIFIED HERE ****
                        """
                        params_tracker = (
                            gisgeom_str, issuetime, endtime, v_significance,
                            full_vtec_key, v_etn, v_phenomena, pil,
                            ugc_param, year, office_col_val, v_action
                        )
                        if DRY_RUN:
                            print(f" [DRY RUN] Would execute COW warntracker backfill:")
                            print(f" SQL: INSERT INTO warntracker ... VTEC='{full_vtec_key}'")
                            cow_would_insert_tracker += 1
                        else:
                            # Actual Insert
                            try:
                                db_cursor.execute(sql_tracker, params_tracker)
                            except Exception as e:
                                print(f" ERROR: Insert failed for warntracker table, VTEC {full_vtec_key}: {e}")
                                # print(f" DEBUG PARAMS: {params_tracker}")  # Uncomment for deep debug
                                if db_conn:
                                    db_conn.rollback()

                except Exception as e:
                    # Catch unexpected errors during feature processing.
                    # BUGFIX: print() does not accept a logging-style
                    # exc_info= keyword; emit the traceback explicitly instead.
                    link_err = feature.get('properties', {}).get('link', 'N/A')
                    print(f" ERROR: Unexpected error processing COW feature (Link: {link_err}): {e}")
                    traceback.print_exc()

            # --- Commit after each WFO/Phenom/Sig batch (if not DRY_RUN) ---
            if not DRY_RUN and db_conn:
                try:
                    db_conn.commit()
                except Exception as e:
                    print(f"ERROR: Failed commit after {wfo}/{phenom}.{sig}: {e}")
            time.sleep(0.2)  # Be polite to the IEM API between queries

    # --- Final Summary ---
    if DRY_RUN:
        print(f"\nCOW API backfill simulation complete. Processed Features: {cow_processed}, Checked DB: {cow_checked_exists}")
        print(f" Skipped (No/Bad Link): {cow_skipped_no_link}, Skipped (Bad VTEC Parse): {cow_skipped_bad_vtec_parse}, Skipped (Missing Times): {cow_skipped_missing_times}")
        print(f" Would Insert SVR: {cow_would_insert_svr}, Would Insert Tracker: {cow_would_insert_tracker}")
    else:
        print(f"\nCOW API backfill complete (live run). Processed Features: {cow_processed}.")
        print(f" Skipped (No/Bad Link): {cow_skipped_no_link}, Skipped (Bad VTEC Parse): {cow_skipped_bad_vtec_parse}, Skipped (Missing Times): {cow_skipped_missing_times}")


# --- Main Execution ---
if __name__ == "__main__":
    print(f"Script started at {datetime.now(timezone.utc)}")
    if DRY_RUN:
        print("--- RUNNING IN DRY RUN MODE - NO DATABASE CHANGES WILL BE MADE ---")

    # Optional: Run NWS check first if needed
    # check_nws_api_products(S, cursor, conn)

    # Run COW backfill
    backfill_from_cow(S, cursor, conn, days_back=BACKFILL_DAYS, wfo_list=WFOS_FOR_BACKFILL)

    # Close connection
    if cursor:
        cursor.close()
    if conn:
        conn.close()
        print("Database connection closed.")
    print(f"Script finished at {datetime.now(timezone.utc)}")