predictor/internal/pkg/grib/grib.go
2025-06-22 22:36:10 +03:00

154 lines
3.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package grib
import (
"context"
"encoding/binary"
"math"
"net/http"
"os"
"path/filepath"
"sync/atomic"
"time"
"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes"
"github.com/edsrzf/mmap-go"
"github.com/nilsmagnus/grib/griblib"
)
// RedisIface is the Redis-backed dependency injected through
// ServiceConfig.Redis.
type RedisIface interface {
// Lock is called by Update with a key and a ttl; on success it returns a
// function that Update defers as the release step.
// NOTE(review): blocking/expiry semantics depend on the implementation — confirm.
Lock(ctx context.Context, key string, ttl time.Duration) (func(context.Context), error)
// Set presumably stores value under key for ttl; it is not called in the
// code visible here — confirm against the implementation.
Set(key string, value []byte, ttl time.Duration) error
// Get presumably returns the bytes stored under key; behaviour for a missing
// key is not visible here — confirm against the implementation.
Get(key string) ([]byte, error)
}
// ServiceConfig holds the settings passed to New.
type ServiceConfig struct {
// Dir is the working directory: New creates it (mode 0o755), Update hands it
// to the downloader and places the assembled ".cube" file in it.
Dir string
// TTL is set to 24h by New when left at zero.
// NOTE(review): it is not read anywhere else in the visible code — confirm its purpose.
TTL time.Duration
// CacheTTL is the ttl given to the in-memory cache.
CacheTTL time.Duration
// Redis supplies the "grib-dl" lock taken by Update.
Redis RedisIface
// Parallel is forwarded to the downloader's Parallel field.
Parallel int
// Client is the HTTP client forwarded to the downloader.
Client *http.Client
}
// service is the value returned by New. It bundles the configuration, the
// in-memory cache of Extract results and the currently published dataset.
type service struct {
// cfg is the configuration as normalised by New.
cfg ServiceConfig
// cache memoises Extract results; Update replaces it with a fresh one after
// publishing a new dataset.
cache memCache
// data points at the dataset currently served: Update stores it, Extract
// loads it. It is nil until the first successful Update.
data atomic.Pointer[dataset]
}
// New builds a service from cfg.
//
// A zero cfg.TTL is replaced by 24 hours, and cfg.Dir is created (including
// parents, mode 0o755) if it does not exist yet. The error from creating the
// directory is returned as-is. No dataset is loaded here; call Update for that.
func New(cfg ServiceConfig) (*service, error) {
	const defaultTTL = 24 * time.Hour

	if cfg.TTL == 0 {
		cfg.TTL = defaultTTL
	}

	err := os.MkdirAll(cfg.Dir, 0o755)
	if err != nil {
		return nil, err
	}

	return &service{
		cfg:   cfg,
		cache: memCache{ttl: cfg.CacheTTL},
	}, nil
}
// Update downloads missing GRIBs for the run chosen by nearestRun (evaluated
// at "now minus four hours", UTC), assembles them into a single mmap-backed
// cube file when that file does not exist yet, and publishes the resulting
// dataset for Extract.
//
// The whole operation runs under the "grib-dl" Redis lock (TTL 45 minutes).
// The first error from locking, downloading, assembling or opening the cube is
// returned unchanged; in that case the previously published dataset and cache
// are left untouched.
func (s *service) Update(ctx context.Context) error {
	// Upper bound for releasing the lock once ctx itself is no longer usable.
	const releaseTimeout = 10 * time.Second

	unlock, err := s.cfg.Redis.Lock(ctx, "grib-dl", 45*time.Minute)
	if err != nil {
		return err
	}
	defer func() {
		// If ctx has already been cancelled or has expired (the usual reason
		// for an aborted download), handing it to the release call would most
		// likely make that call fail and leave the lock held until its TTL
		// runs out. Use a short-lived independent context in that case only.
		releaseCtx := ctx
		if ctx.Err() != nil {
			var cancel context.CancelFunc
			releaseCtx, cancel = context.WithTimeout(context.Background(), releaseTimeout)
			defer cancel()
		}
		unlock(releaseCtx)
	}()

	dl := downloader.Downloader{Dir: s.cfg.Dir, Parallel: s.cfg.Parallel, Client: s.cfg.Client}
	run := nearestRun(time.Now().UTC().Add(-4 * time.Hour))
	if err := dl.Run(ctx, run); err != nil {
		return err
	}

	// One cube file per run, named after the run's date and hour.
	cubePath := filepath.Join(s.cfg.Dir, run.Format("20060102_15")) + ".cube"
	if _, err := os.Stat(cubePath); err != nil {
		// Any stat failure is treated as "cube not built yet".
		if err := assembleCube(s.cfg.Dir, run, cubePath); err != nil {
			return err
		}
	}

	c, err := openCube(cubePath)
	if err != nil {
		return err
	}

	// NOTE(review): the cube held by the previously stored dataset is not
	// released here — confirm who owns its lifetime.
	ds := &dataset{cube: c, runUTC: run.Unix()}
	s.data.Store(ds)

	// Drop results cached for the previous dataset.
	// NOTE(review): this plain assignment is not synchronised with concurrent
	// Extract calls reading s.cache — confirm memCache tolerates it.
	s.cache = memCache{ttl: s.cfg.CacheTTL}
	return nil
}
// assembleCube builds the cube for run from the GRIB files in dir and leaves
// the finished file at cubePath.
//
// The cube is written to cubePath+".tmp" first and renamed into place only
// after it has been completely written, flushed, unmapped and closed. A failed
// assembly therefore never leaves a partial file at cubePath that a later
// os.Stat would mistake for a finished cube.
func assembleCube(dir string, run time.Time, cubePath string) error {
	tmpPath := cubePath + ".tmp"
	if err := writeCubeFile(dir, run, tmpPath); err != nil {
		_ = os.Remove(tmpPath) // best effort; the assembly error is what matters
		return err
	}
	if err := os.Rename(tmpPath, cubePath); err != nil {
		_ = os.Remove(tmpPath) // best effort; the rename error is what matters
		return err
	}
	return nil
}

// writeCubeFile creates path, sizes it for the full cube and fills it through
// a read-write memory mapping.
//
// Layout: two consecutive variable blocks ("u" first, then "v"); each block is
// indexed [step][pressure level][grid point] and holds little-endian float32
// values, with 17 steps, 34 levels and 361*720 grid points per slab. Slabs for
// which no matching message is found stay zero-filled.
//
// Only messages with short name "u" or "v", first fixed surface type 100 and a
// pressure level present in pressureLevels are used. A message that would not
// fit its slab exactly (more steps or levels than the layout allows, or a
// value count different from 361*720) yields an *os.PathError wrapping
// os.ErrInvalid instead of writing outside the slab.
func writeCubeFile(dir string, run time.Time, path string) (err error) {
	const (
		numSteps   = 17
		numLevels  = 34
		gridPoints = 361 * 720
		slabBytes  = gridPoints * 4 // one float32 per grid point
		sizePerVar = numSteps * numLevels * slabBytes
		total      = 2 * sizePerVar // u block followed by v block
	)

	f, err := os.Create(path)
	if err != nil {
		return err
	}
	// Registered immediately so the descriptor is released on every exit path.
	defer func() {
		if cerr := f.Close(); err == nil {
			err = cerr
		}
	}()

	if err := f.Truncate(total); err != nil {
		return err
	}

	mm, err := mmap.MapRegion(f, total, mmap.RDWR, 0, 0)
	if err != nil {
		return err
	}
	// Runs before the Close above (defers are LIFO).
	defer func() {
		if uerr := mm.Unmap(); err == nil {
			err = uerr
		}
	}()

	// Rounded pressure level -> index along the level axis.
	levelIndex := make(map[int]int, len(pressureLevels))
	for i, p := range pressureLevels {
		levelIndex[int(math.Round(p))] = i
	}

	for ti, step := range steps {
		fn := filepath.Join(dir, fileName(run, step))
		if ti >= numSteps {
			// More forecast steps than the layout has room for.
			return &os.PathError{Op: "assemble cube", Path: fn, Err: os.ErrInvalid}
		}

		gf, err := griblib.Read(fn)
		if err != nil {
			return err
		}

		for _, m := range gf.Messages {
			var varIdx int
			switch m.ParameterShortName {
			case "u":
				varIdx = 0
			case "v":
				varIdx = 1
			default:
				continue
			}
			if m.TypeOfFirstFixedSurface != 100 {
				continue
			}

			// Round exactly like the index keys above so a level that is a
			// hair below an integer still matches.
			pIdx, ok := levelIndex[int(math.Round(float64(m.PressureLevel)))]
			if !ok {
				continue
			}

			vals := m.Values
			if pIdx >= numLevels || len(vals) != gridPoints {
				return &os.PathError{Op: "assemble cube", Path: fn, Err: os.ErrInvalid}
			}

			// GRIB library returns scan north->south, west->east already in row-major order
			base := varIdx*sizePerVar + (ti*numLevels+pIdx)*slabBytes
			slab := mm[base : base+slabBytes]
			for i, v := range vals {
				binary.LittleEndian.PutUint32(slab[i*4:], math.Float32bits(float32(v)))
			}
		}
	}

	return mm.Flush()
}
// Extract returns the {u, v} pair computed by the current dataset for the
// given position (lat, lon, alt) and time ts.
//
// It returns errcodes.ErrNoDataset when no dataset has been published yet and
// errcodes.ErrOutOfBounds when ts lies before the dataset's run time or more
// than 48 hours after it. Results are memoised in the in-memory cache under a
// key built from all four inputs. ctx is currently not used.
func (s *service) Extract(ctx context.Context, lat, lon, alt float64, ts time.Time) ([2]float64, error) {
	ds := s.data.Load()
	if ds == nil {
		return [2]float64{}, errcodes.ErrNoDataset
	}

	// Valid window: [run time, run time + 48h], both ends inclusive.
	runStart := time.Unix(ds.runUTC, 0)
	if ts.Before(runStart) || ts.After(runStart.Add(48*time.Hour)) {
		return [2]float64{}, errcodes.ErrOutOfBounds
	}

	cacheKey := encodeKey(lat, lon, alt, ts)
	if cached, hit := s.cache.get(cacheKey); hit {
		return [2]float64(cached), nil
	}

	// Offset from the run time in fractional hours.
	hoursIn := ts.Sub(runStart).Hours()
	u, v := ds.uv(lat, lon, alt, hoursIn)
	result := [2]float64{u, v}
	s.cache.set(cacheKey, vec(result))
	return result, nil
}