// predictor/internal/pkg/grib/grib.go

package grib

import (
	"context"
	"encoding/binary"
	"math"
	"os"
	"path/filepath"
	"strings"
	"sync/atomic"
	"time"

	"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes"
	"github.com/edsrzf/mmap-go"
	"github.com/nilsmagnus/grib/griblib"
)
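
// Service exposes wind data extracted from GRIB model output. Update fetches
// and assembles the latest model run into an on-disk cube, Extract samples the
// u/v wind components at a point in space and time, and GetStatus reports
// whether a sufficiently fresh dataset is loaded.
//
// A minimal usage sketch (field values are illustrative; DatasetConfig and the
// remaining Config fields are defined elsewhere in this package):
//
//	svc, err := grib.New(&grib.Config{Dir: "/var/lib/predictor/grib", TTL: 24 * time.Hour})
//	if err != nil {
//		// handle error
//	}
//	defer svc.Close()
//	if err := svc.Update(ctx); err != nil {
//		// handle error
//	}
//	uv, err := svc.Extract(ctx, 37.5, 127.0, 12000, time.Now().UTC())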
type Service interface {
	Update(ctx context.Context) error
	Extract(ctx context.Context, lat, lon, alt float64, ts time.Time) ([2]float64, error)
	Close() error
	GetStatus() (ready bool, lastUpdate time.Time, isFresh bool, errMsg string)
}
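
// service is the concrete Service implementation. The active dataset is held
// behind an atomic pointer so Update can swap in a new cube while Extract
// reads the current one without additional locking.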
type service struct {
	cfg   *Config
	cache memCache
	data  atomic.Pointer[dataset]
}
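
// New prepares the data directory and returns a Service. If a recent cube file
// is already present on disk it is loaded immediately, so the service is
// usable without waiting for the first Update.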
func New(cfg *Config) (Service, error) {
	if cfg.TTL == 0 {
		cfg.TTL = 24 * time.Hour
	}
	if err := os.MkdirAll(cfg.Dir, 0o755); err != nil {
		return nil, err
	}
	s := &service{cfg: cfg, cache: memCache{ttl: cfg.CacheTTL}}
	// Best effort: try to load an existing dataset on startup. Failure is not
	// fatal; the dataset will be built on the first Update call instead.
	_ = s.loadExistingDataset()
	return s, nil
}
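
// loadExistingDataset looks for the newest *.cube file in the data directory,
// verifies it is within the TTL, and loads it as the active dataset. The model
// run time is recovered from the file name (layout "20060102_15").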
func (s *service) loadExistingDataset() error {
	pattern := filepath.Join(s.cfg.Dir, "*.cube")
	matches, err := filepath.Glob(pattern)
	if err != nil {
		return err
	}
	if len(matches) == 0 {
		return errcodes.ErrNoCubeFilesFound
	}
	// Pick the most recently modified cube file.
	var latestFile string
	var latestTime time.Time
	for _, match := range matches {
		info, err := os.Stat(match)
		if err != nil {
			continue
		}
		if info.ModTime().After(latestTime) {
			latestTime = info.ModTime()
			latestFile = match
		}
	}
	if latestFile == "" {
		return errcodes.ErrNoValidCubeFilesFound
	}
	if time.Since(latestTime) > s.cfg.TTL {
		return errcodes.Wrap(errcodes.ErrLatestCubeFileIsTooOld, "latest cube file is too old")
	}
	dc := &s.cfg.Dataset
	c, err := openCube(latestFile, dc)
	if err != nil {
		return err
	}
	// Recover the model run time from the file name, e.g. "20240101_06.cube".
	base := filepath.Base(latestFile)
	runStr := strings.TrimSuffix(base, ".cube")
	run, err := time.Parse("20060102_15", runStr)
	if err != nil {
		c.Close()
		return err
	}
	s.data.Store(&dataset{cube: c, ds: dc, runUTC: run.Unix()})
	return nil
}
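
// Update ensures a dataset no older than cfg.TTL is loaded. It reuses the
// current dataset or an existing cube file when fresh enough, and otherwise
// downloads the GRIB files for the selected model run, assembles them into a
// cube, and atomically swaps it in.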
func (s *service) Update(ctx context.Context) error {
	// Nothing to do while the currently loaded dataset is still within the TTL.
	if d := s.data.Load(); d != nil {
		runTime := time.Unix(d.runUTC, 0)
		if time.Since(runTime) < s.cfg.TTL {
			return nil
		}
	}
	dc := &s.cfg.Dataset
	// Select the model run, backing off a few hours so the chosen run is
	// likely to be fully published already.
	run := nearestRun(time.Now().UTC().Add(-6 * time.Hour))
	cubePath := filepath.Join(s.cfg.Dir, run.Format("20060102_15")) + ".cube"
	// Reuse a cube already assembled for this run if it is still within the TTL.
	if info, err := os.Stat(cubePath); err == nil && time.Since(info.ModTime()) < s.cfg.TTL {
		c, err := openCube(cubePath, dc)
		if err != nil {
			return err
		}
		s.data.Store(&dataset{cube: c, ds: dc, runUTC: run.Unix()})
		s.cache = memCache{ttl: s.cfg.CacheTTL} // drop entries cached from the previous dataset
		return nil
	}
	// Download the GRIB files for this run (bounded by a generous timeout) and
	// assemble them into a single cube file if one does not exist yet.
	downloadCtx, cancel := context.WithTimeout(ctx, 60*time.Minute)
	defer cancel()
	dl := NewPartialDownloader(s.cfg.Dir, s.cfg.Parallel, dc)
	if err := dl.Run(downloadCtx, run); err != nil {
		return err
	}
	if _, err := os.Stat(cubePath); err != nil {
		if err := assembleCube(s.cfg.Dir, run, cubePath, dc); err != nil {
			return err
		}
	}
	c, err := openCube(cubePath, dc)
	if err != nil {
		return err
	}
	s.data.Store(&dataset{cube: c, ds: dc, runUTC: run.Unix()})
	s.cache = memCache{ttl: s.cfg.CacheTTL} // drop entries cached from the previous dataset
	return nil
}
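
// assembleCube writes the downloaded GRIB messages into a flat binary cube of
// little-endian float32 values. Judging by the offset computation below, the
// layout is variable-major:
//
//	offset(var, step, level) = var*SizePerVar + (step*NP + level)*GridSize*4
//
// with var 0 = geopotential height, 1 = u-wind, 2 = v-wind; only messages on
// isobaric surfaces (type 100) matching the configured pressure levels are kept.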
func assembleCube(dir string, run time.Time, cubePath string, dc *DatasetConfig) error {
	sizePerVar := dc.SizePerVar()
	total := dc.CubeSize()
	gridBytes := int64(dc.GridSize()) * 4
	f, err := os.Create(cubePath)
	if err != nil {
		return err
	}
	defer f.Close()
	if err := f.Truncate(total); err != nil {
		return err
	}
	mm, err := mmap.MapRegion(f, int(total), mmap.RDWR, 0, 0)
	if err != nil {
		return err
	}
	// Registered after f.Close above, so the region is unmapped before the
	// file is closed.
	defer mm.Unmap()
	// Map configured pressure levels (hPa) to their index along the level axis.
	pIndex := make(map[int]int)
	for i, p := range dc.Levels {
		pIndex[int(math.Round(p))] = i
	}
	// processFile decodes one GRIB file and copies the grids for time index ti
	// into the cube at their variable/level offsets.
	processFile := func(fn string, ti int) error {
		file, err := os.Open(fn)
		if err != nil {
			return err
		}
		messages, err := griblib.ReadMessages(file)
		file.Close()
		if err != nil {
			return err
		}
		for _, m := range messages {
			if m.Section4.ProductDefinitionTemplateNumber != 0 {
				continue
			}
			product := m.Section4.ProductDefinitionTemplate
			var varIdx int
			if product.ParameterCategory == 2 {
				switch product.ParameterNumber {
				case 2: // u-wind
					varIdx = 1
				case 3: // v-wind
					varIdx = 2
				default:
					continue
				}
			} else if product.ParameterCategory == 3 && product.ParameterNumber == 5 {
				varIdx = 0 // geopotential height
			} else {
				continue
			}
			if product.FirstSurface.Type != 100 { // isobaric surfaces only
				continue
			}
			pressure := float64(product.FirstSurface.Value) / 100.0 // Pa -> hPa
			pIdx, ok := pIndex[int(math.Round(pressure))]
			if !ok {
				continue
			}
			// Encode the grid as little-endian float32 and copy it into place.
			vals := m.Data()
			raw := make([]byte, len(vals)*4)
			for i, v := range vals {
				binary.LittleEndian.PutUint32(raw[i*4:], math.Float32bits(float32(v)))
			}
			base := int64(varIdx)*sizePerVar + (int64(ti)*int64(dc.NP)+int64(pIdx))*gridBytes
			copy(mm[base:base+int64(len(raw))], raw)
		}
		return nil
	}
	// Each forecast step is spread across two GRIB files (FileName and FileNameB).
	steps := dc.Steps()
	for ti, step := range steps {
		fn := filepath.Join(dir, dc.FileName(run, step))
		if err := processFile(fn, ti); err != nil {
			return err
		}
		fnB := filepath.Join(dir, dc.FileNameB(run, step))
		if err := processFile(fnB, ti); err != nil {
			return err
		}
	}
	return mm.Flush()
}
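
// Extract returns the (u, v) wind components at the given latitude, longitude,
// altitude and time, sampled from the currently loaded cube. Requests outside
// the dataset's forecast window are rejected, and results are memoized in a
// short-lived in-memory cache.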
func (s *service) Extract(ctx context.Context, lat, lon, alt float64, ts time.Time) ([2]float64, error) {
	var zero [2]float64
	d := s.data.Load()
	if d == nil {
		return zero, errcodes.ErrNoDataset
	}
	// Reject timestamps outside [run, run+MaxHour].
	runTime := time.Unix(d.runUTC, 0)
	maxDur := time.Duration(s.cfg.Dataset.MaxHour) * time.Hour
	if ts.Before(runTime) || ts.After(runTime.Add(maxDur)) {
		return zero, errcodes.ErrOutOfBounds
	}
	key := encodeKey(lat, lon, alt, ts)
	if v, ok := s.cache.get(key); ok {
		return [2]float64(v), nil
	}
	td := ts.Sub(runTime).Hours()
	u, v := d.uv(lat, lon, alt, td)
	out := [2]float64{u, v}
	s.cache.set(key, vec(out))
	return out, nil
}
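
// Close releases the currently loaded cube, if any.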
func (s *service) Close() error {
	if d := s.data.Load(); d != nil {
		return d.Close()
	}
	return nil
}
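
// GetStatus reports whether the service has a usable dataset (ready), the time
// of the loaded model run (lastUpdate), whether that run is still within
// cfg.TTL (isFresh), and a reason string when it is not ready.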
func (s *service) GetStatus() (ready bool, lastUpdate time.Time, isFresh bool, errMsg string) {
	d := s.data.Load()
	if d == nil {
		return false, time.Time{}, false, "no dataset loaded"
	}
	runTime := time.Unix(d.runUTC, 0)
	fresh := time.Since(runTime) < s.cfg.TTL
	if !fresh {
		return false, runTime, false, "dataset is too old"
	}
	return true, runTime, true, ""
}