predictor/internal/pkg/grib/grib.go
2025-10-20 16:31:45 +09:00

308 lines
7.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package grib
import (
"context"
"encoding/binary"
"math"
"net/http"
"os"
"path/filepath"
"strings"
"sync/atomic"
"time"
"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes"
"github.com/edsrzf/mmap-go"
"github.com/nilsmagnus/grib/griblib"
)
// Service manages a GRIB wind dataset: refreshing it from a remote source and
// extracting wind vectors at a given position, altitude, and time.
type Service interface {
// Update ensures a fresh dataset is available, downloading and assembling it when needed.
Update(ctx context.Context) error
// Extract returns the (u, v) wind components at lat/lon/alt for time ts.
Extract(ctx context.Context, lat, lon, alt float64, ts time.Time) ([2]float64, error)
// Close releases the currently loaded dataset, if any.
Close() error
// GetStatus reports readiness, the dataset's model run time, its freshness, and
// a human-readable message when the service is not ready.
GetStatus() (ready bool, lastUpdate time.Time, isFresh bool, errMsg string)
}
// ServiceConfig configures a grib Service.
type ServiceConfig struct {
// Dir is the directory holding downloaded GRIB files and assembled cube files.
Dir string
// TTL is how long a dataset is considered fresh; New defaults it to 24h when zero.
TTL time.Duration
// CacheTTL is the lifetime of entries in the in-memory extraction cache.
CacheTTL time.Duration
// Parallel is the download concurrency passed to the Downloader.
Parallel int
// Client is the HTTP client used for downloads.
Client *http.Client
// DatasetURL is the base URL of the remote GRIB dataset.
DatasetURL string
}
// service is the Service implementation backed by a memory-mapped cube file.
type service struct {
cfg ServiceConfig
// cache memoizes extraction results keyed by (lat, lon, alt, ts).
cache memCache
// data holds the currently active dataset; stored atomically so Extract can
// read it without locking while Update swaps in a new one.
data atomic.Pointer[dataset]
}
// New creates a grib Service rooted at cfg.Dir, applying a 24h default TTL
// when none is set. It attempts a warm start from the most recent on-disk
// cube so the service can answer queries immediately after a restart.
//
// Returns an error only if the data directory cannot be created.
func New(cfg ServiceConfig) (Service, error) {
	if cfg.TTL == 0 {
		cfg.TTL = 24 * time.Hour
	}
	if err := os.MkdirAll(cfg.Dir, 0o755); err != nil {
		return nil, err
	}
	s := &service{cfg: cfg, cache: memCache{ttl: cfg.CacheTTL}}
	// Best-effort warm start: the error is deliberately discarded so startup
	// succeeds even when no usable dataset exists yet — the first Update()
	// call will fetch and assemble fresh data.
	_ = s.loadExistingDataset()
	return s, nil
}
// loadExistingDataset tries to load the most recent available dataset
func (s *service) loadExistingDataset() error {
// Find the most recent cube file
pattern := filepath.Join(s.cfg.Dir, "*.cube")
matches, err := filepath.Glob(pattern)
if err != nil {
return err
}
if len(matches) == 0 {
return errcodes.ErrNoCubeFilesFound
}
// Sort by modification time (newest first)
var latestFile string
var latestTime time.Time
for _, match := range matches {
info, err := os.Stat(match)
if err != nil {
continue
}
if info.ModTime().After(latestTime) {
latestTime = info.ModTime()
latestFile = match
}
}
if latestFile == "" {
return errcodes.ErrNoValidCubeFilesFound
}
// Check if the file is fresh enough
if time.Since(latestTime) > s.cfg.TTL {
return errcodes.Wrap(errcodes.ErrLatestCubeFileIsTooOld, "latest cube file is too old")
}
// Load the dataset
c, err := openCube(latestFile)
if err != nil {
return err
}
// Extract run time from filename
base := filepath.Base(latestFile)
runStr := strings.TrimSuffix(base, ".cube")
run, err := time.Parse("20060102_15", runStr)
if err != nil {
c.Close()
return err
}
ds := &dataset{cube: c, runUTC: run.Unix()}
s.data.Store(ds)
return nil
}
// Update makes a fresh dataset available: it reuses the in-memory dataset or
// an on-disk cube when still within TTL, otherwise downloads the GRIB files
// for the most recent model run and assembles them into a single
// memory-mapped cube file.
//
// NOTE(review): Update performs no locking — the original code's
// "double-checked locking" comment described a lock that does not exist, so
// concurrent callers can download and assemble the same run twice, and the
// plain assignment to s.cache races with readers in Extract. Consider
// serializing Update with a mutex. The previous dataset's cube is also never
// closed when replaced — TODO confirm the mmap leak is acceptable given the
// update cadence.
func (s *service) Update(ctx context.Context) error {
	// Fast path: the dataset already in memory is still fresh.
	if d := s.data.Load(); d != nil {
		if time.Since(time.Unix(d.runUTC, 0)) < s.cfg.TTL {
			return nil
		}
	}
	dl := Downloader{Dir: s.cfg.Dir, Parallel: s.cfg.Parallel, Client: s.cfg.Client, DatasetURL: s.cfg.DatasetURL}
	// Model runs are published with a delay; back off 4h so we only request
	// a run that should already be available.
	run := nearestRun(time.Now().UTC().Add(-4 * time.Hour))
	cubePath := filepath.Join(s.cfg.Dir, run.Format("20060102_15")) + ".cube"
	// If a fresh cube for this run already exists on disk, load it without
	// downloading anything.
	if info, err := os.Stat(cubePath); err == nil && time.Since(info.ModTime()) < s.cfg.TTL {
		c, err := openCube(cubePath)
		if err != nil {
			return err
		}
		s.data.Store(&dataset{cube: c, runUTC: run.Unix()})
		s.cache = memCache{ttl: s.cfg.CacheTTL} // drop vectors cached from the old dataset
		return nil
	}
	// Download the run's GRIB files, then assemble the cube if it is missing.
	if err := dl.Run(ctx, run); err != nil {
		return err
	}
	if _, err := os.Stat(cubePath); err != nil {
		if err := assembleCube(s.cfg.Dir, run, cubePath); err != nil {
			return err
		}
	}
	c, err := openCube(cubePath)
	if err != nil {
		return err
	}
	s.data.Store(&dataset{cube: c, runUTC: run.Unix()})
	s.cache = memCache{ttl: s.cfg.CacheTTL} // drop vectors cached from the old dataset
	return nil
}
// assembleCube reads the downloaded GRIB files for a model run and packs the
// u/v wind fields into a single memory-mapped binary cube at cubePath.
//
// Cube layout: two variables (u then v, sizePerVar bytes each); within a
// variable, values are indexed as [timeStep][pressureLevel][row][col]
// float32 little-endian on a 361x720 grid with 34 pressure levels and 17
// time steps.
func assembleCube(dir string, run time.Time, cubePath string) error {
	const sizePerVar = 17 * 34 * 361 * 720 * 4
	total := int64(sizePerVar * 2)
	f, err := os.Create(cubePath)
	if err != nil {
		return err
	}
	// Deferred immediately so the file is closed on every path, including the
	// Truncate/MapRegion error returns below (the original leaked f there).
	defer f.Close()
	if err := f.Truncate(total); err != nil {
		return err
	}
	mm, err := mmap.MapRegion(f, int(total), mmap.RDWR, 0, 0)
	if err != nil {
		return err
	}
	// Deferred after f.Close so LIFO order unmaps before closing the file
	// (the original deferred in the opposite order).
	defer mm.Unmap()
	// Map pressure levels (hPa, rounded to int) to their slot in the cube.
	pIndex := make(map[int]int, len(pressureLevels))
	for i, p := range pressureLevels {
		pIndex[int(math.Round(p))] = i
	}
	for ti, step := range steps {
		fn := filepath.Join(dir, fileName(run, step))
		file, err := os.Open(fn)
		if err != nil {
			return err
		}
		messages, err := griblib.ReadMessages(file)
		file.Close() // close promptly; a defer here would accumulate across iterations
		if err != nil {
			return err
		}
		for _, m := range messages {
			// Only product definition template 0 is used.
			if m.Section4.ProductDefinitionTemplateNumber != 0 {
				continue
			}
			product := m.Section4.ProductDefinitionTemplate
			// ParameterCategory 2 = momentum; numbers 2/3 are u/v wind.
			if product.ParameterCategory != 2 {
				continue
			}
			var varIdx int
			switch product.ParameterNumber {
			case 2: // u-wind
				varIdx = 0
			case 3: // v-wind
				varIdx = 1
			default:
				continue
			}
			// Surface type 100 = isobaric (pressure) level; value is in Pa.
			if product.FirstSurface.Type != 100 {
				continue
			}
			pressure := float64(product.FirstSurface.Value) / 100.0
			pIdx, ok := pIndex[int(math.Round(pressure))]
			if !ok {
				continue
			}
			vals := m.Data()
			// GRIB library returns scan north->south, west->east already in row-major order.
			raw := make([]byte, len(vals)*4)
			for i, v := range vals {
				binary.LittleEndian.PutUint32(raw[i*4:], math.Float32bits(float32(v)))
			}
			base := int64(varIdx*sizePerVar + (ti*34+pIdx)*361*720*4)
			copy(mm[base:base+int64(len(raw))], raw)
		}
	}
	return mm.Flush()
}
// Extract returns the interpolated (u, v) wind components at the given
// position, altitude, and time, consulting the in-memory cache first.
// It fails with ErrNoDataset when no cube is loaded and ErrOutOfBounds when
// ts falls outside the dataset's 48-hour forecast window.
func (s *service) Extract(ctx context.Context, lat, lon, alt float64, ts time.Time) ([2]float64, error) {
	var zero [2]float64
	d := s.data.Load()
	if d == nil {
		return zero, errcodes.ErrNoDataset
	}
	runTime := time.Unix(d.runUTC, 0)
	// Requests must fall within [run, run+48h]; the cube holds nothing outside it.
	if ts.Before(runTime) || ts.After(runTime.Add(48*time.Hour)) {
		return zero, errcodes.ErrOutOfBounds
	}
	// Cheap path: a previously computed vector for the same key.
	key := encodeKey(lat, lon, alt, ts)
	if cached, ok := s.cache.get(key); ok {
		return [2]float64(cached), nil
	}
	// Interpolate from the cube at the offset (in hours) from the model run.
	hours := ts.Sub(runTime).Hours()
	u, v := d.uv(lat, lon, alt, hours)
	result := [2]float64{u, v}
	s.cache.set(key, vec(result))
	return result, nil
}
// Close releases the currently loaded dataset; it is a no-op when none is loaded.
func (s *service) Close() error {
	d := s.data.Load()
	if d == nil {
		return nil
	}
	return d.Close()
}
// GetStatus reports whether a dataset is loaded and fresh. lastUpdate is the
// dataset's model run time (zero when nothing is loaded); errMsg explains why
// the service is not ready, and is empty when it is.
func (s *service) GetStatus() (ready bool, lastUpdate time.Time, isFresh bool, errMsg string) {
	d := s.data.Load()
	if d == nil {
		return false, time.Time{}, false, "no dataset loaded"
	}
	runTime := time.Unix(d.runUTC, 0)
	if time.Since(runTime) >= s.cfg.TTL {
		return false, runTime, false, "dataset is too old"
	}
	return true, runTime, true, ""
}