diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..1584277
--- /dev/null
+++ b/__init__.py
@@ -0,0 +1 @@
+# This file makes the 'providers' directory a Python package.
\ No newline at end of file
diff --git a/__pycache__/base.cpython-310.pyc b/__pycache__/base.cpython-310.pyc
new file mode 100644
index 0000000..f44103b
Binary files /dev/null and b/__pycache__/base.cpython-310.pyc differ
diff --git a/__pycache__/gwt_rpc.cpython-310.pyc b/__pycache__/gwt_rpc.cpython-310.pyc
new file mode 100644
index 0000000..05859e1
Binary files /dev/null and b/__pycache__/gwt_rpc.cpython-310.pyc differ
diff --git a/__pycache__/kubra.cpython-310.pyc b/__pycache__/kubra.cpython-310.pyc
new file mode 100644
index 0000000..04e3cfe
Binary files /dev/null and b/__pycache__/kubra.cpython-310.pyc differ
diff --git a/__pycache__/nisc.cpython-310.pyc b/__pycache__/nisc.cpython-310.pyc
new file mode 100644
index 0000000..45bf0b9
Binary files /dev/null and b/__pycache__/nisc.cpython-310.pyc differ
diff --git a/__pycache__/simple.cpython-310.pyc b/__pycache__/simple.cpython-310.pyc
new file mode 100644
index 0000000..c62992b
Binary files /dev/null and b/__pycache__/simple.cpython-310.pyc differ
diff --git a/base.py b/base.py
new file mode 100644
index 0000000..a4b1d43
--- /dev/null
+++ b/base.py
@@ -0,0 +1,23 @@
+from abc import ABC, abstractmethod
+
+class BaseProvider(ABC):
+    """Abstract base class for point-based outage providers."""
+    def __init__(self, config, session):
+        self.config = config
+        self.session = session
+        self.name = config.get('name', 'Unknown')
+
+    @abstractmethod
+    def fetch(self):
+        pass
+
+class BaseCountyProvider(ABC):
+    """Abstract base class for county-based outage providers."""
+    def __init__(self, config, session):
+        self.config = config
+        self.session = session
+        self.name = config.get('name', 'Unknown')
+
+    @abstractmethod
+    def fetch(self):
+        pass
\ No newline at end of file
diff --git a/gwt_rpc.py b/gwt_rpc.py
new file mode 100644
index 0000000..99b7958
--- /dev/null
+++ b/gwt_rpc.py
@@ -0,0 +1,255 @@
+import logging
+import json
+from datetime import datetime, timezone, timedelta
+from urllib.parse import urlparse
+from pyproj import Transformer
+
+import get_rpc_config_auto
+from base import BaseProvider, BaseCountyProvider
+
+logger = logging.getLogger(__name__)
+
+class GwtRpcBaseProvider:
+    """Base class for GWT-RPC providers to share common logic like auto-repair."""
+    def __init__(self, config, session):
+        self.config = config
+        self.session = session
+        self.name = config.get('name', 'Unknown')
+        self.map_url = config.get('map_url')
+        self.state_filter = config.get('state_filter')
+        self.AUTO_UPDATE_COOLDOWN_HOURS = 4
+
+        # Set up session headers and cookies from config
+        self.session.headers.update({
+            'User-Agent': config.get('user_agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'),
+            'Accept': '*/*',
+            'Sec-Fetch-Site': 'same-origin'
+        })
+        if config.get('cookies'):
+            for cookie in config['cookies']:
+                self.session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'], path=cookie['path'])
+
+    def attempt_auto_repair(self):
+        if not self.map_url: return False
+        last_update = self.config.get('last_auto_update')
+        if last_update:
+            try:
+                last_dt = datetime.fromisoformat(last_update)
+                if last_dt.tzinfo is None: last_dt = last_dt.replace(tzinfo=timezone.utc)
+                if datetime.now(timezone.utc) - last_dt < timedelta(hours=self.AUTO_UPDATE_COOLDOWN_HOURS):
logger.info(f"Skipping auto-repair for {self.name} (Cooldown active).") + return False + except ValueError: pass + + logger.info(f"Attempting Auto-Repair for {self.name}...") + try: + # This function needs to be defined in the main script context to save config + from newpower import update_provider_config as update_county_config + except ImportError: + from newpower2 import update_provider_config as update_point_config + update_county_config = update_point_config # Fallback + + try: + _, valid_headers, valid_cookies, valid_body = get_rpc_config_auto.fetch_live_data(self.map_url) + if valid_headers and valid_body: + logger.info(f"Repair successful! Updating {self.name}.") + excluded = {'content-length', 'host', 'connection', 'cookie', 'accept-encoding', 'sec-ch-ua', 'sec-ch-ua-mobile', 'sec-ch-ua-platform', 'origin'} + clean_headers = {k: v for k, v in valid_headers.items() if k.lower() not in excluded} + clean_headers['Referer'] = self.map_url + + new_settings = { + 'headers': clean_headers, 'body': valid_body, 'cookies': valid_cookies, + 'user_agent': valid_headers.get('user-agent'), + } + + # Update in-memory config for the current run + self.config.update(new_settings) + self.config['last_auto_update'] = datetime.now(timezone.utc).isoformat() + + # Update session for the current run + self.session.cookies.clear() + for cookie in valid_cookies: + self.session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'], path=cookie['path']) + + # Save to disk for next time + update_county_config(self.name, self.config) + return True + except Exception as e: + logger.error(f"Auto-repair failed: {e}") + return False + + def _fetch_rpc_data(self, is_retry=False): + url = self.config.get('url') + headers = self.config.get('headers', {}) + body = self.config.get('body') + if not url or not body: return None + + parsed_url = urlparse(url) + origin = f"{parsed_url.scheme}://{parsed_url.netloc}" + correct_referer = headers.get('Referer') or headers.get('x-gwt-module-base') or origin + + req_headers = headers.copy() + req_headers['Content-Type'] = 'text/x-gwt-rpc; charset=UTF-8' + req_headers['Referer'] = correct_referer + + resp = self.session.post(url, headers=req_headers, data=body, verify=False) + + if "//EX" in resp.text or resp.status_code == 500: + logger.error(f"GWT Failure for {self.name}.") + if is_retry: return None + if self.attempt_auto_repair(): + logger.info("Retrying fetch with new settings...") + return self._fetch_rpc_data(is_retry=True) + return None + + if not resp.ok: return None + return json.loads(resp.text.replace('//OK', '')) + + +class GwtRpcCountyProvider(GwtRpcBaseProvider, BaseCountyProvider): + def fetch(self): + try: + data = self._fetch_rpc_data() + if data: + return self._extract_county_summary(data) + return [] + except Exception as e: + logger.error(f"County fetch error for {self.name}: {e}") + return [] + + def _extract_county_summary(self, data_list): + try: + string_table = next((item for item in data_list if isinstance(item, list)), None) + if not string_table: return [] + + stream_raw = [item for item in data_list if not isinstance(item, list)] + stream = [int(token) for token in stream_raw if isinstance(token, (int, float, str)) and str(token).replace('.','',1).isdigit()] + + REGION_SIG = "cc.nisc.oms.clientandserver.v2.pojo.Region/3192921568" + INTEGER_SIG = "java.lang.Integer/3438268394" + CATEGORY_KEY = "County" + + def get_index(val): + try: return string_table.index(val) + 1 + except ValueError: return 0 + + region_type_id = 
get_index(REGION_SIG) + integer_type_id = get_index(INTEGER_SIG) + county_type_id = get_index(CATEGORY_KEY) + + if region_type_id == 0: return [] + + results = [] + i = 0 + while i < len(stream): + if stream[i] == region_type_id: + try: + p = i + 1 + served = stream[p] if stream[p+1] == integer_type_id else 0 + p += 2 if served > 0 else 1 + out = stream[p] if stream[p+1] == integer_type_id else 0 + p += 2 if out > 0 else 1 + name_idx, cat_idx = stream[p], stream[p+1] + + if cat_idx == county_type_id: + name = string_table[name_idx - 1] if 0 < name_idx <= len(string_table) else "Unknown" + results.append({'county': name, 'state': self.state_filter, 'company': self.name, 'outages': out, 'served': served}) + except IndexError: pass + i += 1 + return results + except Exception as e: + logger.error(f"Could not parse county summary for {self.name}: {e}") + return [] + + +class GwtRpcProvider(GwtRpcBaseProvider, BaseProvider): + def __init__(self, config, session): + super().__init__(config, session) + self.transformer = None + self.STATE_BOUNDS = { + 'WV': {'lat_min': 37.0, 'lat_max': 40.7, 'lon_min': -82.7, 'lon_max': -77.7}, + 'OH': {'lat_min': 38.4, 'lat_max': 42.0, 'lon_min': -84.9, 'lon_max': -80.5}, + 'KY': {'lat_min': 36.4, 'lat_max': 39.2, 'lon_min': -89.6, 'lon_max': -81.9}, + 'IA': {'lat_min': 40.3, 'lat_max': 43.6, 'lon_min': -96.7, 'lon_max': -90.1} + } + if config.get('epsg'): + try: + self.transformer = Transformer.from_crs(f"EPSG:{config['epsg']}", "EPSG:4326", always_xy=True) + except: logger.error(f"EPSG Error for {self.name}") + + def fetch(self): + try: + data = self._fetch_rpc_data() + if data: + return self._extract_outages(data) + return [] + except Exception as e: + logger.error(f"Fetch error {self.name}: {e}") + return [] + + def _extract_outages(self, data_list): + results = [] + try: + string_table = next((item for item in data_list if isinstance(item, list)), None) + if not string_table: return [] + + stream_raw = [item for item in data_list if not isinstance(item, list)] + stream = [int(token) for token in stream_raw if isinstance(token, (int, float))] + + OUTAGE_SIG_KEYWORD = ".pojo.Outage/" + outage_sig_full = next((s for s in string_table if OUTAGE_SIG_KEYWORD in s), None) + if not outage_sig_full: return [] + + outage_type_id = string_table.index(outage_sig_full) + 1 + + i = 0 + while i < len(stream): + if stream[i] == outage_type_id: + try: + p = i + 1 + outagen = stream[p]; p += 1 + crew_status_idx = stream[p]; p += 1 + cause_idx = stream[p]; p += 1 + etr_high = stream[p]; p += 1 + etr_low = stream[p]; p += 1; p += 1 + start_high = stream[p]; p += 1 + start_low = stream[p]; p += 1; p += 1 + coord_x = stream[p]; p += 1 + coord_y = stream[p]; p += 1 + + lat, lon = None, None + if self.transformer and coord_x and coord_y: + try: + lon, lat = self.transformer.transform(coord_x, coord_y) + if not self._is_valid(lat, lon): lat, lon = None, None + except: pass + + if lat and lon: + start_ms = (start_high << 32) | start_low + etr_ms = (etr_high << 32) | etr_low + start_time = datetime.fromtimestamp(start_ms / 1000, tz=timezone.utc) if start_ms > 0 else None + etr_time = datetime.fromtimestamp(etr_ms / 1000, tz=timezone.utc) if etr_ms > 0 else None + + cause = string_table[cause_idx - 1].strip() if 0 < cause_idx <= len(string_table) else "Unknown" + crew_status = string_table[crew_status_idx - 1].strip() if 0 < crew_status_idx <= len(string_table) else "Unknown" + + results.append({ + 'incidentid': f"{self.name}-{lat:.5f}-{lon:.5f}", 'utility': self.name, + 'lat': lat, 
'lon': lon, 'pointgeom': f"{lat:.5f},{lon:.5f}", + 'start': start_time, 'etr': etr_time, 'outagen': outagen, + 'cause': cause, 'crew_status': crew_status, + 'last_change': datetime.now(timezone.utc) + }) + except (IndexError, TypeError): + pass + i += 1 + return results + except Exception as e: + logger.error(f"Could not parse point outages for {self.name}: {e}") + return [] + + def _is_valid(self, lat, lon): + if not self.state_filter: return True + b = self.STATE_BOUNDS.get(self.state_filter) + if not b: return True + return b['lat_min'] <= lat <= b['lat_max'] and b['lon_min'] <= lon <= b['lon_max'] \ No newline at end of file diff --git a/kubra.py b/kubra.py new file mode 100644 index 0000000..e267400 --- /dev/null +++ b/kubra.py @@ -0,0 +1,173 @@ +import requests +import json +import logging +import polyline +import mercantile +from datetime import datetime +from base import BaseProvider, BaseCountyProvider + +logger = logging.getLogger(__name__) + +class KubraCountyProvider(BaseCountyProvider): + def fetch(self): + meta_url = self.config.get('county_meta_url') + report_url_suffix = self.config.get('county_report_suffix') + + try: + # 1. Get hexes from meta_url + meta_resp = self.session.get(meta_url) + meta_data = meta_resp.json() + path = meta_data.get('data', {}).get('cluster_interval_generation_data') + if not path: return [] + + # 2. Construct final report URL + # The old script's logic reveals the path is composed of a base, + # the second hex from the metadata path, and the report suffix. + # Example path from meta: data/e2ae0326-9912-436a-9355-eb2687e798b1 + path_parts = path.split('/') # e.g., ['data', 'hex1', 'hex2', 'hex3'] + if len(path_parts) < 4: + logger.error(f"Invalid metadata path format for {self.name}: {path}") + return [] + + # This is the single, correct URL format used by the original script. + # It uses the fourth element (index 3) from the metadata path. + report_url = f"https://kubra.io/data/{path_parts[3]}{report_url_suffix}" + + # 3. Fetch and process report + report_resp = self.session.get(report_url) + if not report_resp.ok or not report_resp.text: + logger.info(f"No county report data available for {self.name} at this time.") + return [] + + report_data = report_resp.json() + return self._normalize(report_data) + except json.JSONDecodeError: + logger.warning(f"Could not decode JSON from county report for {self.name}. 
The report may be empty or invalid.") + return [] + except requests.exceptions.RequestException as e: + logger.error(f"Error fetching Kubra county data for {self.name}: {e}") + return [] + + def _normalize(self, data): + results = [] + primary_areas = data.get("file_data", {}).get("areas", []) + if not primary_areas: return [] + + first_item_key = primary_areas[0].get("key") + + if first_item_key == "state": + for state_area in primary_areas: + for county in state_area.get("areas", []): + if county.get("key") == "county": + results.append(self._extract_info(county)) + elif first_item_key == "county": + for county in primary_areas: + if county.get("key") == "county": + results.append(self._extract_info(county)) + return results + + def _extract_info(self, county_item): + return { + 'outages': county_item.get('cust_a', {}).get('val'), + 'served': county_item.get('cust_s'), + 'county': county_item.get('name', '').capitalize(), + 'state': county_item.get('state') or self.config.get('state_filter'), + 'company': self.name + } + +class KubraProvider(BaseProvider): + def __init__(self, config, session): + super().__init__(config, session) + self.max_zoom = 14 + self.results = [] + self.base_url_template = 'https://kubra.io/cluster-data/' + + def fetch(self): + meta_url = self.config.get('meta_url') + if not meta_url: return [] + + # Fetch hexes ONCE per run, not in the recursive loop. + self.hex1, self.hex2 = self._get_hexes(meta_url) + if not self.hex1 or not self.hex2: + logger.error(f"[{self.name}] Could not get session hex keys. Aborting fetch for this provider.") + return [] + + quadkeys = self.config.get('quadkeys', []) + + self.results = [] + self._fetch_recursive(quadkeys, set(), zoom=len(quadkeys[0])) + return self.results + + def _get_hexes(self, url): + try: + resp = self.session.get(url) + path = resp.json().get('data', {}).get('cluster_interval_generation_data') + parts = path.split('/') + return parts[2], parts[3] + except: return None, None + + def _fetch_recursive(self, quadkeys, seen, zoom): + for q in quadkeys: + suffix = q[-3:][::-1] + url = f"{self.base_url_template}{suffix}/{self.hex1}/{self.hex2}/public/{self.config.get('layer')}/{q}.json" + try: + resp = self.session.get(url) + if not resp.ok: + continue + + file_data = resp.json().get('file_data', []) + for item in file_data: + desc = item.get('desc') + + # This mirrors the safe logic from the original power2.py's 'kubra' function. + # If 'desc' is missing, assume it's a cluster to be safe and drill down. + is_cluster = True if desc is None else desc.get('cluster', False) + + # If it's a cluster and we haven't hit max zoom, drill down. + if is_cluster and zoom + 1 <= self.max_zoom: + p_geom = item.get('geom', {}).get('p', []) + if p_geom: + next_key = self._get_quadkey_for_point(p_geom[0], zoom + 1) + self._fetch_recursive([next_key], seen, zoom + 1) + else: + # Otherwise, it's a final outage record. Process it. + self.results.append(self._normalize(item)) + except Exception as e: + logger.error(f"[{self.name}] Unhandled exception in _fetch_recursive for {q}: {e}", exc_info=True) + + def _normalize(self, item): + # Ensure 'desc' is a dictionary, even if it's missing from the item. This prevents the AttributeError. 
+ desc = item.get('desc') or {} + geom = item.get('geom', {}) + p = geom.get('p', [None])[0] if geom.get('p') else None + if not p: + return {} + latlon = polyline.decode(p)[0] + def ts(s): + if not s or s=='ETR-NULL': return None + try: return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S%z") + except: + try: return datetime.strptime(s, "%Y-%m-%dT%H:%M%z") + except: return None + + cause_dict = desc.get('cause') + cause = cause_dict.get('EN-US', "Pending") if cause_dict else "Pending" + + crew_dict = desc.get('crew_status') + crew_status = crew_dict.get('EN-US') if crew_dict else None + + return { + 'incidentid': desc.get('inc_id'), 'utility': self.name, + 'lat': latlon[0], 'lon': latlon[1], 'pointgeom': p, 'areageom': geom.get('a'), + 'start': ts(desc.get('start_time')), 'etr': ts(desc.get('etr')), + 'outagen': desc.get('cust_a', {}).get('val', 0), 'cause': cause, + 'crew_status': crew_status, 'active': True + } + + def _get_quadkey_for_point(self, p, z): + ll = polyline.decode(p)[0] + return mercantile.quadkey(mercantile.tile(lng=ll[1], lat=ll[0], zoom=z)) + + def _get_neighbors(self, q): + t = mercantile.quadkey_to_tile(q) + return [mercantile.quadkey(n) for n in mercantile.neighbors(t)] \ No newline at end of file diff --git a/newpower.py b/newpower.py index ed36de8..fca17ee 100644 --- a/newpower.py +++ b/newpower.py @@ -1,17 +1,27 @@ +import os +import sys +# Add the script's directory to the Python path to ensure modules can be found +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + import requests import json import psycopg2 import logging -import os import re from datetime import datetime, timezone, timedelta -from abc import ABC, abstractmethod from urllib.parse import urlparse from requests.packages.urllib3.exceptions import InsecureRequestWarning # Import the helper module for auto-repair import get_rpc_config_auto +# Import provider classes +from base import BaseCountyProvider +from kubra import KubraCountyProvider +from simple import SimpleCountyJsonProvider +from nisc import NiscCountyProvider +from gwt_rpc import GwtRpcCountyProvider + requests.packages.urllib3.disable_warnings(InsecureRequestWarning) # --- LOGGING --- @@ -134,314 +144,6 @@ class CountyPowerDB: # --- PROVIDERS --- -class BaseCountyProvider(ABC): - def __init__(self, config, session): - self.config = config - self.session = session - self.name = config.get('name', 'Unknown') - - @abstractmethod - def fetch(self): - pass - -class SimpleCountyJsonProvider(BaseCountyProvider): - def fetch(self): - url = self.config.get('county_url') - state = self.config.get('state_filter') - try: - resp = self.session.get(url, verify=False) - if not resp.ok: return [] - data = resp.json() - results = [] - for boundary_group in data: - for item in boundary_group.get('boundaries', []): - results.append({ - 'outages': item.get('customersOutNow'), - 'served': item.get('customersServed'), - 'county': item.get('name'), - 'state': state, - 'company': self.name - }) - return results - except Exception as e: - logger.error(f"Error fetching {self.name}: {e}") - return [] - -class KubraCountyProvider(BaseCountyProvider): - def fetch(self): - meta_url = self.config.get('county_meta_url') - report_url_suffix = self.config.get('county_report_suffix') - - try: - # 1. Get hexes from meta_url - meta_resp = self.session.get(meta_url) - meta_data = meta_resp.json() - path = meta_data.get('data', {}).get('cluster_interval_generation_data') - if not path: return [] - - # 2. 
Construct final report URL - # The old script's logic reveals the path is composed of a base, - # the second hex from the metadata path, and the report suffix. - # Example path from meta: data/e2ae0326-9912-436a-9355-eb2687e798b1 - path_parts = path.split('/') # e.g., ['data', 'hex1', 'hex2', 'hex3'] - if len(path_parts) < 4: - logger.error(f"Invalid metadata path format for {self.name}: {path}") - return [] - - # This is the single, correct URL format used by the original script. - # It uses the fourth element (index 3) from the metadata path. - report_url = f"https://kubra.io/data/{path_parts[3]}{report_url_suffix}" - - # 3. Fetch and process report - report_resp = self.session.get(report_url) - if not report_resp.ok or not report_resp.text: - logger.info(f"No county report data available for {self.name} at this time.") - return [] - - report_data = report_resp.json() - return self._normalize(report_data) - except json.JSONDecodeError: - logger.warning(f"Could not decode JSON from county report for {self.name}. The report may be empty or invalid.") - return [] - except requests.exceptions.RequestException as e: - logger.error(f"Error fetching Kubra county data for {self.name}: {e}") - return [] - - def _normalize(self, data): - results = [] - primary_areas = data.get("file_data", {}).get("areas", []) - if not primary_areas: return [] - - first_item_key = primary_areas[0].get("key") - - if first_item_key == "state": - for state_area in primary_areas: - for county in state_area.get("areas", []): - if county.get("key") == "county": - results.append(self._extract_info(county)) - elif first_item_key == "county": - for county in primary_areas: - if county.get("key") == "county": - results.append(self._extract_info(county)) - return results - - def _extract_info(self, county_item): - return { - 'outages': county_item.get('cust_a', {}).get('val'), - 'served': county_item.get('cust_s'), - 'county': county_item.get('name', '').capitalize(), - 'state': county_item.get('state') or self.config.get('state_filter'), - 'company': self.name - } - -class NiscCountyProvider(BaseCountyProvider): - """ Handles county data from NISC-hosted cloud sources. """ - def fetch(self): - url = self.config.get('county_url') - state = self.config.get('state_filter') - try: - resp = self.session.get(url, verify=False) - if not resp.ok: return [] - data = resp.json() - results = [] - # The structure is typically a list containing one object with a 'boundaries' key - for boundary_group in data: - for item in boundary_group.get('boundaries', []): - results.append({ - 'outages': item.get('customersOutNow'), - 'served': item.get('customersServed'), - 'county': item.get('name'), - 'state': state, - 'company': self.name - }) - return results - except Exception as e: - logger.error(f"Error fetching NISC county data for {self.name}: {e}") - return [] - -class GwtRpcCountyProvider(BaseCountyProvider): - """ Handles county data from GWT-RPC sources. 
""" - def __init__(self, config, session): - super().__init__(config, session) - self.state_filter = config.get('state_filter') - self.map_url = config.get('map_url') - - # Set up session headers and cookies from config - self.session.headers.update({ - 'User-Agent': config.get('user_agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'), - 'Accept': '*/*', - 'Sec-Fetch-Site': 'same-origin' - }) - if config.get('cookies'): - for cookie in config['cookies']: - self.session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'], path=cookie['path']) - - def attempt_auto_repair(self): - if not self.map_url: return False - last_update = self.config.get('last_auto_update') - if last_update: - try: - last_dt = datetime.fromisoformat(last_update) - if last_dt.tzinfo is None: last_dt = last_dt.replace(tzinfo=timezone.utc) - if datetime.now(timezone.utc) - last_dt < timedelta(hours=AUTO_UPDATE_COOLDOWN_HOURS): - logger.info(f"Skipping auto-repair for {self.name} (Cooldown active).") - return False - except ValueError: pass - - logger.info(f"Attempting Auto-Repair for {self.name}...") - try: - _, valid_headers, valid_cookies, valid_body = get_rpc_config_auto.fetch_live_data(self.map_url) - if valid_headers and valid_body: - logger.info(f"Repair successful! Updating {self.name}.") - excluded = {'content-length', 'host', 'connection', 'cookie', 'accept-encoding', 'sec-ch-ua', 'sec-ch-ua-mobile', 'sec-ch-ua-platform', 'origin'} - clean_headers = {k: v for k, v in valid_headers.items() if k.lower() not in excluded} - clean_headers['Referer'] = self.map_url - - # Update in-memory config for the current run - self.config.update({ - 'headers': clean_headers, 'body': valid_body, 'cookies': valid_cookies, - 'user_agent': valid_headers.get('user-agent'), - 'last_auto_update': datetime.now(timezone.utc).isoformat() - }) - # Update session for the current run - self.session.cookies.clear() - for cookie in valid_cookies: - self.session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'], path=cookie['path']) - - # Save to disk for next time - update_provider_config(self.name, self.config) - return True - except Exception as e: - logger.error(f"Auto-repair failed: {e}") - return False - - def fetch(self, is_retry=False): - url = self.config.get('url') - headers = self.config.get('headers', {}) - body = self.config.get('body') - if not url or not body: return [] - - try: - parsed_url = urlparse(url) - origin = f"{parsed_url.scheme}://{parsed_url.netloc}" - correct_referer = headers.get('Referer') or headers.get('x-gwt-module-base') or origin - - req_headers = headers.copy() - req_headers['Content-Type'] = 'text/x-gwt-rpc; charset=UTF-8' - req_headers['Referer'] = correct_referer - - resp = self.session.post(url, headers=req_headers, data=body, verify=False) - - if "//EX" in resp.text or resp.status_code == 500: - logger.error(f"GWT Failure for {self.name} (County Fetch).") - if is_retry: return [] - if self.attempt_auto_repair(): - logger.info("Retrying county fetch with new settings...") - # After repair, self.config is updated, so we can just call fetch again. 
- return self.fetch(is_retry=True) - return [] - - if not resp.ok: return [] - text = resp.text.replace('//OK', '') - return self._extract_county_summary(json.loads(text)) - except Exception as e: - logger.error(f"County fetch error for {self.name}: {e}") - return [] - - def _extract_county_summary(self, data_list): - """ - Decodes a GWT-RPC payload to extract outage data for Counties. - This logic is adapted from test.py. - """ - try: - # 1. Separate Stream and String Table - string_table = None - stream_raw = [] - for item in data_list: - if isinstance(item, list): - string_table = item - break - else: - stream_raw.append(item) - - if not string_table: - logger.error(f"String table not found in payload for {self.name}.") - return [] - - # 2. Normalize the Stream - stream = [] - for token in stream_raw: - if isinstance(token, int): - stream.append(token) - elif isinstance(token, float): - stream.append(int(token)) - elif isinstance(token, str): - try: - stream.append(int(float(token))) - except ValueError: - pass # Ignore non-numeric strings - - # 3. Decode Logic - REGION_SIG = "cc.nisc.oms.clientandserver.v2.pojo.Region/3192921568" - INTEGER_SIG = "java.lang.Integer/3438268394" - CATEGORY_KEY = "County" - - def get_index(val): - try: return string_table.index(val) + 1 - except ValueError: return 0 - - region_type_id = get_index(REGION_SIG) - integer_type_id = get_index(INTEGER_SIG) - county_type_id = get_index(CATEGORY_KEY) - - if region_type_id == 0: - logger.error(f"Region type signature not found for {self.name}.") - return [] - - results = [] - i = 0 - stream_len = len(stream) - - while i < stream_len: - if stream[i] == region_type_id: - try: - p = i + 1 - - served = 0 - val1 = stream[p] - p += 1 - if p < stream_len and stream[p] == integer_type_id: - served = val1 - p += 1 - - out = 0 - val2 = stream[p] - p += 1 - if p < stream_len and stream[p] == integer_type_id: - out = val2 - p += 1 - - name_idx = stream[p] - p += 1 - cat_idx = stream[p] - - if cat_idx == county_type_id: - name = "Unknown" - if 0 < name_idx <= len(string_table): - name = string_table[name_idx - 1] - - results.append({ - 'county': name, 'state': self.state_filter, - 'company': self.name, 'outages': out, 'served': served - }) - except IndexError: - pass - i += 1 - return results - except (ValueError, IndexError, TypeError) as e: - logger.error(f"Could not parse county summary for {self.name}: {e}") - return [] - # --- REGISTRY --- PROVIDER_REGISTRY = { 'kubra_county': KubraCountyProvider, diff --git a/newpower2.py b/newpower2.py index 2fd0b2b..93ff3b7 100644 --- a/newpower2.py +++ b/newpower2.py @@ -1,13 +1,16 @@ +import os +import sys +# Add the script's directory to the Python path to ensure modules can be found +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + import requests import polyline import json import psycopg2 import mercantile import logging -import os import re from datetime import datetime, timezone, timedelta -from abc import ABC, abstractmethod from urllib.parse import urlparse from pyproj import Transformer from requests.packages.urllib3.exceptions import InsecureRequestWarning @@ -15,6 +18,13 @@ from requests.packages.urllib3.exceptions import InsecureRequestWarning # Import the helper module for auto-repair import get_rpc_config_auto +# Import provider classes +from base import BaseProvider +from kubra import KubraProvider +from simple import SimpleJsonProvider +from gwt_rpc import GwtRpcProvider +from nisc import NiscHostedProvider + 
requests.packages.urllib3.disable_warnings(InsecureRequestWarning) # --- LOGGING --- @@ -114,482 +124,6 @@ class PowerDB: cursor.execute("DELETE FROM newpower WHERE fetch_time < NOW() - INTERVAL '365 days'") logger.info("Post-processing complete.") -# --- PROVIDERS --- - -class BaseProvider(ABC): - def __init__(self, config, session): - self.config = config - self.session = session - self.name = config.get('name', 'Unknown') - - @abstractmethod - def fetch(self): - pass - -class SimpleJsonProvider(BaseProvider): - def fetch(self): - url = self.config.get('url') - try: - resp = self.session.get(url, verify=False) - if not resp.ok: return [] - data = resp.json() - results = [] - for item in data: - results.append(self._normalize(item)) - return results - except Exception as e: - logger.error(f"Error fetching {self.name}: {e}") - return [] - - def _normalize(self, item): - def safe_parse(ts): - if not ts: return None - try: return datetime.fromisoformat(ts.replace('Z', '+00:00')) - except: return None - return { - 'incidentid': str(item.get('outageRecID')), 'utility': self.name, - 'lat': item.get('outagePoint', {}).get('lat'), 'lon': item.get('outagePoint', {}).get('lng'), - 'pointgeom': f"{item.get('outagePoint', {}).get('lat')},{item.get('outagePoint', {}).get('lng')}", - 'areageom': None, 'start': safe_parse(item.get('outageStartTime')), - 'etr': safe_parse(item.get('outageEndTime')), 'outagen': item.get('customersOutNow'), - 'cause': item.get('cause'), 'crew_status': item.get('outageWorkStatus'), - 'last_change': safe_parse(item.get('outageModifiedTime')) - } - -class KubraProvider(BaseProvider): - def __init__(self, config, session): - super().__init__(config, session) - self.max_zoom = 14 - self.results = [] - self.base_url_template = 'https://kubra.io/cluster-data/' - - def fetch(self): - meta_url = self.config.get('meta_url') - if not meta_url: return [] - - # Fetch hexes ONCE per run, not in the recursive loop. - self.hex1, self.hex2 = self._get_hexes(meta_url) - if not self.hex1 or not self.hex2: - logger.error(f"[{self.name}] Could not get session hex keys. Aborting fetch for this provider.") - return [] - - quadkeys = self.config.get('quadkeys', []) - - self.results = [] - self._fetch_recursive(quadkeys, set(), zoom=len(quadkeys[0])) - return self.results - - def _get_hexes(self, url): - try: - resp = self.session.get(url) - path = resp.json().get('data', {}).get('cluster_interval_generation_data') - parts = path.split('/') - return parts[2], parts[3] - except: return None, None - - def _fetch_recursive(self, quadkeys, seen, zoom): - for q in quadkeys: - suffix = q[-3:][::-1] - url = f"{self.base_url_template}{suffix}/{self.hex1}/{self.hex2}/public/{self.config.get('layer')}/{q}.json" - try: - resp = self.session.get(url) - if not resp.ok: - continue - - file_data = resp.json().get('file_data', []) - for item in file_data: - desc = item.get('desc') - - # This mirrors the safe logic from the original power2.py's 'kubra' function. - # If 'desc' is missing, assume it's a cluster to be safe and drill down. - is_cluster = True if desc is None else desc.get('cluster', False) - - # If it's a cluster and we haven't hit max zoom, drill down. - if is_cluster and zoom + 1 <= self.max_zoom: - p_geom = item.get('geom', {}).get('p', []) - if p_geom: - next_key = self._get_quadkey_for_point(p_geom[0], zoom + 1) - self._fetch_recursive([next_key], seen, zoom + 1) - else: - # Otherwise, it's a final outage record. Process it. 
- self.results.append(self._normalize(item)) - except Exception as e: - logger.error(f"[{self.name}] Unhandled exception in _fetch_recursive for {q}: {e}", exc_info=True) - - def _normalize(self, item): - # Ensure 'desc' is a dictionary, even if it's missing from the item. This prevents the AttributeError. - desc = item.get('desc') or {} - geom = item.get('geom', {}) - p = geom.get('p', [None])[0] if geom.get('p') else None - if not p: - return {} - latlon = polyline.decode(p)[0] - def ts(s): - if not s or s=='ETR-NULL': return None - try: return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S%z") - except: - try: return datetime.strptime(s, "%Y-%m-%dT%H:%M%z") - except: return None - - cause_dict = desc.get('cause') - cause = cause_dict.get('EN-US', "Pending") if cause_dict else "Pending" - - crew_dict = desc.get('crew_status') - crew_status = crew_dict.get('EN-US') if crew_dict else None - - return { - 'incidentid': desc.get('inc_id'), 'utility': self.name, - 'lat': latlon[0], 'lon': latlon[1], 'pointgeom': p, 'areageom': geom.get('a'), - 'start': ts(desc.get('start_time')), 'etr': ts(desc.get('etr')), - 'outagen': desc.get('cust_a', {}).get('val', 0), 'cause': cause, - 'crew_status': crew_status, 'active': True - } - - def _get_quadkey_for_point(self, p, z): - ll = polyline.decode(p)[0] - return mercantile.quadkey(mercantile.tile(lng=ll[1], lat=ll[0], zoom=z)) - - def _get_neighbors(self, q): - t = mercantile.quadkey_to_tile(q) - return [mercantile.quadkey(n) for n in mercantile.neighbors(t)] - -class GwtRpcProvider(BaseProvider): - def __init__(self, config, session): - super().__init__(config, session) - self.transformer = None - self.state_filter = config.get('state_filter') - self.map_url = config.get('map_url') - - # 1. Base Headers - self.session.headers.update({ - 'User-Agent': config.get('user_agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'), - 'Accept': '*/*', - 'Sec-Fetch-Site': 'same-origin' - }) - - parsed_url = urlparse(config.get('url')) - self.session.headers.update({'Origin': f"{parsed_url.scheme}://{parsed_url.netloc}"}) - - # 2. 
Load Cookies - if config.get('cookies'): - for cookie in config['cookies']: - try: - self.session.cookies.set( - cookie['name'], cookie['value'], domain=cookie['domain'], path=cookie['path'] - ) - except: pass - - self.STATE_BOUNDS = { - 'WV': {'lat_min': 37.0, 'lat_max': 40.7, 'lon_min': -82.7, 'lon_max': -77.7}, - 'OH': {'lat_min': 38.4, 'lat_max': 42.0, 'lon_min': -84.9, 'lon_max': -80.5}, - 'KY': {'lat_min': 36.4, 'lat_max': 39.2, 'lon_min': -89.6, 'lon_max': -81.9}, - 'IA': {'lat_min': 40.3, 'lat_max': 43.6, 'lon_min': -96.7, 'lon_max': -90.1} - } - - if config.get('epsg'): - try: - self.transformer = Transformer.from_crs(f"EPSG:{config['epsg']}", "EPSG:4326", always_xy=True) - except: logger.error(f"EPSG Error for {self.name}") - - def attempt_auto_repair(self): - if not self.map_url: return False - - # --- Cooldown Check --- - last_update = self.config.get('last_auto_update') - if last_update: - try: - last_dt = datetime.fromisoformat(last_update) - if last_dt.tzinfo is None: last_dt = last_dt.replace(tzinfo=timezone.utc) - if datetime.now(timezone.utc) - last_dt < timedelta(hours=AUTO_UPDATE_COOLDOWN_HOURS): - logger.info(f"Skipping auto-repair for {self.name} (Cooldown active).") - return False - except ValueError: pass - - logger.info(f"Attempting Auto-Repair for {self.name}...") - - try: - # Expecting 4 values: data, headers, cookies, body - new_data, valid_headers, valid_cookies, valid_body = get_rpc_config_auto.fetch_live_data(self.map_url) - - if valid_headers and valid_body: - logger.info(f"Repair successful! Updating {self.name}.") - - # Clean Headers (Blacklist approach) - excluded = {'content-length', 'host', 'connection', 'cookie', 'accept-encoding', 'sec-ch-ua', 'sec-ch-ua-mobile', 'sec-ch-ua-platform', 'origin'} - clean_headers = {k: v for k, v in valid_headers.items() if k.lower() not in excluded} - - # Ensure Referer is set correctly for next time - clean_headers['Referer'] = self.map_url - - # Update In-Memory Config - current_time = datetime.now(timezone.utc).isoformat() - self.config['headers'] = clean_headers - self.config['body'] = valid_body - self.config['cookies'] = valid_cookies - self.config['user_agent'] = valid_headers.get('user-agent') - self.config['last_auto_update'] = current_time - - # Update Session - self.session.cookies.clear() - for cookie in valid_cookies: - self.session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'], path=cookie['path']) - if valid_headers.get('user_agent'): - self.session.headers.update({'User-Agent': valid_headers.get('user-agent')}) - - # Save to Disk - new_settings = { - 'headers': clean_headers, - 'body': valid_body, - 'cookies': valid_cookies, - 'user_agent': valid_headers.get('user-agent') - } - update_provider_config(self.name, new_settings) - return True - except Exception as e: - logger.error(f"Auto-repair failed: {e}") - - return False - - def fetch(self, is_retry=False): - url = self.config.get('url') - headers = self.config.get('headers', {}) - body = self.config.get('body') - - if not url: return [] - - try: - # 3. 
Dynamic Origin Update - parsed_url = urlparse(url) - origin = f"{parsed_url.scheme}://{parsed_url.netloc}" - - # Priority: Configured Referer > Module Base > Origin - correct_referer = headers.get('Referer') or headers.get('x-gwt-module-base') or origin - - ua = headers.get('User-Agent', self.session.headers['User-Agent']) - if "Headless" in ua: # Fallback safety - ua = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36' - - self.session.headers.update({ - 'Origin': origin, - 'Referer': correct_referer, - 'User-Agent': ua - }) - - # Prime if missing cookies or map_url is defined - if self.map_url and not self.config.get('cookies'): - try: self.session.get(correct_referer, verify=False, timeout=10) - except: pass - - req_headers = headers.copy() - req_headers['Content-Type'] = 'text/x-gwt-rpc; charset=UTF-8' - req_headers['Referer'] = correct_referer - req_headers['User-Agent'] = ua - - # Only fetch if we have a body - if body: - resp = self.session.post(url, headers=req_headers, data=body, verify=False) - else: - resp = type('obj', (object,), {'status_code': 500, 'text': 'No Body', 'ok': False})() - - # 5. Error Handling & Retry - failed = False - if "//EX" in resp.text: failed = True - if resp.status_code == 500: failed = True - - if failed: - logger.error(f"GWT Failure for {self.name} (Status: {resp.status_code}).") - - if is_retry: - logger.error(f"Retry failed for {self.name}. Aborting.") - return [] - - if self.attempt_auto_repair(): - logger.info("Retrying fetch with new settings...") - return self.fetch(is_retry=True) - else: - return [] - - if not resp.ok: return [] - text = resp.text - if text.startswith('//OK'): text = text[4:] - return self._extract_outages(json.loads(text)) - except Exception as e: - logger.error(f"Fetch error {self.name}: {e}") - return [] - - def _extract_outages(self, data_list): - """ - Decodes a GWT-RPC payload to extract detailed point outage information. - This is a more robust implementation that replaces the previous heuristic-based coordinate search. - """ - results = [] - try: - # 1. Separate Stream and String Table - string_table = next((item for item in data_list if isinstance(item, list)), None) - if not string_table: return [] - - stream_raw = [item for item in data_list if not isinstance(item, list)] - - # 2. Normalize the Stream - stream = [int(token) for token in stream_raw if isinstance(token, (int, float))] - - # 3. Define Signatures and Helper - # The signature can vary (e.g., cc.nisc... vs coop.nisc...). - # We search for a common, unique substring. - OUTAGE_SIG_KEYWORD = ".pojo.Outage/" - - outage_sig_full = next((s for s in string_table if OUTAGE_SIG_KEYWORD in s), None) - - if not outage_sig_full: - logger.error(f"Outage type signature not found for {self.name}.") - return [] - - # Get the 1-based index of the found signature - outage_type_id = string_table.index(outage_sig_full) + 1 - - # 4. 
Decode Logic - i = 0 - stream_len = len(stream) - while i < stream_len: - if stream[i] == outage_type_id: - try: - p = i + 1 - - # Field extraction based on observed GWT stream structure - outagen = stream[p] if p < stream_len else 0; p += 1 - crew_status_idx = stream[p] if p < stream_len else 0; p += 1 - cause_idx = stream[p] if p < stream_len else 0; p += 1 - - etr_high = stream[p] if p < stream_len else 0; p += 1 - etr_low = stream[p] if p < stream_len else 0; p += 1 - p += 1 # Skip long type ID - - start_high = stream[p] if p < stream_len else 0; p += 1 - start_low = stream[p] if p < stream_len else 0; p += 1 - p += 1 # Skip long type ID - - coord_x = stream[p] if p < stream_len else 0; p += 1 - coord_y = stream[p] if p < stream_len else 0; p += 1 - - # Process coordinates - lat, lon = None, None - if self.transformer and coord_x and coord_y: - try: - lon, lat = self.transformer.transform(coord_x, coord_y) - if not self._is_valid(lat, lon): lat, lon = None, None - except: pass - - if lat and lon: - # Process timestamps (GWT sends 64-bit longs as two 32-bit integers) - start_ms = (start_high << 32) | start_low - etr_ms = (etr_high << 32) | etr_low - start_time = datetime.fromtimestamp(start_ms / 1000, tz=timezone.utc) if start_ms > 0 else None - etr_time = datetime.fromtimestamp(etr_ms / 1000, tz=timezone.utc) if etr_ms > 0 else None - - # Resolve strings from table - cause = string_table[cause_idx - 1].strip() if 0 < cause_idx <= len(string_table) else "Unknown" - crew_status = string_table[crew_status_idx - 1].strip() if 0 < crew_status_idx <= len(string_table) else "Unknown" - - results.append({ - 'incidentid': f"{self.name}-{lat:.5f}-{lon:.5f}", - 'utility': self.name, - 'lat': lat, 'lon': lon, - 'pointgeom': f"{lat:.5f},{lon:.5f}", - 'areageom': None, - 'start': start_time, - 'etr': etr_time, - 'outagen': outagen, - 'cause': cause, - 'crew_status': crew_status, - 'active': True, - 'last_change': datetime.now(timezone.utc) - }) - except (IndexError, TypeError): - pass # Move to the next potential object - i += 1 - return results - except Exception as e: - logger.error(f"Could not parse point outages for {self.name}: {e}") - return [] - - def _is_valid(self, lat, lon): - if not self.state_filter: return True - b = self.STATE_BOUNDS.get(self.state_filter) - if not b: return True - return b['lat_min'] <= lat <= b['lat_max'] and b['lon_min'] <= lon <= b['lon_max'] - -class NiscHostedProvider(BaseProvider): - """ - Handles NISC Cloud Coop format (JSON with PROJ coordinate strings). 
- Example: Buckeye REC - """ - def __init__(self, config, session): - super().__init__(config, session) - self.transformer = None - proj_str = config.get('proj_string') - - if proj_str: - try: - # Create transformer from the custom PROJ string to WGS84 - # always_xy=True ensures we assume (Lon, Lat) output order - self.transformer = Transformer.from_proj(proj_str, "EPSG:4326", always_xy=True) - except Exception as e: - logger.error(f"Failed to initialize projection for {self.name}: {e}") - - def fetch(self, is_retry=False): - url = self.config.get('url') - try: - resp = self.session.get(url, verify=False) - if not resp.ok: - logger.error(f"{self.name} HTTP {resp.status_code}") - return [] - - data = resp.json() - outages_list = data.get('outages', []) - - results = [] - for item in outages_list: - results.append(self._normalize(item)) - return results - except Exception as e: - logger.error(f"Error fetching {self.name}: {e}") - return [] - - def _normalize(self, item): - # Coordinates - x, y = item.get('x'), item.get('y') - lat, lon = None, None - - if x is not None and y is not None and self.transformer: - try: - # Transform custom projection X/Y to Lon/Lat - lon, lat = self.transformer.transform(x, y) - except: pass - - # Timestamps (Epoch Milliseconds) - start_ts = None - time_off = item.get('timeOff') - if time_off: - try: - # Convert ms to seconds - start_ts = datetime.fromtimestamp(time_off / 1000, tz=timezone.utc) - except: pass - - return { - 'incidentid': str(item.get('id')), - 'utility': self.name, - 'lat': lat, - 'lon': lon, - 'pointgeom': f"{lat},{lon}" if lat else None, - 'areageom': None, - 'start': start_ts, - 'etr': None, - 'outagen': item.get('nbrOut', 1), - 'cause': "Unknown", - 'crew_status': "Unknown", - 'active': True, - 'last_change': datetime.now(timezone.utc) - } # --- REGISTRY --- PROVIDER_REGISTRY = { @@ -597,9 +131,10 @@ PROVIDER_REGISTRY = { 'simple_json': SimpleJsonProvider, 'gwt_rpc': GwtRpcProvider, 'nisc_hosted': NiscHostedProvider, - } + + # --- MAIN --- # --- MAIN (Point Scraper) --- def main(): diff --git a/nisc.py b/nisc.py new file mode 100644 index 0000000..3dca4d7 --- /dev/null +++ b/nisc.py @@ -0,0 +1,80 @@ +import logging +from datetime import datetime, timezone +from pyproj import Transformer +from base import BaseProvider, BaseCountyProvider + +logger = logging.getLogger(__name__) + +class NiscCountyProvider(BaseCountyProvider): + """ Handles county data from NISC-hosted cloud sources. """ + def fetch(self): + url = self.config.get('county_url') + state = self.config.get('state_filter') + try: + resp = self.session.get(url, verify=False) + if not resp.ok: return [] + data = resp.json() + results = [] + # The structure is typically a list containing one object with a 'boundaries' key + for boundary_group in data: + for item in boundary_group.get('boundaries', []): + results.append({ + 'outages': item.get('customersOutNow'), + 'served': item.get('customersServed'), + 'county': item.get('name'), + 'state': state, + 'company': self.name + }) + return results + except Exception as e: + logger.error(f"Error fetching NISC county data for {self.name}: {e}") + return [] + +class NiscHostedProvider(BaseProvider): + """ + Handles NISC Cloud Coop format (JSON with PROJ coordinate strings). 
+ Example: Buckeye REC + """ + def __init__(self, config, session): + super().__init__(config, session) + self.transformer = None + proj_str = config.get('proj_string') + + if proj_str: + try: + self.transformer = Transformer.from_proj(proj_str, "EPSG:4326", always_xy=True) + except Exception as e: + logger.error(f"Failed to initialize projection for {self.name}: {e}") + + def fetch(self): + url = self.config.get('url') + try: + resp = self.session.get(url, verify=False) + if not resp.ok: + logger.error(f"{self.name} HTTP {resp.status_code}") + return [] + + data = resp.json() + return [self._normalize(item) for item in data.get('outages', [])] + except Exception as e: + logger.error(f"Error fetching {self.name}: {e}") + return [] + + def _normalize(self, item): + x, y = item.get('x'), item.get('y') + lat, lon = None, None + if x is not None and y is not None and self.transformer: + try: + lon, lat = self.transformer.transform(x, y) + except: pass + + time_off = item.get('timeOff') + start_ts = datetime.fromtimestamp(time_off / 1000, tz=timezone.utc) if time_off else None + + return { + 'incidentid': str(item.get('id')), 'utility': self.name, + 'lat': lat, 'lon': lon, 'pointgeom': f"{lat},{lon}" if lat else None, + 'start': start_ts, 'outagen': item.get('nbrOut', 1), + 'cause': "Unknown", 'crew_status': "Unknown", 'active': True, + 'last_change': datetime.now(timezone.utc) + } \ No newline at end of file diff --git a/simple.py b/simple.py new file mode 100644 index 0000000..344d912 --- /dev/null +++ b/simple.py @@ -0,0 +1,55 @@ +import logging +from datetime import datetime +from base import BaseProvider, BaseCountyProvider + +logger = logging.getLogger(__name__) + +class SimpleCountyJsonProvider(BaseCountyProvider): + def fetch(self): + url = self.config.get('county_url') + state = self.config.get('state_filter') + try: + resp = self.session.get(url, verify=False) + if not resp.ok: return [] + data = resp.json() + results = [] + for boundary_group in data: + for item in boundary_group.get('boundaries', []): + results.append({ + 'outages': item.get('customersOutNow'), + 'served': item.get('customersServed'), + 'county': item.get('name'), + 'state': state, + 'company': self.name + }) + return results + except Exception as e: + logger.error(f"Error fetching {self.name}: {e}") + return [] + +class SimpleJsonProvider(BaseProvider): + def fetch(self): + url = self.config.get('url') + try: + resp = self.session.get(url, verify=False) + if not resp.ok: return [] + data = resp.json() + return [self._normalize(item) for item in data] + except Exception as e: + logger.error(f"Error fetching {self.name}: {e}") + return [] + + def _normalize(self, item): + def safe_parse(ts): + if not ts: return None + try: return datetime.fromisoformat(ts.replace('Z', '+00:00')) + except: return None + return { + 'incidentid': str(item.get('outageRecID')), 'utility': self.name, + 'lat': item.get('outagePoint', {}).get('lat'), 'lon': item.get('outagePoint', {}).get('lng'), + 'pointgeom': f"{item.get('outagePoint', {}).get('lat')},{item.get('outagePoint', {}).get('lng')}", + 'areageom': None, 'start': safe_parse(item.get('outageStartTime')), + 'etr': safe_parse(item.get('outageEndTime')), 'outagen': item.get('customersOutNow'), + 'cause': item.get('cause'), 'crew_status': item.get('outageWorkStatus'), + 'last_change': safe_parse(item.get('outageModifiedTime')) + } \ No newline at end of file
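
For orientation, below is a minimal sketch (not part of the patch) of how the extracted provider modules and the PROVIDER_REGISTRY dispatch are consumed by the scraper entry points. The provider classes and the 'simple_json', 'gwt_rpc' and 'nisc_hosted' registry keys come from the diff above; the 'type' config key, the providers.json filename, the 'kubra' key, and the run_all() driver itself are illustrative assumptions rather than code in this change.

# Hypothetical driver loop; only the imported classes and the registry keys
# named above are taken from the diff, everything else is assumed.
import json
import requests

from simple import SimpleJsonProvider
from gwt_rpc import GwtRpcProvider
from nisc import NiscHostedProvider
from kubra import KubraProvider

PROVIDER_REGISTRY = {
    'simple_json': SimpleJsonProvider,
    'gwt_rpc': GwtRpcProvider,
    'nisc_hosted': NiscHostedProvider,
    'kubra': KubraProvider,  # assumed key; only 'kubra_county' is visible in the county registry
}

def run_all(config_path='providers.json'):
    # Load one config dict per utility and dispatch on its (assumed) 'type' key.
    with open(config_path) as f:
        provider_configs = json.load(f)

    all_outages = []
    for cfg in provider_configs:
        cls = PROVIDER_REGISTRY.get(cfg.get('type'))
        if not cls:
            continue
        # A fresh Session per provider keeps the cookies and headers installed by
        # GwtRpcBaseProvider.__init__ from leaking between utilities.
        provider = cls(cfg, requests.Session())
        all_outages.extend(provider.fetch())
    return all_outages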