-
Tucker Gary Siegel authoredTucker Gary Siegel authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
forecast_service.go 11.43 KiB
package services
import (
"math"
"sort"
"time"
"github.com/tgs266/dawn-go-common/common"
"gitlab.cs.umd.edu/dawn/go-backend/dawn-gdd/models"
"gitlab.cs.umd.edu/dawn/go-backend/dawn-gdd/models/enums"
"gitlab.cs.umd.edu/dawn/go-backend/dawn-gdd/persistence"
"gitlab.cs.umd.edu/dawn/go-backend/dawn-gdd/persistence/entities"
"gitlab.cs.umd.edu/dawn/go-backend/dawn-gdd/utils"
"gitlab.cs.umd.edu/dawn/go-backend/dawn-gdd/utils/dispatch"
)
// calculateAccumulatedCfsBasedOnAccumulatedObserved turns each CFS entry's
// daily min/max temperatures into a running accumulated-GDD series, seeded
// with the accumulated value observed so far. One inner slice is returned per
// CFS entry, in input order.
func calculateAccumulatedCfsBasedOnAccumulatedObserved(product enums.Product, accumulated float64, cfs []entities.CfsGdd) [][]float64 {
	result := [][]float64{}
	for _, member := range cfs {
		running := accumulated
		series := []float64{}
		for day := range member.MinTemps {
			running += utils.CalculateSingleGdd(member.MinTemps[day], member.MaxTemps[day], product)
			series = append(series, running)
		}
		result = append(result, series)
	}
	return result
}
// getDatesForCfsMatches maps each remaining stage match onto candidate day
// offsets using the accumulated CFS predictions. Since cfs holds multiple
// independent predictions of the same time frame, each series can contribute
// one candidate date per stage; callers bin the dates into a distribution.
//
// lastDateInt is the day offset where observed data ran out. An emitted date
// is lastDateInt+i-1: the day BEFORE the distance to the stage's target gdd
// value started growing again, i.e. the closest accumulation.
func getDatesForCfsMatches(cfs [][]float64, lastDateInt int, currentMatch int, keys []string, matches map[string]float64) map[string][]int {
	out := map[string][]int{}
	for _, c := range cfs {
		// seed with the raw target value: it acts as a large sentinel so the
		// first real distance reads as "getting closer"
		lastDist := matches[keys[currentMatch]]
		tempCMatch := currentMatch
		for i, v := range c {
			// BUG FIX: measure against the stage this series is currently
			// seeking (tempCMatch); the original compared against the
			// caller's starting stage (currentMatch), which never advances,
			// so every later stage was matched against the wrong target.
			dist := math.Abs(matches[keys[tempCMatch]] - v)
			// the previous value was closer, so the previous day wins
			if dist > lastDist {
				// append on a missing key yields a fresh slice; no exists-check needed
				out[keys[tempCMatch]] = append(out[keys[tempCMatch]], lastDateInt+i-1)
				tempCMatch += 1
				if tempCMatch >= len(keys) {
					break
				}
				// restart the search against the next stage's target
				dist = math.Abs(matches[keys[tempCMatch]] - v)
			}
			lastDist = dist
		}
	}
	return out
}
// bin builds a frequency map from the given dates and returns it together
// with the distinct values in first-seen order.
func bin(data []int) (map[int]int, []int) {
	counts := map[int]int{}
	order := []int{}
	for _, value := range data {
		if _, seen := counts[value]; !seen {
			order = append(order, value)
		}
		counts[value]++
	}
	return counts, order
}
// getLargestBinCount returns the key of the bin with the highest frequency
// (0 for an empty map). Ties are broken by map iteration order.
func getLargestBinCount(bins map[int]int) int {
	bestKey, bestCount := 0, 0
	for key, count := range bins {
		if count > bestCount {
			bestCount, bestKey = count, key
		}
	}
	return bestKey
}
// getMatchKeys returns the stage names sorted by ascending gdd target value.
// A stable sort keeps insertion order for equal targets.
func getMatchKeys(matches map[string]float64) []string {
	names := make([]string, 0, len(matches))
	for name := range matches {
		names = append(names, name)
	}
	sort.SliceStable(names, func(a, b int) bool {
		return matches[names[a]] < matches[names[b]]
	})
	return names
}
/* forecast computes, for every stage in matches, a probability distribution
(bins) over the calendar dates at which the stage is reached.

Phase 1 walks the OBSERVED daily gdd values from the plant date: stages whose
target accumulation is passed within observed data get a single bin with
probability 1. Phase 2 projects the remaining stages with the CFS predictions,
bins the candidate dates, and smooths the resulting frequencies.

hardcode the alpha (for lagrange smoothing) and binCount
*/
func forecast(ctx common.DawnCtx, gddReq models.GddRequest, plantdate time.Time, matches map[string]float64, observed models.GddResponse, cfs []entities.CfsGdd) map[string]*models.Bins {
	alpha := 1.0 // additive smoothing constant used when normalizing bin frequencies
	binCount := 5
	product := enums.GetProductFromString(gddReq.Product)
	// day-of-year of the plant date indexes into observed.GddValues; in leap
	// years the index is shifted back one day, presumably because the value
	// array covers 365 days -- TODO confirm against GddValues producer
	start := plantdate.YearDay()
	if plantdate.Year()%4 == 0 && plantdate.Year()%100 != 0 || plantdate.Year()%400 == 0 {
		start -= 1
	}
	out := map[string]*models.Bins{}
	// need to get the match order (stage names by ascending gdd target)
	keys := getMatchKeys(matches)
	currentMatch := 0 // first match
	// start at plantdate, begin accumulating OBSERVED until we run out
	// since this observed, we should only have 1 bin with a value of 100%
	// at the end, we should have all observed stages along with all accumulated gdds of the planting year
	observedValues := observed.GddValues[start:]
	accumulatedGdds := 0.0
	// the raw target value acts as a large initial sentinel distance
	lastDist := matches[keys[currentMatch]]
	date := 0
	for i := 0; i < len(observedValues); i++ {
		date = i
		dist := math.Abs(matches[keys[currentMatch]] - accumulatedGdds)
		// check if the last value is closer than the current. if it is, then the last value is the one to return
		if dist > lastDist {
			// observed data is certain: a single bin with probability 1,
			// dated the previous day (the closest accumulation)
			out[keys[currentMatch]] = &models.Bins{
				Bins: []models.Bin{
					{
						Value: 1,
						Date:  plantdate.AddDate(0, 0, i-1),
					},
				},
			}
			currentMatch += 1
			if currentMatch >= len(keys) {
				break
			}
			// reset the sentinel distance for the next stage target
			dist = matches[keys[currentMatch]]
		}
		accumulatedGdds += observedValues[i]
		lastDist = dist
	}
	// every stage was reached within observed data; nothing left to forecast
	if currentMatch == len(keys) {
		return out
	}
	// adjust cfs values to start at the accumulated value
	adjustedCfs := calculateAccumulatedCfsBasedOnAccumulatedObserved(product, accumulatedGdds, cfs)
	// candidate dates per remaining stage (one per cfs prediction series)
	cfsHist := getDatesForCfsMatches(adjustedCfs, date, currentMatch, keys, matches)
	// this loop will actually build the 5 bins
	for k, v := range cfsHist {
		binnedDates, keys := bin(v)
		stepSize := int(math.Ceil(AvgDiff(keys)) + 1) // add 1 to increase range and for uncertainty
		largestKey := getLargestBinCount(binnedDates)
		sort.Ints(keys)
		// center the binCount-wide window on the most frequent date
		min := largestKey - stepSize*2
		temp := map[int]int{}
		// need to rebin now
		sum := 0
		// we force the highest frequency to be the center date, then find dates on each side that fit
		for i := 0; i < binCount; i++ {
			for date, v := range binnedDates {
				if min <= date && date < min+stepSize {
					sum += v
					if count, exists := temp[min]; exists {
						temp[min] = count + v
					} else {
						temp[min] = v
					}
				}
			}
			min += stepSize
		}
		// convert to output model; the alpha smoothing keeps every bin's
		// probability strictly between 0 and 1
		bins := []models.Bin{}
		for d, v := range temp {
			bins = append(bins, models.Bin{
				Value: (float64(v) + alpha) / (float64(sum) + float64(binCount)*alpha),
				Date:  plantdate.AddDate(0, 0, d),
			})
		}
		out[k] = &models.Bins{
			Bins: bins,
		}
	}
	return out
}
// comparisonNormals walks the climate-normal daily gdd values starting at the
// plant date and returns, for each stage in matches, the date whose
// accumulated total is closest to the stage's gdd target.
//
// this function allows normals to "wrap around" to get continuous values:
// the index into the normals array resets to 0 at the end, so the loop only
// terminates once every stage has been matched (it assumes the normals
// accumulate past every target -- TODO confirm; otherwise this never exits).
func comparisonNormals(request models.GddRequest, plantdate time.Time, matches map[string]float64) map[string]time.Time {
	normals := GetNormalValues(request)
	observedValues := normals.GddValues
	accumulatedGdds := 0.0
	currentMatch := 0
	// stage names ordered by ascending gdd target
	keys := getMatchKeys(matches)
	// the raw target value acts as a large initial sentinel distance
	lastDist := matches[keys[currentMatch]]
	start := plantdate.YearDay()
	year := plantdate.Year()
	// leap years shift the start index back one day, presumably because the
	// normals array covers 365 days -- TODO confirm
	if year%4 == 0 && year%100 != 0 || year%400 == 0 {
		start -= 1
	}
	i := start // index into the normals array (wraps at year end)
	date := 0  // days elapsed since plantdate (never wraps)
	out := map[string]time.Time{}
	for {
		dist := math.Abs(matches[keys[currentMatch]] - accumulatedGdds)
		// check if the last value is closer than the current. if it is, then the last value is the one to return
		if dist > lastDist {
			// the previous day had the closest accumulation for this stage
			out[keys[currentMatch]] = plantdate.AddDate(0, 0, date-1)
			currentMatch += 1
			if currentMatch >= len(keys) {
				break
			}
			// reset the sentinel distance for the next stage target
			dist = matches[keys[currentMatch]]
		}
		accumulatedGdds += observedValues[i]
		lastDist = dist
		i += 1
		date += 1
		// wrap around so accumulation continues across the year boundary
		if i >= len(observedValues) {
			i = 0
		}
	}
	return out
}
// comparisonGoRoutine computes the comparison values for each stage and
// publishes the result (or a recovered panic) through the provided thread;
// it is intended to run as its own goroutine.
//
// comparisonMode == -1 selects the climate-normals comparison; any other
// mode currently yields an empty result.
func comparisonGoRoutine(request models.GddRequest, plantdate time.Time, matches map[string]float64, comparisonMode int, thread *dispatch.Thread[map[string]time.Time]) {
	// forward any panic to the waiting caller instead of killing the process
	defer func() {
		thread.Recover(recover())
	}()
	if comparisonMode == -1 {
		thread.Result(comparisonNormals(request, plantdate, matches))
		return
	}
	// removed the redundant trailing return (staticcheck S1023)
	thread.Result(map[string]time.Time{})
}
// asyncCollectGddsAndCfs fetches the observed GDD values and the CFS forecast
// entries concurrently and blocks until both are available. A panic raised by
// either fetch is re-raised on the calling goroutine.
func asyncCollectGddsAndCfs(ctx common.DawnCtx, gddReq models.GddRequest) (models.GddResponse, []entities.CfsGdd) {
	observedThread := dispatch.New[models.GddResponse]()
	cfsValuesThread := dispatch.New[[]entities.CfsGdd]()
	go func() {
		// hand any panic back through the thread handle
		defer func() { observedThread.Recover(recover()) }()
		observedThread.Result(GetGddValues(ctx, gddReq))
	}()
	go func() {
		defer func() { cfsValuesThread.Recover(recover()) }()
		cfsValuesThread.Result(persistence.CfsFindByLocationMultiple(gddReq.BuildLocation(), 4))
	}()
	observed, err := observedThread.Get()
	if err != nil {
		panic(err)
	}
	cfsValues, err := cfsValuesThread.Get()
	if err != nil {
		panic(err)
	}
	return observed, cfsValues
}
/* HOW DOES THIS WORK:
1. Collect observed and cfs data
2. Determine matches for corn based on rm/gdds provided
2.b. Dispatch comparison routine
3. Match observed dates to matches for corn with the closest accum gdds
4. Bin cfs projections (since there are a lot of them) into groups. Use same matching algo
5. Join, and merge comparison results
*/
func CalculateStages(ctx common.DawnCtx, request models.StageRequest) map[string]*models.Bins {
	gddReq := models.GddRequest{
		Year:       request.PlantDate.Year(),
		Latitude:   request.Latitude,
		Longitude:  request.Longitude,
		Accumulate: false,
		Product:    "CORN",
	}
	gdds, cfs := asyncCollectGddsAndCfs(ctx, gddReq)
	stageMatches := models.BuildStageMatches(request.Mode, request.Value)
	// run the comparison concurrently with the forecast itself
	comparisonThread := dispatch.New[map[string]time.Time]()
	go comparisonGoRoutine(gddReq, request.PlantDate, stageMatches, request.Comparison, comparisonThread)
	out := forecast(ctx, gddReq, request.PlantDate, stageMatches, gdds, cfs)
	// block, wait for comparison results
	compResults, compError := comparisonThread.Get()
	if compError != nil {
		panic(compError)
	}
	for k, v := range compResults {
		// BUG FIX: forecast can omit stages the available data never reached,
		// while comparisonNormals (which wraps around the year) returns every
		// stage; guard the map entry before dereferencing to avoid a nil panic
		if stageBins, ok := out[k]; ok && stageBins != nil {
			stageBins.ComparisonMean = v
		}
	}
	return out
}
// AvgDiff returns the mean gap between consecutive values of data after
// sorting it in place (note: the caller's slice IS reordered).
//
// Returns 0 when data has fewer than two elements; the previous version
// divided by zero there and produced NaN, which then leaked into the
// caller's bin-width arithmetic.
func AvgDiff(data []int) float64 {
	if len(data) < 2 {
		return 0
	}
	sort.Ints(data)
	sum := 0.0
	// data is ascending after the sort, so each difference is non-negative
	// and the math.Abs of the original is unnecessary
	for i := 1; i < len(data); i++ {
		sum += float64(data[i] - data[i-1])
	}
	return sum / float64(len(data)-1)
}
// ForecastFirstLastFreeze predicts the last spring freeze and first fall
// freeze for the current calendar year at the requested location.
//
// The daily min-temp series is spliced together from, in order: observed
// data for the year, the CFS forecast, and (if still short) climate normals
// for the remaining days. A "freeze" is any day whose min temp is at or
// below request.FreezingTemp.
func ForecastFirstLastFreeze(ctx common.DawnCtx, request models.FreezingForecastRequest) models.FreezingForecastResponse {
	lastFreezeIdx := 0
	firstFreezeIdx := 0
	baseData := persistence.CurrentGddFindFirstByYearAndLocation(ctx, models.BuildLocation(request.Latitude, request.Longitude))
	cfsData := persistence.CfsFindAllByLocation(models.BuildLocation(request.Latitude, request.Longitude))
	normalsData := persistence.NormalsFindFirstByYearAndLocation(models.BuildLocation(request.Latitude, request.Longitude))
	// observed temps come first, with the cfs forecast appended after them
	cfsData.MinTemps = append(baseData.MinTemps, cfsData.MinTemps...)
	// pad the tail of the year with normals when observed+cfs fall short
	if len(cfsData.MinTemps) < len(normalsData.MinTemps) {
		smallerNormalRegion := normalsData.MinTemps[len(cfsData.MinTemps):]
		cfsData.MinTemps = append(cfsData.MinTemps, smallerNormalRegion...)
	}
	// Jan 1..Jul 31 window yields the LAST spring freeze; Aug 1..Dec 31
	// window yields the FIRST fall freeze. NOTE(review): After/Before are
	// strict comparisons, so the boundary days themselves (Jan 1, Jul 31,
	// Aug 1, Dec 31) are excluded from their windows -- confirm intentional.
	startDate := time.Date(time.Now().Year(), time.January, 1, 0, 0, 0, 0, time.UTC)
	firstHalfFirstDate := time.Date(time.Now().Year(), time.January, 1, 0, 0, 0, 0, time.UTC)
	firstHalfLastDate := time.Date(time.Now().Year(), time.July, 31, 0, 0, 0, 0, time.UTC)
	lastHalfFirstDate := time.Date(time.Now().Year(), time.August, 1, 0, 0, 0, 0, time.UTC)
	lastHalfLastDate := time.Date(time.Now().Year(), time.December, 31, 0, 0, 0, 0, time.UTC)
	for i := 0; i < len(cfsData.MinTemps); i++ {
		currentDate := startDate.AddDate(0, 0, i)
		// keep overwriting: we want the latest freezing day within spring
		if cfsData.MinTemps[i] <= request.FreezingTemp && currentDate.After(firstHalfFirstDate) && currentDate.Before(firstHalfLastDate) {
			lastFreezeIdx = i
		}
		// first freezing day within fall ends the scan
		if cfsData.MinTemps[i] <= request.FreezingTemp && currentDate.After(lastHalfFirstDate) && currentDate.Before(lastHalfLastDate) && firstFreezeIdx == 0 {
			firstFreezeIdx = i
			break
		}
	}
	lastFreezeDate := startDate.AddDate(0, 0, lastFreezeIdx)
	firstFreezeDate := startDate.AddDate(0, 0, firstFreezeIdx)
	return models.FreezingForecastResponse{
		LastFreeze:  []time.Time{lastFreezeDate},
		FirstFreeze: []time.Time{firstFreezeDate},
	}
}