test/newpower.py
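"""
County-level power outage scraper.

Loads provider definitions from providers.json, fetches current county outage
counts from each configured source, stores a timestamped snapshot in the
`newcountyoutages` Postgres table, backfills the cwa column from the county
lookup table, and prunes rows older than one year.
"""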
import os
import sys
# Add the script's directory to the Python path to ensure modules can be found
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPT_DIR)
import requests
import json
import psycopg2
import logging
import re
from datetime import datetime, timezone, timedelta
from urllib.parse import urlparse
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# Import the helper module for auto-repair
import get_rpc_config_auto
# Import provider classes
from providers.base import BaseCountyProvider
from providers.kubra import KubraCountyProvider
from providers.simple import SimpleCountyJsonProvider
from providers.nisc import NiscCountyProvider
from providers.gwt_rpc import GwtRpcCountyProvider
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# --- LOGGING ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(SCRIPT_DIR, 'newpower_county.log')),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
# --- CONFIG ---
DB_CONFIG = {'host': 'localhost', 'database': 'nws', 'user': 'nws', 'password': 'nws'}
CONFIG_FILE = os.path.join(SCRIPT_DIR, 'providers.json')
AUTO_UPDATE_COOLDOWN_HOURS = 4 # Only try to repair once every 4 hours
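# Illustrative shape of one providers.json entry, inferred from the keys this
# script reads ('name', 'county_type') and the keys update_provider_config may
# rewrite ('headers', 'body', 'url', 'cookies', 'user_agent'); the real file
# may carry more fields per provider. Example values are hypothetical:
#
#   {
#       "name": "Example Electric Co-op",
#       "county_type": "simple_county_json",
#       "url": "https://example.com/outages.json",
#       "headers": {},
#       "cookies": {},
#       "last_auto_update": "2025-12-10T05:43:47+00:00"
#   }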
# --- CONFIG MANAGEMENT ---
def load_providers():
    if not os.path.exists(CONFIG_FILE):
        logger.error(f"{CONFIG_FILE} not found!")
        return []
    with open(CONFIG_FILE, 'r') as f:
        all_providers = json.load(f)
    # Keep only providers that define a 'county_type' (county-level sources)
    return [p for p in all_providers if p.get('county_type')]
def save_providers(providers):
    with open(CONFIG_FILE, 'w') as f:
        json.dump(providers, f, indent=4, default=str)
    logger.info("Configuration saved to providers.json")
def update_provider_config(provider_name, new_settings):
    """Update a specific provider's settings in the JSON file safely."""
    # Read the raw, unfiltered file so non-county providers survive the save
    with open(CONFIG_FILE, 'r') as f:
        providers = json.load(f)
    updated = False
    for p in providers:
        if p.get('name') == provider_name:
            for key in ['headers', 'body', 'url', 'cookies', 'user_agent']:
                if key in new_settings:
                    p[key] = new_settings[key]
            p['last_auto_update'] = datetime.now(timezone.utc).isoformat()
            updated = True
            break
    if updated:
        save_providers(providers)
        return True
    return False
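# Example call (hypothetical values), e.g. from an auto-repair pass after a
# provider's session cookies expire:
#   update_provider_config('Example Electric Co-op',
#                          {'cookies': {'JSESSIONID': '...'}})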
# --- DATABASE ---
class CountyPowerDB:
    def __init__(self, config):
        self.conn = psycopg2.connect(**config)
        self.conn.autocommit = True

    def close(self):
        self.conn.close()

    def insert_outage_snapshot(self, outage_data, fetch_time):
        """
        Inserts a snapshot of county outage data for a given fetch time.
        This creates a historical record of outages.
        """
        all_values = []
        for item in outage_data:
            if all(k in item for k in ['county', 'state', 'company']):
                # Standardize county name to title case (e.g., "MERCER" -> "Mercer")
                county_name = item['county'].title() if isinstance(item['county'], str) else item['county']
                val = (
                    item.get('outages'),
                    item.get('served'),
                    county_name,
                    item['state'],
                    fetch_time,
                    item['company']
                )
                all_values.append(val)
        if all_values:
            with self.conn.cursor() as cursor:
                # Use a simple INSERT to create a historical record for each run.
                # The column order matches the old power3.py script's `countyoutages` table.
                sql = """
                    INSERT INTO newcountyoutages (outages, served, county, state, fetch_time, company)
                    VALUES (%s, %s, %s, %s, %s, %s)
                """
                cursor.executemany(sql, all_values)
                logger.info(f"Inserted {len(all_values)} county outage records for this run.")
    def run_post_processing(self):
        logger.info("Running post-processing for county data...")
        with self.conn.cursor() as cursor:
            # Backfill the cwa column from the county lookup table for new rows
            cursor.execute("""
                UPDATE newcountyoutages
                SET cwa = c.cwa
                FROM public.county c
                WHERE c.countyname = newcountyoutages.county
                  AND c.state = newcountyoutages.state
                  AND newcountyoutages.cwa IS NULL
            """)
            # Trim history older than one year
            cursor.execute("DELETE FROM newcountyoutages WHERE fetch_time < NOW() - INTERVAL '365 days'")
        logger.info("County post-processing complete.")
# --- PROVIDER REGISTRY ---
PROVIDER_REGISTRY = {
    'kubra_county': KubraCountyProvider,
    'simple_county_json': SimpleCountyJsonProvider,
    'nisc_hosted_county': NiscCountyProvider,
    'gwt_rpc_county': GwtRpcCountyProvider,
}
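# Adding a new source: subclass BaseCountyProvider in the providers package and
# map its county_type string above. A minimal sketch, assuming the base class
# keeps the (config, session) pair it is constructed with and that fetch()
# returns dicts carrying at least the 'county', 'state', and 'company' keys
# that insert_outage_snapshot requires; everything below is hypothetical:
#
#   class ExampleCountyProvider(BaseCountyProvider):
#       def fetch(self):
#           resp = self.session.get(self.config['url'], timeout=30)
#           resp.raise_for_status()
#           return [{'county': row['name'], 'state': row['state'],
#                    'company': self.config['name'],
#                    'outages': row['out'], 'served': row['served']}
#                   for row in resp.json()]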
# --- MAIN ---
def main():
    S = requests.Session()
    S.headers.update({'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'})
    db = CountyPowerDB(DB_CONFIG)
    run_timestamp = datetime.now(timezone.utc)
    logger.info("Starting County Power Scraper...")
    providers = load_providers()
    for config in providers:
        p_type = config.get('county_type')
        p_name = config.get('name')
        ProviderClass = PROVIDER_REGISTRY.get(p_type)
        if ProviderClass:
            try:
                provider = ProviderClass(config, S)
                logger.info(f"Fetching county data for {p_name}...")
                outages = provider.fetch()
                logger.info(f"Found {len(outages)} active outage records for {p_name}.")
                # Insert all collected outages as a new snapshot for this run time.
                db.insert_outage_snapshot(outages, run_timestamp)
            except Exception as e:
                logger.error(f"Error processing {p_name}: {e}")
        elif p_type:
            logger.warning(f"Unknown provider type '{p_type}' for {p_name}")
    db.run_post_processing()
    db.close()
    logger.info("County scraping complete.")
if __name__ == "__main__":
    main()