Files
test/warncheck.py
2025-11-27 22:25:36 +00:00

101 lines
4.4 KiB
Python

import requests
import json
import psycopg2
import psycopg2.extensions
from datetime import datetime, timezone
import re
conn = psycopg2.connect(host='localhost', database='nws', user='nws', password='nws')
cursor = conn.cursor()
S = requests.Session()
#select power.realgeom from power,svr where st_contains(svr.nwspoly,power.realgeom)
def decode_vtec(pvtec_string):
pattern = r"/([A-Z]+)\.([A-Z]+)\.([A-Z]+)\.([A-Z]+)\.([A-Z])\.([0-9]+)\.([0-9A-Z]+)T([0-9A-Z]+)-([0-9A-Z]+)T([0-9A-Z]+)/"
match = re.match(pattern, pvtec_string)
if match:
action_code = match.group(1)
phenomena = match.group(2)
office = match.group(3)
significance = match.group(4)
event_type = match.group(5)
event_tracking_number = match.group(6)
result = [action_code,phenomena,significance,event_type,event_tracking_number,office]
else:
print("P-VTEC string format not recognized.")
return result
def new_check_for_products():
products = json.loads(S.get(allalerts, headers=headers).text)
for i in products['features']:
for j in i['properties']['parameters']['AWIPSidentifier']:
#print(j,' first loop')
if j == 'SVRRLX' or j == 'TORRLX':
warntype = j
geometry = i['geometry']
gisgeom = json.dumps(geometry)
issuetime = i['properties']['sent']
issuetime = datetime.strptime(issuetime,"%Y-%m-%dT%H:%M:%S%f%z")
endtime = i['properties']['ends']
endtime = datetime.strptime(endtime,"%Y-%m-%dT%H:%M:%S%f%z")
vtec = i['properties']['parameters']['VTEC'][0]
sql = "insert into svr (nwspoly, issue, endtime, warntype, vtec) values (ST_SetSRID(ST_GeomFromGeoJSON(%s), 4326),%s,%s,%s,%s) on conflict (vtec) do nothing"
parms = (gisgeom,issuetime,endtime,warntype,vtec)
cursor.execute(sql,parms)
conn.commit()
for i in products['features']:
for j in i['properties']['parameters']['AWIPSidentifier']:
# print(j,'second loop')
try:
if 'SVS' in j or 'SVR' in j or 'TOR' in j or 'FFW' in j or 'FFS' in j or 'FLW' in j or 'FLS' in j:
warntype = j
geometry = i['geometry']
gisgeom = json.dumps(geometry)
issuetime = i['properties']['sent']
issuetime = datetime.strptime(issuetime,"%Y-%m-%dT%H:%M:%S%f%z")
endtime = i['properties']['ends']
print(endtime)
endtime = datetime.strptime(endtime,"%Y-%m-%dT%H:%M:%S%f%z")
year = issuetime.year
vtec = i['properties']['parameters']['VTEC'][0]
pil = i['properties']['parameters']['AWIPSidentifier'][0]
ugc = i['properties']['geocode']['UGC']
vtecelements = decode_vtec(vtec)
etin = vtecelements[4]
operational = vtecelements[0]
svstype = vtecelements[1]
warntype = vtecelements[2]
eventtype = vtecelements[3]
office = vtecelements[5]
print(ugc)
sql = "insert into warntracker (nwspoly, issue, endtime, warntype, vtectext,etin,svstype,pil,ugc,year,office,sig) values (ST_SetSRID(ST_GeomFromGeoJSON(%s), 4326),%s,%s,%s,%s,%s,%s,%s,%s, %s,%s,%s) on conflict do nothing"
parms = (gisgeom,issuetime,endtime,warntype,vtec,etin,svstype,pil,ugc,year,office,eventtype)
print(parms)
cursor.execute(sql,parms)
conn.commit()
except Exception as e:
print(i,e)
#new_check_for_products()
conn.commit()
cursor.close()
conn.close()