From b4ad6dd3d9a356c58ec505f07637c76b7f1f65c4 Mon Sep 17 00:00:00 2001
From: tuckersiegel <siegel.tucker@gmail.com>
Date: Mon, 22 Feb 2021 14:05:42 -0500
Subject: [PATCH] Implement cron-job base Python file and upgrade data
 processing

---
 preprocess.py    |  58 -----------------
 pull_data.py     | 159 ++++++++++++++++++++++++++++++++++++++++++++++-
 requirements.txt |   3 +-
 to_mongo.py      |  26 ++++++--
 4 files changed, 179 insertions(+), 67 deletions(-)
 delete mode 100644 preprocess.py

diff --git a/preprocess.py b/preprocess.py
deleted file mode 100644
index a0914ff..0000000
--- a/preprocess.py
+++ /dev/null
@@ -1,58 +0,0 @@
-import gdal
-import numpy as np
-import xarray as xr
-import random 
-
-def to_freedom_units(data):
-    return (data * 9/5) + 32 
-
-
-def ReadBilFile(bil):
-
-    gdal.GetDriverByName('EHdr').Register()
-    img = gdal.Open(bil)
-    band = img.GetRasterBand(1)
-    data = band.ReadAsArray()
-    return data
-
-if __name__ == '__main__':
-
-    import glob 
-    years = list(range(2020, 2020 + 1))
-
-
-    for y in years:
-        print (y)
-        all_ = []
-        print ("data/PRISM/tmin/PRISM_tmin_stable_4kmD2_%s0101_%s1231/*.bil" % (y, y))
-        for file in sorted(list(glob.glob("data/PRISM/tmin/PRISM_tmin_stable_4kmD2_%s0101_%s1231_bil/*.bil" % (y, y)))):
-            # print (file)
-            a = ReadBilFile(file)
-            a[a == -9999] = np.nan
-            a = np.flipud(a)
-            all_.append(a)
-
-        all_ = np.stack(all_)
-        all_ = to_freedom_units(all_)
-        tmin = xr.Variable(['date', 'row', 'col'], all_, attrs={"long_name": "min temperature in f"}).chunk({"date": -1}).astype(np.dtype(np.float32))
-        all_ = []
-        print ("data/PRISM/tmax/PRISM_tmax_stable_4kmD2_%s0101_%s1231/*.bil" % (y, y))
-        for file in sorted(list(glob.glob("data/PRISM/tmax/PRISM_tmax_stable_4kmD2_%s0101_%s1231_bil/*.bil" % (y, y)))):
-            # print (file)
-            a = ReadBilFile(file)
-            a[a == -9999] = np.nan
-            a = np.flipud(a)
-            all_.append(a)
-
-        all_ = np.stack(all_)
-        all_ = to_freedom_units(all_)
-        tmax = xr.Variable(['date', 'row', 'col'], all_, attrs={"long_name": "max temperature in f"}).chunk({"date": -1}).astype(np.dtype(np.float32))
-        
-        data = xr.Dataset(
-            {
-                "tmax": tmax,
-                "tmin": tmin
-            }, 
-        )
-        print (data)
-        data.to_netcdf("data/temps_%s.nc" % y)
diff --git a/pull_data.py b/pull_data.py
index 701ee1d..08cbb5e 100644
--- a/pull_data.py
+++ b/pull_data.py
@@ -1,9 +1,162 @@
+import os, zipfile, tqdm
+import gdal, shutil, datetime
+import numpy as np
+import xarray as xr
 from ftplib import FTP
+from multiprocessing.pool import ThreadPool
+from multiprocessing import Lock
+from pymongo import MongoClient
+
+def to_freedom_units(data):
+    return (data * 9/5) + 32 
+
+def read_bil_file(bil):
+
+    gdal.GetDriverByName('EHdr').Register()
+    img = gdal.Open(bil)
+    band = img.GetRasterBand(1)
+    data = band.ReadAsArray()
+    return data
+
+client = MongoClient("mongodb+srv://gdd-server:u8i3icLAJXjZEhTs@cluster0.wdxf4.mongodb.net")
+
+db = client["gdd_database"]
+
+gdd = db.gdd_current
+
+gdd.drop()
+
+gdd = db["gdd_current"]
+
+resp = gdd.create_index([ ("location", "2dsphere") ])
+resp = gdd.create_index([ ("year", 1) ])
+
+def process_file(data):
+    file, lock, data_type = data
+
+    ftp = FTP(ftp_url)
+    ftp.login(user=ftp_user, passwd=ftp_pass)
+    ftp.cwd("daily")
+    ftp.cwd(data_type)
+    ftp.cwd("2021")
+
+    base_name = file.split(".")[0]
+    
+    with open(file, 'wb') as f:
+        ftp.retrbinary('RETR ' + file, f.write)
+    ftp.quit()
+    
+    with zipfile.ZipFile(file, "r") as f:
+        f.extractall(base_name)
+    
+    data = read_bil_file("%s/%s.bil" % (base_name, base_name))
+
+    data[data == -9999] = np.nan
+    data = np.flipud(data)
+    shutil.rmtree(base_name)
+    os.remove(file)
+
+    return (data, base_name.split("_")[4])
 
 ftp_url = "prism.nacse.org"
 ftp_user = "anonymous"
 ftp_pass = "tgsiegel@umd.edu"
 
-ftp = FTP(ftp_url)
-ftp.login(user=ftp_user, passwd=ftp_pass)
-ftp.dir()
\ No newline at end of file
+def run(year, dtype):
+
+    ftp = FTP(ftp_url)
+    ftp.login(user=ftp_user, passwd=ftp_pass)
+    ftp.cwd("daily")
+    ftp.cwd(dtype)
+    ftp.cwd(str(year))
+    files = ftp.nlst()
+    ftp.quit()
+
+    all_data = []
+
+    files.sort(key=lambda x: x.split("_")[4])
+    # files = files[-10:]
+    lock = Lock()
+    locks = [lock] * len(files)
+    types = [dtype] * len(files)
+
+    pool = ThreadPool(10)
+
+    results = pool.map(process_file, zip(files, locks, types))
+
+    data = [r[0] for r in results]
+    data = to_freedom_units(np.stack(data))
+    return data 
+
+year = 2021
+print ("Pulling tmax data")
+tmax_data = run(year, "tmax")
+print ("Pulling tmin data")
+tmin_data = run(year, "tmin")
+
+coords = xr.open_dataset("coords.nc")
+lat = coords.latitude.data
+lon = coords.longitude.data
+
+soy = np.datetime64("%s-01-01" % year)
+x = np.where(~np.isnan(np.nanmean(tmin_data, axis=0)))
+lat = lat[::-1]
+
+# FORCE LOCATIONS TO COLLEGE PARK, LAT 38.99 LON -76.94 BECAUSE OF ATLAS LIMIT
+
+a1 = np.where(38 < lat)[0].tolist()
+a2 = np.where(lat < 40)[0].tolist()
+lat_a = np.array(list(set(a1) & set(a2)))
+
+a1 = np.where(-77 < lon)[0].tolist()
+a2 = np.where(lon < -75)[0].tolist()
+lon_a = np.array(list(set(a1) & set(a2)))
+
+x1 = np.array(np.meshgrid(lat_a, lon_a)).T.reshape(len(lat_a) * len(lon_a), 2).tolist()
+x1 = [(z[0], z[1]) for z in x1]
+x2 = [(a, b) for a, b in zip(x[0], x[1])] # fix to x = [..... (x[0], x[1])] and all limiting stuff above and below when atlas limit removed
+
+x = list(set(x1) & set(x2))
+
+
+tmins = tmin_data
+tmaxs = tmax_data
+
+locs = []
+print ("uploading to mongo")
+count = 0
+for i in tqdm.tqdm(x):
+    if len(locs) % 100 == 0 and len(locs) != 0:
+        new_result = gdd.insert_many(locs)
+        locs = []
+
+    tmin_ = tmins[:, i[0], i[1]]
+    tmax_ = tmaxs[:, i[0], i[1]]
+    
+    lat_ = lat[i[0]]
+    lon_ = lon[i[1]]
+
+    a = i
+
+    t = {}
+
+    _id = str(year) + "_"
+
+    _id += str(a[0]) + "_" + str(a[1])
+    
+    t["location"] = {"type": "Point", "coordinates": [float(lon_), float(lat_)]}
+    t["prism_lat"] = int(a[0])
+    t["prism_lon"] = int(a[1])
+    
+    t["last_date"] = datetime.datetime.strptime(str(soy + np.timedelta64(len(tmin_) - 1, "D")) , "%Y-%m-%d")
+    t["year"] = int(year)
+    t["min_temps"] = list([float(a) for a in tmin_])
+    t["max_temps"] = list([float(a) for a in tmax_])
+    t["_id"] = _id
+    
+    locs.append(t)
+    
+    count += 1
+
+if len(locs) != 0:
+    new_result = gdd.insert_many(locs)
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index f886252..b1e18d9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,4 +2,5 @@ xarray
 numpy
 gdal
 tqdm
-pymongo
\ No newline at end of file
+pymongo
+dnspython
\ No newline at end of file
diff --git a/to_mongo.py b/to_mongo.py
index 1e33c21..9ad11e5 100644
--- a/to_mongo.py
+++ b/to_mongo.py
@@ -4,7 +4,7 @@ import tqdm
 import datetime
 from pymongo import MongoClient
 
-client = MongoClient('0.0.0.0', 27017)
+client = MongoClient("mongodb+srv://gdd-server:u8i3icLAJXjZEhTs@cluster0.wdxf4.mongodb.net")
 
 db = client["gdd_database"]
 
@@ -20,17 +20,31 @@ resp = gdd.create_index([ ("year", 1) ])
 coords = xr.open_dataset("coords.nc")
 lat = coords.latitude.data
 lon = coords.longitude.data
+lat = lat[::-1]
 
-years = list(range(1981, 1982 + 1))
+years = list(range(1981, 2020 + 1))
 
 for year in years:
     soy = np.datetime64("%s-01-01" % year)
     print (year)
-    data = xr.open_dataset("data/temps_2019.nc")
+    data = xr.open_dataset("data/temps_%s.nc" % year)
     x = np.where(~np.isnan(np.nanmean(data.tmin.data, axis=0)))
-    x = [(a, b) for a, b in zip(x[0], x[1])]
+    # x = [(a, b) for a, b in zip(lat_a, lon_a)] # fix to x[0], x[1] when atlas limit removed
+    # FORCE LOCATIONS TO COLLEGE PARK, LAT 38.99 LON -76.94 BECAUSE OF ATLAS LIMIT
 
-    lat = lat[::-1]
+    a1 = np.where(38.5 < lat)[0].tolist()
+    a2 = np.where(lat < 39.5)[0].tolist()
+    lat_a = np.array(list(set(a1) & set(a2)))
+
+    a1 = np.where(-77 < lon)[0].tolist()
+    a2 = np.where(lon < -76)[0].tolist()
+    lon_a = np.array(list(set(a1) & set(a2)))
+
+    x1 = np.array(np.meshgrid(lat_a, lon_a)).T.reshape(len(lat_a) * len(lon_a), 2).tolist()
+    x1 = [(z[0], z[1]) for z in x1]
+    x2 = [(a, b) for a, b in zip(x[0], x[1])] # fix to x = [..... (x[0], x[1])] and all limiting stuff above and below when atlas limit removed
+
+    x = list(set(x1) & set(x2))
 
     tmins = data.tmin.data
     tmaxs = data.tmax.data 
@@ -71,3 +85,5 @@ for year in years:
         
         count += 1
 
+if len(locs) != 0:
+    new_result = gdd.insert_many(locs)
\ No newline at end of file
-- 
GitLab