diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..9a5ca19
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+venv/
+*.log
diff --git a/newpower2.py b/newpower2.py
new file mode 100644
index 0000000..fd6f9be
--- /dev/null
+++ b/newpower2.py
@@ -0,0 +1,340 @@
+import requests
+import polyline
+import psycopg2
+import mercantile
+import logging
+import urllib3
+from datetime import datetime, timezone
+from abc import ABC, abstractmethod
+
+# Self-signed certificates are common on utility endpoints; requests are made
+# with verify=False, so silence the resulting warnings.
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+
+# --- LOGGING SETUP ---
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    handlers=[logging.FileHandler('power2_new.log'), logging.StreamHandler()]
+)
+logger = logging.getLogger(__name__)
+
+# --- CONFIGURATION ---
+DB_CONFIG = {'host': 'localhost', 'database': 'nws', 'user': 'nws', 'password': 'nws'}
+KUBRA_BASE_TEMPLATE = 'https://kubra.io/cluster-data/'
+
+# --- PROVIDER CONFIGURATION ---
+# To add a new site, just add a dictionary here.
+PROVIDERS = [
+    {
+        'name': 'AEP-WV',
+        'type': 'kubra',
+        'meta_url': 'https://kubra.io/stormcenter/api/v1/stormcenters/6674f49e-0236-4ed8-a40a-b31747557ab7/views/8cfe790f-59f3-4ce3-a73f-a9642227411f/currentState?preview=false',
+        'layer': 'cluster-2',
+        'quadkeys': ['0320001','0320003','0320010','0320011','0320012','0320013','0320021','0320030','0320031','0320100','0320102','0320120']
+    },
+    {
+        'name': 'AEP-OH',
+        'type': 'kubra',
+        'meta_url': 'https://kubra.io/stormcenter/api/v1/stormcenters/9c0735d8-b721-4dce-b80b-558e98ce1083/views/9b2feb80-69f8-4035-925e-f2acbcf1728e/currentState?preview=false',
+        'layer': 'cluster-1',
+        'quadkeys': ['0320013','0320010','0320011','0320012','0320003','0320001','0302322','0302233','0302232','0302223','0320102','0320100']
+    },
+    {
+        'name': 'AEP-KY',
+        'type': 'kubra',
+        'meta_url': 'https://kubra.io/stormcenter/api/v1/stormcenters/23dcd38e-2573-4e20-a463-959b11cae011/views/60f31606-5702-4a1e-a74c-08d866b7a6fa/currentState?preview=false',
+        'layer': 'cluster-2',
+        'quadkeys': ['0320031','0320030','0320021','0320013','0320012','0320011','0320010','0320003','0320001']
+    },
+    {
+        'name': 'FirstEnergy',
+        'type': 'kubra',
+        'meta_url': 'https://kubra.io/stormcenter/api/v1/stormcenters/6c715f0e-bbec-465f-98cc-0b81623744be/views/5ed3ddf1-3a6f-4cfd-8957-eba54b5baaad/currentState?preview=false',
+        'layer': 'cluster-4',
+        'quadkeys': ['030223','030232','032001','032003','032010','032012']
+    },
+    {
+        'name': 'SouthCentral',
+        'type': 'simple_json',
+        'url': 'https://outage.southcentralpower.com/data/outages.json'
+    },
+    {
+        'name': 'Grayson',
+        'type': 'simple_json',
+        'url': 'https://outages.graysonrecc.com/data/outages.json'
+    }
+]
+
+# --- DATABASE HANDLER ---
+class PowerDB:
+    def __init__(self, config):
+        self.conn = psycopg2.connect(**config)
+        self.conn.autocommit = True
+
+    def close(self):
+        self.conn.close()
+
+    def upsert_outage(self, data):
+        sql = """
+            INSERT INTO newpower
+                (incidentid, utility, lat, lon, pointgeom, areageom, start_time, etr,
+                 outagen, peakoutage, cause, crew_status, active, last_change, fetch_time, geom)
+            VALUES
+                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326))
+            ON CONFLICT (pointgeom) DO UPDATE SET
+                outagen = EXCLUDED.outagen,
+                peakoutage = GREATEST(newpower.peakoutage, EXCLUDED.outagen),
+                cause = EXCLUDED.cause,
+                etr = EXCLUDED.etr,
+                crew_status = EXCLUDED.crew_status,
+                last_change = EXCLUDED.last_change,
+                fetch_time = EXCLUDED.fetch_time,
+                active = TRUE
+        """
+        peak = data.get('outagen') or 0  # treat a missing/None count as 0 so GREATEST() stays sane
+        params = (
+            data.get('incidentid'), data.get('utility'), data.get('lat'), data.get('lon'),
+            data.get('pointgeom'), data.get('areageom'), data.get('start'), data.get('etr'),
+            data.get('outagen'), peak, data.get('cause'), data.get('crew_status'),
+            True, data.get('last_change', datetime.now(timezone.utc)), datetime.now(timezone.utc),
+            data.get('lon'), data.get('lat')
+        )
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, params)
+
+    def run_post_processing(self):
+        logger.info("Running post-processing...")
+        with self.conn.cursor() as cursor:
+            # Spatial enrichment: fill county/state/CWA via point-in-polygon joins
+            cursor.execute('UPDATE newpower SET county = c.countyname FROM public.county c WHERE ST_Contains(c.geom, newpower.geom) AND newpower.county IS NULL')
+            cursor.execute('UPDATE newpower SET state = c.state FROM public.county c WHERE ST_Contains(c.geom, newpower.geom) AND newpower.state IS NULL')
+            cursor.execute('UPDATE newpower SET cwa = f.cwa FROM public.fzone f WHERE ST_Contains(f.geom, newpower.geom) AND newpower.cwa IS NULL')
+            # Decode encoded-polyline outage areas into real PostGIS geometries
+            cursor.execute('UPDATE newpower SET realareageom = ST_LineFromEncodedPolyline(areageom) WHERE areageom IS NOT NULL AND realareageom IS NULL')
+            # Anything not refreshed within 30 minutes is considered restored
+            cursor.execute("UPDATE newpower SET active = TRUE WHERE fetch_time > NOW() - INTERVAL '30 minutes'")
+            cursor.execute("UPDATE newpower SET active = FALSE WHERE fetch_time < NOW() - INTERVAL '30 minutes'")
+            cursor.execute("DELETE FROM newpower WHERE fetch_time < NOW() - INTERVAL '365 days'")
+        logger.info("Post-processing complete.")
+
+
+# --- PROVIDER ARCHITECTURE ---
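+
+# Every provider's fetch() returns dictionaries in one standardized shape, which
+# is what PowerDB.upsert_outage() consumes. For reference, the expected keys are:
+#   incidentid, utility, lat, lon, pointgeom, areageom, start, etr,
+#   outagen, cause, crew_status, last_change
+# Missing keys are tolerated: upsert_outage() reads everything with .get().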
+
+class BaseProvider(ABC):
+    """Abstract base class for all providers"""
+    def __init__(self, config, session):
+        self.config = config
+        self.session = session
+        self.name = config.get('name', 'Unknown')
+
+    @abstractmethod
+    def fetch(self):
+        """Must return a list of standardized outage dictionaries"""
+        pass
+
+class SimpleJsonProvider(BaseProvider):
+    """Handles sites that return a flat JSON list of outages"""
+    def fetch(self):
+        url = self.config.get('url')
+        if not url:
+            logger.error(f"Missing URL for {self.name}")
+            return []
+
+        try:
+            resp = self.session.get(url, verify=False)
+            if not resp.ok:
+                logger.error(f"{self.name} returned {resp.status_code}")
+                return []
+
+            return [self._normalize(item) for item in resp.json()]
+        except Exception as e:
+            logger.error(f"Error fetching {self.name}: {e}")
+            return []
+
+    def _normalize(self, item):
+        # Helper to parse ISO strings safely
+        def safe_parse(ts):
+            if not ts:
+                return None
+            try:
+                return datetime.fromisoformat(ts.replace('Z', '+00:00'))
+            except (AttributeError, ValueError):
+                return None
+
+        point = item.get('outagePoint', {})
+        return {
+            'incidentid': str(item.get('outageRecID')),
+            'utility': self.name,
+            'lat': point.get('lat'),
+            'lon': point.get('lng'),
+            'pointgeom': f"{point.get('lat')},{point.get('lng')}",
+            'areageom': None,
+            'start': safe_parse(item.get('outageStartTime')),
+            'etr': safe_parse(item.get('outageEndTime')),
+            'outagen': item.get('customersOutNow'),
+            'cause': item.get('cause'),
+            'crew_status': item.get('outageWorkStatus'),
+            'last_change': safe_parse(item.get('outageModifiedTime'))
+        }
+
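+# How Kubra tiling works (as implemented below): each map tile is addressed by
+# a quadkey, one digit (0-3) per zoom level, so '0320001' is a zoom-7 tile.
+# Outage data lives in per-tile JSON files. When a tile still reports a cluster
+# at its zoom level, the scraper computes the child quadkey at zoom+1 under the
+# cluster's point and descends until individual incidents appear or max_zoom is
+# reached. Neighboring tiles are also visited so clusters near tile edges are
+# not missed; the 'seen' set prevents refetching the same URL.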
+class KubraProvider(BaseProvider):
+    """Handles Kubra StormCenter recursive quadkey fetching"""
+    def __init__(self, config, session):
+        super().__init__(config, session)
+        self.max_zoom = 14
+        self.results = []
+
+    def fetch(self):
+        # 1. Dynamic setup: resolve this deployment's hex keys
+        meta_url = self.config.get('meta_url')
+        if not meta_url:
+            return []
+
+        hex1, hex2 = self._get_hexes(meta_url)
+        if not hex1:
+            logger.error(f"{self.name}: Could not fetch hex keys")
+            return []
+
+        self.base_url = f"{KUBRA_BASE_TEMPLATE}{hex1}/{hex2}/"
+        self.layer = self.config.get('layer')
+        quadkeys = self.config.get('quadkeys', [])
+        if not quadkeys:
+            logger.error(f"{self.name}: No seed quadkeys configured")
+            return []
+
+        # 2. Recursive fetch, starting at the zoom level implied by the key length
+        self.results = []
+        self._fetch_recursive(quadkeys, set(), zoom=len(quadkeys[0]))
+        return self.results
+
+    def _get_hexes(self, url):
+        try:
+            resp = self.session.get(url)
+            path = resp.json().get('data', {}).get('cluster_interval_generation_data')
+            parts = path.split('/')
+            return parts[2], parts[3]
+        except Exception as e:
+            logger.error(f"Hex fetch error {self.name}: {e}")
+            return None, None
+
+    def _fetch_recursive(self, quadkeys, seen, zoom):
+        for q in quadkeys:
+            # Kubra shards tile files under the reversed last three key digits
+            suffix = q[-3:][::-1]
+            url = f"{self.base_url}{suffix}/public/{self.layer}/{q}.json"
+
+            if url in seen:
+                continue
+            seen.add(url)
+
+            try:
+                resp = self.session.get(url)
+                if not resp.ok or 'application/json' not in resp.headers.get('Content-Type', ''):
+                    continue
+
+                for item in resp.json().get('file_data', []):
+                    desc = item.get('desc', {})
+                    if desc.get('cluster', False):
+                        if zoom + 1 > self.max_zoom:
+                            # Can't drill any deeper; store the cluster as-is
+                            self.results.append(self._normalize(item))
+                        else:
+                            p_geom = item.get('geom', {}).get('p', [])
+                            if p_geom:
+                                next_key = self._get_quadkey_for_point(p_geom[0], zoom + 1)
+                                self._fetch_recursive([next_key], seen, zoom + 1)
+                    else:
+                        self.results.append(self._normalize(item))
+                neighbors = self._get_neighboring_quadkeys(q)
+                self._fetch_recursive(neighbors, seen, zoom)
+            except Exception as e:
+                logger.error(f"Error reading quadkey {q}: {e}")
+
+    def _normalize(self, item):
+        desc = item.get('desc', {})
+        geom = item.get('geom', {})
+        point_poly = geom.get('p', [None])[0]
+        if not point_poly:
+            return {}
+
+        latlon = polyline.decode(point_poly)[0]
+
+        def parse_ts(ts_str):
+            if not ts_str or ts_str == 'ETR-NULL':
+                return None
+            for fmt in ("%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M%z"):
+                try:
+                    return datetime.strptime(ts_str, fmt)
+                except ValueError:
+                    continue
+            return None
+
+        cause = desc.get('cause', {})
+        if isinstance(cause, dict):
+            cause = cause.get('EN-US')
+
+        return {
+            'incidentid': desc.get('inc_id'),
+            'utility': self.name,
+            'lat': latlon[0],
+            'lon': latlon[1],
+            'pointgeom': point_poly,
+            'areageom': geom.get('a'),
+            'start': parse_ts(desc.get('start_time')),
+            'etr': parse_ts(desc.get('etr')),
+            'outagen': desc.get('cust_a', {}).get('val', 0),
+            'cause': cause or "Pending Investigation",
+            'crew_status': desc.get('crew_status', {}).get('EN-US'),
+            'active': True
+        }
+
+    def _get_quadkey_for_point(self, polyline_str, zoom):
+        latlon = polyline.decode(polyline_str)[0]
+        return mercantile.quadkey(mercantile.tile(lng=latlon[1], lat=latlon[0], zoom=zoom))
+
+    def _get_neighboring_quadkeys(self, quadkey):
+        tile = mercantile.quadkey_to_tile(quadkey)
+        neighbors = [
+            mercantile.Tile(x=tile.x, y=tile.y - 1, z=tile.z),
+            mercantile.Tile(x=tile.x + 1, y=tile.y, z=tile.z),
+            mercantile.Tile(x=tile.x, y=tile.y + 1, z=tile.z),
+            mercantile.Tile(x=tile.x - 1, y=tile.y, z=tile.z),
+            mercantile.Tile(x=tile.x + 1, y=tile.y - 1, z=tile.z),
+            mercantile.Tile(x=tile.x + 1, y=tile.y + 1, z=tile.z),
+            mercantile.Tile(x=tile.x - 1, y=tile.y - 1, z=tile.z),
+            mercantile.Tile(x=tile.x - 1, y=tile.y + 1, z=tile.z),
+        ]
+        # Keep only tiles that exist at this zoom level (0 <= index < 2^z)
+        max_index = 2 ** tile.z
+        return [mercantile.quadkey(t) for t in neighbors
+                if 0 <= t.x < max_index and 0 <= t.y < max_index]
+
+
+# --- REGISTRY ---
+# Map string types to classes
+PROVIDER_REGISTRY = {
+    'kubra': KubraProvider,
+    'simple_json': SimpleJsonProvider
+}
+
+# --- MAIN ---
+
+def main():
+    session = requests.Session()
+    session.verify = False
+    db = PowerDB(DB_CONFIG)
+
+    logger.info("Starting Power Scraper...")
+
+    for config in PROVIDERS:
+        p_type = config.get('type')
+        p_name = config.get('name')
+
+        ProviderClass = PROVIDER_REGISTRY.get(p_type)
+
+        if ProviderClass:
+            try:
+                provider = ProviderClass(config, session)
+                logger.info(f"Fetching {p_name}...")
+                outages = provider.fetch()
+
+                count = 0
+                for outage in outages:
+                    if outage:  # skip empty dicts from unparseable records
+                        db.upsert_outage(outage)
+                        count += 1
+                logger.info(f"Saved {count} records for {p_name}")
+
+            except Exception as e:
+                logger.error(f"Critical error running {p_name}: {e}")
+        else:
+            logger.warning(f"Unknown provider type '{p_type}' for {p_name}")
+
+    db.run_post_processing()
+    db.close()
+    logger.info("Scraping complete.")
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/power2.MD b/power2.MD
new file mode 100644
index 0000000..01c9cf4
--- /dev/null
+++ b/power2.MD
@@ -0,0 +1,134 @@
+# Power Outage Scraper (`newpower2.py`) Specification
+
+This document details the architecture, configuration, and maintenance procedures for the `newpower2.py` script.
+
+## 1. Overview
+
+The `newpower2.py` script is a modular, extensible Python application designed to scrape, normalize, and store high-resolution power outage data from multiple utility providers. It supports distinct scraping strategies (Kubra Storm Center, Simple JSON) and persists standardized data into a PostGIS-enabled PostgreSQL database.
+
+-----
+
+## 2. Architecture
+
+The script utilizes a **Strategy Pattern** to handle different utility API formats while maintaining a unified execution pipeline.
+
+### Core Components
+
+1. **Provider Registry:** A configuration list (`PROVIDERS`) that defines every utility to be scraped.
+2. **Provider Classes:**
+    * `BaseProvider` (Abstract): Defines the blueprint for all scrapers.
+    * `KubraProvider`: Implements recursive quadkey drilling for Kubra maps.
+    * `SimpleJsonProvider`: Implements flat list parsing for standard JSON APIs.
+3. **Database Handler (`PowerDB`):** Manages the database connection, upsert operations, and post-processing SQL tasks.
+4. **Main Loop:** Iterates through the registry, instantiates the correct provider class based on the `type` field, and executes the fetch.
+
+-----
+
+## 3. Database Schema (`newpower`)
+
+The script writes to a table named `newpower`. Ensure PostGIS is enabled (`CREATE EXTENSION postgis;`). A reference DDL sketch follows the column table below.
+
+| Column | Type | Description |
+| :--- | :--- | :--- |
+| `id` | `SERIAL PRIMARY KEY` | Auto-incrementing unique ID. |
+| `incidentid` | `TEXT` | Utility-assigned ID (or synthetic composite). |
+| `utility` | `TEXT` | Name of the utility (e.g., 'AEP-WV'). |
+| `lat` / `lon` | `FLOAT` | Coordinates of the outage. |
+| `pointgeom` | `TEXT UNIQUE` | Encoded polyline or coordinate string. **Used for deduplication.** |
+| `geom` | `GEOMETRY(Point, 4326)` | PostGIS Point object for spatial queries. |
+| `areageom` | `TEXT` | Encoded polyline for outage areas (if available). |
+| `realareageom` | `GEOMETRY(LineString)` | PostGIS LineString decoded from `areageom`. |
+| `outagen` | `INTEGER` | Current number of customers affected. |
+| `peakoutage` | `INTEGER` | Max customers affected (tracked over time). |
+| `start_time` | `TIMESTAMPTZ` | Reported start time. |
+| `etr` | `TIMESTAMPTZ` | Estimated Time of Restoration. |
+| `active` | `BOOLEAN` | `TRUE` if currently active, `FALSE` if restored. |
+| `fetch_time` | `TIMESTAMPTZ` | Timestamp of the last successful scrape. |
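+
+### Reference DDL
+
+A minimal sketch of the table, inferred from the columns above plus the fields the script's SQL touches (`cause`, `crew_status`, `county`, `state`, `cwa`, `last_change`). This is not an authoritative dump of the production schema; adjust types and constraints to your environment:
+
+```sql
+-- Sketch only: inferred from the column table and the script's queries.
+CREATE EXTENSION IF NOT EXISTS postgis;
+
+CREATE TABLE IF NOT EXISTS newpower (
+    id           SERIAL PRIMARY KEY,
+    incidentid   TEXT,
+    utility      TEXT,
+    lat          FLOAT,
+    lon          FLOAT,
+    pointgeom    TEXT UNIQUE,               -- deduplication target for ON CONFLICT
+    geom         GEOMETRY(Point, 4326),
+    areageom     TEXT,
+    realareageom GEOMETRY(LineString, 4326),
+    outagen      INTEGER,
+    peakoutage   INTEGER,
+    start_time   TIMESTAMPTZ,
+    etr          TIMESTAMPTZ,
+    cause        TEXT,
+    crew_status  TEXT,
+    county       TEXT,
+    state        TEXT,
+    cwa          TEXT,
+    active       BOOLEAN DEFAULT TRUE,
+    last_change  TIMESTAMPTZ,
+    fetch_time   TIMESTAMPTZ
+);
+```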
+
+### Post-Processing
+
+After every run, the script executes SQL to:
+
+ * **Enrich Data:** Spatial joins with `county` and `fzone` tables to populate `county`, `state`, and `cwa` columns.
+ * **Decode Geometry:** Converts `areageom` strings into `realareageom` PostGIS objects.
+ * **Update Status:** Sets `active = FALSE` for records not seen in the last 30 minutes.
+ * **Cleanup:** Deletes records older than 365 days.
+
+-----
+
+## 4. Configuration
+
+All utility configurations are centralized in the `PROVIDERS` list at the top of the script.
+
+### Adding a "Simple JSON" Provider
+
+Used for utilities that return a flat list of outages (e.g., South Central Power).
+
+```python
+{
+    'name': 'Utility Name',
+    'type': 'simple_json',
+    'url': 'https://example.com/data/outages.json'
+}
+```
+
+### Adding a "Kubra" Provider
+
+Used for utilities utilizing Kubra Storm Center maps (e.g., AEP, FirstEnergy).
+
+```python
+{
+    'name': 'AEP-WV',
+    'type': 'kubra',
+    'meta_url': 'https://kubra.io/.../currentState?preview=false',  # The 'Current State' URL
+    'layer': 'cluster-2',                # Found via get_kubra_config.py
+    'quadkeys': ['0320001', '0320003']   # Generated via generate_keys.py
+}
+```
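+
+Each quadkey digit encodes one zoom level, so a seven-digit seed key such as `0320001` addresses a zoom-7 tile. To sanity-check a seed key, the same `mercantile` library the scraper depends on can round-trip it (illustrative snippet, not part of the scraper):
+
+```python
+import mercantile
+
+key = '0320001'                   # one digit per zoom level -> zoom 7
+tile = mercantile.quadkey_to_tile(key)
+print(tile)                       # Tile(x=..., y=..., z=7)
+print(mercantile.bounds(tile))    # lng/lat bounding box the tile covers
+
+# quadkey() inverts quadkey_to_tile()
+assert mercantile.quadkey(tile) == key
+```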
+
+-----
+
+## 5. Helper Scripts
+
+Two utility scripts assist in configuring new Kubra providers:
+
+### A. `get_kubra_config.py`
+
+**Purpose:** Finds the hidden Cluster Layer ID and Hex Keys.
+
+ * **Input:** The "Current State" URL found in the browser Network tab.
+ * **Output:** The `layer` ID (e.g., `cluster-1`) and constructed base URL.
+
+### B. `generate_keys.py`
+
+**Purpose:** Generates the list of starting map tiles (`quadkeys`) for a specific region.
+
+ * **Input:** A WKT (Well-Known Text) geometry string from your database.
+   * *SQL:* `SELECT ST_AsText(geom) FROM county WHERE ...`
+ * **Output:** A Python list of strings `['032...', '032...']`.
+
+-----
+
+## 6. Extensibility
+
+To add support for a new API format (e.g., XML):
+
+1. **Define Class:** Create a class inheriting from `BaseProvider`.
+2. **Implement `fetch()`:** Write logic to download and normalize data into the standard dictionary format.
+3. **Register:** Add the class to `PROVIDER_REGISTRY`, as sketched below.
+
+```python
+class XmlProvider(BaseProvider):
+    def fetch(self):
+        # ... XML parsing logic ...
+        return [standardized_outages]
+
+# Register it
+PROVIDER_REGISTRY = {
+    'kubra': KubraProvider,
+    'simple_json': SimpleJsonProvider,
+    'xml': XmlProvider
+}
+```
diff --git a/setup_seed_quadkeys.py b/setup_seed_quadkeys.py
new file mode 100644
index 0000000..72529d5
--- /dev/null
+++ b/setup_seed_quadkeys.py
@@ -0,0 +1,100 @@
+"""
+HELPER SCRIPT: setup_seed_quadkeys.py (referred to as generate_keys.py in the docs)
+-----------------------------------------------------------------------------------
+This script generates a list of 'Quadkeys' (map tiles) that intersect a specific
+geographic area (WKT geometry). The list is used to seed the scraper in
+'newpower2.py' so it knows exactly where to look for outages without scanning
+the whole world.
+
+PREREQUISITES:
+    pip install shapely mercantile
+
+USAGE INSTRUCTIONS:
+1. Obtain the WKT (Well-Known Text) geometry for your target area from your database.
+
+   Example SQL to get WKT for a specific county:
+       SELECT ST_AsText(geom) FROM county WHERE countyname = 'Kanawha' AND state = 'WV';
+
+   Example SQL to get WKT for a whole NWS Warning Area (union of counties):
+       SELECT ST_AsText(ST_Union(geom)) FROM county WHERE cwa = 'RLX';
+
+2. Run this script:
+       python setup_seed_quadkeys.py
+
+3. Paste the huge text string returned by your SQL query into the prompt and hit Enter.
+
+4. Copy the resulting 'KEY_LIST = [...]' output and paste it into the 'quadkeys'
+   entry of the provider's configuration in 'newpower2.py'.
+"""
+
+import mercantile
+from shapely import wkt
+from shapely.geometry import box
+
+# --- CONFIGURATION ---
+# The zoom level to generate keys for.
+# Level 7 is standard for the "entry" lists in the scraper (e.g. '0320001').
+# If the utility uses a different zoom level for its top-level clusters, adjust this.
+TARGET_ZOOM = 7
+
+def generate_keys_from_wkt(wkt_string):
+    """
+    Takes a WKT geometry string, finds the bounding box, generates all tiles
+    within that box, and filters them to keep only those that actually
+    intersect the geometry.
+    """
+    try:
+        print("Parsing WKT...")
+        # 1. Parse the WKT
+        service_area = wkt.loads(wkt_string)
+
+        # 2. Get the bounding box (minx, miny, maxx, maxy)
+        bounds = service_area.bounds
+
+        # 3. Generate all tiles covering the bounding box at the target zoom
+        #    mercantile.tiles(west, south, east, north, zooms)
+        candidate_tiles = list(mercantile.tiles(bounds[0], bounds[1], bounds[2], bounds[3], zooms=[TARGET_ZOOM]))
+
+        valid_keys = []
+
+        print(f"Scanning {len(candidate_tiles)} candidate tiles...")
+
+        for tile in candidate_tiles:
+            # 4. Create a geometry for the tile itself
+            #    mercantile.bounds returns (west, south, east, north)
+            t_bounds = mercantile.bounds(tile)
+            tile_geom = box(t_bounds.west, t_bounds.south, t_bounds.east, t_bounds.north)
+
+            # 5. Intersection check: does this tile actually touch the service area?
+            #    We use intersects() because contains() would miss tiles that only partially overlap.
+            if service_area.intersects(tile_geom):
+                valid_keys.append(mercantile.quadkey(tile))
+
+        # 6. Output formatted as a Python list
+        valid_keys.sort()
+        print(f"\nFound {len(valid_keys)} intersecting tiles.")
+        print("-" * 30)
+        print(f"KEY_LIST = {valid_keys}")
+        print("-" * 30)
+
+    except Exception as e:
+        print(f"Error processing geometry: {e}")
+
+if __name__ == "__main__":
+    print("Paste your PostGIS WKT (Well-Known Text) string below and press Enter:")
+
+    # The WKT arrives as a single (possibly very long) line, so input() suffices.
+    try:
+        user_wkt = input().strip()
+        if user_wkt:
+            generate_keys_from_wkt(user_wkt)
+        else:
+            print("No input provided.")
+    except EOFError:
+        print("Error reading input.")
+    except KeyboardInterrupt:
+        print("\nCancelled.")
\ No newline at end of file