import requests
import json
import traceback  # For detailed error reporting
import psycopg2   # For PostgreSQL interaction
from psycopg2 import extras  # Currently unused below
import os
from datetime import datetime
import pytz  # Timezone handling (EST/EDT)

# --- Database Configuration (Use Environment Variables!) ---
DB_NAME = os.environ.get("PG_DBNAME", "nws")
DB_USER = os.environ.get("PG_USER", "nws")
DB_PASSWORD = os.environ.get("PG_PASSWORD", "nws")
DB_HOST = os.environ.get("PG_HOST", "localhost")
DB_PORT = os.environ.get("PG_PORT", "5432")
DB_SCHEMA = "ky511"  # Specify schema if used; otherwise set to None
DB_TABLE = "ky511"
TABLE_NAME_QUALIFIED = f"{DB_SCHEMA}.{DB_TABLE}" if DB_SCHEMA else DB_TABLE

# --- Looker Studio API Configuration ---
# (Make sure HEADERS/COOKIES are valid and fresh; they authenticate the session.)
URL = "https://lookerstudio.google.com/batchedDataV2?appVersion=20250324_0406"

HEADERS = {
    "authority": "lookerstudio.google.com",
    "method": "POST",
    "path": "/batchedDataV2?appVersion=20250324_0406",
    "scheme": "https",
    "accept": "application/json, text/plain, */*",
    "accept-encoding": "gzip, deflate, br, zstd",
    "accept-language": "en-US,en;q=0.9",
    "cache-control": "no-cache",
    "origin": "https://lookerstudio.google.com",
    "pragma": "no-cache",
    "priority": "u=1, i",
    "referer": "https://lookerstudio.google.com/reporting/1413fcfb-1416-4e56-8967-55f8e9f30ec8/page/p_pbm4eo88qc",
    "sec-ch-ua": '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
    "sec-fetch-dest": "empty",
    "sec-fetch-mode": "cors",
    "sec-fetch-site": "same-origin",
    "x-client-data": "CIS2yQEIorbJAQipncoBCMHbygEIk6HLAQiFoM0BCP6lzgEIvdXOAQjJ4M4BCIbizgEIu+fOAQjS6M4BCKzpzgE=",
    "x-rap-xsrf-token": "AImk1AIPFDSOkwsENSc2sLLwBANEib1ysQ:1744403672035",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36",
    "Content-Type": "application/json",
}

COOKIES = {
    "SID": "g.a000vwi83sZx47d1q9Po8BgPR3jBX7lpsrzWSuPIKyp6RycUEctVzyy0oo1gNvQbwZa53AWllwACgYKAVQSARMSFQHGX2MiUpZ3JZlRTzUS5L-5fPmgpBoVAUF8yKprZzPlNM_X83070Ct35bq_0076",
    "__Secure-1PSID": "g.a000vwi83sZx47d1q9Po8BgPR3jBX7lpsrzWSuPIKyp6RycUEctVKNqZm8qWU9ZC2DlRVPn0igACgYKAfQSARMSFQHGX2MiYGW66m_L_p1vlqiuEg-IrxoVAUF8yKrgRbA9tcA4VinZix0qlexX0076",
    "__Secure-3PSID": "g.a000vwi83sZx47d1q9Po8BgPR3jBX7lpsrzWSuPIKyp6RycUEctVnGF-cJAl9Wr1rJ-NOCW63wACgYKAZASARMSFQHGX2Mi1gYcRRnI8v2AdvofwvFG5BoVAUF8yKooSw-opCZ1-vxmkQXCB7330076",
    "HSID": "AJifEcy5MKbQSoqIz",
    "SSID": "AB87p4RIXK_USto4D",
    "APISID": "fQw9oTJbNnptFKdr/AIXxJuvoAk3qrIeWi",
    "SAPISID": "3zFHAXrGoL_XNe0X/A7AafTXBL2NRj7N2h",
    "__Secure-1PAPISID": "3zFHAXrGoL_XNe0X/A7AafTXBL2NRj7N2h",
    "__Secure-3PAPISID": "3zFHAXrGoL_XNe0X/A7AafTXBL2NRj7N2h",
    "AEC": "AVcja2e2MbzHDDL2yqJDMgUeDL-S3BEFB_yK293aRhQwshqv22Bd7IQ9mvg",
    "S": "maestro=IUwVdLn0rPZ27SY26uYPEonS9u1J9oQi8YzVJpqIc-w",
    "_gid": "GA1.3.1639311329.1744393279",
    "NID": "523=n5TR7moG8fZ7ZefkfCPQgx3aIuzhv_Vqk6CI0ah9HTWaoS-D4CtVwh6nIKaFY3hPLBPf_vf3THtu0nnJXc2ZaD5i3MpiIqOD8zBQbDuCp5G8rB3an7Jt7wdVuE8S3326_SF6FEBti9omWc2wNU43MbtEXSC7g4YP2sVzLohrGiUi_FHMIRDHj8OUKtDg9HjD3U3RzLRZk-IlyQyoRrwz0Ubf7IKxTsMvt57VWJvB2gqODqJJvaGWUdASl5lvPuTR7hlNkDyZwRC9rr-T2XtGUa10wUS_5bERpv6A2wTXXquQodkB25oDZvnhoS7FmO-SSAV-fweVu9tGlQ566w1yfBS0iANk8_iSCqhqFx6R1fyB9Nwxyf_g6ncw6nOmIHrtsIFiwAGJgwno2WRNey5nEgZbd5O-ew3z2K_aiPLvgeJCvY82L5swZctp7ZJtPFgGqXuAj3i6KtDhsTtizx0PbPpb7bvZ2nJ-SxlkR4s84fp_NxcXZKEdQfzOzoL6nAQMG9Kh28t_yKvcordnv25-55J_n_yzPDH78dTcLBzWS5dEcLP_Tt7HlSevSbP_2-NNoBGhM76vmMyIsMS0FucwPPKExrF6pwC3kc4g-4gruLlIWEHSnYodVcXQxD7Y2pD-0MBD7O8s-fCBDhlk1OgWfTHC1JnyrkUIMuqzj5-_LbuRIDPbN8YjVF0lO7jeyX0I9JjHHU3qc9EvhZ5LqpZKpRgTl5U4Prsmgq9K0o3nqvNoRMAbSTLX2kKGfhZKxtZT-Ezzrs_jLuZELFL8u98joYHS6v-guEvjD2Kg_XeWgRz3zAKBojSdtJnSuD7qjsnrV6IagTb8Yoazd7Gz3g-mlz8XTtSx1v7sStbFLnpMUlk",
    "_ga": "GA1.1.1275703331.1743917675",
    "_ga_LPCKLD3Z7X": "GS1.1.1744403669.9.1.1744403672.0.0.0",
    "RAP_XSRF_TOKEN": "AImk1AIPFDSOkwsENSc2sLLwBANEib1ysQ:1744403672035",
    "__Secure-1PSIDTS": "sidts-CjIB7pHpta3VWPdkJx7cNCUjUpy6c-d9WA3bvc3_9icXKqMA3kaW8Nh6_NJ-S4_OkQRZtRAA",
    "__Secure-3PSIDTS": "sidts-CjIB7pHpta3VWPdkJx7cNCUjUpy6c-d9WA3bvc3_9icXKqMA3kaW8Nh6_NJ-S4_OkQRZtRAA",
    "SIDCC": "AKEyXzUvhhICXSbNTp0dL7c4St5hDPC_ghsZ_PlMfsv7M8YVMlV8EibT-8IAsh7PmcPAtY38PJY",
    "__Secure-1PSIDCC": "AKEyXzUDzbYtACzfEgsC_j1S2ay0dayt4PwLSUoOjlS1xMBQ9FeL52NBdnlZBn_KWMM4a8jFaNv5",
    "__Secure-3PSIDCC": "AKEyXzVCxfyVIsm9apiA7JdTK3UKhcQUNLBOKQFxN8QDJyeLPojNFUBUs_K_X0xOR2vzZsBBEk5V",
    "_gat": "1",
    "_ga_S4FJY0X3VX": "GS1.1.1744401723.8.1.1744404147.0.0.0",
}

# Request 1: the simple-table component (event attributes).
payload_req1 = {
    "requestContext": {
        "reportContext": {
            "reportId": "1413fcfb-1416-4e56-8967-55f8e9f30ec8",
            "pageId": "p_pbm4eo88qc",
            "mode": 1,
            "componentId": "cd-rdkny9a9qc",
            "displayType": "simple-table",
            "actionId": "crossFilters",
        },
        "requestMode": 0,
    },
    "datasetSpec": {
        "dataset": [{"datasourceId": "c2fc8cdd-46bb-454c-bf09-90ebfd4067d7", "revisionNumber": 0, "parameterOverrides": []}],
        "queryFields": [
            {"name": "qt_3nwfu9yq1c", "datasetNs": "d0", "tableNs": "t0", "resultTransformation": {"analyticalFunction": 0, "isRelativeToBase": False}, "dataTransformation": {"sourceFieldName": "_Event_Id_"}},
            {"name": "qt_8yjok4izsc", "datasetNs": "d0", "tableNs": "t0", "resultTransformation": {"analyticalFunction": 0, "isRelativeToBase": False}, "dataTransformation": {"sourceFieldName": "_DateTime_EST_"}},
            {"name": "qt_sfkc163arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_KYTC_Type_"}},
            {"name": "qt_4e66idhbrc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_Incident_Source_"}},
            {"name": "qt_re76qrqe2c", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_District_", "aggregation": 0}},
            {"name": "qt_tfkc163arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_County_Name_"}},
            {"name": "qt_ufkc163arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_Route_Label_"}},
            {"name": "qt_vfkc163arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_BMP_Initial_"}},
            {"name": "qt_o7kc163arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_EMP_Initial_"}},
            {"name": "qt_p7kc163arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_Description_"}},
        ],
        "sortData": [{"sortColumn": {"name": "qt_8yjok4izsc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_DateTime_EST_"}}, "sortDir": 1}],
        "includeRowsCount": True,
        "relatedDimensionMask": {"addDisplay": False, "addUniqueId": False, "addLatLong": False},
        "paginateInfo": {"startRow": 1, "rowsCount": 500},
        "dsFilterOverrides": [],
        "filters": [
            {"filterDefinition": {"filterExpression": {"include": False, "conceptType": 0, "concept": {"ns": "t0", "name": "qt_exs3vib9qc"}, "filterConditionType": "PT", "stringValues": ["Shoulder"], "numberValues": [], "queryTimeTransformation": {"dataTransformation": {"sourceFieldName": "_Source_Type_"}}}}, "dataSubsetNs": {"datasetNs": "d0", "tableNs": "t0", "contextNs": "c0"}, "version": 3},
            {"filterDefinition": {"filterExpression": {"include": True, "conceptType": 0, "concept": {"name": "qt_vwmdhfhbrc", "ns": "t0"}, "queryTimeTransformation": {"dataTransformation": {"sourceFieldName": "_Incident_Source_"}}, "filterConditionType": "IN", "stringValues": ["KYTC Reported"]}}, "dataSubsetNs": {"datasetNs": "d0", "tableNs": "t0", "contextNs": "c0"}, "version": 3, "isCanvasFilter": True},
            {"filterDefinition": {"filterExpression": {"include": True, "conceptType": 0, "concept": {"name": "qt_kjyfx83arc", "ns": "t0"}, "queryTimeTransformation": {"dataTransformation": {"sourceFieldName": "_County_Name_"}}, "filterConditionType": "IN", "stringValues": ["Boyd", "Carter", "Fleming", "Greenup"]}}, "dataSubsetNs": {"datasetNs": "d0", "tableNs": "t0", "contextNs": "c0"}, "version": 3, "isCanvasFilter": True},
            {"filterDefinition": {"filterExpression": {"include": True, "conceptType": 0, "concept": {"name": "qt_uiren73arc", "ns": "t0"}, "queryTimeTransformation": {"dataTransformation": {"sourceFieldName": "_KYTC_Type_"}}, "filterConditionType": "IN", "stringValues": ["Weather"]}}, "dataSubsetNs": {"datasetNs": "d0", "tableNs": "t0", "contextNs": "c0"}, "version": 3, "isCanvasFilter": True},
        ],
        "features": [],
        "dateRanges": [],
        "contextNsCount": 1,
        "calculatedField": [],
        "needGeocoding": False,
        "geoFieldMask": [],
        "multipleGeocodeFields": [],
        "timezone": "America/New_York",
    },
    "role": "main",
    "retryHints": {"useClientControlledRetry": True, "isLastRetry": False, "retryCount": 0, "originalRequestId": "cd-rdkny9a9qc_0_0"},
}

# Request 2: the google-map component (geometry + the concatenated join key).
payload_req2 = {
    "requestContext": {
        "reportContext": {
            "reportId": "1413fcfb-1416-4e56-8967-55f8e9f30ec8",
            "pageId": "p_pbm4eo88qc",
            "mode": 1,
            "componentId": "cd-3aflvp88qc",
            "displayType": "google-map",
            "actionId": "crossFilters",
        },
        "requestMode": 0,
    },
    "datasetSpec": {
        "dataset": [{"datasourceId": "c2fc8cdd-46bb-454c-bf09-90ebfd4067d7", "revisionNumber": 0, "parameterOverrides": []}],
        "queryFields": [
            {"name": "qt_pbotw53arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_Incident_Id_"}},
            {"name": "qt_qbotw53arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_KYTC_Type_"}},
            {"name": "qt_7grvqe4arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "qt_7grvqe4arc", "textFormula": "CONCAT(t0._KYTC_Type_, \" - \", t0._Route_Label_, \" - \", t0._BMP_Initial_)", "sourceType": 1, "frontendTextFormula": "CONCAT(t0._KYTC_Type_,\" - \",t0._Route_Label_,\" - \",t0._BMP_Initial_)", "formulaOutputDataType": 0}},
            {"name": "qt_5prae63arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_SHAPE_", "aggregation": 16, "outputGeoType": 1}},
        ],
        "sortData": [{"sortColumn": {"name": "qt_pbotw53arc", "datasetNs": "d0", "tableNs": "t0", "dataTransformation": {"sourceFieldName": "_Incident_Id_"}}, "sortDir": 1}],
        "includeRowsCount": True,
        "relatedDimensionMask": {"addDisplay": False, "addUniqueId": False, "addLatLong": True},
        "paginateInfo": {"startRow": 1, "rowsCount": 100000},
        "dsFilterOverrides": [],
        "filters": [
            {"filterDefinition": {"filterExpression": {"include": False, "conceptType": 0, "concept": {"ns": "t0", "name": "qt_exs3vib9qc"}, "filterConditionType": "PT", "stringValues": ["Shoulder"], "numberValues": [], "queryTimeTransformation": {"dataTransformation": {"sourceFieldName": "_Source_Type_"}}}}, "dataSubsetNs": {"datasetNs": "d0", "tableNs": "t0", "contextNs": "c0"}, "version": 3},
            {"filterDefinition": {"filterExpression": {"include": True, "conceptType": 0, "concept": {"name": "qt_vwmdhfhbrc", "ns": "t0"}, "queryTimeTransformation": {"dataTransformation": {"sourceFieldName": "_Incident_Source_"}}, "filterConditionType": "IN", "stringValues": ["KYTC Reported"]}}, "dataSubsetNs": {"datasetNs": "d0", "tableNs": "t0", "contextNs": "c0"}, "version": 3, "isCanvasFilter": True},
            {"filterDefinition": {"filterExpression": {"include": True, "conceptType": 0, "concept": {"name": "qt_kjyfx83arc", "ns": "t0"}, "queryTimeTransformation": {"dataTransformation": {"sourceFieldName": "_County_Name_"}}, "filterConditionType": "IN", "stringValues": ["Boyd", "Carter", "Fleming", "Greenup"]}}, "dataSubsetNs": {"datasetNs": "d0", "tableNs": "t0", "contextNs": "c0"}, "version": 3, "isCanvasFilter": True},
            {"filterDefinition": {"filterExpression": {"include": True, "conceptType": 0, "concept": {"name": "qt_uiren73arc", "ns": "t0"}, "queryTimeTransformation": {"dataTransformation": {"sourceFieldName": "_KYTC_Type_"}}, "filterConditionType": "IN", "stringValues": ["Weather"]}}, "dataSubsetNs": {"datasetNs": "d0", "tableNs": "t0", "contextNs": "c0"}, "version": 3, "isCanvasFilter": True},
        ],
        "features": [],
        "dateRanges": [],
        "contextNsCount": 1,
        "calculatedField": [],
        "needGeocoding": False,
        "geoFieldMask": [],
        "geoVertices": 100000,
        "multipleGeocodeFields": [],
        "timezone": "America/New_York",
    },
    "role": "main",
    "retryHints": {"useClientControlledRetry": True, "isLastRetry": False, "retryCount": 0, "originalRequestId": "cd-3aflvp88qc_0_0"},
}

combined_payload = {"dataRequest": [payload_req1, payload_req2]}

# --- Key Mappings and Constants ---
KEY_RENAME_MAP = {
    'qt_3nwfu9yq1c': 'id',
    'qt_8yjok4izsc': 'dtg',
    'qt_4e66idhbrc': 'source',
    'qt_ufkc163arc': 'route',
    'qt_p7kc163arc': 'remark',
}
COORDINATE_KEY_OLD = 'qt_5prae63arc'
LONGITUDE_KEY_NEW = 'lon'
LATITUDE_KEY_NEW = 'lat'
BMP_KEY_ORIGINAL = 'qt_vfkc163arc'
COUNTY_KEY_ORIGINAL = 'qt_tfkc163arc'

# --- Define the US Eastern Timezone ---
# 'America/New_York' correctly handles switches between EST and EDT.
eastern_tz = pytz.timezone('America/New_York')
# --- Helper Function (Column-to-Row) ---
def process_table_dataset(table_dataset):
    """Pivot Looker Studio's column-oriented tableDataset into a list of row dicts."""
    if not table_dataset:
        return []
    try:
        column_info = table_dataset.get('columnInfo', [])
        columns_data = table_dataset.get('column', [])
        num_records = table_dataset.get('totalCount', 0)
        if num_records == 0 and columns_data:
            # totalCount can be absent/zero; fall back to the length of the first column's values.
            first_col_data = columns_data[0]
            data_key = next((k for k in first_col_data if isinstance(first_col_data[k], dict) and 'values' in first_col_data[k]), None)
            if data_key:
                num_records = len(first_col_data[data_key].get('values', []))
        if not column_info or not columns_data or num_records == 0:
            return []
        column_names = [info.get('name', f'unknown_col_{i}') for i, info in enumerate(column_info)]
        processed_events = []
        for i in range(num_records):
            event = {}
            for j, col_name in enumerate(column_names):
                if j >= len(columns_data):
                    event[col_name] = None
                    continue
                col_data_dict = columns_data[j]
                value = None
                data_key = next((k for k in col_data_dict if isinstance(col_data_dict[k], dict) and 'values' in col_data_dict[k]), None)
                if data_key:
                    values_list = col_data_dict[data_key].get('values', [])
                    value = values_list[i] if i < len(values_list) else None
                event[col_name] = value
            processed_events.append(event)
        return processed_events
    except Exception as e:
        print(f"Error processing table dataset: {e}")
        traceback.print_exc()
        return []

# --- Data Fetching and Processing Function ---
def fetch_and_process_data():
    print("Sending combined request...")
    try:
        response = requests.post(URL, headers=HEADERS, cookies=COOKIES, json=combined_payload)
        response.raise_for_status()
        print(f"Request successful! Status Code: {response.status_code}")
        # Strip Looker Studio's leading anti-XSSI characters before parsing the JSON.
        cleaned_text = response.text.lstrip(")]}'")
        response_data = json.loads(cleaned_text)
        table_data_list, map_data_list = [], []
        if 'dataResponse' in response_data and isinstance(response_data['dataResponse'], list) and len(response_data['dataResponse']) >= 2:
            try:
                table_subset = response_data['dataResponse'][0].get('dataSubset', [{}])[0]
                table_dataset = table_subset.get('dataset', {}).get('tableDataset')
                if table_dataset:
                    table_data_list = process_table_dataset(table_dataset)
            except IndexError:
                pass
            try:
                map_subset = response_data['dataResponse'][1].get('dataSubset', [{}])[0]
                map_dataset = map_subset.get('dataset', {}).get('tableDataset')
                if map_dataset:
                    map_data_list = process_table_dataset(map_dataset)
            except IndexError:
                pass
        else:
            print("Error: Expected 2 'dataResponse' items.")
            return []
        if not table_data_list or not map_data_list:
            print("Error: Failed to process datasets.")
            return []

        print(f"\nMerging {len(table_data_list)} table events and {len(map_data_list)} map events...")
        merged_events_dict = {}
        for event1 in table_data_list:
            event_type = event1.get('qt_sfkc163arc')
            route_label = event1.get('qt_ufkc163arc')
            bmp_value = event1.get(BMP_KEY_ORIGINAL)
            if event_type is not None and route_label is not None and bmp_value is not None:
                # Render integral floats without the trailing ".0" so keys match the map feed.
                bmp_str = str(int(bmp_value)) if isinstance(bmp_value, float) and bmp_value.is_integer() else str(bmp_value)
                join_key = f"{event_type} - {route_label} - {bmp_str}"
                merged_events_dict[join_key] = event1.copy()
        merge_success_count = 0
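        # Join keys mirror the map request's calculated field:
        #   CONCAT(_KYTC_Type_, " - ", _Route_Label_, " - ", _BMP_Initial_)
        # e.g. (hypothetical values) "Weather - I-64 - 185".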
        for event2 in map_data_list:
            join_key = event2.get('qt_7grvqe4arc')
            if join_key in merged_events_dict:
                existing_event = merged_events_dict[join_key]
                for key, value in event2.items():
                    if key == COORDINATE_KEY_OLD:
                        # The shape field may arrive as GeoJSON text; convert Points to (lon, lat).
                        if isinstance(value, str) and value.strip().startswith('{'):
                            try:
                                geo_data = json.loads(value)
                                if geo_data.get('type') == 'Point' and 'coordinates' in geo_data:
                                    existing_event[key] = tuple(geo_data['coordinates'])
                                else:
                                    existing_event[key] = value
                            except json.JSONDecodeError:
                                existing_event[key] = value
                        else:
                            existing_event[key] = value
                    elif key not in existing_event:
                        existing_event[key] = value
                merge_success_count += 1
        #print(f"-> Merged data for {merge_success_count} events.")

        #print("\nApplying final transformations (including timezone handling)...")
        final_transformed_list = []
        parse_error_count = 0
        # Potential datetime formats from Looker Studio (add more if observed in the data):
        possible_dt_formats = [
            '%m/%d/%Y %I:%M:%S %p',  # e.g., 05/20/2024 02:30:00 PM
            '%Y-%m-%d %H:%M:%S',     # e.g., 2024-05-20 14:30:00
            '%Y-%m-%dT%H:%M:%S',     # e.g., 2025-04-12T06:16:06
        ]
        for event in merged_events_dict.values():
            transformed_event = {}
            aware_dtg = None  # Initialize dtg as None
            try:
                # --- Timezone Handling for 'dtg' ---
                raw_dtg_value = event.get('qt_8yjok4izsc')
                if raw_dtg_value:
                    parsed_naive = None
                    for fmt in possible_dt_formats:
                        try:
                            parsed_naive = datetime.strptime(str(raw_dtg_value), fmt)
                            break  # Stop trying formats once one succeeds
                        except ValueError:
                            continue  # Try the next format
                    if parsed_naive:
                        # Successfully parsed; make it timezone-aware (ET).
                        # is_dst=None makes pytz raise AmbiguousTimeError/NonExistentTimeError
                        # for times inside a DST transition instead of silently guessing.
                        aware_dtg = eastern_tz.localize(parsed_naive, is_dst=None)
                        # print(f"DEBUG: Raw: {raw_dtg_value}, Aware ET: {aware_dtg}, UTC: {aware_dtg.astimezone(pytz.utc)}")
                    else:
                        print(f"Warning: Could not parse dtg value '{raw_dtg_value}' for event ID {event.get('qt_3nwfu9yq1c')} using known formats.")
                        parse_error_count += 1
                else:
                    print(f"Warning: Missing dtg value for event ID {event.get('qt_3nwfu9yq1c')}.")
                    parse_error_count += 1

                # --- Standard Key Renaming and Data Extraction ---
                for old_key, new_key in KEY_RENAME_MAP.items():
                    if old_key == 'qt_8yjok4izsc':
                        transformed_event[new_key] = aware_dtg  # Aware datetime object, or None
                    elif old_key in event:
                        transformed_event[new_key] = event[old_key]
                if COORDINATE_KEY_OLD in event:
                    coordinates = event[COORDINATE_KEY_OLD]
                    if isinstance(coordinates, (list, tuple)) and len(coordinates) >= 2:
                        try:
                            transformed_event[LONGITUDE_KEY_NEW] = float(coordinates[0])
                            transformed_event[LATITUDE_KEY_NEW] = float(coordinates[1])
                        except (TypeError, ValueError):
                            pass  # Leave lon/lat unset if conversion fails
                keys_to_keep = [BMP_KEY_ORIGINAL, COUNTY_KEY_ORIGINAL]
                for key in keys_to_keep:
                    if key in event:
                        transformed_event[key] = event[key]
                # Ensure source is added if not already mapped
                if 'source' not in transformed_event and 'qt_4e66idhbrc' in event:
                    transformed_event['source'] = event['qt_4e66idhbrc']
                # Only add the event if it has an ID (required for the DB upsert)
                if transformed_event.get('id'):
                    final_transformed_list.append(transformed_event)
                else:
                    print(f"Warning: Skipping event due to missing ID. Original data snippet: {str(event)[:100]}")
            except Exception as e:
                print(f"Error during final transformation for event: {e}")
                traceback.print_exc()
        if parse_error_count > 0:
            print(f"-> Encountered {parse_error_count} datetime parsing issues.")
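        # Why is_dst=None matters (illustrative): the naive time 2024-11-03 01:30:00
        # occurs twice in America/New_York during the fall-back transition, so
        # eastern_tz.localize(datetime(2024, 11, 3, 1, 30), is_dst=None) raises
        # pytz.exceptions.AmbiguousTimeError; the per-event try/except above catches
        # it and the affected event is skipped rather than stored with a wrong offset.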
        #print(f"-> Finished transformation. Resulting valid events: {len(final_transformed_list)}.")
        return final_transformed_list
    except requests.exceptions.RequestException as e:
        print(f"API Request failed: {e}")
    except json.JSONDecodeError as e:
        print(f"Failed to decode API JSON response: {e}")
        if 'cleaned_text' in locals():
            print("Raw response snippet:", cleaned_text[:500])
    except Exception as e:
        print(f"An unexpected error occurred during data fetching/processing: {e}")
        traceback.print_exc()
    return []

# --- Database Upsert and Post-Update Function ---
def upsert_and_update_db(events):
    """Upserts events and runs post-update spatial queries."""
    if not events:
        print("No events to process.")
        return
    conn = None
    cursor = None
    upserted_count = 0
    upsert_error_count = 0
    # SQL definitions (no county column in the upsert; county is filled spatially below).
    # first_seen and last_seen_in_feed should be TIMESTAMPTZ columns; psycopg2 maps
    # timezone-aware datetime objects to TIMESTAMPTZ correctly.
    sql_upsert = f"""
        INSERT INTO {TABLE_NAME_QUALIFIED}
            (id, first_seen, initial_description, latest_description, last_updated,
             last_seen_in_feed, geom, source, route, remark)
        VALUES (%(id)s, %(first_seen)s, %(initial_desc)s, %(latest_desc)s, NOW(), %(last_seen)s,
                ST_SetSRID(ST_MakePoint(%(lon)s, %(lat)s), 4326), %(source)s, %(route)s, %(remark)s)
        ON CONFLICT (id) DO UPDATE SET
            latest_description = excluded.latest_description,
            last_updated = NOW(),
            last_seen_in_feed = excluded.last_seen_in_feed,
            geom = excluded.geom,
            source = excluded.source,
            route = excluded.route,
            remark = excluded.remark,
            -- Keep the original first_seen and initial_description on update
            first_seen = {TABLE_NAME_QUALIFIED}.first_seen,
            initial_description = {TABLE_NAME_QUALIFIED}.initial_description;
    """
    sql_upsert_no_geom = f"""
        INSERT INTO {TABLE_NAME_QUALIFIED}
            (id, first_seen, initial_description, latest_description, last_updated,
             last_seen_in_feed, geom, source, route, remark)
        VALUES (%(id)s, %(first_seen)s, %(initial_desc)s, %(latest_desc)s, NOW(), %(last_seen)s,
                NULL, %(source)s, %(route)s, %(remark)s)
        ON CONFLICT (id) DO UPDATE SET
            latest_description = excluded.latest_description,
            last_updated = NOW(),
            last_seen_in_feed = excluded.last_seen_in_feed,
            geom = NULL,  -- geom stays/becomes NULL when the feed row lacks coordinates
            source = excluded.source,
            route = excluded.route,
            remark = excluded.remark,
            -- Keep the original first_seen and initial_description on update
            first_seen = {TABLE_NAME_QUALIFIED}.first_seen,
            initial_description = {TABLE_NAME_QUALIFIED}.initial_description;
    """
    # Post-update spatial queries: fill county/cwa/state only where still NULL,
    # so values set earlier (or by hand) are not overwritten.
    post_update_sqls = [
        f'UPDATE {TABLE_NAME_QUALIFIED} SET county = county.countyname FROM public.county WHERE ST_Contains(county.geom, {TABLE_NAME_QUALIFIED}.geom) AND {TABLE_NAME_QUALIFIED}.county IS NULL',
        f'UPDATE {TABLE_NAME_QUALIFIED} SET cwa = fzone.cwa FROM public.fzone WHERE ST_Contains(fzone.geom, {TABLE_NAME_QUALIFIED}.geom) AND {TABLE_NAME_QUALIFIED}.cwa IS NULL',
        f'UPDATE {TABLE_NAME_QUALIFIED} SET st = county.state FROM public.county WHERE ST_Contains(county.geom, {TABLE_NAME_QUALIFIED}.geom) AND {TABLE_NAME_QUALIFIED}.st IS NULL',
    ]
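    # NOTE (assumption): public.county and public.fzone are pre-loaded polygon tables
    # (county boundaries and NWS forecast zones) exposing countyname/state and cwa
    # columns respectively; this script does not create or populate them.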
    try:
        #print(f"\nConnecting to PostgreSQL database '{DB_NAME}' on {DB_HOST}...")
        conn = psycopg2.connect(dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT)
        cursor = conn.cursor()
        #print("Connection successful.")

        # --- Stage 1: Upsert Events ---
        #print("Starting upsert stage...")
        for event in events:
            event_id = None  # Ensure the name is bound for the error handlers below
            try:
                event_id = event.get('id')
                if not event_id:
                    upsert_error_count += 1
                    #print("Skipping event due to missing ID in upsert stage.")
                    continue
                # Timezone-aware datetime object (or None)
                dtg_aware = event.get('dtg')
                lon_val = event.get('lon')
                lat_val = event.get('lat')
                route_val = event.get('route')
                remark_val = event.get('remark')
                bmp_val = event.get(BMP_KEY_ORIGINAL)
                desc_parts = []
                if route_val:
                    desc_parts.append(route_val)
                if bmp_val is not None:
                    desc_parts.append(f"at MM {str(int(bmp_val)) if isinstance(bmp_val, float) and bmp_val.is_integer() else str(bmp_val)}")
                if remark_val:
                    desc_parts.append(remark_val)
                full_desc = " ".join(desc_parts) if desc_parts else None
                data_dict = {
                    'id': event_id,
                    # Pass the aware datetime directly; psycopg2 sends None as NULL.
                    'first_seen': dtg_aware,
                    'initial_desc': full_desc,
                    'latest_desc': full_desc,
                    'last_seen': dtg_aware,
                    'lon': lon_val,
                    'lat': lat_val,
                    'source': event.get('source'),
                    'route': route_val,
                    'remark': remark_val,
                }
                # Choose the correct SQL based on geometry presence
                if lon_val is not None and lat_val is not None:
                    cursor.execute(sql_upsert, data_dict)
                else:
                    cursor.execute(sql_upsert_no_geom, data_dict)
                upserted_count += 1
            except psycopg2.Error as db_err:
                upsert_error_count += 1
                print(f"DB upsert error ID '{event_id}': {db_err}")
                # NOTE: rollback() discards ALL uncommitted upserts so far, not just this
                # event; see the SAVEPOINT sketch below for true per-event isolation.
                conn.rollback()
            except Exception as e:
                upsert_error_count += 1
                print(f"Prep/Exec error ID '{event_id}': {e}")
                traceback.print_exc()
                # No rollback needed here: the error occurred before execute,
                # or was already handled by the psycopg2 block.
        if upsert_error_count > 0:
            print(f"Upsert stage completed with {upsert_error_count} errors (each failure rolled back the pending, uncommitted upserts).")
        #print(f"Committing {upserted_count} successful upserts...")
        conn.commit()
        #print("Upsert stage committed.")

        # --- Stage 2: Post-Update Spatial Queries ---
        #print("\nRunning post-upsert spatial updates...")
        post_update_errors = 0
        for i, update_sql in enumerate(post_update_sqls, 1):
            #print(f"Executing post-update {i}/{len(post_update_sqls)}: {update_sql[:100]}...")
            try:
                cursor.execute(update_sql)
                #print(f" -> Update {i} successful ({cursor.rowcount} rows affected).")
                # Commit each successful post-update step individually
                conn.commit()
                #print(f"    Committed post-update step {i}.")
            except psycopg2.Error as post_db_err:
                post_update_errors += 1
                print(f" -> Database error executing post-update {i}: {post_db_err}")
                print(f"    Failed SQL: {update_sql}")
                conn.rollback()  # Roll back this specific update attempt
                print("    Rolled back post-update transaction attempt.")
                # break  # Optional: stop processing further post-updates if one fails
            except Exception as post_e:
                post_update_errors += 1
                print(f" -> Unexpected error executing post-update {i}: {post_e}")
                print(f"    Failed SQL: {update_sql}")
                conn.rollback()
                print("    Rolled back post-update transaction attempt.")
                # break  # Optional: stop processing further post-updates if one fails
        if post_update_errors == 0:
            print("Post-upsert updates completed successfully.")
        else:
            print(f"Post-upsert updates encountered {post_update_errors} errors (each failed step was rolled back).")
    except psycopg2.OperationalError as e:
        print(f"Database connection failed: {e}")
    except Exception as e:
        print(f"An unexpected error occurred during database operations: {e}")
        traceback.print_exc()
        if conn:
            conn.rollback()  # Roll back any outstanding transaction on a major error
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()
            print("Database connection closed.")
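# Optional hardening sketch (not called anywhere above): wrapping each upsert in a
# SAVEPOINT gives per-event isolation, so one bad row does not discard earlier
# uncommitted upserts the way a bare conn.rollback() does. Illustrative only.
def _upsert_with_savepoint(cursor, sql, data_dict):
    cursor.execute("SAVEPOINT evt")
    try:
        cursor.execute(sql, data_dict)
        cursor.execute("RELEASE SAVEPOINT evt")
        return True
    except psycopg2.Error:
        cursor.execute("ROLLBACK TO SAVEPOINT evt")
        return False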
connection closed.") # --- Script Entry Point --- if __name__ == "__main__": processed_events = fetch_and_process_data() if processed_events: # Only run DB operations if we got data upsert_and_update_db(processed_events) else: print("No processed events found, skipping database operations.") print("\nScript finished.")