import psycopg2 import requests import re import logging import time # Import the time module import ast import json # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') def get_initial_session_data(session, first_request_zx="fgd9osntni3x"): """ Makes the first POST request to get the SID and the dynamic gsessionid. """ url = f"https://firestore.googleapis.com/google.firestore.v1.Firestore/Listen/channel?database=projects%2Fkytc-goky%2Fdatabases%2F(default)&VER=8&RID=65930&CVER=22&X-HTTP-Session-Id=gsessionid&%24httpHeaders=X-Goog-Api-Client%3Agl-js%2F%20fire%2F9.6.10%0D%0AContent-Type%3Atext%2Fplain%0D%0AX-Firebase-GMPID%3A1%3A911478978941%3Aweb%3Ab965a6c158ee5c4d17b414%0D%0A&zx={first_request_zx}&t=1" headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:139.0) Gecko/20100101 Firefox/139.0", "Accept": "*/*", "Accept-Language": "en-US,en;q=0.5", "content-type": "application/x-www-form-urlencoded", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "cross-site", "Priority": "u=4", "Referer": "https://goky.ky.gov/" } body = "count=2&ofs=0&req0___data__=%7B%22database%22%3A%22projects%2Fkytc-goky%2Fdatabases%2F(default)%22%2C%22addTarget%22%3A%7B%22query%22%3A%7B%22structuredQuery%22%3A%7B%22from%22%3A%5B%7B%22collectionId%22%3A%22realtime%22%7D%5D%2C%22orderBy%22%3A%5B%7B%22field%22%3A%7B%22fieldPath%22%3A%22__name__%22%7D%2C%22direction%22%3A%22ASCENDING%22%7D%5D%7D%2C%22parent%22%3A%22projects%2Fkytc-goky%2Fdatabases%2F(default)%2Fdocuments%22%7D%2C%22targetId%22%3A2%7D%7D&req1___data__=%7B%22database%22%3A%22projects%2Fkytc-goky%2Fdatabases%2F(default)%22%2C%22addTarget%22%3A%7B%22query%22%3A%7B%22structuredQuery%22%3A%7B%22from%22%3A%5B%7B%22collectionId%22%3A%22tweets%22%7D%5D%2C%22orderBy%22%3A%5B%7B%22field%22%3A%7B%22fieldPath%22%3A%22__name__%22%7D%2C%22direction%22%3A%22ASCENDING%22%7D%5D%7D%2C%22parent%22%3A%22projects%2Fkytc-goky%2Fdatabases%2F(default)%2Fdocuments%22%7D%2C%22targetId%22%3A4%7D%7D" #logging.info(f"Attempting to get initial session data from: {url}") actual_gsessionid = None sid = None try: response = session.post(url, headers=headers, data=body, timeout=15) # Timeout for connect and initial response response.raise_for_status() match = re.search(r'\[0,\s*\["c",\s*"([^"]+)"', response.text) if match: sid = match.group(1) #logging.info(f"Successfully obtained SID: {sid}") else: #logging.error(f"Could not parse SID from response body: {response.text[:500]}") return None for header_name_original_case in response.headers: if header_name_original_case.lower() == 'x-http-session-id': actual_gsessionid = response.headers[header_name_original_case] logging.info(f"Found '{header_name_original_case}' in response headers: {actual_gsessionid}") break if not actual_gsessionid: logging.warning("Dynamic gsessionid (X-HTTP-Session-Id) not found in first response headers.") return {"sid": sid, "dynamic_gsessionid": actual_gsessionid} except requests.exceptions.RequestException as e: logging.error(f"Error during initial session data request: {e}") return None def get_raw_roadway_datastream(session, sid, gsessionid_to_use, zx_for_data_req, read_duration_limit=2.0): """ Makes the second GET request in streaming mode and reads data for a limited duration. Returns the raw text content received within that duration. """ url = f"https://firestore.googleapis.com/google.firestore.v1.Firestore/Listen/channel?database=projects%2Fkytc-goky%2Fdatabases%2F(default)&gsessionid={gsessionid_to_use}&VER=8&RID=rpc&SID={sid}&CI=0&AID=0&TYPE=xmlhttp&zx={zx_for_data_req}&t=1" headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:139.0) Gecko/20100101 Firefox/139.0", "Accept": "*/*", "Accept-Language": "en-US,en;q=0.5", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "cross-site", "Priority": "u=4", "Referer": "https://goky.ky.gov/" } #logging.info(f"Attempting to get roadway data stream with SID: {sid}, gsessionid: {gsessionid_to_use}, zx: {zx_for_data_req}") #logging.info(f"Will attempt to read stream for a maximum of {read_duration_limit} seconds.") received_data_bytes = [] raw_text_data = None try: # Initial timeout for connection and first byte, stream=True to control body download with session.get(url, headers=headers, stream=True, timeout=(10, 30)) as response: # (connect_timeout, read_timeout_for_first_byte) response.raise_for_status() start_time = time.time() for chunk in response.iter_content(chunk_size=8192): # Read in 8KB chunks if chunk: # filter out keep-alive new chunks received_data_bytes.append(chunk) if time.time() - start_time > read_duration_limit: #logging.info(f"Read duration limit of {read_duration_limit} seconds reached. Stopping stream read.") break # After loop (timeout or stream ended), decode collected bytes # The server might send UTF-8, response.encoding might guess, or we assume utf-8 encoding = response.encoding if response.encoding else 'utf-8' raw_text_data = b"".join(received_data_bytes).decode(encoding, errors='replace') #logging.info(f"Successfully fetched and processed stream. Total data length: {len(raw_text_data)} characters.") except requests.exceptions.RequestException as e: logging.error(f"Error during roadway data stream request: {e}") # If there was an error but some data was received before it, try to decode that. if received_data_bytes: try: raw_text_data = b"".join(received_data_bytes).decode('utf-8', errors='replace') # Fallback to utf-8 logging.warning(f"Request error occurred, but returning partially received data: {len(raw_text_data)} characters.") except Exception as decode_err: logging.error(f"Could not decode partially received data after request error: {decode_err}") raw_text_data = None # Ensure it's None if decoding fails else: raw_text_data = None except Exception as e_generic: # Catch any other unexpected errors logging.error(f"An unexpected error occurred during stream processing: {e_generic}") if received_data_bytes and not raw_text_data: # If an error happened after collecting some data but before decoding try: raw_text_data = b"".join(received_data_bytes).decode('utf-8', errors='replace') logging.warning(f"Unexpected error, but returning partially received data: {len(raw_text_data)} characters.") except Exception as decode_err: logging.error(f"Could not decode partially received data after unexpected error: {decode_err}") raw_text_data = None else: raw_text_data = None return raw_text_data # --- Helper functions for Firestore value types --- def _parse_firestore_value(value_dict): if not isinstance(value_dict, dict): return value_dict if "stringValue" in value_dict: return value_dict["stringValue"] elif "integerValue" in value_dict: try: return int(value_dict["integerValue"]) except ValueError: return value_dict["integerValue"] elif "doubleValue" in value_dict: try: return float(value_dict["doubleValue"]) except ValueError: return value_dict["doubleValue"] elif "booleanValue" in value_dict: return value_dict["booleanValue"] elif "timestampValue" in value_dict: return value_dict["timestampValue"] elif "geoPointValue" in value_dict: return value_dict["geoPointValue"] elif "mapValue" in value_dict: return _parse_firestore_fields(value_dict["mapValue"].get("fields", {})) elif "arrayValue" in value_dict: values = value_dict["arrayValue"].get("values", []) return [_parse_firestore_value(v) for v in values] elif "nullValue" in value_dict: return None else: return value_dict def _parse_firestore_fields(fields_dict): if not isinstance(fields_dict, dict): return {} py_dict = {} for field_name, firestore_value_wrapper in fields_dict.items(): py_dict[field_name] = _parse_firestore_value(firestore_value_wrapper) return py_dict def _parse_single_message_payload(list_payload_string, message_number_for_debug=""): """ Parses the actual "[[...]]" list payload of a single Firestore message. Returns a list of processed change objects, or None if parsing fails. """ # Pre-process for common JSON-to-Python literal mismatches # This is a bit of a shotgun approach, be careful if these strings # could legitimately appear as string *values*. # Ensure there are word boundaries to avoid replacing "trueValue" with "TrueValue" # Using a more careful replacement with regular expressions might be better, # but for a quick test: processed_string = list_payload_string.replace(": true", ": True") processed_string = processed_string.replace(": false", ": False") processed_string = processed_string.replace(": null", ": None") # If they can appear at the start of an array value without a key: processed_string = processed_string.replace("[true", "[True") processed_string = processed_string.replace(", true", ", True") processed_string = processed_string.replace("[false", "[False") processed_string = processed_string.replace(", false", ", False") processed_string = processed_string.replace("[null", "[None") processed_string = processed_string.replace(", null", ", None") try: # Use the pre-processed string parsed_outer_list = ast.literal_eval(processed_string) # <--- USE PROCESSED STRING if not isinstance(parsed_outer_list, list): print(f"DEBUG: PAYLOAD_ERROR (Message {message_number_for_debug}) - Expected a list after ast.literal_eval, got {type(parsed_outer_list)}") return None except (ValueError, SyntaxError) as e: error_msg = getattr(e, 'msg', str(e)) #print(f"DEBUG: PAYLOAD_ERROR (Message {message_number_for_debug}) - Error parsing list payload string with ast.literal_eval: {error_msg}") error_line_num = getattr(e, 'lineno', None) error_col_num = getattr(e, 'offset', None) error_text_line = getattr(e, 'text', None) # Use the *original* string for error reporting context if pre-processing was done, # or the processed_string if not, to ensure line numbers match the string ast saw. string_for_error_context = processed_string if error_line_num is not None and error_text_line is not None: #print(f"DEBUG: PAYLOAD_ERROR Detail: Line {error_line_num}, Column {error_col_num if error_col_num is not None else '(unknown)'} (of the current {len(string_for_error_context)}-byte payload for Message {message_number_for_debug}).") #print(f"DEBUG: PAYLOAD_ERROR Offending line from parsed string: {error_text_line.rstrip()}") # This is the line ast.literal_eval saw if error_col_num is not None: pointer = " " * (error_col_num - 1) + "^" print(f"DEBUG: PAYLOAD_ERROR {pointer}") lines_of_payload = string_for_error_context.splitlines() context_window_size = 2 actual_error_line_idx = error_line_num - 1 display_start_idx = max(0, actual_error_line_idx - context_window_size) display_end_idx = min(len(lines_of_payload), actual_error_line_idx + context_window_size + 1) #print(f"DEBUG: PAYLOAD_ERROR Context from payload (Message {message_number_for_debug}, around its line {error_line_num}):") for i in range(display_start_idx, display_end_idx): current_line_content = lines_of_payload[i] if i < len(lines_of_payload) else "[End of Payload]" line_prefix = " " if i == actual_error_line_idx: line_prefix = ">>" #print(f"DEBUG: PAYLOAD_ERROR {line_prefix} L{i+1:04d}: {current_line_content.rstrip()}") if i == actual_error_line_idx and error_col_num is not None: context_pointer_prefix = f"DEBUG: PAYLOAD_ERROR {line_prefix} L{i+1:04d}: " # Recalculate prefix length context_pointer = " " * (len(context_pointer_prefix) + error_col_num -1) + "^" print(context_pointer) else: print(f"DEBUG: PAYLOAD_ERROR (Message {message_number_for_debug}) Problematic segment (first 200 chars of string given to ast.literal_eval): '{string_for_error_context[:200]}...'") return None # --- Rest of the processing loop (unchanged) --- processed_changes_for_this_message = [] for item_array in parsed_outer_list: # ... (same processing logic as before) if not isinstance(item_array, list) or len(item_array) != 2: print(f"DEBUG: PAYLOAD_WARN - Skipping malformed item in outer list: {item_array}") continue change_events_list = item_array[1] if not isinstance(change_events_list, list): print(f"DEBUG: PAYLOAD_WARN - Expected list of change events, got {type(change_events_list)}") continue for event_obj in change_events_list: if not isinstance(event_obj, dict): print(f"DEBUG: PAYLOAD_WARN - Expected dict for change event, got {type(event_obj)}") continue if "documentChange" in event_obj: doc_change = event_obj["documentChange"] doc = doc_change.get("document", {}) parsed_document = { "type": "document_change", "document_name": doc.get("name"), "fields": _parse_firestore_fields(doc.get("fields", {})), "createTime": doc.get("createTime"), "updateTime": doc.get("updateTime"), "targetIds": doc_change.get("targetIds"), } processed_changes_for_this_message.append(parsed_document) elif "targetChange" in event_obj: processed_changes_for_this_message.append({ "type": "target_change", "change": event_obj["targetChange"] }) elif "documentDelete" in event_obj: doc_delete = event_obj["documentDelete"] processed_changes_for_this_message.append({ "type": "document_delete", "document_name": doc_delete.get("document"), "removedTargetIds": doc_delete.get("removedTargetIds"), "readTime": doc_delete.get("readTime") }) elif "documentRemove" in event_obj: doc_remove = event_obj["documentRemove"] processed_changes_for_this_message.append({ "type": "document_remove", "document_name": doc_remove.get("document"), "removedTargetIds": doc_remove.get("removedTargetIds"), "readTime": doc_remove.get("readTime") }) elif "filter" in event_obj: processed_changes_for_this_message.append({ "type": "filter_update", "filter_details": event_obj["filter"] }) return processed_changes_for_this_message # --- Main function to parse the entire stream of messages --- def parse_all_firestore_messages(full_stream_string): """ Parses a string containing one or more length-prefixed Firestore messages. """ full_stream_string = full_stream_string.strip() # Strip entire input first all_parsed_changes = [] current_offset = 0 total_stream_length = len(full_stream_string) message_count = 0 while current_offset < total_stream_length: remaining_stream_chunk = full_stream_string[current_offset:] if not remaining_stream_chunk.strip(): break newline_idx_in_chunk = remaining_stream_chunk.find('\n') if newline_idx_in_chunk == -1: if remaining_stream_chunk.strip(): print(f"DEBUG: STREAM_ERROR - Malformed stream: No newline found for length prefix at main offset {current_offset}. Remaining data (first 100 chars): '{remaining_stream_chunk[:100].strip()}...'") break message_length_str = remaining_stream_chunk[:newline_idx_in_chunk].strip() try: expected_payload_length = int(message_length_str) if expected_payload_length < 0: print(f"DEBUG: STREAM_ERROR - Invalid negative length prefix '{message_length_str}' at main offset {current_offset}.") break except ValueError: print(f"DEBUG: STREAM_ERROR - Invalid length prefix '{message_length_str}' at main offset {current_offset}. Cannot convert to int. Chunk starts: '{remaining_stream_chunk[:100]}...'") break payload_start_in_chunk = newline_idx_in_chunk + 1 if payload_start_in_chunk + expected_payload_length > len(remaining_stream_chunk): print(f"DEBUG: STREAM_ERROR - Truncated payload for message (prefix: {message_length_str}) at main offset {current_offset}. Expected {expected_payload_length} payload bytes, but only {len(remaining_stream_chunk) - payload_start_in_chunk} available after newline.") break actual_list_payload_string = remaining_stream_chunk[payload_start_in_chunk : payload_start_in_chunk + expected_payload_length] message_count += 1 # print(f"DEBUG: STREAM - Parsing message #{message_count}, prefix: {message_length_str}, payload_len: {expected_payload_length}, main_offset: {current_offset}") parsed_changes_from_this_message = _parse_single_message_payload(actual_list_payload_string, str(message_count)) if parsed_changes_from_this_message is None: print(f"DEBUG: STREAM_ERROR - Halting processing due to error in message #{message_count} (prefix: {message_length_str}).") break all_parsed_changes.extend(parsed_changes_from_this_message) current_offset += (len(message_length_str) + 1 + expected_payload_length) # +1 for the original newline before stripping the prefix print(f"DEBUG: STREAM - Finished processing. Parsed {message_count} messages. Total changes found: {len(all_parsed_changes)}.") return all_parsed_changes if __name__ == "__main__": zx_for_sid_request = "fgd9osntni3x" zx_for_data_request = "kxqo6qqmk3y4" stream_read_time_seconds = 1.0 with requests.Session() as http_session: session_init_data = get_initial_session_data(http_session, first_request_zx=zx_for_sid_request) if session_init_data and session_init_data["sid"]: sid = session_init_data["sid"] gsessionid_to_use = session_init_data["dynamic_gsessionid"] if not gsessionid_to_use: logging.warning("Dynamic gsessionid was not found in headers. Using a STALE example gsessionid. THIS WILL LIKELY FAIL.") gsessionid_to_use = "7VNzPJC3suxWdG7nZ4kBzftyc1O2nD9xZb-U0AEQD6w" # Fallback, likely stale raw_data = get_raw_roadway_datastream( http_session, sid, gsessionid_to_use, zx_for_data_req=zx_for_data_request, read_duration_limit=stream_read_time_seconds ) if raw_data is not None: # Check for None explicitly, as empty string is valid data parsed_data = parse_all_firestore_messages(raw_data) #data = json.loads(parsed_data) cams = {} for i in parsed_data: try: if i['fields']['type'] == 'rwis': if i['fields']['location']['longitude'] > -83.90: needed = {'url': i['fields']['source']['imageUrl'],'route': i['fields']['source']['route'],'cameraKey': i['fields']['source']['cameraKey'],'milepost': i['fields']['source']['mile_post'],'id': i['fields']['source']['id'],'lon': i['fields']['location']['longitude'],'lat': i['fields']['location']['latitude']} key = len(cams) + 1 cams[key] = needed except (KeyError, TypeError): continue #print(cams) conn = psycopg2.connect(host='localhost', database='nws', user='nws', password='nws') cursor = conn.cursor() for i in cams: #print(cams[i]) values = (cams[i]['url'],cams[i]['route'] + " MM" + str(cams[i]['milepost']) ,cams[i]['cameraKey'],cams[i]['lat'],cams[i]['lon'],'normal',True,10) sql = 'INSERT INTO cams (url, description, bloomsky,lat,lon,method,active,interval) values (%s, %s, %s,%s,%s,%s,%s, %s) on conflict (bloomsky) do update set url = EXCLUDED.url' cursor.execute(sql,values) conn.commit() updates = ['UPDATE public.cams SET geom = ST_SetSRID(ST_MakePoint(lon, lat), 4326)', 'UPDATE public.cams SET county = county.countyname from public.county WHERE ST_Contains(county.geom,cams.geom)', 'UPDATE public.cams SET pzone = pzone.state_zone from public.pzone WHERE ST_Contains(pzone.geom,cams.geom)', 'UPDATE public.cams SET fzone = fzone.state_zone from public.fzone WHERE ST_Contains(fzone.geom,cams.geom)', 'UPDATE public.cams SET cwa = fzone.cwa from public.fzone WHERE ST_Contains(fzone.geom,cams.geom)', 'UPDATE public.cams SET zonename = pzone.shortname from public.pzone WHERE ST_Contains(pzone.geom,cams.geom)', 'UPDATE public.cams SET keephours = 400 WHERE keephours is null', """UPDATE public.cams SET method = 'normal' WHERE method is null""", 'UPDATE public.cams SET state = county.state from public.county WHERE ST_Contains(county.geom,cams.geom)'] for i in updates: cursor.execute(i) conn.commit() conn.close() else: print("Failed to retrieve any roadway data stream content, or an error occurred before data could be processed.") print(f" Attempted with SID='{sid}', gsessionid='{gsessionid_to_use}', zx for data='{zx_for_data_request}'") else: print("Could not obtain initial session data (SID and/or gsessionid). Script cannot continue.")