# test/ky511.py
import requests
import json
import traceback # For detailed error reporting
import psycopg2 # For PostgreSQL interaction
from psycopg2 import extras
import os
from datetime import datetime
import pytz # Timezone handling for US Eastern (EST/EDT)
# --- Database Configuration (Use Environment Variables!) ---
DB_NAME = os.environ.get("PG_DBNAME", "nws")
DB_USER = os.environ.get("PG_USER", "nws")
DB_PASSWORD = os.environ.get("PG_PASSWORD", "nws")
DB_HOST = os.environ.get("PG_HOST", "localhost")
DB_PORT = os.environ.get("PG_PORT", "5432")
DB_SCHEMA = "ky511" # Specify schema if used, otherwise set to None or remove usage
DB_TABLE = "ky511"
TABLE_NAME_QUALIFIED = f"{DB_SCHEMA}.{DB_TABLE}" if DB_SCHEMA else DB_TABLE
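# A sketch of the target table, inferred from the columns this script inserts
# and the post-upsert queries below. Column types are assumptions, in
# particular TIMESTAMPTZ for the timestamp columns and a PostGIS point for geom:
#
#   CREATE TABLE ky511.ky511 (
#       id                  text PRIMARY KEY,
#       first_seen          timestamptz,
#       initial_description text,
#       latest_description  text,
#       last_updated        timestamptz,
#       last_seen_in_feed   timestamptz,
#       geom                geometry(Point, 4326),
#       source              text,
#       route               text,
#       remark              text,
#       county              text,  -- filled by post-upsert spatial join
#       cwa                 text,  -- filled by post-upsert spatial join
#       st                  text   -- filled by post-upsert spatial join
#   );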
# --- Looker Studio API Configuration ---
URL = "https://lookerstudio.google.com/batchedDataV2?appVersion=20250324_0406"
# HEADERS and COOKIES are tied to an authenticated browser session; the tokens
# below expire, so refresh them from a live session before running.
HEADERS = { "authority": "lookerstudio.google.com", "method": "POST", "path": "/batchedDataV2?appVersion=20250324_0406", "scheme": "https", "accept": "application/json, text/plain, */*", "accept-encoding": "gzip, deflate, br, zstd", "accept-language": "en-US,en;q=0.9", "cache-control": "no-cache", "origin": "https://lookerstudio.google.com", "pragma": "no-cache", "priority": "u=1, i", "referer": "https://lookerstudio.google.com/reporting/1413fcfb-1416-4e56-8967-55f8e9f30ec8/page/p_pbm4eo88qc", "sec-ch-ua": '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"Windows"', "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", "x-client-data": "CIS2yQEIorbJAQipncoBCMHbygEIk6HLAQiFoM0BCP6lzgEIvdXOAQjJ4M4BCIbizgEIu+fOAQjS6M4BCKzpzgE=", "x-rap-xsrf-token": "AImk1AIPFDSOkwsENSc2sLLwBANEib1ysQ:1744403672035", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36", "Content-Type": "application/json", }
COOKIES = { "SID": "g.a000vwi83sZx47d1q9Po8BgPR3jBX7lpsrzWSuPIKyp6RycUEctVzyy0oo1gNvQbwZa53AWllwACgYKAVQSARMSFQHGX2MiUpZ3JZlRTzUS5L-5fPmgpBoVAUF8yKprZzPlNM_X83070Ct35bq_0076", "__Secure-1PSID": "g.a000vwi83sZx47d1q9Po8BgPR3jBX7lpsrzWSuPIKyp6RycUEctVKNqZm8qWU9ZC2DlRVPn0igACgYKAfQSARMSFQHGX2MiYGW66m_L_p1vlqiuEg-IrxoVAUF8yKrgRbA9tcA4VinZix0qlexX0076", "__Secure-3PSID": "g.a000vwi83sZx47d1q9Po8BgPR3jBX7lpsrzWSuPIKyp6RycUEctVnGF-cJAl9Wr1rJ-NOCW63wACgYKAZASARMSFQHGX2Mi1gYcRRnI8v2AdvofwvFG5BoVAUF8yKooSw-opCZ1-vxmkQXCB7330076", "HSID": "AJifEcy5MKbQSoqIz", "SSID": "AB87p4RIXK_USto4D", "APISID": "fQw9oTJbNnptFKdr/AIXxJuvoAk3qrIeWi", "SAPISID": "3zFHAXrGoL_XNe0X/A7AafTXBL2NRj7N2h", "__Secure-1PAPISID": "3zFHAXrGoL_XNe0X/A7AafTXBL2NRj7N2h", "__Secure-3PAPISID": "3zFHAXrGoL_XNe0X/A7AafTXBL2NRj7N2h", "AEC": "AVcja2e2MbzHDDL2yqJDMgUeDL-S3BEFB_yK293aRhQwshqv22Bd7IQ9mvg", "S": "maestro=IUwVdLn0rPZ27SY26uYPEonS9u1J9oQi8YzVJpqIc-w", "_gid": "GA1.3.1639311329.1744393279", "NID": "523=n5TR7moG8fZ7ZefkfCPQgx3aIuzhv_Vqk6CI0ah9HTWaoS-D4CtVwh6nIKaFY3hPLBPf_vf3THtu0nnJXc2ZaD5i3MpiIqOD8zBQbDuCp5G8rB3an7Jt7wdVuE8S3326_SF6FEBti9omWc2wNU43MbtEXSC7g4YP2sVzLohrGiUi_FHMIRDHj8OUKtDg9HjD3U3RzLRZk-IlyQyoRrwz0Ubf7IKxTsMvt57VWJvB2gqODqJJvaGWUdASl5lvPuTR7hlNkDyZwRC9rr-T2XtGUa10wUS_5bERpv6A2wTXXquQodkB25oDZvnhoS7FmO-SSAV-fweVu9tGlQ566w1yfBS0iANk8_iSCqhqFx6R1fyB9Nwxyf_g6ncw6nOmIHrtsIFiwAGJgwno2WRNey5nEgZbd5O-ew3z2K_aiPLvgeJCvY82L5swZctp7ZJtPFgGqXuAj3i6KtDhsTtizx0PbPpb7bvZ2nJ-SxlkR4s84fp_NxcXZKEdQfzOzoL6nAQMG9Kh28t_yKvcordnv25-55J_n_yzPDH78dTcLBzWS5dEcLP_Tt7HlSevSbP_2-NNoBGhM76vmMyIsMS0FucwPPKExrF6pwC3kc4g-4gruLlIWEHSnYodVcXQxD7Y2pD-0MBD7O8s-fCBDhlk1OgWfTHC1JnyrkUIMuqzj5-_LbuRIDPbN8YjVF0lO7jeyX0I9JjHHU3qc9EvhZ5LqpZKpRgTl5U4Prsmgq9K0o3nqvNoRMAbSTLX2kKGfhZKxtZT-Ezzrs_jLuZELFL8u98joYHS6v-guEvjD2Kg_XeWgRz3zAKBojSdtJnSuD7qjsnrV6IagTb8Yoazd7Gz3g-mlz8XTtSx1v7sStbFLnpMUlk", "_ga": "GA1.1.1275703331.1743917675", "_ga_LPCKLD3Z7X": "GS1.1.1744403669.9.1.1744403672.0.0.0", "RAP_XSRF_TOKEN": "AImk1AIPFDSOkwsENSc2sLLwBANEib1ysQ:1744403672035", "__Secure-1PSIDTS": "sidts-CjIB7pHpta3VWPdkJx7cNCUjUpy6c-d9WA3bvc3_9icXKqMA3kaW8Nh6_NJ-S4_OkQRZtRAA", "__Secure-3PSIDTS": "sidts-CjIB7pHpta3VWPdkJx7cNCUjUpy6c-d9WA3bvc3_9icXKqMA3kaW8Nh6_NJ-S4_OkQRZtRAA", "SIDCC": "AKEyXzUvhhICXSbNTp0dL7c4St5hDPC_ghsZ_PlMfsv7M8YVMlV8EibT-8IAsh7PmcPAtY38PJY", "__Secure-1PSIDCC": "AKEyXzUDzbYtACzfEgsC_j1S2ay0dayt4PwLSUoOjlS1xMBQ9FeL52NBdnlZBn_KWMM4a8jFaNv5", "__Secure-3PSIDCC": "AKEyXzVCxfyVIsm9apiA7JdTK3UKhcQUNLBOKQFxN8QDJyeLPojNFUBUs_K_X0xOR2vzZsBBEk5V", "_gat": "1", "_ga_S4FJY0X3VX": "GS1.1.1744401723.8.1.1744404147.0.0.0", }
payload_req1 = { "requestContext": {"reportContext": {"reportId": "1413fcfb-1416-4e56-8967-55f8e9f30ec8", "pageId": "p_pbm4eo88qc", "mode": 1, "componentId": "cd-rdkny9a9qc", "displayType": "simple-table", "actionId": "crossFilters"}, "requestMode": 0}, "datasetSpec": {"dataset": [{"datasourceId": "c2fc8cdd-46bb-454c-bf09-90ebfd4067d7", "revisionNumber": 0, "parameterOverrides": []}], "queryFields": [{"name": "qt_3nwfu9yq1c", "datasetNs": "d0", "tableNs": "t0", "resultTransformation": {"analyticalFunction": 0, "isRelativeToBase": False}, "dataTransformation": {"sourceFieldName": "_Event_Id_"}}, {"name": "qt_8yjok4izsc", "datasetNs": "d0", "tableNs": "t0", "resultTransformation": {"analyticalFunction": 0, "isRelativeToBase": False}, "dataTransformation": {"sourceFieldName": "_DateTime_EST_"}}, {"name": "qt_sfkc163arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_KYTC_Type_"}}, {"name": "qt_4e66idhbrc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_Incident_Source_"}}, {"name": "qt_re76qrqe2c", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_District_", "aggregation": 0}}, {"name": "qt_tfkc163arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_County_Name_"}}, {"name": "qt_ufkc163arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_Route_Label_"}}, {"name": "qt_vfkc163arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_BMP_Initial_"}}, {"name": "qt_o7kc163arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_EMP_Initial_"}}, {"name": "qt_p7kc163arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_Description_"}}], "sortData": [{"sortColumn": {"name": "qt_8yjok4izsc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_DateTime_EST_"}}, "sortDir": 1}], "includeRowsCount": True, "relatedDimensionMask": {"addDisplay": False, "addUniqueId": False, "addLatLong": False}, "paginateInfo": {"startRow": 1, "rowsCount": 500}, "dsFilterOverrides": [], "filters": [{"filterDefinition": {"filterExpression": {"include": False, "conceptType": 0, "concept": {"ns": "t0", "name": "qt_exs3vib9qc"}, "filterConditionType": "PT", "stringValues": ["Shoulder"], "numberValues": [], "queryTimeTransformation": {"dataTransformation": {"sourceFieldName": "_Source_Type_"}}}}, "dataSubsetNs": {"datasetNs": "d0", "tableNs": "t0", "contextNs": "c0"}, "version": 3}, {"filterDefinition": {"filterExpression": {"include": True, "conceptType": 0, "concept": {"name": "qt_vwmdhfhbrc", "ns": "t0"}, "queryTimeTransformation": {"dataTransformation": {"sourceFieldName": "_Incident_Source_"}}, "filterConditionType": "IN", "stringValues": ["KYTC Reported"]}}, "dataSubsetNs": {"datasetNs": "d0", "tableNs": "t0", "contextNs": "c0"}, "version": 3, "isCanvasFilter": True}, {"filterDefinition": {"filterExpression": {"include": True, "conceptType": 0, "concept": {"name": "qt_kjyfx83arc", "ns": "t0"}, "queryTimeTransformation": {"dataTransformation": {"sourceFieldName": "_County_Name_"}}, "filterConditionType": "IN", "stringValues": ["Boyd", "Carter", "Fleming", "Greenup"]}}, "dataSubsetNs": {"datasetNs": "d0", "tableNs": "t0", "contextNs": "c0"}, "version": 3, "isCanvasFilter": True}, {"filterDefinition": {"filterExpression": {"include": True, "conceptType": 0, "concept": {"name": "qt_uiren73arc", "ns": "t0"}, "queryTimeTransformation": 
{"dataTransformation": {"sourceFieldName": "_KYTC_Type_"}}, "filterConditionType": "IN", "stringValues": ["Weather"]}}, "dataSubsetNs": {"datasetNs": "d0", "tableNs": "t0", "contextNs": "c0"}, "version": 3, "isCanvasFilter": True}], "features": [], "dateRanges": [], "contextNsCount": 1, "calculatedField": [], "needGeocoding": False, "geoFieldMask": [], "multipleGeocodeFields": [], "timezone": "America/New_York"}, "role": "main", "retryHints": {"useClientControlledRetry": True, "isLastRetry": False, "retryCount": 0, "originalRequestId": "cd-rdkny9a9qc_0_0"} }
payload_req2 = { "requestContext": {"reportContext": {"reportId": "1413fcfb-1416-4e56-8967-55f8e9f30ec8", "pageId": "p_pbm4eo88qc", "mode": 1, "componentId": "cd-3aflvp88qc", "displayType": "google-map", "actionId": "crossFilters"}, "requestMode": 0}, "datasetSpec": {"dataset": [{"datasourceId": "c2fc8cdd-46bb-454c-bf09-90ebfd4067d7", "revisionNumber": 0, "parameterOverrides": []}], "queryFields": [{"name": "qt_pbotw53arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_Incident_Id_"}}, {"name": "qt_qbotw53arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_KYTC_Type_"}}, {"name": "qt_7grvqe4arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "qt_7grvqe4arc", "textFormula": "CONCAT(t0._KYTC_Type_, \" - \", t0._Route_Label_, \" - \", t0._BMP_Initial_)", "sourceType": 1, "frontendTextFormula": "CONCAT(t0._KYTC_Type_,\" - \",t0._Route_Label_,\" - \",t0._BMP_Initial_)", "formulaOutputDataType": 0}}, {"name": "qt_5prae63arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_SHAPE_", "aggregation": 16, "outputGeoType": 1}}], "sortData": [{"sortColumn": {"name": "qt_pbotw53arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_Incident_Id_"}}, "sortDir": 1}], "includeRowsCount": True, "relatedDimensionMask": {"addDisplay": False, "addUniqueId": False, "addLatLong": True}, "paginateInfo": {"startRow": 1, "rowsCount": 100000}, "dsFilterOverrides": [], "filters": [{"filterDefinition": {"filterExpression": {"include": False, "conceptType": 0, "concept": {"ns": "t0", "name": "qt_exs3vib9qc"}, "filterConditionType": "PT", "stringValues": ["Shoulder"], "numberValues": [], "queryTimeTransformation": {"dataTransformation": {"sourceFieldName": "_Source_Type_"}}}}, "dataSubsetNs": {"datasetNs": "d0", "tableNs": "t0", "contextNs": "c0"}, "version": 3}, {"filterDefinition": {"filterExpression": {"include": True, "conceptType": 0, "concept": {"name": "qt_vwmdhfhbrc", "ns": "t0"}, "queryTimeTransformation": {"dataTransformation": {"sourceFieldName": "_Incident_Source_"}}, "filterConditionType": "IN", "stringValues": ["KYTC Reported"]}}, "dataSubsetNs": {"datasetNs": "d0", "tableNs": "t0", "contextNs": "c0"}, "version": 3, "isCanvasFilter": True}, {"filterDefinition": {"filterExpression": {"include": True, "conceptType": 0, "concept": {"name": "qt_kjyfx83arc", "ns": "t0"}, "queryTimeTransformation": {"dataTransformation": {"sourceFieldName": "_County_Name_"}}, "filterConditionType": "IN", "stringValues": ["Boyd", "Carter", "Fleming", "Greenup"]}}, "dataSubsetNs": {"datasetNs": "d0", "tableNs": "t0", "contextNs": "c0"}, "version": 3, "isCanvasFilter": True}, {"filterDefinition": {"filterExpression": {"include": True, "conceptType": 0, "concept": {"name": "qt_uiren73arc", "ns": "t0"}, "queryTimeTransformation": {"dataTransformation": {"sourceFieldName": "_KYTC_Type_"}}, "filterConditionType": "IN", "stringValues": ["Weather"]}}, "dataSubsetNs": {"datasetNs": "d0", "tableNs": "t0", "contextNs": "c0"}, "version": 3, "isCanvasFilter": True}], "features": [], "dateRanges": [], "contextNsCount": 1, "calculatedField": [], "needGeocoding": False, "geoFieldMask": [], "geoVertices": 100000, "multipleGeocodeFields": [], "timezone": "America/New_York"}, "role": "main", "retryHints": {"useClientControlledRetry": True, "isLastRetry": False, "retryCount": 0, "originalRequestId": "cd-3aflvp88qc_0_0"} }
combined_payload = {"dataRequest": [payload_req1, payload_req2]}
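# The endpoint batches both requests and returns a single envelope. A sketch of
# the response shape, inferred from the parsing logic in fetch_and_process_data():
#
#   )]}'                             <- anti-JSON-hijacking prefix, stripped before json.loads
#   {"dataResponse": [
#       {"dataSubset": [{"dataset": {"tableDataset": {...}}}]},  # request 1: table
#       {"dataSubset": [{"dataset": {"tableDataset": {...}}}]}   # request 2: map
#   ]}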
# --- Key Mappings and Constants ---
KEY_RENAME_MAP = { 'qt_3nwfu9yq1c': 'id', 'qt_8yjok4izsc': 'dtg', 'qt_4e66idhbrc': 'source', 'qt_ufkc163arc': 'route', 'qt_p7kc163arc': 'remark' }
COORDINATE_KEY_OLD = 'qt_5prae63arc'
LONGITUDE_KEY_NEW = 'lon'
LATITUDE_KEY_NEW = 'lat'
BMP_KEY_ORIGINAL = 'qt_vfkc163arc'
COUNTY_KEY_ORIGINAL = 'qt_tfkc163arc'
# --- Define the US Eastern Timezone ---
# This timezone correctly handles switches between EST and EDT
eastern_tz = pytz.timezone('America/New_York')
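# A quick illustration (assumed dates) of the localize(..., is_dst=None) call
# used below: pytz refuses to guess across DST transitions and raises instead.
#
#   tz = pytz.timezone('America/New_York')
#   tz.localize(datetime(2024, 7, 1, 12, 0), is_dst=None)   # OK: EDT (UTC-4)
#   tz.localize(datetime(2024, 11, 3, 1, 30), is_dst=None)  # AmbiguousTimeError (clocks fall back)
#   tz.localize(datetime(2024, 3, 10, 2, 30), is_dst=None)  # NonExistentTimeError (clocks spring forward)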
# --- Helper Function (Column-to-Row) ---
def process_table_dataset(table_dataset):
    """Pivot Looker Studio's column-oriented tableDataset into a list of row dicts."""
    if not table_dataset:
        return []
    try:
        column_info = table_dataset.get('columnInfo', [])
        columns_data = table_dataset.get('column', [])
        num_records = table_dataset.get('totalCount', 0)
        # Fall back to the first column's value count if totalCount is missing/zero
        if num_records == 0 and columns_data:
            first_col_data = columns_data[0]
            data_key = next((k for k in first_col_data if isinstance(first_col_data[k], dict) and 'values' in first_col_data[k]), None)
            if data_key:
                num_records = len(first_col_data[data_key].get('values', []))
        if not column_info or not columns_data or num_records == 0:
            return []
        column_names = [info.get('name', f'unknown_col_{i}') for i, info in enumerate(column_info)]
        processed_events = []
        for i in range(num_records):
            event = {}
            for j, col_name in enumerate(column_names):
                if j >= len(columns_data):
                    event[col_name] = None
                    continue
                col_data_dict = columns_data[j]
                value = None
                # Each column stores its data under a type-specific key containing a 'values' list
                data_key = next((k for k in col_data_dict if isinstance(col_data_dict[k], dict) and 'values' in col_data_dict[k]), None)
                if data_key:
                    values_list = col_data_dict[data_key].get('values', [])
                    if i < len(values_list):
                        value = values_list[i]
                event[col_name] = value
            processed_events.append(event)
        return processed_events
    except Exception as e:
        print(f"Error processing table dataset: {e}")
        traceback.print_exc()
        return []
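# A minimal sketch of the pivot this helper performs (the 'stringColumn' /
# 'doubleColumn' key names are assumptions; the code only requires some key
# holding a dict with a 'values' list):
#
#   process_table_dataset({
#       'columnInfo': [{'name': 'qt_a'}, {'name': 'qt_b'}],
#       'column': [{'stringColumn': {'values': ['x1', 'x2']}},
#                  {'doubleColumn': {'values': [1.0, 2.0]}}],
#       'totalCount': 2,
#   })
#   # -> [{'qt_a': 'x1', 'qt_b': 1.0}, {'qt_a': 'x2', 'qt_b': 2.0}]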
# --- Data Fetching and Processing Function ---
def fetch_and_process_data():
    print("Sending combined request...")
    try:
        response = requests.post(URL, headers=HEADERS, cookies=COOKIES, json=combined_payload)
        response.raise_for_status()
        print(f"Request successful! Status Code: {response.status_code}")
        # Strip Google's anti-JSON-hijacking prefix characters before parsing
        cleaned_text = response.text.lstrip(")]}'")
        response_data = json.loads(cleaned_text)
        table_data_list, map_data_list = [], []
        if 'dataResponse' in response_data and isinstance(response_data['dataResponse'], list) and len(response_data['dataResponse']) >= 2:
            try:
                table_subset = response_data['dataResponse'][0].get('dataSubset', [{}])[0]
                table_dataset = table_subset.get('dataset', {}).get('tableDataset')
                if table_dataset:
                    table_data_list = process_table_dataset(table_dataset)
            except IndexError:
                pass
            try:
                map_subset = response_data['dataResponse'][1].get('dataSubset', [{}])[0]
                map_dataset = map_subset.get('dataset', {}).get('tableDataset')
                if map_dataset:
                    map_data_list = process_table_dataset(map_dataset)
            except IndexError:
                pass
        else:
            print("Error: Expected 2 'dataResponse' items.")
            return []
        if not table_data_list or not map_data_list:
            print("Error: Failed to process datasets.")
            return []
        print(f"\nMerging {len(table_data_list)} table events and {len(map_data_list)} map events...")
        merged_events_dict = {}
        # Key the table rows the same way the map request's calculated field is built
        for event1 in table_data_list:
            event_type = event1.get('qt_sfkc163arc')
            route_label = event1.get('qt_ufkc163arc')
            bmp_value = event1.get(BMP_KEY_ORIGINAL)
            if event_type is not None and route_label is not None and bmp_value is not None:
                bmp_str = str(int(bmp_value)) if isinstance(bmp_value, float) and bmp_value.is_integer() else str(bmp_value)
                join_key = f"{event_type} - {route_label} - {bmp_str}"
                merged_events_dict[join_key] = event1.copy()
        merge_success_count = 0
        for event2 in map_data_list:
            join_key = event2.get('qt_7grvqe4arc')
            if join_key in merged_events_dict:
                existing_event = merged_events_dict[join_key]
                for key, value in event2.items():
                    if key == COORDINATE_KEY_OLD:
                        # The _SHAPE_ field arrives as GeoJSON text; convert Points to a (lon, lat) tuple
                        if isinstance(value, str) and value.strip().startswith('{'):
                            try:
                                geo_data = json.loads(value)
                                if geo_data.get('type') == 'Point' and 'coordinates' in geo_data:
                                    existing_event[key] = tuple(geo_data['coordinates'])
                                else:
                                    existing_event[key] = value
                            except json.JSONDecodeError:
                                existing_event[key] = value
                        else:
                            existing_event[key] = value
                    elif key not in existing_event:
                        existing_event[key] = value
                merge_success_count += 1
        #print(f"-> Merged data for {merge_success_count} events.")
#print("\nApplying final transformations (including timezone handling)...") # <--- Updated print message
final_transformed_list = []
parse_error_count = 0
# Define potential datetime formats from Looker Studio (add more if needed)
# Common formats:
# %m/%d/%Y %I:%M:%S %p (e.g., 05/20/2024 02:30:00 PM)
# %Y-%m-%d %H:%M:%S (e.g., 2024-05-20 14:30:00)
# %Y%m%d%H%M%S (e.g., 20240520143000) - Less common for display but possible
possible_dt_formats = [
'%m/%d/%Y %I:%M:%S %p', # e.g., 05/20/2024 02:30:00 PM
'%Y-%m-%d %H:%M:%S', # e.g., 2024-05-20 14:30:00
'%Y-%m-%dT%H:%M:%S', # <--- ADD THIS FORMAT (e.g., 2025-04-12T06:16:06)
# Add other formats if you observe them in the data
]
        for event in merged_events_dict.values():
            transformed_event = {}
            aware_dtg = None  # Initialize dtg as None
            try:
                # --- Timezone Handling for 'dtg' ---
                raw_dtg_value = event.get('qt_8yjok4izsc')
                if raw_dtg_value:
                    parsed_naive = None
                    for fmt in possible_dt_formats:
                        try:
                            parsed_naive = datetime.strptime(str(raw_dtg_value), fmt)
                            break  # Stop trying formats once one succeeds
                        except ValueError:
                            continue  # Try the next format
                    if parsed_naive:
                        # Successfully parsed; make it timezone-aware (ET).
                        # Note: is_dst=None raises on ambiguous/nonexistent local
                        # times rather than guessing; the outer except catches those.
                        aware_dtg = eastern_tz.localize(parsed_naive, is_dst=None)
                        # print(f"DEBUG: Raw: {raw_dtg_value}, Parsed Naive: {parsed_naive}, Aware ET: {aware_dtg}, UTC: {aware_dtg.astimezone(pytz.utc)}")
                    else:
                        # Could not parse with any known format
                        print(f"Warning: Could not parse dtg value '{raw_dtg_value}' for event ID {event.get('qt_3nwfu9yq1c')} using known formats.")
                        parse_error_count += 1
                else:
                    print(f"Warning: Missing dtg value for event ID {event.get('qt_3nwfu9yq1c')}.")
                    parse_error_count += 1
                # --- Standard Key Renaming and Data Extraction ---
                for old_key, new_key in KEY_RENAME_MAP.items():
                    if old_key == 'qt_8yjok4izsc':
                        transformed_event[new_key] = aware_dtg  # Aware datetime object or None
                    elif old_key in event:
                        transformed_event[new_key] = event[old_key]
                if COORDINATE_KEY_OLD in event:
                    coordinates = event[COORDINATE_KEY_OLD]
                    if isinstance(coordinates, (list, tuple)) and len(coordinates) >= 2:
                        try:
                            transformed_event[LONGITUDE_KEY_NEW] = float(coordinates[0])
                            transformed_event[LATITUDE_KEY_NEW] = float(coordinates[1])
                        except (TypeError, ValueError):
                            pass  # Leave lon/lat unset if conversion fails
                keys_to_keep = [BMP_KEY_ORIGINAL, COUNTY_KEY_ORIGINAL]
                for key in keys_to_keep:
                    if key in event:
                        transformed_event[key] = event[key]
                # Ensure source is present even if the rename map missed it
                if 'source' not in transformed_event and 'qt_4e66idhbrc' in event:
                    transformed_event['source'] = event['qt_4e66idhbrc']
                # Only keep events that have an ID (required for the DB upsert)
                if transformed_event.get('id'):
                    final_transformed_list.append(transformed_event)
                else:
                    print(f"Warning: Skipping event due to missing ID. Original data snippet: {str(event)[:100]}")
            except Exception as e:
                print(f"Error during final transformation for event: {e}")
                traceback.print_exc()
        if parse_error_count > 0:
            print(f"-> Encountered {parse_error_count} datetime parsing issues.")
        #print(f"-> Finished transformation. Resulting valid events: {len(final_transformed_list)}.")
        return final_transformed_list
    except requests.exceptions.RequestException as e:
        print(f"API Request failed: {e}")
    except json.JSONDecodeError as e:
        print(f"Failed to decode API JSON response: {e}")
        if 'cleaned_text' in locals():
            print("Raw response snippet:", cleaned_text[:500])
    except Exception as e:
        print(f"An unexpected error occurred during data fetching/processing: {e}")
        traceback.print_exc()
    return []
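# A sketch of the join performed above, with assumed values. Table rows are
# keyed by f"{type} - {route} - {bmp}", which mirrors the map request's
# calculated field CONCAT(t0._KYTC_Type_, " - ", t0._Route_Label_, " - ", t0._BMP_Initial_):
#
#   table row: {'qt_sfkc163arc': 'Weather', 'qt_ufkc163arc': 'I-64', 'qt_vfkc163arc': 179.0, ...}
#     -> join_key = "Weather - I-64 - 179"   (179.0 is an integral float, so it prints as "179")
#   map row:   {'qt_7grvqe4arc': 'Weather - I-64 - 179',
#               'qt_5prae63arc': '{"type": "Point", "coordinates": [-82.6, 38.4]}', ...}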
# --- Database Upsert and Post-Update Function ---
def upsert_and_update_db(events):
    """Upserts events and runs post-update spatial queries."""
    if not events:
        print("No events to process.")
        return
    conn = None
    cursor = None
    upserted_count = 0
    upsert_error_count = 0
    # SQL definitions. County is NOT set in the upsert; it is filled by the
    # post-update spatial joins below. first_seen and last_seen_in_feed should be
    # TIMESTAMPTZ columns; psycopg2 maps timezone-aware datetime objects to
    # TIMESTAMPTZ correctly.
    sql_upsert = f"""
        INSERT INTO {TABLE_NAME_QUALIFIED}
            (id, first_seen, initial_description, latest_description, last_updated, last_seen_in_feed, geom, source, route, remark)
        VALUES
            (%(id)s, %(first_seen)s, %(initial_desc)s, %(latest_desc)s, NOW(), %(last_seen)s, ST_SetSRID(ST_MakePoint(%(lon)s, %(lat)s), 4326), %(source)s, %(route)s, %(remark)s)
        ON CONFLICT (id) DO UPDATE SET
            latest_description = excluded.latest_description,
            last_updated = NOW(),
            last_seen_in_feed = excluded.last_seen_in_feed,
            geom = excluded.geom,
            source = excluded.source,
            route = excluded.route,
            remark = excluded.remark,
            -- Keep original first_seen and initial_description on update
            first_seen = {TABLE_NAME_QUALIFIED}.first_seen,
            initial_description = {TABLE_NAME_QUALIFIED}.initial_description;
    """
    sql_upsert_no_geom = f"""
        INSERT INTO {TABLE_NAME_QUALIFIED}
            (id, first_seen, initial_description, latest_description, last_updated, last_seen_in_feed, geom, source, route, remark)
        VALUES
            (%(id)s, %(first_seen)s, %(initial_desc)s, %(latest_desc)s, NOW(), %(last_seen)s, NULL, %(source)s, %(route)s, %(remark)s)
        ON CONFLICT (id) DO UPDATE SET
            latest_description = excluded.latest_description,
            last_updated = NOW(),
            last_seen_in_feed = excluded.last_seen_in_feed,
            geom = NULL,  -- No coordinates in this feed row; clears any previously stored geometry
            source = excluded.source,
            route = excluded.route,
            remark = excluded.remark,
            -- Keep original first_seen and initial_description on update
            first_seen = {TABLE_NAME_QUALIFIED}.first_seen,
            initial_description = {TABLE_NAME_QUALIFIED}.initial_description;
    """
    # Post-update SQL: fill county/cwa/state via spatial joins, only where NULL
    post_update_sqls = [
        f'UPDATE {TABLE_NAME_QUALIFIED} SET county = county.countyname FROM public.county WHERE ST_Contains(county.geom, {TABLE_NAME_QUALIFIED}.geom) AND {TABLE_NAME_QUALIFIED}.county IS NULL',
        f'UPDATE {TABLE_NAME_QUALIFIED} SET cwa = fzone.cwa FROM public.fzone WHERE ST_Contains(fzone.geom, {TABLE_NAME_QUALIFIED}.geom) AND {TABLE_NAME_QUALIFIED}.cwa IS NULL',
        f'UPDATE {TABLE_NAME_QUALIFIED} SET st = county.state FROM public.county WHERE ST_Contains(county.geom, {TABLE_NAME_QUALIFIED}.geom) AND {TABLE_NAME_QUALIFIED}.st IS NULL'
    ]
    try:
        #print(f"\nConnecting to PostgreSQL database '{DB_NAME}' on {DB_HOST}...")
        conn = psycopg2.connect(dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT)
        cursor = conn.cursor()
        # --- Stage 1: Upsert Events ---
        for event in events:
            event_id = None
            try:
                event_id = event.get('id')
                if not event_id:
                    upsert_error_count += 1
                    continue
                # Timezone-aware datetime object (or None)
                dtg_aware = event.get('dtg')
                lon_val = event.get('lon')
                lat_val = event.get('lat')
                route_val = event.get('route')
                remark_val = event.get('remark')
                bmp_val = event.get(BMP_KEY_ORIGINAL)
                # Build a human-readable description: "<route> at MM <bmp> <remark>"
                desc_parts = []
                if route_val:
                    desc_parts.append(route_val)
                if bmp_val is not None:
                    desc_parts.append(f"at MM {str(int(bmp_val)) if isinstance(bmp_val, float) and bmp_val.is_integer() else str(bmp_val)}")
                if remark_val:
                    desc_parts.append(remark_val)
                full_desc = " ".join(desc_parts) if desc_parts else None
                data_dict = {
                    'id': event_id,
                    # Aware datetime passed directly; psycopg2 maps None to NULL.
                    'first_seen': dtg_aware,
                    'initial_desc': full_desc,
                    'latest_desc': full_desc,
                    'last_seen': dtg_aware,
                    'lon': lon_val,
                    'lat': lat_val,
                    'source': event.get('source'),
                    'route': route_val,
                    'remark': remark_val
                }
                # Choose the correct SQL based on geometry presence
                if lon_val is not None and lat_val is not None:
                    cursor.execute(sql_upsert, data_dict)
                else:
                    cursor.execute(sql_upsert_no_geom, data_dict)
                upserted_count += 1
            except psycopg2.Error as db_err:
                upsert_error_count += 1
                print(f"DB upsert error ID '{event_id}': {db_err}")
                # NOTE: this rolls back all uncommitted upserts since the last
                # commit, not just this event's statement.
                conn.rollback()
            except Exception as e:
                upsert_error_count += 1
                print(f"Prep/Exec error ID '{event_id}': {e}")
                traceback.print_exc()
                # No rollback here: the error occurred before execute, or was handled above
        # Commit the upsert stage (successful statements only)
        if upsert_error_count > 0:
            print(f"Upsert stage completed with {upsert_error_count} errors for individual events.")
        #print(f"Committing {upserted_count} successful upserts...")
        conn.commit()
        # --- Stage 2: Post-Update Spatial Queries ---
        post_update_errors = 0
        for i, update_sql in enumerate(post_update_sqls, 1):
            #print(f"Executing post-update {i}/{len(post_update_sqls)}: {update_sql[:100]}...")
            try:
                cursor.execute(update_sql)
                #print(f" -> Update {i} successful ({cursor.rowcount} rows affected).")
                # Commit each successful post-update step individually
                conn.commit()
            except psycopg2.Error as post_db_err:
                post_update_errors += 1
                print(f" -> Database error executing post-update {i}: {post_db_err}")
                print(f"    Failed SQL: {update_sql}")
                conn.rollback()  # Roll back this specific update attempt
                print("    Rolled back post-update transaction attempt.")
                # break  # Optional: stop processing further post-updates if one fails
            except Exception as post_e:
                post_update_errors += 1
                print(f" -> Unexpected error executing post-update {i}: {post_e}")
                print(f"    Failed SQL: {update_sql}")
                conn.rollback()
                print("    Rolled back post-update transaction attempt.")
                # break  # Optional: stop processing further post-updates if one fails
        if post_update_errors == 0:
            print("Post-upsert updates completed successfully.")
        else:
            print(f"Post-upsert updates encountered {post_update_errors} errors (each failed step was rolled back).")
    except psycopg2.OperationalError as e:
        print(f"Database connection failed: {e}")
    except Exception as e:
        print(f"An unexpected error occurred during database operations: {e}")
        traceback.print_exc()
        if conn:
            conn.rollback()  # Roll back any outstanding transaction on major error
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()
        print("Database connection closed.")
# --- Script Entry Point ---
if __name__ == "__main__":
    processed_events = fetch_and_process_data()
    if processed_events:  # Only run DB operations if we got data
        upsert_and_update_db(processed_events)
    else:
        print("No processed events found, skipping database operations.")
    print("\nScript finished.")