"""Fetch still frames from a database-driven list of web cameras.

Each row of the ``cams`` table names a URL and a retrieval ``method``.
``main`` fans the due cameras out to a thread pool, stores every captured
frame under ``CAM_DIR/<camid>/``, records it in ``camdb``, refreshes the
per-camera ``latest.jpg`` thumbnail and finally prunes expired files.

NOTE(review): importing this module connects to PostgreSQL and opens the
log file; that behaviour is kept as in the original script.
"""

import datetime
import hashlib
import json
import logging
import os
import random
import re
import shutil
import subprocess
import sys
import threading
import time
import warnings
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from mimetypes import guess_extension
from urllib.parse import unquote

import cv2
import ephem
import ffmpeg
import imagehash
import numpy as np
import psycopg2
import requests
from PIL import Image
from psycopg2.extras import Json
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from urllib3.exceptions import InsecureRequestWarning
from yt_dlp import YoutubeDL

# Suppress InsecureRequestWarning (still images are fetched with verify=False).
warnings.filterwarnings('ignore', category=InsecureRequestWarning)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/www/html/work/camai.log')
    ]
)
logger = logging.getLogger(__name__)

# Database connection (shared by the main thread and all worker threads).
conn = psycopg2.connect(host='localhost', database='nws', user='nws', password='nws')
cursor = conn.cursor()

# Constants
CAM_DIR = "/var/www/html/work/camdata/"
HOST_DELAY = 1.5  # Minimum seconds between two requests to the same host
MAX_WORKERS = 20  # Size of the camera worker thread pool
COMMIT_INTERVAL = 25  # Commit after this many stored frames (counted per thread)
PAGE_TIMEOUT = 10  # Seconds allowed for page/API requests that had no timeout

# Global host tracking and commit counter
host_last_access = defaultdict(float)
host_lock = threading.Lock()
commit_counter = threading.local()

# YouTube downloader setup
ydl_options = {
    'quiet': True,
    'cookiefile': 'cook.txt'
}
ydl = YoutubeDL(ydl_options)
ydl.add_default_info_extractors()

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36',
    'Cache-Control': 'no-cache'
}

proxies = {
    "http": "http://192.168.11.83:8118",
    "https": "http://192.168.11.83:8118"
}


def get_host_delay(url):
    """Block until at least HOST_DELAY seconds separate requests to url's host.

    The host name is taken from an http(s)/rtsp URL; anything else is
    throttled under the shared key ``'unknown'``.

    The next free time slot for the host is reserved while holding
    ``host_lock``, but the sleep itself happens after the lock is released,
    so waiting on one host does not stall workers talking to other hosts.
    """
    hostname_match = re.match(r'(?:https?|rtsp)://([^/:]+)', url)
    hostname = hostname_match.group(1) if hostname_match else 'unknown'

    with host_lock:
        now = time.time()
        # Unknown hosts start at 0.0, so their first slot is simply "now".
        slot = max(now, host_last_access[hostname] + HOST_DELAY)
        host_last_access[hostname] = slot

    wait = slot - now
    if wait > 0:
        time.sleep(wait)


def image_hash(blob):
    """Return the perceptual hash (pHash, hash_size=10) of image bytes as a string."""
    with Image.open(BytesIO(blob)) as img:
        return str(imagehash.phash(img, hash_size=10))


def check_if_daytime(lat='38.4803826', lon='-81.1850195'):
    """Return True when the sun is higher than 3 degrees below the horizon.

    ``lat`` and ``lon`` are passed to ephem as strings (decimal degrees).
    The sun's altitude comes back as an ephem angle measured in radians, so
    the threshold must be an angle too; comparing against the bare number
    ``-3`` (radians) would be true at every hour of the day.
    """
    # Naive UTC timestamp, equivalent to the deprecated datetime.utcnow().
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    observer = ephem.Observer()
    observer.lon, observer.lat, observer.date = lon, lat, now
    sun = ephem.Sun()
    sun.compute(observer)
    return sun.alt > ephem.degrees('-3')


def guess_ext(content_type):
    """Map a Content-Type header value to a file extension, defaulting to '.jpg'."""
    ext = guess_extension(content_type.partition(';')[0].strip())
    return '.jpg' if ext == '.jpe' or not ext else ext


def resize_frame(frame):
    """Scale a frame to 800 px wide unless its width is already 600-1000 px.

    The aspect ratio is preserved; frames inside the accepted range are
    returned untouched.
    """
    width = frame.shape[1]
    if 600 <= width <= 1000:
        return frame
    scalefactor = 800 / width
    new_width = int(scalefactor * width)
    new_height = int(scalefactor * frame.shape[0])
    return cv2.resize(frame, (new_width, new_height))


def save_frame(frame, path, camid):
    """Write ``frame`` to ``path`` with OpenCV, creating the directory if needed.

    ``camid`` is accepted for call compatibility and is not used.
    """
    # exist_ok avoids the check-then-create race between worker threads.
    os.makedirs(os.path.dirname(path), exist_ok=True)
    cv2.imwrite(path, frame)


def ipcamlive_update():
    """Refresh the snapshot URL of every due 'ipcam' camera from ipcamlive.com.

    For each active ipcam row that is due (and has an alias in the
    ``bloomsky`` column) the stream state endpoint is queried and
    ``cams.url`` is rewritten to the current snapshot address. The caller
    commits the transaction.

    A network or JSON failure for one alias is logged and skipped so the
    remaining cameras are still updated.
    """
    cursor.execute("""SELECT camid, bloomsky FROM cams WHERE method = 'ipcam' AND
    active = True
    AND ((EXTRACT(EPOCH FROM (current_timestamp - lastsuccess))/60) > interval OR lastsuccess IS NULL)
    """)
    ipcams = cursor.fetchall()
    base_url = 'https://www.ipcamlive.com/ajax/getcamerastreamstate.php?cameraalias='
    ipcam_headers = {
        'Connection': 'keep-alive',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36',
        'Content-Type': 'application/json; charset=UTF-8',
        'Accept': '*/*',
        'Origin': 'https://ipcamlive.com',
        'Sec-Fetch-Site': 'same-origin',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Dest': 'empty',
        'X-HTTP-Method-Override': 'POST',
        'Host': 'g1.ipcamlive.com',
        'Referer': 'https://ipcamlive.com/',
        'Accept-Language': 'en-US,en;q=0.9'
    }

    try:
        # One session is reused for all aliases.
        with requests.Session() as session:
            session.headers.update(ipcam_headers)
            for camid, alias in ipcams:
                if not alias:
                    continue
                url = f"{base_url}{alias}&targetdomain=g1.ipcamlive.com"
                get_host_delay(url)
                try:
                    r = session.post(url, timeout=PAGE_TIMEOUT)
                    if r.status_code != 200:
                        continue
                    # 'details' may be missing or null in the response.
                    details = r.json().get('details') or {}
                except (requests.RequestException, ValueError) as e:
                    logger.error(f"IPCamlive update failed for camid {camid}: {e}")
                    continue

                ipid = details.get('streamid')
                ipurl = details.get('address')
                if ipid is None:
                    # NOTE(review): no stream id clears lastfailed (sets it
                    # to False), as in the original - confirm this is intended.
                    cursor.execute("UPDATE cams SET lastfailed = False WHERE camid = %s", (camid,))
                    continue
                snapshot_url = f"{ipurl}streams/{ipid}/snapshot.jpg"
                cursor.execute(
                    "UPDATE cams SET url = %s WHERE method = 'ipcam' AND camid = %s",
                    (snapshot_url, camid)
                )
        logger.info("IPCamlive update completed successfully")
    except Exception as e:
        logger.error(f"IPCamlive update failed: {e}")


def get_camera_handler(method):
    """Return the handler for a camera ``method``; unknown methods use handle_normal."""
    return {
        'hls': handle_hls,
        # RTSP sources go through the generic ffmpeg handler;
        # handle_rtsp_ffmpeg exists but is not wired in.
        'rtsp': handle_hls,
        'verkada': handle_verkada,
        'verkadalow': handle_verkadalow,
        'castr': handle_castr,
        'normal': handle_normal,
        'normalcache': handle_normal_cache,
        'youtube': handle_youtube,
        'normalproxy': handle_normal_proxy,
        'ipcam': handle_normal,
        'bloomsky': handle_bloomsky,
        'ivideon': handle_ivideon,
        'wetmet': handle_wetmet,
        'rtspme': handle_rtspme,
        'rail': handle_rail
    }.get(method, handle_normal)


def _download_still(entry, *, via_proxy):
    """Fetch a still image over HTTP(S) and store it; shared by the 'normal' handlers.

    Any exception marks the camera as failed. A non-200 response is ignored
    without recording a failure.
    NOTE(review): ignoring non-200 responses matches the original - confirm.
    """
    # Unpacked outside the try block so ``camid`` is always bound in the handler.
    camid, url, *_ = entry
    ok_kind = "proxy" if via_proxy else "normal"
    fail_kind = "Normal proxy" if via_proxy else "Normal"
    try:
        get_host_delay(url)
        extra = {'proxies': proxies} if via_proxy else {}
        r = requests.get(url, timeout=3, headers=headers, verify=False, **extra)
        if r.status_code == 200:
            process_image_response(r.content, r.headers, camid, url)
            logger.debug(f"Successfully downloaded {ok_kind} image for camid {camid}")
    except Exception as e:
        logger.error(f"{fail_kind} download failed for camid {camid}: {e}")
        failed_download(camid, str(e))


def handle_normal(entry):
    """Download a still image directly from the camera URL."""
    _download_still(entry, via_proxy=False)


def handle_normal_cache(entry):
    """Download a still image with a random cache-busting query parameter.

    The random number is appended with ``&`` when the URL already carries a
    query string, otherwise with ``?``.
    """
    camid, url, *rest = entry
    separator = '&' if url and '?' in url else '?'
    modified_entry = (camid, f"{url}{separator}{random.randint(10, 99999)}", *rest)
    handle_normal(modified_entry)


def handle_normal_proxy(entry):
    """Download a still image through the configured HTTP proxy."""
    _download_still(entry, via_proxy=True)


def handle_ohgo(entry):
    """Download a still image through the proxy (same behaviour as handle_normal_proxy)."""
    handle_normal_proxy(entry)


def _ffmpeg_snapshot(url, **input_options):
    """Run ffmpeg on ``url`` and return one 800-px-wide MJPEG frame as bytes.

    Raises subprocess.TimeoutExpired after 15 seconds and
    subprocess.CalledProcessError when ffmpeg exits with a non-zero status.
    """
    args = (
        ffmpeg
        .input(url, **input_options)
        .filter('scale', 800, -1)
        .output('pipe:', vframes=1, format='image2', vcodec='mjpeg')
        .compile()
    )
    # Keep ffmpeg quiet so only real errors reach stderr.
    args = [args[0]] + ['-nostats', '-loglevel', 'error'] + args[1:]
    result = subprocess.run(args, capture_output=True, timeout=15, check=True)
    return result.stdout


def _ffmpeg_capture(entry, ok_label, fail_label, options_for, *, decode,
                    log_process_errors=True):
    """Grab one frame with ffmpeg and store it; shared by all ffmpeg handlers.

    ``options_for(url)`` builds the ffmpeg input options and is called inside
    the try block so that a malformed URL is recorded as a failed download.
    With ``decode`` the JPEG is decoded and re-saved through OpenCV,
    otherwise ffmpeg's bytes are written to disk unchanged.
    """
    camid, url, *_ = entry
    try:
        get_host_delay(url)
        jpeg = _ffmpeg_snapshot(url, **options_for(url))
        if not jpeg:
            return
        digest = hashlib.md5(jpeg).hexdigest()
        if decode:
            img = cv2.imdecode(np.frombuffer(jpeg, np.uint8), cv2.IMREAD_COLOR)
            if img is None:
                return
            process_frame(img, digest, camid)
        else:
            process_frame_bytes(jpeg, digest, camid)
        logger.debug(f"Successfully processed {ok_label} frame for camid {camid}")
    except (subprocess.TimeoutExpired, subprocess.CalledProcessError) as e:
        if log_process_errors:
            logger.error(f"{fail_label} failed for camid {camid} (timeout or process error): {e}")
        failed_download(camid, str(e))
    except Exception as e:
        logger.error(f"{fail_label} failed for camid {camid}: {e}")
        failed_download(camid, str(e))


def _referer_option(referrer):
    """Return ffmpeg input options that send ``referrer`` as the Referer header."""
    return {'headers': f'Referer: {referrer}\r\n'}


def handle_rtsp_ffmpeg(entry):
    """Capture a frame from an RTSP stream over TCP using ffmpeg.

    NOTE(review): not referenced by get_camera_handler; the 'stimeout'
    option is passed to ffmpeg as-is - confirm the installed ffmpeg accepts it.
    """
    _ffmpeg_capture(
        entry, "RTSP", "RTSP FFmpeg",
        lambda url: {'rtsp_transport': 'tcp', 'stimeout': '5000000'},
        decode=True,
    )


def handle_hls(entry):
    """Capture a frame from an HLS (or RTSP) stream using ffmpeg.

    ffmpeg timeouts and non-zero exits are recorded as failures but are not
    logged; other exceptions are logged.
    """
    _ffmpeg_capture(
        entry, "HLS", "HLS FFmpeg",
        lambda url: {'user_agent': headers['User-Agent']},
        decode=True,
        log_process_errors=False,
    )


def _grab_stream_frame(stream_url):
    """Read one frame from ``stream_url`` with OpenCV; return None if unavailable.

    The capture object is released on every path.
    """
    cap = cv2.VideoCapture(stream_url)
    try:
        if not cap.isOpened():
            return None
        _, frame = cap.read()
        return frame
    finally:
        cap.release()


def _store_stream_frame(stream_url, camid, label):
    """Throttle, read one frame from a video stream, resize and store it.

    The file name is the MD5 of the raw (pre-resize) frame bytes. Nothing is
    recorded when no frame could be read.
    NOTE(review): an unreadable stream is not marked as a failed download,
    which appears to match the original - confirm.
    """
    get_host_delay(stream_url)
    frame = _grab_stream_frame(stream_url)
    if frame is None:
        return
    digest = hashlib.md5(frame.tobytes()).hexdigest()
    process_frame(resize_frame(frame), digest, camid)
    logger.debug(f"Successfully processed {label} frame for camid {camid}")


def _verkada_capture(entry, preferred_key, fallback_key):
    """Resolve a Verkada embed URL to its m3u8 stream and store one frame.

    The stream description is JSON carried in the fragment of the URL the
    request ends up at; ``preferred_key`` is used when present, otherwise
    ``fallback_key``.
    """
    camid, url, *_ = entry
    try:
        get_host_delay(url)
        wvheaders = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:105.0) Gecko/20100101 Firefox/105.0",
            "Connection": "keep-alive",
            "referrer": "https://command.verkada.com/embed.html",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "cookie": "intercom-id-q5re5q6g=183b11d6-5bfb-4e20-8a91-e5733758fbfd; intercom-session-q5re5q6g=; auth='(null)'; token="
        }
        r = requests.get(url, headers=wvheaders, timeout=PAGE_TIMEOUT)
        match = re.search(r'https?://[^\s"<>]+high_res[^\s"<>]*', r.url)
        if not match:
            failed_download(camid, "No stream URL found")
            return
        urlinfo = unquote(match.group(0))
        fragment = unquote(urlinfo.split('#')[1])
        data = json.loads(fragment)
        m3u8_url = data.get(preferred_key, data.get(fallback_key))
        _store_stream_frame(m3u8_url, camid, "Verkada")
    except Exception as e:
        logger.error(f"Verkada failed for camid {camid}: {e}")
        failed_download(camid, str(e))


def handle_verkada(entry):
    """Capture a Verkada frame, preferring the HD stream."""
    _verkada_capture(entry, 'urlHD', 'urlSD')


def handle_verkadalow(entry):
    """Capture a Verkada frame, preferring the SD stream."""
    _verkada_capture(entry, 'urlSD', 'urlHD')


def handle_castr(entry):
    """Capture a Castr frame with ffmpeg, sending the player page as Referer."""
    _ffmpeg_capture(
        entry, "Castr", "Castr",
        lambda url: _referer_option(f'https://player.castr.io/{url.rsplit("/", 1)[-1]}'),
        decode=False,
    )


def handle_youtube(entry):
    """Resolve a YouTube URL with yt-dlp and store one frame of the stream."""
    camid, url, *_ = entry
    try:
        get_host_delay(url)
        info = ydl.extract_info(url, download=False)
        _store_stream_frame(info['url'], camid, "YouTube")
    except Exception as e:
        logger.error(f"YouTube failed for camid {camid}: {e}")
        failed_download(camid, str(e))


def handle_bloomsky(entry):
    """Download a still image, but only while check_if_daytime() is true."""
    if check_if_daytime():
        handle_normal(entry)


def handle_ivideon(entry):
    """Capture an Ivideon frame with ffmpeg, sending the open player page as Referer."""
    _ffmpeg_capture(
        entry, "Ivideon", "Ivideon",
        lambda url: _referer_option(f'https://open.ivideon.io/{url.rsplit("/", 1)[-1]}'),
        decode=False,
    )


def handle_wetmet(entry):
    """Capture a Wetmet frame with ffmpeg, sending the Wetmet API as Referer."""
    _ffmpeg_capture(
        entry, "Wetmet", "Wetmet",
        lambda url: _referer_option('https://api.wetmet.net/'),
        decode=False,
    )


def handle_rtspme(entry):
    """Load an rtsp.me page in headless Chrome, read its stream URL, store a frame.

    The stream URL is read from the page's ``ur_t`` JavaScript variable.
    """
    camid, url, *_ = entry
    try:
        get_host_delay(url)
        service = Service(executable_path=r'/usr/bin/chromedriver')
        options = Options()
        options.add_argument("--headless")
        driver = webdriver.Chrome(service=service, options=options)
        try:
            driver.get(url)
            hlsurl = driver.execute_script('return ur_t')
        finally:
            # quit() ends the browser and the chromedriver process even when
            # the page load or script fails; close() alone leaves them running.
            driver.quit()
        _store_stream_frame(hlsurl, camid, "RTSPME")
    except Exception as e:
        logger.error(f"RTSPME failed for camid {camid}: {e}")
        failed_download(camid, str(e))


def handle_rail(entry):
    """Fetch a page through the proxy, extract its "streaming" URL, store a frame."""
    camid, url, *_ = entry
    try:
        get_host_delay(url)
        r = requests.get(url, proxies=proxies, timeout=PAGE_TIMEOUT)
        found = re.search(r"\"streaming\": \"(.*)\",", r.text)
        if found is None:
            logger.error(f"Rail failed for camid {camid}: No stream URL found")
            failed_download(camid, "No stream URL found")
            return
        _store_stream_frame(found.group(1), camid, "Rail")
    except Exception as e:
        logger.error(f"Rail failed for camid {camid}: {e}")
        failed_download(camid, str(e))


def process_image_response(content, headers, camid, url):
    """Decode a downloaded image, resize it and store it.

    The file name is the MD5 of the downloaded bytes and the extension comes
    from the response's content-type. Undecodable content is ignored.
    ``url`` is accepted for call compatibility and is not used.
    """
    md5 = hashlib.md5(content)
    frame = cv2.imdecode(np.frombuffer(content, np.uint8), cv2.IMREAD_COLOR)
    if frame is None:
        return
    frame = resize_frame(frame)
    ext = guess_ext(headers.get('content-type', ''))
    process_frame(frame, md5.hexdigest(), camid, ext)


def process_frame(frame, hash_value, camid, ext='.jpg'):
    """Save a decoded frame as ``CAM_DIR/<camid>/<hash><ext>`` and register it."""
    relpath = f"{camid}/{hash_value}{ext}"
    path = f"{CAM_DIR}{relpath}"
    save_frame(frame, path, camid)
    camhandler(path, camid, relpath)


def process_frame_bytes(frame_bytes, hash_value, camid, ext='.jpg'):
    """Write already-encoded image bytes to ``CAM_DIR/<camid>/<hash><ext>`` and register it."""
    relpath = f"{camid}/{hash_value}{ext}"
    path = f"{CAM_DIR}{relpath}"
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, 'wb') as f:
        f.write(frame_bytes)
    camhandler(path, camid, relpath)


def camhandler(path, camid, relpath):
    """Record a stored image in the database and refresh the camera thumbnail.

    Files under 100 bytes are deleted and counted as a failed download. An
    image whose path equals the camera's ``lastimage`` counts as a failure;
    a new image resets the failure counter. Commits happen once the calling
    thread has stored COMMIT_INTERVAL frames.
    """
    if not os.path.exists(path):
        return

    fsize = os.path.getsize(path)
    if fsize < 100:
        os.remove(path)
        failed_download(camid, "File size too small")
        return

    with conn.cursor() as cur:
        cur.execute(
            "INSERT INTO camdb (camid, filepath) VALUES (%s, %s) ON CONFLICT ON CONSTRAINT filepath DO NOTHING",
            (camid, relpath)
        )
        # Same file as last time: the source did not change.
        cur.execute(
            "UPDATE cams SET lasttry = current_timestamp, totalfails = totalfails + 1, lastfailed = True "
            "WHERE camid = %s AND lastimage = %s",
            (camid, relpath)
        )
        # New file: mark success and remember it as the latest image.
        cur.execute(
            "UPDATE cams SET lasttry = current_timestamp, lastsuccess = current_timestamp, totalfails = 0, "
            "lastfailed = False, lastimage = %s WHERE camid = %s AND (lastimage <> %s OR lastimage IS NULL)",
            (relpath, camid, relpath)
        )

    # commit_counter is thread-local, so each worker counts its own frames.
    if not hasattr(commit_counter, 'count'):
        commit_counter.count = 0
    commit_counter.count += 1
    if commit_counter.count >= COMMIT_INTERVAL:
        conn.commit()
        commit_counter.count = 0
        logger.debug(f"Committed database changes after processing {COMMIT_INTERVAL} cameras")

    make_thumbnail(path, camid)


def make_thumbnail(path, camid):
    """Write a thumbnail (at most 320x320) of ``path`` to ``CAM_DIR/<camid>/latest.jpg``.

    Failures are logged and otherwise ignored.
    """
    try:
        with Image.open(path) as im:
            im.thumbnail((320, 320), Image.Resampling.LANCZOS)
            im.save(f"{CAM_DIR}{camid}/latest.jpg", "JPEG")
    except Exception as e:
        logger.error(f"Thumbnail generation failed for camid {camid}: {e}")


def failed_download(camid, error_msg=None):
    """Mark a camera's latest attempt as failed and commit immediately.

    ``error_msg``, when given, is stored in ``errorcode`` after being cut to
    255 characters.
    """
    with conn.cursor() as cur:
        if error_msg:
            # Truncate error message if it's too long for the column
            if len(error_msg) > 255:
                error_msg = error_msg[:252] + "..."
            cur.execute(
                "UPDATE cams SET lastfailed = True, totalfails = totalfails + 1, lasttry = current_timestamp, "
                "errorcode = %s WHERE camid = %s",
                (error_msg, camid)
            )
        else:
            cur.execute(
                "UPDATE cams SET lastfailed = True, totalfails = totalfails + 1, lasttry = current_timestamp "
                "WHERE camid = %s",
                (camid,)
            )
    conn.commit()
    logger.debug(f"Marked download as failed and committed for camid {camid}")


def clean_up(entry):
    """Delete a camera's images older than its retention period.

    ``entry`` is ``(camid, keephours, lastsuccess, active)``. A missing
    ``keephours`` means seven days. Inactive cameras have their whole image
    directory removed. Files that cannot be removed are skipped, but their
    ``camdb`` rows are deleted regardless.
    """
    camid, keephours, lastsuccess, active = entry
    keephours = 7*24 if not keephours else keephours

    if not active:
        shutil.rmtree(f"{CAM_DIR}{camid}", ignore_errors=True)

    with conn.cursor() as cur:
        cur.execute(
            "SELECT filepath, dateutc FROM camdb WHERE camid = %s AND "
            "(EXTRACT(EPOCH FROM (current_timestamp - dateutc))/3600) > %s",
            (camid, keephours)
        )
        camfiles = cur.fetchall()

        for filepath, _ in camfiles:
            try:
                os.remove(f"{CAM_DIR}{filepath}")
            except OSError:
                # Best effort: the file may already be gone.
                pass
            cur.execute("DELETE FROM camdb WHERE filepath = %s", (filepath,))
    conn.commit()
    logger.debug(f"Cleanup completed for camid {camid}")


def fetch_cameras():
    """Return active cameras with fewer than five failures that are due for a fetch.

    A camera whose last attempt failed is retried once a quarter of its
    interval has passed since ``lasttry``.
    """
    cursor.execute("""
        SELECT camid, url, method, interval, keephours, lastsuccess
        FROM cams
        WHERE ((totalfails < 5 OR totalfails IS NULL) AND active = True)
        AND ((EXTRACT(EPOCH FROM (current_timestamp - lastsuccess))/60) > interval OR lastsuccess IS NULL)
        AND (lastfailed IS NULL OR lastfailed = false OR (lastfailed = True AND (EXTRACT(EPOCH FROM (current_timestamp - lasttry))/60) > interval/4))
    """)
    return cursor.fetchall()


def fetch_failed_cameras():
    """Return cameras with five or more failures whose interval has passed since the last try."""
    cursor.execute("""
        SELECT camid, url, method, interval, keephours, lasttry
        FROM cams
        WHERE (totalfails >= 5 AND active <> false AND ((EXTRACT(EPOCH FROM (current_timestamp - lasttry))/60) > interval))
    """)
    return cursor.fetchall()


def fetch_cleanup_cameras():
    """Return ``(camid, keephours, lastsuccess, active)`` for every camera."""
    cursor.execute("SELECT camid, keephours, lastsuccess, active FROM cams")
    return cursor.fetchall()


def _run_batch(cams, error_prefix):
    """Run each camera's handler on the thread pool and wait for all of them.

    Exceptions escaping a handler are logged with ``error_prefix``.
    """
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [executor.submit(get_camera_handler(cam[2]), cam) for cam in cams]
        for future in futures:
            try:
                future.result()
            except Exception as e:
                logger.error(f"{error_prefix}: {e}")


def main():
    """Refresh ipcam URLs, process due and failed cameras, then clean up old files."""
    logger.info("Starting camera processing")
    ipcamlive_update()
    conn.commit()

    allcams = fetch_cameras()
    logger.info(f"Processing {len(allcams)} active cameras")
    _run_batch(allcams, "Task execution failed")
    conn.commit()
    logger.debug("Final commit for active cameras")

    failedcams = fetch_failed_cameras()
    logger.info(f"Processing {len(failedcams)} failed cameras")
    _run_batch(failedcams, "Failed camera task execution failed")
    conn.commit()
    logger.debug("Final commit for failed cameras")

    cleancams = fetch_cleanup_cameras()
    logger.info(f"Performing cleanup on {len(cleancams)} cameras")
    for cam in cleancams:
        clean_up(cam)

    logger.info("Camera processing completed")


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        logger.critical(f"Main execution failed: {e}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()