add archive
This commit is contained in:
62
newpower.py
62
newpower.py
@@ -81,53 +81,35 @@ class CountyPowerDB:
|
||||
self.conn.autocommit = True
|
||||
def close(self):
|
||||
self.conn.close()
|
||||
|
||||
def upsert_and_zero_outages(self, company_name, outage_data, fetch_time):
|
||||
|
||||
def insert_outage_snapshot(self, outage_data, fetch_time):
|
||||
"""
|
||||
Atomically updates outage information for a given company.
|
||||
1. UPSERTS counties with active outages, updating their counts.
|
||||
2. SETS outage count to 0 for any other county from that company that was not in the active list.
|
||||
Inserts a snapshot of county outage data for a given fetch time.
|
||||
This creates a historical record of outages.
|
||||
"""
|
||||
|
||||
# Prepare data for counties with active outages
|
||||
active_outage_values = []
|
||||
reported_counties = []
|
||||
all_values = []
|
||||
for item in outage_data:
|
||||
if all(k in item for k in ['county', 'state', 'company']):
|
||||
val = (
|
||||
item['county'], item['state'], item['company'],
|
||||
item.get('outages'), item.get('served'), fetch_time
|
||||
item.get('outages'),
|
||||
item.get('served'),
|
||||
item['county'],
|
||||
item['state'],
|
||||
fetch_time,
|
||||
item['company']
|
||||
)
|
||||
active_outage_values.append(val)
|
||||
reported_counties.append(item['county'])
|
||||
|
||||
all_values.append(val)
|
||||
|
||||
with self.conn.cursor() as cursor:
|
||||
# Step 1: UPSERT active outages
|
||||
if active_outage_values:
|
||||
upsert_sql = """
|
||||
INSERT INTO newcountyoutages (county, state, company, outages, served, fetch_time)
|
||||
if all_values:
|
||||
# Use a simple INSERT to create a historical record for each run.
|
||||
# The column order matches the old power3.py script for the `countyoutages` table.
|
||||
sql = """
|
||||
INSERT INTO newcountyoutages (outages, served, county, state, fetch_time, company)
|
||||
VALUES (%s, %s, %s, %s, %s, %s)
|
||||
ON CONFLICT (county, state, company) DO UPDATE SET
|
||||
outages = EXCLUDED.outages,
|
||||
served = COALESCE(EXCLUDED.served, newcountyoutages.served),
|
||||
fetch_time = EXCLUDED.fetch_time;
|
||||
"""
|
||||
cursor.executemany(upsert_sql, active_outage_values)
|
||||
logger.info(f"Upserted {len(active_outage_values)} active outage records for {company_name}.")
|
||||
|
||||
# Step 2: Set outages to 0 for any other county from this company
|
||||
# This correctly creates point-in-time zero records by updating the fetch_time.
|
||||
zero_out_sql = """
|
||||
UPDATE newcountyoutages
|
||||
SET outages = 0, fetch_time = %s
|
||||
WHERE company = %s AND county NOT IN %s;
|
||||
"""
|
||||
# Ensure reported_counties is not empty to avoid "IN (NULL)"
|
||||
if not reported_counties:
|
||||
reported_counties.append("NO_COUNTIES_REPORTED_DUMMY_VALUE")
|
||||
|
||||
cursor.execute(zero_out_sql, (fetch_time, company_name, tuple(reported_counties)))
|
||||
logger.info(f"Zeroed out {cursor.rowcount} resolved outage records for {company_name}.")
|
||||
cursor.executemany(sql, all_values)
|
||||
logger.info(f"Inserted {len(all_values)} county outage records for this run.")
|
||||
|
||||
def run_post_processing(self):
|
||||
logger.info("Running post-processing for county data...")
|
||||
@@ -176,8 +158,8 @@ def main():
|
||||
outages = provider.fetch()
|
||||
logger.info(f"Found {len(outages)} active outage records for {p_name}.")
|
||||
|
||||
# Process this company's data in a single transaction
|
||||
db.upsert_and_zero_outages(p_name, outages, run_timestamp)
|
||||
# Insert all collected outages as a new snapshot for this run time.
|
||||
db.insert_outage_snapshot(outages, run_timestamp)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing {p_name}: {e}")
|
||||
|
||||
Reference in New Issue
Block a user