# Files
# test/newpower.py
# 2025-12-10 01:45:38 +00:00
#
# 192 lines
# 7.2 KiB
# Python

import os
import sys
# Add the script's directory to the Python path to ensure modules can be found
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPT_DIR)
import requests
import json
import psycopg2
import logging
import re
from datetime import datetime, timezone, timedelta
from urllib.parse import urlparse
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# Import the helper module for auto-repair
import get_rpc_config_auto
# Import provider classes
from providers.base import BaseCountyProvider
from providers.kubra import KubraCountyProvider
from providers.simple import SimpleCountyJsonProvider
from providers.nisc import NiscCountyProvider
from providers.gwt_rpc import GwtRpcCountyProvider
# Silence urllib3's InsecureRequestWarning for the whole process.
# NOTE(review): presumably some providers issue requests with certificate
# verification disabled -- confirm in the providers package.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# --- LOGGING ---
# Root logger configuration: INFO and above goes both to a log file stored
# next to this script and to a console stream handler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(SCRIPT_DIR, 'newpower_county.log')),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
# --- CONFIG ---
# Keyword arguments handed to psycopg2.connect() by CountyPowerDB.
DB_CONFIG = {
    'host': 'localhost',
    'database': 'nws',
    'user': 'nws',
    'password': 'nws',
}
# Provider definitions are kept in a JSON file next to this script.
CONFIG_FILE = os.path.join(SCRIPT_DIR, 'providers.json')
# Only try to repair once every 4 hours.
# NOTE(review): not referenced in the visible part of this file -- presumably
# consumed by the auto-repair logic; confirm.
AUTO_UPDATE_COOLDOWN_HOURS = 4
# --- CONFIG MANAGEMENT ---
def load_providers():
    """Return the provider entries from CONFIG_FILE that define a county type.

    Returns:
        A list of provider dicts whose ``county_type`` value is truthy, or an
        empty list (after logging an error) when CONFIG_FILE does not exist.
    """
    if os.path.exists(CONFIG_FILE):
        with open(CONFIG_FILE, 'r') as f:
            all_providers = json.load(f)
        # Keep only the providers that declare a 'county_type'; the other
        # entries in the file are not handled by this script.
        county_providers = []
        for entry in all_providers:
            if entry.get('county_type'):
                county_providers.append(entry)
        return county_providers

    logger.error(f"{CONFIG_FILE} not found!")
    return []
def save_providers(providers):
    """Write the full provider list back to CONFIG_FILE without risking it.

    The data is serialized before any file is opened and is written to a
    sibling temporary file that is then moved over CONFIG_FILE. A
    serialization error or a crash in the middle of the write therefore
    leaves the previous configuration intact instead of a truncated file.

    Args:
        providers: JSON-serializable list of provider dicts. Values json
            cannot encode natively are stringified (``default=str``).

    Raises:
        TypeError, ValueError: if ``providers`` cannot be serialized.
        OSError: if the temporary file cannot be written or moved into place.
    """
    # Serialize up front: a failure here must not touch anything on disk.
    payload = json.dumps(providers, indent=4, default=str)

    # Same directory as the target so os.replace() stays on one filesystem.
    tmp_path = f"{CONFIG_FILE}.tmp"
    try:
        with open(tmp_path, 'w') as f:
            f.write(payload)
        os.replace(tmp_path, CONFIG_FILE)
    except Exception:
        # Best-effort cleanup of the partial temp file; the original error is
        # the one worth reporting.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
    logger.info("Configuration saved to providers.json")
def update_provider_config(provider_name, new_settings):
    """Apply refreshed request settings to one provider and persist the file.

    The unfiltered provider list is read straight from CONFIG_FILE (not via
    load_providers) so that saving does not drop the entries that
    load_providers filters out.

    Args:
        provider_name: Value of the ``name`` field of the provider to update.
        new_settings: Mapping that may contain any of ``headers``, ``body``,
            ``url``, ``cookies`` and ``user_agent``; other keys are ignored.

    Returns:
        True if a provider with that name was found and the file was saved,
        False otherwise (nothing is written in that case).
    """
    with open(CONFIG_FILE, 'r') as f:
        providers = json.load(f)

    # Only the first entry with a matching name is updated.
    target = next(
        (entry for entry in providers if entry.get('name') == provider_name),
        None,
    )
    if target is None:
        return False

    for field in ('headers', 'body', 'url', 'cookies', 'user_agent'):
        if field in new_settings:
            target[field] = new_settings[field]
    # Record when the entry was refreshed (UTC, ISO 8601).
    target['last_auto_update'] = datetime.now(timezone.utc).isoformat()

    save_providers(providers)
    return True
# --- DATABASE ---
class CountyPowerDB:
    """Wrapper around one psycopg2 connection to the ``newcountyoutages`` table."""

    def __init__(self, config):
        """Open the database connection.

        Args:
            config: Keyword arguments passed unchanged to ``psycopg2.connect``.
        """
        self.conn = psycopg2.connect(**config)
        # Autocommit is the resting mode of the connection;
        # upsert_and_zero_outages switches it off temporarily to get a real
        # multi-statement transaction.
        self.conn.autocommit = True

    def close(self):
        """Close the underlying connection."""
        self.conn.close()

    def upsert_and_zero_outages(self, company_name, outage_data, fetch_time):
        """Atomically update the outage information of one company.

        Both steps run inside a single transaction, so a failure in either
        one leaves the table exactly as it was:

        1. UPSERT every reported county, updating its counts.
        2. Set ``outages`` to 0 (and refresh ``fetch_time``) for every other
           row of ``company_name``, i.e. rows whose (county, state) pair was
           not reported in this fetch.

        Args:
            company_name: Company whose unreported rows are zeroed.
            outage_data: Iterable of dicts. Items lacking any of ``county``,
                ``state`` or ``company`` are skipped; ``outages`` and
                ``served`` are optional.
            fetch_time: Timestamp stored on every row touched by this call.

        Raises:
            psycopg2.Error: on database failure; the transaction is rolled
                back before the error propagates.
        """
        active_outage_values = [
            (
                item['county'], item['state'], item['company'],
                item.get('outages'), item.get('served'), fetch_time,
            )
            for item in outage_data
            if all(k in item for k in ('county', 'state', 'company'))
        ]
        # Rows are keyed by (county, state, company), so "reported" has to be
        # tracked per (county, state): the same county name can exist in two
        # states served by the same company.
        reported_keys = tuple((row[0], row[1]) for row in active_outage_values)

        upsert_sql = """
            INSERT INTO newcountyoutages (county, state, company, outages, served, fetch_time)
            VALUES (%s, %s, %s, %s, %s, %s)
            ON CONFLICT (county, state, company) DO UPDATE SET
                outages = EXCLUDED.outages,
                served = COALESCE(EXCLUDED.served, newcountyoutages.served),
                fetch_time = EXCLUDED.fetch_time;
        """
        # Updating fetch_time as well makes the zero a point-in-time record.
        zero_out_sql = """
            UPDATE newcountyoutages
            SET outages = 0, fetch_time = %s
            WHERE company = %s
        """
        zero_out_params = [fetch_time, company_name]
        if reported_keys:
            # psycopg2 renders the nested tuple as (('a', 'b'), ('c', 'd')).
            # With nothing reported the clause is simply omitted, which zeroes
            # every row of the company and avoids an invalid empty "IN ()".
            zero_out_sql += " AND (county, state) NOT IN %s"
            zero_out_params.append(reported_keys)

        # Leave autocommit only for the duration of this call so that the
        # upserts and the zero-out commit (or roll back) together.
        self.conn.autocommit = False
        try:
            with self.conn:  # commits on success, rolls back on exception
                with self.conn.cursor() as cursor:
                    if active_outage_values:
                        cursor.executemany(upsert_sql, active_outage_values)
                    cursor.execute(zero_out_sql, zero_out_params)
                    zeroed_count = cursor.rowcount
        finally:
            # autocommit cannot be set on a closed/broken connection, and
            # trying would mask the original error.
            if not self.conn.closed:
                self.conn.autocommit = True

        # Logged only after the commit so the log never reports rolled-back work.
        if active_outage_values:
            logger.info(f"Upserted {len(active_outage_values)} active outage records for {company_name}.")
        logger.info(f"Zeroed out {zeroed_count} resolved outage records for {company_name}.")

    def run_post_processing(self):
        """Fill in missing ``cwa`` values and purge rows older than 365 days.

        Runs in autocommit mode, so each of the two statements is committed
        on its own.
        """
        logger.info("Running post-processing for county data...")
        with self.conn.cursor() as cursor:
            # Copy cwa from public.county for rows that do not have one yet,
            # matching on county name and state.
            cursor.execute("""
                UPDATE newcountyoutages
                SET cwa = c.cwa
                FROM public.county c
                WHERE c.countyname = newcountyoutages.county
                AND c.state = newcountyoutages.state
                AND newcountyoutages.cwa IS NULL
            """)
            # Retention: drop rows not refreshed within the last 365 days.
            cursor.execute("DELETE FROM newcountyoutages WHERE fetch_time < NOW() - INTERVAL '365 days'")
        logger.info("County post-processing complete.")
# --- PROVIDERS ---
# Provider implementations are imported from the `providers` package.
# --- REGISTRY ---
# Maps the 'county_type' value of a providers.json entry to the class that
# main() instantiates to fetch that provider's county data.
PROVIDER_REGISTRY = {
    'kubra_county': KubraCountyProvider,
    'simple_county_json': SimpleCountyJsonProvider,
    'nisc_hosted_county': NiscCountyProvider,
    'gwt_rpc_county': GwtRpcCountyProvider,
}
# --- MAIN ---
def main():
    """Fetch county outage data from every configured provider and store it.

    For each provider returned by load_providers(), the class registered for
    its ``county_type`` is instantiated with the provider config and a shared
    HTTP session, its ``fetch()`` result is written through
    CountyPowerDB.upsert_and_zero_outages, and failures are logged (with
    traceback) without aborting the remaining providers. Post-processing runs
    once at the end.

    The HTTP session and the database connection are always released, even
    when post-processing or provider loading raises.
    """
    with requests.Session() as session:
        session.headers.update({'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'})
        db = CountyPowerDB(DB_CONFIG)
        try:
            # One timestamp for the whole run so every row touched by this
            # run carries the same fetch_time.
            run_timestamp = datetime.now(timezone.utc)
            logger.info("Starting County Power Scraper...")

            for config in load_providers():
                p_type = config.get('county_type')
                p_name = config.get('name')
                provider_cls = PROVIDER_REGISTRY.get(p_type)
                if not provider_cls:
                    if p_type:
                        logger.warning(f"Unknown provider type '{p_type}' for {p_name}")
                    continue

                # Top-level boundary: one failing provider must not stop the
                # others, so catch broadly but keep the traceback in the log.
                try:
                    provider = provider_cls(config, session)
                    logger.info(f"Fetching county data for {p_name}...")
                    outages = provider.fetch()
                    logger.info(f"Found {len(outages)} active outage records for {p_name}.")
                    # Process this company's data in a single transaction
                    db.upsert_and_zero_outages(p_name, outages, run_timestamp)
                except Exception as e:
                    logger.exception(f"Error processing {p_name}: {e}")

            db.run_post_processing()
        finally:
            db.close()
    logger.info("County scraping complete.")
# Run the scraper only when executed as a script, not when imported.
if __name__ == "__main__":
    main()