# test/newpower2.py

import os
import sys

# Add the script's directory to the Python path to ensure modules can be found
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPT_DIR)

import requests
import polyline
import json
import psycopg2
import mercantile
import logging
import re
from datetime import datetime, timezone, timedelta
from urllib.parse import urlparse
from pyproj import Transformer
from requests.packages.urllib3.exceptions import InsecureRequestWarning

# Import the helper module for auto-repair
import get_rpc_config_auto

# Import provider classes
from providers.base import BaseProvider
from providers.kubra import KubraProvider
from providers.simple import SimpleJsonProvider
from providers.gwt_rpc import GwtRpcProvider
from providers.nisc import NiscHostedProvider

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# --- LOGGING ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(SCRIPT_DIR, 'power2_new.log')),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# --- CONFIG ---
DB_CONFIG = {'host': 'localhost', 'database': 'nws', 'user': 'nws', 'password': 'nws'}
CONFIG_FILE = os.path.join(SCRIPT_DIR, 'providers.json')
AUTO_UPDATE_COOLDOWN_HOURS = 4  # Only try to repair once every 4 hours

# --- CONFIG MANAGEMENT ---
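# A single providers.json entry is expected to look roughly like the sketch
# below. Only 'name' and 'type' are read unconditionally in this file; the
# other keys are inferred from update_provider_config() and the provider
# classes, and every value shown is hypothetical:
#
#   {
#       "name": "Example Electric Co-op",
#       "type": "simple_json",
#       "url": "https://outages.example.com/data.json",
#       "headers": {"Accept": "application/json"},
#       "body": "",
#       "cookies": {},
#       "user_agent": "Mozilla/5.0 ...",
#       "last_auto_update": "2025-12-10T04:35:29+00:00"
#   }
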
def load_providers():
    if not os.path.exists(CONFIG_FILE):
        logger.error(f"{CONFIG_FILE} not found!")
        return []
    with open(CONFIG_FILE, 'r') as f:
        all_providers = json.load(f)
    # Keep only providers that declare a 'type' for point-based scraping.
    return [p for p in all_providers if p.get('type')]

def save_providers(providers):
    with open(CONFIG_FILE, 'w') as f:
        json.dump(providers, f, indent=4)
    logger.info("Configuration saved to providers.json")

def update_provider_config(provider_name, new_settings):
    """Updates a specific provider in the JSON file safely."""
    # Read the raw file rather than load_providers(): the loader filters out
    # entries without a 'type', and writing that filtered list back would
    # silently drop those entries from providers.json.
    if not os.path.exists(CONFIG_FILE):
        logger.error(f"{CONFIG_FILE} not found!")
        return False
    with open(CONFIG_FILE, 'r') as f:
        providers = json.load(f)
    updated = False
    for p in providers:
        if p.get('name') == provider_name:
            # Update all relevant fields
            for key in ['headers', 'body', 'url', 'cookies', 'user_agent']:
                if key in new_settings:
                    p[key] = new_settings[key]
            p['last_auto_update'] = datetime.now(timezone.utc).isoformat()
            updated = True
            break
    if updated:
        save_providers(providers)
        return True
    return False

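# Illustrative call, e.g. from the auto-repair path (values hypothetical):
#   update_provider_config("Example Electric Co-op",
#                          {"cookies": {"JSESSIONID": "abc123"}})
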
# --- HELPERS ---

def remove_external_curly_braces(s):
    """Return the first element if 's' is a non-empty list; strip one pair of
    outer curly braces if 's' is a string like '{...}'; otherwise return 's'
    unchanged."""
    if isinstance(s, list) and s:
        return s[0]
    # Handles strings like "{...}" that were seen in older data.
    if isinstance(s, str) and s.startswith('{') and s.endswith('}'):
        return s[1:-1]
    return s

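# Illustrative behavior, derived from the function above (values hypothetical):
#   remove_external_curly_braces(["enc1", "enc2"]) -> "enc1"
#   remove_external_curly_braces("{gx~tF...}")     -> "gx~tF..."
#   remove_external_curly_braces(None)             -> None
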
# --- DATABASE ---
class PowerDB:
    def __init__(self, config):
        self.conn = psycopg2.connect(**config)
        self.conn.autocommit = True

    def close(self):
        self.conn.close()

    def upsert_outage(self, data):
        incident_id_from_data = data.get('incidentid')
        utility_name = data.get('utility')
        lat = data.get('lat')
        lon = data.get('lon')
        start_time = data.get('start')  # Expected to be a datetime object

        unique_outage_key = None
        # Prioritize incidentid if it is a string and not a common
        # unreliable/placeholder value.
        if incident_id_from_data and isinstance(incident_id_from_data, str) and \
                incident_id_from_data.strip() not in ["0", "unknown", "null", "N/A", ""]:
            unique_outage_key = f"{utility_name}_{incident_id_from_data.strip()}"
        elif lat is not None and lon is not None and utility_name is not None:
            # Fall back to a synthesized key based on location and recency.
            try:
                # Round lat/lon to 4 decimal places (approx. 11 meters precision).
                rounded_lat = round(float(lat), 4)
                rounded_lon = round(float(lon), 4)
                # If start_time is missing, use the current hour as a fallback
                # bucket; this groups recent, location-similar outages without
                # a start time together.
                if start_time:
                    time_bucket = start_time.strftime('%Y%m%d%H')  # YYYYMMDDHH
                else:
                    time_bucket = datetime.now(timezone.utc).strftime('%Y%m%d%H')
                unique_outage_key = f"{utility_name}_{rounded_lat}_{rounded_lon}_{time_bucket}"
            except (ValueError, TypeError) as e:
                logger.error(f"Error synthesizing unique_outage_key from lat/lon/start_time: {e}. Data: {data}")
                raise ValueError("Failed to synthesize unique_outage_key due to missing or malformed data.")
        else:
            logger.error(f"Insufficient data to create a unique_outage_key (missing incidentid, or lat/lon/utility). Data: {data}")
            raise ValueError("Insufficient data to create a unique_outage_key.")

        if unique_outage_key is None:
            logger.error(f"Failed to generate a unique_outage_key for data: {data}")
            raise ValueError("Unique outage key could not be generated.")
        # Ensure the utility name is consistently passed in the data dictionary.
        if utility_name is None:
            logger.warning(f"Utility name missing in outage data for incident {incident_id_from_data}. Using 'UNKNOWN'.")
            utility_name = "UNKNOWN"

        sql = """
            INSERT INTO newpower
                (incidentid, utility, lat, lon, pointgeom, areageom, start_time, etr,
                 outagen, peakoutage, cause, crew_status, active, last_change, fetch_time, geom, unique_outage_key)
            VALUES
                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s)
            ON CONFLICT (unique_outage_key) DO UPDATE SET
                outagen = EXCLUDED.outagen,
                peakoutage = GREATEST(newpower.peakoutage, EXCLUDED.outagen),
                cause = EXCLUDED.cause,
                etr = EXCLUDED.etr,
                crew_status = EXCLUDED.crew_status,
                last_change = EXCLUDED.last_change,
                fetch_time = EXCLUDED.fetch_time,
                active = TRUE,
                lat = EXCLUDED.lat,
                lon = EXCLUDED.lon,
                pointgeom = EXCLUDED.pointgeom,
                areageom = EXCLUDED.areageom,
                geom = EXCLUDED.geom
        """
        peak = data.get('outagen', 0)
        # Clean areageom before insertion (mirrors the old power2.py logic).
        areageom = remove_external_curly_braces(data.get('areageom'))
        params = (
            data.get('incidentid'), utility_name, lat, lon,
            data.get('pointgeom'), areageom, start_time, data.get('etr'),
            data.get('outagen'), peak, data.get('cause'), data.get('crew_status'),
            True, data.get('last_change', datetime.now(timezone.utc)), datetime.now(timezone.utc),  # active, last_change, fetch_time
            lon, lat, unique_outage_key,  # lon/lat feed ST_MakePoint for geom
        )
        with self.conn.cursor() as cursor:
            cursor.execute(sql, params)

    def run_post_processing(self):
        logger.info("Running post-processing...")
        with self.conn.cursor() as cursor:
            # Fill in county, state, and NWS CWA by spatial containment.
            cursor.execute('UPDATE newpower SET county = c.countyname FROM public.county c WHERE ST_Contains(c.geom, newpower.geom) AND newpower.county IS NULL')
            cursor.execute('UPDATE newpower SET state = c.state FROM public.county c WHERE ST_Contains(c.geom, newpower.geom) AND newpower.state IS NULL')
            cursor.execute('UPDATE newpower SET cwa = f.cwa FROM public.fzone f WHERE ST_Contains(f.geom, newpower.geom) AND newpower.cwa IS NULL')
            # Decode the encoded-polyline area geometry once per record.
            cursor.execute('UPDATE newpower SET realareageom = ST_LineFromEncodedPolyline(areageom) WHERE areageom IS NOT NULL AND realareageom IS NULL')
            # Outages fetched within the last 30 minutes are considered active.
            cursor.execute("UPDATE newpower SET active = TRUE WHERE fetch_time > NOW() - INTERVAL '30 minutes'")
            cursor.execute("UPDATE newpower SET active = FALSE WHERE fetch_time < NOW() - INTERVAL '30 minutes'")
            # Keep one year of history.
            cursor.execute("DELETE FROM newpower WHERE fetch_time < NOW() - INTERVAL '365 days'")
        logger.info("Post-processing complete.")

# --- REGISTRY ---
PROVIDER_REGISTRY = {
    'kubra': KubraProvider,
    'simple_json': SimpleJsonProvider,
    'gwt_rpc': GwtRpcProvider,
    'nisc_hosted': NiscHostedProvider,
}
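# Assumed provider interface (inferred from main() below, not verified against
# providers/base.py): each class accepts (config, session) and exposes
# fetch() -> iterable of outage dicts with keys such as 'incidentid',
# 'utility', 'lat', 'lon', 'start', 'etr', 'outagen', 'cause', 'crew_status',
# 'pointgeom', 'areageom', and 'last_change'.
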
# --- MAIN (Point Scraper) ---
def main():
    S = requests.Session()
    S.verify = False  # TLS verification disabled; the matching warning is silenced above.
    db = PowerDB(DB_CONFIG)
    logger.info("Starting Power Scraper...")
    providers = load_providers()
    for config in providers:
        p_type = config.get('type')
        p_name = config.get('name')
        ProviderClass = PROVIDER_REGISTRY.get(p_type)
        if ProviderClass:
            try:
                provider = ProviderClass(config, S)
                logger.info(f"Fetching {p_name}...")
                outages = provider.fetch()
                count = 0
                for outage in outages:
                    if outage:
                        db.upsert_outage(outage)
                        count += 1
                logger.info(f"Saved {count} records for {p_name}")
            except Exception as e:
                logger.error(f"Error processing {p_name}: {e}")
        else:
            logger.warning(f"Unknown provider type {p_type} for {p_name}")
    db.run_post_processing()
    db.close()
    logger.info("Scraping complete.")


if __name__ == "__main__":
    main()