rewrite
This commit is contained in:
parent
c4f355a32e
commit
7a8d5d13fa
72 changed files with 4510 additions and 4104 deletions
|
|
@ -1,12 +0,0 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Grib interface {
|
||||
Update(ctx context.Context) error
|
||||
Extract(ctx context.Context, lat, lon, alt float64, ts time.Time) ([2]float64, error)
|
||||
Close() error
|
||||
}
|
||||
|
|
@ -1,516 +0,0 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/ds"
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes"
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/log"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var ErrInvalidParameters = errcodes.New(400, "missing required prediction parameters")
|
||||
|
||||
// Stage represents a prediction stage (ascent, descent, float)
|
||||
type Stage struct {
|
||||
Name string
|
||||
Results []ds.PredicitonResult
|
||||
StartTime time.Time
|
||||
EndTime time.Time
|
||||
}
|
||||
|
||||
// CustomCurve represents a custom ascent/descent curve
|
||||
type CustomCurve struct {
|
||||
Altitude []float64 `json:"altitude"`
|
||||
Time []float64 `json:"time"` // seconds from start
|
||||
}
|
||||
|
||||
func (s *Service) PerformPrediction(ctx context.Context, params ds.PredictionParameters) ([]ds.PredicitonResult, error) {
|
||||
// Validate required parameters
|
||||
if params.LaunchLatitude == nil || params.LaunchLongitude == nil || params.LaunchAltitude == nil || params.LaunchDatetime == nil {
|
||||
return nil, ErrInvalidParameters
|
||||
}
|
||||
|
||||
// Get default values
|
||||
profile := "standard_profile"
|
||||
if params.Profile != nil {
|
||||
profile = *params.Profile
|
||||
}
|
||||
|
||||
ascentRate := 5.0
|
||||
if params.AscentRate != nil {
|
||||
ascentRate = *params.AscentRate
|
||||
}
|
||||
|
||||
burstAltitude := 30000.0
|
||||
if params.BurstAltitude != nil {
|
||||
burstAltitude = *params.BurstAltitude
|
||||
}
|
||||
|
||||
descentRate := 5.0
|
||||
if params.DescentRate != nil {
|
||||
descentRate = *params.DescentRate
|
||||
}
|
||||
|
||||
floatAltitude := 0.0
|
||||
if params.FloatAltitude != nil {
|
||||
floatAltitude = *params.FloatAltitude
|
||||
}
|
||||
|
||||
// Parse custom curves if provided
|
||||
var ascentCurve, descentCurve *CustomCurve
|
||||
if params.AscentCurve != nil && *params.AscentCurve != "" {
|
||||
if curve, err := parseCustomCurve(*params.AscentCurve); err == nil {
|
||||
ascentCurve = curve
|
||||
}
|
||||
}
|
||||
if params.DescentCurve != nil && *params.DescentCurve != "" {
|
||||
if curve, err := parseCustomCurve(*params.DescentCurve); err == nil {
|
||||
descentCurve = curve
|
||||
}
|
||||
}
|
||||
|
||||
log.Ctx(ctx).Info("Starting prediction",
|
||||
zap.String("profile", profile),
|
||||
zap.Float64("lat", *params.LaunchLatitude),
|
||||
zap.Float64("lon", *params.LaunchLongitude),
|
||||
zap.Float64("alt", *params.LaunchAltitude),
|
||||
zap.Time("time", *params.LaunchDatetime),
|
||||
)
|
||||
|
||||
var allResults []ds.PredicitonResult
|
||||
|
||||
switch profile {
|
||||
case "standard_profile":
|
||||
allResults = s.standardProfile(ctx, params, ascentRate, burstAltitude, descentRate, ascentCurve, descentCurve)
|
||||
case "float_profile":
|
||||
allResults = s.floatProfile(ctx, params, ascentRate, burstAltitude, floatAltitude, descentRate, ascentCurve, descentCurve)
|
||||
case "reverse_profile":
|
||||
allResults = s.reverseProfile(ctx, params, ascentRate, burstAltitude, descentRate, ascentCurve, descentCurve)
|
||||
case "custom_profile":
|
||||
allResults = s.customProfile(ctx, params, ascentCurve, descentCurve)
|
||||
default:
|
||||
return nil, errcodes.New(400, "unsupported profile: "+profile)
|
||||
}
|
||||
|
||||
log.Ctx(ctx).Info("Prediction complete", zap.Int("total_steps", len(allResults)))
|
||||
return allResults, nil
|
||||
}
|
||||
|
||||
func (s *Service) standardProfile(ctx context.Context, params ds.PredictionParameters, ascentRate, burstAltitude, descentRate float64, ascentCurve, descentCurve *CustomCurve) []ds.PredicitonResult {
|
||||
var results []ds.PredicitonResult
|
||||
|
||||
// Stage 1: Ascent
|
||||
ascentResults := s.simulateAscent(ctx, params, ascentRate, burstAltitude, ascentCurve)
|
||||
results = append(results, ascentResults...)
|
||||
|
||||
if len(ascentResults) > 0 {
|
||||
// Get final position from ascent
|
||||
lastResult := ascentResults[len(ascentResults)-1]
|
||||
|
||||
// Stage 2: Descent
|
||||
descentParams := ds.PredictionParameters{
|
||||
LaunchLatitude: lastResult.Latitude,
|
||||
LaunchLongitude: lastResult.Longitude,
|
||||
LaunchAltitude: lastResult.Altitude,
|
||||
LaunchDatetime: lastResult.Timestamp,
|
||||
}
|
||||
|
||||
descentResults := s.simulateDescent(ctx, descentParams, descentRate, 0, descentCurve)
|
||||
results = append(results, descentResults...)
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
func (s *Service) floatProfile(ctx context.Context, params ds.PredictionParameters, ascentRate, burstAltitude, floatAltitude, descentRate float64, ascentCurve, descentCurve *CustomCurve) []ds.PredicitonResult {
|
||||
var results []ds.PredicitonResult
|
||||
|
||||
// Stage 1: Ascent to float altitude
|
||||
ascentResults := s.simulateAscent(ctx, params, ascentRate, floatAltitude, ascentCurve)
|
||||
results = append(results, ascentResults...)
|
||||
|
||||
if len(ascentResults) > 0 {
|
||||
// Stage 2: Float (simulate for some time)
|
||||
lastResult := ascentResults[len(ascentResults)-1]
|
||||
floatResults := s.simulateFloat(ctx, lastResult, 30*time.Minute) // Float for 30 minutes
|
||||
results = append(results, floatResults...)
|
||||
|
||||
if len(floatResults) > 0 {
|
||||
// Stage 3: Descent
|
||||
finalFloat := floatResults[len(floatResults)-1]
|
||||
descentParams := ds.PredictionParameters{
|
||||
LaunchLatitude: finalFloat.Latitude,
|
||||
LaunchLongitude: finalFloat.Longitude,
|
||||
LaunchAltitude: finalFloat.Altitude,
|
||||
LaunchDatetime: finalFloat.Timestamp,
|
||||
}
|
||||
|
||||
descentResults := s.simulateDescent(ctx, descentParams, descentRate, 0, descentCurve)
|
||||
results = append(results, descentResults...)
|
||||
}
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
func (s *Service) reverseProfile(ctx context.Context, params ds.PredictionParameters, ascentRate, burstAltitude, descentRate float64, ascentCurve, descentCurve *CustomCurve) []ds.PredicitonResult {
|
||||
var results []ds.PredicitonResult
|
||||
|
||||
// Stage 1: Ascent
|
||||
ascentResults := s.simulateAscent(ctx, params, ascentRate, burstAltitude, ascentCurve)
|
||||
results = append(results, ascentResults...)
|
||||
|
||||
if len(ascentResults) > 0 {
|
||||
// Stage 2: Descent to float altitude
|
||||
lastResult := ascentResults[len(ascentResults)-1]
|
||||
descentParams := ds.PredictionParameters{
|
||||
LaunchLatitude: lastResult.Latitude,
|
||||
LaunchLongitude: lastResult.Longitude,
|
||||
LaunchAltitude: lastResult.Altitude,
|
||||
LaunchDatetime: lastResult.Timestamp,
|
||||
}
|
||||
|
||||
// Descent to float altitude (if specified)
|
||||
floatAlt := 0.0
|
||||
if params.FloatAltitude != nil {
|
||||
floatAlt = *params.FloatAltitude
|
||||
}
|
||||
|
||||
descentResults := s.simulateDescent(ctx, descentParams, descentRate, floatAlt, descentCurve)
|
||||
results = append(results, descentResults...)
|
||||
|
||||
if floatAlt > 0 && len(descentResults) > 0 {
|
||||
// Stage 3: Float
|
||||
finalDescent := descentResults[len(descentResults)-1]
|
||||
floatResults := s.simulateFloat(ctx, finalDescent, 30*time.Minute)
|
||||
results = append(results, floatResults...)
|
||||
}
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
func (s *Service) customProfile(ctx context.Context, params ds.PredictionParameters, ascentCurve, descentCurve *CustomCurve) []ds.PredicitonResult {
|
||||
var results []ds.PredicitonResult
|
||||
|
||||
if ascentCurve != nil {
|
||||
ascentResults := s.simulateCustomAscent(ctx, params, ascentCurve)
|
||||
results = append(results, ascentResults...)
|
||||
}
|
||||
|
||||
if descentCurve != nil && len(results) > 0 {
|
||||
lastResult := results[len(results)-1]
|
||||
descentParams := ds.PredictionParameters{
|
||||
LaunchLatitude: lastResult.Latitude,
|
||||
LaunchLongitude: lastResult.Longitude,
|
||||
LaunchAltitude: lastResult.Altitude,
|
||||
LaunchDatetime: lastResult.Timestamp,
|
||||
}
|
||||
|
||||
descentResults := s.simulateCustomDescent(ctx, descentParams, descentCurve)
|
||||
results = append(results, descentResults...)
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
func rk4Step(lat, lon, alt float64, t time.Time, dt float64, windFunc func(lat, lon, alt float64, t time.Time) (float64, float64), altRate float64) (float64, float64, float64) {
|
||||
// Helper for RK4 integration step
|
||||
toRad := math.Pi / 180.0
|
||||
toDeg := 180.0 / math.Pi
|
||||
R := func(alt float64) float64 { return 6371009.0 + alt }
|
||||
|
||||
f := func(lat, lon, alt float64, t time.Time) (float64, float64, float64) {
|
||||
windU, windV := windFunc(lat, lon, alt, t)
|
||||
Rnow := R(alt)
|
||||
dlat := toDeg * windV / Rnow
|
||||
dlon := toDeg * windU / (Rnow * math.Cos(lat*toRad))
|
||||
return dlat, dlon, altRate
|
||||
}
|
||||
|
||||
k1_lat, k1_lon, k1_alt := f(lat, lon, alt, t)
|
||||
k2_lat, k2_lon, k2_alt := f(lat+0.5*k1_lat*dt, lon+0.5*k1_lon*dt, alt+0.5*k1_alt*dt, t.Add(time.Duration(0.5*dt)*time.Second))
|
||||
k3_lat, k3_lon, k3_alt := f(lat+0.5*k2_lat*dt, lon+0.5*k2_lon*dt, alt+0.5*k2_alt*dt, t.Add(time.Duration(0.5*dt)*time.Second))
|
||||
k4_lat, k4_lon, k4_alt := f(lat+k3_lat*dt, lon+k3_lon*dt, alt+k3_alt*dt, t.Add(time.Duration(dt)*time.Second))
|
||||
|
||||
latNew := lat + (dt/6.0)*(k1_lat+2*k2_lat+2*k3_lat+k4_lat)
|
||||
lonNew := lon + (dt/6.0)*(k1_lon+2*k2_lon+2*k3_lon+k4_lon)
|
||||
altNew := alt + (dt/6.0)*(k1_alt+2*k2_alt+2*k3_alt+k4_alt)
|
||||
return latNew, lonNew, altNew
|
||||
}
|
||||
|
||||
func (s *Service) simulateAscent(ctx context.Context, params ds.PredictionParameters, ascentRate, targetAltitude float64, customCurve *CustomCurve) []ds.PredicitonResult {
|
||||
const dt = 10.0 // simulation step in seconds
|
||||
const outputInterval = 60.0 // output every 60 seconds
|
||||
|
||||
lat := *params.LaunchLatitude
|
||||
lon := *params.LaunchLongitude
|
||||
alt := *params.LaunchAltitude
|
||||
timeCur := *params.LaunchDatetime
|
||||
|
||||
results := make([]ds.PredicitonResult, 0, 1000)
|
||||
|
||||
latCopy := lat
|
||||
lonCopy := lon
|
||||
altCopy := alt
|
||||
timeCopy := timeCur
|
||||
wind := [2]float64{0, 0}
|
||||
windU := wind[0]
|
||||
windV := wind[1]
|
||||
results = append(results, ds.PredicitonResult{
|
||||
Latitude: &latCopy,
|
||||
Longitude: &lonCopy,
|
||||
Altitude: &altCopy,
|
||||
Timestamp: &timeCopy,
|
||||
WindU: &windU,
|
||||
WindV: &windV,
|
||||
})
|
||||
|
||||
nextOutputTime := timeCur.Add(time.Duration(outputInterval) * time.Second)
|
||||
windFunc := func(lat, lon, alt float64, t time.Time) (float64, float64) {
|
||||
w, err := s.ExtractWind(ctx, lat, lon, alt, t)
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Warn("Wind extraction failed during ascent", zap.Error(err))
|
||||
return 0, 0
|
||||
}
|
||||
return w[0], w[1]
|
||||
}
|
||||
|
||||
for alt < targetAltitude {
|
||||
altRate := ascentRate
|
||||
if customCurve != nil {
|
||||
altRate = s.getCustomAltitudeRate(customCurve, alt, ascentRate)
|
||||
}
|
||||
latNew, lonNew, altNew := rk4Step(lat, lon, alt, timeCur, dt, windFunc, altRate)
|
||||
timeCur = timeCur.Add(time.Duration(dt) * time.Second)
|
||||
lat = latNew
|
||||
lon = lonNew
|
||||
alt = altNew
|
||||
|
||||
if alt >= targetAltitude {
|
||||
break
|
||||
}
|
||||
|
||||
if !timeCur.Before(nextOutputTime) {
|
||||
wU, wV := windFunc(lat, lon, alt, timeCur)
|
||||
latCopy := lat
|
||||
lonCopy := lon
|
||||
altCopy := alt
|
||||
timeCopy := timeCur
|
||||
windU := wU
|
||||
windV := wV
|
||||
results = append(results, ds.PredicitonResult{
|
||||
Latitude: &latCopy,
|
||||
Longitude: &lonCopy,
|
||||
Altitude: &altCopy,
|
||||
Timestamp: &timeCopy,
|
||||
WindU: &windU,
|
||||
WindV: &windV,
|
||||
})
|
||||
nextOutputTime = nextOutputTime.Add(time.Duration(outputInterval) * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
func (s *Service) simulateDescent(ctx context.Context, params ds.PredictionParameters, descentRate, targetAltitude float64, customCurve *CustomCurve) []ds.PredicitonResult {
|
||||
const dt = 10.0 // simulation step in seconds
|
||||
const outputInterval = 60.0 // output every 60 seconds
|
||||
|
||||
lat := *params.LaunchLatitude
|
||||
lon := *params.LaunchLongitude
|
||||
alt := *params.LaunchAltitude
|
||||
timeCur := *params.LaunchDatetime
|
||||
|
||||
results := make([]ds.PredicitonResult, 0, 1000)
|
||||
|
||||
latCopy := lat
|
||||
lonCopy := lon
|
||||
altCopy := alt
|
||||
timeCopy := timeCur
|
||||
wind := [2]float64{0, 0}
|
||||
windU := wind[0]
|
||||
windV := wind[1]
|
||||
results = append(results, ds.PredicitonResult{
|
||||
Latitude: &latCopy,
|
||||
Longitude: &lonCopy,
|
||||
Altitude: &altCopy,
|
||||
Timestamp: &timeCopy,
|
||||
WindU: &windU,
|
||||
WindV: &windV,
|
||||
})
|
||||
|
||||
nextOutputTime := timeCur.Add(time.Duration(outputInterval) * time.Second)
|
||||
windFunc := func(lat, lon, alt float64, t time.Time) (float64, float64) {
|
||||
w, err := s.ExtractWind(ctx, lat, lon, alt, t)
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Warn("Wind extraction failed during descent", zap.Error(err))
|
||||
return 0, 0
|
||||
}
|
||||
return w[0], w[1]
|
||||
}
|
||||
|
||||
for alt > targetAltitude {
|
||||
altRate := -descentRate
|
||||
if customCurve != nil {
|
||||
altRate = -s.getCustomAltitudeRate(customCurve, alt, descentRate)
|
||||
}
|
||||
latNew, lonNew, altNew := rk4Step(lat, lon, alt, timeCur, dt, windFunc, altRate)
|
||||
timeCur = timeCur.Add(time.Duration(dt) * time.Second)
|
||||
lat = latNew
|
||||
lon = lonNew
|
||||
alt = altNew
|
||||
|
||||
if alt <= targetAltitude {
|
||||
break
|
||||
}
|
||||
|
||||
if !timeCur.Before(nextOutputTime) {
|
||||
wU, wV := windFunc(lat, lon, alt, timeCur)
|
||||
latCopy := lat
|
||||
lonCopy := lon
|
||||
altCopy := alt
|
||||
timeCopy := timeCur
|
||||
windU := wU
|
||||
windV := wV
|
||||
results = append(results, ds.PredicitonResult{
|
||||
Latitude: &latCopy,
|
||||
Longitude: &lonCopy,
|
||||
Altitude: &altCopy,
|
||||
Timestamp: &timeCopy,
|
||||
WindU: &windU,
|
||||
WindV: &windV,
|
||||
})
|
||||
nextOutputTime = nextOutputTime.Add(time.Duration(outputInterval) * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
func (s *Service) simulateFloat(ctx context.Context, startResult ds.PredicitonResult, duration time.Duration) []ds.PredicitonResult {
|
||||
const dt = 10.0 // simulation step in seconds
|
||||
const outputInterval = 60.0 // output every 60 seconds
|
||||
|
||||
lat := *startResult.Latitude
|
||||
lon := *startResult.Longitude
|
||||
alt := *startResult.Altitude
|
||||
timeCur := *startResult.Timestamp
|
||||
endTime := timeCur.Add(duration)
|
||||
|
||||
results := make([]ds.PredicitonResult, 0, 1000)
|
||||
|
||||
// Always include the initial float point
|
||||
latCopy := lat
|
||||
lonCopy := lon
|
||||
altCopy := alt
|
||||
timeCopy := timeCur
|
||||
wind := [2]float64{0, 0}
|
||||
windU := wind[0]
|
||||
windV := wind[1]
|
||||
results = append(results, ds.PredicitonResult{
|
||||
Latitude: &latCopy,
|
||||
Longitude: &lonCopy,
|
||||
Altitude: &altCopy,
|
||||
Timestamp: &timeCopy,
|
||||
WindU: &windU,
|
||||
WindV: &windV,
|
||||
})
|
||||
|
||||
var nextOutputTime = timeCur.Add(time.Duration(outputInterval) * time.Second)
|
||||
|
||||
for timeCur.Before(endTime) {
|
||||
wind, err := s.ExtractWind(ctx, lat, lon, alt, timeCur)
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Warn("Wind extraction failed during float", zap.Error(err))
|
||||
break
|
||||
}
|
||||
|
||||
latDot := (wind[1] / 111320.0)
|
||||
lonDot := (wind[0] / (40075000.0 * math.Cos(lat*math.Pi/180) / 360.0))
|
||||
|
||||
lat += latDot * dt
|
||||
lon += lonDot * dt
|
||||
// alt remains constant during float
|
||||
timeCur = timeCur.Add(time.Duration(dt) * time.Second)
|
||||
|
||||
if !timeCur.Before(nextOutputTime) {
|
||||
latCopy := lat
|
||||
lonCopy := lon
|
||||
altCopy := alt
|
||||
timeCopy := timeCur
|
||||
windU := wind[0]
|
||||
windV := wind[1]
|
||||
results = append(results, ds.PredicitonResult{
|
||||
Latitude: &latCopy,
|
||||
Longitude: &lonCopy,
|
||||
Altitude: &altCopy,
|
||||
Timestamp: &timeCopy,
|
||||
WindU: &windU,
|
||||
WindV: &windV,
|
||||
})
|
||||
nextOutputTime = nextOutputTime.Add(time.Duration(outputInterval) * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
func (s *Service) simulateCustomAscent(ctx context.Context, params ds.PredictionParameters, curve *CustomCurve) []ds.PredicitonResult {
|
||||
// Implementation for custom ascent curve
|
||||
// This would interpolate the altitude rate from the custom curve
|
||||
return s.simulateAscent(ctx, params, 5.0, 30000.0, curve)
|
||||
}
|
||||
|
||||
func (s *Service) simulateCustomDescent(ctx context.Context, params ds.PredictionParameters, curve *CustomCurve) []ds.PredicitonResult {
|
||||
// Implementation for custom descent curve
|
||||
// This would interpolate the altitude rate from the custom curve
|
||||
return s.simulateDescent(ctx, params, 5.0, 0.0, curve)
|
||||
}
|
||||
|
||||
func (s *Service) getCustomAltitudeRate(curve *CustomCurve, currentAltitude, defaultRate float64) float64 {
|
||||
if curve == nil || len(curve.Altitude) < 2 {
|
||||
return defaultRate
|
||||
}
|
||||
|
||||
// Find the two points in the curve that bracket the current altitude
|
||||
for i := 0; i < len(curve.Altitude)-1; i++ {
|
||||
if curve.Altitude[i] <= currentAltitude && currentAltitude <= curve.Altitude[i+1] {
|
||||
// Linear interpolation
|
||||
alt1, alt2 := curve.Altitude[i], curve.Altitude[i+1]
|
||||
time1, time2 := curve.Time[i], curve.Time[i+1]
|
||||
|
||||
if alt2 == alt1 {
|
||||
return defaultRate
|
||||
}
|
||||
|
||||
// Calculate rate (change in altitude per second)
|
||||
if time2 > time1 {
|
||||
return (alt2 - alt1) / (time2 - time1)
|
||||
}
|
||||
return defaultRate
|
||||
}
|
||||
}
|
||||
|
||||
return defaultRate
|
||||
}
|
||||
|
||||
func parseCustomCurve(base64Data string) (*CustomCurve, error) {
|
||||
data, err := base64.StdEncoding.DecodeString(base64Data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var curve CustomCurve
|
||||
if err := json.Unmarshal(data, &curve); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &curve, nil
|
||||
}
|
||||
|
|
@ -2,59 +2,244 @@ package service
|
|||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.intra.yksa.space/gsn/predictor/internal/pkg/log"
|
||||
"predictor-refactored/internal/dataset"
|
||||
"predictor-refactored/internal/downloader"
|
||||
"predictor-refactored/internal/elevation"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Service orchestrates the dataset lifecycle and provides prediction capabilities.
|
||||
type Service struct {
|
||||
grib Grib
|
||||
mu sync.RWMutex
|
||||
ds *dataset.File
|
||||
elev *elevation.Dataset
|
||||
cfg *downloader.Config
|
||||
dl *downloader.Downloader
|
||||
log *zap.Logger
|
||||
updating sync.Mutex // prevents concurrent downloads
|
||||
}
|
||||
|
||||
func New(gribService Grib) (*Service, error) {
|
||||
svc := &Service{
|
||||
grib: gribService,
|
||||
// New creates a new Service.
|
||||
func New(cfg *downloader.Config, log *zap.Logger) *Service {
|
||||
return &Service{
|
||||
cfg: cfg,
|
||||
dl: downloader.NewDownloader(cfg, log),
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
// LoadElevation loads the ruaumoko-compatible elevation dataset from path.
|
||||
// If the file doesn't exist, elevation termination is disabled (falls back to sea level).
|
||||
func (s *Service) LoadElevation(path string) {
|
||||
ds, err := elevation.Open(path)
|
||||
if err != nil {
|
||||
s.log.Warn("elevation dataset not available, using sea-level termination",
|
||||
zap.String("path", path), zap.Error(err))
|
||||
return
|
||||
}
|
||||
s.elev = ds
|
||||
s.log.Info("elevation dataset loaded", zap.String("path", path))
|
||||
}
|
||||
|
||||
// Elevation returns the elevation dataset (may be nil).
|
||||
func (s *Service) Elevation() *elevation.Dataset {
|
||||
return s.elev
|
||||
}
|
||||
|
||||
// Ready returns true if the service has a loaded dataset.
|
||||
func (s *Service) Ready() bool {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.ds != nil
|
||||
}
|
||||
|
||||
// DatasetTime returns the forecast time of the currently loaded dataset.
|
||||
func (s *Service) DatasetTime() (time.Time, bool) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
if s.ds == nil {
|
||||
return time.Time{}, false
|
||||
}
|
||||
return s.ds.DSTime, true
|
||||
}
|
||||
|
||||
// Dataset returns the current dataset for reading.
|
||||
func (s *Service) Dataset() *dataset.File {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.ds
|
||||
}
|
||||
|
||||
// Update checks for and downloads new forecast data if needed.
|
||||
func (s *Service) Update(ctx context.Context) error {
|
||||
if !s.updating.TryLock() {
|
||||
s.log.Info("update already in progress, skipping")
|
||||
return nil
|
||||
}
|
||||
defer s.updating.Unlock()
|
||||
|
||||
// Check if current dataset is still fresh
|
||||
if dsTime, ok := s.DatasetTime(); ok {
|
||||
if time.Since(dsTime) < s.cfg.DatasetTTL {
|
||||
s.log.Info("dataset still fresh, skipping update",
|
||||
zap.Time("dataset_time", dsTime),
|
||||
zap.Duration("age", time.Since(dsTime)))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return svc, nil
|
||||
}
|
||||
// Try loading an existing dataset from disk first
|
||||
if err := s.loadExistingDataset(); err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateWeatherData updates weather forecast data using the configured grib service
|
||||
func (s *Service) UpdateWeatherData(ctx context.Context) error {
|
||||
return s.grib.Update(ctx)
|
||||
}
|
||||
// Find latest available model run
|
||||
run, err := s.dl.FindLatestRun(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// ExtractWind extracts wind data for given coordinates and time
|
||||
func (s *Service) ExtractWind(ctx context.Context, lat, lon, alt float64, ts time.Time) ([2]float64, error) {
|
||||
return s.grib.Extract(ctx, lat, lon, alt, ts)
|
||||
}
|
||||
// Download and assemble
|
||||
path, err := s.dl.Download(ctx, run)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Update updates the GRIB data (implements updater.GribService)
|
||||
func (s *Service) Update(ctx context.Context) error {
|
||||
return s.UpdateWeatherData(ctx)
|
||||
}
|
||||
// Open the new dataset
|
||||
ds, err := dataset.Open(path, run)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Start starts the service
|
||||
func (s *Service) Start() {
|
||||
log.Ctx(context.Background()).Info("service started")
|
||||
}
|
||||
// Swap in the new dataset
|
||||
s.setDataset(ds)
|
||||
s.log.Info("dataset loaded", zap.Time("run", run), zap.String("path", path))
|
||||
|
||||
// Stop stops the service
|
||||
func (s *Service) Stop() {
|
||||
log.Ctx(context.Background()).Info("service stopped")
|
||||
}
|
||||
// Clean old datasets
|
||||
s.cleanOldDatasets(path)
|
||||
|
||||
// Close closes the service and releases resources
|
||||
func (s *Service) Close() error {
|
||||
s.Stop()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) GetGribStatus(ctx context.Context) (ready bool, lastUpdate time.Time, isFresh bool, errMsg string) {
|
||||
if gribStatus, ok := s.grib.(interface {
|
||||
GetStatus() (ready bool, lastUpdate time.Time, isFresh bool, errMsg string)
|
||||
}); ok {
|
||||
return gribStatus.GetStatus()
|
||||
// loadExistingDataset tries to find and load an existing dataset from the data directory.
|
||||
func (s *Service) loadExistingDataset() error {
|
||||
entries, err := os.ReadDir(s.cfg.DataDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return false, time.Time{}, false, "grib service does not implement GetStatus"
|
||||
|
||||
// Collect valid dataset files (name is YYYYMMDDHH, no extension, correct size)
|
||||
type candidate struct {
|
||||
name string
|
||||
path string
|
||||
run time.Time
|
||||
}
|
||||
var candidates []candidate
|
||||
|
||||
for _, e := range entries {
|
||||
if e.IsDir() || strings.Contains(e.Name(), ".") {
|
||||
continue
|
||||
}
|
||||
if len(e.Name()) != 10 {
|
||||
continue
|
||||
}
|
||||
|
||||
run, err := time.Parse("2006010215", e.Name())
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
path := filepath.Join(s.cfg.DataDir, e.Name())
|
||||
info, err := os.Stat(path)
|
||||
if err != nil || info.Size() != dataset.DatasetSize {
|
||||
continue
|
||||
}
|
||||
|
||||
if time.Since(run) > s.cfg.DatasetTTL {
|
||||
continue
|
||||
}
|
||||
|
||||
candidates = append(candidates, candidate{name: e.Name(), path: path, run: run})
|
||||
}
|
||||
|
||||
if len(candidates) == 0 {
|
||||
return os.ErrNotExist
|
||||
}
|
||||
|
||||
// Pick the newest
|
||||
sort.Slice(candidates, func(i, j int) bool {
|
||||
return candidates[i].run.After(candidates[j].run)
|
||||
})
|
||||
|
||||
best := candidates[0]
|
||||
ds, err := dataset.Open(best.path, best.run)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.setDataset(ds)
|
||||
s.log.Info("loaded existing dataset",
|
||||
zap.Time("run", best.run),
|
||||
zap.String("path", best.path))
|
||||
return nil
|
||||
}
|
||||
|
||||
// setDataset swaps the current dataset with a new one, closing the old one.
|
||||
func (s *Service) setDataset(ds *dataset.File) {
|
||||
s.mu.Lock()
|
||||
old := s.ds
|
||||
s.ds = ds
|
||||
s.mu.Unlock()
|
||||
|
||||
if old != nil {
|
||||
if err := old.Close(); err != nil {
|
||||
s.log.Error("failed to close old dataset", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// cleanOldDatasets removes dataset files other than the one at keepPath.
|
||||
func (s *Service) cleanOldDatasets(keepPath string) {
|
||||
entries, err := os.ReadDir(s.cfg.DataDir)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, e := range entries {
|
||||
if e.IsDir() {
|
||||
continue
|
||||
}
|
||||
path := filepath.Join(s.cfg.DataDir, e.Name())
|
||||
if path == keepPath {
|
||||
continue
|
||||
}
|
||||
// Remove old datasets and temp files
|
||||
if len(e.Name()) == 10 || strings.HasSuffix(e.Name(), ".downloading") {
|
||||
if err := os.Remove(path); err != nil {
|
||||
s.log.Warn("failed to remove old file", zap.String("path", path), zap.Error(err))
|
||||
} else {
|
||||
s.log.Info("removed old dataset", zap.String("path", path))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close releases all resources.
|
||||
func (s *Service) Close() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.ds != nil {
|
||||
err := s.ds.Close()
|
||||
s.ds = nil
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue