Skip to content
Snippets Groups Projects
Commit b4ad6dd3 authored by tuckersiegel's avatar tuckersiegel
Browse files

implement cron job base python file, as well as upgrades to data processing

parent 0ca89e34
No related branches found
No related tags found
1 merge request!3Cron job
import gdal
import numpy as np
import xarray as xr
import random
def to_freedom_units(data):
    """Convert Celsius temperatures (scalar or numpy array) to Fahrenheit."""
    scaled = data * 9 / 5
    return scaled + 32
def ReadBilFile(bil):
    """Read band 1 of an ESRI BIL raster at path `bil` into a 2-D array.

    Requires GDAL; the dataset handle is released when it goes out of scope.
    """
    gdal.GetDriverByName('EHdr').Register()
    dataset = gdal.Open(bil)
    return dataset.GetRasterBand(1).ReadAsArray()
if __name__ == '__main__':
    import glob

    def _load_year(var, year):
        """Stack one year of daily PRISM `var` ('tmin'/'tmax') rasters.

        Returns a (day, row, col) float array in Fahrenheit, with the
        PRISM nodata value replaced by NaN and rows flipped vertically.
        """
        # BUG FIX: the logged path previously omitted the trailing "_bil"
        # directory suffix, so the printed path never matched the glob.
        pattern = ("data/PRISM/%s/PRISM_%s_stable_4kmD2_%s0101_%s1231_bil/*.bil"
                   % (var, var, year, year))
        print(pattern)
        days = []
        for path in sorted(glob.glob(pattern)):
            grid = ReadBilFile(path)
            grid[grid == -9999] = np.nan   # PRISM nodata sentinel
            days.append(np.flipud(grid))   # flip rows; downstream code assumes this
        return to_freedom_units(np.stack(days))

    for y in range(2020, 2020 + 1):
        print(y)
        tmin = xr.Variable(
            ['date', 'row', 'col'], _load_year("tmin", y),
            attrs={"long_name": "min temperature in f"},
        ).chunk({"date": -1}).astype(np.dtype(np.float32))
        tmax = xr.Variable(
            ['date', 'row', 'col'], _load_year("tmax", y),
            attrs={"long_name": "max temperature in f"},
        ).chunk({"date": -1}).astype(np.dtype(np.float32))
        data = xr.Dataset(
            {
                "tmax": tmax,
                "tmin": tmin
            },
        )
        print(data)
        data.to_netcdf("data/temps_%s.nc" % y)
import os, zipfile, tqdm
import gdal, shutil, datetime
import numpy as np
import xarray as xr
from ftplib import FTP
from multiprocessing.pool import ThreadPool
from multiprocessing import Lock
from pymongo import MongoClient
def to_freedom_units(data):
    """Convert temperatures from Celsius to Fahrenheit."""
    ninths = data * 9
    return ninths / 5 + 32
def read_bil_file(bil):
    """Load the first band of the ESRI BIL raster at `bil` as a 2-D array."""
    gdal.GetDriverByName('EHdr').Register()
    source = gdal.Open(bil)
    first_band = source.GetRasterBand(1)
    return first_band.ReadAsArray()
# --- MongoDB connection -----------------------------------------------------
# SECURITY(review): credentials are hard-coded in source and committed to the
# repo; move them to an environment variable / secrets store and rotate the
# password.
client = MongoClient("mongodb+srv://gdd-server:u8i3icLAJXjZEhTs@cluster0.wdxf4.mongodb.net")
db = client["gdd_database"]
gdd = db.gdd_current
# Rebuild the collection from scratch on every run: drop, re-create, re-index.
gdd.drop()
gdd = db["gdd_current"]
# 2dsphere index enables geo queries on "location"; plain ascending index on "year".
resp = gdd.create_index([ ("location", "2dsphere") ])
resp = gdd.create_index([ ("year", 1) ])
def process_file(data):
    """Download, unzip and read one daily PRISM raster from the FTP server.

    `data` is a (filename, lock, data_type) tuple as produced by `run`
    (the lock is currently unused by this worker). Returns
    (grid, yyyymmdd) where `grid` is the day's raster with the PRISM
    nodata value replaced by NaN and rows flipped vertically.
    """
    file, _lock, data_type = data
    base_name = file.split(".")[0]
    date_token = base_name.split("_")[4]   # e.g. "20210101"
    ftp = FTP(ftp_url)
    try:
        ftp.login(user=ftp_user, passwd=ftp_pass)
        ftp.cwd("daily")
        ftp.cwd(data_type)
        # BUG FIX: the year directory was hard-coded to "2021"; derive it
        # from the file's date token so any year requested by `run` works.
        ftp.cwd(date_token[:4])
        with open(file, 'wb') as f:
            ftp.retrbinary('RETR ' + file, f.write)
    finally:
        ftp.quit()                          # FIX: close the connection even on error
    with zipfile.ZipFile(file, "r") as f:
        f.extractall(base_name)
    grid = read_bil_file("%s/%s.bil" % (base_name, base_name))
    grid[grid == -9999] = np.nan            # PRISM nodata sentinel
    grid = np.flipud(grid)
    shutil.rmtree(base_name)                # remove extracted directory
    os.remove(file)                         # remove downloaded zip
    return (grid, date_token)
# PRISM anonymous FTP endpoint; by anonymous-FTP convention the "password"
# is a contact e-mail address.
ftp_url = "prism.nacse.org"
ftp_user = "anonymous"
ftp_pass = "tgsiegel@umd.edu"

# Smoke-test the connection (prints the root directory listing).
ftp = FTP(ftp_url)
ftp.login(user=ftp_user, passwd=ftp_pass)
ftp.dir()
ftp.quit()  # FIX: this probe connection was previously left open
\ No newline at end of file
def run(year, dtype):
    """Download a full year of daily PRISM rasters for `dtype` ('tmin'/'tmax').

    Lists the year's files over FTP, fetches them with a 10-thread pool via
    `process_file`, and returns a (day, row, col) float array in Fahrenheit,
    ordered chronologically.
    """
    ftp = FTP(ftp_url)
    try:
        ftp.login(user=ftp_user, passwd=ftp_pass)
        ftp.cwd("daily")
        ftp.cwd(dtype)
        ftp.cwd(str(year))
        files = ftp.nlst()
    finally:
        ftp.quit()                   # FIX: close the listing connection even on error
    files.sort(key=lambda name: name.split("_")[4])  # sort by YYYYMMDD token
    # files = files[-10:]
    lock = Lock()                    # shared by workers (currently unused by them)
    jobs = zip(files, [lock] * len(files), [dtype] * len(files))
    pool = ThreadPool(10)
    try:
        results = pool.map(process_file, jobs)
    finally:
        pool.close()                 # FIX: the pool was never closed/joined
        pool.join()
    grids = [grid for grid, _date in results]
    return to_freedom_units(np.stack(grids))
# --- Pull the current year's data and upload per-cell series to MongoDB -----
year = 2021
print ("Pulling tmax data")
tmax_data = run(year, "tmax")
print ("Pulling tmin data")
tmin_data = run(year, "tmin")

# 1-D latitude/longitude vectors for the PRISM grid.
coords = xr.open_dataset("coords.nc")
lat = coords.latitude.data
lon = coords.longitude.data
soy = np.datetime64("%s-01-01" % year)  # start of year

# Row/col indices of grid cells with at least one valid (non-NaN) day.
x = np.where(~np.isnan(np.nanmean(tmin_data, axis=0)))
# Rasters were flipped with np.flipud upstream, so flip latitudes to match.
lat = lat[::-1]
# FORCE LOCATIONS TO COLLEGE PARK, LAT 38.99 LON -76.94 BECAUSE OF ATLAS LIMIT
a1 = np.where(38 < lat)[0].tolist()
a2 = np.where(lat < 40)[0].tolist()
lat_a = np.array(list(set(a1) & set(a2)))
a1 = np.where(-77 < lon)[0].tolist()
a2 = np.where(lon < -75)[0].tolist()
lon_a = np.array(list(set(a1) & set(a2)))
# All (row, col) pairs inside the bounding box...
x1 = np.array(np.meshgrid(lat_a, lon_a)).T.reshape(len(lat_a) * len(lon_a), 2).tolist()
x1 = [(z[0], z[1]) for z in x1]
# ...intersected with the valid-data cells.
x2 = [(a, b) for a, b in zip(x[0], x[1])] # fix to x = [..... (x[0], x[1])] and all limiting stuff above and below when atlas limit removed
x = list(set(x1) & set(x2))

tmins = tmin_data
tmaxs = tmax_data
locs = []
print ("uploading to mongo")
count = 0
for i in tqdm.tqdm(x):
    # Flush to Mongo in batches of 100 documents.
    if len(locs) % 100 == 0 and len(locs) != 0:
        new_result = gdd.insert_many(locs)
        locs = []
    # Daily series for this grid cell; i = (row, col).
    tmin_ = tmins[:, i[0], i[1]]
    tmax_ = tmaxs[:, i[0], i[1]]
    lat_ = lat[i[0]]
    lon_ = lon[i[1]]
    a = i
    t = {}
    # Deterministic _id: "<year>_<row>_<col>" so re-runs overwrite cleanly.
    _id = str(year) + "_"
    _id += str(a[0]) + "_" + str(a[1])
    # GeoJSON point is [longitude, latitude] order.
    t["location"] = {"type": "Point", "coordinates": [float(lon_), float(lat_)]}
    t["prism_lat"] = int(a[0])
    t["prism_lon"] = int(a[1])
    # Last date covered by the series = start-of-year + (len - 1) days.
    t["last_date"] = datetime.datetime.strptime(str(soy + np.timedelta64(len(tmin_) - 1, "D")) , "%Y-%m-%d")
    t["year"] = int(year)
    t["min_temps"] = list([float(a) for a in tmin_])
    t["max_temps"] = list([float(a) for a in tmax_])
    t["_id"] = _id
    locs.append(t)
    count += 1
# Flush any remaining partial batch.
if len(locs) != 0:
    new_result = gdd.insert_many(locs)
\ No newline at end of file
......@@ -2,4 +2,5 @@ xarray
numpy
gdal
tqdm
pymongo
\ No newline at end of file
pymongo
dnspython
\ No newline at end of file
......@@ -4,7 +4,7 @@ import tqdm
import datetime
from pymongo import MongoClient
client = MongoClient('0.0.0.0', 27017)
client = MongoClient("mongodb+srv://gdd-server:u8i3icLAJXjZEhTs@cluster0.wdxf4.mongodb.net")
db = client["gdd_database"]
......@@ -20,17 +20,31 @@ resp = gdd.create_index([ ("year", 1) ])
coords = xr.open_dataset("coords.nc")
lat = coords.latitude.data
lon = coords.longitude.data
lat = lat[::-1]
years = list(range(1981, 1982 + 1))
years = list(range(1981, 2020 + 1))
for year in years:
soy = np.datetime64("%s-01-01" % year)
print (year)
data = xr.open_dataset("data/temps_2019.nc")
data = xr.open_dataset("data/temps_%s.nc" % year)
x = np.where(~np.isnan(np.nanmean(data.tmin.data, axis=0)))
x = [(a, b) for a, b in zip(x[0], x[1])]
# x = [(a, b) for a, b in zip(lat_a, lon_a)] # fix to x[0], x[1] when atlas limit removed
# FORCE LOCATIONS TO COLLEGE PARK, LAT 38.99 LON -76.94 BECAUSE OF ATLAS LIMIT
lat = lat[::-1]
a1 = np.where(38.5 < lat)[0].tolist()
a2 = np.where(lat < 39.5)[0].tolist()
lat_a = np.array(list(set(a1) & set(a2)))
a1 = np.where(-77 < lon)[0].tolist()
a2 = np.where(lon < -76)[0].tolist()
lon_a = np.array(list(set(a1) & set(a2)))
x1 = np.array(np.meshgrid(lat_a, lon_a)).T.reshape(len(lat_a) * len(lon_a), 2).tolist()
x1 = [(z[0], z[1]) for z in x1]
x2 = [(a, b) for a, b in zip(x[0], x[1])] # fix to x = [..... (x[0], x[1])] and all limiting stuff above and below when atlas limit removed
x = list(set(x1) & set(x2))
tmins = data.tmin.data
tmaxs = data.tmax.data
......@@ -71,3 +85,5 @@ for year in years:
count += 1
if len(locs) != 0:
new_result = gdd.insert_many(locs)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment