# test/newpower2.py
import requests
import polyline
import json
import psycopg2
import mercantile
import logging
import os
from datetime import datetime, timezone, timedelta
from abc import ABC, abstractmethod
from urllib.parse import urlparse
from pyproj import Transformer
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# Import the helper module
import get_rpc_config_auto
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
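# Note on the helper module: get_rpc_config_auto is assumed to expose two functions used
# below -- get_fresh_config(map_url), returning a settings dict with 'headers', 'body',
# 'cookies' and 'user_agent' keys, and fetch_live_data(map_url), returning a
# (data, headers, cookies, body) tuple captured from a live browser session. This describes
# the interface inferred from the call sites in GwtRpcProvider, not a documented API.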
# --- LOGGING ---
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.FileHandler('power2_new.log'), logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
# --- CONFIG ---
DB_CONFIG = {'host': 'localhost', 'database': 'nws', 'user': 'nws', 'password': 'nws'}
CONFIG_FILE = 'providers.json'
AUTO_UPDATE_COOLDOWN_HOURS = 4 # Only try to repair once every 4 hours
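# For reference, providers.json is assumed to be a JSON array of provider objects. A minimal
# illustrative entry (field names taken from the lookups below; values are placeholders):
# [
#   {
#     "name": "Example Co-op",
#     "type": "kubra",
#     "meta_url": "https://kubra.io/stormcenter/api/v1/...",
#     "layer": "cluster-1",
#     "quadkeys": ["0320"],
#     "last_auto_update": "2024-01-01T00:00:00+00:00"
#   }
# ]
# gwt_rpc providers additionally use "url", "map_url", "headers", "body", "cookies",
# "user_agent", "epsg" and "state_filter"; simple_json providers only need "url".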
# --- CONFIG MANAGEMENT ---
def load_providers():
if not os.path.exists(CONFIG_FILE):
logger.error(f"{CONFIG_FILE} not found!")
return []
with open(CONFIG_FILE, 'r') as f:
return json.load(f)
def save_providers(providers):
with open(CONFIG_FILE, 'w') as f:
json.dump(providers, f, indent=4)
logger.info("Configuration saved to providers.json")
def update_provider_config(provider_name, new_settings):
providers = load_providers()
updated = False
for p in providers:
if p.get('name') == provider_name:
if 'headers' in new_settings: p['headers'] = new_settings['headers']
if 'body' in new_settings: p['body'] = new_settings['body']
if 'url' in new_settings: p['url'] = new_settings['url']
if 'cookies' in new_settings: p['cookies'] = new_settings['cookies']
# Persist the captured User-Agent alongside the other settings
if 'user_agent' in new_settings: p['user_agent'] = new_settings['user_agent']
p['last_auto_update'] = datetime.now(timezone.utc).isoformat()
updated = True
break
if updated:
save_providers(providers)
return True
return False
# --- DATABASE ---
class PowerDB:
def __init__(self, config):
self.conn = psycopg2.connect(**config)
self.conn.autocommit = True
def close(self):
self.conn.close()
def upsert_outage(self, data):
sql = """
INSERT INTO newpower
(incidentid, utility, lat, lon, pointgeom, areageom, start_time, etr,
outagen, peakoutage, cause, crew_status, active, last_change, fetch_time, geom)
VALUES
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326))
ON CONFLICT (pointgeom) DO UPDATE SET
outagen = EXCLUDED.outagen,
peakoutage = GREATEST(newpower.peakoutage, EXCLUDED.outagen),
cause = EXCLUDED.cause,
etr = EXCLUDED.etr,
crew_status = EXCLUDED.crew_status,
last_change = EXCLUDED.last_change,
fetch_time = EXCLUDED.fetch_time,
active = TRUE
"""
peak = data.get('outagen', 0)
params = (
data.get('incidentid'), data.get('utility'), data.get('lat'), data.get('lon'),
data.get('pointgeom'), data.get('areageom'), data.get('start'), data.get('etr'),
data.get('outagen'), peak, data.get('cause'), data.get('crew_status'),
True, data.get('last_change', datetime.now(timezone.utc)), datetime.now(timezone.utc),
data.get('lon'), data.get('lat')
)
with self.conn.cursor() as cursor:
cursor.execute(sql, params)
def run_post_processing(self):
logger.info("Running post-processing...")
with self.conn.cursor() as cursor:
cursor.execute('UPDATE newpower SET county = c.countyname FROM public.county c WHERE ST_Contains(c.geom, newpower.geom) AND newpower.county IS NULL')
cursor.execute('UPDATE newpower SET state = c.state FROM public.county c WHERE ST_Contains(c.geom, newpower.geom) AND newpower.state IS NULL')
cursor.execute('UPDATE newpower SET cwa = f.cwa FROM public.fzone f WHERE ST_Contains(f.geom, newpower.geom) AND newpower.cwa IS NULL')
cursor.execute('UPDATE newpower SET realareageom = ST_LineFromEncodedPolyline(areageom) WHERE areageom IS NOT NULL AND realareageom IS NULL')
cursor.execute("UPDATE newpower SET active = TRUE WHERE fetch_time > NOW() - INTERVAL '30 minutes'")
cursor.execute("UPDATE newpower SET active = FALSE WHERE fetch_time < NOW() - INTERVAL '30 minutes'")
cursor.execute("DELETE FROM newpower WHERE fetch_time < NOW() - INTERVAL '365 days'")
logger.info("Post-processing complete.")
# --- PROVIDERS ---
class BaseProvider(ABC):
def __init__(self, config, session):
self.config = config
self.session = session
self.name = config.get('name', 'Unknown')
@abstractmethod
def fetch(self):
pass
class SimpleJsonProvider(BaseProvider):
def fetch(self):
url = self.config.get('url')
try:
resp = self.session.get(url, verify=False)
if not resp.ok: return []
data = resp.json()
results = []
for item in data:
results.append(self._normalize(item))
return results
except Exception as e:
logger.error(f"Error fetching {self.name}: {e}")
return []
def _normalize(self, item):
def safe_parse(ts):
if not ts: return None
try: return datetime.fromisoformat(ts.replace('Z', '+00:00'))
except: return None
return {
'incidentid': str(item.get('outageRecID')), 'utility': self.name,
'lat': item.get('outagePoint', {}).get('lat'), 'lon': item.get('outagePoint', {}).get('lng'),
'pointgeom': f"{item.get('outagePoint', {}).get('lat')},{item.get('outagePoint', {}).get('lng')}",
'areageom': None, 'start': safe_parse(item.get('outageStartTime')),
'etr': safe_parse(item.get('outageEndTime')), 'outagen': item.get('customersOutNow'),
'cause': item.get('cause'), 'crew_status': item.get('outageWorkStatus'),
'last_change': safe_parse(item.get('outageModifiedTime'))
}
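# _normalize above assumes each element of the JSON array looks roughly like the following
# (an illustrative sample based on the keys read; not an official schema):
# {
#   "outageRecID": 12345, "customersOutNow": 87, "cause": "Tree on line",
#   "outageWorkStatus": "Crew dispatched",
#   "outageStartTime": "2024-01-01T12:00:00Z", "outageEndTime": "2024-01-01T18:00:00Z",
#   "outageModifiedTime": "2024-01-01T13:30:00Z",
#   "outagePoint": {"lat": 38.35, "lng": -81.63}
# }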
class KubraProvider(BaseProvider):
def __init__(self, config, session):
super().__init__(config, session)
self.max_zoom = 14
self.results = []
self.base_url_template = 'https://kubra.io/cluster-data/'
def fetch(self):
meta_url = self.config.get('meta_url')
if not meta_url: return []
hex1, hex2 = self._get_hexes(meta_url)
if not hex1: return []
self.base_url = f"{self.base_url_template}{hex1}/{hex2}/"
self.layer = self.config.get('layer')
quadkeys = self.config.get('quadkeys', [])
if not quadkeys: return []
self.results = []
self._fetch_recursive(quadkeys, set(), zoom=len(quadkeys[0]))
return self.results
def _get_hexes(self, url):
try:
resp = self.session.get(url)
path = resp.json().get('data', {}).get('cluster_interval_generation_data')
parts = path.split('/')
return parts[2], parts[3]
except: return None, None
def _fetch_recursive(self, quadkeys, seen, zoom):
for q in quadkeys:
suffix = q[-3:][::-1]
url = f"{self.base_url}{suffix}/public/{self.layer}/{q}.json"
if url in seen: continue
seen.add(url)
try:
resp = self.session.get(url)
if not resp.ok: continue
for item in resp.json().get('file_data', []):
if item.get('desc', {}).get('cluster', False):
if zoom + 1 <= self.max_zoom:
p_geom = item.get('geom', {}).get('p', [])
if p_geom:
next_key = self._get_quadkey_for_point(p_geom[0], zoom + 1)
self._fetch_recursive([next_key], seen, zoom + 1)
else: self.results.append(self._normalize(item))
else:
self.results.append(self._normalize(item))
self._fetch_recursive(self._get_neighbors(q), seen, zoom)
except: pass
def _normalize(self, item):
desc = item.get('desc', {})
geom = item.get('geom', {})
p = geom.get('p', [None])[0]
if not p: return {}
latlon = polyline.decode(p)[0]
def ts(s):
if not s or s=='ETR-NULL': return None
try: return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S%z")
except: return None
return {
'incidentid': desc.get('inc_id'), 'utility': self.name,
'lat': latlon[0], 'lon': latlon[1], 'pointgeom': p, 'areageom': geom.get('a'),
'start': ts(desc.get('start_time')), 'etr': ts(desc.get('etr')),
'outagen': desc.get('cust_a', {}).get('val', 0), 'cause': desc.get('cause', {}).get('EN-US', "Pending"),
'crew_status': desc.get('crew_status', {}).get('EN-US'), 'active': True
}
def _get_quadkey_for_point(self, p, z):
ll = polyline.decode(p)[0]
return mercantile.quadkey(mercantile.tile(lng=ll[1], lat=ll[0], zoom=z))
def _get_neighbors(self, q):
t = mercantile.quadkey_to_tile(q)
return [mercantile.quadkey(n) for n in mercantile.neighbors(t)]
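# How the Kubra walk above works: each quadkey addresses a web-mercator tile, and the cluster
# file for that tile is fetched from <base>/<last-3-chars-reversed>/public/<layer>/<quadkey>.json.
# Cluster entries are drilled into by re-tiling their representative point one zoom level deeper
# (up to max_zoom); leaf entries are normalized directly. For example, at zoom 5 the quadkey
# "03201" maps to ".../102/public/<layer>/03201.json" because "102" is "201" reversed. The
# neighbor expansion then sweeps adjacent tiles at the same zoom.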
class GwtRpcProvider(BaseProvider):
def __init__(self, config, session):
super().__init__(config, session)
self.transformer = None
self.state_filter = config.get('state_filter')
self.map_url = config.get('map_url')
# 1. Set User-Agent (Dynamic > Default)
# We try to use the one from config if available (captured from actual browser)
ua = config.get('user_agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
self.session.headers.update({
'User-Agent': ua,
'Accept': '*/*',
'Sec-Fetch-Site': 'same-origin'
})
parsed_url = urlparse(config.get('url'))
self.session.headers.update({'Origin': f"{parsed_url.scheme}://{parsed_url.netloc}"})
# Load Cookies
if config.get('cookies'):
for cookie in config['cookies']:
try:
self.session.cookies.set(
cookie['name'], cookie['value'], domain=cookie['domain'], path=cookie['path']
)
except: pass
self.STATE_BOUNDS = {
'WV': {'lat_min': 37.0, 'lat_max': 40.7, 'lon_min': -82.7, 'lon_max': -77.7},
'OH': {'lat_min': 38.4, 'lat_max': 42.0, 'lon_min': -84.9, 'lon_max': -80.5},
'KY': {'lat_min': 36.4, 'lat_max': 39.2, 'lon_min': -89.6, 'lon_max': -81.9},
'IA': {'lat_min': 40.3, 'lat_max': 43.6, 'lon_min': -96.7, 'lon_max': -90.1}
}
if config.get('epsg'):
try:
self.transformer = Transformer.from_crs(f"EPSG:{config['epsg']}", "EPSG:4326", always_xy=True)
except: logger.error(f"EPSG Error for {self.name}")
def attempt_auto_repair(self):
if not self.map_url: return False
# Cooldown check: only attempt a repair once every AUTO_UPDATE_COOLDOWN_HOURS
last = self.config.get('last_auto_update')
try: last_dt = datetime.fromisoformat(last) if last else None
except (ValueError, TypeError): last_dt = None
if last_dt and (datetime.now(timezone.utc) - last_dt) < timedelta(hours=AUTO_UPDATE_COOLDOWN_HOURS): logger.info(f"Skipping auto-repair for {self.name} (cooldown active)"); return False
logger.info(f"Attempting Auto-Repair for {self.name}...")
try:
# get_fresh_config is expected to return a settings dict (headers, body, cookies, user_agent)
new_settings = get_rpc_config_auto.get_fresh_config(self.map_url)
if new_settings:
logger.info(f"Repair successful! Updating {self.name}.")
# Update In-Memory
self.config.update(new_settings)
self.config['last_auto_update'] = datetime.now(timezone.utc).isoformat()
# Update Session Cookies
self.session.cookies.clear()
if new_settings.get('cookies'):
for c in new_settings['cookies']:
self.session.cookies.set(c['name'], c['value'], domain=c['domain'], path=c['path'])
# Update Session UA
if new_settings.get('user_agent'):
self.session.headers.update({'User-Agent': new_settings['user_agent']})
# Persist to disk
update_provider_config(self.name, new_settings)
return True
except Exception as e:
logger.error(f"Auto-repair failed: {e}")
return False
def fetch(self, is_retry=False):
url = self.config.get('url')
headers = self.config.get('headers', {})
body = self.config.get('body')
if not url: return []
# --- STRATEGY A: Standard Requests (Fast) ---
try:
parsed_url = urlparse(url)
origin = f"{parsed_url.scheme}://{parsed_url.netloc}"
# Priority: configured Referer > GWT module base > Origin (captured headers may use lowercase keys)
correct_referer = headers.get('Referer') or headers.get('referer') or headers.get('x-gwt-module-base') or origin
ua = headers.get('User-Agent') or headers.get('user-agent') or self.session.headers.get('User-Agent', '')
if "Headless" in ua:
ua = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
self.session.headers.update({
'Origin': origin,
'Referer': correct_referer,
'User-Agent': ua
})
if self.map_url and not self.config.get('cookies'):
try: self.session.get(correct_referer, verify=False, timeout=10)
except: pass
req_headers = headers.copy()
if 'Content-Type' not in req_headers: req_headers['Content-Type'] = 'text/x-gwt-rpc; charset=UTF-8'
req_headers['Referer'] = correct_referer
req_headers['User-Agent'] = ua
# Debug log (Optional - disable if too noisy)
# logger.info(f"Sending Headers: {json.dumps(req_headers, indent=2)}")
resp = self.session.post(url, headers=req_headers, data=body, verify=False)
# --- STRATEGY B: Browser Fallback & Self-Heal ---
if resp.status_code == 500 or "//EX" in resp.text:
logger.warning(f"Standard fetch failed for {self.name} (Status: {resp.status_code}). Switching to Browser Fetch.")
if self.map_url:
# 1. Fetch data AND credentials via Browser
data, valid_headers, valid_cookies, valid_body = get_rpc_config_auto.fetch_live_data(self.map_url)
if data:
logger.info(f"Browser success! Self-healing {self.name} configuration...")
# --- HEADER CLEANING FIX ---
# Instead of selecting specific headers, we exclude known transport headers.
# This preserves custom headers like 'coop.nisc.outagewebmap.configname'
excluded = {
'content-length', 'host', 'connection', 'cookie', 'accept-encoding',
'sec-ch-ua', 'sec-ch-ua-mobile', 'sec-ch-ua-platform', 'origin'
}
clean_headers = {}
for k, v in valid_headers.items():
if k.lower() not in excluded:
clean_headers[k] = v
# Ensure we force the correct Referer for next time
clean_headers['Referer'] = self.map_url
# Save the working settings to providers.json so the next run can use the fast path
new_settings = {
'headers': clean_headers,
'cookies': valid_cookies,
'body': valid_body,
'user_agent': valid_headers.get('user-agent')
}
update_provider_config(self.name, new_settings)
return self._extract_outages(data)
logger.error(f"Browser Fetch failed for {self.name}.")
return []
if not resp.ok: return []
text = resp.text
if text.startswith('//OK'): text = text[4:]
return self._extract_outages(json.loads(text))
except Exception as e:
logger.error(f"Fetch error {self.name}: {e}")
return []
def _extract_outages(self, data_list):
results = []
if not self.transformer: return []
processed = set()
stride = 2
for i in range(len(data_list) - stride):
val1 = data_list[i]
val2 = data_list[i+stride]
if (isinstance(val1, (int, float)) and isinstance(val2, (int, float)) and
abs(val1) > 100000 and abs(val2) > 100000):
lat, lon = None, None
try:
res_lon, res_lat = self.transformer.transform(val2, val1)
if self._is_valid(res_lat, res_lon): lat, lon = res_lat, res_lon
except: pass
if not lat:
try:
res_lon, res_lat = self.transformer.transform(val1, val2)
if self._is_valid(res_lat, res_lon): lat, lon = res_lat, res_lon
except: pass
if lat and lon:
k = f"{lat:.4f},{lon:.4f}"
if k in processed: continue
processed.add(k)
oid = str(abs(hash(k)))
for o in range(1, 15):
idx = i - o
if idx >= 0 and isinstance(data_list[idx], str):
s = data_list[idx]
if len(s) < 20 and "java" not in s and "http" not in s: oid = s; break
results.append({
'incidentid': oid, 'utility': self.name,
'lat': lat, 'lon': lon, 'pointgeom': k, 'areageom': None,
'start': datetime.now(timezone.utc), 'etr': None, 'outagen': 1,
'cause': "Unknown", 'crew_status': "Unknown", 'active': True,
'last_change': datetime.now(timezone.utc)
})
return results
def _is_valid(self, lat, lon):
if not self.state_filter: return True
b = self.STATE_BOUNDS.get(self.state_filter)
if not b: return True
return b['lat_min'] <= lat <= b['lat_max'] and b['lon_min'] <= lon <= b['lon_max']
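# Extraction heuristic, in short: GWT-RPC payloads are flat JSON arrays, and the scanner above
# treats any pair of large numbers (abs > 100000) separated by `stride` positions as a candidate
# projected coordinate pair, tries both axis orders through the EPSG transformer, and keeps the
# first result that falls inside the optional state bounding box. A nearby short string (within
# 15 slots back) is opportunistically reused as the incident id. This is a best-effort guess at
# the payload layout rather than a parse of the GWT-RPC protocol.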
# --- REGISTRY ---
PROVIDER_REGISTRY = {
'kubra': KubraProvider,
'simple_json': SimpleJsonProvider,
'gwt_rpc': GwtRpcProvider
}
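# The registry maps the "type" field of each providers.json entry to a provider class; adding a
# new source means subclassing BaseProvider, implementing fetch(), and registering it here.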
# --- MAIN ---
def main():
S = requests.Session()
S.verify = False
db = PowerDB(DB_CONFIG)
logger.info("Starting Power Scraper...")
providers = load_providers()
for config in providers:
p_type = config.get('type')
p_name = config.get('name')
ProviderClass = PROVIDER_REGISTRY.get(p_type)
if ProviderClass:
try:
provider = ProviderClass(config, S)
logger.info(f"Fetching {p_name}...")
outages = provider.fetch()
count = 0
for outage in outages:
if outage:
db.upsert_outage(outage)
count += 1
logger.info(f"Saved {count} records for {p_name}")
except Exception as e:
logger.error(f"Error processing {p_name}: {e}")
else:
logger.warning(f"Unknown provider type {p_type} for {p_name}")
db.run_post_processing()
db.close()
logger.info("Scraping complete.")
if __name__ == "__main__":
main()
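# Scheduling note: the 30-minute active/inactive window in run_post_processing assumes this
# script runs on a short interval (e.g. every 5-15 minutes via cron); the exact cadence is a
# deployment choice and is not enforced here.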