2025-12-07 12:38:11 +00:00
parent 9e1c17b2f6
commit a765aa23f4
13 changed files with 613 additions and 789 deletions

1
__init__.py Normal file

@@ -0,0 +1 @@
# This file makes the 'providers' directory a Python package.

5 binary files changed (contents not shown).

23
base.py Normal file

@@ -0,0 +1,23 @@
from abc import ABC, abstractmethod
class BaseProvider(ABC):
"""Abstract base class for point-based outage providers."""
def __init__(self, config, session):
self.config = config
self.session = session
self.name = config.get('name', 'Unknown')
@abstractmethod
def fetch(self):
pass
class BaseCountyProvider(ABC):
"""Abstract base class for county-based outage providers."""
def __init__(self, config, session):
self.config = config
self.session = session
self.name = config.get('name', 'Unknown')
@abstractmethod
def fetch(self):
pass
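# Usage sketch (illustrative, not part of this file; the 'type' config key is an assumption):
# the main scripts look a provider class up in their PROVIDER_REGISTRY and drive it through
# this common interface, roughly:
#   session = requests.Session()
#   provider_cls = PROVIDER_REGISTRY.get(cfg.get('type'))
#   records = provider_cls(cfg, session).fetch() if provider_cls else []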

255
gwt_rpc.py Normal file

@@ -0,0 +1,255 @@
import logging
import json
from datetime import datetime, timezone, timedelta
from urllib.parse import urlparse
from pyproj import Transformer
import get_rpc_config_auto
from base import BaseProvider, BaseCountyProvider
logger = logging.getLogger(__name__)
class GwtRpcBaseProvider:
"""Base class for GWT-RPC providers to share common logic like auto-repair."""
def __init__(self, config, session):
self.config = config
self.session = session
self.name = config.get('name', 'Unknown')
self.map_url = config.get('map_url')
self.state_filter = config.get('state_filter')
self.AUTO_UPDATE_COOLDOWN_HOURS = 4
# Set up session headers and cookies from config
self.session.headers.update({
'User-Agent': config.get('user_agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'),
'Accept': '*/*',
'Sec-Fetch-Site': 'same-origin'
})
if config.get('cookies'):
for cookie in config['cookies']:
self.session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'], path=cookie['path'])
def attempt_auto_repair(self):
if not self.map_url: return False
last_update = self.config.get('last_auto_update')
if last_update:
try:
last_dt = datetime.fromisoformat(last_update)
if last_dt.tzinfo is None: last_dt = last_dt.replace(tzinfo=timezone.utc)
if datetime.now(timezone.utc) - last_dt < timedelta(hours=self.AUTO_UPDATE_COOLDOWN_HOURS):
logger.info(f"Skipping auto-repair for {self.name} (Cooldown active).")
return False
except ValueError: pass
logger.info(f"Attempting Auto-Repair for {self.name}...")
# update_provider_config lives in the main script; import it from whichever entry point is running.
try:
from newpower import update_provider_config
except ImportError:
from newpower2 import update_provider_config
try:
_, valid_headers, valid_cookies, valid_body = get_rpc_config_auto.fetch_live_data(self.map_url)
if valid_headers and valid_body:
logger.info(f"Repair successful! Updating {self.name}.")
excluded = {'content-length', 'host', 'connection', 'cookie', 'accept-encoding', 'sec-ch-ua', 'sec-ch-ua-mobile', 'sec-ch-ua-platform', 'origin'}
clean_headers = {k: v for k, v in valid_headers.items() if k.lower() not in excluded}
clean_headers['Referer'] = self.map_url
new_settings = {
'headers': clean_headers, 'body': valid_body, 'cookies': valid_cookies,
'user_agent': valid_headers.get('user-agent'),
}
# Update in-memory config for the current run
self.config.update(new_settings)
self.config['last_auto_update'] = datetime.now(timezone.utc).isoformat()
# Update session for the current run
self.session.cookies.clear()
for cookie in valid_cookies:
self.session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'], path=cookie['path'])
# Save to disk for next time
update_provider_config(self.name, self.config)
return True
except Exception as e:
logger.error(f"Auto-repair failed: {e}")
return False
def _fetch_rpc_data(self, is_retry=False):
url = self.config.get('url')
headers = self.config.get('headers', {})
body = self.config.get('body')
if not url or not body: return None
parsed_url = urlparse(url)
origin = f"{parsed_url.scheme}://{parsed_url.netloc}"
correct_referer = headers.get('Referer') or headers.get('x-gwt-module-base') or origin
req_headers = headers.copy()
req_headers['Content-Type'] = 'text/x-gwt-rpc; charset=UTF-8'
req_headers['Referer'] = correct_referer
resp = self.session.post(url, headers=req_headers, data=body, verify=False)
if "//EX" in resp.text or resp.status_code == 500:
logger.error(f"GWT Failure for {self.name}.")
if is_retry: return None
if self.attempt_auto_repair():
logger.info("Retrying fetch with new settings...")
return self._fetch_rpc_data(is_retry=True)
return None
if not resp.ok: return None
# Strip only the leading '//OK' guard so the payload itself is left intact.
text = resp.text
if text.startswith('//OK'): text = text[4:]
return json.loads(text)
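# Decoding note: once the '//OK' prefix is stripped, a GWT-RPC payload parses as one flat
# JSON array of numeric stream tokens plus a single nested list (the string table), roughly
#   [7, 0, 12, ..., ["...pojo.Region/...", "java.lang.Integer/...", "County", ...], 1, 7]
# (values illustrative). The decoders below walk the numeric stream and resolve 1-based
# indices into that string table.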
class GwtRpcCountyProvider(GwtRpcBaseProvider, BaseCountyProvider):
def fetch(self):
try:
data = self._fetch_rpc_data()
if data:
return self._extract_county_summary(data)
return []
except Exception as e:
logger.error(f"County fetch error for {self.name}: {e}")
return []
def _extract_county_summary(self, data_list):
try:
string_table = next((item for item in data_list if isinstance(item, list)), None)
if not string_table: return []
stream_raw = [item for item in data_list if not isinstance(item, list)]
stream = [int(token) for token in stream_raw if isinstance(token, (int, float, str)) and str(token).replace('.','',1).isdigit()]
REGION_SIG = "cc.nisc.oms.clientandserver.v2.pojo.Region/3192921568"
INTEGER_SIG = "java.lang.Integer/3438268394"
CATEGORY_KEY = "County"
def get_index(val):
try: return string_table.index(val) + 1
except ValueError: return 0
region_type_id = get_index(REGION_SIG)
integer_type_id = get_index(INTEGER_SIG)
county_type_id = get_index(CATEGORY_KEY)
if region_type_id == 0: return []
results = []
i = 0
while i < len(stream):
if stream[i] == region_type_id:
try:
p = i + 1
# Advance past the optional Integer type marker only when it is actually present.
served, out = 0, 0
if stream[p + 1] == integer_type_id: served = stream[p]; p += 2
else: p += 1
if stream[p + 1] == integer_type_id: out = stream[p]; p += 2
else: p += 1
name_idx, cat_idx = stream[p], stream[p+1]
if cat_idx == county_type_id:
name = string_table[name_idx - 1] if 0 < name_idx <= len(string_table) else "Unknown"
results.append({'county': name, 'state': self.state_filter, 'company': self.name, 'outages': out, 'served': served})
except IndexError: pass
i += 1
return results
except Exception as e:
logger.error(f"Could not parse county summary for {self.name}: {e}")
return []
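# Each decoded county row looks like (values illustrative):
#   {'county': 'Kanawha', 'state': 'WV', 'company': 'Example REC', 'outages': 12, 'served': 4300}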
class GwtRpcProvider(GwtRpcBaseProvider, BaseProvider):
def __init__(self, config, session):
super().__init__(config, session)
self.transformer = None
self.STATE_BOUNDS = {
'WV': {'lat_min': 37.0, 'lat_max': 40.7, 'lon_min': -82.7, 'lon_max': -77.7},
'OH': {'lat_min': 38.4, 'lat_max': 42.0, 'lon_min': -84.9, 'lon_max': -80.5},
'KY': {'lat_min': 36.4, 'lat_max': 39.2, 'lon_min': -89.6, 'lon_max': -81.9},
'IA': {'lat_min': 40.3, 'lat_max': 43.6, 'lon_min': -96.7, 'lon_max': -90.1}
}
if config.get('epsg'):
try:
self.transformer = Transformer.from_crs(f"EPSG:{config['epsg']}", "EPSG:4326", always_xy=True)
except: logger.error(f"EPSG Error for {self.name}")
def fetch(self):
try:
data = self._fetch_rpc_data()
if data:
return self._extract_outages(data)
return []
except Exception as e:
logger.error(f"Fetch error {self.name}: {e}")
return []
def _extract_outages(self, data_list):
results = []
try:
string_table = next((item for item in data_list if isinstance(item, list)), None)
if not string_table: return []
stream_raw = [item for item in data_list if not isinstance(item, list)]
stream = [int(token) for token in stream_raw if isinstance(token, (int, float))]
OUTAGE_SIG_KEYWORD = ".pojo.Outage/"
outage_sig_full = next((s for s in string_table if OUTAGE_SIG_KEYWORD in s), None)
if not outage_sig_full: return []
outage_type_id = string_table.index(outage_sig_full) + 1
i = 0
while i < len(stream):
if stream[i] == outage_type_id:
try:
p = i + 1
outagen = stream[p]; p += 1
crew_status_idx = stream[p]; p += 1
cause_idx = stream[p]; p += 1
etr_high = stream[p]; p += 1
etr_low = stream[p]; p += 2  # skip the long type ID that follows
start_high = stream[p]; p += 1
start_low = stream[p]; p += 2  # skip the long type ID that follows
coord_x = stream[p]; p += 1
coord_y = stream[p]; p += 1
lat, lon = None, None
if self.transformer and coord_x and coord_y:
try:
lon, lat = self.transformer.transform(coord_x, coord_y)
if not self._is_valid(lat, lon): lat, lon = None, None
except: pass
if lat and lon:
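# GWT serializes java.lang.Long as two 32-bit halves; recombine them into epoch milliseconds.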
start_ms = (start_high << 32) | start_low
etr_ms = (etr_high << 32) | etr_low
start_time = datetime.fromtimestamp(start_ms / 1000, tz=timezone.utc) if start_ms > 0 else None
etr_time = datetime.fromtimestamp(etr_ms / 1000, tz=timezone.utc) if etr_ms > 0 else None
cause = string_table[cause_idx - 1].strip() if 0 < cause_idx <= len(string_table) else "Unknown"
crew_status = string_table[crew_status_idx - 1].strip() if 0 < crew_status_idx <= len(string_table) else "Unknown"
results.append({
'incidentid': f"{self.name}-{lat:.5f}-{lon:.5f}", 'utility': self.name,
'lat': lat, 'lon': lon, 'pointgeom': f"{lat:.5f},{lon:.5f}",
'start': start_time, 'etr': etr_time, 'outagen': outagen,
'cause': cause, 'crew_status': crew_status,
'last_change': datetime.now(timezone.utc)
})
except (IndexError, TypeError):
pass
i += 1
return results
except Exception as e:
logger.error(f"Could not parse point outages for {self.name}: {e}")
return []
def _is_valid(self, lat, lon):
if not self.state_filter: return True
b = self.STATE_BOUNDS.get(self.state_filter)
if not b: return True
return b['lat_min'] <= lat <= b['lat_max'] and b['lon_min'] <= lon <= b['lon_max']

173
kubra.py Normal file

@@ -0,0 +1,173 @@
import requests
import json
import logging
import polyline
import mercantile
from datetime import datetime
from base import BaseProvider, BaseCountyProvider
logger = logging.getLogger(__name__)
class KubraCountyProvider(BaseCountyProvider):
def fetch(self):
meta_url = self.config.get('county_meta_url')
report_url_suffix = self.config.get('county_report_suffix')
try:
# 1. Get hexes from meta_url
meta_resp = self.session.get(meta_url)
meta_data = meta_resp.json()
path = meta_data.get('data', {}).get('cluster_interval_generation_data')
if not path: return []
# 2. Construct the final report URL.
# The metadata path is four slash-separated parts; the report URL is built from the last
# part (the second hex, matching _get_hexes in KubraProvider below) plus the configured
# report suffix, which is the single URL format the original script used.
path_parts = path.split('/') # e.g. ['data', '<uuid>', '<hex1>', '<hex2>']
if len(path_parts) < 4:
logger.error(f"Invalid metadata path format for {self.name}: {path}")
return []
report_url = f"https://kubra.io/data/{path_parts[3]}{report_url_suffix}"
# 3. Fetch and process report
report_resp = self.session.get(report_url)
if not report_resp.ok or not report_resp.text:
logger.info(f"No county report data available for {self.name} at this time.")
return []
report_data = report_resp.json()
return self._normalize(report_data)
except json.JSONDecodeError:
logger.warning(f"Could not decode JSON from county report for {self.name}. The report may be empty or invalid.")
return []
except requests.exceptions.RequestException as e:
logger.error(f"Error fetching Kubra county data for {self.name}: {e}")
return []
def _normalize(self, data):
results = []
primary_areas = data.get("file_data", {}).get("areas", [])
if not primary_areas: return []
first_item_key = primary_areas[0].get("key")
if first_item_key == "state":
for state_area in primary_areas:
for county in state_area.get("areas", []):
if county.get("key") == "county":
results.append(self._extract_info(county))
elif first_item_key == "county":
for county in primary_areas:
if county.get("key") == "county":
results.append(self._extract_info(county))
return results
def _extract_info(self, county_item):
return {
'outages': county_item.get('cust_a', {}).get('val'),
'served': county_item.get('cust_s'),
'county': county_item.get('name', '').capitalize(),
'state': county_item.get('state') or self.config.get('state_filter'),
'company': self.name
}
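# The report JSON this normalizer expects looks roughly like (field names taken from the
# code above; values illustrative):
#   {"file_data": {"areas": [{"key": "state", "areas": [
#       {"key": "county", "name": "KANAWHA", "cust_a": {"val": 120}, "cust_s": 98000, "state": "WV"}]}]}}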
class KubraProvider(BaseProvider):
def __init__(self, config, session):
super().__init__(config, session)
self.max_zoom = 14
self.results = []
self.base_url_template = 'https://kubra.io/cluster-data/'
def fetch(self):
meta_url = self.config.get('meta_url')
if not meta_url: return []
# Fetch hexes ONCE per run, not in the recursive loop.
self.hex1, self.hex2 = self._get_hexes(meta_url)
if not self.hex1 or not self.hex2:
logger.error(f"[{self.name}] Could not get session hex keys. Aborting fetch for this provider.")
return []
quadkeys = self.config.get('quadkeys', [])
if not quadkeys:
logger.error(f"[{self.name}] No quadkeys configured. Aborting fetch for this provider.")
return []
self.results = []
self._fetch_recursive(quadkeys, set(), zoom=len(quadkeys[0]))
return self.results
def _get_hexes(self, url):
try:
resp = self.session.get(url)
path = resp.json().get('data', {}).get('cluster_interval_generation_data')
parts = path.split('/')
return parts[2], parts[3]
except: return None, None
def _fetch_recursive(self, quadkeys, seen, zoom):
for q in quadkeys:
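# The first path segment of the tile URL is the quadkey's last three characters reversed,
# e.g. '0231230' -> last three '230' -> '032'.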
suffix = q[-3:][::-1]
url = f"{self.base_url_template}{suffix}/{self.hex1}/{self.hex2}/public/{self.config.get('layer')}/{q}.json"
try:
resp = self.session.get(url)
if not resp.ok:
continue
file_data = resp.json().get('file_data', [])
for item in file_data:
desc = item.get('desc')
# This mirrors the safe logic from the original power2.py's 'kubra' function.
# If 'desc' is missing, assume it's a cluster to be safe and drill down.
is_cluster = True if desc is None else desc.get('cluster', False)
# If it's a cluster and we haven't hit max zoom, drill down.
if is_cluster and zoom + 1 <= self.max_zoom:
p_geom = item.get('geom', {}).get('p', [])
if p_geom:
next_key = self._get_quadkey_for_point(p_geom[0], zoom + 1)
self._fetch_recursive([next_key], seen, zoom + 1)
else:
# Otherwise it's a final outage record; normalize it and skip records without usable geometry.
record = self._normalize(item)
if record: self.results.append(record)
except Exception as e:
logger.error(f"[{self.name}] Unhandled exception in _fetch_recursive for {q}: {e}", exc_info=True)
def _normalize(self, item):
# Ensure 'desc' is a dictionary, even if it's missing from the item. This prevents the AttributeError.
desc = item.get('desc') or {}
geom = item.get('geom', {})
p = geom.get('p', [None])[0] if geom.get('p') else None
if not p:
return {}
latlon = polyline.decode(p)[0]
def ts(s):
if not s or s=='ETR-NULL': return None
try: return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S%z")
except:
try: return datetime.strptime(s, "%Y-%m-%dT%H:%M%z")
except: return None
cause_dict = desc.get('cause')
cause = cause_dict.get('EN-US', "Pending") if cause_dict else "Pending"
crew_dict = desc.get('crew_status')
crew_status = crew_dict.get('EN-US') if crew_dict else None
return {
'incidentid': desc.get('inc_id'), 'utility': self.name,
'lat': latlon[0], 'lon': latlon[1], 'pointgeom': p, 'areageom': geom.get('a'),
'start': ts(desc.get('start_time')), 'etr': ts(desc.get('etr')),
'outagen': desc.get('cust_a', {}).get('val', 0), 'cause': cause,
'crew_status': crew_status, 'active': True
}
def _get_quadkey_for_point(self, p, z):
ll = polyline.decode(p)[0]
return mercantile.quadkey(mercantile.tile(lng=ll[1], lat=ll[0], zoom=z))
def _get_neighbors(self, q):
t = mercantile.quadkey_to_tile(q)
return [mercantile.quadkey(n) for n in mercantile.neighbors(t)]

newpower.py

@@ -1,17 +1,27 @@
import os
import sys
# Add the script's directory to the Python path to ensure modules can be found
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import requests
import json
import psycopg2
import logging
import os
import re
from datetime import datetime, timezone, timedelta
from abc import ABC, abstractmethod
from urllib.parse import urlparse
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# Import the helper module for auto-repair
import get_rpc_config_auto
# Import provider classes
from base import BaseCountyProvider
from kubra import KubraCountyProvider
from simple import SimpleCountyJsonProvider
from nisc import NiscCountyProvider
from gwt_rpc import GwtRpcCountyProvider
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# --- LOGGING ---
@@ -134,314 +144,6 @@ class CountyPowerDB:
# --- PROVIDERS ---
class BaseCountyProvider(ABC):
def __init__(self, config, session):
self.config = config
self.session = session
self.name = config.get('name', 'Unknown')
@abstractmethod
def fetch(self):
pass
class SimpleCountyJsonProvider(BaseCountyProvider):
def fetch(self):
url = self.config.get('county_url')
state = self.config.get('state_filter')
try:
resp = self.session.get(url, verify=False)
if not resp.ok: return []
data = resp.json()
results = []
for boundary_group in data:
for item in boundary_group.get('boundaries', []):
results.append({
'outages': item.get('customersOutNow'),
'served': item.get('customersServed'),
'county': item.get('name'),
'state': state,
'company': self.name
})
return results
except Exception as e:
logger.error(f"Error fetching {self.name}: {e}")
return []
class KubraCountyProvider(BaseCountyProvider):
def fetch(self):
meta_url = self.config.get('county_meta_url')
report_url_suffix = self.config.get('county_report_suffix')
try:
# 1. Get hexes from meta_url
meta_resp = self.session.get(meta_url)
meta_data = meta_resp.json()
path = meta_data.get('data', {}).get('cluster_interval_generation_data')
if not path: return []
# 2. Construct final report URL
# The old script's logic reveals the path is composed of a base,
# the second hex from the metadata path, and the report suffix.
# Example path from meta: data/e2ae0326-9912-436a-9355-eb2687e798b1
path_parts = path.split('/') # e.g., ['data', 'hex1', 'hex2', 'hex3']
if len(path_parts) < 4:
logger.error(f"Invalid metadata path format for {self.name}: {path}")
return []
# This is the single, correct URL format used by the original script.
# It uses the fourth element (index 3) from the metadata path.
report_url = f"https://kubra.io/data/{path_parts[3]}{report_url_suffix}"
# 3. Fetch and process report
report_resp = self.session.get(report_url)
if not report_resp.ok or not report_resp.text:
logger.info(f"No county report data available for {self.name} at this time.")
return []
report_data = report_resp.json()
return self._normalize(report_data)
except json.JSONDecodeError:
logger.warning(f"Could not decode JSON from county report for {self.name}. The report may be empty or invalid.")
return []
except requests.exceptions.RequestException as e:
logger.error(f"Error fetching Kubra county data for {self.name}: {e}")
return []
def _normalize(self, data):
results = []
primary_areas = data.get("file_data", {}).get("areas", [])
if not primary_areas: return []
first_item_key = primary_areas[0].get("key")
if first_item_key == "state":
for state_area in primary_areas:
for county in state_area.get("areas", []):
if county.get("key") == "county":
results.append(self._extract_info(county))
elif first_item_key == "county":
for county in primary_areas:
if county.get("key") == "county":
results.append(self._extract_info(county))
return results
def _extract_info(self, county_item):
return {
'outages': county_item.get('cust_a', {}).get('val'),
'served': county_item.get('cust_s'),
'county': county_item.get('name', '').capitalize(),
'state': county_item.get('state') or self.config.get('state_filter'),
'company': self.name
}
class NiscCountyProvider(BaseCountyProvider):
""" Handles county data from NISC-hosted cloud sources. """
def fetch(self):
url = self.config.get('county_url')
state = self.config.get('state_filter')
try:
resp = self.session.get(url, verify=False)
if not resp.ok: return []
data = resp.json()
results = []
# The structure is typically a list containing one object with a 'boundaries' key
for boundary_group in data:
for item in boundary_group.get('boundaries', []):
results.append({
'outages': item.get('customersOutNow'),
'served': item.get('customersServed'),
'county': item.get('name'),
'state': state,
'company': self.name
})
return results
except Exception as e:
logger.error(f"Error fetching NISC county data for {self.name}: {e}")
return []
class GwtRpcCountyProvider(BaseCountyProvider):
""" Handles county data from GWT-RPC sources. """
def __init__(self, config, session):
super().__init__(config, session)
self.state_filter = config.get('state_filter')
self.map_url = config.get('map_url')
# Set up session headers and cookies from config
self.session.headers.update({
'User-Agent': config.get('user_agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'),
'Accept': '*/*',
'Sec-Fetch-Site': 'same-origin'
})
if config.get('cookies'):
for cookie in config['cookies']:
self.session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'], path=cookie['path'])
def attempt_auto_repair(self):
if not self.map_url: return False
last_update = self.config.get('last_auto_update')
if last_update:
try:
last_dt = datetime.fromisoformat(last_update)
if last_dt.tzinfo is None: last_dt = last_dt.replace(tzinfo=timezone.utc)
if datetime.now(timezone.utc) - last_dt < timedelta(hours=AUTO_UPDATE_COOLDOWN_HOURS):
logger.info(f"Skipping auto-repair for {self.name} (Cooldown active).")
return False
except ValueError: pass
logger.info(f"Attempting Auto-Repair for {self.name}...")
try:
_, valid_headers, valid_cookies, valid_body = get_rpc_config_auto.fetch_live_data(self.map_url)
if valid_headers and valid_body:
logger.info(f"Repair successful! Updating {self.name}.")
excluded = {'content-length', 'host', 'connection', 'cookie', 'accept-encoding', 'sec-ch-ua', 'sec-ch-ua-mobile', 'sec-ch-ua-platform', 'origin'}
clean_headers = {k: v for k, v in valid_headers.items() if k.lower() not in excluded}
clean_headers['Referer'] = self.map_url
# Update in-memory config for the current run
self.config.update({
'headers': clean_headers, 'body': valid_body, 'cookies': valid_cookies,
'user_agent': valid_headers.get('user-agent'),
'last_auto_update': datetime.now(timezone.utc).isoformat()
})
# Update session for the current run
self.session.cookies.clear()
for cookie in valid_cookies:
self.session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'], path=cookie['path'])
# Save to disk for next time
update_provider_config(self.name, self.config)
return True
except Exception as e:
logger.error(f"Auto-repair failed: {e}")
return False
def fetch(self, is_retry=False):
url = self.config.get('url')
headers = self.config.get('headers', {})
body = self.config.get('body')
if not url or not body: return []
try:
parsed_url = urlparse(url)
origin = f"{parsed_url.scheme}://{parsed_url.netloc}"
correct_referer = headers.get('Referer') or headers.get('x-gwt-module-base') or origin
req_headers = headers.copy()
req_headers['Content-Type'] = 'text/x-gwt-rpc; charset=UTF-8'
req_headers['Referer'] = correct_referer
resp = self.session.post(url, headers=req_headers, data=body, verify=False)
if "//EX" in resp.text or resp.status_code == 500:
logger.error(f"GWT Failure for {self.name} (County Fetch).")
if is_retry: return []
if self.attempt_auto_repair():
logger.info("Retrying county fetch with new settings...")
# After repair, self.config is updated, so we can just call fetch again.
return self.fetch(is_retry=True)
return []
if not resp.ok: return []
text = resp.text.replace('//OK', '')
return self._extract_county_summary(json.loads(text))
except Exception as e:
logger.error(f"County fetch error for {self.name}: {e}")
return []
def _extract_county_summary(self, data_list):
"""
Decodes a GWT-RPC payload to extract outage data for Counties.
This logic is adapted from test.py.
"""
try:
# 1. Separate Stream and String Table
string_table = None
stream_raw = []
for item in data_list:
if isinstance(item, list):
string_table = item
break
else:
stream_raw.append(item)
if not string_table:
logger.error(f"String table not found in payload for {self.name}.")
return []
# 2. Normalize the Stream
stream = []
for token in stream_raw:
if isinstance(token, int):
stream.append(token)
elif isinstance(token, float):
stream.append(int(token))
elif isinstance(token, str):
try:
stream.append(int(float(token)))
except ValueError:
pass # Ignore non-numeric strings
# 3. Decode Logic
REGION_SIG = "cc.nisc.oms.clientandserver.v2.pojo.Region/3192921568"
INTEGER_SIG = "java.lang.Integer/3438268394"
CATEGORY_KEY = "County"
def get_index(val):
try: return string_table.index(val) + 1
except ValueError: return 0
region_type_id = get_index(REGION_SIG)
integer_type_id = get_index(INTEGER_SIG)
county_type_id = get_index(CATEGORY_KEY)
if region_type_id == 0:
logger.error(f"Region type signature not found for {self.name}.")
return []
results = []
i = 0
stream_len = len(stream)
while i < stream_len:
if stream[i] == region_type_id:
try:
p = i + 1
served = 0
val1 = stream[p]
p += 1
if p < stream_len and stream[p] == integer_type_id:
served = val1
p += 1
out = 0
val2 = stream[p]
p += 1
if p < stream_len and stream[p] == integer_type_id:
out = val2
p += 1
name_idx = stream[p]
p += 1
cat_idx = stream[p]
if cat_idx == county_type_id:
name = "Unknown"
if 0 < name_idx <= len(string_table):
name = string_table[name_idx - 1]
results.append({
'county': name, 'state': self.state_filter,
'company': self.name, 'outages': out, 'served': served
})
except IndexError:
pass
i += 1
return results
except (ValueError, IndexError, TypeError) as e:
logger.error(f"Could not parse county summary for {self.name}: {e}")
return []
# --- REGISTRY ---
PROVIDER_REGISTRY = {
'kubra_county': KubraCountyProvider,

newpower2.py

@@ -1,13 +1,16 @@
import os
import sys
# Add the script's directory to the Python path to ensure modules can be found
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import requests
import polyline
import json
import psycopg2
import mercantile
import logging
import os
import re
from datetime import datetime, timezone, timedelta
from abc import ABC, abstractmethod
from urllib.parse import urlparse
from pyproj import Transformer
from requests.packages.urllib3.exceptions import InsecureRequestWarning
@@ -15,6 +18,13 @@ from requests.packages.urllib3.exceptions import InsecureRequestWarning
# Import the helper module for auto-repair
import get_rpc_config_auto
# Import provider classes
from base import BaseProvider
from kubra import KubraProvider
from simple import SimpleJsonProvider
from gwt_rpc import GwtRpcProvider
from nisc import NiscHostedProvider
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# --- LOGGING ---
@@ -114,482 +124,6 @@ class PowerDB:
cursor.execute("DELETE FROM newpower WHERE fetch_time < NOW() - INTERVAL '365 days'")
logger.info("Post-processing complete.")
# --- PROVIDERS ---
class BaseProvider(ABC):
def __init__(self, config, session):
self.config = config
self.session = session
self.name = config.get('name', 'Unknown')
@abstractmethod
def fetch(self):
pass
class SimpleJsonProvider(BaseProvider):
def fetch(self):
url = self.config.get('url')
try:
resp = self.session.get(url, verify=False)
if not resp.ok: return []
data = resp.json()
results = []
for item in data:
results.append(self._normalize(item))
return results
except Exception as e:
logger.error(f"Error fetching {self.name}: {e}")
return []
def _normalize(self, item):
def safe_parse(ts):
if not ts: return None
try: return datetime.fromisoformat(ts.replace('Z', '+00:00'))
except: return None
return {
'incidentid': str(item.get('outageRecID')), 'utility': self.name,
'lat': item.get('outagePoint', {}).get('lat'), 'lon': item.get('outagePoint', {}).get('lng'),
'pointgeom': f"{item.get('outagePoint', {}).get('lat')},{item.get('outagePoint', {}).get('lng')}",
'areageom': None, 'start': safe_parse(item.get('outageStartTime')),
'etr': safe_parse(item.get('outageEndTime')), 'outagen': item.get('customersOutNow'),
'cause': item.get('cause'), 'crew_status': item.get('outageWorkStatus'),
'last_change': safe_parse(item.get('outageModifiedTime'))
}
class KubraProvider(BaseProvider):
def __init__(self, config, session):
super().__init__(config, session)
self.max_zoom = 14
self.results = []
self.base_url_template = 'https://kubra.io/cluster-data/'
def fetch(self):
meta_url = self.config.get('meta_url')
if not meta_url: return []
# Fetch hexes ONCE per run, not in the recursive loop.
self.hex1, self.hex2 = self._get_hexes(meta_url)
if not self.hex1 or not self.hex2:
logger.error(f"[{self.name}] Could not get session hex keys. Aborting fetch for this provider.")
return []
quadkeys = self.config.get('quadkeys', [])
self.results = []
self._fetch_recursive(quadkeys, set(), zoom=len(quadkeys[0]))
return self.results
def _get_hexes(self, url):
try:
resp = self.session.get(url)
path = resp.json().get('data', {}).get('cluster_interval_generation_data')
parts = path.split('/')
return parts[2], parts[3]
except: return None, None
def _fetch_recursive(self, quadkeys, seen, zoom):
for q in quadkeys:
suffix = q[-3:][::-1]
url = f"{self.base_url_template}{suffix}/{self.hex1}/{self.hex2}/public/{self.config.get('layer')}/{q}.json"
try:
resp = self.session.get(url)
if not resp.ok:
continue
file_data = resp.json().get('file_data', [])
for item in file_data:
desc = item.get('desc')
# This mirrors the safe logic from the original power2.py's 'kubra' function.
# If 'desc' is missing, assume it's a cluster to be safe and drill down.
is_cluster = True if desc is None else desc.get('cluster', False)
# If it's a cluster and we haven't hit max zoom, drill down.
if is_cluster and zoom + 1 <= self.max_zoom:
p_geom = item.get('geom', {}).get('p', [])
if p_geom:
next_key = self._get_quadkey_for_point(p_geom[0], zoom + 1)
self._fetch_recursive([next_key], seen, zoom + 1)
else:
# Otherwise, it's a final outage record. Process it.
self.results.append(self._normalize(item))
except Exception as e:
logger.error(f"[{self.name}] Unhandled exception in _fetch_recursive for {q}: {e}", exc_info=True)
def _normalize(self, item):
# Ensure 'desc' is a dictionary, even if it's missing from the item. This prevents the AttributeError.
desc = item.get('desc') or {}
geom = item.get('geom', {})
p = geom.get('p', [None])[0] if geom.get('p') else None
if not p:
return {}
latlon = polyline.decode(p)[0]
def ts(s):
if not s or s=='ETR-NULL': return None
try: return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S%z")
except:
try: return datetime.strptime(s, "%Y-%m-%dT%H:%M%z")
except: return None
cause_dict = desc.get('cause')
cause = cause_dict.get('EN-US', "Pending") if cause_dict else "Pending"
crew_dict = desc.get('crew_status')
crew_status = crew_dict.get('EN-US') if crew_dict else None
return {
'incidentid': desc.get('inc_id'), 'utility': self.name,
'lat': latlon[0], 'lon': latlon[1], 'pointgeom': p, 'areageom': geom.get('a'),
'start': ts(desc.get('start_time')), 'etr': ts(desc.get('etr')),
'outagen': desc.get('cust_a', {}).get('val', 0), 'cause': cause,
'crew_status': crew_status, 'active': True
}
def _get_quadkey_for_point(self, p, z):
ll = polyline.decode(p)[0]
return mercantile.quadkey(mercantile.tile(lng=ll[1], lat=ll[0], zoom=z))
def _get_neighbors(self, q):
t = mercantile.quadkey_to_tile(q)
return [mercantile.quadkey(n) for n in mercantile.neighbors(t)]
class GwtRpcProvider(BaseProvider):
def __init__(self, config, session):
super().__init__(config, session)
self.transformer = None
self.state_filter = config.get('state_filter')
self.map_url = config.get('map_url')
# 1. Base Headers
self.session.headers.update({
'User-Agent': config.get('user_agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'),
'Accept': '*/*',
'Sec-Fetch-Site': 'same-origin'
})
parsed_url = urlparse(config.get('url'))
self.session.headers.update({'Origin': f"{parsed_url.scheme}://{parsed_url.netloc}"})
# 2. Load Cookies
if config.get('cookies'):
for cookie in config['cookies']:
try:
self.session.cookies.set(
cookie['name'], cookie['value'], domain=cookie['domain'], path=cookie['path']
)
except: pass
self.STATE_BOUNDS = {
'WV': {'lat_min': 37.0, 'lat_max': 40.7, 'lon_min': -82.7, 'lon_max': -77.7},
'OH': {'lat_min': 38.4, 'lat_max': 42.0, 'lon_min': -84.9, 'lon_max': -80.5},
'KY': {'lat_min': 36.4, 'lat_max': 39.2, 'lon_min': -89.6, 'lon_max': -81.9},
'IA': {'lat_min': 40.3, 'lat_max': 43.6, 'lon_min': -96.7, 'lon_max': -90.1}
}
if config.get('epsg'):
try:
self.transformer = Transformer.from_crs(f"EPSG:{config['epsg']}", "EPSG:4326", always_xy=True)
except: logger.error(f"EPSG Error for {self.name}")
def attempt_auto_repair(self):
if not self.map_url: return False
# --- Cooldown Check ---
last_update = self.config.get('last_auto_update')
if last_update:
try:
last_dt = datetime.fromisoformat(last_update)
if last_dt.tzinfo is None: last_dt = last_dt.replace(tzinfo=timezone.utc)
if datetime.now(timezone.utc) - last_dt < timedelta(hours=AUTO_UPDATE_COOLDOWN_HOURS):
logger.info(f"Skipping auto-repair for {self.name} (Cooldown active).")
return False
except ValueError: pass
logger.info(f"Attempting Auto-Repair for {self.name}...")
try:
# Expecting 4 values: data, headers, cookies, body
new_data, valid_headers, valid_cookies, valid_body = get_rpc_config_auto.fetch_live_data(self.map_url)
if valid_headers and valid_body:
logger.info(f"Repair successful! Updating {self.name}.")
# Clean Headers (Blacklist approach)
excluded = {'content-length', 'host', 'connection', 'cookie', 'accept-encoding', 'sec-ch-ua', 'sec-ch-ua-mobile', 'sec-ch-ua-platform', 'origin'}
clean_headers = {k: v for k, v in valid_headers.items() if k.lower() not in excluded}
# Ensure Referer is set correctly for next time
clean_headers['Referer'] = self.map_url
# Update In-Memory Config
current_time = datetime.now(timezone.utc).isoformat()
self.config['headers'] = clean_headers
self.config['body'] = valid_body
self.config['cookies'] = valid_cookies
self.config['user_agent'] = valid_headers.get('user-agent')
self.config['last_auto_update'] = current_time
# Update Session
self.session.cookies.clear()
for cookie in valid_cookies:
self.session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'], path=cookie['path'])
if valid_headers.get('user_agent'):
self.session.headers.update({'User-Agent': valid_headers.get('user-agent')})
# Save to Disk
new_settings = {
'headers': clean_headers,
'body': valid_body,
'cookies': valid_cookies,
'user_agent': valid_headers.get('user-agent')
}
update_provider_config(self.name, new_settings)
return True
except Exception as e:
logger.error(f"Auto-repair failed: {e}")
return False
def fetch(self, is_retry=False):
url = self.config.get('url')
headers = self.config.get('headers', {})
body = self.config.get('body')
if not url: return []
try:
# 3. Dynamic Origin Update
parsed_url = urlparse(url)
origin = f"{parsed_url.scheme}://{parsed_url.netloc}"
# Priority: Configured Referer > Module Base > Origin
correct_referer = headers.get('Referer') or headers.get('x-gwt-module-base') or origin
ua = headers.get('User-Agent', self.session.headers['User-Agent'])
if "Headless" in ua: # Fallback safety
ua = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
self.session.headers.update({
'Origin': origin,
'Referer': correct_referer,
'User-Agent': ua
})
# Prime if missing cookies or map_url is defined
if self.map_url and not self.config.get('cookies'):
try: self.session.get(correct_referer, verify=False, timeout=10)
except: pass
req_headers = headers.copy()
req_headers['Content-Type'] = 'text/x-gwt-rpc; charset=UTF-8'
req_headers['Referer'] = correct_referer
req_headers['User-Agent'] = ua
# Only fetch if we have a body
if body:
resp = self.session.post(url, headers=req_headers, data=body, verify=False)
else:
resp = type('obj', (object,), {'status_code': 500, 'text': 'No Body', 'ok': False})()
# 5. Error Handling & Retry
failed = False
if "//EX" in resp.text: failed = True
if resp.status_code == 500: failed = True
if failed:
logger.error(f"GWT Failure for {self.name} (Status: {resp.status_code}).")
if is_retry:
logger.error(f"Retry failed for {self.name}. Aborting.")
return []
if self.attempt_auto_repair():
logger.info("Retrying fetch with new settings...")
return self.fetch(is_retry=True)
else:
return []
if not resp.ok: return []
text = resp.text
if text.startswith('//OK'): text = text[4:]
return self._extract_outages(json.loads(text))
except Exception as e:
logger.error(f"Fetch error {self.name}: {e}")
return []
def _extract_outages(self, data_list):
"""
Decodes a GWT-RPC payload to extract detailed point outage information.
This is a more robust implementation that replaces the previous heuristic-based coordinate search.
"""
results = []
try:
# 1. Separate Stream and String Table
string_table = next((item for item in data_list if isinstance(item, list)), None)
if not string_table: return []
stream_raw = [item for item in data_list if not isinstance(item, list)]
# 2. Normalize the Stream
stream = [int(token) for token in stream_raw if isinstance(token, (int, float))]
# 3. Define Signatures and Helper
# The signature can vary (e.g., cc.nisc... vs coop.nisc...).
# We search for a common, unique substring.
OUTAGE_SIG_KEYWORD = ".pojo.Outage/"
outage_sig_full = next((s for s in string_table if OUTAGE_SIG_KEYWORD in s), None)
if not outage_sig_full:
logger.error(f"Outage type signature not found for {self.name}.")
return []
# Get the 1-based index of the found signature
outage_type_id = string_table.index(outage_sig_full) + 1
# 4. Decode Logic
i = 0
stream_len = len(stream)
while i < stream_len:
if stream[i] == outage_type_id:
try:
p = i + 1
# Field extraction based on observed GWT stream structure
outagen = stream[p] if p < stream_len else 0; p += 1
crew_status_idx = stream[p] if p < stream_len else 0; p += 1
cause_idx = stream[p] if p < stream_len else 0; p += 1
etr_high = stream[p] if p < stream_len else 0; p += 1
etr_low = stream[p] if p < stream_len else 0; p += 1
p += 1 # Skip long type ID
start_high = stream[p] if p < stream_len else 0; p += 1
start_low = stream[p] if p < stream_len else 0; p += 1
p += 1 # Skip long type ID
coord_x = stream[p] if p < stream_len else 0; p += 1
coord_y = stream[p] if p < stream_len else 0; p += 1
# Process coordinates
lat, lon = None, None
if self.transformer and coord_x and coord_y:
try:
lon, lat = self.transformer.transform(coord_x, coord_y)
if not self._is_valid(lat, lon): lat, lon = None, None
except: pass
if lat and lon:
# Process timestamps (GWT sends 64-bit longs as two 32-bit integers)
start_ms = (start_high << 32) | start_low
etr_ms = (etr_high << 32) | etr_low
start_time = datetime.fromtimestamp(start_ms / 1000, tz=timezone.utc) if start_ms > 0 else None
etr_time = datetime.fromtimestamp(etr_ms / 1000, tz=timezone.utc) if etr_ms > 0 else None
# Resolve strings from table
cause = string_table[cause_idx - 1].strip() if 0 < cause_idx <= len(string_table) else "Unknown"
crew_status = string_table[crew_status_idx - 1].strip() if 0 < crew_status_idx <= len(string_table) else "Unknown"
results.append({
'incidentid': f"{self.name}-{lat:.5f}-{lon:.5f}",
'utility': self.name,
'lat': lat, 'lon': lon,
'pointgeom': f"{lat:.5f},{lon:.5f}",
'areageom': None,
'start': start_time,
'etr': etr_time,
'outagen': outagen,
'cause': cause,
'crew_status': crew_status,
'active': True,
'last_change': datetime.now(timezone.utc)
})
except (IndexError, TypeError):
pass # Move to the next potential object
i += 1
return results
except Exception as e:
logger.error(f"Could not parse point outages for {self.name}: {e}")
return []
def _is_valid(self, lat, lon):
if not self.state_filter: return True
b = self.STATE_BOUNDS.get(self.state_filter)
if not b: return True
return b['lat_min'] <= lat <= b['lat_max'] and b['lon_min'] <= lon <= b['lon_max']
class NiscHostedProvider(BaseProvider):
"""
Handles NISC Cloud Coop format (JSON with PROJ coordinate strings).
Example: Buckeye REC
"""
def __init__(self, config, session):
super().__init__(config, session)
self.transformer = None
proj_str = config.get('proj_string')
if proj_str:
try:
# Create transformer from the custom PROJ string to WGS84
# always_xy=True ensures we assume (Lon, Lat) output order
self.transformer = Transformer.from_proj(proj_str, "EPSG:4326", always_xy=True)
except Exception as e:
logger.error(f"Failed to initialize projection for {self.name}: {e}")
def fetch(self, is_retry=False):
url = self.config.get('url')
try:
resp = self.session.get(url, verify=False)
if not resp.ok:
logger.error(f"{self.name} HTTP {resp.status_code}")
return []
data = resp.json()
outages_list = data.get('outages', [])
results = []
for item in outages_list:
results.append(self._normalize(item))
return results
except Exception as e:
logger.error(f"Error fetching {self.name}: {e}")
return []
def _normalize(self, item):
# Coordinates
x, y = item.get('x'), item.get('y')
lat, lon = None, None
if x is not None and y is not None and self.transformer:
try:
# Transform custom projection X/Y to Lon/Lat
lon, lat = self.transformer.transform(x, y)
except: pass
# Timestamps (Epoch Milliseconds)
start_ts = None
time_off = item.get('timeOff')
if time_off:
try:
# Convert ms to seconds
start_ts = datetime.fromtimestamp(time_off / 1000, tz=timezone.utc)
except: pass
return {
'incidentid': str(item.get('id')),
'utility': self.name,
'lat': lat,
'lon': lon,
'pointgeom': f"{lat},{lon}" if lat else None,
'areageom': None,
'start': start_ts,
'etr': None,
'outagen': item.get('nbrOut', 1),
'cause': "Unknown",
'crew_status': "Unknown",
'active': True,
'last_change': datetime.now(timezone.utc)
}
# --- REGISTRY ---
PROVIDER_REGISTRY = {
@@ -597,9 +131,10 @@ PROVIDER_REGISTRY = {
'simple_json': SimpleJsonProvider,
'gwt_rpc': GwtRpcProvider,
'nisc_hosted': NiscHostedProvider,
}
# --- MAIN ---
# --- MAIN (Point Scraper) ---
def main():

80
nisc.py Normal file

@@ -0,0 +1,80 @@
import logging
from datetime import datetime, timezone
from pyproj import Transformer
from base import BaseProvider, BaseCountyProvider
logger = logging.getLogger(__name__)
class NiscCountyProvider(BaseCountyProvider):
""" Handles county data from NISC-hosted cloud sources. """
def fetch(self):
url = self.config.get('county_url')
state = self.config.get('state_filter')
try:
resp = self.session.get(url, verify=False)
if not resp.ok: return []
data = resp.json()
results = []
# The structure is typically a list containing one object with a 'boundaries' key
for boundary_group in data:
for item in boundary_group.get('boundaries', []):
results.append({
'outages': item.get('customersOutNow'),
'served': item.get('customersServed'),
'county': item.get('name'),
'state': state,
'company': self.name
})
return results
except Exception as e:
logger.error(f"Error fetching NISC county data for {self.name}: {e}")
return []
class NiscHostedProvider(BaseProvider):
"""
Handles NISC Cloud Coop format (JSON with PROJ coordinate strings).
Example: Buckeye REC
"""
def __init__(self, config, session):
super().__init__(config, session)
self.transformer = None
proj_str = config.get('proj_string')
if proj_str:
try:
self.transformer = Transformer.from_proj(proj_str, "EPSG:4326", always_xy=True)
except Exception as e:
logger.error(f"Failed to initialize projection for {self.name}: {e}")
def fetch(self):
url = self.config.get('url')
try:
resp = self.session.get(url, verify=False)
if not resp.ok:
logger.error(f"{self.name} HTTP {resp.status_code}")
return []
data = resp.json()
return [self._normalize(item) for item in data.get('outages', [])]
except Exception as e:
logger.error(f"Error fetching {self.name}: {e}")
return []
def _normalize(self, item):
x, y = item.get('x'), item.get('y')
lat, lon = None, None
if x is not None and y is not None and self.transformer:
try:
lon, lat = self.transformer.transform(x, y)
except: pass
time_off = item.get('timeOff')
start_ts = datetime.fromtimestamp(time_off / 1000, tz=timezone.utc) if time_off else None
return {
'incidentid': str(item.get('id')), 'utility': self.name,
'lat': lat, 'lon': lon, 'pointgeom': f"{lat},{lon}" if lat else None,
'start': start_ts, 'outagen': item.get('nbrOut', 1),
'cause': "Unknown", 'crew_status': "Unknown", 'active': True,
'last_change': datetime.now(timezone.utc)
}

55
simple.py Normal file

@@ -0,0 +1,55 @@
import logging
from datetime import datetime
from base import BaseProvider, BaseCountyProvider
logger = logging.getLogger(__name__)
class SimpleCountyJsonProvider(BaseCountyProvider):
def fetch(self):
url = self.config.get('county_url')
state = self.config.get('state_filter')
try:
resp = self.session.get(url, verify=False)
if not resp.ok: return []
data = resp.json()
results = []
for boundary_group in data:
for item in boundary_group.get('boundaries', []):
results.append({
'outages': item.get('customersOutNow'),
'served': item.get('customersServed'),
'county': item.get('name'),
'state': state,
'company': self.name
})
return results
except Exception as e:
logger.error(f"Error fetching {self.name}: {e}")
return []
class SimpleJsonProvider(BaseProvider):
def fetch(self):
url = self.config.get('url')
try:
resp = self.session.get(url, verify=False)
if not resp.ok: return []
data = resp.json()
return [self._normalize(item) for item in data]
except Exception as e:
logger.error(f"Error fetching {self.name}: {e}")
return []
def _normalize(self, item):
def safe_parse(ts):
if not ts: return None
try: return datetime.fromisoformat(ts.replace('Z', '+00:00'))
except: return None
return {
'incidentid': str(item.get('outageRecID')), 'utility': self.name,
'lat': item.get('outagePoint', {}).get('lat'), 'lon': item.get('outagePoint', {}).get('lng'),
'pointgeom': f"{item.get('outagePoint', {}).get('lat')},{item.get('outagePoint', {}).get('lng')}",
'areageom': None, 'start': safe_parse(item.get('outageStartTime')),
'etr': safe_parse(item.get('outageEndTime')), 'outagen': item.get('customersOutNow'),
'cause': item.get('cause'), 'crew_status': item.get('outageWorkStatus'),
'last_change': safe_parse(item.get('outageModifiedTime'))
}
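# Shape of one item SimpleJsonProvider expects (field names taken from _normalize above;
# values illustrative):
#   {"outageRecID": 12345, "outagePoint": {"lat": 39.27, "lng": -81.56},
#    "outageStartTime": "2025-12-07T10:15:00Z", "outageEndTime": null,
#    "customersOutNow": 42, "cause": "Tree on line", "outageWorkStatus": "Assigned",
#    "outageModifiedTime": "2025-12-07T11:00:00Z"}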