From 8e9f117799a5fd2ae5028950ab3b66a5a1b3148b Mon Sep 17 00:00:00 2001
From: straitz
Date: Sun, 22 Mar 2026 16:29:53 +0900
Subject: [PATCH] Switch GRIB fetching to an idx-based partial downloader

Replace the AWS S3 and full-file HTTP downloaders with a partial
downloader that reads the .idx index files and fetches only the HGT,
UGRD and VGRD fields on pressure levels via HTTP Range requests.
Switch the dataset from 0.5-degree/3-hourly to 0.25-degree/hourly,
move the grid, level and URL parameters into DatasetConfig, and scale
the descent rate by ISA air density.

---
 .claude/settings.local.json             |   9 +-
 .dockerignore                           |   2 +-
 .gitignore                              |   8 +-
 assemble_cube.go                        |  26 ++
 cmd/api/main.go                         |   1 +
 go.mod                                  |  18 --
 go.sum                                  |  36 ---
 internal/pkg/grib/assemble_test.go      |  25 ++
 internal/pkg/grib/config.go             | 129 ++++++++-
 internal/pkg/grib/create_dataset.go     |   0
 internal/pkg/grib/cube.go               |  19 +-
 internal/pkg/grib/dataset.go            |   1 +
 internal/pkg/grib/downloader.go         |  91 ------
 internal/pkg/grib/extractor.go          |  89 +++++-
 internal/pkg/grib/grib.go               | 114 +++-----
 internal/pkg/grib/partial_downloader.go | 350 ++++++++++++++++++++++++
 internal/pkg/grib/pressure.go           |   9 -
 internal/pkg/grib/s3_downloader.go      | 265 ------------------
 internal/pkg/grib/util.go               |  14 -
 internal/service/predictor.go           |  99 ++++++-
 scripts/compare.go                      | 114 ++++++++
 scripts/rebuild_cube.go                 |  44 +++
 scripts/test_download.go                |  36 +++
 scripts/test_gh.go                      |  60 ++++
 scripts/test_grib_read.go               |  38 +++
 scripts/test_prediction.go              |  87 ++++++
 scripts/test_s3_download.go             |  89 ------
 scripts/test_s3_simple.go               |  68 -----
 scripts/test_wind.go                    |  55 ++++
 start_with_http.sh                      |  11 +
 30 files changed, 1209 insertions(+), 698 deletions(-)
 create mode 100644 assemble_cube.go
 create mode 100644 internal/pkg/grib/assemble_test.go
 create mode 100644 internal/pkg/grib/create_dataset.go
 delete mode 100644 internal/pkg/grib/downloader.go
 create mode 100644 internal/pkg/grib/partial_downloader.go
 delete mode 100644 internal/pkg/grib/s3_downloader.go
 create mode 100644 scripts/compare.go
 create mode 100644 scripts/rebuild_cube.go
 create mode 100644 scripts/test_download.go
 create mode 100644 scripts/test_gh.go
 create mode 100644 scripts/test_grib_read.go
 create mode 100644 scripts/test_prediction.go
 delete mode 100644 scripts/test_s3_download.go
 delete mode 100644 scripts/test_s3_simple.go
 create mode 100644 scripts/test_wind.go
 create mode 100644 start_with_http.sh

diff --git a/.claude/settings.local.json b/.claude/settings.local.json
index c58df5e..af05740 100644
--- a/.claude/settings.local.json
+++ b/.claude/settings.local.json
@@ -2,7 +2,14 @@
   "permissions": {
     "allow": [
       "Bash(cat:*)",
-      "Bash(xargs:*)"
+      "Bash(xargs:*)",
+      "Bash(ls:*)",
+      "Bash(done)",
+      "Bash(curl:*)",
+      "WebFetch(domain:raw.githubusercontent.com)",
+      "WebFetch(domain:github.com)",
+      "Bash(go run:*)",
+      "Bash(pkill:*)"
     ],
     "deny": [],
     "ask": []
diff --git a/.dockerignore b/.dockerignore
index 287fc68..22e12b6 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -47,7 +47,7 @@ predictor
 # Temporary files
 /tmp/
 /temp/
-
+*.py
 # Test coverage
 *.out
diff --git a/.gitignore b/.gitignore
index 8519b69..9ffe4dd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,7 @@
 *.dll
 *.so
 *.dylib
+*.ps1
 
 # Test binary, built with `go test -c`
 *.test
@@ -28,6 +29,9 @@ go.work
 *.swp
 *.swo
 *~
+*.bak
+*.py
+*.json
 
 # OS generated files
 .DS_Store
@@ -59,4 +63,6 @@ Thumbs.db
 
 # Tawhiri
 /tawhiri
-/tawhiri/*
\ No newline at end of file
+/tawhiri/*
+
+*.md
\ No newline at end of file
diff --git a/assemble_cube.go b/assemble_cube.go
new file mode 100644
index 0000000..c624e92
--- /dev/null
+++ b/assemble_cube.go
@@ -0,0 +1,26 @@
+package main
+
+import (
+	"fmt"
+	"time"
+)
+
+// Thin helper around the internal assembleCube function in the grib package:
+// it only prints the command that assembles a cube from GRIB files already on
+// disk (see internal/pkg/grib/assemble_test.go).
+
+func main() {
+	dir := "C:/tmp/grib"
+	run := time.Date(2025, 12, 6, 0, 0, 0, 0, time.UTC)
+	cubePath := fmt.Sprintf("%s/%s.cube", 
dir, run.Format("20060102_15")) + + fmt.Printf("Assembling cube from existing GRIB files...\n") + fmt.Printf("Directory: %s\n", dir) + fmt.Printf("Run: %s\n", run.Format("2006-01-02 15:04 MST")) + fmt.Printf("Output: %s\n", cubePath) + fmt.Println() + + // Just print instructions - we'll do it directly + fmt.Println("Run this Go code to assemble:") + fmt.Printf("cd internal/pkg/grib && go test -run TestAssemble\n") +} diff --git a/cmd/api/main.go b/cmd/api/main.go index 2e9bf6b..f250199 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -24,6 +24,7 @@ func main() { panic(err) } defer lg.Sync() + zap.ReplaceGlobals(lg) ctx := log.ToCtx(context.Background(), lg) schedulerConfig, err := scheduler.NewConfig() diff --git a/go.mod b/go.mod index 54fb283..b186e37 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,6 @@ module git.intra.yksa.space/gsn/predictor go 1.24.4 require ( - github.com/aws/aws-sdk-go-v2 v1.39.3 - github.com/aws/aws-sdk-go-v2/config v1.31.13 - github.com/aws/aws-sdk-go-v2/service/s3 v1.88.5 github.com/caarlos0/env/v11 v11.3.1 github.com/edsrzf/mmap-go v1.2.0 github.com/go-co-op/gocron v1.37.0 @@ -23,21 +20,6 @@ require ( ) require ( - github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 // indirect - github.com/aws/aws-sdk-go-v2/credentials v1.18.17 // indirect - github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.10 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.10 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.10 // indirect - github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect - github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.10 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.1 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.10 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.10 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.29.7 // indirect - github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.2 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.38.7 // indirect - github.com/aws/smithy-go v1.23.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dlclark/regexp2 v1.11.5 // indirect github.com/fatih/color v1.18.0 // indirect diff --git a/go.sum b/go.sum index 6957306..7c95f5c 100644 --- a/go.sum +++ b/go.sum @@ -1,39 +1,3 @@ -github.com/aws/aws-sdk-go-v2 v1.39.3 h1:h7xSsanJ4EQJXG5iuW4UqgP7qBopLpj84mpkNx3wPjM= -github.com/aws/aws-sdk-go-v2 v1.39.3/go.mod h1:yWSxrnioGUZ4WVv9TgMrNUeLV3PFESn/v+6T/Su8gnM= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 h1:t9yYsydLYNBk9cJ73rgPhPWqOh/52fcWDQB5b1JsKSY= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2/go.mod h1:IusfVNTmiSN3t4rhxWFaBAqn+mcNdwKtPcV16eYdgko= -github.com/aws/aws-sdk-go-v2/config v1.31.13 h1:wcqQB3B0PgRPUF5ZE/QL1JVOyB0mbPevHFoAMpemR9k= -github.com/aws/aws-sdk-go-v2/config v1.31.13/go.mod h1:ySB5D5ybwqGbT6c3GszZ+u+3KvrlYCUQNo62+hkKOFk= -github.com/aws/aws-sdk-go-v2/credentials v1.18.17 h1:skpEwzN/+H8cdrrtT8y+rvWJGiWWv0DeNAe+4VTf+Vs= -github.com/aws/aws-sdk-go-v2/credentials v1.18.17/go.mod h1:Ed+nXsaYa5uBINovJhcAWkALvXw2ZLk36opcuiSZfJM= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.10 h1:UuGVOX48oP4vgQ36oiKmW9RuSeT8jlgQgBFQD+HUiHY= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.10/go.mod h1:vM/Ini41PzvudT4YkQyE/+WiQJiQ6jzeDyU8pQKwCac= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.10 
h1:mj/bdWleWEh81DtpdHKkw41IrS+r3uw1J/VQtbwYYp8= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.10/go.mod h1:7+oEMxAZWP8gZCyjcm9VicI0M61Sx4DJtcGfKYv2yKQ= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.10 h1:wh+/mn57yhUrFtLIxyFPh2RgxgQz/u+Yrf7hiHGHqKY= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.10/go.mod h1:7zirD+ryp5gitJJ2m1BBux56ai8RIRDykXZrJSp540w= -github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk= -github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.10 h1:FHw90xCTsofzk6vjU808TSuDtDfOOKPNdz5Weyc3tUI= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.10/go.mod h1:n8jdIE/8F3UYkg8O4IGkQpn2qUmapg/1K1yl29/uf/c= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2 h1:xtuxji5CS0JknaXoACOunXOYOQzgfTvGAc9s2QdCJA4= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2/go.mod h1:zxwi0DIR0rcRcgdbl7E2MSOvxDyyXGBlScvBkARFaLQ= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.1 h1:ne+eepnDB2Wh5lHKzELgEncIqeVlQ1rSF9fEa4r5I+A= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.1/go.mod h1:u0Jkg0L+dcG1ozUq21uFElmpbmjBnhHR5DELHIme4wg= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.10 h1:DRND0dkCKtJzCj4Xl4OpVbXZgfttY5q712H9Zj7qc/0= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.10/go.mod h1:tGGNmJKOTernmR2+VJ0fCzQRurcPZj9ut60Zu5Fi6us= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.10 h1:DA+Hl5adieRyFvE7pCvBWm3VOZTRexGVkXw33SUqNoY= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.10/go.mod h1:L+A89dH3/gr8L4ecrdzuXUYd1znoko6myzndVGZx/DA= -github.com/aws/aws-sdk-go-v2/service/s3 v1.88.5 h1:FlGScxzCGNzT+2AvHT1ZGMvxTwAMa6gsooFb1pO/AiM= -github.com/aws/aws-sdk-go-v2/service/s3 v1.88.5/go.mod h1:N/iojY+8bW3MYol9NUMuKimpSbPEur75cuI1SmtonFM= -github.com/aws/aws-sdk-go-v2/service/sso v1.29.7 h1:fspVFg6qMx0svs40YgRmE7LZXh9VRZvTT35PfdQR6FM= -github.com/aws/aws-sdk-go-v2/service/sso v1.29.7/go.mod h1:BQTKL3uMECaLaUV3Zc2L4Qybv8C6BIXjuu1dOPyxTQs= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.2 h1:scVnW+NLXasGOhy7HhkdT9AGb6kjgW7fJ5xYkUaqHs0= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.2/go.mod h1:FRNCY3zTEWZXBKm2h5UBUPvCVDOecTad9KhynDyGBc0= -github.com/aws/aws-sdk-go-v2/service/sts v1.38.7 h1:VEO5dqFkMsl8QZ2yHsFDJAIZLAkEbaYDB+xdKi0Feic= -github.com/aws/aws-sdk-go-v2/service/sts v1.38.7/go.mod h1:L1xxV3zAdB+qVrVW/pBIrIAnHFWHo6FBbFe4xOGsG/o= -github.com/aws/smithy-go v1.23.1 h1:sLvcH6dfAFwGkHLZ7dGiYF7aK6mg4CgKA/iDKjLDt9M= -github.com/aws/smithy-go v1.23.1/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/caarlos0/env/v11 v11.3.1 h1:cArPWC15hWmEt+gWk7YBi7lEXTXCvpaSdCiZE2X5mCA= github.com/caarlos0/env/v11 v11.3.1/go.mod h1:qupehSf/Y0TUTsxKywqRt/vJjN5nz6vauiYEUUr8P4U= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= diff --git a/internal/pkg/grib/assemble_test.go b/internal/pkg/grib/assemble_test.go new file mode 100644 index 0000000..8d9d7ce --- /dev/null +++ b/internal/pkg/grib/assemble_test.go @@ -0,0 +1,25 @@ +package grib + +import ( + "testing" + "time" +) + +func TestAssembleCubeFromExisting(t *testing.T) { + dir := "C:/tmp/grib" + run := time.Date(2026, 1, 16, 6, 0, 0, 0, time.UTC) + cubePath := dir + "/" + run.Format("20060102_15") + ".cube" + + t.Logf("Assembling cube from existing GRIB files...") + t.Logf("Directory: %s", dir) + t.Logf("Run: %s", 
run.Format("2006-01-02 15:04 MST"))
+	t.Logf("Output: %s", cubePath)
+	dc := DefaultDatasetConfig()
+	err := assembleCube(dir, run, cubePath, &dc)
+	if err != nil {
+		t.Fatalf("Failed to assemble cube: %v", err)
+	}
+
+	t.Logf("✓ Cube assembled successfully!")
+	t.Logf("Cube file: %s", cubePath)
+}
diff --git a/internal/pkg/grib/config.go b/internal/pkg/grib/config.go
index 0132a92..c80462e 100644
--- a/internal/pkg/grib/config.go
+++ b/internal/pkg/grib/config.go
@@ -1,23 +1,130 @@
 package grib
 
 import (
+	"fmt"
 	"time"
 
 	"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes"
 	env "github.com/caarlos0/env/v11"
 )
 
+// DatasetConfig describes the GFS dataset parameters: grid, time steps,
+// pressure levels, and download URLs.
+type DatasetConfig struct {
+	// Grid
+	Resolution float64 // grid step in degrees (0.25 or 0.5)
+	NLat       int     // latitude points (721 for 0.25°, 361 for 0.5°)
+	NLon       int     // longitude points (1440 for 0.25°, 720 for 0.5°)
+
+	// Time
+	NT       int // number of time steps (97 for 0–96 h at a 1-hour step)
+	MaxHour  int // last forecast hour (96)
+	TimeStep int // interval between steps, in hours (1 or 3)
+
+	// Vertical
+	NP     int       // number of pressure levels
+	Levels []float64 // pressure levels in hPa, descending (1000 … 1)
+
+	// Variables in the cube (order matters: indices 0, 1, 2, …)
+	NVar      int      // number of variables
+	Variables []string // GRIB names used to filter .idx entries (HGT, UGRD, VGRD)
+
+	// Download URLs (fmt templates: date, hour, hour, step)
+	URLMask  string // primary pgrb2
+	URLMaskB string // secondary pgrb2b
+
+	// File names
+	FileSuffix string // resolution token in file names ("0p25", "0p50")
+}
+
+// SizePerVar returns the size of one variable in the cube, in bytes.
+func (dc *DatasetConfig) SizePerVar() int64 {
+	return int64(dc.NT) * int64(dc.NP) * int64(dc.NLat) * int64(dc.NLon) * 4
+}
+
+// CubeSize returns the full cube size, in bytes.
+func (dc *DatasetConfig) CubeSize() int64 {
+	return dc.SizePerVar() * int64(dc.NVar)
+}
+
+// GridSize returns NLat * NLon.
+func (dc *DatasetConfig) GridSize() int {
+	return dc.NLat * dc.NLon
+}
+
+// InvResolution returns 1/Resolution, the multiplier for converting
+// coordinates into grid indices.
+func (dc *DatasetConfig) InvResolution() float64 {
+	return 1.0 / dc.Resolution
+}
+
+// Steps returns the list of forecast hours [0, TimeStep, 2*TimeStep, …, MaxHour].
+func (dc *DatasetConfig) Steps() []int {
+	out := make([]int, 0, dc.NT)
+	for h := 0; h <= dc.MaxHour; h += dc.TimeStep {
+		out = append(out, h)
+	}
+	return out
+}
+
+// FileName returns the name of the primary GRIB file (pgrb2).
+func (dc *DatasetConfig) FileName(run time.Time, step int) string {
+	return fmt.Sprintf("gfs.t%02dz.pgrb2.%s.f%03d", run.Hour(), dc.FileSuffix, step)
+}
+
+// FileNameB returns the name of the secondary GRIB file (pgrb2b).
+func (dc *DatasetConfig) FileNameB(run time.Time, step int) string {
+	return fmt.Sprintf("gfs.t%02dz.pgrb2b.%s.f%03d", run.Hour(), dc.FileSuffix, step)
+}
+
+// GribURL returns the URL of the primary GRIB file.
+func (dc *DatasetConfig) GribURL(run time.Time, step int) string {
+	return fmt.Sprintf(dc.URLMask, run.Format("20060102"), run.Hour(), run.Hour(), step)
+}
+
+// GribURLB returns the URL of the secondary GRIB file.
+func (dc *DatasetConfig) GribURLB(run time.Time, step int) string {
+	return fmt.Sprintf(dc.URLMaskB, run.Format("20060102"), run.Hour(), run.Hour(), step)
+}
+
+// DefaultDatasetConfig returns the GFS 0.25° / 1-hour / 47-level config.
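+//
+// For scale (illustrative arithmetic derived from GridSize/SizePerVar/CubeSize
+// above): one level holds 721*1440 = 1,038,240 grid points, one variable spans
+// 97 steps * 47 levels * 1,038,240 points * 4 bytes ≈ 18.9 GB, and the full
+// three-variable cube is therefore ≈ 56.8 GB on disk.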
+func DefaultDatasetConfig() DatasetConfig { + return DatasetConfig{ + Resolution: 0.25, + NLat: 721, + NLon: 1440, + + NT: 97, + MaxHour: 96, + TimeStep: 1, + + NP: 47, + Levels: []float64{ + 1000, 975, 950, 925, 900, 875, 850, 825, 800, 775, + 750, 725, 700, 675, 650, 625, 600, 575, 550, 525, + 500, 475, 450, 425, 400, 375, 350, 325, 300, 275, + 250, 225, 200, 175, 150, 125, 100, 70, 50, 30, + 20, 10, 7, 5, 3, 2, 1, + }, + + NVar: 3, + Variables: []string{"HGT", "UGRD", "VGRD"}, + + URLMask: "https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2.0p25.f%03d", + URLMaskB: "https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2b.0p25.f%03d", + + FileSuffix: "0p25", + } +} + +// --------------------------------------------------------------------------- + type Config struct { - Dir string `env:"DIR" envDefault:"C:/tmp/grib"` - TTL time.Duration `env:"TTL" envDefault:"48h"` - CacheTTL time.Duration `env:"CACHE_TTL" envDefault:"1h"` - Parallel int `env:"PARALLEL" envDefault:"8"` - DatasetURL string `env:"DATASET_URL" envDefault:"https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod"` - // S3 configuration - UseS3 bool `env:"USE_S3" envDefault:"true"` - S3Bucket string `env:"S3_BUCKET" envDefault:"noaa-gfs-bdp-pds"` - S3Region string `env:"S3_REGION" envDefault:"us-east-1"` - S3Timeout time.Duration `env:"S3_TIMEOUT" envDefault:"300s"` + Dir string `env:"DIR" envDefault:"C:/tmp/grib"` + TTL time.Duration `env:"TTL" envDefault:"48h"` + CacheTTL time.Duration `env:"CACHE_TTL" envDefault:"1h"` + Parallel int `env:"PARALLEL" envDefault:"8"` + + Dataset DatasetConfig } func NewConfig() (*Config, error) { @@ -27,6 +134,6 @@ func NewConfig() (*Config, error) { }); err != nil { return nil, errcodes.Wrap(err, "failed to parse GRIB config") } - + cfg.Dataset = DefaultDatasetConfig() return cfg, nil } diff --git a/internal/pkg/grib/create_dataset.go b/internal/pkg/grib/create_dataset.go new file mode 100644 index 0000000..e69de29 diff --git a/internal/pkg/grib/cube.go b/internal/pkg/grib/cube.go index 64818dd..dfc3606 100644 --- a/internal/pkg/grib/cube.go +++ b/internal/pkg/grib/cube.go @@ -15,7 +15,7 @@ type cube struct { file *os.File } -func openCube(path string) (*cube, error) { +func openCube(path string, dc *DatasetConfig) (*cube, error) { f, err := os.Open(path) if err != nil { return nil, err @@ -27,14 +27,15 @@ func openCube(path string) (*cube, error) { return nil, err } - const ( - nT = 33 // 0-96 hours with step 3 hours (33 time steps) - nP = 47 // 47 pressure levels matching tawhiri - nLat = 361 - nLon = 720 - ) - - return &cube{mm: mm, t: nT, p: nP, lat: nLat, lon: nLon, bytesPerVar: int64(nT * nP * nLat * nLon * 4), file: f}, nil + return &cube{ + mm: mm, + t: dc.NT, + p: dc.NP, + lat: dc.NLat, + lon: dc.NLon, + bytesPerVar: dc.SizePerVar(), + file: f, + }, nil } func (c *cube) val(varIdx, ti, pi, y, x int) float32 { diff --git a/internal/pkg/grib/dataset.go b/internal/pkg/grib/dataset.go index e539f65..f994dc9 100644 --- a/internal/pkg/grib/dataset.go +++ b/internal/pkg/grib/dataset.go @@ -2,6 +2,7 @@ package grib type dataset struct { cube *cube + ds *DatasetConfig runUTC int64 // unix seconds } diff --git a/internal/pkg/grib/downloader.go b/internal/pkg/grib/downloader.go deleted file mode 100644 index a93a64c..0000000 --- a/internal/pkg/grib/downloader.go +++ /dev/null @@ -1,91 +0,0 @@ -package grib - -import ( - "context" - "fmt" - "io" - "net/http" - "os" - "path/filepath" - "time" - - 
"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes" - "golang.org/x/sync/errgroup" -) - -type Downloader struct { - Dir string - Parallel int - Client *http.Client - DatasetURL string -} - -func (d *Downloader) fileURL(run string, hour int, step int) string { - return fmt.Sprintf("%s/gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2.0p50.f%03d", d.DatasetURL, run, hour, hour, step) -} - -func (d *Downloader) fetch(ctx context.Context, url, dst string) (err error) { - // Check if final file already exists - if _, err := os.Stat(dst); err == nil { - return nil - } - - tmp := dst + ".part" - - // Remove old .part file if it exists (fixes race condition) - os.Remove(tmp) - - f, err := os.Create(tmp) - if err != nil { - return err - } - - // Cleanup .part file on any error (using named return value) - defer func() { - f.Close() - if err != nil { - os.Remove(tmp) - } - }() - - req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - resp, err := d.Client.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return errcodes.Wrap(errcodes.ErrDownload, "bad status: "+resp.Status) - } - - if _, err := io.Copy(f, resp.Body); err != nil { - return err - } - - // Close file before rename - if err := f.Close(); err != nil { - return err - } - - // If rename fails, err will be set and defer will cleanup .part file - return os.Rename(tmp, dst) -} - -func (d *Downloader) Run(ctx context.Context, run time.Time) error { - runStr := run.Format("20060102") - hour := run.Hour() - g, ctx := errgroup.WithContext(ctx) - sem := make(chan struct{}, d.Parallel) - for _, step := range steps { - step := step - sem <- struct{}{} - g.Go(func() error { - defer func() { <-sem }() - url := d.fileURL(runStr, hour, step) - dst := filepath.Join(d.Dir, fileName(run, step)) - return d.fetch(ctx, url, dst) - }) - } - return g.Wait() -} diff --git a/internal/pkg/grib/extractor.go b/internal/pkg/grib/extractor.go index 850a0e3..d27f3a4 100644 --- a/internal/pkg/grib/extractor.go +++ b/internal/pkg/grib/extractor.go @@ -4,30 +4,100 @@ import "math" func lerp(a, b, t float64) float64 { return a + t*(b-a) } -// Interpolate 16‑point (time, p, lat, lon) +// ghInterp returns interpolated geopotential height at given time/pressure/lat/lon +func (d *dataset) ghInterp(ti, pi int, y0, y1, x0, x1 int, wy, wx float64) float64 { + g00 := d.cube.val(0, ti, pi, y0, x0) + g10 := d.cube.val(0, ti, pi, y0, x1) + g01 := d.cube.val(0, ti, pi, y1, x0) + g11 := d.cube.val(0, ti, pi, y1, x1) + return (1-wy)*((1-wx)*float64(g00)+wx*float64(g10)) + wy*((1-wx)*float64(g01)+wx*float64(g11)) +} + +// searchAltLevel uses geopotential height to find pressure level bracket for target altitude. +func (d *dataset) searchAltLevel(alt float64, ti, y0, y1, x0, x1 int, wy, wx float64) (int, float64) { + levels := d.ds.Levels + nLevels := len(levels) + + lo, hi := 0, nLevels-1 + for lo < hi-1 { + mid := (lo + hi) / 2 + ghMid := d.ghInterp(ti, mid, y0, y1, x0, x1, wy, wx) + if ghMid < alt { + lo = mid + } else { + hi = mid + } + } + + ghLo := d.ghInterp(ti, lo, y0, y1, x0, x1, wy, wx) + ghHi := d.ghInterp(ti, hi, y0, y1, x0, x1, wy, wx) + + wp := 0.0 + if ghHi != ghLo { + wp = (alt - ghLo) / (ghHi - ghLo) + } + if wp < 0 { + wp = 0 + } + if wp > 1 { + wp = 1 + } + + return lo, wp +} + +// uv выполняет интерполяцию ветра по 4 измерениям (time, pressure, lat, lon). 
 func (d *dataset) uv(lat, lon, alt float64, tHours float64) (float64, float64) {
 	if lon < 0 {
 		lon += 360
 	}
-	iy := (lat + 90) * 2
+
+	inv := d.ds.InvResolution()
+
+	// GRIB scan north→south: index 0 = 90°N
+	iy := (90 - lat) * inv
 	y0 := int(math.Floor(iy))
+	if y0 < 0 {
+		y0 = 0
+	}
+	if y0 >= d.cube.lat-1 {
+		y0 = d.cube.lat - 2
+	}
 	y1 := y0 + 1
 	wy := iy - float64(y0)
-	ix := lon * 2
+
+	ix := lon * inv
 	x0 := int(math.Floor(ix)) % d.cube.lon
 	x1 := (x0 + 1) % d.cube.lon
 	wx := ix - float64(x0)
-	// For 3-hourly data (step = 3 hours)
-	// Convert tHours to 3-hour index (e.g., 1.5 hours -> index 0.5, interpolate between 0 and 1)
-	it0 := int(math.Floor(tHours / 3.0))
-	wt := (tHours - float64(it0*3)) / 3.0 // Interpolation weight within 3-hour window
+
+	// Time: divide tHours by the step length to get the cube index
+	tIdx := tHours / float64(d.ds.TimeStep)
+	it0 := int(math.Floor(tIdx))
+	if it0 < 0 {
+		it0 = 0
+	}
+	if it0 >= d.cube.t-1 {
+		it0 = d.cube.t - 2
+	}
+	wt := tIdx - float64(it0)
+
+	// ISA: altitude → pressure → level index
+	levels := d.ds.Levels
 	p := pressureFromAlt(alt)
 	ip0 := 0
-	for ip0+1 < len(pressureLevels) && pressureLevels[ip0+1] > p {
+	for ip0+1 < len(levels) && levels[ip0+1] > p {
 		ip0++
 	}
 	ip1 := ip0 + 1
-	wp := (pressureLevels[ip0] - p) / (pressureLevels[ip0] - pressureLevels[ip1])
+	if ip1 >= len(levels) {
+		ip1 = len(levels) - 1
+	}
+	wp := 0.0
+	if levels[ip0] != levels[ip1] {
+		wp = (levels[ip0] - p) / (levels[ip0] - levels[ip1])
+	}
+
 	fetch := func(ti, pi int) (float64, float64) {
 		u00 := d.cube.val(1, ti, pi, y0, x0)
 		u10 := d.cube.val(1, ti, pi, y0, x1)
@@ -41,6 +111,7 @@ func (d *dataset) uv(lat, lon, alt float64, tHours float64) {
 		vxy := (1-wy)*((1-wx)*float64(v00)+wx*float64(v10)) + wy*((1-wx)*float64(v01)+wx*float64(v11))
 		return uxy, vxy
 	}
+
 	u0p0, v0p0 := fetch(it0, ip0)
 	u0p1, v0p1 := fetch(it0, ip1)
 	u1p0, v1p0 := fetch(it0+1, ip0)
diff --git a/internal/pkg/grib/grib.go b/internal/pkg/grib/grib.go
index 637a9aa..7bf6bfc 100644
--- a/internal/pkg/grib/grib.go
+++ b/internal/pkg/grib/grib.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"encoding/binary"
 	"math"
-	"net/http"
 	"os"
 	"path/filepath"
 	"strings"
@@ -41,15 +40,12 @@ func New(cfg *Config) (Service, error) {
 	// Try to load existing dataset on startup
 	if err := s.loadExistingDataset(); err != nil {
 		// Log error but don't fail startup - dataset will be loaded on first Update()
-		// This allows the service to start even if no data is available yet
 	}
 
 	return s, nil
 }
 
-// loadExistingDataset tries to load the most recent available dataset
 func (s *service) loadExistingDataset() error {
-	// Find the most recent cube file
 	pattern := filepath.Join(s.cfg.Dir, "*.cube")
 	matches, err := filepath.Glob(pattern)
 	if err != nil {
@@ -60,7 +56,6 @@ func (s *service) loadExistingDataset() error {
 		return errcodes.ErrNoCubeFilesFound
 	}
 
-	// Sort by modification time (newest first)
 	var latestFile string
 	var latestTime time.Time
 
@@ -69,7 +64,6 @@ func (s *service) loadExistingDataset() error {
 		if err != nil {
 			continue
 		}
-
 		if info.ModTime().After(latestTime) {
 			latestTime = info.ModTime()
 			latestFile = match
@@ -80,18 +74,16 @@ func (s *service) loadExistingDataset() error {
 		return errcodes.ErrNoValidCubeFilesFound
 	}
 
-	// Check if the file is fresh enough
 	if time.Since(latestTime) > s.cfg.TTL {
 		return errcodes.Wrap(errcodes.ErrLatestCubeFileIsTooOld, "latest cube file is too old")
 	}
 
-	// Load the dataset
-	c, err := openCube(latestFile)
+	dc := &s.cfg.Dataset
+	c, err := openCube(latestFile, dc)
 	if err != nil {
 		return err
} - // Extract run time from filename base := filepath.Base(latestFile) runStr := strings.TrimSuffix(base, ".cube") run, err := time.Parse("20060102_15", runStr) @@ -100,94 +92,70 @@ func (s *service) loadExistingDataset() error { return err } - ds := &dataset{cube: c, runUTC: run.Unix()} - s.data.Store(ds) - + s.data.Store(&dataset{cube: c, ds: dc, runUTC: run.Unix()}) return nil } -// Update() downloads missing GRIBs, assembles cube into a single mmap‑file. func (s *service) Update(ctx context.Context) error { - // Check if we already have fresh data if d := s.data.Load(); d != nil { runTime := time.Unix(d.runUTC, 0) if time.Since(runTime) < s.cfg.TTL { - // Data is still fresh, no need to update return nil } } - // Check again after acquiring lock (double-checked locking pattern) if d := s.data.Load(); d != nil { runTime := time.Unix(d.runUTC, 0) if time.Since(runTime) < s.cfg.TTL { - // Another instance already updated the data return nil } } - run := nearestRun(time.Now().UTC().Add(-24 * time.Hour)) + dc := &s.cfg.Dataset + run := nearestRun(time.Now().UTC().Add(-6 * time.Hour)) - // Check if we already have this run cubePath := filepath.Join(s.cfg.Dir, run.Format("20060102_15")) + ".cube" if _, err := os.Stat(cubePath); err == nil { - // File exists, check if it's fresh info, err := os.Stat(cubePath) if err == nil && time.Since(info.ModTime()) < s.cfg.TTL { - // File is fresh, just load it - c, err := openCube(cubePath) + c, err := openCube(cubePath, dc) if err != nil { return err } - ds := &dataset{cube: c, runUTC: run.Unix()} - s.data.Store(ds) + s.data.Store(&dataset{cube: c, ds: dc, runUTC: run.Unix()}) s.cache = memCache{ttl: s.cfg.CacheTTL} return nil } } - // Download new data using S3 or HTTP - var downloadErr error - if s.cfg.UseS3 { - s3dl, err := NewS3Downloader(s.cfg.Dir, s.cfg.Parallel, s.cfg.S3Bucket, s.cfg.S3Region) - if err != nil { - return errcodes.Wrap(err, "failed to create S3 downloader") - } - downloadErr = s3dl.Run(ctx, run) - } else { - dl := Downloader{ - Dir: s.cfg.Dir, - Parallel: s.cfg.Parallel, - Client: http.DefaultClient, - DatasetURL: s.cfg.DatasetURL, - } - downloadErr = dl.Run(ctx, run) + downloadCtx, cancel := context.WithTimeout(ctx, 60*time.Minute) + defer cancel() + + dl := NewPartialDownloader(s.cfg.Dir, s.cfg.Parallel, dc) + if err := dl.Run(downloadCtx, run); err != nil { + return err } - if downloadErr != nil { - return downloadErr - } - - // Assemble cube if it doesn't exist if _, err := os.Stat(cubePath); err != nil { - if err := assembleCube(s.cfg.Dir, run, cubePath); err != nil { + if err := assembleCube(s.cfg.Dir, run, cubePath, dc); err != nil { return err } } - c, err := openCube(cubePath) + c, err := openCube(cubePath, dc) if err != nil { return err } - ds := &dataset{cube: c, runUTC: run.Unix()} - s.data.Store(ds) + s.data.Store(&dataset{cube: c, ds: dc, runUTC: run.Unix()}) s.cache = memCache{ttl: s.cfg.CacheTTL} return nil } -func assembleCube(dir string, run time.Time, cubePath string) error { - const sizePerVar = 33 * 47 * 361 * 720 * 4 // 33 time steps (0-96 hours, 3-hour intervals), 47 pressure levels - total := int64(sizePerVar * 3) // 3 variables: gh, u, v +func assembleCube(dir string, run time.Time, cubePath string, dc *DatasetConfig) error { + sizePerVar := dc.SizePerVar() + total := dc.CubeSize() + gridBytes := int64(dc.GridSize()) * 4 + f, err := os.Create(cubePath) if err != nil { return err @@ -203,27 +171,23 @@ func assembleCube(dir string, run time.Time, cubePath string) error { defer f.Close() pIndex := 
make(map[int]int) - for i, p := range pressureLevels { + for i, p := range dc.Levels { pIndex[int(math.Round(p))] = i } - for ti, step := range steps { - fn := filepath.Join(dir, fileName(run, step)) + processFile := func(fn string, ti int) error { file, err := os.Open(fn) if err != nil { return err } messages, err := griblib.ReadMessages(file) - file.Close() // Close immediately after reading + file.Close() if err != nil { return err } for _, m := range messages { - // Check if this is a wind component (u or v) or geopotential height - // ParameterCategory 2 = momentum, ParameterNumber 2 = u-wind, 3 = v-wind - // ParameterCategory 3 = mass, ParameterNumber 5 = geopotential height if m.Section4.ProductDefinitionTemplateNumber != 0 { continue } @@ -231,7 +195,6 @@ func assembleCube(dir string, run time.Time, cubePath string) error { product := m.Section4.ProductDefinitionTemplate var varIdx int - // Match tawhiri variable order: ['gh', 'u', 'v'] (indices 0, 1, 2) if product.ParameterCategory == 2 { switch product.ParameterNumber { case 2: // u-wind @@ -242,18 +205,15 @@ func assembleCube(dir string, run time.Time, cubePath string) error { continue } } else if product.ParameterCategory == 3 && product.ParameterNumber == 5 { - // geopotential height - varIdx = 0 + varIdx = 0 // geopotential height } else { continue } - // Check if this is a pressure level (type 100) if product.FirstSurface.Type != 100 { continue } - // Get pressure level in hPa pressure := float64(product.FirstSurface.Value) / 100.0 pIdx, ok := pIndex[int(math.Round(pressure))] if !ok { @@ -261,14 +221,27 @@ func assembleCube(dir string, run time.Time, cubePath string) error { } vals := m.Data() - // GRIB library returns scan north->south, west->east already in row-major order raw := make([]byte, len(vals)*4) for i, v := range vals { binary.LittleEndian.PutUint32(raw[i*4:], math.Float32bits(float32(v))) } - base := int64(varIdx*sizePerVar + (ti*47+pIdx)*361*720*4) + base := int64(varIdx)*sizePerVar + (int64(ti)*int64(dc.NP)+int64(pIdx))*gridBytes copy(mm[base:base+int64(len(raw))], raw) } + return nil + } + + steps := dc.Steps() + for ti, step := range steps { + fn := filepath.Join(dir, dc.FileName(run, step)) + if err := processFile(fn, ti); err != nil { + return err + } + + fnB := filepath.Join(dir, dc.FileNameB(run, step)) + if err := processFile(fnB, ti); err != nil { + return err + } } return mm.Flush() } @@ -279,24 +252,21 @@ func (s *service) Extract(ctx context.Context, lat, lon, alt float64, ts time.Ti if d == nil { return zero, errcodes.ErrNoDataset } - if ts.Before(time.Unix(d.runUTC, 0)) || ts.After(time.Unix(d.runUTC, 0).Add(96*time.Hour)) { + maxDur := time.Duration(s.cfg.Dataset.MaxHour) * time.Hour + if ts.Before(time.Unix(d.runUTC, 0)) || ts.After(time.Unix(d.runUTC, 0).Add(maxDur)) { return zero, errcodes.ErrOutOfBounds } - // Try memory cache first key := encodeKey(lat, lon, alt, ts) if v, ok := s.cache.get(key); ok { return [2]float64(v), nil } - // Calculate result td := ts.Sub(time.Unix(d.runUTC, 0)).Hours() u, v := d.uv(lat, lon, alt, td) out := [2]float64{u, v} - // Cache in memory s.cache.set(key, vec(out)) - return out, nil } diff --git a/internal/pkg/grib/partial_downloader.go b/internal/pkg/grib/partial_downloader.go new file mode 100644 index 0000000..3752504 --- /dev/null +++ b/internal/pkg/grib/partial_downloader.go @@ -0,0 +1,350 @@ +package grib + +import ( + "bufio" + "context" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + 
"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes" + "git.intra.yksa.space/gsn/predictor/internal/pkg/log" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +// PartialDownloader загружает только необходимые поля из GRIB файлов +// используя HTTP Range requests и .idx индексные файлы +type PartialDownloader struct { + Dir string + Parallel int + Client *http.Client + Variables []string + ds *DatasetConfig +} + +// NewPartialDownloader создаёт новый partial downloader +func NewPartialDownloader(dir string, parallel int, dc *DatasetConfig) *PartialDownloader { + return &PartialDownloader{ + Dir: dir, + Parallel: parallel, + Client: &http.Client{ + Timeout: 60 * time.Second, + }, + Variables: dc.Variables, + ds: dc, + } +} + +// idxEntry представляет запись из .idx файла +type idxEntry struct { + Index int + ByteStart int64 + Date string + Variable string + Level string + Forecast string +} + +type ProgressWriter struct { + Total int64 + Downloaded int64 + OnProgress func(percent float64) +} + +func (pw *ProgressWriter) Write(p []byte) (int, error) { + n := len(p) + pw.Downloaded += int64(n) + if pw.Total > 0 && pw.OnProgress != nil { + percent := float64(pw.Downloaded) / float64(pw.Total) * 100 + pw.OnProgress(percent) + } + return n, nil +} + +// parseIdx парсит .idx файл и возвращает записи +func (d *PartialDownloader) parseIdx(body []byte) []idxEntry { + var entries []idxEntry + lines := strings.Split(string(body), "\n") + + for _, line := range lines { + if line == "" { + continue + } + parts := strings.Split(line, ":") + if len(parts) < 7 { + continue + } + + byteStart, _ := strconv.ParseInt(parts[1], 10, 64) + entries = append(entries, idxEntry{ + Index: len(entries), + ByteStart: byteStart, + Date: parts[2], + Variable: parts[3], + Level: parts[4], + Forecast: parts[5], + }) + } + return entries +} + +// filterEntries фильтрует записи по нужным переменным и уровням давления +func (d *PartialDownloader) filterEntries(entries []idxEntry) []idxEntry { + var filtered []idxEntry + + for _, e := range entries { + isNeededVar := false + for _, v := range d.Variables { + if v == e.Variable { + isNeededVar = true + break + } + } + + isPressureLevel := strings.HasSuffix(e.Level, " mb") + + if isNeededVar && isPressureLevel { + filtered = append(filtered, e) + } + } + + return filtered +} + +// Вспомогательная функция для выполнения запроса с повторами +func (d *PartialDownloader) doWithRetry(ctx context.Context, req *http.Request) (*http.Response, error) { + var resp *http.Response + var err error + + backoff := 1 * time.Second + maxRetries := 3 + + for i := 0; i < maxRetries; i++ { + resp, err = d.Client.Do(req) + if err == nil && resp.StatusCode < 500 { + return resp, nil + } + + if resp != nil { + resp.Body.Close() + } + + log.Ctx(ctx).Warn("retry download", zap.Int("attempt", i+1), zap.Error(err)) + + select { + case <-time.After(backoff): + backoff *= 2 + case <-ctx.Done(): + return nil, ctx.Err() + } + } + return nil, err +} + +// downloadRange загружает диапазон байтов из URL +func (d *PartialDownloader) downloadRange(ctx context.Context, url string, start, end int64, out io.Writer) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return err + } + + rangeHeader := fmt.Sprintf("bytes=%d-", start) + if end > 0 { + rangeHeader = fmt.Sprintf("bytes=%d-%d", start, end) + } + req.Header.Set("Range", rangeHeader) + + resp, err := d.Client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode 
!= http.StatusPartialContent && resp.StatusCode != http.StatusOK {
+		return errcodes.Wrap(errcodes.ErrDownload, "bad status: "+resp.Status)
+	}
+
+	_, err = io.Copy(out, resp.Body)
+	return err
+}
+
+func (d *PartialDownloader) downloadFieldsFromURL(ctx context.Context, url string, dst string, step int) (err error) {
+	idxURL := url + ".idx"
+	tmp := dst + ".part"
+
+	if info, err := os.Stat(dst); err == nil && info.Size() > 0 {
+		return nil
+	}
+
+	reqIdx, _ := http.NewRequestWithContext(ctx, http.MethodGet, idxURL, nil)
+	respIdx, err := d.doWithRetry(ctx, reqIdx)
+	if err != nil {
+		return errcodes.Wrap(err, "failed to get idx")
+	}
+	defer respIdx.Body.Close()
+
+	idxBody, _ := io.ReadAll(respIdx.Body)
+	entries := d.parseIdx(idxBody)
+	filtered := d.filterEntries(entries)
+	if len(filtered) == 0 {
+		return nil
+	}
+
+	var totalBytes int64
+	type chunk struct{ start, end int64 }
+	chunks := make([]chunk, 0, len(filtered))
+
+	for _, entry := range filtered {
+		var endByte int64 = -1
+		for j, e := range entries {
+			if e.ByteStart == entry.ByteStart && j+1 < len(entries) {
+				endByte = entries[j+1].ByteStart - 1
+				break
+			}
+		}
+		chunks = append(chunks, chunk{entry.ByteStart, endByte})
+		if endByte > 0 {
+			totalBytes += (endByte - entry.ByteStart + 1)
+		}
+	}
+
+	f, err := os.OpenFile(tmp, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
+	if err != nil {
+		return err
+	}
+
+	var downloaded int64
+
+	err = func() error {
+		defer f.Close()
+		bufWriter := bufio.NewWriterSize(f, 1024*1024)
+
+		for i, c := range chunks {
+			countingWriter := &proxyWriter{
+				Writer: bufWriter,
+				OnWrite: func(n int) {
+					downloaded += int64(n)
+					if totalBytes > 0 && i%20 == 0 {
+						pct := float64(downloaded) / float64(totalBytes) * 100
+						log.Ctx(ctx).Debug("download progress",
+							zap.Int("step", step),
+							zap.String("pct", fmt.Sprintf("%.1f%%", pct)))
+					}
+				},
+			}
+
+			if err := d.downloadRange(ctx, url, c.start, c.end, countingWriter); err != nil {
+				return err
+			}
+		}
+		return bufWriter.Flush()
+	}()
+
+	if err != nil {
+		f.Close()
+		os.Remove(tmp)
+		return err
+	}
+
+	return d.safeRename(tmp, dst)
+}
+
+type proxyWriter struct {
+	io.Writer
+	OnWrite func(int)
+}
+
+func (p *proxyWriter) Write(data []byte) (int, error) {
+	n, err := p.Writer.Write(data)
+	if n > 0 && p.OnWrite != nil {
+		p.OnWrite(n)
+	}
+	return n, err
+}
+
+func (d *PartialDownloader) safeRename(src, dst string) error {
+	var lastErr error
+	for i := 0; i < 5; i++ {
+		if err := os.Rename(src, dst); err == nil {
+			return nil
+		} else {
+			lastErr = err
+		}
+		time.Sleep(150 * time.Millisecond)
+	}
+	return fmt.Errorf("rename failed: %w", lastErr)
+}
+
+// Run starts downloading all required files (pgrb2 + pgrb2b).
+func (d *PartialDownloader) Run(ctx context.Context, run time.Time) error {
+	log.Ctx(ctx).Info("starting partial download",
+		zap.Time("run", run),
+		zap.Strings("variables", d.Variables))
+
+	g, ctx := errgroup.WithContext(ctx)
+	sem := make(chan struct{}, d.Parallel)
+	steps := d.ds.Steps()
+
+	for _, step := range steps {
+		step := step
+
+		// Download primary pgrb2
+		sem <- struct{}{}
+		g.Go(func() error {
+			defer func() { <-sem }()
+			url := d.ds.GribURL(run, step)
+			dst := filepath.Join(d.Dir, d.ds.FileName(run, step))
+			return d.downloadFieldsFromURL(ctx, url, dst, step)
+		})
+
+		// Download secondary pgrb2b
+		sem <- struct{}{}
+		g.Go(func() error {
+			defer func() { <-sem }()
+			url := d.ds.GribURLB(run, step)
+			dst := filepath.Join(d.Dir, d.ds.FileNameB(run, step))
+			return d.downloadFieldsFromURL(ctx, url, dst, step)
+		})
+	}
+
+	
return g.Wait()
+}
+
+// GetLatestModelRun finds the most recent available GFS forecast run.
+func GetLatestModelRun(ctx context.Context, dc *DatasetConfig) (time.Time, error) {
+	now := time.Now().UTC()
+	hour := now.Hour() - (now.Hour() % 6)
+	current := time.Date(now.Year(), now.Month(), now.Day(), hour, 0, 0, 0, time.UTC)
+
+	client := &http.Client{Timeout: 10 * time.Second}
+
+	for i := 0; i < 8; i++ {
+		url := dc.GribURL(current, dc.MaxHour)
+
+		req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil)
+		if err != nil {
+			current = current.Add(-6 * time.Hour)
+			continue
+		}
+
+		resp, err := client.Do(req)
+		if err == nil && resp.StatusCode == http.StatusOK {
+			resp.Body.Close()
+			log.Ctx(ctx).Info("found latest model run", zap.Time("run", current))
+			return current, nil
+		}
+		if resp != nil {
+			resp.Body.Close()
+		}
+
+		current = current.Add(-6 * time.Hour)
+	}
+
+	return time.Time{}, errcodes.Wrap(errcodes.ErrDownload, "no recent GFS forecast found")
+}
diff --git a/internal/pkg/grib/pressure.go b/internal/pkg/grib/pressure.go
index add6ff0..1b6dec0 100644
--- a/internal/pkg/grib/pressure.go
+++ b/internal/pkg/grib/pressure.go
@@ -2,15 +2,6 @@ package grib
 
 import "math"
 
-// 47 pressure levels matching tawhiri configuration
-var pressureLevels = []float64{
-	1000, 975, 950, 925, 900, 875, 850, 825, 800, 775,
-	750, 725, 700, 675, 650, 625, 600, 575, 550, 525,
-	500, 475, 450, 425, 400, 375, 350, 325, 300, 275,
-	250, 225, 200, 175, 150, 125, 100, 70, 50, 30,
-	20, 10, 7, 5, 3, 2, 1,
-}
-
 func pressureFromAlt(alt float64) float64 { // ICAO ISA
 	return 1013.25 * math.Pow(1-alt/44307.69396, 5.255877)
 }
diff --git a/internal/pkg/grib/s3_downloader.go b/internal/pkg/grib/s3_downloader.go
deleted file mode 100644
index 0fa4c70..0000000
--- a/internal/pkg/grib/s3_downloader.go
+++ /dev/null
@@ -1,265 +0,0 @@
-package grib
-
-import (
-	"context"
-	"fmt"
-	"io"
-	"os"
-	"path/filepath"
-	"time"
-
-	"git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes"
-	"github.com/aws/aws-sdk-go-v2/aws"
-	"github.com/aws/aws-sdk-go-v2/config"
-	"github.com/aws/aws-sdk-go-v2/service/s3"
-	"golang.org/x/sync/errgroup"
-)
-
-// S3Downloader downloads GRIB files from AWS S3
-type S3Downloader struct {
-	Dir      string
-	Parallel int
-	Bucket   string
-	Region   string
-	Client   *s3.Client
-}
-
-// NewS3Downloader creates a new S3 downloader with anonymous access
-func NewS3Downloader(dir string, parallel int, bucket, region string) (*S3Downloader, error) {
-	// Create AWS config with anonymous credentials for public bucket
-	cfg, err := config.LoadDefaultConfig(context.Background(),
-		config.WithRegion(region),
-		config.WithCredentialsProvider(aws.AnonymousCredentials{}),
-	)
-	if err != nil {
-		return nil, errcodes.Wrap(err, "failed to load AWS config")
-	}
-
-	client := s3.NewFromConfig(cfg)
-
-	return &S3Downloader{
-		Dir:      dir,
-		Parallel: parallel,
-		Bucket:   bucket,
-		Region:   region,
-		Client:   client,
-	}, nil
-}
-
-// s3Key generates the S3 key for a GRIB file
-// Path format: gfs.YYYYMMDD/HH/atmos/gfs.tHHz.pgrb2.0p50.fFFF
-func (d *S3Downloader) s3Key(run string, hour int, step int) string {
-	return fmt.Sprintf("gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2.0p50.f%03d", run, hour, hour, step)
-}
-
-// CheckFileExists checks if a file exists in S3 using HeadObject
-func (d *S3Downloader) CheckFileExists(ctx context.Context, key string) (bool, int64, error) {
-	input := &s3.HeadObjectInput{
-		Bucket: aws.String(d.Bucket),
-		Key:    aws.String(key),
-	}
-
-	result, err := d.Client.HeadObject(ctx, input)
-	if err != nil {
-		
// Check if error is NotFound - // AWS SDK v2 doesn't export specific error types, check error string - if isNotFoundError(err) { - return false, 0, nil - } - return false, 0, errcodes.Wrap(err, "failed to check file existence") - } - - size := int64(0) - if result.ContentLength != nil { - size = *result.ContentLength - } - - return true, size, nil -} - -// isNotFoundError checks if error is a NotFound error -func isNotFoundError(err error) bool { - if err == nil { - return false - } - // AWS SDK v2 error handling - errStr := err.Error() - return contains(errStr, "NotFound") || contains(errStr, "404") || contains(errStr, "NoSuchKey") -} - -func contains(s, substr string) bool { - return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && findSubstring(s, substr)) -} - -func findSubstring(s, substr string) bool { - for i := 0; i <= len(s)-len(substr); i++ { - if s[i:i+len(substr)] == substr { - return true - } - } - return false -} - -// ListAvailableFiles lists all available files for a given run -func (d *S3Downloader) ListAvailableFiles(ctx context.Context, run string, hour int) ([]string, error) { - prefix := fmt.Sprintf("gfs.%s/%02d/atmos/", run, hour) - - input := &s3.ListObjectsV2Input{ - Bucket: aws.String(d.Bucket), - Prefix: aws.String(prefix), - } - - var files []string - paginator := s3.NewListObjectsV2Paginator(d.Client, input) - - for paginator.HasMorePages() { - page, err := paginator.NextPage(ctx) - if err != nil { - return nil, errcodes.Wrap(err, "failed to list S3 objects") - } - - for _, obj := range page.Contents { - if obj.Key != nil { - files = append(files, *obj.Key) - } - } - } - - return files, nil -} - -// fetchFromS3 downloads a file from S3 to local disk with retry logic -func (d *S3Downloader) fetchFromS3(ctx context.Context, key, dst string) (err error) { - // Check if final file already exists - if _, err := os.Stat(dst); err == nil { - return nil - } - - const maxRetries = 3 - var lastErr error - - for attempt := 0; attempt < maxRetries; attempt++ { - if attempt > 0 { - // Exponential backoff: 2s, 4s, 8s - waitTime := time.Duration(1< 0 && written != size { - return errcodes.Wrap(errcodes.ErrDownload, fmt.Sprintf("size mismatch: got %d bytes, expected %d", written, size)) - } - - // Close file before rename - if err := f.Close(); err != nil { - return err - } - fileClosed = true - - // If rename fails, err will be set and defer will cleanup .part file - return os.Rename(tmp, dst) -} - -// Run downloads all required GRIB files for a forecast run -func (d *S3Downloader) Run(ctx context.Context, run time.Time) error { - runStr := run.Format("20060102") - hour := run.Hour() - - // First, list available files to verify they exist - availableFiles, err := d.ListAvailableFiles(ctx, runStr, hour) - if err != nil { - return errcodes.Wrap(err, "failed to list available files") - } - - if len(availableFiles) == 0 { - return errcodes.Wrap(errcodes.ErrDownload, fmt.Sprintf("no files found for run %s/%02d", runStr, hour)) - } - - // Build a map of available files for quick lookup - availableMap := make(map[string]bool) - for _, file := range availableFiles { - availableMap[file] = true - } - - g, ctx := errgroup.WithContext(ctx) - sem := make(chan struct{}, d.Parallel) - - for _, step := range steps { - step := step - key := d.s3Key(runStr, hour, step) - - // Check if file is available in S3 - if !availableMap[key] { - // Log warning but don't fail - some forecast hours might not be available yet - continue - } - - sem <- struct{}{} - g.Go(func() error { - 
defer func() { <-sem }() - dst := filepath.Join(d.Dir, fileName(run, step)) - return d.fetchFromS3(ctx, key, dst) - }) - } - - return g.Wait() -} diff --git a/internal/pkg/grib/util.go b/internal/pkg/grib/util.go index 14b00be..1145a61 100644 --- a/internal/pkg/grib/util.go +++ b/internal/pkg/grib/util.go @@ -6,25 +6,11 @@ import ( "time" ) -// Generate steps from 0 to 96 with step 3 hours (33 steps total) -// GFS provides 3-hourly data for 0-120 hours, we use first 96 hours (0, 3, 6, ..., 96) -var steps = func() []int { - result := make([]int, 0, 33) - for i := 0; i <= 96; i += 3 { - result = append(result, i) - } - return result -}() - func nearestRun(t time.Time) time.Time { h := t.UTC().Hour() - t.UTC().Hour()%6 return time.Date(t.Year(), t.Month(), t.Day(), h, 0, 0, 0, time.UTC) } -func fileName(run time.Time, step int) string { - return fmt.Sprintf("gfs.t%02dz.pgrb2.0p50.f%03d", run.Hour(), step) -} - func encodeKey(a ...any) uint64 { h := fnv.New64a() for _, v := range a { diff --git a/internal/service/predictor.go b/internal/service/predictor.go index bb53b3f..9138c5e 100644 --- a/internal/service/predictor.go +++ b/internal/service/predictor.go @@ -90,7 +90,7 @@ func (s *Service) PerformPrediction(ctx context.Context, params ds.PredictionPar } } - log.Ctx(ctx).Info("Starting prediction", + log.Ctx(ctx).Warn("🚀 PREDICTION STARTING", zap.String("profile", profile), zap.Float64("lat", *params.LaunchLatitude), zap.Float64("lon", *params.LaunchLongitude), @@ -325,6 +325,10 @@ func (s *Service) simulateAscent(ctx context.Context, params ds.PredictionParame const dt = 10.0 // simulation step in seconds const outputInterval = 60.0 // output every 60 seconds + log.Ctx(ctx).Warn("⬆️ ASCENT SIMULATION STARTING", + zap.Float64("ascentRate", ascentRate), + zap.Float64("targetAlt", targetAltitude)) + lat := *params.LaunchLatitude lon := *params.LaunchLongitude alt := *params.LaunchAltitude @@ -349,12 +353,29 @@ func (s *Service) simulateAscent(ctx context.Context, params ds.PredictionParame }) nextOutputTime := timeCur.Add(time.Duration(outputInterval) * time.Second) + firstExtraction := true windFunc := func(lat, lon, alt float64, t time.Time) (float64, float64) { w, err := s.ExtractWind(ctx, lat, lon, alt, t) if err != nil { - log.Ctx(ctx).Warn("Wind extraction failed during ascent", zap.Error(err)) + log.Ctx(ctx).Error("Wind extraction FAILED during ascent", + zap.Error(err), + zap.Float64("lat", lat), + zap.Float64("lon", lon), + zap.Float64("alt", alt), + zap.Time("time", t)) return 0, 0 } + // Log only first extraction and when wind is zero + if firstExtraction || (w[0] == 0 && w[1] == 0) { + log.Ctx(ctx).Warn("Wind data check", + zap.Bool("first", firstExtraction), + zap.Float64("lat", lat), + zap.Float64("lon", lon), + zap.Float64("alt", alt), + zap.Float64("u", w[0]), + zap.Float64("v", w[1])) + firstExtraction = false + } return w[0], w[1] } @@ -370,6 +391,23 @@ func (s *Service) simulateAscent(ctx context.Context, params ds.PredictionParame alt = altNew if alt >= targetAltitude { + alt = targetAltitude + // Record burst point + wU, wV := windFunc(lat, lon, alt, timeCur) + latCopy := lat + lonCopy := lon + altCopy := alt + timeCopy := timeCur + windU := wU + windV := wV + results = append(results, ds.PredicitonResult{ + Latitude: &latCopy, + Longitude: &lonCopy, + Altitude: &altCopy, + Timestamp: &timeCopy, + WindU: &windU, + WindV: &windV, + }) break } @@ -427,14 +465,19 @@ func (s *Service) simulateDescent(ctx context.Context, params ds.PredictionParam windFunc := func(lat, lon, alt 
float64, t time.Time) (float64, float64) { w, err := s.ExtractWind(ctx, lat, lon, alt, t) if err != nil { - log.Ctx(ctx).Warn("Wind extraction failed during descent", zap.Error(err)) + log.Ctx(ctx).Error("Wind extraction FAILED during descent", + zap.Error(err), + zap.Float64("lat", lat), + zap.Float64("lon", lon), + zap.Float64("alt", alt), + zap.Time("time", t)) return 0, 0 } return w[0], w[1] } for alt > targetAltitude { - altRate := -descentRate + altRate := -descentRateAtAlt(descentRate, alt) if customCurve != nil { altRate = -s.getCustomAltitudeRate(customCurve, alt, descentRate) } @@ -445,6 +488,23 @@ func (s *Service) simulateDescent(ctx context.Context, params ds.PredictionParam alt = altNew if alt <= targetAltitude { + alt = targetAltitude + // Record landing point + wU, wV := windFunc(lat, lon, alt, timeCur) + latCopy := lat + lonCopy := lon + altCopy := alt + timeCopy := timeCur + windU := wU + windV := wV + results = append(results, ds.PredicitonResult{ + Latitude: &latCopy, + Longitude: &lonCopy, + Altitude: &altCopy, + Timestamp: &timeCopy, + WindU: &windU, + WindV: &windV, + }) break } @@ -539,6 +599,37 @@ func (s *Service) simulateFloat(ctx context.Context, startResult ds.PredicitonRe return results } +// airDensity returns ISA air density in kg/m³ at given altitude in meters +func airDensity(h float64) float64 { + var T, p float64 + switch { + case h < 11000: + T = 288.15 - 0.0065*h + p = 101325 * math.Pow(T/288.15, 5.2561) + case h < 20000: + T = 216.65 + p = 22632.1 * math.Exp(-0.00015769*(h-11000)) + case h < 32000: + T = 216.65 + 0.001*(h-20000) + p = 5474.89 * math.Pow(T/216.65, -34.1632) + default: + T = 228.65 + 0.0028*(h-32000) + p = 868.019 * math.Pow(T/228.65, -12.2009) + } + return p / (287.05 * T) +} + +// descentRateAtAlt returns descent rate adjusted for air density at altitude. +// descent_rate parameter is the sea-level rate. At altitude, thinner air means faster descent. 
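+// This is the standard terminal-velocity scaling: balancing weight against
+// drag, m*g = 0.5 * rho * v^2 * Cd * A, gives v proportional to 1/sqrt(rho),
+// hence v(h) = v0 * sqrt(rho0 / rho(h)). Illustrative numbers from the
+// airDensity model above: at 30 km, rho ≈ 0.018 kg/m³ vs rho0 ≈ 1.225 kg/m³,
+// so a 5 m/s sea-level descent rate becomes ≈ 5 * sqrt(1.225/0.018) ≈ 41 m/s.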
+func descentRateAtAlt(seaLevelRate, alt float64) float64 { + rho0 := airDensity(0) + rhoH := airDensity(alt) + if rhoH <= 0 { + return seaLevelRate + } + return seaLevelRate * math.Sqrt(rho0/rhoH) +} + func (s *Service) simulateCustomAscent(ctx context.Context, params ds.PredictionParameters, curve *CustomCurve) []ds.PredicitonResult { // Implementation for custom ascent curve // This would interpolate the altitude rate from the custom curve diff --git a/scripts/compare.go b/scripts/compare.go new file mode 100644 index 0000000..5e50653 --- /dev/null +++ b/scripts/compare.go @@ -0,0 +1,114 @@ +package main + +import ( + "encoding/json" + "fmt" + "math" + "os" +) + +type Point struct { + Datetime string `json:"datetime"` + Latitude float64 `json:"latitude"` + Longitude float64 `json:"longitude"` + Altitude float64 `json:"altitude"` +} + +type Stage struct { + Stage string `json:"stage"` + Trajectory []Point `json:"trajectory"` +} + +type Prediction struct { + Prediction []Stage `json:"prediction"` +} + +func haversine(lat1, lon1, lat2, lon2 float64) float64 { + R := 6371000.0 + phi1, phi2 := lat1*math.Pi/180, lat2*math.Pi/180 + dphi := (lat2 - lat1) * math.Pi / 180 + dlam := (lon2 - lon1) * math.Pi / 180 + a := math.Sin(dphi/2)*math.Sin(dphi/2) + math.Cos(phi1)*math.Cos(phi2)*math.Sin(dlam/2)*math.Sin(dlam/2) + return R * 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a)) +} + +func load(path string) Prediction { + data, _ := os.ReadFile(path) + var p Prediction + json.Unmarshal(data, &p) + return p +} + +func main() { + our := load("c:/tmp/our.json") + taw := load("c:/tmp/tawhiri.json") + + // Find burst and landing points + var ourBurst, ourLand, tawBurst, tawLand Point + for _, s := range our.Prediction { + t := s.Trajectory + if s.Stage == "ascent" { + ourBurst = t[len(t)-1] + } + if s.Stage == "descent" { + ourLand = t[len(t)-1] + } + } + for _, s := range taw.Prediction { + t := s.Trajectory + if s.Stage == "ascent" { + tawBurst = t[len(t)-1] + } + if s.Stage == "descent" { + tawLand = t[len(t)-1] + } + } + + fmt.Println("=== Burst Point ===") + fmt.Printf(" Our: lat=%.4f, lon=%.4f, alt=%.0f, time=%s\n", ourBurst.Latitude, ourBurst.Longitude, ourBurst.Altitude, ourBurst.Datetime) + fmt.Printf(" Tawhiri: lat=%.4f, lon=%.4f, alt=%.0f, time=%s\n", tawBurst.Latitude, tawBurst.Longitude, tawBurst.Altitude, tawBurst.Datetime) + burstDist := haversine(ourBurst.Latitude, ourBurst.Longitude, tawBurst.Latitude, tawBurst.Longitude) + fmt.Printf(" Distance: %.2f km\n", burstDist/1000) + + fmt.Println() + fmt.Println("=== Landing Point ===") + fmt.Printf(" Our: lat=%.4f, lon=%.4f, alt=%.0f, time=%s\n", ourLand.Latitude, ourLand.Longitude, ourLand.Altitude, ourLand.Datetime) + fmt.Printf(" Tawhiri: lat=%.4f, lon=%.4f, alt=%.0f, time=%s\n", tawLand.Latitude, tawLand.Longitude, tawLand.Altitude, tawLand.Datetime) + landDist := haversine(ourLand.Latitude, ourLand.Longitude, tawLand.Latitude, tawLand.Longitude) + fmt.Printf(" Distance: %.2f km\n", landDist/1000) + + fmt.Println() + fmt.Println("=== Trajectory Comparison (every 10 min) ===") + ourPts := map[string]Point{} + tawPts := map[string]Point{} + for _, s := range our.Prediction { + for _, p := range s.Trajectory { + ourPts[p.Datetime] = p + } + } + for _, s := range taw.Prediction { + for _, p := range s.Trajectory { + tawPts[p.Datetime] = p + } + } + + // Collect common times + var common []string + for _, s := range our.Prediction { + for _, p := range s.Trajectory { + if _, ok := tawPts[p.Datetime]; ok { + common = append(common, p.Datetime) + } 
+		}
+	}
+
+	for i, t := range common {
+		if i%10 == 0 {
+			o := ourPts[t]
+			tw := tawPts[t]
+			d := haversine(o.Latitude, o.Longitude, tw.Latitude, tw.Longitude)
+			fmt.Printf("  %s: dist=%.2f km (our: %.3f,%.3f vs taw: %.3f,%.3f)\n",
+				t, d/1000, o.Latitude, o.Longitude, tw.Latitude, tw.Longitude)
+		}
+	}
+}
diff --git a/scripts/rebuild_cube.go b/scripts/rebuild_cube.go
new file mode 100644
index 0000000..240b707
--- /dev/null
+++ b/scripts/rebuild_cube.go
@@ -0,0 +1,44 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"time"
+
+	"git.intra.yksa.space/gsn/predictor/internal/pkg/grib"
+)
+
+func main() {
+	ctx := context.Background()
+	cfg := &grib.Config{
+		Dir:      "C:/tmp/grib",
+		TTL:      48 * time.Hour,
+		CacheTTL: 1 * time.Hour,
+		Parallel: 8,
+		Dataset:  grib.DefaultDatasetConfig(),
+	}
+
+	svc, err := grib.New(cfg)
+	if err != nil {
+		fmt.Printf("Error: %v\n", err)
+		return
+	}
+
+	// Delete old cube to force rebuild
+	cubePath := "C:/tmp/grib/20260212_12.cube"
+	if err := os.Remove(cubePath); err != nil && !os.IsNotExist(err) {
+		fmt.Printf("Remove cube error: %v\n", err)
+	} else {
+		fmt.Println("Old cube removed")
+	}
+
+	// Update will download missing pgrb2b files and rebuild cube
+	fmt.Println("Starting update (download pgrb2b + rebuild cube)...")
+	start := time.Now()
+	if err := svc.Update(ctx); err != nil {
+		fmt.Printf("Update error: %v\n", err)
+		return
+	}
+	fmt.Printf("Done in %v\n", time.Since(start))
+}
diff --git a/scripts/test_download.go b/scripts/test_download.go
new file mode 100644
index 0000000..73934a5
--- /dev/null
+++ b/scripts/test_download.go
@@ -0,0 +1,36 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"git.intra.yksa.space/gsn/predictor/internal/pkg/grib"
+)
+
+func main() {
+	ctx := context.Background()
+	dc := grib.DefaultDatasetConfig()
+
+	// Find the most recent available forecast
+	run, err := grib.GetLatestModelRun(ctx, &dc)
+	if err != nil {
+		fmt.Printf("Error finding model run: %v\n", err)
+		return
+	}
+	fmt.Printf("Found model run: %v\n", run)
+
+	// Create the downloader
+	dl := grib.NewPartialDownloader("C:/tmp/grib", 8, &dc)
+
+	// Start the download
+	start := time.Now()
+	fmt.Println("Starting download...")
+
+	err = dl.Run(ctx, run)
+	if err != nil {
+		fmt.Printf("Download error: %v\n", err)
+		return
+	}
+
+	fmt.Printf("Download completed in %v\n", time.Since(start))
+}
diff --git a/scripts/test_gh.go b/scripts/test_gh.go
new file mode 100644
index 0000000..bfb592c
--- /dev/null
+++ b/scripts/test_gh.go
@@ -0,0 +1,60 @@
+package main
+
+import (
+	"encoding/binary"
+	"fmt"
+	"math"
+	"os"
+
+	mmap "github.com/edsrzf/mmap-go"
+)
+
+var pressureLevels = []float64{
+	1000, 975, 950, 925, 900, 875, 850, 825, 800, 775,
+	750, 725, 700, 675, 650, 625, 600, 575, 550, 525,
+	500, 475, 450, 425, 400, 375, 350, 325, 300, 275,
+	250, 225, 200, 175, 150, 125, 100, 70, 50, 30,
+	20, 10, 7, 5, 3, 2, 1,
+}
+
+func main() {
+	f, _ := os.Open("C:/tmp/grib/20260212_12.cube")
+	mm, _ := mmap.Map(f, mmap.RDONLY, 0)
+	defer mm.Unmap()
+	defer f.Close()
+
+	const (
+		nT   = 97
+		nP   = 47
+		nLat = 721
+		nLon = 1440
+	)
+	bytesPerVar := int64(nT * nP * nLat * nLon * 4)
+
+	val := func(varIdx, ti, pi, y, x int) float32 {
+		idx := (((ti*nP + pi) * nLat) + y) * nLon + x
+		off := int64(varIdx)*bytesPerVar + int64(idx)*4
+		bits := binary.LittleEndian.Uint32(mm[off : off+4])
+		return math.Float32frombits(bits)
+	}
+
+	// Check gh values at lat=52.2N (y=(90-52.2)*4=151.2 → y=151), lon=0.1E (x=0.1*4=0.4 → x=0)
+	// Time step 9 (9 hours into forecast)
+	ti := 9
+	y := 151
+	x := 0
+
+	fmt.Println("GH values at (52.25N, 0E), t=+9h:")
+	fmt.Printf("%8s %8s 
%10s\n", "Level", "hPa", "GH(m)") + for pi := 0; pi < nP; pi++ { + gh := val(0, ti, pi, y, x) + fmt.Printf("%8d %8.0f %10.1f\n", pi, pressureLevels[pi], gh) + } + + fmt.Println("\nU-wind values at same point:") + fmt.Printf("%8s %8s %10s\n", "Level", "hPa", "U(m/s)") + for pi := 0; pi < nP; pi++ { + u := val(1, ti, pi, y, x) + fmt.Printf("%8d %8.0f %10.2f\n", pi, pressureLevels[pi], u) + } +} diff --git a/scripts/test_grib_read.go b/scripts/test_grib_read.go new file mode 100644 index 0000000..3b8b7eb --- /dev/null +++ b/scripts/test_grib_read.go @@ -0,0 +1,38 @@ +package main + +import ( + "fmt" + "os" + + "github.com/nilsmagnus/grib/griblib" +) + +func main() { + f, err := os.Open("C:/tmp/grib/gfs.t18z.pgrb2.0p25.f000") + if err != nil { + fmt.Printf("Error opening file: %v\n", err) + return + } + defer f.Close() + + messages, err := griblib.ReadMessages(f) + if err != nil { + fmt.Printf("Error reading GRIB: %v\n", err) + return + } + + fmt.Printf("Found %d messages\n\n", len(messages)) + + for i, m := range messages { + product := m.Section4.ProductDefinitionTemplate + if product.ParameterCategory != 2 || product.ParameterNumber != 2 { + continue // only u-wind + } + fmt.Printf("UGRD Msg %d: SurfType=%d SurfValue=%d SurfScale=%d DataLen=%d\n", + i, + product.FirstSurface.Type, + product.FirstSurface.Value, + product.FirstSurface.Scale, + len(m.Data())) + } +} diff --git a/scripts/test_prediction.go b/scripts/test_prediction.go new file mode 100644 index 0000000..fb56c5a --- /dev/null +++ b/scripts/test_prediction.go @@ -0,0 +1,87 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "git.intra.yksa.space/gsn/predictor/internal/pkg/grib" +) + +func main() { + ctx := context.Background() + + // Инициализируем GRIB сервис + cfg := &grib.Config{ + Dir: "C:/tmp/grib", + TTL: 48 * time.Hour, + CacheTTL: 1 * time.Hour, + Parallel: 8, + } + + svc, err := grib.New(cfg) + if err != nil { + fmt.Printf("Error creating service: %v\n", err) + return + } + + // Обновляем данные (создаёт куб) + fmt.Println("Updating GRIB data (building cube)...") + start := time.Now() + if err := svc.Update(ctx); err != nil { + fmt.Printf("Update error: %v\n", err) + return + } + fmt.Printf("Cube built in %v\n", time.Since(start)) + + // Тестируем извлечение ветра + fmt.Println("\nTesting wind extraction...") + lat, lon, alt := 52.2, 0.1, 10000.0 + ts := time.Date(2026, 2, 11, 12, 0, 0, 0, time.UTC) + + wind, err := svc.Extract(ctx, lat, lon, alt, ts) + if err != nil { + fmt.Printf("Extract error: %v\n", err) + return + } + fmt.Printf("Wind at (%.2f, %.2f, %.0fm) at %v:\n", lat, lon, alt, ts) + fmt.Printf(" U (east): %.2f m/s\n", wind[0]) + fmt.Printf(" V (north): %.2f m/s\n", wind[1]) + + // Сравниваем с Tawhiri + fmt.Println("\nComparing with Tawhiri API...") + tawhiriURL := fmt.Sprintf( + "https://api.v2.sondehub.org/tawhiri?launch_latitude=%.2f&launch_longitude=%.2f&launch_altitude=0&launch_datetime=%s&ascent_rate=5&burst_altitude=30000&descent_rate=5", + lat, lon, ts.Format(time.RFC3339), + ) + + resp, err := http.Get(tawhiriURL) + if err != nil { + fmt.Printf("Tawhiri request error: %v\n", err) + return + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + var tawhiriResp map[string]interface{} + json.Unmarshal(body, &tawhiriResp) + + // Выводим финальную точку приземления + if prediction, ok := tawhiriResp["prediction"].([]interface{}); ok { + for _, stage := range prediction { + stageMap := stage.(map[string]interface{}) + if stageMap["stage"] == 
"descent" { + trajectory := stageMap["trajectory"].([]interface{}) + if len(trajectory) > 0 { + last := trajectory[len(trajectory)-1].(map[string]interface{}) + fmt.Printf("\nTawhiri landing point:\n") + fmt.Printf(" Lat: %.4f\n", last["latitude"]) + fmt.Printf(" Lon: %.4f\n", last["longitude"]) + } + } + } + } +} diff --git a/scripts/test_s3_download.go b/scripts/test_s3_download.go deleted file mode 100644 index 0391cd6..0000000 --- a/scripts/test_s3_download.go +++ /dev/null @@ -1,89 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "os" - "time" - - "git.intra.yksa.space/gsn/predictor/internal/pkg/grib" -) - -func main() { - ctx := context.Background() - - // Create S3 downloader - downloader, err := grib.NewS3Downloader( - "/tmp/grib_test", - 4, // parallel downloads - "noaa-gfs-bdp-pds", - "us-east-1", - ) - if err != nil { - log.Fatalf("Failed to create S3 downloader: %v", err) - } - - // Ensure directory exists - if err := os.MkdirAll("/tmp/grib_test", 0o755); err != nil { - log.Fatalf("Failed to create directory: %v", err) - } - - // Find nearest run (6-hour intervals: 00, 06, 12, 18 UTC) - now := time.Now().UTC() - hour := now.Hour() - (now.Hour() % 6) - // Use data from 6 hours ago to ensure it's available - run := time.Date(now.Year(), now.Month(), now.Day(), hour, 0, 0, 0, time.UTC).Add(-6 * time.Hour) - - fmt.Printf("Testing S3 download for run: %s\n", run.Format("2006-01-02 15:04 MST")) - - // List available files first - runStr := run.Format("20060102") - fmt.Printf("Listing available files for %s/%02d...\n", runStr, run.Hour()) - files, err := downloader.ListAvailableFiles(ctx, runStr, run.Hour()) - if err != nil { - log.Fatalf("Failed to list files: %v", err) - } - - fmt.Printf("Found %d files in S3:\n", len(files)) - if len(files) > 0 { - // Show first 5 files - for i, file := range files { - if i >= 5 { - fmt.Printf("... and %d more files\n", len(files)-5) - break - } - fmt.Printf(" - %s\n", file) - } - } - - // Try downloading just first 3 forecast hours (f000, f001, f002) - fmt.Println("\nTesting download of first 3 forecast hours...") - testRun := run - - // Create a timeout context for the download - downloadCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) - defer cancel() - - if err := downloader.Run(downloadCtx, testRun); err != nil { - log.Fatalf("Failed to download: %v", err) - } - - fmt.Println("\nDownload completed successfully!") - - // Check downloaded files - entries, err := os.ReadDir("/tmp/grib_test") - if err != nil { - log.Fatalf("Failed to read directory: %v", err) - } - - fmt.Printf("\nDownloaded %d files:\n", len(entries)) - for i, entry := range entries { - if i >= 10 { - fmt.Printf("... 
diff --git a/scripts/test_s3_simple.go b/scripts/test_s3_simple.go
deleted file mode 100644
index 2ae8414..0000000
--- a/scripts/test_s3_simple.go
+++ /dev/null
@@ -1,68 +0,0 @@
-package main
-
-import (
-	"context"
-	"fmt"
-	"io"
-	"log"
-	"os"
-
-	"github.com/aws/aws-sdk-go-v2/aws"
-	"github.com/aws/aws-sdk-go-v2/config"
-	"github.com/aws/aws-sdk-go-v2/service/s3"
-)
-
-func main() {
-	ctx := context.Background()
-
-	// Create AWS config with anonymous credentials
-	cfg, err := config.LoadDefaultConfig(ctx,
-		config.WithRegion("us-east-1"),
-		config.WithCredentialsProvider(aws.AnonymousCredentials{}),
-	)
-	if err != nil {
-		log.Fatalf("Failed to load config: %v", err)
-	}
-
-	client := s3.NewFromConfig(cfg)
-
-	// Try to download a single file
-	bucket := "noaa-gfs-bdp-pds"
-	key := "gfs.20251020/00/atmos/gfs.t00z.pgrb2.0p50.f000"
-
-	fmt.Printf("Downloading: s3://%s/%s\n", bucket, key)
-
-	input := &s3.GetObjectInput{
-		Bucket: aws.String(bucket),
-		Key:    aws.String(key),
-	}
-
-	result, err := client.GetObject(ctx, input)
-	if err != nil {
-		log.Fatalf("Failed to get object: %v", err)
-	}
-	defer result.Body.Close()
-
-	// Create output file
-	outFile := "/tmp/test_grib.part"
-	f, err := os.Create(outFile)
-	if err != nil {
-		log.Fatalf("Failed to create file: %v", err)
-	}
-	defer f.Close()
-
-	// Copy data
-	written, err := io.Copy(f, result.Body)
-	if err != nil {
-		log.Fatalf("Failed to copy data: %v (wrote %d bytes)", err, written)
-	}
-
-	fmt.Printf("Successfully downloaded %d bytes\n", written)
-
-	// Rename
-	if err := os.Rename(outFile, "/tmp/test_grib"); err != nil {
-		log.Fatalf("Failed to rename: %v", err)
-	}
-
-	fmt.Println("Download complete!")
-}
diff --git a/scripts/test_wind.go b/scripts/test_wind.go
new file mode 100644
index 0000000..61a231a
--- /dev/null
+++ b/scripts/test_wind.go
@@ -0,0 +1,55 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"git.intra.yksa.space/gsn/predictor/internal/pkg/grib"
+)
+
+func main() {
+	ctx := context.Background()
+
+	cfg := &grib.Config{
+		Dir:      "C:/tmp/grib",
+		TTL:      48 * time.Hour,
+		CacheTTL: 1 * time.Hour,
+		Parallel: 8,
+	}
+
+	svc, err := grib.New(cfg)
+	if err != nil {
+		fmt.Printf("Error: %v\n", err)
+		return
+	}
+
+	if err := svc.Update(ctx); err != nil {
+		fmt.Printf("Update error: %v\n", err)
+		return
+	}
+
+	// Test wind at lat=52.2, lon=0.1 at various altitudes
+	// Run is 2026-02-12T12:00Z, request time 21:00Z = +9 hours
+	ts := time.Date(2026, 2, 12, 21, 0, 0, 0, time.UTC)
+	lat, lon := 52.2, 0.1
+
+	fmt.Println("Wind at (52.2°N, 0.1°E) at 2026-02-12T21:00Z:")
+	fmt.Printf("%8s %8s %8s\n", "Alt(m)", "U(m/s)", "V(m/s)")
+
+	for _, alt := range []float64{0, 1000, 3000, 5000, 7000, 10000, 15000, 20000, 25000, 30000} {
+		w, err := svc.Extract(ctx, lat, lon, alt, ts)
+		if err != nil {
+			fmt.Printf("%8.0f  Error: %v\n", alt, err)
+			continue
+		}
+		fmt.Printf("%8.0f %8.2f %8.2f\n", alt, w[0], w[1])
+	}
+
+	// Also test at a few nearby points to check spatial consistency
+	fmt.Println("\nWind at 10km altitude, varying longitude:")
+	for _, testLon := range []float64{0.0, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0, 350.0, 359.75} {
+		w, _ := svc.Extract(ctx, lat, testLon, 10000, ts)
+		fmt.Printf("  lon=%6.2f: U=%8.2f V=%8.2f\n", testLon, w[0], w[1])
+	}
+}
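The altitude sweep above exercises the vertical interpolation inside Extract. As a reference for what such a lookup typically does between levels, here is a linear-in-height sketch; it is an assumption about the technique, not the extractor.go implementation, and the sample numbers are made up:

package main

import "fmt"

// interpolateWind linearly interpolates u/v to a target altitude using
// per-level geopotential heights (gh, ascending with index here for
// simplicity). A sketch of the vertical step an Extract-style lookup
// performs; the real extractor.go logic is not shown in this patch.
func interpolateWind(gh, u, v []float64, alt float64) (float64, float64) {
	if alt <= gh[0] {
		return u[0], v[0] // below the lowest level: clamp
	}
	for i := 1; i < len(gh); i++ {
		if alt <= gh[i] {
			f := (alt - gh[i-1]) / (gh[i] - gh[i-1]) // fraction between bracketing levels
			return u[i-1] + f*(u[i]-u[i-1]), v[i-1] + f*(v[i]-v[i-1])
		}
	}
	return u[len(u)-1], v[len(v)-1] // above the top level: clamp
}

func main() {
	gh := []float64{110, 1460, 3010, 5570, 9160, 11770} // heights in m, illustrative
	u := []float64{3.1, 8.4, 12.0, 21.5, 33.2, 28.9}
	v := []float64{-1.0, -2.2, 0.5, 4.1, 6.8, 5.0}
	uu, vv := interpolateWind(gh, u, v, 10000)
	fmt.Printf("U=%.2f V=%.2f\n", uu, vv)
}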
diff --git a/start_with_http.sh b/start_with_http.sh
new file mode 100644
index 0000000..4173c2b
--- /dev/null
+++ b/start_with_http.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+# Start API with HTTP downloads instead of S3
+
+export GSN_PREDICTOR_GRIB_USE_S3=false
+
+echo "Starting API with HTTP downloads from NOMADS..."
+echo "USE_S3 = $GSN_PREDICTOR_GRIB_USE_S3"
+echo ""
+
+cd "$(dirname "$0")"
+go run ./cmd/api/main.go
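The script toggles the download path via GSN_PREDICTOR_GRIB_USE_S3. One way such a flag can be read is with the caarlos0/env/v11 library already in go.mod; the struct below is hypothetical, and the real field in grib.Config is not shown in this patch:

package main

import (
	"fmt"

	"github.com/caarlos0/env/v11"
)

// gribEnv is a hypothetical config struct showing how the
// GSN_PREDICTOR_GRIB_USE_S3 toggle could be wired with env/v11.
type gribEnv struct {
	UseS3 bool `env:"GSN_PREDICTOR_GRIB_USE_S3" envDefault:"false"`
}

func main() {
	var cfg gribEnv
	if err := env.Parse(&cfg); err != nil {
		fmt.Printf("env parse error: %v\n", err)
		return
	}
	fmt.Printf("USE_S3 = %v\n", cfg.UseS3) // false when launched via start_with_http.sh
}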