# test/newpower2.py

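# Power-outage scraper: loads provider definitions from providers.json, pulls
# current outage data from Kubra cluster-data tiles, simple JSON feeds and
# GWT-RPC endpoints, and upserts the normalized records into the PostGIS
# table `newpower`.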
import requests
import polyline
import json
import psycopg2
import mercantile
import logging
import os
import re
from datetime import datetime, timezone, timedelta
from abc import ABC, abstractmethod
from urllib.parse import urlparse
from pyproj import Transformer
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# Import the helper module for auto-repair
import get_rpc_config_auto
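# The scraping session runs with TLS verification disabled (verify=False below),
# so silence the resulting InsecureRequestWarning noise up front.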
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# --- LOGGING ---
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.FileHandler('power2_new.log'), logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
# --- CONFIG ---
DB_CONFIG = {'host': 'localhost', 'database': 'nws', 'user': 'nws', 'password': 'nws'}
CONFIG_FILE = 'providers.json'
AUTO_UPDATE_COOLDOWN_HOURS = 4 # Only try to repair once every 4 hours
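# Illustrative providers.json entries -- a sketch only: field names are inferred
# from how the provider classes below read their config, and the URLs/values
# are placeholders, not real endpoints:
#
# [
#   {
#     "name": "Example Kubra Utility",
#     "type": "kubra",
#     "meta_url": "https://kubra.io/.../currentState",
#     "layer": "cluster-1",
#     "quadkeys": ["0320"]
#   },
#   {
#     "name": "Example GWT Utility",
#     "type": "gwt_rpc",
#     "url": "https://outagemap.example.com/rpc/OutageService",
#     "map_url": "https://outagemap.example.com/maps",
#     "epsg": 3857,
#     "state_filter": "WV",
#     "headers": {},
#     "body": "...",
#     "cookies": []
#   }
# ]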
# --- CONFIG MANAGEMENT ---
def load_providers():
if not os.path.exists(CONFIG_FILE):
logger.error(f"{CONFIG_FILE} not found!")
return []
with open(CONFIG_FILE, 'r') as f:
return json.load(f)
def save_providers(providers):
with open(CONFIG_FILE, 'w') as f:
json.dump(providers, f, indent=4)
logger.info("Configuration saved to providers.json")
def update_provider_config(provider_name, new_settings):
"""Updates a specific provider in the JSON file safely"""
providers = load_providers()
updated = False
for p in providers:
if p.get('name') == provider_name:
# Update all relevant fields
for key in ['headers', 'body', 'url', 'cookies', 'user_agent']:
if key in new_settings:
p[key] = new_settings[key]
p['last_auto_update'] = datetime.now(timezone.utc).isoformat()
updated = True
break
if updated:
save_providers(providers)
return True
return False
# --- DATABASE ---
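# The script assumes a `newpower` table roughly like the sketch below (columns
# and types are inferred from the INSERT/UPDATE statements in this class; the
# real schema may differ). county/state/cwa are filled in by run_post_processing
# via spatial joins against the public.county and public.fzone reference tables.
#
#   CREATE TABLE newpower (
#       incidentid   TEXT,
#       utility      TEXT,
#       lat          DOUBLE PRECISION,
#       lon          DOUBLE PRECISION,
#       pointgeom    TEXT UNIQUE,   -- encoded point / "lat,lon" key used by ON CONFLICT
#       areageom     TEXT,          -- encoded polyline of the outage area, if provided
#       realareageom GEOMETRY,      -- decoded from areageom in post-processing
#       start_time   TIMESTAMPTZ,
#       etr          TIMESTAMPTZ,
#       outagen      INTEGER,
#       peakoutage   INTEGER,
#       cause        TEXT,
#       crew_status  TEXT,
#       active       BOOLEAN,
#       last_change  TIMESTAMPTZ,
#       fetch_time   TIMESTAMPTZ,
#       county       TEXT,
#       state        TEXT,
#       cwa          TEXT,
#       geom         GEOMETRY(Point, 4326)
#   );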
class PowerDB:
def __init__(self, config):
self.conn = psycopg2.connect(**config)
self.conn.autocommit = True
def close(self):
self.conn.close()
def upsert_outage(self, data):
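# pointgeom is the conflict key: re-seeing the same outage updates it in place,
# peakoutage keeps the high-water customer count via GREATEST, and active is
# re-flagged TRUE on every sighting.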
sql = """
INSERT INTO newpower
(incidentid, utility, lat, lon, pointgeom, areageom, start_time, etr,
outagen, peakoutage, cause, crew_status, active, last_change, fetch_time, geom)
VALUES
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326))
ON CONFLICT (pointgeom) DO UPDATE SET
outagen = EXCLUDED.outagen,
peakoutage = GREATEST(newpower.peakoutage, EXCLUDED.outagen),
cause = EXCLUDED.cause,
etr = EXCLUDED.etr,
crew_status = EXCLUDED.crew_status,
last_change = EXCLUDED.last_change,
fetch_time = EXCLUDED.fetch_time,
active = TRUE
"""
peak = data.get('outagen') or 0
params = (
data.get('incidentid'), data.get('utility'), data.get('lat'), data.get('lon'),
data.get('pointgeom'), data.get('areageom'), data.get('start'), data.get('etr'),
data.get('outagen'), peak, data.get('cause'), data.get('crew_status'),
True, data.get('last_change', datetime.now(timezone.utc)), datetime.now(timezone.utc),
data.get('lon'), data.get('lat')
)
with self.conn.cursor() as cursor:
cursor.execute(sql, params)
def run_post_processing(self):
logger.info("Running post-processing...")
with self.conn.cursor() as cursor:
cursor.execute('UPDATE newpower SET county = c.countyname FROM public.county c WHERE ST_Contains(c.geom, newpower.geom) AND newpower.county IS NULL')
cursor.execute('UPDATE newpower SET state = c.state FROM public.county c WHERE ST_Contains(c.geom, newpower.geom) AND newpower.state IS NULL')
cursor.execute('UPDATE newpower SET cwa = f.cwa FROM public.fzone f WHERE ST_Contains(f.geom, newpower.geom) AND newpower.cwa IS NULL')
cursor.execute('UPDATE newpower SET realareageom = ST_LineFromEncodedPolyline(areageom) WHERE areageom IS NOT NULL AND realareageom IS NULL')
cursor.execute("UPDATE newpower SET active = TRUE WHERE fetch_time > NOW() - INTERVAL '30 minutes'")
cursor.execute("UPDATE newpower SET active = FALSE WHERE fetch_time < NOW() - INTERVAL '30 minutes'")
cursor.execute("DELETE FROM newpower WHERE fetch_time < NOW() - INTERVAL '365 days'")
logger.info("Post-processing complete.")
# --- PROVIDERS ---
class BaseProvider(ABC):
def __init__(self, config, session):
self.config = config
self.session = session
self.name = config.get('name', 'Unknown')
@abstractmethod
def fetch(self):
pass
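# Each provider's fetch() returns a list of dicts shaped for PowerDB.upsert_outage
# (incidentid, utility, lat, lon, pointgeom, areageom, start, etr, outagen, cause,
# crew_status, last_change); missing keys fall back to None/now in upsert_outage.
# New sources are added by subclassing BaseProvider and registering the class in
# PROVIDER_REGISTRY below.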
class SimpleJsonProvider(BaseProvider):
def fetch(self):
url = self.config.get('url')
try:
resp = self.session.get(url, verify=False)
if not resp.ok: return []
data = resp.json()
results = []
for item in data:
results.append(self._normalize(item))
return results
except Exception as e:
logger.error(f"Error fetching {self.name}: {e}")
return []
def _normalize(self, item):
def safe_parse(ts):
if not ts: return None
try: return datetime.fromisoformat(ts.replace('Z', '+00:00'))
except Exception: return None
return {
'incidentid': str(item.get('outageRecID')), 'utility': self.name,
'lat': item.get('outagePoint', {}).get('lat'), 'lon': item.get('outagePoint', {}).get('lng'),
'pointgeom': f"{item.get('outagePoint', {}).get('lat')},{item.get('outagePoint', {}).get('lng')}",
'areageom': None, 'start': safe_parse(item.get('outageStartTime')),
'etr': safe_parse(item.get('outageEndTime')), 'outagen': item.get('customersOutNow'),
'cause': item.get('cause'), 'crew_status': item.get('outageWorkStatus'),
'last_change': safe_parse(item.get('outageModifiedTime'))
}
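# Kubra "stormcenter" feeds: meta_url returns a state document whose
# cluster_interval_generation_data path holds two hash components; those are
# spliced into https://kubra.io/cluster-data/<hex1>/<hex2>/ and outage tiles
# are then fetched per Bing-style quadkey (the URL also embeds the reversed
# last three digits of the quadkey). Cluster markers are drilled into by
# re-tiling their first point at the next zoom level (up to max_zoom); leaf
# records are normalized directly.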
class KubraProvider(BaseProvider):
def __init__(self, config, session):
super().__init__(config, session)
self.max_zoom = 14
self.results = []
self.base_url_template = 'https://kubra.io/cluster-data/'
def fetch(self):
meta_url = self.config.get('meta_url')
if not meta_url: return []
hex1, hex2 = self._get_hexes(meta_url)
if not hex1: return []
self.base_url = f"{self.base_url_template}{hex1}/{hex2}/"
self.layer = self.config.get('layer')
quadkeys = self.config.get('quadkeys', [])
if not quadkeys: return []
self.results = []
self._fetch_recursive(quadkeys, set(), zoom=len(quadkeys[0]))
return self.results
def _get_hexes(self, url):
try:
resp = self.session.get(url)
path = resp.json().get('data', {}).get('cluster_interval_generation_data')
parts = path.split('/')
return parts[2], parts[3]
except Exception: return None, None
def _fetch_recursive(self, quadkeys, seen, zoom):
for q in quadkeys:
suffix = q[-3:][::-1]
url = f"{self.base_url}{suffix}/public/{self.layer}/{q}.json"
if url in seen: continue
seen.add(url)
try:
resp = self.session.get(url)
if not resp.ok: continue
for item in resp.json().get('file_data', []):
if item.get('desc', {}).get('cluster', False):
if zoom + 1 <= self.max_zoom:
p_geom = item.get('geom', {}).get('p', [])
if p_geom:
next_key = self._get_quadkey_for_point(p_geom[0], zoom + 1)
self._fetch_recursive([next_key], seen, zoom + 1)
else: self.results.append(self._normalize(item))
else:
self.results.append(self._normalize(item))
self._fetch_recursive(self._get_neighbors(q), seen, zoom)
except Exception: pass
def _normalize(self, item):
desc = item.get('desc', {})
geom = item.get('geom', {})
p = geom.get('p', [None])[0]
if not p: return {}
latlon = polyline.decode(p)[0]
def ts(s):
if not s or s=='ETR-NULL': return None
try: return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S%z")
except Exception: return None
return {
'incidentid': desc.get('inc_id'), 'utility': self.name,
'lat': latlon[0], 'lon': latlon[1], 'pointgeom': p, 'areageom': geom.get('a'),
'start': ts(desc.get('start_time')), 'etr': ts(desc.get('etr')),
'outagen': desc.get('cust_a', {}).get('val', 0), 'cause': desc.get('cause', {}).get('EN-US', "Pending"),
'crew_status': desc.get('crew_status', {}).get('EN-US'), 'active': True
}
def _get_quadkey_for_point(self, p, z):
ll = polyline.decode(p)[0]
return mercantile.quadkey(mercantile.tile(lng=ll[1], lat=ll[0], zoom=z))
def _get_neighbors(self, q):
t = mercantile.quadkey_to_tile(q)
return [mercantile.quadkey(n) for n in mercantile.neighbors(t)]
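# GWT-RPC feeds: these endpoints need a previously captured request body,
# headers and cookies (stored in providers.json). Responses follow the usual
# GWT convention: a "//OK" prefix followed by a JSON array, or "//EX" on
# failure. On failure, attempt_auto_repair() asks the get_rpc_config_auto
# helper (presumably a live browser capture; not shown here) for fresh
# headers/cookies/body, persists them via update_provider_config, and the
# fetch is retried once. Decoded points are sanity-checked against the rough
# per-state bounding boxes in STATE_BOUNDS when state_filter is set.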
class GwtRpcProvider(BaseProvider):
def __init__(self, config, session):
super().__init__(config, session)
self.transformer = None
self.state_filter = config.get('state_filter')
self.map_url = config.get('map_url')
# 1. Base Headers
self.session.headers.update({
'User-Agent': config.get('user_agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'),
'Accept': '*/*',
'Sec-Fetch-Site': 'same-origin'
})
parsed_url = urlparse(config.get('url'))
self.session.headers.update({'Origin': f"{parsed_url.scheme}://{parsed_url.netloc}"})
# 2. Load Cookies
if config.get('cookies'):
for cookie in config['cookies']:
try:
self.session.cookies.set(
cookie['name'], cookie['value'], domain=cookie['domain'], path=cookie['path']
)
except Exception: pass
self.STATE_BOUNDS = {
'WV': {'lat_min': 37.0, 'lat_max': 40.7, 'lon_min': -82.7, 'lon_max': -77.7},
'OH': {'lat_min': 38.4, 'lat_max': 42.0, 'lon_min': -84.9, 'lon_max': -80.5},
'KY': {'lat_min': 36.4, 'lat_max': 39.2, 'lon_min': -89.6, 'lon_max': -81.9},
'IA': {'lat_min': 40.3, 'lat_max': 43.6, 'lon_min': -96.7, 'lon_max': -90.1}
}
if config.get('epsg'):
try:
self.transformer = Transformer.from_crs(f"EPSG:{config['epsg']}", "EPSG:4326", always_xy=True)
except Exception: logger.error(f"EPSG Error for {self.name}")
def attempt_auto_repair(self):
if not self.map_url: return False
# --- Cooldown Check ---
last_update = self.config.get('last_auto_update')
if last_update:
try:
last_dt = datetime.fromisoformat(last_update)
if last_dt.tzinfo is None: last_dt = last_dt.replace(tzinfo=timezone.utc)
if datetime.now(timezone.utc) - last_dt < timedelta(hours=AUTO_UPDATE_COOLDOWN_HOURS):
logger.info(f"Skipping auto-repair for {self.name} (Cooldown active).")
return False
except ValueError: pass
logger.info(f"Attempting Auto-Repair for {self.name}...")
try:
# Expecting 4 values: data, headers, cookies, body
new_data, valid_headers, valid_cookies, valid_body = get_rpc_config_auto.fetch_live_data(self.map_url)
if valid_headers and valid_body:
logger.info(f"Repair successful! Updating {self.name}.")
# Clean Headers (Blacklist approach)
excluded = {'content-length', 'host', 'connection', 'cookie', 'accept-encoding', 'sec-ch-ua', 'sec-ch-ua-mobile', 'sec-ch-ua-platform', 'origin'}
clean_headers = {k: v for k, v in valid_headers.items() if k.lower() not in excluded}
# Ensure Referer is set correctly for next time
clean_headers['Referer'] = self.map_url
# Update In-Memory Config
current_time = datetime.now(timezone.utc).isoformat()
self.config['headers'] = clean_headers
self.config['body'] = valid_body
self.config['cookies'] = valid_cookies
self.config['user_agent'] = valid_headers.get('user-agent')
self.config['last_auto_update'] = current_time
# Update Session
self.session.cookies.clear()
for cookie in valid_cookies:
self.session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'], path=cookie['path'])
if valid_headers.get('user-agent'):
self.session.headers.update({'User-Agent': valid_headers.get('user-agent')})
# Save to Disk
new_settings = {
'headers': clean_headers,
'body': valid_body,
'cookies': valid_cookies,
'user_agent': valid_headers.get('user-agent')
}
update_provider_config(self.name, new_settings)
return True
except Exception as e:
logger.error(f"Auto-repair failed: {e}")
return False
def fetch(self, is_retry=False):
url = self.config.get('url')
headers = self.config.get('headers', {})
body = self.config.get('body')
if not url: return []
try:
# 3. Dynamic Origin Update
parsed_url = urlparse(url)
origin = f"{parsed_url.scheme}://{parsed_url.netloc}"
# Priority: Configured Referer > Module Base > Origin
correct_referer = headers.get('Referer') or headers.get('referer') or headers.get('x-gwt-module-base') or origin
ua = headers.get('User-Agent') or headers.get('user-agent') or self.session.headers['User-Agent']
if "Headless" in ua: # Fallback safety
ua = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
self.session.headers.update({
'Origin': origin,
'Referer': correct_referer,
'User-Agent': ua
})
# 4. Prime the session: hit the referer page once if a map_url is defined but no cookies are configured yet
if self.map_url and not self.config.get('cookies'):
try: self.session.get(correct_referer, verify=False, timeout=10)
except Exception: pass
req_headers = headers.copy()
req_headers['Content-Type'] = 'text/x-gwt-rpc; charset=UTF-8'
req_headers['Referer'] = correct_referer
req_headers['User-Agent'] = ua
# Only fetch if we have a body
if body:
resp = self.session.post(url, headers=req_headers, data=body, verify=False)
else:
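# No recorded request body: fabricate a failed response object so the
# error-handling/auto-repair path below still runs.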
resp = type('obj', (object,), {'status_code': 500, 'text': 'No Body', 'ok': False})()
# 5. Error Handling & Retry
failed = False
if "//EX" in resp.text: failed = True
if resp.status_code == 500: failed = True
if failed:
logger.error(f"GWT Failure for {self.name} (Status: {resp.status_code}).")
if is_retry:
logger.error(f"Retry failed for {self.name}. Aborting.")
return []
if self.attempt_auto_repair():
logger.info("Retrying fetch with new settings...")
return self.fetch(is_retry=True)
else:
return []
if not resp.ok: return []
text = resp.text
if text.startswith('//OK'): text = text[4:]
return self._extract_outages(json.loads(text))
except Exception as e:
logger.error(f"Fetch error {self.name}: {e}")
return []
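# _extract_outages below is a heuristic: the decoded GWT payload is a flat JSON
# array, so it scans for pairs of large numbers (abs > 100000) two positions
# apart and tries them as projected easting/northing in both orders, keeping
# whichever transforms into a plausible lat/lon for the configured state.
# A nearby short string is used as the incident id when one is found; otherwise
# a hash of the rounded coordinates is used (note that Python's hash() is salted
# per process, so that fallback id is not stable across runs -- deduplication
# relies on the pointgeom conflict key instead).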
def _extract_outages(self, data_list):
results = []
if not self.transformer: return []
processed = set()
stride = 2
for i in range(len(data_list) - stride):
val1 = data_list[i]
val2 = data_list[i+stride]
if (isinstance(val1, (int, float)) and isinstance(val2, (int, float)) and
abs(val1) > 100000 and abs(val2) > 100000):
lat, lon = None, None
try:
res_lon, res_lat = self.transformer.transform(val2, val1)
if self._is_valid(res_lat, res_lon): lat, lon = res_lat, res_lon
except Exception: pass
if not lat:
try:
res_lon, res_lat = self.transformer.transform(val1, val2)
if self._is_valid(res_lat, res_lon): lat, lon = res_lat, res_lon
except Exception: pass
if lat and lon:
k = f"{lat:.4f},{lon:.4f}"
if k in processed: continue
processed.add(k)
oid = str(abs(hash(k)))
for o in range(1, 15):
idx = i - o
if idx >= 0 and isinstance(data_list[idx], str):
s = data_list[idx]
if len(s) < 20 and "java" not in s and "http" not in s: oid = s; break
results.append({
'incidentid': oid, 'utility': self.name,
'lat': lat, 'lon': lon, 'pointgeom': k, 'areageom': None,
'start': datetime.now(timezone.utc), 'etr': None, 'outagen': 1,
'cause': "Unknown", 'crew_status': "Unknown", 'active': True,
'last_change': datetime.now(timezone.utc)
})
return results
def _is_valid(self, lat, lon):
if not self.state_filter: return True
b = self.STATE_BOUNDS.get(self.state_filter)
if not b: return True
return b['lat_min'] <= lat <= b['lat_max'] and b['lon_min'] <= lon <= b['lon_max']
# --- REGISTRY ---
PROVIDER_REGISTRY = {
'kubra': KubraProvider,
'simple_json': SimpleJsonProvider,
'gwt_rpc': GwtRpcProvider
}
# --- MAIN ---
def main():
S = requests.Session()
S.verify = False
db = PowerDB(DB_CONFIG)
logger.info("Starting Power Scraper...")
providers = load_providers()
for config in providers:
p_type = config.get('type')
p_name = config.get('name')
ProviderClass = PROVIDER_REGISTRY.get(p_type)
if ProviderClass:
try:
provider = ProviderClass(config, S)
logger.info(f"Fetching {p_name}...")
outages = provider.fetch()
count = 0
for outage in outages:
if outage:
db.upsert_outage(outage)
count += 1
logger.info(f"Saved {count} records for {p_name}")
except Exception as e:
logger.error(f"Error processing {p_name}: {e}")
else:
logger.warning(f"Unknown provider type {p_type} for {p_name}")
db.run_post_processing()
db.close()
logger.info("Scraping complete.")
if __name__ == "__main__":
main()