diff --git a/go.mod b/go.mod index 6885f78..6d9ab2c 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( require ( github.com/dlclark/regexp2 v1.11.5 // indirect + github.com/edsrzf/mmap-go v1.2.0 // indirect github.com/fatih/color v1.18.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-faster/yaml v0.4.6 // indirect @@ -23,6 +24,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/nilsmagnus/grib v1.2.8 // indirect github.com/segmentio/asm v1.2.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/go.sum b/go.sum index e28db0e..64d966f 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dlclark/regexp2 v1.11.5 h1:Q/sSnsKerHeCkc/jSTNq1oCm7KiVgUMZRDUoRu0JQZQ= github.com/dlclark/regexp2 v1.11.5/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= +github.com/edsrzf/mmap-go v1.2.0 h1:hXLYlkbaPzt1SaQk+anYwKSRNhufIDCchSPkUD6dD84= +github.com/edsrzf/mmap-go v1.2.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q= github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= @@ -32,6 +34,8 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/nilsmagnus/grib v1.2.8 h1:H7ch/1/agaCqM3MC8hW1Ft+EJ+q2XB757uml/IfPvp4= +github.com/nilsmagnus/grib v1.2.8/go.mod h1:XHm+5zuoOk0NSIWaGmA3JaAxI4i50YvD1L1vz+aqPOQ= github.com/ogen-go/ogen v1.14.0 h1:TU1Nj4z9UBsAfTkf+IhuNNp7igdFQKqkk9+6/y4XuWg= github.com/ogen-go/ogen v1.14.0/go.mod h1:Iw1vkqkx6SU7I9th5ceP+fVPJ6Wge4e3kAVzAxJEpPE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/internal/pkg/errcodes/errcodes.go b/internal/pkg/errcodes/errcodes.go index 245f508..32420c3 100644 --- a/internal/pkg/errcodes/errcodes.go +++ b/internal/pkg/errcodes/errcodes.go @@ -1,6 +1,7 @@ package errcodes import ( + "net/http" "strings" ) @@ -10,8 +11,6 @@ type ErrorCode struct { Details string } -var errorCodeCounter int32 - func New(statusCode int, message string, details ...string) *ErrorCode { return &ErrorCode{ StatusCode: statusCode, @@ -23,3 +22,8 @@ func New(statusCode int, message string, details ...string) *ErrorCode { func (e *ErrorCode) Error() string { return e.Message } + +var ( + ErrNoDataset = New(http.StatusNotFound, "no grib dataset found") + ErrOutOfBounds = New(http.StatusBadRequest, "requested time is out of bounds") +) diff --git a/internal/pkg/grib/cache.go b/internal/pkg/grib/cache.go new file mode 100644 index 0000000..9df0ff2 --- /dev/null +++ b/internal/pkg/grib/cache.go @@ -0,0 +1,40 @@ +package grib + +import ( + "encoding/binary" + "math" + "sync" + "time" +) + +type vec [2]float64 + +type item struct { + v vec + exp time.Time +} + +type memCache struct { + ttl time.Duration + m sync.Map +} + +func (c *memCache) get(k uint64) (vec, bool) { + if v, ok := c.m.Load(k); ok { + it := v.(item) + if time.Now().Before(it.exp) { + return it.v, true + } + c.m.Delete(k) + } + return vec{}, false +} + +func (c *memCache) set(k uint64, v vec) { c.m.Store(k, item{v, time.Now().Add(c.ttl)}) } + +func encodeVec(v vec) []byte { + var b [16]byte + binary.LittleEndian.PutUint64(b[:8], math.Float64bits(v[0])) + binary.LittleEndian.PutUint64(b[8:], math.Float64bits(v[1])) + return b[:] +} diff --git a/internal/pkg/grib/config.go b/internal/pkg/grib/config.go index c0804bf..065c688 100644 --- a/internal/pkg/grib/config.go +++ b/internal/pkg/grib/config.go @@ -1,12 +1,14 @@ -package downloader +package grib import ( "fmt" + "net/url" env "github.com/caarlos0/env/v11" ) type Config struct { + DatasetURL url.URL `env:"DATASET_URL"` } func NewConfig(servicePrefix string) (*Config, error) { diff --git a/internal/pkg/grib/cube.go b/internal/pkg/grib/cube.go new file mode 100644 index 0000000..6e8498f --- /dev/null +++ b/internal/pkg/grib/cube.go @@ -0,0 +1,43 @@ +package grib + +import ( + "encoding/binary" + "math" + "os" + + mmap "github.com/edsrzf/mmap-go" +) + +type cube struct { + mm mmap.MMap // read‑only, U followed by V (float32 LE) + t, p, lat, lon int + bytesPerVar int64 +} + +func openCube(path string) (*cube, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + + mm, err := mmap.Map(f, mmap.RDONLY, 0) + if err != nil { + return nil, err + } + + const ( + nT = 17 + nP = 34 + nLat = 361 + nLon = 720 + ) + + return &cube{mm: mm, t: nT, p: nP, lat: nLat, lon: nLon, bytesPerVar: int64(nT * nP * nLat * nLon * 4)}, nil +} + +func (c *cube) val(varIdx, ti, pi, y, x int) float32 { + idx := (((ti*c.p+pi)*c.lat + y) * c.lon) + x + off := int64(varIdx)*c.bytesPerVar + int64(idx)*4 + bits := binary.LittleEndian.Uint32(c.mm[off : off+4]) + return math.Float32frombits(bits) +} diff --git a/internal/pkg/grib/dataset.go b/internal/pkg/grib/dataset.go new file mode 100644 index 0000000..987da59 --- /dev/null +++ b/internal/pkg/grib/dataset.go @@ -0,0 +1,6 @@ +package grib + +type dataset struct { + cube *cube + runUTC int64 // unix seconds +} diff --git a/internal/pkg/grib/downloader.go b/internal/pkg/grib/downloader.go new file mode 100644 index 0000000..892d84b --- /dev/null +++ b/internal/pkg/grib/downloader.go @@ -0,0 +1,69 @@ +package grib + +import ( + "context" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "time" + + "golang.org/x/sync/errgroup" +) + +// NOMADS only. +const nomadsRoot = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod" + +type Downloader struct { + Dir string + Parallel int + Client *http.Client +} + +func (d *Downloader) fileURL(run string, hour int, step int) string { + return fmt.Sprintf("%s/gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2.0p50.f%03d", nomadsRoot, run, hour, hour, step) +} + +func (d *Downloader) fetch(ctx context.Context, url, dst string) error { + if _, err := os.Stat(dst); err == nil { + return nil + } + tmp := dst + ".part" + f, err := os.Create(tmp) + if err != nil { + return err + } + defer f.Close() + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + resp, err := d.Client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("bad status %s", resp.Status) + } + if _, err := io.Copy(f, resp.Body); err != nil { + return err + } + return os.Rename(tmp, dst) +} + +func (d *Downloader) Run(ctx context.Context, run time.Time) error { + runStr := run.Format("20060102") + hour := run.Hour() + g, ctx := errgroup.WithContext(ctx) + sem := make(chan struct{}, d.Parallel) + for _, step := range steps { + step := step + sem <- struct{}{} + g.Go(func() error { + defer func() { <-sem }() + url := d.fileURL(runStr, hour, step) + dst := filepath.Join(d.Dir, fileName(run, step)) + return d.fetch(ctx, url, dst) + }) + } + return g.Wait() +} diff --git a/internal/pkg/grib/extractor.go b/internal/pkg/grib/extractor.go new file mode 100644 index 0000000..e56a52d --- /dev/null +++ b/internal/pkg/grib/extractor.go @@ -0,0 +1,53 @@ +package grib + +import "math" + +func lerp(a, b, t float64) float64 { return a + t*(b-a) } + +// Interpolate 16‑point (time, p, lat, lon) +func (d *dataset) uv(lat, lon, alt float64, tHours float64) (float64, float64) { + if lon < 0 { + lon += 360 + } + iy := (lat + 90) * 2 + y0 := int(math.Floor(iy)) + y1 := y0 + 1 + wy := iy - float64(y0) + ix := lon * 2 + x0 := int(math.Floor(ix)) % d.cube.lon + x1 := (x0 + 1) % d.cube.lon + wx := ix - float64(x0) + it0 := int(math.Floor(tHours / 3.0)) + wt := (tHours - float64(it0*3)) / 3.0 + p := pressureFromAlt(alt) + ip0 := 0 + for ip0+1 < len(pressureLevels) && pressureLevels[ip0+1] > p { + ip0++ + } + ip1 := ip0 + 1 + wp := (pressureLevels[ip0] - p) / (pressureLevels[ip0] - pressureLevels[ip1]) + fetch := func(ti, pi int) (float64, float64) { + u00 := d.cube.val(0, ti, pi, y0, x0) + u10 := d.cube.val(0, ti, pi, y0, x1) + u01 := d.cube.val(0, ti, pi, y1, x0) + u11 := d.cube.val(0, ti, pi, y1, x1) + v00 := d.cube.val(1, ti, pi, y0, x0) + v10 := d.cube.val(1, ti, pi, y0, x1) + v01 := d.cube.val(1, ti, pi, y1, x0) + v11 := d.cube.val(1, ti, pi, y1, x1) + uxy := (1-wy)*((1-wx)*float64(u00)+wx*float64(u10)) + wy*((1-wx)*float64(u01)+wx*float64(u11)) + vxy := (1-wy)*((1-wx)*float64(v00)+wx*float64(v10)) + wy*((1-wx)*float64(v01)+wx*float64(v11)) + return uxy, vxy + } + u0p0, v0p0 := fetch(it0, ip0) + u0p1, v0p1 := fetch(it0, ip1) + u1p0, v1p0 := fetch(it0+1, ip0) + u1p1, v1p1 := fetch(it0+1, ip1) + uLow := lerp(u0p0, u0p1, wp) + vLow := lerp(v0p0, v0p1, wp) + uHig := lerp(u1p0, u1p1, wp) + vHig := lerp(v1p0, v1p1, wp) + u := lerp(uLow, uHig, wt) + v := lerp(vLow, vHig, wt) + return u, v +} diff --git a/internal/pkg/grib/grib.go b/internal/pkg/grib/grib.go index 320ac2a..ff95e90 100644 --- a/internal/pkg/grib/grib.go +++ b/internal/pkg/grib/grib.go @@ -1,3 +1,154 @@ -package downloader +package grib -// +import ( + "context" + "encoding/binary" + "math" + "net/http" + "os" + "path/filepath" + "sync/atomic" + "time" + + "git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes" + "github.com/edsrzf/mmap-go" + "github.com/nilsmagnus/grib/griblib" +) + +type RedisIface interface { + Lock(ctx context.Context, key string, ttl time.Duration) (func(context.Context), error) + Set(key string, value []byte, ttl time.Duration) error + Get(key string) ([]byte, error) +} + +type ServiceConfig struct { + Dir string + TTL time.Duration + CacheTTL time.Duration + Redis RedisIface + Parallel int + Client *http.Client +} + +type service struct { + cfg ServiceConfig + cache memCache + data atomic.Pointer[dataset] +} + +func New(cfg ServiceConfig) (*service, error) { + if cfg.TTL == 0 { + cfg.TTL = 24 * time.Hour + } + if err := os.MkdirAll(cfg.Dir, 0o755); err != nil { + return nil, err + } + s := &service{cfg: cfg, cache: memCache{ttl: cfg.CacheTTL}} + return s, nil +} + +// Update() downloads missing GRIBs, assembles cube into a single mmap‑file. +func (s *service) Update(ctx context.Context) error { + unlock, err := s.cfg.Redis.Lock(ctx, "grib-dl", 45*time.Minute) + if err != nil { + return err + } + defer unlock(ctx) + + dl := downloader.Downloader{Dir: s.cfg.Dir, Parallel: s.cfg.Parallel, Client: s.cfg.Client} + run := nearestRun(time.Now().UTC().Add(-4 * time.Hour)) + if err := dl.Run(ctx, run); err != nil { + return err + } + + cubePath := filepath.Join(s.cfg.Dir, run.Format("20060102_15")) + ".cube" + if _, err := os.Stat(cubePath); err != nil { + if err := assembleCube(s.cfg.Dir, run, cubePath); err != nil { + return err + } + } + c, err := openCube(cubePath) + if err != nil { + return err + } + ds := &dataset{cube: c, runUTC: run.Unix()} + s.data.Store(ds) + s.cache = memCache{ttl: s.cfg.CacheTTL} + return nil +} + +func assembleCube(dir string, run time.Time, cubePath string) error { + const sizePerVar = 17 * 34 * 361 * 720 * 4 + total := int64(sizePerVar * 2) + f, err := os.Create(cubePath) + if err != nil { + return err + } + if err := f.Truncate(total); err != nil { + return err + } + mm, err := mmap.MapRegion(f, int(total), mmap.RDWR, 0, 0) + if err != nil { + return err + } + defer mm.Unmap() + defer f.Close() + + pIndex := make(map[int]int) + for i, p := range pressureLevels { + pIndex[int(math.Round(p))] = i + } + + for ti, step := range steps { + fn := filepath.Join(dir, fileName(run, step)) + gf, err := griblib.Read(fn) + if err != nil { + return err + } + for _, m := range gf.Messages { + if m.ParameterShortName != "u" && m.ParameterShortName != "v" { + continue + } + if m.TypeOfFirstFixedSurface != 100 { + continue + } + pIdx, ok := pIndex[int(m.PressureLevel)] + if !ok { + continue + } + varIdx := 0 + if m.ParameterShortName == "v" { + varIdx = 1 + } + vals := m.Values + // GRIB library returns scan north->south, west->east already in row-major order + raw := make([]byte, len(vals)*4) + for i, v := range vals { + binary.LittleEndian.PutUint32(raw[i*4:], math.Float32bits(float32(v))) + } + base := int64(varIdx*sizePerVar + (ti*34+pIdx)*361*720*4) + copy(mm[base:base+int64(len(raw))], raw) + } + } + return mm.Flush() +} + +func (s *service) Extract(ctx context.Context, lat, lon, alt float64, ts time.Time) ([2]float64, error) { + var zero [2]float64 + d := s.data.Load() + if d == nil { + return zero, errcodes.ErrNoDataset + } + if ts.Before(time.Unix(d.runUTC, 0)) || ts.After(time.Unix(d.runUTC, 0).Add(48*time.Hour)) { + return zero, errcodes.ErrOutOfBounds + } + key := encodeKey(lat, lon, alt, ts) + if v, ok := s.cache.get(key); ok { + return [2]float64(v), nil + } + td := ts.Sub(time.Unix(d.runUTC, 0)).Hours() + u, v := d.uv(lat, lon, alt, td) + out := [2]float64{u, v} + s.cache.set(key, vec(out)) + return out, nil +} diff --git a/internal/pkg/grib/pressure.go b/internal/pkg/grib/pressure.go new file mode 100644 index 0000000..f0eff6d --- /dev/null +++ b/internal/pkg/grib/pressure.go @@ -0,0 +1,9 @@ +package grib + +import "math" + +var pressureLevels = []float64{1000, 975, 950, 925, 900, 875, 850, 825, 800, 775, 750, 725, 700, 650, 600, 550, 500, 450, 400, 350, 300, 250, 200, 150, 100, 70, 50, 30, 20, 10, 7, 5, 3, 2} + +func pressureFromAlt(alt float64) float64 { // ICAO ISA + return 1013.25 * math.Pow(1-alt/44307.69396, 5.255877) +} diff --git a/internal/pkg/grib/util.go b/internal/pkg/grib/util.go new file mode 100644 index 0000000..291c85e --- /dev/null +++ b/internal/pkg/grib/util.go @@ -0,0 +1,26 @@ +package grib + +import ( + "fmt" + "hash/fnv" + "time" +) + +var steps = []int{0, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48} + +func nearestRun(t time.Time) time.Time { + h := t.UTC().Hour() - t.UTC().Hour()%6 + return time.Date(t.Year(), t.Month(), t.Day(), h, 0, 0, 0, time.UTC) +} + +func fileName(run time.Time, step int) string { + return fmt.Sprintf("gfs.t%02dz.pgrb2.0p50.f%03d", run.Hour(), step) +} + +func encodeKey(a ...any) uint64 { + h := fnv.New64a() + for _, v := range a { + fmt.Fprint(h, v) + } + return h.Sum64() +}