"""County-level power outage scraper.

Loads provider definitions from ``providers.json``, fetches per-county outage
counts from each configured provider, and writes them to the
``newcountyoutages`` PostgreSQL table.
"""
import json
import logging
import os
import re
from abc import ABC, abstractmethod
from datetime import datetime, timezone, timedelta
from urllib.parse import urlparse

import psycopg2
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning

# Import the helper module for auto-repair
import get_rpc_config_auto

# Several providers are fetched with verify=False; silence the resulting warnings.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# --- LOGGING ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler('newpower_county.log'), logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# --- CONFIG ---
DB_CONFIG = {'host': 'localhost', 'database': 'nws', 'user': 'nws', 'password': 'nws'}
CONFIG_FILE = 'providers.json'
AUTO_UPDATE_COOLDOWN_HOURS = 4  # Only try to repair once every 4 hours
# Seconds to wait on any single HTTP request. Without a timeout, requests
# waits indefinitely and one stalled provider would hang the whole run.
REQUEST_TIMEOUT = 30
DEFAULT_USER_AGENT = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
)


# --- CONFIG MANAGEMENT ---
def load_providers():
    """Return the provider entries from CONFIG_FILE that define 'county_type'.

    Returns an empty list (after logging an error) if CONFIG_FILE is missing.
    """
    if not os.path.exists(CONFIG_FILE):
        logger.error(f"{CONFIG_FILE} not found!")
        return []

    with open(CONFIG_FILE, 'r') as f:
        all_providers = json.load(f)
    # Only entries with a truthy 'county_type' are handled by this script.
    return [p for p in all_providers if p.get('county_type')]


def save_providers(providers):
    """Write the given provider list to CONFIG_FILE as indented JSON."""
    with open(CONFIG_FILE, 'w') as f:
        # default=str stringifies any value json cannot serialize natively.
        json.dump(providers, f, indent=4, default=str)
    logger.info("Configuration saved to providers.json")


def update_provider_config(provider_name, new_settings):
    """Updates a specific provider in the JSON file safely

    Copies the 'headers', 'body', 'url', 'cookies' and 'user_agent' keys that
    are present in ``new_settings`` onto the first provider whose 'name'
    equals ``provider_name`` and stamps it with 'last_auto_update'.

    Returns True if a provider was found and the file saved, else False.
    """
    # This needs to read the raw file, not the filtered one
    with open(CONFIG_FILE, 'r') as f:
        providers = json.load(f)

    updated = False
    for p in providers:
        if p.get('name') == provider_name:
            for key in ['headers', 'body', 'url', 'cookies', 'user_agent']:
                if key in new_settings:
                    p[key] = new_settings[key]
            p['last_auto_update'] = datetime.now(timezone.utc).isoformat()
            updated = True
            break

    if updated:
        save_providers(providers)
        return True
    return False


# --- DATABASE ---
class CountyPowerDB:
    """Thin wrapper around the psycopg2 connection used for county outage data."""

    def __init__(self, config):
        """Connect using ``config`` as keyword arguments to psycopg2.connect."""
        self.conn = psycopg2.connect(**config)
        self.conn.autocommit = True

    def close(self):
        """Close the underlying database connection."""
        self.conn.close()

    def upsert_and_zero_outages(self, company_name, outage_data):
        """
        Atomically updates outage information for a given company.
        1. UPSERTS counties with active outages, updating their counts.
        2. SETS outage count to 0 for any other county from that company that was not in the active list.

        Both steps run inside one transaction, so a failure in either leaves
        the table unchanged. The exception is re-raised after the rollback.

        ``outage_data`` is an iterable of dicts with 'county', 'state' and
        'company' keys plus optional 'outages' and 'served'. Records missing
        one of the three keys, or with an empty/None county, are skipped.
        """
        current_fetch_time = datetime.now(timezone.utc)

        # Prepare data for counties with active outages
        active_outage_values = []
        reported_counties = []
        skipped = 0
        for item in outage_data:
            if not all(k in item for k in ('county', 'state', 'company')):
                continue
            if not item['county']:
                # A NULL in the exclusion list below would make the zero-out
                # comparison evaluate to NULL for every row, silently
                # disabling step 2, so unnamed counties are dropped here.
                skipped += 1
                continue
            active_outage_values.append((
                item['county'],
                item['state'],
                item['company'],
                item.get('outages'),
                item.get('served'),
                current_fetch_time
            ))
            reported_counties.append(item['county'])

        if skipped:
            logger.warning(f"Skipped {skipped} record(s) without a county name for {company_name}.")

        # The connection normally runs in autocommit mode, which would commit
        # each statement on its own. Turn it off for the duration of this call
        # so the upsert and the zero-out commit (or roll back) together.
        self.conn.autocommit = False
        try:
            with self.conn.cursor() as cursor:
                # Step 1: UPSERT active outages
                if active_outage_values:
                    upsert_sql = """
                        INSERT INTO newcountyoutages (county, state, company, outages, served, fetch_time)
                        VALUES (%s, %s, %s, %s, %s, %s)
                        ON CONFLICT (county, state, company) DO UPDATE SET
                            outages = EXCLUDED.outages,
                            served = COALESCE(EXCLUDED.served, newcountyoutages.served),
                            fetch_time = EXCLUDED.fetch_time;
                    """
                    cursor.executemany(upsert_sql, active_outage_values)
                    logger.info(f"Upserted {len(active_outage_values)} active outage records for {company_name}.")

                # Step 2: Set outages to 0 for any other county from this company
                # This correctly creates point-in-time zero records by updating the fetch_time.
                # A Python list is adapted to a PostgreSQL array, and
                # "<> ALL(array)" is true for every row when the array is
                # empty, so no placeholder value is needed.
                zero_out_sql = """
                    UPDATE newcountyoutages
                    SET outages = 0, fetch_time = %s
                    WHERE company = %s AND county <> ALL(%s);
                """
                cursor.execute(zero_out_sql, (current_fetch_time, company_name, reported_counties))
                logger.info(f"Zeroed out {cursor.rowcount} resolved outage records for {company_name}.")
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise
        finally:
            self.conn.autocommit = True

    def run_post_processing(self):
        """Fill in missing 'cwa' values and delete rows older than 30 days."""
        logger.info("Running post-processing for county data...")
        with self.conn.cursor() as cursor:
            cursor.execute("""
                UPDATE newcountyoutages
                SET cwa = c.cwa
                FROM public.county c
                WHERE c.countyname = newcountyoutages.county
                AND c.state = newcountyoutages.state
                AND newcountyoutages.cwa IS NULL
            """)
            cursor.execute("DELETE FROM newcountyoutages WHERE fetch_time < NOW() - INTERVAL '30 days'")
        logger.info("County post-processing complete.")


# --- PROVIDERS ---
class BaseCountyProvider(ABC):
    """Common base for county outage providers.

    Holds the provider's config dict, the HTTP session and the provider name
    (config 'name', defaulting to 'Unknown').
    """

    def __init__(self, config, session):
        self.config = config
        self.session = session
        self.name = config.get('name', 'Unknown')

    @abstractmethod
    def fetch(self):
        """Return a list of dicts with county, state, company, outages, served."""
        pass


class _BoundaryCountyJsonProvider(BaseCountyProvider):
    """Shared fetch logic for endpoints returning a list of 'boundaries' groups."""

    # Inserted between "Error fetching " and the provider name in the error log.
    _error_subject = ''

    def fetch(self):
        """Fetch 'county_url' and flatten every group's 'boundaries' entries.

        Returns an empty list on a non-OK response or on any exception.
        """
        url = self.config.get('county_url')
        state = self.config.get('state_filter')
        try:
            resp = self.session.get(url, verify=False, timeout=REQUEST_TIMEOUT)
            if not resp.ok:
                return []
            data = resp.json()
            results = []
            # The structure is typically a list containing one object with a 'boundaries' key
            for boundary_group in data:
                for item in boundary_group.get('boundaries', []):
                    results.append({
                        'outages': item.get('customersOutNow'),
                        'served': item.get('customersServed'),
                        'county': item.get('name'),
                        'state': state,
                        'company': self.name
                    })
            return results
        except Exception as e:
            logger.error(f"Error fetching {self._error_subject}{self.name}: {e}")
            return []


class SimpleCountyJsonProvider(_BoundaryCountyJsonProvider):
    """County data from a plain JSON endpoint with 'boundaries' groups."""


class NiscCountyProvider(_BoundaryCountyJsonProvider):
    """ Handles county data from NISC-hosted cloud sources. """

    _error_subject = 'NISC county data for '


class KubraCountyProvider(BaseCountyProvider):
    """County data from a Kubra metadata endpoint plus a derived report URL."""

    def fetch(self):
        """Resolve the report URL from 'county_meta_url' and return its counties.

        Returns an empty list if the metadata or the report is unavailable,
        cannot be decoded, or the request fails.
        """
        meta_url = self.config.get('county_meta_url')
        report_url_suffix = self.config.get('county_report_suffix')

        try:
            # 1. Get hexes from meta_url
            meta_resp = self.session.get(meta_url, timeout=REQUEST_TIMEOUT)
            if not meta_resp.ok:
                logger.error(f"Metadata request for {self.name} failed with status {meta_resp.status_code}.")
                return []
            meta_data = meta_resp.json()
            path = meta_data.get('data', {}).get('cluster_interval_generation_data')
            if not path:
                return []

            # 2. Construct final report URL
            # The metadata path is split on '/' and must have at least four
            # segments; the fourth one (index 3) identifies the report.
            path_parts = path.split('/')
            if len(path_parts) < 4:
                logger.error(f"Invalid metadata path format for {self.name}: {path}")
                return []

            report_url = f"https://kubra.io/data/{path_parts[3]}{report_url_suffix}"

            # 3. Fetch and process report
            report_resp = self.session.get(report_url, timeout=REQUEST_TIMEOUT)

            if not report_resp.ok or not report_resp.text:
                logger.info(f"No county report data available for {self.name} at this time.")
                return []

            report_data = report_resp.json()
            return self._normalize(report_data)
        except json.JSONDecodeError:
            logger.warning(f"Could not decode JSON from county report for {self.name}. The report may be empty or invalid.")
            return []
        except requests.exceptions.RequestException as e:
            logger.error(f"Error fetching Kubra county data for {self.name}: {e}")
            return []

    def _normalize(self, data):
        """Flatten the report's areas into county records.

        The key of the first top-level area decides the layout: 'state' means
        counties are nested one level down, 'county' means they are top-level.
        Any other key yields an empty list.
        """
        results = []
        primary_areas = data.get("file_data", {}).get("areas", [])
        if not primary_areas:
            return []

        first_item_key = primary_areas[0].get("key")

        if first_item_key == "state":
            for state_area in primary_areas:
                for county in state_area.get("areas", []):
                    if county.get("key") == "county":
                        results.append(self._extract_info(county))
        elif first_item_key == "county":
            for county in primary_areas:
                if county.get("key") == "county":
                    results.append(self._extract_info(county))
        return results

    def _extract_info(self, county_item):
        """Map one Kubra county area onto the common record shape."""
        return {
            'outages': county_item.get('cust_a', {}).get('val'),
            'served': county_item.get('cust_s'),
            # "or ''" guards against an explicit null name in the payload.
            'county': (county_item.get('name') or '').capitalize(),
            'state': county_item.get('state') or self.config.get('state_filter'),
            'company': self.name
        }


class GwtRpcCountyProvider(BaseCountyProvider):
    """ Handles county data from GWT-RPC sources. """

    def __init__(self, config, session):
        """Apply the provider's user agent and cookies to the given session.

        NOTE(review): the session is the one passed in by the caller, so these
        header and cookie changes persist for anything else using it.
        """
        super().__init__(config, session)
        self.state_filter = config.get('state_filter')
        self.map_url = config.get('map_url')

        # Set up session headers and cookies from config
        self.session.headers.update({
            # Fall back when the key is missing OR stored as null. A None
            # header value would make requests drop the header entirely.
            'User-Agent': config.get('user_agent') or DEFAULT_USER_AGENT,
            'Accept': '*/*',
            'Sec-Fetch-Site': 'same-origin'
        })
        for cookie in config.get('cookies') or []:
            self.session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'], path=cookie['path'])

    def attempt_auto_repair(self):
        """Refresh headers, body and cookies via get_rpc_config_auto.

        Skipped when there is no 'map_url' or when the last repair happened
        less than AUTO_UPDATE_COOLDOWN_HOURS ago. On success the in-memory
        config, the session cookies and the JSON file are updated.

        Returns True if new settings were applied, otherwise False.
        """
        if not self.map_url:
            return False

        last_update = self.config.get('last_auto_update')
        if last_update:
            try:
                last_dt = datetime.fromisoformat(last_update)
                if last_dt.tzinfo is None:
                    last_dt = last_dt.replace(tzinfo=timezone.utc)
                if datetime.now(timezone.utc) - last_dt < timedelta(hours=AUTO_UPDATE_COOLDOWN_HOURS):
                    logger.info(f"Skipping auto-repair for {self.name} (Cooldown active).")
                    return False
            except (ValueError, TypeError):
                # An unparseable timestamp is treated as "never repaired".
                pass

        logger.info(f"Attempting Auto-Repair for {self.name}...")
        try:
            _, valid_headers, valid_cookies, valid_body = get_rpc_config_auto.fetch_live_data(self.map_url)
            if valid_headers and valid_body:
                logger.info(f"Repair successful! Updating {self.name}.")
                valid_cookies = valid_cookies or []
                excluded = {
                    'content-length', 'host', 'connection', 'cookie', 'accept-encoding',
                    'sec-ch-ua', 'sec-ch-ua-mobile', 'sec-ch-ua-platform', 'origin'
                }
                clean_headers = {k: v for k, v in valid_headers.items() if k.lower() not in excluded}
                clean_headers['Referer'] = self.map_url

                # Update in-memory config for the current run
                updates = {
                    'headers': clean_headers,
                    'body': valid_body,
                    'cookies': valid_cookies,
                    'last_auto_update': datetime.now(timezone.utc).isoformat()
                }
                # Header names are case-insensitive. Only overwrite the stored
                # user agent when one was actually captured, so a null is
                # never persisted to the config file.
                new_user_agent = next(
                    (v for k, v in valid_headers.items() if k.lower() == 'user-agent'), None
                )
                if new_user_agent:
                    updates['user_agent'] = new_user_agent
                self.config.update(updates)

                # Update session for the current run
                self.session.cookies.clear()
                for cookie in valid_cookies:
                    self.session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'], path=cookie['path'])

                # Save to disk for next time
                update_provider_config(self.name, self.config)
                return True
        except Exception as e:
            logger.error(f"Auto-repair failed: {e}")
        return False

    def fetch(self, is_retry=False):
        """POST the configured GWT-RPC body and return the decoded county records.

        On a GWT failure (a "//EX" response or HTTP 500) one auto-repair is
        attempted and the request retried once. Returns an empty list when the
        config lacks 'url' or 'body', on failure, or on any exception.
        """
        url = self.config.get('url')
        headers = self.config.get('headers') or {}
        body = self.config.get('body')
        if not url or not body:
            return []

        try:
            parsed_url = urlparse(url)
            origin = f"{parsed_url.scheme}://{parsed_url.netloc}"
            correct_referer = headers.get('Referer') or headers.get('x-gwt-module-base') or origin

            req_headers = headers.copy()
            req_headers['Content-Type'] = 'text/x-gwt-rpc; charset=UTF-8'
            req_headers['Referer'] = correct_referer

            resp = self.session.post(url, headers=req_headers, data=body, verify=False, timeout=REQUEST_TIMEOUT)

            if "//EX" in resp.text or resp.status_code == 500:
                logger.error(f"GWT Failure for {self.name} (County Fetch).")
                if is_retry:
                    return []
                if self.attempt_auto_repair():
                    logger.info("Retrying county fetch with new settings...")
                    # After repair, self.config is updated, so we can just call fetch again.
                    return self.fetch(is_retry=True)
                return []

            if not resp.ok:
                return []

            text = resp.text.replace('//OK', '')
            return self._extract_county_summary(json.loads(text))
        except Exception as e:
            logger.error(f"County fetch error for {self.name}: {e}")
            return []

    def _extract_county_summary(self, data_list):
        """
        Decodes a GWT-RPC payload to extract outage data for Counties.
        This logic is adapted from test.py.

        ``data_list`` is the parsed JSON array: scalar tokens followed by a
        nested list that serves as the string table. Returns a list of county
        records, or an empty list if the payload cannot be decoded.
        """
        try:
            # 1. Separate Stream and String Table
            # Everything before the first nested list is the token stream;
            # that first nested list is the string table.
            string_table = None
            stream_raw = []
            for item in data_list:
                if isinstance(item, list):
                    string_table = item
                    break
                else:
                    stream_raw.append(item)

            if not string_table:
                logger.error(f"String table not found in payload for {self.name}.")
                return []

            # 2. Normalize the Stream
            stream = []
            for token in stream_raw:
                if isinstance(token, int):
                    stream.append(token)
                elif isinstance(token, float):
                    stream.append(int(token))
                elif isinstance(token, str):
                    try:
                        stream.append(int(float(token)))
                    except ValueError:
                        pass  # Ignore non-numeric strings

            # 3. Decode Logic
            REGION_SIG = "cc.nisc.oms.clientandserver.v2.pojo.Region/3192921568"
            INTEGER_SIG = "java.lang.Integer/3438268394"
            CATEGORY_KEY = "County"

            def get_index(val):
                # Stream tokens reference the string table 1-based; 0 means "absent".
                try:
                    return string_table.index(val) + 1
                except ValueError:
                    return 0

            region_type_id = get_index(REGION_SIG)
            integer_type_id = get_index(INTEGER_SIG)
            county_type_id = get_index(CATEGORY_KEY)

            if region_type_id == 0:
                logger.error(f"Region type signature not found for {self.name}.")
                return []

            results = []
            i = 0
            stream_len = len(stream)

            while i < stream_len:
                if stream[i] == region_type_id:
                    try:
                        p = i + 1

                        # A value counts as "served" only when the token after
                        # it is the Integer type id.
                        served = 0
                        val1 = stream[p]
                        p += 1
                        if p < stream_len and stream[p] == integer_type_id:
                            served = val1
                            p += 1

                        # Same pattern for the outage count.
                        out = 0
                        val2 = stream[p]
                        p += 1
                        if p < stream_len and stream[p] == integer_type_id:
                            out = val2
                            p += 1

                        name_idx = stream[p]
                        p += 1
                        cat_idx = stream[p]

                        if cat_idx == county_type_id:
                            name = "Unknown"
                            if 0 < name_idx <= len(string_table):
                                name = string_table[name_idx - 1]
                            results.append({
                                'county': name,
                                'state': self.state_filter,
                                'company': self.name,
                                'outages': out,
                                'served': served
                            })
                    except IndexError:
                        # Region marker too close to the end of the stream to
                        # hold a full record; move on to the next token.
                        pass
                i += 1
            return results
        except (ValueError, IndexError, TypeError) as e:
            logger.error(f"Could not parse county summary for {self.name}: {e}")
            return []


# --- REGISTRY ---
PROVIDER_REGISTRY = {
    'kubra_county': KubraCountyProvider,
    'simple_county_json': SimpleCountyJsonProvider,
    'nisc_hosted_county': NiscCountyProvider,
    'gwt_rpc_county': GwtRpcCountyProvider,
}


# --- MAIN ---
def main():
    """Fetch county outages from every configured provider and store them."""
    session = requests.Session()
    session.headers.update({'User-Agent': DEFAULT_USER_AGENT})

    db = CountyPowerDB(DB_CONFIG)
    try:
        logger.info("Starting County Power Scraper...")
        providers = load_providers()

        for config in providers:
            p_type = config.get('county_type')
            p_name = config.get('name')

            ProviderClass = PROVIDER_REGISTRY.get(p_type)
            if ProviderClass:
                try:
                    provider = ProviderClass(config, session)
                    logger.info(f"Fetching county data for {p_name}...")
                    outages = provider.fetch()
                    logger.info(f"Found {len(outages)} active outage records for {p_name}.")

                    # Process this company's data in a single transaction
                    # NOTE(review): providers return [] on fetch errors as well
                    # as on "no outages", so a failed fetch zeroes every county
                    # for this company. Confirm that this is intended.
                    db.upsert_and_zero_outages(p_name, outages)
                except Exception as e:
                    logger.error(f"Error processing {p_name}: {e}")
            elif p_type:
                logger.warning(f"Unknown provider type '{p_type}' for {p_name}")

        db.run_post_processing()
    finally:
        # Release the connection and the session even if a step above raised.
        db.close()
        session.close()
    logger.info("County scraping complete.")


if __name__ == "__main__":
    main()