import json
import logging
from datetime import datetime

import mercantile
import polyline
import requests

from providers.base import BaseProvider, BaseCountyProvider

logger = logging.getLogger(__name__)


class KubraCountyProvider(BaseCountyProvider):
    def fetch(self):
        meta_url = self.config.get('county_meta_url')
        report_url_suffix = self.config.get('county_report_suffix')
        try:
            # 1. Get hexes from meta_url
            meta_resp = self.session.get(meta_url)
            meta_data = meta_resp.json()
            path = meta_data.get('data', {}).get('cluster_interval_generation_data')
            if not path:
                return []

            # 2. Construct the final report URL.
            # The old script's logic reveals the report URL is composed of a
            # base, the fourth element (index 3) of the metadata path, and the
            # report suffix.
            # Example segment from meta: data/e2ae0326-9912-436a-9355-eb2687e798b1
            path_parts = path.split('/')  # e.g., ['data', 'hex1', 'hex2', 'hex3']
            if len(path_parts) < 4:
                logger.error(f"Invalid metadata path format for {self.name}: {path}")
                return []

            # This is the single, correct URL format used by the original script.
            report_url = f"https://kubra.io/data/{path_parts[3]}{report_url_suffix}"

            # 3. Fetch and process the report.
            report_resp = self.session.get(report_url)
            if not report_resp.ok or not report_resp.text:
                logger.info(f"No county report data available for {self.name} at this time.")
                return []

            report_data = report_resp.json()
            return self._normalize(report_data)
        except json.JSONDecodeError:
            logger.warning(
                f"Could not decode JSON from county report for {self.name}. "
                "The report may be empty or invalid."
            )
            return []
        except requests.exceptions.RequestException as e:
            logger.error(f"Error fetching Kubra county data for {self.name}: {e}")
            return []

    def _normalize(self, data):
        results = []
        primary_areas = data.get("file_data", {}).get("areas", [])
        if not primary_areas:
            return []

        # The report is either nested (state -> counties) or a flat county
        # list; the key of the first item tells us which layout we have.
        first_item_key = primary_areas[0].get("key")
        if first_item_key == "state":
            for state_area in primary_areas:
                for county in state_area.get("areas", []):
                    if county.get("key") == "county":
                        results.append(self._extract_info(county))
        elif first_item_key == "county":
            for county in primary_areas:
                if county.get("key") == "county":
                    results.append(self._extract_info(county))
        return results

    def _extract_info(self, county_item):
        return {
            'outages': county_item.get('cust_a', {}).get('val'),
            'served': county_item.get('cust_s'),
            'county': county_item.get('name', '').capitalize(),
            'state': county_item.get('state') or self.config.get('state_filter'),
            'company': self.name,
        }


class KubraProvider(BaseProvider):
    def __init__(self, config, session):
        super().__init__(config, session)
        self.max_zoom = 14
        self.results = []
        self.base_url_template = 'https://kubra.io/cluster-data/'

    def fetch(self):
        meta_url = self.config.get('meta_url')
        if not meta_url:
            return []

        # Fetch hexes ONCE per run, not in the recursive loop.
        self.hex1, self.hex2 = self._get_hexes(meta_url)
        if not self.hex1 or not self.hex2:
            logger.error(f"[{self.name}] Could not get session hex keys. Aborting fetch for this provider.")
            return []

        quadkeys = self.config.get('quadkeys', [])
        if not quadkeys:
            return []

        self.results = []
        # The length of a quadkey is its zoom level, so the starting zoom is
        # the length of the first configured quadkey.
        self._fetch_recursive(quadkeys, set(), zoom=len(quadkeys[0]))
        return self.results

    def _get_hexes(self, url):
        try:
            resp = self.session.get(url)
            path = resp.json().get('data', {}).get('cluster_interval_generation_data')
            parts = path.split('/')
            return parts[2], parts[3]
        except Exception:
            return None, None

    def _fetch_recursive(self, quadkeys, seen, zoom):
        for q in quadkeys:
            # Skip quadkeys already visited in this run to avoid refetching
            # the same tile while drilling down.
            if q in seen:
                continue
            seen.add(q)

            suffix = q[-3:][::-1]
            url = (
                f"{self.base_url_template}{suffix}/{self.hex1}/{self.hex2}"
                f"/public/{self.config.get('layer')}/{q}.json"
            )
            try:
                resp = self.session.get(url)
                if not resp.ok:
                    continue
                file_data = resp.json().get('file_data', [])
                for item in file_data:
                    desc = item.get('desc')
                    # This mirrors the safe logic from the original power2.py's
                    # 'kubra' function. If 'desc' is missing, assume it's a
                    # cluster to be safe and drill down.
                    is_cluster = True if desc is None else desc.get('cluster', False)

                    if is_cluster and zoom + 1 <= self.max_zoom:
                        # It's a cluster and we haven't hit max zoom: drill down.
                        p_geom = item.get('geom', {}).get('p', [])
                        if p_geom:
                            next_key = self._get_quadkey_for_point(p_geom[0], zoom + 1)
                            self._fetch_recursive([next_key], seen, zoom + 1)
                    else:
                        # Otherwise, it's a final outage record. Process it.
                        self.results.append(self._normalize(item))
            except Exception as e:
                logger.error(
                    f"[{self.name}] Unhandled exception in _fetch_recursive for {q}: {e}",
                    exc_info=True,
                )

    def _normalize(self, item):
        # Ensure 'desc' is a dictionary even if it's missing from the item.
        # This prevents an AttributeError on the .get() calls below.
        desc = item.get('desc') or {}
        geom = item.get('geom', {})
        p = geom.get('p', [None])[0] if geom.get('p') else None
        if not p:
            return {}
        latlon = polyline.decode(p)[0]

        def ts(s):
            # Kubra timestamps come in two formats; 'ETR-NULL' means no estimate.
            if not s or s == 'ETR-NULL':
                return None
            try:
                return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S%z")
            except ValueError:
                try:
                    return datetime.strptime(s, "%Y-%m-%dT%H:%M%z")
                except ValueError:
                    return None

        cause_dict = desc.get('cause')
        cause = cause_dict.get('EN-US', "Pending") if cause_dict else "Pending"
        crew_dict = desc.get('crew_status')
        crew_status = crew_dict.get('EN-US') if crew_dict else None

        return {
            'incidentid': desc.get('inc_id'),
            'utility': self.name,
            'lat': latlon[0],
            'lon': latlon[1],
            'pointgeom': p,
            'areageom': geom.get('a'),
            'start': ts(desc.get('start_time')),
            'etr': ts(desc.get('etr')),
            'outagen': desc.get('cust_a', {}).get('val', 0),
            'cause': cause,
            'crew_status': crew_status,
            'active': True,
        }

    def _get_quadkey_for_point(self, p, z):
        ll = polyline.decode(p)[0]
        return mercantile.quadkey(mercantile.tile(lng=ll[1], lat=ll[0], zoom=z))

    def _get_neighbors(self, q):
        t = mercantile.quadkey_to_tile(q)
        return [mercantile.quadkey(n) for n in mercantile.neighbors(t)]
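

# ---------------------------------------------------------------------------
# Minimal usage sketch, not part of the provider API. The config keys below
# match what the classes above read via self.config.get(...), but the concrete
# values (meta URL, layer id, quadkeys) are hypothetical placeholders, and
# BaseProvider is assumed to store `config` and expose the `self.name` and
# `self.session` attributes used above.
if __name__ == "__main__":
    session = requests.Session()
    config = {
        'meta_url': 'https://kubra.io/stormcenter/api/v1/...',  # placeholder
        'layer': 'cluster-layer-name',  # hypothetical layer id
        'quadkeys': ['0320'],           # starting tiles; zoom = key length
    }
    provider = KubraProvider(config, session)
    for outage in provider.fetch():
        print(outage)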