From c4f355a32ec10444f56d8750164adcb02407ed64 Mon Sep 17 00:00:00 2001 From: "a.antonov" Date: Mon, 20 Oct 2025 19:10:07 +0900 Subject: [PATCH] feat: s3 download --- cmd/api/main.go | 46 +---- go.mod | 18 ++ go.sum | 36 ++++ internal/pkg/grib/config.go | 9 +- internal/pkg/grib/cube.go | 4 +- internal/pkg/grib/downloader.go | 27 ++- internal/pkg/grib/extractor.go | 21 +-- internal/pkg/grib/grib.go | 71 ++++---- internal/pkg/grib/pressure.go | 9 +- internal/pkg/grib/s3_downloader.go | 265 +++++++++++++++++++++++++++++ internal/pkg/grib/util.go | 10 +- internal/service/config.go | 22 --- internal/service/service.go | 4 +- scripts/test_s3_download.go | 89 ++++++++++ scripts/test_s3_simple.go | 68 ++++++++ 15 files changed, 590 insertions(+), 109 deletions(-) create mode 100644 internal/pkg/grib/s3_downloader.go delete mode 100644 internal/service/config.go create mode 100644 scripts/test_s3_download.go create mode 100644 scripts/test_s3_simple.go diff --git a/cmd/api/main.go b/cmd/api/main.go index 5315460..2e9bf6b 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -1,21 +1,18 @@ package main import ( + "context" "os" "os/signal" "syscall" - "context" - "git.intra.yksa.space/gsn/predictor/internal/jobs/grib/updater" - "git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes" "git.intra.yksa.space/gsn/predictor/internal/pkg/grib" "git.intra.yksa.space/gsn/predictor/internal/pkg/log" "git.intra.yksa.space/gsn/predictor/internal/service" "git.intra.yksa.space/gsn/predictor/internal/transport/rest" "git.intra.yksa.space/gsn/predictor/internal/transport/rest/handler" "git.intra.yksa.space/gsn/predictor/pkg/scheduler" - env "github.com/caarlos0/env/v11" "go.uber.org/zap" ) @@ -29,11 +26,6 @@ func main() { defer lg.Sync() ctx := log.ToCtx(context.Background(), lg) - cfg, err := loadConfig() - if err != nil { - log.Ctx(ctx).Fatal("failed to load configuration", zap.Error(err)) - } - schedulerConfig, err := scheduler.NewConfig() if err != nil { log.Ctx(ctx).Fatal("failed to 
load scheduler configuration", zap.Error(err)) @@ -44,14 +36,12 @@ func main() { log.Ctx(ctx).Fatal("failed to load GRIB updater configuration", zap.Error(err)) } - gribService, err := grib.New(grib.ServiceConfig{ - Dir: cfg.GribDir, - TTL: cfg.GribTTL, - CacheTTL: cfg.GribCacheTTL, - Parallel: cfg.GribParallel, - Client: cfg.CreateHTTPClient(), - DatasetURL: cfg.GribDatasetURL, - }) + gribCfg, err := grib.NewConfig() + if err != nil { + log.Ctx(ctx).Fatal("failed to load GRIB configuration", zap.Error(err)) + } + + gribService, err := grib.New(gribCfg) if err != nil { log.Ctx(ctx).Fatal("failed to initialize GRIB service", zap.Error(err)) } @@ -67,7 +57,7 @@ func main() { } }() - svc, err := service.New(cfg, gribService) + svc, err := service.New(gribService) if err != nil { log.Ctx(ctx).Fatal("failed to initialize service", zap.Error(err)) } @@ -103,13 +93,7 @@ func main() { lg.Info("scheduler started") } - lg.Info("service started successfully", - zap.String("grib_dir", cfg.GribDir), - zap.Duration("grib_ttl", cfg.GribTTL), - zap.Duration("grib_cache_ttl", cfg.GribCacheTTL), - zap.Int("grib_parallel", cfg.GribParallel), - zap.Bool("scheduler_enabled", schedulerConfig.Enabled), - zap.Duration("grib_update_interval", gribUpdaterConfig.Interval)) + lg.Info("service started successfully") sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) @@ -127,15 +111,3 @@ func main() { lg.Info("scheduler stopped") } } - -func loadConfig() (*service.Config, error) { - cfg := &service.Config{} - - if err := env.ParseWithOptions(cfg, env.Options{ - PrefixTagName: servicePrefix + "_", - }); err != nil { - return nil, errcodes.Wrap(err, "failed to parse configuration") - } - - return cfg, nil -} diff --git a/go.mod b/go.mod index 3755831..079e12c 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,24 @@ require ( ) require ( + github.com/aws/aws-sdk-go-v2 v1.39.3 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 // indirect + 
github.com/aws/aws-sdk-go-v2/config v1.31.13 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.18.17 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.10 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.10 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.10 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.10 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.10 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.10 // indirect + github.com/aws/aws-sdk-go-v2/service/s3 v1.88.5 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.29.7 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.2 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.38.7 // indirect + github.com/aws/smithy-go v1.23.1 // indirect github.com/dlclark/regexp2 v1.11.5 // indirect github.com/fatih/color v1.18.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect diff --git a/go.sum b/go.sum index 37bf591..eaa1790 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,39 @@ +github.com/aws/aws-sdk-go-v2 v1.39.3 h1:h7xSsanJ4EQJXG5iuW4UqgP7qBopLpj84mpkNx3wPjM= +github.com/aws/aws-sdk-go-v2 v1.39.3/go.mod h1:yWSxrnioGUZ4WVv9TgMrNUeLV3PFESn/v+6T/Su8gnM= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 h1:t9yYsydLYNBk9cJ73rgPhPWqOh/52fcWDQB5b1JsKSY= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2/go.mod h1:IusfVNTmiSN3t4rhxWFaBAqn+mcNdwKtPcV16eYdgko= +github.com/aws/aws-sdk-go-v2/config v1.31.13 h1:wcqQB3B0PgRPUF5ZE/QL1JVOyB0mbPevHFoAMpemR9k= +github.com/aws/aws-sdk-go-v2/config v1.31.13/go.mod h1:ySB5D5ybwqGbT6c3GszZ+u+3KvrlYCUQNo62+hkKOFk= +github.com/aws/aws-sdk-go-v2/credentials v1.18.17 
h1:skpEwzN/+H8cdrrtT8y+rvWJGiWWv0DeNAe+4VTf+Vs= +github.com/aws/aws-sdk-go-v2/credentials v1.18.17/go.mod h1:Ed+nXsaYa5uBINovJhcAWkALvXw2ZLk36opcuiSZfJM= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.10 h1:UuGVOX48oP4vgQ36oiKmW9RuSeT8jlgQgBFQD+HUiHY= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.10/go.mod h1:vM/Ini41PzvudT4YkQyE/+WiQJiQ6jzeDyU8pQKwCac= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.10 h1:mj/bdWleWEh81DtpdHKkw41IrS+r3uw1J/VQtbwYYp8= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.10/go.mod h1:7+oEMxAZWP8gZCyjcm9VicI0M61Sx4DJtcGfKYv2yKQ= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.10 h1:wh+/mn57yhUrFtLIxyFPh2RgxgQz/u+Yrf7hiHGHqKY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.10/go.mod h1:7zirD+ryp5gitJJ2m1BBux56ai8RIRDykXZrJSp540w= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.10 h1:FHw90xCTsofzk6vjU808TSuDtDfOOKPNdz5Weyc3tUI= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.10/go.mod h1:n8jdIE/8F3UYkg8O4IGkQpn2qUmapg/1K1yl29/uf/c= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2 h1:xtuxji5CS0JknaXoACOunXOYOQzgfTvGAc9s2QdCJA4= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2/go.mod h1:zxwi0DIR0rcRcgdbl7E2MSOvxDyyXGBlScvBkARFaLQ= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.1 h1:ne+eepnDB2Wh5lHKzELgEncIqeVlQ1rSF9fEa4r5I+A= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.1/go.mod h1:u0Jkg0L+dcG1ozUq21uFElmpbmjBnhHR5DELHIme4wg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.10 h1:DRND0dkCKtJzCj4Xl4OpVbXZgfttY5q712H9Zj7qc/0= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.10/go.mod h1:tGGNmJKOTernmR2+VJ0fCzQRurcPZj9ut60Zu5Fi6us= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared 
v1.19.10 h1:DA+Hl5adieRyFvE7pCvBWm3VOZTRexGVkXw33SUqNoY= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.10/go.mod h1:L+A89dH3/gr8L4ecrdzuXUYd1znoko6myzndVGZx/DA= +github.com/aws/aws-sdk-go-v2/service/s3 v1.88.5 h1:FlGScxzCGNzT+2AvHT1ZGMvxTwAMa6gsooFb1pO/AiM= +github.com/aws/aws-sdk-go-v2/service/s3 v1.88.5/go.mod h1:N/iojY+8bW3MYol9NUMuKimpSbPEur75cuI1SmtonFM= +github.com/aws/aws-sdk-go-v2/service/sso v1.29.7 h1:fspVFg6qMx0svs40YgRmE7LZXh9VRZvTT35PfdQR6FM= +github.com/aws/aws-sdk-go-v2/service/sso v1.29.7/go.mod h1:BQTKL3uMECaLaUV3Zc2L4Qybv8C6BIXjuu1dOPyxTQs= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.2 h1:scVnW+NLXasGOhy7HhkdT9AGb6kjgW7fJ5xYkUaqHs0= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.2/go.mod h1:FRNCY3zTEWZXBKm2h5UBUPvCVDOecTad9KhynDyGBc0= +github.com/aws/aws-sdk-go-v2/service/sts v1.38.7 h1:VEO5dqFkMsl8QZ2yHsFDJAIZLAkEbaYDB+xdKi0Feic= +github.com/aws/aws-sdk-go-v2/service/sts v1.38.7/go.mod h1:L1xxV3zAdB+qVrVW/pBIrIAnHFWHo6FBbFe4xOGsG/o= +github.com/aws/smithy-go v1.23.1 h1:sLvcH6dfAFwGkHLZ7dGiYF7aK6mg4CgKA/iDKjLDt9M= +github.com/aws/smithy-go v1.23.1/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/caarlos0/env/v11 v11.3.1 h1:cArPWC15hWmEt+gWk7YBi7lEXTXCvpaSdCiZE2X5mCA= github.com/caarlos0/env/v11 v11.3.1/go.mod h1:qupehSf/Y0TUTsxKywqRt/vJjN5nz6vauiYEUUr8P4U= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= diff --git a/internal/pkg/grib/config.go b/internal/pkg/grib/config.go index 35f1e46..009645d 100644 --- a/internal/pkg/grib/config.go +++ b/internal/pkg/grib/config.go @@ -11,9 +11,13 @@ type Config struct { Dir string `env:"DIR" envDefault:"/tmp/grib"` TTL time.Duration `env:"TTL" envDefault:"24h"` CacheTTL time.Duration `env:"CACHE_TTL" envDefault:"1h"` - Parallel int `env:"PARALLEL" envDefault:"4"` - Timeout time.Duration `env:"TIMEOUT" envDefault:"30s"` + Parallel int `env:"PARALLEL" envDefault:"8"` DatasetURL string `env:"DATASET_URL" 
envDefault:"https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod"` + // S3 configuration + UseS3 bool `env:"USE_S3" envDefault:"true"` + S3Bucket string `env:"S3_BUCKET" envDefault:"noaa-gfs-bdp-pds"` + S3Region string `env:"S3_REGION" envDefault:"us-east-1"` + S3Timeout time.Duration `env:"S3_TIMEOUT" envDefault:"300s"` } func NewConfig() (*Config, error) { @@ -23,5 +27,6 @@ func NewConfig() (*Config, error) { }); err != nil { return nil, errcodes.Wrap(err, "failed to parse GRIB config") } + return cfg, nil } diff --git a/internal/pkg/grib/cube.go b/internal/pkg/grib/cube.go index d8cc5c5..d2015ec 100644 --- a/internal/pkg/grib/cube.go +++ b/internal/pkg/grib/cube.go @@ -28,8 +28,8 @@ func openCube(path string) (*cube, error) { } const ( - nT = 17 - nP = 34 + nT = 97 // 0-96 hours with step 1 hour + nP = 47 // 47 pressure levels matching tawhiri nLat = 361 nLon = 720 ) diff --git a/internal/pkg/grib/downloader.go b/internal/pkg/grib/downloader.go index a181f43..a93a64c 100644 --- a/internal/pkg/grib/downloader.go +++ b/internal/pkg/grib/downloader.go @@ -24,28 +24,51 @@ func (d *Downloader) fileURL(run string, hour int, step int) string { return fmt.Sprintf("%s/gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2.0p50.f%03d", d.DatasetURL, run, hour, hour, step) } -func (d *Downloader) fetch(ctx context.Context, url, dst string) error { +func (d *Downloader) fetch(ctx context.Context, url, dst string) (err error) { + // Check if final file already exists if _, err := os.Stat(dst); err == nil { return nil } + tmp := dst + ".part" + + // Remove old .part file if it exists (fixes race condition) + os.Remove(tmp) + f, err := os.Create(tmp) if err != nil { return err } - defer f.Close() + + // Cleanup .part file on any error (using named return value) + defer func() { + f.Close() + if err != nil { + os.Remove(tmp) + } + }() + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) resp, err := d.Client.Do(req) if err != nil { return err } defer resp.Body.Close() + if 
resp.StatusCode != http.StatusOK { return errcodes.Wrap(errcodes.ErrDownload, "bad status: "+resp.Status) } + if _, err := io.Copy(f, resp.Body); err != nil { return err } + + // Close file before rename + if err := f.Close(); err != nil { + return err + } + + // If rename fails, err will be set and defer will cleanup .part file return os.Rename(tmp, dst) } diff --git a/internal/pkg/grib/extractor.go b/internal/pkg/grib/extractor.go index e56a52d..769b7cd 100644 --- a/internal/pkg/grib/extractor.go +++ b/internal/pkg/grib/extractor.go @@ -17,8 +17,9 @@ func (d *dataset) uv(lat, lon, alt float64, tHours float64) (float64, float64) { x0 := int(math.Floor(ix)) % d.cube.lon x1 := (x0 + 1) % d.cube.lon wx := ix - float64(x0) - it0 := int(math.Floor(tHours / 3.0)) - wt := (tHours - float64(it0*3)) / 3.0 + // For hourly data (step = 1 hour) + it0 := int(math.Floor(tHours)) + wt := tHours - float64(it0) p := pressureFromAlt(alt) ip0 := 0 for ip0+1 < len(pressureLevels) && pressureLevels[ip0+1] > p { @@ -27,14 +28,14 @@ func (d *dataset) uv(lat, lon, alt float64, tHours float64) (float64, float64) { ip1 := ip0 + 1 wp := (pressureLevels[ip0] - p) / (pressureLevels[ip0] - pressureLevels[ip1]) fetch := func(ti, pi int) (float64, float64) { - u00 := d.cube.val(0, ti, pi, y0, x0) - u10 := d.cube.val(0, ti, pi, y0, x1) - u01 := d.cube.val(0, ti, pi, y1, x0) - u11 := d.cube.val(0, ti, pi, y1, x1) - v00 := d.cube.val(1, ti, pi, y0, x0) - v10 := d.cube.val(1, ti, pi, y0, x1) - v01 := d.cube.val(1, ti, pi, y1, x0) - v11 := d.cube.val(1, ti, pi, y1, x1) + u00 := d.cube.val(1, ti, pi, y0, x0) + u10 := d.cube.val(1, ti, pi, y0, x1) + u01 := d.cube.val(1, ti, pi, y1, x0) + u11 := d.cube.val(1, ti, pi, y1, x1) + v00 := d.cube.val(2, ti, pi, y0, x0) + v10 := d.cube.val(2, ti, pi, y0, x1) + v01 := d.cube.val(2, ti, pi, y1, x0) + v11 := d.cube.val(2, ti, pi, y1, x1) uxy := (1-wy)*((1-wx)*float64(u00)+wx*float64(u10)) + wy*((1-wx)*float64(u01)+wx*float64(u11)) vxy := 
(1-wy)*((1-wx)*float64(v00)+wx*float64(v10)) + wy*((1-wx)*float64(v01)+wx*float64(v11)) return uxy, vxy diff --git a/internal/pkg/grib/grib.go b/internal/pkg/grib/grib.go index ebdc952..32c4466 100644 --- a/internal/pkg/grib/grib.go +++ b/internal/pkg/grib/grib.go @@ -23,22 +23,13 @@ type Service interface { GetStatus() (ready bool, lastUpdate time.Time, isFresh bool, errMsg string) } -type ServiceConfig struct { - Dir string - TTL time.Duration - CacheTTL time.Duration - Parallel int - Client *http.Client - DatasetURL string -} - type service struct { - cfg ServiceConfig + cfg *Config cache memCache data atomic.Pointer[dataset] } -func New(cfg ServiceConfig) (Service, error) { +func New(cfg *Config) (Service, error) { if cfg.TTL == 0 { cfg.TTL = 24 * time.Hour } @@ -135,8 +126,7 @@ func (s *service) Update(ctx context.Context) error { } } - dl := Downloader{Dir: s.cfg.Dir, Parallel: s.cfg.Parallel, Client: s.cfg.Client, DatasetURL: s.cfg.DatasetURL} - run := nearestRun(time.Now().UTC().Add(-4 * time.Hour)) + run := nearestRun(time.Now().UTC().Add(-24 * time.Hour)) // Check if we already have this run cubePath := filepath.Join(s.cfg.Dir, run.Format("20060102_15")) + ".cube" @@ -156,9 +146,26 @@ func (s *service) Update(ctx context.Context) error { } } - // Download new data - if err := dl.Run(ctx, run); err != nil { - return err + // Download new data using S3 or HTTP + var downloadErr error + if s.cfg.UseS3 { + s3dl, err := NewS3Downloader(s.cfg.Dir, s.cfg.Parallel, s.cfg.S3Bucket, s.cfg.S3Region) + if err != nil { + return errcodes.Wrap(err, "failed to create S3 downloader") + } + downloadErr = s3dl.Run(ctx, run) + } else { + dl := Downloader{ + Dir: s.cfg.Dir, + Parallel: s.cfg.Parallel, + Client: http.DefaultClient, + DatasetURL: s.cfg.DatasetURL, + } + downloadErr = dl.Run(ctx, run) + } + + if downloadErr != nil { + return downloadErr } // Assemble cube if it doesn't exist @@ -179,8 +186,8 @@ func (s *service) Update(ctx context.Context) error { } func 
assembleCube(dir string, run time.Time, cubePath string) error { - const sizePerVar = 17 * 34 * 361 * 720 * 4 - total := int64(sizePerVar * 2) + const sizePerVar = 97 * 47 * 361 * 720 * 4 // 97 time steps (0-96 hours), 47 pressure levels + total := int64(sizePerVar * 3) // 3 variables: gh, u, v f, err := os.Create(cubePath) if err != nil { return err @@ -214,24 +221,30 @@ func assembleCube(dir string, run time.Time, cubePath string) error { } for _, m := range messages { - // Check if this is a wind component (u or v) + // Check if this is a wind component (u or v) or geopotential height // ParameterCategory 2 = momentum, ParameterNumber 2 = u-wind, 3 = v-wind + // ParameterCategory 3 = mass, ParameterNumber 5 = geopotential height if m.Section4.ProductDefinitionTemplateNumber != 0 { continue } product := m.Section4.ProductDefinitionTemplate - if product.ParameterCategory != 2 { - continue - } var varIdx int - switch product.ParameterNumber { - case 2: // u-wind + // Match tawhiri variable order: ['gh', 'u', 'v'] (indices 0, 1, 2) + if product.ParameterCategory == 2 { + switch product.ParameterNumber { + case 2: // u-wind + varIdx = 1 + case 3: // v-wind + varIdx = 2 + default: + continue + } + } else if product.ParameterCategory == 3 && product.ParameterNumber == 5 { + // geopotential height varIdx = 0 - case 3: // v-wind - varIdx = 1 - default: + } else { continue } @@ -253,7 +266,7 @@ func assembleCube(dir string, run time.Time, cubePath string) error { for i, v := range vals { binary.LittleEndian.PutUint32(raw[i*4:], math.Float32bits(float32(v))) } - base := int64(varIdx*sizePerVar + (ti*34+pIdx)*361*720*4) + base := int64(varIdx*sizePerVar + (ti*47+pIdx)*361*720*4) copy(mm[base:base+int64(len(raw))], raw) } } @@ -266,7 +279,7 @@ func (s *service) Extract(ctx context.Context, lat, lon, alt float64, ts time.Ti if d == nil { return zero, errcodes.ErrNoDataset } - if ts.Before(time.Unix(d.runUTC, 0)) || ts.After(time.Unix(d.runUTC, 0).Add(48*time.Hour)) { + if 
ts.Before(time.Unix(d.runUTC, 0)) || ts.After(time.Unix(d.runUTC, 0).Add(96*time.Hour)) { return zero, errcodes.ErrOutOfBounds } diff --git a/internal/pkg/grib/pressure.go b/internal/pkg/grib/pressure.go index f0eff6d..add6ff0 100644 --- a/internal/pkg/grib/pressure.go +++ b/internal/pkg/grib/pressure.go @@ -2,7 +2,14 @@ package grib import "math" -var pressureLevels = []float64{1000, 975, 950, 925, 900, 875, 850, 825, 800, 775, 750, 725, 700, 650, 600, 550, 500, 450, 400, 350, 300, 250, 200, 150, 100, 70, 50, 30, 20, 10, 7, 5, 3, 2} +// 47 pressure levels matching tawhiri configuration +var pressureLevels = []float64{ + 1000, 975, 950, 925, 900, 875, 850, 825, 800, 775, + 750, 725, 700, 675, 650, 625, 600, 575, 550, 525, + 500, 475, 450, 425, 400, 375, 350, 325, 300, 275, + 250, 225, 200, 175, 150, 125, 100, 70, 50, 30, + 20, 10, 7, 5, 3, 2, 1, +} func pressureFromAlt(alt float64) float64 { // ICAO ISA return 1013.25 * math.Pow(1-alt/44307.69396, 5.255877) diff --git a/internal/pkg/grib/s3_downloader.go b/internal/pkg/grib/s3_downloader.go new file mode 100644 index 0000000..0fa4c70 --- /dev/null +++ b/internal/pkg/grib/s3_downloader.go @@ -0,0 +1,265 @@ +package grib + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + "time" + + "git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "golang.org/x/sync/errgroup" +) + +// S3Downloader downloads GRIB files from AWS S3 +type S3Downloader struct { + Dir string + Parallel int + Bucket string + Region string + Client *s3.Client +} + +// NewS3Downloader creates a new S3 downloader with anonymous access +func NewS3Downloader(dir string, parallel int, bucket, region string) (*S3Downloader, error) { + // Create AWS config with anonymous credentials for public bucket + cfg, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion(region), + 
config.WithCredentialsProvider(aws.AnonymousCredentials{}), + ) + if err != nil { + return nil, errcodes.Wrap(err, "failed to load AWS config") + } + + client := s3.NewFromConfig(cfg) + + return &S3Downloader{ + Dir: dir, + Parallel: parallel, + Bucket: bucket, + Region: region, + Client: client, + }, nil +} + +// s3Key generates the S3 key for a GRIB file +// Path format: gfs.YYYYMMDD/HH/atmos/gfs.tHHz.pgrb2.0p50.fFFF +func (d *S3Downloader) s3Key(run string, hour int, step int) string { + return fmt.Sprintf("gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2.0p50.f%03d", run, hour, hour, step) +} + +// CheckFileExists checks if a file exists in S3 using HeadObject +func (d *S3Downloader) CheckFileExists(ctx context.Context, key string) (bool, int64, error) { + input := &s3.HeadObjectInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(key), + } + + result, err := d.Client.HeadObject(ctx, input) + if err != nil { + // Check if error is NotFound + // NOTE(review): matched by error string here; the SDK's s3/types.NotFound and smithy.APIError would allow errors.As instead + if isNotFoundError(err) { + return false, 0, nil + } + return false, 0, errcodes.Wrap(err, "failed to check file existence") + } + + size := int64(0) + if result.ContentLength != nil { + size = *result.ContentLength + } + + return true, size, nil +} + +// isNotFoundError checks if error is a NotFound error +func isNotFoundError(err error) bool { + if err == nil { + return false + } + // AWS SDK v2 error handling + errStr := err.Error() + return contains(errStr, "NotFound") || contains(errStr, "404") || contains(errStr, "NoSuchKey") +} + +func contains(s, substr string) bool { + return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && findSubstring(s, substr)) +} + +func findSubstring(s, substr string) bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false +} + +// ListAvailableFiles lists all available files for a given run +func (d *S3Downloader) ListAvailableFiles(ctx 
context.Context, run string, hour int) ([]string, error) { + prefix := fmt.Sprintf("gfs.%s/%02d/atmos/", run, hour) + + input := &s3.ListObjectsV2Input{ + Bucket: aws.String(d.Bucket), + Prefix: aws.String(prefix), + } + + var files []string + paginator := s3.NewListObjectsV2Paginator(d.Client, input) + + for paginator.HasMorePages() { + page, err := paginator.NextPage(ctx) + if err != nil { + return nil, errcodes.Wrap(err, "failed to list S3 objects") + } + + for _, obj := range page.Contents { + if obj.Key != nil { + files = append(files, *obj.Key) + } + } + } + + return files, nil +} + +// fetchFromS3 downloads a file from S3 to local disk with retry logic +func (d *S3Downloader) fetchFromS3(ctx context.Context, key, dst string) (err error) { + // Check if final file already exists + if _, err := os.Stat(dst); err == nil { + return nil + } + + const maxRetries = 3 + var lastErr error + + for attempt := 0; attempt < maxRetries; attempt++ { + if attempt > 0 { + // Exponential backoff: 2s, 4s, 8s + waitTime := time.Duration(1< 0 && written != size { + return errcodes.Wrap(errcodes.ErrDownload, fmt.Sprintf("size mismatch: got %d bytes, expected %d", written, size)) + } + + // Close file before rename + if err := f.Close(); err != nil { + return err + } + fileClosed = true + + // If rename fails, err will be set and defer will cleanup .part file + return os.Rename(tmp, dst) +} + +// Run downloads all required GRIB files for a forecast run +func (d *S3Downloader) Run(ctx context.Context, run time.Time) error { + runStr := run.Format("20060102") + hour := run.Hour() + + // First, list available files to verify they exist + availableFiles, err := d.ListAvailableFiles(ctx, runStr, hour) + if err != nil { + return errcodes.Wrap(err, "failed to list available files") + } + + if len(availableFiles) == 0 { + return errcodes.Wrap(errcodes.ErrDownload, fmt.Sprintf("no files found for run %s/%02d", runStr, hour)) + } + + // Build a map of available files for quick lookup + 
availableMap := make(map[string]bool) + for _, file := range availableFiles { + availableMap[file] = true + } + + g, ctx := errgroup.WithContext(ctx) + sem := make(chan struct{}, d.Parallel) + + for _, step := range steps { + step := step + key := d.s3Key(runStr, hour, step) + + // Check if file is available in S3 + if !availableMap[key] { + // Skip without failing (nothing is logged here) - some forecast hours might not be available yet + continue + } + + sem <- struct{}{} + g.Go(func() error { + defer func() { <-sem }() + dst := filepath.Join(d.Dir, fileName(run, step)) + return d.fetchFromS3(ctx, key, dst) + }) + } + + return g.Wait() +} diff --git a/internal/pkg/grib/util.go b/internal/pkg/grib/util.go index 291c85e..8de4af7 100644 --- a/internal/pkg/grib/util.go +++ b/internal/pkg/grib/util.go @@ -6,7 +6,15 @@ import ( "time" ) -var steps = []int{0, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48} +// Generate steps from 0 to 96 with step 1 hour (97 steps total) +// GFS provides hourly data for 0-120 hours, we use first 96 hours +var steps = func() []int { + result := make([]int, 0, 97) + for i := 0; i <= 96; i++ { + result = append(result, i) + } + return result +}() func nearestRun(t time.Time) time.Time { h := t.UTC().Hour() - t.UTC().Hour()%6 diff --git a/internal/service/config.go b/internal/service/config.go deleted file mode 100644 index ec04fc0..0000000 --- a/internal/service/config.go +++ /dev/null @@ -1,22 +0,0 @@ -package service - -import ( - "net/http" - "time" -) - -type Config struct { - // --- GRIB Configuration --- - GribDir string `env:"GSN_PREDICTOR_GRIB_DIR" envDefault:"/tmp/grib"` - GribTTL time.Duration `env:"GSN_PREDICTOR_GRIB_TTL" envDefault:"24h"` - GribCacheTTL time.Duration `env:"GSN_PREDICTOR_GRIB_CACHE_TTL" envDefault:"1h"` - GribParallel int `env:"GSN_PREDICTOR_GRIB_PARALLEL" envDefault:"4"` - GribTimeout time.Duration `env:"GSN_PREDICTOR_GRIB_TIMEOUT" envDefault:"30s"` - GribDatasetURL string `env:"GSN_PREDICTOR_GRIB_DATASET_URL" 
envDefault:"https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod"` -} - -func (c *Config) CreateHTTPClient() *http.Client { - return &http.Client{ - Timeout: c.GribTimeout, - } -} diff --git a/internal/service/service.go b/internal/service/service.go index 5e8ce6d..3496f96 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -8,13 +8,11 @@ import ( ) type Service struct { - cfg *Config grib Grib } -func New(cfg *Config, gribService Grib) (*Service, error) { +func New(gribService Grib) (*Service, error) { svc := &Service{ - cfg: cfg, grib: gribService, } diff --git a/scripts/test_s3_download.go b/scripts/test_s3_download.go new file mode 100644 index 0000000..0391cd6 --- /dev/null +++ b/scripts/test_s3_download.go @@ -0,0 +1,89 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "time" + + "git.intra.yksa.space/gsn/predictor/internal/pkg/grib" +) + +func main() { + ctx := context.Background() + + // Create S3 downloader + downloader, err := grib.NewS3Downloader( + "/tmp/grib_test", + 4, // parallel downloads + "noaa-gfs-bdp-pds", + "us-east-1", + ) + if err != nil { + log.Fatalf("Failed to create S3 downloader: %v", err) + } + + // Ensure directory exists + if err := os.MkdirAll("/tmp/grib_test", 0o755); err != nil { + log.Fatalf("Failed to create directory: %v", err) + } + + // Find nearest run (6-hour intervals: 00, 06, 12, 18 UTC) + now := time.Now().UTC() + hour := now.Hour() - (now.Hour() % 6) + // Use data from 6 hours ago to ensure it's available + run := time.Date(now.Year(), now.Month(), now.Day(), hour, 0, 0, 0, time.UTC).Add(-6 * time.Hour) + + fmt.Printf("Testing S3 download for run: %s\n", run.Format("2006-01-02 15:04 MST")) + + // List available files first + runStr := run.Format("20060102") + fmt.Printf("Listing available files for %s/%02d...\n", runStr, run.Hour()) + files, err := downloader.ListAvailableFiles(ctx, runStr, run.Hour()) + if err != nil { + log.Fatalf("Failed to list files: %v", err) + } + + 
fmt.Printf("Found %d files in S3:\n", len(files)) + if len(files) > 0 { + // Show first 5 files + for i, file := range files { + if i >= 5 { + fmt.Printf("... and %d more files\n", len(files)-5) + break + } + fmt.Printf(" - %s\n", file) + } + } + + // Download the run: Run fetches every forecast step listed in S3 for it, not just f000-f002 + fmt.Println("\nTesting download of first 3 forecast hours...") + testRun := run + + // Create a timeout context for the download + downloadCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + + if err := downloader.Run(downloadCtx, testRun); err != nil { + log.Fatalf("Failed to download: %v", err) + } + + fmt.Println("\nDownload completed successfully!") + + // Check downloaded files + entries, err := os.ReadDir("/tmp/grib_test") + if err != nil { + log.Fatalf("Failed to read directory: %v", err) + } + + fmt.Printf("\nDownloaded %d files:\n", len(entries)) + for i, entry := range entries { + if i >= 10 { + fmt.Printf("... and %d more files\n", len(entries)-10) + break + } + info, _ := entry.Info() + fmt.Printf(" - %s (%.2f MB)\n", entry.Name(), float64(info.Size())/1024/1024) + } +} diff --git a/scripts/test_s3_simple.go b/scripts/test_s3_simple.go new file mode 100644 index 0000000..2ae8414 --- /dev/null +++ b/scripts/test_s3_simple.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "fmt" + "io" + "log" + "os" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +func main() { + ctx := context.Background() + + // Create AWS config with anonymous credentials + cfg, err := config.LoadDefaultConfig(ctx, + config.WithRegion("us-east-1"), + config.WithCredentialsProvider(aws.AnonymousCredentials{}), + ) + if err != nil { + log.Fatalf("Failed to load config: %v", err) + } + + client := s3.NewFromConfig(cfg) + + // Try to download a single file + bucket := "noaa-gfs-bdp-pds" + key := "gfs.20251020/00/atmos/gfs.t00z.pgrb2.0p50.f000" + 
fmt.Printf("Downloading: s3://%s/%s\n", bucket, key) + + input := &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + } + + result, err := client.GetObject(ctx, input) + if err != nil { + log.Fatalf("Failed to get object: %v", err) + } + defer result.Body.Close() + + // Create output file + outFile := "/tmp/test_grib.part" + f, err := os.Create(outFile) + if err != nil { + log.Fatalf("Failed to create file: %v", err) + } + defer f.Close() + + // Copy data + written, err := io.Copy(f, result.Body) + if err != nil { + log.Fatalf("Failed to copy data: %v (wrote %d bytes)", err, written) + } + + fmt.Printf("Successfully downloaded %d bytes\n", written) + + // Rename + if err := os.Rename(outFile, "/tmp/test_grib"); err != nil { + log.Fatalf("Failed to rename: %v", err) + } + + fmt.Println("Download complete!") +}