import re
import time
import traceback
import xml.etree.ElementTree as ET
from datetime import datetime

import requests
import psycopg2  # PostgreSQL adapter

# --- Try importing zoneinfo (Python 3.9+) ---
try:
    from zoneinfo import ZoneInfo, ZoneInfoNotFoundError

    EASTERN_TZ = ZoneInfo("America/New_York")
    print("Using 'zoneinfo' for timezone handling.")
except ImportError:
    print("Warning: 'zoneinfo' module not found (requires Python 3.9+). Timestamps will be naive.")
    EASTERN_TZ = None
except ZoneInfoNotFoundError:
    print("Warning: Timezone 'America/New_York' not found in the system tz database.")
    EASTERN_TZ = None
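
# A minimal illustrative sketch (never called below): parse_kml_incidents() attaches
# EASTERN_TZ with datetime.replace(tzinfo=...), which is the correct pattern for
# zoneinfo objects (unlike pytz, which needs localize()). Normalizing to UTC before
# storage, as sketched here, is one way to avoid mixed-offset comparisons.
def _eastern_to_utc_example(naive: datetime) -> datetime:
    """Attach the Eastern zone to a naive timestamp and convert it to UTC.

    Falls back to returning the naive input when no tz database is available.
    """
    from datetime import timezone
    if EASTERN_TZ is None:
        return naive
    return naive.replace(tzinfo=EASTERN_TZ).astimezone(timezone.utc)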

# --- Database Connection Parameters ---
# !! IMPORTANT: Replace with your actual database credentials !!
# Consider using environment variables or a config file instead of hard-coding
# secrets; a sketch of the environment-variable approach follows below.
DB_PARAMS = {
    "database": "nws",
    "user": "nws",
    "password": "nws",
    "host": "localhost",  # e.g., 'localhost' or an IP address
    "port": "5432",       # default PostgreSQL port
}
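
# One possible way to honor the comment above (not wired into the script): overlay
# the development defaults with environment variables when present. The NWS_DB_*
# variable names are illustrative, not an established convention of this project.
def _db_params_from_env(defaults=DB_PARAMS):
    """Sketch: return DB_PARAMS with any NWS_DB_* environment overrides applied."""
    import os
    env_map = {
        "database": "NWS_DB_NAME",
        "user": "NWS_DB_USER",
        "password": "NWS_DB_PASSWORD",
        "host": "NWS_DB_HOST",
        "port": "NWS_DB_PORT",
    }
    return {key: os.environ.get(var, defaults[key]) for key, var in env_map.items()}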

# --- Function to parse KML ---
def parse_kml_incidents(kml_string):
    """Parse a WV511 KML string into a list of incident dicts.

    Each dict carries 'id', 'name', 'description', 'last_updated' (a
    timezone-aware datetime when EASTERN_TZ is available), 'latitude',
    and 'longitude'.
    """
    incidents = []
    if not kml_string:
        return incidents
    try:
        # The prefix URI must match the document's declared namespace exactly;
        # OGC KML 2.2 uses http, not https.
        namespaces = {'kml': 'http://www.opengis.net/kml/2.2'}
        root = ET.fromstring(kml_string)
        doc_element = root.find('kml:Document', namespaces=namespaces)
        if doc_element is None:
            return incidents
        placemarks_found = doc_element.findall('.//kml:Placemark', namespaces=namespaces)
        if not placemarks_found:
            return incidents
        for placemark in placemarks_found:
            incident_data = {}
            aware_datetime = None
            last_updated_string = None

            incident_data['id'] = placemark.get('id')
            name_element = placemark.find('kml:name', namespaces=namespaces)
            incident_data['name'] = name_element.text.strip() if name_element is not None and name_element.text else None

            description_element = placemark.find('kml:description', namespaces=namespaces)
            final_description = ""
            if description_element is not None and description_element.text:
                raw_description = description_element.text.strip()
                match = re.search(r"Last Updated:\s*(.*)", raw_description, re.IGNORECASE)
                if match:
                    last_updated_string = match.group(1).strip()
                text_without_html = re.sub(r"<[^>]+>", "", raw_description)
                description_cleaned = re.sub(r"\s*Last Updated:.*$", "", text_without_html, flags=re.IGNORECASE).strip()
                final_description = re.sub(r'\s{2,}', ' ', description_cleaned)
            # Store None rather than "" so NULL handling in the unique-constraint
            # columns stays predictable.
            incident_data['description'] = final_description if final_description else None

            if last_updated_string:
                try:
                    date_format = "%b %d, %Y %I:%M %p"
                    naive_datetime = datetime.strptime(last_updated_string, date_format)
                    aware_datetime = naive_datetime.replace(tzinfo=EASTERN_TZ) if EASTERN_TZ else naive_datetime
                except (ValueError, TypeError):
                    aware_datetime = None
            incident_data['last_updated'] = aware_datetime

            point_element = placemark.find('.//kml:Point', namespaces=namespaces)
            coords_text = None
            if point_element is not None:
                coords_element = point_element.find('kml:coordinates', namespaces=namespaces)
                if coords_element is not None and coords_element.text:
                    coords_text = coords_element.text.strip()
            if coords_text:
                try:
                    # KML coordinate order is lon,lat[,alt].
                    parts = coords_text.split(',')
                    if len(parts) >= 2:
                        incident_data['longitude'] = float(parts[0])
                        incident_data['latitude'] = float(parts[1])
                    else:
                        incident_data['longitude'], incident_data['latitude'] = None, None
                except (ValueError, IndexError):
                    incident_data['longitude'], incident_data['latitude'] = None, None
            else:
                incident_data['longitude'], incident_data['latitude'] = None, None

            incidents.append(incident_data)
    except ET.ParseError as e:
        print(f"XML PARSE ERROR: {e}")
    except Exception as e:
        print(f"UNEXPECTED PARSING ERROR: {e}")
        traceback.print_exc()
    return incidents
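
# A self-contained demonstration of the parser (never invoked by the script).
# The placemark below is a hand-written stand-in for real WV511 output, so the
# field contents are illustrative only.
def _parse_example():
    """Run parse_kml_incidents() against a one-placemark toy feed and print the result."""
    sample_kml = (
        '<?xml version="1.0" encoding="UTF-8"?>'
        '<kml xmlns="http://www.opengis.net/kml/2.2"><Document>'
        '<Placemark id="12345"><name>I-79 Incident</name>'
        '<description><![CDATA[<b>Lane closed</b> near MM 57. '
        'Last Updated: Jan 15, 2024 02:30 PM]]></description>'
        '<Point><coordinates>-80.2442,39.1234,0</coordinates></Point>'
        '</Placemark></Document></kml>'
    )
    for incident in parse_kml_incidents(sample_kml):
        print(incident)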

# --- Function to fetch KML data ---
def fetch_kml_data(url, params, headers=None, timeout=20):
    """Fetch the KML feed, returning the response body as text or None on failure."""
    try:
        response = requests.get(url, params=params, headers=headers, timeout=timeout)
        response.raise_for_status()
        # Let requests guess the encoding from the body, not just the headers.
        response.encoding = response.apparent_encoding
        return response.text
    except requests.exceptions.Timeout:
        print(f"Error: Request timed out after {timeout} seconds.")
        return None
    except requests.exceptions.HTTPError as e:
        print(f"Error: HTTP Error: {e}")
        return None
    except requests.exceptions.RequestException as e:
        print(f"Error: Failed to fetch data: {e}")
        return None
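
# A minimal retry sketch (not used by the main block): transient network errors
# are plausible for a polled public feed, so a caller might prefer a few attempts
# with a fixed delay. The attempt count and delay are arbitrary illustrative
# choices, not tuned values from this project.
def fetch_kml_data_with_retries(url, params, headers=None, timeout=20, attempts=3, delay_seconds=5):
    """Call fetch_kml_data() up to `attempts` times, sleeping between failures."""
    for attempt in range(1, attempts + 1):
        kml = fetch_kml_data(url, params, headers=headers, timeout=timeout)
        if kml is not None:
            return kml
        if attempt < attempts:
            print(f"Fetch attempt {attempt} failed; retrying in {delay_seconds}s...")
            time.sleep(delay_seconds)
    return None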

# --- Function to upsert data into PostgreSQL ---
def upsert_incident(conn, incident):
    """
    Insert a new incident record or update an existing one, matching on the
    unique constraint over (geom, initial_description).

    Args:
        conn: An active psycopg2 database connection object.
        incident (dict): Parsed incident data with keys 'id', 'name',
            'description', 'last_updated', 'latitude', 'longitude'.
    """
    # Skip rows that cannot populate the conflict-target columns.
    if incident.get('latitude') is None or \
       incident.get('longitude') is None or \
       incident.get('description') is None:
        print(f"Skipping upsert for incident ID {incident.get('id', 'N/A')} due to missing lat/lon or description.")
        return

    # INSERT ... ON CONFLICT targeting (geom, initial_description).
    upsert_sql = """
        INSERT INTO wv511 (
            id, name, initial_description, latest_description,
            last_updated, geom, last_seen_in_feed
        )
        VALUES (
            %(id)s, %(name)s, %(description)s, %(description)s,  -- same text fills both description columns on first insert
            %(last_updated)s,
            -- Build the point geometry, guarding against NULL coordinates.
            CASE WHEN %(latitude)s IS NOT NULL AND %(longitude)s IS NOT NULL
                 THEN ST_SetSRID(ST_MakePoint(%(longitude)s, %(latitude)s), 4326)
                 ELSE NULL
            END,
            NOW() AT TIME ZONE 'UTC'  -- record the fetch time as a naive UTC timestamp
        )
        ON CONFLICT (geom, initial_description)
        DO UPDATE SET
            -- Refresh the mutable fields of the row that matched on (geom, initial_description).
            name = EXCLUDED.name,
            latest_description = EXCLUDED.latest_description,  -- only the *latest* description moves
            last_updated = EXCLUDED.last_updated,
            last_seen_in_feed = NOW() AT TIME ZONE 'UTC',      -- always refreshed when seen again
            id = EXCLUDED.id,      -- take the feed's current ID; initial_description stays (it is part of the key)
            geom = EXCLUDED.geom   -- same geometry that caused the conflict, kept for symmetry
        WHERE
            -- Optional belt-and-braces: NULL-safe match against the exact conflicting row.
            wv511.geom IS NOT DISTINCT FROM EXCLUDED.geom
            AND wv511.initial_description IS NOT DISTINCT FROM EXCLUDED.initial_description;
    """

    try:
        with conn.cursor() as cur:
            # Dictionary-based parameter substitution; the incident dict supplies every key.
            cur.execute(upsert_sql, incident)
        # No commit here: the caller owns the transaction (autocommit is off in main).
    except psycopg2.Error as e:
        print(f"Database Error during upsert for ID {incident.get('id')}: {e}")
        raise  # Let the main loop decide on rollback.
    except Exception as e:
        print(f"Unexpected Python error during upsert for ID {incident.get('id')}: {e}")
        raise
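
# The upsert above presumes a table roughly like the following. This DDL is an
# assumption reconstructed from the column names, the conflict target, and the
# PostGIS calls; the real schema (exact types, extra columns) may differ.
ASSUMED_WV511_DDL = """
    CREATE TABLE IF NOT EXISTS wv511 (
        id                  text,
        name                text,
        initial_description text NOT NULL,
        latest_description  text,
        last_updated        timestamptz,
        geom                geometry(Point, 4326) NOT NULL,  -- requires the PostGIS extension
        last_seen_in_feed   timestamp,
        county              text,
        cwa                 text,
        st                  text,
        UNIQUE (geom, initial_description)  -- the ON CONFLICT target above
    );
"""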

# --- Main execution ---
if __name__ == "__main__":
    base_url = "http://wv511.ilchost.com/wsvc/gmap.asmx/buildEventsKMLi_Filtered"
    current_millis = int(time.time() * 1000)
    request_params = {'CategoryIDs': '', 'SeverityIDs': '1,2,3,4', 'is511Only': '', 'nocache': str(current_millis)}
    request_headers = {'User-Agent': 'Mozilla/5.0', 'Accept': '*/*', 'Connection': 'keep-alive'}

    kml_content = fetch_kml_data(base_url, request_params, headers=request_headers)

    if kml_content:
        extracted_incidents = parse_kml_incidents(kml_content)
        if extracted_incidents:
            print(f"Successfully parsed {len(extracted_incidents)} incidents.")

            conn = None  # Initialize so the finally block can test it safely
            try:
                print(f"Connecting to database '{DB_PARAMS['database']}' on host '{DB_PARAMS['host']}'...")
                conn = psycopg2.connect(**DB_PARAMS)
                conn.autocommit = False  # Manage the transaction manually
                print("Database connection successful. Autocommit is OFF.")

                print(f"Upserting {len(extracted_incidents)} incidents into database...")
                upsert_count = 0
                error_count = 0  # Drives the commit-vs-rollback decision below

                for incident_data in extracted_incidents:
                    # Guarantee every substitution key exists, even if None.
                    # ('description' is checked inside upsert_incident.)
                    incident_data.setdefault('latitude', None)
                    incident_data.setdefault('longitude', None)
                    incident_data.setdefault('last_updated', None)
                    incident_data.setdefault('name', None)
                    incident_data.setdefault('id', None)

                    try:
                        upsert_incident(conn, incident_data)
                        # Count only rows that upsert_incident did not skip.
                        if incident_data.get('latitude') is not None and \
                           incident_data.get('longitude') is not None and \
                           incident_data.get('description') is not None:
                            upsert_count += 1
                    except Exception:
                        # The specific error was already printed inside upsert_incident.
                        # Note: after a failed statement PostgreSQL aborts the transaction,
                        # so every later upsert in this loop will also fail until rollback;
                        # the whole batch is rolled back below in that case anyway.
                        error_count += 1
                        print(f"Error during processing incident ID {incident_data.get('id')}, continuing...")

                # All-or-nothing transaction control.
                if error_count > 0:
                    print(f"Rolling back transaction due to {error_count} errors.")
                    conn.rollback()
                else:
                    print(f"Committing transaction for {upsert_count} successful upserts.")
                    conn.commit()

            except psycopg2.Error as e:
                print(f"Database connection or transaction error: {e}")
                if conn:
                    conn.rollback()
            except Exception as e:
                print(f"An unexpected error occurred during DB operations: {e}")
                traceback.print_exc()  # Full trace for unexpected errors
                if conn:
                    conn.rollback()
            finally:
                # Post-upsert spatial enrichment. This runs whether the main transaction
                # committed or rolled back; move it before the except blocks above if it
                # should only follow a successful commit.
                if conn:
                    try:
                        cursor = conn.cursor()
                        # The IS NULL guards keep these updates from rewriting rows already tagged.
                        updates = [
                            'UPDATE public.wv511 SET county = county.countyname FROM public.county WHERE ST_Contains(county.geom, wv511.geom) AND public.wv511.county IS NULL',
                            'UPDATE public.wv511 SET cwa = fzone.cwa FROM public.fzone WHERE ST_Contains(fzone.geom, wv511.geom) AND public.wv511.cwa IS NULL',
                            'UPDATE public.wv511 SET st = county.state FROM public.county WHERE ST_Contains(county.geom, wv511.geom) AND public.wv511.st IS NULL',
                        ]
                        print("Running post-upsert spatial updates...")
                        total_affected = 0
                        for i, update_sql in enumerate(updates):
                            print(f"Executing post-update {i + 1}...")
                            cursor.execute(update_sql)
                            print(f"  -> Affected {cursor.rowcount} rows.")
                            total_affected += cursor.rowcount
                        if total_affected > 0:
                            conn.commit()  # These updates commit independently of the upsert batch
                            print("Post-updates committed.")
                        else:
                            print("No rows affected by post-updates.")
                        cursor.close()
                    except psycopg2.Error as post_update_e:
                        print(f"Error during post-update operations: {post_update_e}")
                        conn.rollback()  # Roll back the post-updates if they fail
                    finally:
                        conn.close()  # Ensure the connection is always closed
                        print("Database connection closed.")
                else:
                    print("Database connection was not established, skipping post-updates and close.")
        else:
            print("\nParsing completed, but no incidents were extracted.")
    else:
        print("\nFailed to fetch KML data.")