Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
forecast_service.go 11.43 KiB
package services

import (
	"math"
	"sort"
	"time"

	"github.com/tgs266/dawn-go-common/common"
	"gitlab.cs.umd.edu/dawn/go-backend/dawn-gdd/models"
	"gitlab.cs.umd.edu/dawn/go-backend/dawn-gdd/models/enums"
	"gitlab.cs.umd.edu/dawn/go-backend/dawn-gdd/persistence"
	"gitlab.cs.umd.edu/dawn/go-backend/dawn-gdd/persistence/entities"
	"gitlab.cs.umd.edu/dawn/go-backend/dawn-gdd/utils"
	"gitlab.cs.umd.edu/dawn/go-backend/dawn-gdd/utils/dispatch"
)

// this just adjusts cfs by accumulating it with the base value being the provided accumulated value
func calculateAccumulatedCfsBasedOnAccumulatedObserved(product enums.Product, accumulated float64, cfs []entities.CfsGdd) [][]float64 {
	out := [][]float64{}
	for _, c := range cfs {
		tempAccum := accumulated
		temp := []float64{}
		for i := range c.MinTemps {
			tempAccum = tempAccum + utils.CalculateSingleGdd(c.MinTemps[i], c.MaxTemps[i], product)
			temp = append(temp, tempAccum)
		}
		out = append(out, temp)
	}
	return out
}

// Since cfs is multiple predictions of a single time frame, we need to boil down
// the data into an array of dates to then bin later on
func getDatesForCfsMatches(cfs [][]float64, lastDateInt int, currentMatch int, keys []string, matches map[string]float64) map[string][]int {
	out := map[string][]int{}
	for _, c := range cfs {
		lastDist := matches[keys[currentMatch]]
		tempCMatch := currentMatch
		for i, v := range c {
			dist := math.Abs(matches[keys[currentMatch]] - v)
			// check if the last value is closer than the current. if it is, then the last value is the one to return
			if dist > lastDist {
				if v, exists := out[keys[tempCMatch]]; exists {
					out[keys[tempCMatch]] = append(v, lastDateInt+i-1)
				} else {
					out[keys[tempCMatch]] = []int{lastDateInt + i - 1}
				}
				tempCMatch += 1
				if tempCMatch >= len(keys) {
					break
				}
			}
			lastDist = dist
		}
	}
	return out
}

// bin array of dates into a frequency map and return that map and keys
func bin(data []int) (map[int]int, []int) {
	out := map[int]int{}
	keys := []int{}
	for _, d := range data {
		if count, exists := out[d]; exists {
			out[d] = count + 1
		} else {
			out[d] = 1
			keys = append(keys, d)
		}
	}
	return out, keys
}

// get the highest frequency bin
func getLargestBinCount(bins map[int]int) int {
	largest := 0
	largestKey := 0
	for key, count := range bins {
		if count > largest {
			largest = count
			largestKey = key
		}
	}
	return largestKey
}

// get keys of the stage matches in sorted order
func getMatchKeys(matches map[string]float64) []string {
	keys := make([]string, 0, len(matches))

	for key := range matches {
		keys = append(keys, key)
	}

	sort.SliceStable(keys, func(i, j int) bool {
		return matches[keys[i]] < matches[keys[j]]
	})
	return keys
}

/* this does the actual forecasting

hardcode the alpha (for lagrange smoothing) and binCount
*/
func forecast(ctx common.DawnCtx, gddReq models.GddRequest, plantdate time.Time, matches map[string]float64, observed models.GddResponse, cfs []entities.CfsGdd) map[string]*models.Bins {
	alpha := 1.0
	binCount := 5
	product := enums.GetProductFromString(gddReq.Product)
	start := plantdate.YearDay()
	if plantdate.Year()%4 == 0 && plantdate.Year()%100 != 0 || plantdate.Year()%400 == 0 {
		start -= 1
	}

	out := map[string]*models.Bins{}
	// need to get the match order
	keys := getMatchKeys(matches)

	currentMatch := 0 // first match

	// start at plantdate, begin accumulating OBSERVED until we run out
	// since this observed, we should only have 1 bin with a value of 100%
	// at the end, we should have all observed stages along with all accumulated gdds of the planting year
	observedValues := observed.GddValues[start:]
	accumulatedGdds := 0.0
	lastDist := matches[keys[currentMatch]]
	date := 0
	for i := 0; i < len(observedValues); i++ {
		date = i
		dist := math.Abs(matches[keys[currentMatch]] - accumulatedGdds)
		// check if the last value is closer than the current. if it is, then the last value is the one to return
		if dist > lastDist {
			out[keys[currentMatch]] = &models.Bins{
				Bins: []models.Bin{
					{
						Value: 1,
						Date:  plantdate.AddDate(0, 0, i-1),
					},
				},
			}
			currentMatch += 1
			if currentMatch >= len(keys) {
				break
			}
			dist = matches[keys[currentMatch]]
		}
		accumulatedGdds += observedValues[i]
		lastDist = dist
	}

	if currentMatch == len(keys) {
		return out
	}

	// adjust cfs values to start at the accumulated value
	adjustedCfs := calculateAccumulatedCfsBasedOnAccumulatedObserved(product, accumulatedGdds, cfs)
	cfsHist := getDatesForCfsMatches(adjustedCfs, date, currentMatch, keys, matches)

	// this loop will actually build the 5 bins
	for k, v := range cfsHist {

		binnedDates, keys := bin(v)
		stepSize := int(math.Ceil(AvgDiff(keys)) + 1) // add 1 to increase range and for uncertainty
		largestKey := getLargestBinCount(binnedDates)

		sort.Ints(keys)

		min := largestKey - stepSize*2
		temp := map[int]int{}
		// need to rebin now
		sum := 0

		// we force the highest frequency to be the center date, then find dates on each side that fit
		for i := 0; i < binCount; i++ {
			for date, v := range binnedDates {
				if min <= date && date < min+stepSize {
					sum += v
					if count, exists := temp[min]; exists {
						temp[min] = count + v
					} else {
						temp[min] = v
					}
				}
			}
			min += stepSize
		}

		// convert to output model
		bins := []models.Bin{}
		for d, v := range temp {
			bins = append(bins, models.Bin{
				Value: (float64(v) + alpha) / (float64(sum) + float64(binCount)*alpha),
				Date:  plantdate.AddDate(0, 0, d),
			})
		}
		out[k] = &models.Bins{
			Bins: bins,
		}

	}

	return out
}

// will use normals to determine dates for each stage
//
// this function allows normals to "wrap around" to get continuous values
func comparisonNormals(request models.GddRequest, plantdate time.Time, matches map[string]float64) map[string]time.Time {
	normals := GetNormalValues(request)
	observedValues := normals.GddValues
	accumulatedGdds := 0.0
	currentMatch := 0
	keys := getMatchKeys(matches)
	lastDist := matches[keys[currentMatch]]

	start := plantdate.YearDay()
	year := plantdate.Year()
	if year%4 == 0 && year%100 != 0 || year%400 == 0 {
		start -= 1
	}

	i := start
	date := 0
	out := map[string]time.Time{}

	for {
		dist := math.Abs(matches[keys[currentMatch]] - accumulatedGdds)
		// check if the last value is closer than the current. if it is, then the last value is the one to return
		if dist > lastDist {
			out[keys[currentMatch]] = plantdate.AddDate(0, 0, date-1)
			currentMatch += 1
			if currentMatch >= len(keys) {
				break
			}
			dist = matches[keys[currentMatch]]
		}
		accumulatedGdds += observedValues[i]
		lastDist = dist
		i += 1
		date += 1
		if i >= len(observedValues) {
			i = 0
		}
	}
	return out
}

// dispatches a go routine to handle the comparison values
func comparisonGoRoutine(request models.GddRequest, plantdate time.Time, matches map[string]float64, comparisonMode int, thread *dispatch.Thread[map[string]time.Time]) {
	defer func() {
		thread.Recover(recover())
	}()
	if comparisonMode == -1 {
		thread.Result(comparisonNormals(request, plantdate, matches))
		return
	}
	thread.Result(map[string]time.Time{})
	return
}

func asyncCollectGddsAndCfs(ctx common.DawnCtx, gddReq models.GddRequest) (models.GddResponse, []entities.CfsGdd) {
	gddThread := dispatch.New[models.GddResponse]()
	go func() {
		defer func() {
			r := recover()
			gddThread.Recover(r)
		}()
		gdds := GetGddValues(ctx, gddReq)
		gddThread.Result(gdds)
	}()

	cfsThread := dispatch.New[[]entities.CfsGdd]()
	go func() {
		defer func() { cfsThread.Recover(recover()) }()
		cfs := persistence.CfsFindByLocationMultiple(gddReq.BuildLocation(), 4)
		cfsThread.Result(cfs)
	}()

	gdds, err := gddThread.Get()
	if err != nil {
		panic(err)
	}
	cfs, err := cfsThread.Get()
	if err != nil {
		panic(err)
	}

	return gdds, cfs
}

/* HOW DOES THIS WORK:

1. Collect observed and cfs data

2. Determine matches for corn based on rm/gdds provided

2.b. Dispatch comparison routine

3. Match observed dates to matches for corn with the closest accum gdds

4. Bin cfs projections (since there are a lot of them) into groups. Use same matching algo

5. Join, and merge comparison results

*/
func CalculateStages(ctx common.DawnCtx, request models.StageRequest) map[string]*models.Bins {
	gddReq := models.GddRequest{
		Year:       request.PlantDate.Year(),
		Latitude:   request.Latitude,
		Longitude:  request.Longitude,
		Accumulate: false,
		Product:    "CORN",
	}

	gdds, cfs := asyncCollectGddsAndCfs(ctx, gddReq)

	stageMatches := models.BuildStageMatches(request.Mode, request.Value)

	comparisonThread := dispatch.New[map[string]time.Time]()

	go comparisonGoRoutine(gddReq, request.PlantDate, stageMatches, request.Comparison, comparisonThread)
	out := forecast(ctx, gddReq, request.PlantDate, stageMatches, gdds, cfs)

	// block, wait for comparison results
	compResults, compError := comparisonThread.Get()
	if compError != nil {
		panic(compError)
	}
	for k, v := range compResults {
		out[k].ComparisonMean = v
	}
	return out

}

// find the average difference of a sorted array
func AvgDiff(data []int) float64 {
	sort.Ints(data)
	sum := 0.0
	c := 0
	for i := 0; i < len(data)-1; i++ {
		diff := math.Abs(float64(data[i] - data[i+1]))
		sum += diff
		c += 1
	}
	return sum / float64(c)
}

func ForecastFirstLastFreeze(ctx common.DawnCtx, request models.FreezingForecastRequest) models.FreezingForecastResponse {
	lastFreezeIdx := 0
	firstFreezeIdx := 0

	baseData := persistence.CurrentGddFindFirstByYearAndLocation(ctx, models.BuildLocation(request.Latitude, request.Longitude))
	cfsData := persistence.CfsFindAllByLocation(models.BuildLocation(request.Latitude, request.Longitude))
	normalsData := persistence.NormalsFindFirstByYearAndLocation(models.BuildLocation(request.Latitude, request.Longitude))

	cfsData.MinTemps = append(baseData.MinTemps, cfsData.MinTemps...)

	if len(cfsData.MinTemps) < len(normalsData.MinTemps) {
		smallerNormalRegion := normalsData.MinTemps[len(cfsData.MinTemps):]
		cfsData.MinTemps = append(cfsData.MinTemps, smallerNormalRegion...)
	}

	startDate := time.Date(time.Now().Year(), time.January, 1, 0, 0, 0, 0, time.UTC)

	firstHalfFirstDate := time.Date(time.Now().Year(), time.January, 1, 0, 0, 0, 0, time.UTC)
	firstHalfLastDate := time.Date(time.Now().Year(), time.July, 31, 0, 0, 0, 0, time.UTC)

	lastHalfFirstDate := time.Date(time.Now().Year(), time.August, 1, 0, 0, 0, 0, time.UTC)
	lastHalfLastDate := time.Date(time.Now().Year(), time.December, 31, 0, 0, 0, 0, time.UTC)

	for i := 0; i < len(cfsData.MinTemps); i++ {
		currentDate := startDate.AddDate(0, 0, i)
		if cfsData.MinTemps[i] <= request.FreezingTemp && currentDate.After(firstHalfFirstDate) && currentDate.Before(firstHalfLastDate) {
			lastFreezeIdx = i
		}
		if cfsData.MinTemps[i] <= request.FreezingTemp && currentDate.After(lastHalfFirstDate) && currentDate.Before(lastHalfLastDate) && firstFreezeIdx == 0 {
			firstFreezeIdx = i
			break
		}
	}

	lastFreezeDate := startDate.AddDate(0, 0, lastFreezeIdx)
	firstFreezeDate := startDate.AddDate(0, 0, firstFreezeIdx)

	return models.FreezingForecastResponse{
		LastFreeze:  []time.Time{lastFreezeDate},
		FirstFreeze: []time.Time{firstFreezeDate},
	}

}