# test/newpower2.py
import requests
import polyline
import psycopg2
import mercantile
import logging
from datetime import datetime, timezone
from abc import ABC, abstractmethod
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# --- LOGGING SETUP ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler('power2_new.log'), logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
# --- CONFIGURATION ---
DB_CONFIG = {'host': 'localhost', 'database': 'nws', 'user': 'nws', 'password': 'nws'}
KUBRA_BASE_TEMPLATE = 'https://kubra.io/cluster-data/'
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# --- PROVIDER CONFIGURATION ---
# To add a new site, just add a dictionary here (a template follows the list).
PROVIDERS = [
    {
        'name': 'AEP-WV',
        'type': 'kubra',
        'meta_url': 'https://kubra.io/stormcenter/api/v1/stormcenters/6674f49e-0236-4ed8-a40a-b31747557ab7/views/8cfe790f-59f3-4ce3-a73f-a9642227411f/currentState?preview=false',
        'layer': 'cluster-2',
        'quadkeys': ['0320001', '0320003', '0320010', '0320011', '0320012', '0320013', '0320021', '0320030', '0320031', '0320100', '0320102', '0320120']
    },
    {
        'name': 'AEP-OH',
        'type': 'kubra',
        'meta_url': 'https://kubra.io/stormcenter/api/v1/stormcenters/9c0735d8-b721-4dce-b80b-558e98ce1083/views/9b2feb80-69f8-4035-925e-f2acbcf1728e/currentState?preview=false',
        'layer': 'cluster-1',
        'quadkeys': ['0320013', '0320010', '0320011', '0320012', '0320003', '0320001', '0302322', '0302233', '0302232', '0302223', '0320102', '0320100']
    },
    {
        'name': 'AEP-KY',
        'type': 'kubra',
        'meta_url': 'https://kubra.io/stormcenter/api/v1/stormcenters/23dcd38e-2573-4e20-a463-959b11cae011/views/60f31606-5702-4a1e-a74c-08d866b7a6fa/currentState?preview=false',
        'layer': 'cluster-2',
        'quadkeys': ['0320031', '0320030', '0320021', '0320013', '0320012', '0320011', '0320010', '0320003', '0320001']
    },
    {
        'name': 'FirstEnergy',
        'type': 'kubra',
        'meta_url': 'https://kubra.io/stormcenter/api/v1/stormcenters/6c715f0e-bbec-465f-98cc-0b81623744be/views/5ed3ddf1-3a6f-4cfd-8957-eba54b5baaad/currentState?preview=false',
        'layer': 'cluster-4',
        'quadkeys': ['030223', '030232', '032001', '032003', '032010', '032012']
    },
    {
        'name': 'SouthCentral',
        'type': 'simple_json',
        'url': 'https://outage.southcentralpower.com/data/outages.json'
    },
    {
        'name': 'Grayson',
        'type': 'simple_json',
        'url': 'https://outages.graysonrecc.com/data/outages.json'
    }
]
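
# Template for a new provider entry (illustrative only; the UUIDs, layer name,
# and quadkey below are placeholders, not real endpoints):
# {
#     'name': 'Example-Co',    # label stored in the `utility` column
#     'type': 'kubra',         # must match a key in PROVIDER_REGISTRY
#     'meta_url': 'https://kubra.io/stormcenter/api/v1/stormcenters/<uuid>/views/<uuid>/currentState?preview=false',
#     'layer': 'cluster-1',    # cluster layer name embedded in tile URLs
#     'quadkeys': ['0320001']  # seed quadkeys covering the service area
# }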
# --- DATABASE HANDLER ---
class PowerDB:
    def __init__(self, config):
        self.conn = psycopg2.connect(**config)
        self.conn.autocommit = True

    def close(self):
        self.conn.close()

    def upsert_outage(self, data):
        sql = """
            INSERT INTO newpower
                (incidentid, utility, lat, lon, pointgeom, areageom, start_time, etr,
                 outagen, peakoutage, cause, crew_status, active, last_change, fetch_time, geom)
            VALUES
                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326))
            ON CONFLICT (pointgeom) DO UPDATE SET
                outagen = EXCLUDED.outagen,
                peakoutage = GREATEST(newpower.peakoutage, EXCLUDED.outagen),
                cause = EXCLUDED.cause,
                etr = EXCLUDED.etr,
                crew_status = EXCLUDED.crew_status,
                last_change = EXCLUDED.last_change,
                fetch_time = EXCLUDED.fetch_time,
                active = TRUE
        """
        peak = data.get('outagen', 0)
        now = datetime.now(timezone.utc)
        params = (
            data.get('incidentid'), data.get('utility'), data.get('lat'), data.get('lon'),
            data.get('pointgeom'), data.get('areageom'), data.get('start'), data.get('etr'),
            data.get('outagen'), peak, data.get('cause'), data.get('crew_status'),
            True, data.get('last_change') or now, now,  # `or` also covers an explicit None
            data.get('lon'), data.get('lat')
        )
        with self.conn.cursor() as cursor:
            cursor.execute(sql, params)

    def run_post_processing(self):
        logger.info("Running post-processing...")
        with self.conn.cursor() as cursor:
            # Backfill spatial metadata for newly inserted rows.
            cursor.execute('UPDATE newpower SET county = c.countyname FROM public.county c WHERE ST_Contains(c.geom, newpower.geom) AND newpower.county IS NULL')
            cursor.execute('UPDATE newpower SET state = c.state FROM public.county c WHERE ST_Contains(c.geom, newpower.geom) AND newpower.state IS NULL')
            cursor.execute('UPDATE newpower SET cwa = f.cwa FROM public.fzone f WHERE ST_Contains(f.geom, newpower.geom) AND newpower.cwa IS NULL')
            cursor.execute('UPDATE newpower SET realareageom = ST_LineFromEncodedPolyline(areageom) WHERE areageom IS NOT NULL AND realareageom IS NULL')
            # Outages not seen within the last 30 minutes are considered restored.
            cursor.execute("UPDATE newpower SET active = TRUE WHERE fetch_time > NOW() - INTERVAL '30 minutes'")
            cursor.execute("UPDATE newpower SET active = FALSE WHERE fetch_time < NOW() - INTERVAL '30 minutes'")
            cursor.execute("DELETE FROM newpower WHERE fetch_time < NOW() - INTERVAL '365 days'")
        logger.info("Post-processing complete.")
# --- PROVIDER ARCHITECTURE ---
class BaseProvider(ABC):
    """Abstract base class for all providers."""

    def __init__(self, config, session):
        self.config = config
        self.session = session
        self.name = config.get('name', 'Unknown')

    @abstractmethod
    def fetch(self):
        """Must return a list of standardized outage dictionaries."""
class SimpleJsonProvider(BaseProvider):
    """Handles sites that return a flat JSON list of outages."""

    def fetch(self):
        url = self.config.get('url')
        if not url:
            logger.error(f"Missing URL for {self.name}")
            return []
        try:
            resp = self.session.get(url, verify=False)
            if not resp.ok:
                logger.error(f"{self.name} returned {resp.status_code}")
                return []
            return [self._normalize(item) for item in resp.json()]
        except Exception as e:
            logger.error(f"Error fetching {self.name}: {e}")
            return []

    def _normalize(self, item):
        # Helper to parse ISO-8601 strings safely.
        def safe_parse(ts):
            if not ts:
                return None
            try:
                return datetime.fromisoformat(ts.replace('Z', '+00:00'))
            except (ValueError, TypeError, AttributeError):
                return None

        point = item.get('outagePoint', {})
        return {
            'incidentid': str(item.get('outageRecID')),
            'utility': self.name,
            'lat': point.get('lat'),
            'lon': point.get('lng'),
            'pointgeom': f"{point.get('lat')},{point.get('lng')}",
            'areageom': None,
            'start': safe_parse(item.get('outageStartTime')),
            'etr': safe_parse(item.get('outageEndTime')),
            'outagen': item.get('customersOutNow'),
            'cause': item.get('cause'),
            'crew_status': item.get('outageWorkStatus'),
            'last_change': safe_parse(item.get('outageModifiedTime'))
        }
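
# Illustrative record shape the parser above expects (field names taken from
# the .get() calls; the values here are made up):
#
#   {
#       "outageRecID": 12345,
#       "outagePoint": {"lat": 38.4, "lng": -82.7},
#       "outageStartTime": "2025-12-07T01:00:00Z",
#       "outageEndTime": "2025-12-07T05:00:00Z",
#       "customersOutNow": 42,
#       "cause": "Tree on line",
#       "outageWorkStatus": "Crew assigned",
#       "outageModifiedTime": "2025-12-07T02:00:00Z"
#   }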
class KubraProvider(BaseProvider):
    """Handles Kubra StormCenter recursive quadkey fetching."""

    def __init__(self, config, session):
        super().__init__(config, session)
        self.max_zoom = 14
        self.results = []

    def fetch(self):
        # 1. Dynamic setup: resolve the hex directory keys for this snapshot.
        meta_url = self.config.get('meta_url')
        if not meta_url:
            return []
        hex1, hex2 = self._get_hexes(meta_url)
        if not hex1:
            logger.error(f"{self.name}: Could not fetch hex keys")
            return []
        self.base_url = f"{KUBRA_BASE_TEMPLATE}{hex1}/{hex2}/"
        self.layer = self.config.get('layer')
        quadkeys = self.config.get('quadkeys', [])
        if not quadkeys:
            logger.error(f"{self.name}: No seed quadkeys configured")
            return []
        # 2. Recursive fetch, starting at the zoom implied by the seed keys.
        self.results = []
        self._fetch_recursive(quadkeys, set(), zoom=len(quadkeys[0]))
        return self.results

    def _get_hexes(self, url):
        try:
            resp = self.session.get(url)
            path = resp.json().get('data', {}).get('cluster_interval_generation_data')
            parts = path.split('/')
            return parts[2], parts[3]
        except Exception as e:
            logger.error(f"Hex fetch error {self.name}: {e}")
            return None, None
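
    # _get_hexes assumes `cluster_interval_generation_data` is a slash-delimited
    # path whose third and fourth segments (parts[2], parts[3]) are the hex
    # directory names, i.e. a currentState response shaped roughly like:
    #
    #   {"data": {"cluster_interval_generation_data": "a/b/<hex1>/<hex2>/..."}}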
    def _fetch_recursive(self, quadkeys, seen, zoom):
        for q in quadkeys:
            # Tile URLs embed the last three quadkey digits, reversed.
            suffix = q[-3:][::-1]
            url = f"{self.base_url}{suffix}/public/{self.layer}/{q}.json"
            if url in seen:
                continue
            seen.add(url)
            try:
                resp = self.session.get(url)
                if not resp.ok or 'application/json' not in resp.headers.get('Content-Type', ''):
                    continue
                for item in resp.json().get('file_data', []):
                    desc = item.get('desc', {})
                    if desc.get('cluster', False):
                        # Drill into clusters until max_zoom, then save as-is.
                        if zoom + 1 > self.max_zoom:
                            self.results.append(self._normalize(item))
                        else:
                            p_geom = item.get('geom', {}).get('p', [])
                            if p_geom:
                                next_key = self._get_quadkey_for_point(p_geom[0], zoom + 1)
                                self._fetch_recursive([next_key], seen, zoom + 1)
                    else:
                        self.results.append(self._normalize(item))
                # A tile with data may have outages spilling into adjacent tiles.
                neighbors = self._get_neighboring_quadkeys(q)
                self._fetch_recursive(neighbors, seen, zoom)
            except Exception as e:
                logger.error(f"Error reading quadkey {q}: {e}")
    def _normalize(self, item):
        desc = item.get('desc', {})
        geom = item.get('geom', {})
        p_list = geom.get('p') or [None]  # guard against a missing or empty list
        point_poly = p_list[0]
        if not point_poly:
            return {}
        latlon = polyline.decode(point_poly)[0]

        def parse_ts(ts_str):
            if not ts_str or ts_str == 'ETR-NULL':
                return None
            # Kubra timestamps appear both with and without a seconds field.
            for fmt in ("%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M%z"):
                try:
                    return datetime.strptime(ts_str, fmt)
                except ValueError:
                    continue
            return None

        cause = desc.get('cause', {})
        if isinstance(cause, dict):
            cause = cause.get('EN-US')
        return {
            'incidentid': desc.get('inc_id'),
            'utility': self.name,
            'lat': latlon[0],
            'lon': latlon[1],
            'pointgeom': point_poly,
            'areageom': geom.get('a'),
            'start': parse_ts(desc.get('start_time')),
            'etr': parse_ts(desc.get('etr')),
            'outagen': desc.get('cust_a', {}).get('val', 0),
            'cause': cause or "Pending Investigation",
            'crew_status': desc.get('crew_status', {}).get('EN-US'),
            'active': True
        }
    def _get_quadkey_for_point(self, polyline_str, zoom):
        latlon = polyline.decode(polyline_str)[0]
        return mercantile.quadkey(mercantile.tile(lng=latlon[1], lat=latlon[0], zoom=zoom))

    def _get_neighboring_quadkeys(self, quadkey):
        # The eight surrounding tiles at the same zoom level, clamped to the
        # valid tile range [0, 2**z).
        tile = mercantile.quadkey_to_tile(quadkey)
        offsets = [(0, -1), (1, 0), (0, 1), (-1, 0), (1, -1), (1, 1), (-1, -1), (-1, 1)]
        neighbors = [mercantile.Tile(x=tile.x + dx, y=tile.y + dy, z=tile.z) for dx, dy in offsets]
        max_index = 2 ** tile.z
        return [mercantile.quadkey(t) for t in neighbors
                if 0 <= t.x < max_index and 0 <= t.y < max_index]
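
    # Worked example of the quadkey/tile mapping used above: each quadkey digit
    # encodes one zoom level, so mercantile.quadkey_to_tile('0320001') returns
    # Tile(x=33, y=48, z=7), and the eight surrounding tiles at z=7 yield the
    # neighboring quadkeys probed by _get_neighboring_quadkeys.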
# --- REGISTRY ---
# Map string types to classes (a sketch for adding a new type follows).
PROVIDER_REGISTRY = {
    'kubra': KubraProvider,
    'simple_json': SimpleJsonProvider
}
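
# A new source type only needs a BaseProvider subclass and a registry entry.
# Minimal sketch (the class and type name here are hypothetical):
#
#   class CsvProvider(BaseProvider):
#       def fetch(self):
#           ...  # return a list of dicts in the shape upsert_outage expects
#
#   PROVIDER_REGISTRY['csv'] = CsvProvider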
# --- MAIN ---
def main():
    session = requests.Session()
    session.verify = False
    db = PowerDB(DB_CONFIG)
    logger.info("Starting Power Scraper...")
    try:
        for config in PROVIDERS:
            p_type = config.get('type')
            p_name = config.get('name')
            ProviderClass = PROVIDER_REGISTRY.get(p_type)
            if ProviderClass:
                try:
                    provider = ProviderClass(config, session)
                    logger.info(f"Fetching {p_name}...")
                    outages = provider.fetch()
                    count = 0
                    for outage in outages:
                        if outage:  # skip empty dicts from failed normalization
                            db.upsert_outage(outage)
                            count += 1
                    logger.info(f"Saved {count} records for {p_name}")
                except Exception as e:
                    logger.error(f"Critical error running {p_name}: {e}")
            else:
                logger.warning(f"Unknown provider type '{p_type}' for {p_name}")
        db.run_post_processing()
    finally:
        db.close()
    logger.info("Scraping complete.")


if __name__ == "__main__":
    main()