diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..af05740 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,17 @@ +{ + "permissions": { + "allow": [ + "Bash(cat:*)", + "Bash(xargs:*)", + "Bash(ls:*)", + "Bash(done)", + "Bash(curl:*)", + "WebFetch(domain:raw.githubusercontent.com)", + "WebFetch(domain:github.com)", + "Bash(go run:*)", + "Bash(pkill:*)" + ], + "deny": [], + "ask": [] + } +} diff --git a/.dockerignore b/.dockerignore index 287fc68..22e12b6 100644 --- a/.dockerignore +++ b/.dockerignore @@ -47,7 +47,7 @@ predictor # Temporary files /tmp/ /temp/ - +*.py # Test coverage *.out diff --git a/.gitignore b/.gitignore index 8519b69..9ffe4dd 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ *.dll *.so *.dylib +*.ps1 # Test binary, built with `go test -c` *.test @@ -28,6 +29,9 @@ go.work *.swp *.swo *~ +*.bak +*.py +*.json # OS generated files .DS_Store @@ -59,4 +63,6 @@ Thumbs.db # Tawhiri /tawhiri -/tawhiri/* \ No newline at end of file +/tawhiri/* + +*.md \ No newline at end of file diff --git a/Makefile b/Makefile index c6c35d9..42d11c5 100644 --- a/Makefile +++ b/Makefile @@ -105,7 +105,9 @@ help: @echo " run-local - Run locally" @echo " fmt - Format code" @echo " lint - Lint code" + @echo " generate-ogen - Generate OpenAPI code from swagger spec" @echo " help - Show this help" +.PHONY: generate-ogen generate-ogen: go run github.com/ogen-go/ogen/cmd/ogen@latest --target pkg/rest -package gsn --clean api/rest/predictor.swagger.yml \ No newline at end of file diff --git a/api/rest/predictor.swagger.yml b/api/rest/predictor.swagger.yml index 73e3381..fc408f9 100644 --- a/api/rest/predictor.swagger.yml +++ b/api/rest/predictor.swagger.yml @@ -61,6 +61,13 @@ paths: name: descent_curve schema: type: string + - in: query + name: simulate_stages + schema: + type: array + items: + type: string + enum: [ascent, descent, float] - in: query name: interpolate schema: @@ -147,7 +154,7 @@ components: properties: stage: type: string - enum: ["ascent", "descent"] + enum: ["ascent", "descent", "float"] trajectory: type: array items: diff --git a/assemble_cube.go b/assemble_cube.go new file mode 100644 index 0000000..c624e92 --- /dev/null +++ b/assemble_cube.go @@ -0,0 +1,26 @@ +package main + +import ( + "fmt" + "os" + "time" +) + +// This will be a simple wrapper that calls the internal assembleCube function +// We'll compile it as part of the grib package + +func main() { + dir := "C:/tmp/grib" + run := time.Date(2025, 12, 6, 0, 0, 0, 0, time.UTC) + cubePath := fmt.Sprintf("%s/%s.cube", dir, run.Format("20060102_15")) + + fmt.Printf("Assembling cube from existing GRIB files...\n") + fmt.Printf("Directory: %s\n", dir) + fmt.Printf("Run: %s\n", run.Format("2006-01-02 15:04 MST")) + fmt.Printf("Output: %s\n", cubePath) + fmt.Println() + + // Just print instructions - we'll do it directly + fmt.Println("Run this Go code to assemble:") + fmt.Printf("cd internal/pkg/grib && go test -run TestAssemble\n") +} diff --git a/cmd/api/main.go b/cmd/api/main.go index 2e9bf6b..f250199 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -24,6 +24,7 @@ func main() { panic(err) } defer lg.Sync() + zap.ReplaceGlobals(lg) ctx := log.ToCtx(context.Background(), lg) schedulerConfig, err := scheduler.NewConfig() diff --git a/go.mod b/go.mod index 079e12c..b186e37 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/nilsmagnus/grib v1.2.8 github.com/ogen-go/ogen v1.16.0 github.com/rs/cors v1.11.1 + github.com/stretchr/testify v1.11.1 go.opentelemetry.io/otel v1.38.0 go.opentelemetry.io/otel/metric v1.38.0 go.opentelemetry.io/otel/trace v1.38.0 @@ -19,24 +20,7 @@ require ( ) require ( - github.com/aws/aws-sdk-go-v2 v1.39.3 // indirect - github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 // indirect - github.com/aws/aws-sdk-go-v2/config v1.31.13 // indirect - github.com/aws/aws-sdk-go-v2/credentials v1.18.17 // indirect - github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.10 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.10 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.10 // indirect - github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect - github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.10 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.1 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.10 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.10 // indirect - github.com/aws/aws-sdk-go-v2/service/s3 v1.88.5 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.29.7 // indirect - github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.2 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.38.7 // indirect - github.com/aws/smithy-go v1.23.1 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/dlclark/regexp2 v1.11.5 // indirect github.com/fatih/color v1.18.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect @@ -46,9 +30,11 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/segmentio/asm v1.2.1 // indirect github.com/shopspring/decimal v1.4.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect @@ -57,4 +43,5 @@ require ( golang.org/x/sys v0.37.0 // indirect golang.org/x/text v0.30.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index eaa1790..7c95f5c 100644 --- a/go.sum +++ b/go.sum @@ -1,39 +1,3 @@ -github.com/aws/aws-sdk-go-v2 v1.39.3 h1:h7xSsanJ4EQJXG5iuW4UqgP7qBopLpj84mpkNx3wPjM= -github.com/aws/aws-sdk-go-v2 v1.39.3/go.mod h1:yWSxrnioGUZ4WVv9TgMrNUeLV3PFESn/v+6T/Su8gnM= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 h1:t9yYsydLYNBk9cJ73rgPhPWqOh/52fcWDQB5b1JsKSY= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2/go.mod h1:IusfVNTmiSN3t4rhxWFaBAqn+mcNdwKtPcV16eYdgko= -github.com/aws/aws-sdk-go-v2/config v1.31.13 h1:wcqQB3B0PgRPUF5ZE/QL1JVOyB0mbPevHFoAMpemR9k= -github.com/aws/aws-sdk-go-v2/config v1.31.13/go.mod h1:ySB5D5ybwqGbT6c3GszZ+u+3KvrlYCUQNo62+hkKOFk= -github.com/aws/aws-sdk-go-v2/credentials v1.18.17 h1:skpEwzN/+H8cdrrtT8y+rvWJGiWWv0DeNAe+4VTf+Vs= -github.com/aws/aws-sdk-go-v2/credentials v1.18.17/go.mod h1:Ed+nXsaYa5uBINovJhcAWkALvXw2ZLk36opcuiSZfJM= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.10 h1:UuGVOX48oP4vgQ36oiKmW9RuSeT8jlgQgBFQD+HUiHY= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.10/go.mod h1:vM/Ini41PzvudT4YkQyE/+WiQJiQ6jzeDyU8pQKwCac= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.10 h1:mj/bdWleWEh81DtpdHKkw41IrS+r3uw1J/VQtbwYYp8= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.10/go.mod h1:7+oEMxAZWP8gZCyjcm9VicI0M61Sx4DJtcGfKYv2yKQ= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.10 h1:wh+/mn57yhUrFtLIxyFPh2RgxgQz/u+Yrf7hiHGHqKY= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.10/go.mod h1:7zirD+ryp5gitJJ2m1BBux56ai8RIRDykXZrJSp540w= -github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk= -github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.10 h1:FHw90xCTsofzk6vjU808TSuDtDfOOKPNdz5Weyc3tUI= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.10/go.mod h1:n8jdIE/8F3UYkg8O4IGkQpn2qUmapg/1K1yl29/uf/c= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2 h1:xtuxji5CS0JknaXoACOunXOYOQzgfTvGAc9s2QdCJA4= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2/go.mod h1:zxwi0DIR0rcRcgdbl7E2MSOvxDyyXGBlScvBkARFaLQ= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.1 h1:ne+eepnDB2Wh5lHKzELgEncIqeVlQ1rSF9fEa4r5I+A= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.1/go.mod h1:u0Jkg0L+dcG1ozUq21uFElmpbmjBnhHR5DELHIme4wg= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.10 h1:DRND0dkCKtJzCj4Xl4OpVbXZgfttY5q712H9Zj7qc/0= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.10/go.mod h1:tGGNmJKOTernmR2+VJ0fCzQRurcPZj9ut60Zu5Fi6us= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.10 h1:DA+Hl5adieRyFvE7pCvBWm3VOZTRexGVkXw33SUqNoY= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.10/go.mod h1:L+A89dH3/gr8L4ecrdzuXUYd1znoko6myzndVGZx/DA= -github.com/aws/aws-sdk-go-v2/service/s3 v1.88.5 h1:FlGScxzCGNzT+2AvHT1ZGMvxTwAMa6gsooFb1pO/AiM= -github.com/aws/aws-sdk-go-v2/service/s3 v1.88.5/go.mod h1:N/iojY+8bW3MYol9NUMuKimpSbPEur75cuI1SmtonFM= -github.com/aws/aws-sdk-go-v2/service/sso v1.29.7 h1:fspVFg6qMx0svs40YgRmE7LZXh9VRZvTT35PfdQR6FM= -github.com/aws/aws-sdk-go-v2/service/sso v1.29.7/go.mod h1:BQTKL3uMECaLaUV3Zc2L4Qybv8C6BIXjuu1dOPyxTQs= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.2 h1:scVnW+NLXasGOhy7HhkdT9AGb6kjgW7fJ5xYkUaqHs0= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.2/go.mod h1:FRNCY3zTEWZXBKm2h5UBUPvCVDOecTad9KhynDyGBc0= -github.com/aws/aws-sdk-go-v2/service/sts v1.38.7 h1:VEO5dqFkMsl8QZ2yHsFDJAIZLAkEbaYDB+xdKi0Feic= -github.com/aws/aws-sdk-go-v2/service/sts v1.38.7/go.mod h1:L1xxV3zAdB+qVrVW/pBIrIAnHFWHo6FBbFe4xOGsG/o= -github.com/aws/smithy-go v1.23.1 h1:sLvcH6dfAFwGkHLZ7dGiYF7aK6mg4CgKA/iDKjLDt9M= -github.com/aws/smithy-go v1.23.1/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/caarlos0/env/v11 v11.3.1 h1:cArPWC15hWmEt+gWk7YBi7lEXTXCvpaSdCiZE2X5mCA= github.com/caarlos0/env/v11 v11.3.1/go.mod h1:qupehSf/Y0TUTsxKywqRt/vJjN5nz6vauiYEUUr8P4U= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -57,8 +21,6 @@ github.com/go-faster/jx v1.1.0/go.mod h1:vKDNikrKoyUmpzaJ0OkIkRQClNHFX/nF3dnTJZb github.com/go-faster/yaml v0.4.6 h1:lOK/EhI04gCpPgPhgt0bChS6bvw7G3WwI8xxVe0sw9I= github.com/go-faster/yaml v0.4.6/go.mod h1:390dRIvV4zbnO7qC9FGo6YYutc+wyyUSHBgbXL52eXk= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= -github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -77,17 +39,12 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= -github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= -github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/nilsmagnus/grib v1.2.8 h1:H7ch/1/agaCqM3MC8hW1Ft+EJ+q2XB757uml/IfPvp4= github.com/nilsmagnus/grib v1.2.8/go.mod h1:XHm+5zuoOk0NSIWaGmA3JaAxI4i50YvD1L1vz+aqPOQ= -github.com/ogen-go/ogen v1.14.0 h1:TU1Nj4z9UBsAfTkf+IhuNNp7igdFQKqkk9+6/y4XuWg= -github.com/ogen-go/ogen v1.14.0/go.mod h1:Iw1vkqkx6SU7I9th5ceP+fVPJ6Wge4e3kAVzAxJEpPE= github.com/ogen-go/ogen v1.16.0 h1:fKHEYokW/QrMzVNXId74/6RObRIUs9T2oroGKtR25Iw= github.com/ogen-go/ogen v1.16.0/go.mod h1:s3nWiMzybSf8fhxckyO+wtto92+QHpEL8FmkPnhL3jI= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -97,13 +54,10 @@ github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= -github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= -github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA= github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= -github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= -github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0= github.com/segmentio/asm v1.2.1/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= @@ -111,30 +65,22 @@ github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+D github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= -go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= -go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= -go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg= -go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E= go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= -go.opentelemetry.io/otel/metric v1.36.0 h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCREuTYufE= -go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs= go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= -go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w= -go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA= go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= -go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= @@ -144,26 +90,15 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -golang.org/x/exp v0.0.0-20230725093048-515e97ebf090 h1:Di6/M8l0O2lCLc6VVRWhgCiApHV8MnQurBnFSHsQtNY= -golang.org/x/exp v0.0.0-20230725093048-515e97ebf090/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/exp v0.0.0-20251017212417-90e834f514db h1:by6IehL4BH5k3e3SJmcoNbOobMey2SLpAF79iPOEBvw= golang.org/x/exp v0.0.0-20251017212417-90e834f514db/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70= -golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= -golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4= golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210= -golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= -golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= -golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= -golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= -golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k= golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/pkg/ds/predictor.go b/internal/pkg/ds/predictor.go index 753b0b2..c7c33c8 100644 --- a/internal/pkg/ds/predictor.go +++ b/internal/pkg/ds/predictor.go @@ -19,6 +19,7 @@ type PredictionParameters struct { StopDatetime *time.Time AscentCurve *string // base64 DescentCurve *string // base64 + SimulateStages []string Interpolate *bool Format *string Dataset *time.Time @@ -85,5 +86,11 @@ func ConvertFlatPredictionParams(params api.PerformPredictionParams) *Prediction if v, ok := params.Dataset.Get(); ok { out.Dataset = &v } + if len(params.SimulateStages) > 0 { + out.SimulateStages = make([]string, len(params.SimulateStages)) + for i, stage := range params.SimulateStages { + out.SimulateStages[i] = string(stage) + } + } return out } diff --git a/internal/pkg/grib/assemble_test.go b/internal/pkg/grib/assemble_test.go new file mode 100644 index 0000000..8d9d7ce --- /dev/null +++ b/internal/pkg/grib/assemble_test.go @@ -0,0 +1,25 @@ +package grib + +import ( + "testing" + "time" +) + +func TestAssembleCubeFromExisting(t *testing.T) { + dir := "C:/tmp/grib" + run := time.Date(2026, 1, 16, 6, 0, 0, 0, time.UTC) + cubePath := dir + "/" + run.Format("20060102_15") + ".cube" + + t.Logf("Assembling cube from existing GRIB files...") + t.Logf("Directory: %s", dir) + t.Logf("Run: %s", run.Format("2006-01-02 15:04 MST")) + t.Logf("Output: %s", cubePath) + + err := assembleCube(dir, run, cubePath) + if err != nil { + t.Fatalf("Failed to assemble cube: %v", err) + } + + t.Logf("✓ Cube assembled successfully!") + t.Logf("Cube file: %s", cubePath) +} diff --git a/internal/pkg/grib/config.go b/internal/pkg/grib/config.go index 009645d..c80462e 100644 --- a/internal/pkg/grib/config.go +++ b/internal/pkg/grib/config.go @@ -1,23 +1,130 @@ package grib import ( + "fmt" "time" "git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes" env "github.com/caarlos0/env/v11" ) +// DatasetConfig описывает параметры GFS-датасета: сетку, временные шаги, +// уровни давления и URL для загрузки. +type DatasetConfig struct { + // Сетка + Resolution float64 // шаг сетки в градусах (0.25 или 0.5) + NLat int // точек по широте (721 для 0.25°, 361 для 0.5°) + NLon int // точек по долготе (1440 для 0.25°, 720 для 0.5°) + + // Время + NT int // кол-во временных шагов (97 для 0–96 ч с шагом 1) + MaxHour int // последний час прогноза (96) + TimeStep int // интервал между шагами, часы (1 или 3) + + // Вертикаль + NP int // кол-во уровней давления + Levels []float64 // уровни давления в гПа, по убыванию (1000 … 1) + + // Переменные в кубе (порядок важен: индексы 0, 1, 2, …) + NVar int // кол-во переменных + Variables []string // GRIB-имена для фильтрации idx (HGT, UGRD, VGRD) + + // URL загрузки (fmt-шаблоны: date, hour, hour, step) + URLMask string // основной pgrb2 + URLMaskB string // дополнительный pgrb2b + + // Имена файлов + FileSuffix string // токен разрешения в именах файлов ("0p25", "0p50") +} + +// SizePerVar возвращает размер одной переменной в кубе, байт. +func (dc *DatasetConfig) SizePerVar() int64 { + return int64(dc.NT) * int64(dc.NP) * int64(dc.NLat) * int64(dc.NLon) * 4 +} + +// CubeSize возвращает полный размер куба, байт. +func (dc *DatasetConfig) CubeSize() int64 { + return dc.SizePerVar() * int64(dc.NVar) +} + +// GridSize возвращает NLat * NLon. +func (dc *DatasetConfig) GridSize() int { + return dc.NLat * dc.NLon +} + +// InvResolution возвращает 1/Resolution — множитель для перевода координат в индексы. +func (dc *DatasetConfig) InvResolution() float64 { + return 1.0 / dc.Resolution +} + +// Steps возвращает список часов прогноза [0, TimeStep, 2*TimeStep, …, MaxHour]. +func (dc *DatasetConfig) Steps() []int { + out := make([]int, 0, dc.NT) + for h := 0; h <= dc.MaxHour; h += dc.TimeStep { + out = append(out, h) + } + return out +} + +// FileName возвращает имя основного GRIB-файла (pgrb2). +func (dc *DatasetConfig) FileName(run time.Time, step int) string { + return fmt.Sprintf("gfs.t%02dz.pgrb2.%s.f%03d", run.Hour(), dc.FileSuffix, step) +} + +// FileNameB возвращает имя вторичного GRIB-файла (pgrb2b). +func (dc *DatasetConfig) FileNameB(run time.Time, step int) string { + return fmt.Sprintf("gfs.t%02dz.pgrb2b.%s.f%03d", run.Hour(), dc.FileSuffix, step) +} + +// GribURL возвращает URL основного GRIB-файла. +func (dc *DatasetConfig) GribURL(run time.Time, step int) string { + return fmt.Sprintf(dc.URLMask, run.Format("20060102"), run.Hour(), run.Hour(), step) +} + +// GribURLB возвращает URL вторичного GRIB-файла. +func (dc *DatasetConfig) GribURLB(run time.Time, step int) string { + return fmt.Sprintf(dc.URLMaskB, run.Format("20060102"), run.Hour(), run.Hour(), step) +} + +// DefaultDatasetConfig возвращает конфиг GFS 0.25° / 1 час / 47 уровней. +func DefaultDatasetConfig() DatasetConfig { + return DatasetConfig{ + Resolution: 0.25, + NLat: 721, + NLon: 1440, + + NT: 97, + MaxHour: 96, + TimeStep: 1, + + NP: 47, + Levels: []float64{ + 1000, 975, 950, 925, 900, 875, 850, 825, 800, 775, + 750, 725, 700, 675, 650, 625, 600, 575, 550, 525, + 500, 475, 450, 425, 400, 375, 350, 325, 300, 275, + 250, 225, 200, 175, 150, 125, 100, 70, 50, 30, + 20, 10, 7, 5, 3, 2, 1, + }, + + NVar: 3, + Variables: []string{"HGT", "UGRD", "VGRD"}, + + URLMask: "https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2.0p25.f%03d", + URLMaskB: "https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2b.0p25.f%03d", + + FileSuffix: "0p25", + } +} + +// --------------------------------------------------------------------------- + type Config struct { - Dir string `env:"DIR" envDefault:"/tmp/grib"` - TTL time.Duration `env:"TTL" envDefault:"24h"` - CacheTTL time.Duration `env:"CACHE_TTL" envDefault:"1h"` - Parallel int `env:"PARALLEL" envDefault:"8"` - DatasetURL string `env:"DATASET_URL" envDefault:"https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod"` - // S3 configuration - UseS3 bool `env:"USE_S3" envDefault:"true"` - S3Bucket string `env:"S3_BUCKET" envDefault:"noaa-gfs-bdp-pds"` - S3Region string `env:"S3_REGION" envDefault:"us-east-1"` - S3Timeout time.Duration `env:"S3_TIMEOUT" envDefault:"300s"` + Dir string `env:"DIR" envDefault:"C:/tmp/grib"` + TTL time.Duration `env:"TTL" envDefault:"48h"` + CacheTTL time.Duration `env:"CACHE_TTL" envDefault:"1h"` + Parallel int `env:"PARALLEL" envDefault:"8"` + + Dataset DatasetConfig } func NewConfig() (*Config, error) { @@ -27,6 +134,6 @@ func NewConfig() (*Config, error) { }); err != nil { return nil, errcodes.Wrap(err, "failed to parse GRIB config") } - + cfg.Dataset = DefaultDatasetConfig() return cfg, nil } diff --git a/internal/pkg/grib/create_dataset.go b/internal/pkg/grib/create_dataset.go new file mode 100644 index 0000000..e69de29 diff --git a/internal/pkg/grib/cube.go b/internal/pkg/grib/cube.go index d2015ec..dfc3606 100644 --- a/internal/pkg/grib/cube.go +++ b/internal/pkg/grib/cube.go @@ -15,7 +15,7 @@ type cube struct { file *os.File } -func openCube(path string) (*cube, error) { +func openCube(path string, dc *DatasetConfig) (*cube, error) { f, err := os.Open(path) if err != nil { return nil, err @@ -27,14 +27,15 @@ func openCube(path string) (*cube, error) { return nil, err } - const ( - nT = 97 // 0-96 hours with step 1 hour - nP = 47 // 47 pressure levels matching tawhiri - nLat = 361 - nLon = 720 - ) - - return &cube{mm: mm, t: nT, p: nP, lat: nLat, lon: nLon, bytesPerVar: int64(nT * nP * nLat * nLon * 4), file: f}, nil + return &cube{ + mm: mm, + t: dc.NT, + p: dc.NP, + lat: dc.NLat, + lon: dc.NLon, + bytesPerVar: dc.SizePerVar(), + file: f, + }, nil } func (c *cube) val(varIdx, ti, pi, y, x int) float32 { diff --git a/internal/pkg/grib/dataset.go b/internal/pkg/grib/dataset.go index e539f65..f994dc9 100644 --- a/internal/pkg/grib/dataset.go +++ b/internal/pkg/grib/dataset.go @@ -2,6 +2,7 @@ package grib type dataset struct { cube *cube + ds *DatasetConfig runUTC int64 // unix seconds } diff --git a/internal/pkg/grib/downloader.go b/internal/pkg/grib/downloader.go deleted file mode 100644 index a93a64c..0000000 --- a/internal/pkg/grib/downloader.go +++ /dev/null @@ -1,91 +0,0 @@ -package grib - -import ( - "context" - "fmt" - "io" - "net/http" - "os" - "path/filepath" - "time" - - "git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes" - "golang.org/x/sync/errgroup" -) - -type Downloader struct { - Dir string - Parallel int - Client *http.Client - DatasetURL string -} - -func (d *Downloader) fileURL(run string, hour int, step int) string { - return fmt.Sprintf("%s/gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2.0p50.f%03d", d.DatasetURL, run, hour, hour, step) -} - -func (d *Downloader) fetch(ctx context.Context, url, dst string) (err error) { - // Check if final file already exists - if _, err := os.Stat(dst); err == nil { - return nil - } - - tmp := dst + ".part" - - // Remove old .part file if it exists (fixes race condition) - os.Remove(tmp) - - f, err := os.Create(tmp) - if err != nil { - return err - } - - // Cleanup .part file on any error (using named return value) - defer func() { - f.Close() - if err != nil { - os.Remove(tmp) - } - }() - - req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - resp, err := d.Client.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return errcodes.Wrap(errcodes.ErrDownload, "bad status: "+resp.Status) - } - - if _, err := io.Copy(f, resp.Body); err != nil { - return err - } - - // Close file before rename - if err := f.Close(); err != nil { - return err - } - - // If rename fails, err will be set and defer will cleanup .part file - return os.Rename(tmp, dst) -} - -func (d *Downloader) Run(ctx context.Context, run time.Time) error { - runStr := run.Format("20060102") - hour := run.Hour() - g, ctx := errgroup.WithContext(ctx) - sem := make(chan struct{}, d.Parallel) - for _, step := range steps { - step := step - sem <- struct{}{} - g.Go(func() error { - defer func() { <-sem }() - url := d.fileURL(runStr, hour, step) - dst := filepath.Join(d.Dir, fileName(run, step)) - return d.fetch(ctx, url, dst) - }) - } - return g.Wait() -} diff --git a/internal/pkg/grib/extractor.go b/internal/pkg/grib/extractor.go index 769b7cd..d27f3a4 100644 --- a/internal/pkg/grib/extractor.go +++ b/internal/pkg/grib/extractor.go @@ -4,29 +4,100 @@ import "math" func lerp(a, b, t float64) float64 { return a + t*(b-a) } -// Interpolate 16‑point (time, p, lat, lon) +// ghInterp returns interpolated geopotential height at given time/pressure/lat/lon +func (d *dataset) ghInterp(ti, pi int, y0, y1, x0, x1 int, wy, wx float64) float64 { + g00 := d.cube.val(0, ti, pi, y0, x0) + g10 := d.cube.val(0, ti, pi, y0, x1) + g01 := d.cube.val(0, ti, pi, y1, x0) + g11 := d.cube.val(0, ti, pi, y1, x1) + return (1-wy)*((1-wx)*float64(g00)+wx*float64(g10)) + wy*((1-wx)*float64(g01)+wx*float64(g11)) +} + +// searchAltLevel uses geopotential height to find pressure level bracket for target altitude. +func (d *dataset) searchAltLevel(alt float64, ti, y0, y1, x0, x1 int, wy, wx float64) (int, float64) { + levels := d.ds.Levels + nLevels := len(levels) + + lo, hi := 0, nLevels-1 + for lo < hi-1 { + mid := (lo + hi) / 2 + ghMid := d.ghInterp(ti, mid, y0, y1, x0, x1, wy, wx) + if ghMid < alt { + lo = mid + } else { + hi = mid + } + } + + ghLo := d.ghInterp(ti, lo, y0, y1, x0, x1, wy, wx) + ghHi := d.ghInterp(ti, hi, y0, y1, x0, x1, wy, wx) + + wp := 0.0 + if ghHi != ghLo { + wp = (alt - ghLo) / (ghHi - ghLo) + } + if wp < 0 { + wp = 0 + } + if wp > 1 { + wp = 1 + } + + return lo, wp +} + +// uv выполняет интерполяцию ветра по 4 измерениям (time, pressure, lat, lon). func (d *dataset) uv(lat, lon, alt float64, tHours float64) (float64, float64) { if lon < 0 { lon += 360 } - iy := (lat + 90) * 2 + + inv := d.ds.InvResolution() + + // GRIB scan north→south: index 0 = 90°N + iy := (90 - lat) * inv y0 := int(math.Floor(iy)) + if y0 < 0 { + y0 = 0 + } + if y0 >= d.cube.lat-1 { + y0 = d.cube.lat - 2 + } y1 := y0 + 1 wy := iy - float64(y0) - ix := lon * 2 + + ix := lon * inv x0 := int(math.Floor(ix)) % d.cube.lon x1 := (x0 + 1) % d.cube.lon wx := ix - float64(x0) - // For hourly data (step = 1 hour) - it0 := int(math.Floor(tHours)) - wt := tHours - float64(it0) + + // Время: tHours делим на шаг, чтобы получить индекс в кубе + tIdx := tHours / float64(d.ds.TimeStep) + it0 := int(math.Floor(tIdx)) + if it0 < 0 { + it0 = 0 + } + if it0 >= d.cube.t-1 { + it0 = d.cube.t - 2 + } + wt := tIdx - float64(it0) + + // ISA: высота → давление → индекс уровня + levels := d.ds.Levels p := pressureFromAlt(alt) ip0 := 0 - for ip0+1 < len(pressureLevels) && pressureLevels[ip0+1] > p { + for ip0+1 < len(levels) && levels[ip0+1] > p { ip0++ } ip1 := ip0 + 1 - wp := (pressureLevels[ip0] - p) / (pressureLevels[ip0] - pressureLevels[ip1]) + if ip1 >= len(levels) { + ip1 = len(levels) - 1 + } + wp := 0.0 + if levels[ip0] != levels[ip1] { + wp = (levels[ip0] - p) / (levels[ip0] - levels[ip1]) + } + fetch := func(ti, pi int) (float64, float64) { u00 := d.cube.val(1, ti, pi, y0, x0) u10 := d.cube.val(1, ti, pi, y0, x1) @@ -40,6 +111,7 @@ func (d *dataset) uv(lat, lon, alt float64, tHours float64) (float64, float64) { vxy := (1-wy)*((1-wx)*float64(v00)+wx*float64(v10)) + wy*((1-wx)*float64(v01)+wx*float64(v11)) return uxy, vxy } + u0p0, v0p0 := fetch(it0, ip0) u0p1, v0p1 := fetch(it0, ip1) u1p0, v1p0 := fetch(it0+1, ip0) diff --git a/internal/pkg/grib/grib.go b/internal/pkg/grib/grib.go index 32c4466..7bf6bfc 100644 --- a/internal/pkg/grib/grib.go +++ b/internal/pkg/grib/grib.go @@ -4,7 +4,6 @@ import ( "context" "encoding/binary" "math" - "net/http" "os" "path/filepath" "strings" @@ -41,15 +40,12 @@ func New(cfg *Config) (Service, error) { // Try to load existing dataset on startup if err := s.loadExistingDataset(); err != nil { // Log error but don't fail startup - dataset will be loaded on first Update() - // This allows the service to start even if no data is available yet } return s, nil } -// loadExistingDataset tries to load the most recent available dataset func (s *service) loadExistingDataset() error { - // Find the most recent cube file pattern := filepath.Join(s.cfg.Dir, "*.cube") matches, err := filepath.Glob(pattern) if err != nil { @@ -60,7 +56,6 @@ func (s *service) loadExistingDataset() error { return errcodes.ErrNoCubeFilesFound } - // Sort by modification time (newest first) var latestFile string var latestTime time.Time @@ -69,7 +64,6 @@ func (s *service) loadExistingDataset() error { if err != nil { continue } - if info.ModTime().After(latestTime) { latestTime = info.ModTime() latestFile = match @@ -80,18 +74,16 @@ func (s *service) loadExistingDataset() error { return errcodes.ErrNoValidCubeFilesFound } - // Check if the file is fresh enough if time.Since(latestTime) > s.cfg.TTL { return errcodes.Wrap(errcodes.ErrLatestCubeFileIsTooOld, "latest cube file is too old") } - // Load the dataset - c, err := openCube(latestFile) + dc := &s.cfg.Dataset + c, err := openCube(latestFile, dc) if err != nil { return err } - // Extract run time from filename base := filepath.Base(latestFile) runStr := strings.TrimSuffix(base, ".cube") run, err := time.Parse("20060102_15", runStr) @@ -100,94 +92,70 @@ func (s *service) loadExistingDataset() error { return err } - ds := &dataset{cube: c, runUTC: run.Unix()} - s.data.Store(ds) - + s.data.Store(&dataset{cube: c, ds: dc, runUTC: run.Unix()}) return nil } -// Update() downloads missing GRIBs, assembles cube into a single mmap‑file. func (s *service) Update(ctx context.Context) error { - // Check if we already have fresh data if d := s.data.Load(); d != nil { runTime := time.Unix(d.runUTC, 0) if time.Since(runTime) < s.cfg.TTL { - // Data is still fresh, no need to update return nil } } - // Check again after acquiring lock (double-checked locking pattern) if d := s.data.Load(); d != nil { runTime := time.Unix(d.runUTC, 0) if time.Since(runTime) < s.cfg.TTL { - // Another instance already updated the data return nil } } - run := nearestRun(time.Now().UTC().Add(-24 * time.Hour)) + dc := &s.cfg.Dataset + run := nearestRun(time.Now().UTC().Add(-6 * time.Hour)) - // Check if we already have this run cubePath := filepath.Join(s.cfg.Dir, run.Format("20060102_15")) + ".cube" if _, err := os.Stat(cubePath); err == nil { - // File exists, check if it's fresh info, err := os.Stat(cubePath) if err == nil && time.Since(info.ModTime()) < s.cfg.TTL { - // File is fresh, just load it - c, err := openCube(cubePath) + c, err := openCube(cubePath, dc) if err != nil { return err } - ds := &dataset{cube: c, runUTC: run.Unix()} - s.data.Store(ds) + s.data.Store(&dataset{cube: c, ds: dc, runUTC: run.Unix()}) s.cache = memCache{ttl: s.cfg.CacheTTL} return nil } } - // Download new data using S3 or HTTP - var downloadErr error - if s.cfg.UseS3 { - s3dl, err := NewS3Downloader(s.cfg.Dir, s.cfg.Parallel, s.cfg.S3Bucket, s.cfg.S3Region) - if err != nil { - return errcodes.Wrap(err, "failed to create S3 downloader") - } - downloadErr = s3dl.Run(ctx, run) - } else { - dl := Downloader{ - Dir: s.cfg.Dir, - Parallel: s.cfg.Parallel, - Client: http.DefaultClient, - DatasetURL: s.cfg.DatasetURL, - } - downloadErr = dl.Run(ctx, run) + downloadCtx, cancel := context.WithTimeout(ctx, 60*time.Minute) + defer cancel() + + dl := NewPartialDownloader(s.cfg.Dir, s.cfg.Parallel, dc) + if err := dl.Run(downloadCtx, run); err != nil { + return err } - if downloadErr != nil { - return downloadErr - } - - // Assemble cube if it doesn't exist if _, err := os.Stat(cubePath); err != nil { - if err := assembleCube(s.cfg.Dir, run, cubePath); err != nil { + if err := assembleCube(s.cfg.Dir, run, cubePath, dc); err != nil { return err } } - c, err := openCube(cubePath) + c, err := openCube(cubePath, dc) if err != nil { return err } - ds := &dataset{cube: c, runUTC: run.Unix()} - s.data.Store(ds) + s.data.Store(&dataset{cube: c, ds: dc, runUTC: run.Unix()}) s.cache = memCache{ttl: s.cfg.CacheTTL} return nil } -func assembleCube(dir string, run time.Time, cubePath string) error { - const sizePerVar = 97 * 47 * 361 * 720 * 4 // 97 time steps (0-96 hours), 47 pressure levels - total := int64(sizePerVar * 3) // 3 variables: gh, u, v +func assembleCube(dir string, run time.Time, cubePath string, dc *DatasetConfig) error { + sizePerVar := dc.SizePerVar() + total := dc.CubeSize() + gridBytes := int64(dc.GridSize()) * 4 + f, err := os.Create(cubePath) if err != nil { return err @@ -203,27 +171,23 @@ func assembleCube(dir string, run time.Time, cubePath string) error { defer f.Close() pIndex := make(map[int]int) - for i, p := range pressureLevels { + for i, p := range dc.Levels { pIndex[int(math.Round(p))] = i } - for ti, step := range steps { - fn := filepath.Join(dir, fileName(run, step)) + processFile := func(fn string, ti int) error { file, err := os.Open(fn) if err != nil { return err } messages, err := griblib.ReadMessages(file) - file.Close() // Close immediately after reading + file.Close() if err != nil { return err } for _, m := range messages { - // Check if this is a wind component (u or v) or geopotential height - // ParameterCategory 2 = momentum, ParameterNumber 2 = u-wind, 3 = v-wind - // ParameterCategory 3 = mass, ParameterNumber 5 = geopotential height if m.Section4.ProductDefinitionTemplateNumber != 0 { continue } @@ -231,7 +195,6 @@ func assembleCube(dir string, run time.Time, cubePath string) error { product := m.Section4.ProductDefinitionTemplate var varIdx int - // Match tawhiri variable order: ['gh', 'u', 'v'] (indices 0, 1, 2) if product.ParameterCategory == 2 { switch product.ParameterNumber { case 2: // u-wind @@ -242,18 +205,15 @@ func assembleCube(dir string, run time.Time, cubePath string) error { continue } } else if product.ParameterCategory == 3 && product.ParameterNumber == 5 { - // geopotential height - varIdx = 0 + varIdx = 0 // geopotential height } else { continue } - // Check if this is a pressure level (type 100) if product.FirstSurface.Type != 100 { continue } - // Get pressure level in hPa pressure := float64(product.FirstSurface.Value) / 100.0 pIdx, ok := pIndex[int(math.Round(pressure))] if !ok { @@ -261,14 +221,27 @@ func assembleCube(dir string, run time.Time, cubePath string) error { } vals := m.Data() - // GRIB library returns scan north->south, west->east already in row-major order raw := make([]byte, len(vals)*4) for i, v := range vals { binary.LittleEndian.PutUint32(raw[i*4:], math.Float32bits(float32(v))) } - base := int64(varIdx*sizePerVar + (ti*47+pIdx)*361*720*4) + base := int64(varIdx)*sizePerVar + (int64(ti)*int64(dc.NP)+int64(pIdx))*gridBytes copy(mm[base:base+int64(len(raw))], raw) } + return nil + } + + steps := dc.Steps() + for ti, step := range steps { + fn := filepath.Join(dir, dc.FileName(run, step)) + if err := processFile(fn, ti); err != nil { + return err + } + + fnB := filepath.Join(dir, dc.FileNameB(run, step)) + if err := processFile(fnB, ti); err != nil { + return err + } } return mm.Flush() } @@ -279,24 +252,21 @@ func (s *service) Extract(ctx context.Context, lat, lon, alt float64, ts time.Ti if d == nil { return zero, errcodes.ErrNoDataset } - if ts.Before(time.Unix(d.runUTC, 0)) || ts.After(time.Unix(d.runUTC, 0).Add(96*time.Hour)) { + maxDur := time.Duration(s.cfg.Dataset.MaxHour) * time.Hour + if ts.Before(time.Unix(d.runUTC, 0)) || ts.After(time.Unix(d.runUTC, 0).Add(maxDur)) { return zero, errcodes.ErrOutOfBounds } - // Try memory cache first key := encodeKey(lat, lon, alt, ts) if v, ok := s.cache.get(key); ok { return [2]float64(v), nil } - // Calculate result td := ts.Sub(time.Unix(d.runUTC, 0)).Hours() u, v := d.uv(lat, lon, alt, td) out := [2]float64{u, v} - // Cache in memory s.cache.set(key, vec(out)) - return out, nil } diff --git a/internal/pkg/grib/partial_downloader.go b/internal/pkg/grib/partial_downloader.go new file mode 100644 index 0000000..3752504 --- /dev/null +++ b/internal/pkg/grib/partial_downloader.go @@ -0,0 +1,350 @@ +package grib + +import ( + "bufio" + "context" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes" + "git.intra.yksa.space/gsn/predictor/internal/pkg/log" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +// PartialDownloader загружает только необходимые поля из GRIB файлов +// используя HTTP Range requests и .idx индексные файлы +type PartialDownloader struct { + Dir string + Parallel int + Client *http.Client + Variables []string + ds *DatasetConfig +} + +// NewPartialDownloader создаёт новый partial downloader +func NewPartialDownloader(dir string, parallel int, dc *DatasetConfig) *PartialDownloader { + return &PartialDownloader{ + Dir: dir, + Parallel: parallel, + Client: &http.Client{ + Timeout: 60 * time.Second, + }, + Variables: dc.Variables, + ds: dc, + } +} + +// idxEntry представляет запись из .idx файла +type idxEntry struct { + Index int + ByteStart int64 + Date string + Variable string + Level string + Forecast string +} + +type ProgressWriter struct { + Total int64 + Downloaded int64 + OnProgress func(percent float64) +} + +func (pw *ProgressWriter) Write(p []byte) (int, error) { + n := len(p) + pw.Downloaded += int64(n) + if pw.Total > 0 && pw.OnProgress != nil { + percent := float64(pw.Downloaded) / float64(pw.Total) * 100 + pw.OnProgress(percent) + } + return n, nil +} + +// parseIdx парсит .idx файл и возвращает записи +func (d *PartialDownloader) parseIdx(body []byte) []idxEntry { + var entries []idxEntry + lines := strings.Split(string(body), "\n") + + for _, line := range lines { + if line == "" { + continue + } + parts := strings.Split(line, ":") + if len(parts) < 7 { + continue + } + + byteStart, _ := strconv.ParseInt(parts[1], 10, 64) + entries = append(entries, idxEntry{ + Index: len(entries), + ByteStart: byteStart, + Date: parts[2], + Variable: parts[3], + Level: parts[4], + Forecast: parts[5], + }) + } + return entries +} + +// filterEntries фильтрует записи по нужным переменным и уровням давления +func (d *PartialDownloader) filterEntries(entries []idxEntry) []idxEntry { + var filtered []idxEntry + + for _, e := range entries { + isNeededVar := false + for _, v := range d.Variables { + if v == e.Variable { + isNeededVar = true + break + } + } + + isPressureLevel := strings.HasSuffix(e.Level, " mb") + + if isNeededVar && isPressureLevel { + filtered = append(filtered, e) + } + } + + return filtered +} + +// Вспомогательная функция для выполнения запроса с повторами +func (d *PartialDownloader) doWithRetry(ctx context.Context, req *http.Request) (*http.Response, error) { + var resp *http.Response + var err error + + backoff := 1 * time.Second + maxRetries := 3 + + for i := 0; i < maxRetries; i++ { + resp, err = d.Client.Do(req) + if err == nil && resp.StatusCode < 500 { + return resp, nil + } + + if resp != nil { + resp.Body.Close() + } + + log.Ctx(ctx).Warn("retry download", zap.Int("attempt", i+1), zap.Error(err)) + + select { + case <-time.After(backoff): + backoff *= 2 + case <-ctx.Done(): + return nil, ctx.Err() + } + } + return nil, err +} + +// downloadRange загружает диапазон байтов из URL +func (d *PartialDownloader) downloadRange(ctx context.Context, url string, start, end int64, out io.Writer) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return err + } + + rangeHeader := fmt.Sprintf("bytes=%d-", start) + if end > 0 { + rangeHeader = fmt.Sprintf("bytes=%d-%d", start, end) + } + req.Header.Set("Range", rangeHeader) + + resp, err := d.Client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusPartialContent && resp.StatusCode != http.StatusOK { + return errcodes.Wrap(errcodes.ErrDownload, "bad status: "+resp.Status) + } + + _, err = io.Copy(out, resp.Body) + return err +} + +func (d *PartialDownloader) downloadFieldsFromURL(ctx context.Context, url string, dst string, step int) (err error) { + idxURL := url + ".idx" + tmp := dst + ".part" + + if info, err := os.Stat(dst); err == nil && info.Size() > 0 { + return nil + } + + reqIdx, _ := http.NewRequestWithContext(ctx, http.MethodGet, idxURL, nil) + respIdx, err := d.doWithRetry(ctx, reqIdx) + if err != nil { + return errcodes.Wrap(err, "failed to get idx") + } + defer respIdx.Body.Close() + + idxBody, _ := io.ReadAll(respIdx.Body) + entries := d.parseIdx(idxBody) + filtered := d.filterEntries(entries) + if len(filtered) == 0 { + return nil + } + + var totalBytes int64 + type chunk struct{ start, end int64 } + chunks := make([]chunk, 0, len(filtered)) + + for _, entry := range filtered { + var endByte int64 = -1 + for j, e := range entries { + if e.ByteStart == entry.ByteStart && j+1 < len(entries) { + endByte = entries[j+1].ByteStart - 1 + break + } + } + chunks = append(chunks, chunk{entry.ByteStart, endByte}) + if endByte > 0 { + totalBytes += (endByte - entry.ByteStart + 1) + } + } + + f, err := os.OpenFile(tmp, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if err != nil { + return err + } + + var downloaded int64 + + err = func() error { + defer f.Close() + bufWriter := bufio.NewWriterSize(f, 1024*1024) + + for i, c := range chunks { + countingWriter := &proxyWriter{ + Writer: bufWriter, + OnWrite: func(n int) { + downloaded += int64(n) + if totalBytes > 0 && i%20 == 0 { + pct := float64(downloaded) / float64(totalBytes) * 100 + log.Ctx(ctx).Debug("download progress", + zap.Int("step", step), + zap.String("pct", fmt.Sprintf("%.1f%%", pct))) + } + }, + } + + if err := d.downloadRange(ctx, url, c.start, c.end, countingWriter); err != nil { + return err + } + } + return bufWriter.Flush() + }() + + if err != nil { + f.Close() + os.Remove(tmp) + return err + } + + return d.safeRename(tmp, dst) +} + +type proxyWriter struct { + io.Writer + OnWrite func(int) +} + +func (p *proxyWriter) Write(data []byte) (int, error) { + n, err := p.Writer.Write(data) + if n > 0 && p.OnWrite != nil { + p.OnWrite(n) + } + return n, err +} + +func (d *PartialDownloader) safeRename(src, dst string) error { + var lastErr error + for i := 0; i < 5; i++ { + if err := os.Rename(src, dst); err == nil { + return nil + } else { + lastErr = err + } + time.Sleep(150 * time.Millisecond) + } + return fmt.Errorf("rename failed: %w", lastErr) +} + +// Run запускает загрузку всех необходимых файлов (pgrb2 + pgrb2b) +func (d *PartialDownloader) Run(ctx context.Context, run time.Time) error { + log.Ctx(ctx).Info("starting partial download", + zap.Time("run", run), + zap.Strings("variables", d.Variables)) + + g, ctx := errgroup.WithContext(ctx) + sem := make(chan struct{}, d.Parallel) + steps := d.ds.Steps() + + for _, step := range steps { + step := step + + // Download primary pgrb2 + sem <- struct{}{} + g.Go(func() error { + defer func() { <-sem }() + url := d.ds.GribURL(run, step) + dst := filepath.Join(d.Dir, d.ds.FileName(run, step)) + return d.downloadFieldsFromURL(ctx, url, dst, step) + }) + + // Download secondary pgrb2b + sem <- struct{}{} + g.Go(func() error { + defer func() { <-sem }() + url := d.ds.GribURLB(run, step) + dst := filepath.Join(d.Dir, d.ds.FileNameB(run, step)) + return d.downloadFieldsFromURL(ctx, url, dst, step) + }) + } + + return g.Wait() +} + +// GetLatestModelRun находит последний доступный прогноз GFS +func GetLatestModelRun(ctx context.Context, dc *DatasetConfig) (time.Time, error) { + now := time.Now().UTC() + hour := now.Hour() - (now.Hour() % 6) + current := time.Date(now.Year(), now.Month(), now.Day(), hour, 0, 0, 0, time.UTC) + + client := &http.Client{Timeout: 10 * time.Second} + + for i := 0; i < 8; i++ { + url := dc.GribURL(current, dc.MaxHour) + + req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil) + if err != nil { + current = current.Add(-6 * time.Hour) + continue + } + + resp, err := client.Do(req) + if err == nil && resp.StatusCode == http.StatusOK { + resp.Body.Close() + log.Ctx(ctx).Info("found latest model run", zap.Time("run", current)) + return current, nil + } + if resp != nil { + resp.Body.Close() + } + + current = current.Add(-6 * time.Hour) + } + + return time.Time{}, errcodes.Wrap(errcodes.ErrDownload, "no recent GFS forecast found") +} diff --git a/internal/pkg/grib/pressure.go b/internal/pkg/grib/pressure.go index add6ff0..1b6dec0 100644 --- a/internal/pkg/grib/pressure.go +++ b/internal/pkg/grib/pressure.go @@ -2,15 +2,6 @@ package grib import "math" -// 47 pressure levels matching tawhiri configuration -var pressureLevels = []float64{ - 1000, 975, 950, 925, 900, 875, 850, 825, 800, 775, - 750, 725, 700, 675, 650, 625, 600, 575, 550, 525, - 500, 475, 450, 425, 400, 375, 350, 325, 300, 275, - 250, 225, 200, 175, 150, 125, 100, 70, 50, 30, - 20, 10, 7, 5, 3, 2, 1, -} - func pressureFromAlt(alt float64) float64 { // ICAO ISA return 1013.25 * math.Pow(1-alt/44307.69396, 5.255877) } diff --git a/internal/pkg/grib/s3_downloader.go b/internal/pkg/grib/s3_downloader.go deleted file mode 100644 index 0fa4c70..0000000 --- a/internal/pkg/grib/s3_downloader.go +++ /dev/null @@ -1,265 +0,0 @@ -package grib - -import ( - "context" - "fmt" - "io" - "os" - "path/filepath" - "time" - - "git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/service/s3" - "golang.org/x/sync/errgroup" -) - -// S3Downloader downloads GRIB files from AWS S3 -type S3Downloader struct { - Dir string - Parallel int - Bucket string - Region string - Client *s3.Client -} - -// NewS3Downloader creates a new S3 downloader with anonymous access -func NewS3Downloader(dir string, parallel int, bucket, region string) (*S3Downloader, error) { - // Create AWS config with anonymous credentials for public bucket - cfg, err := config.LoadDefaultConfig(context.Background(), - config.WithRegion(region), - config.WithCredentialsProvider(aws.AnonymousCredentials{}), - ) - if err != nil { - return nil, errcodes.Wrap(err, "failed to load AWS config") - } - - client := s3.NewFromConfig(cfg) - - return &S3Downloader{ - Dir: dir, - Parallel: parallel, - Bucket: bucket, - Region: region, - Client: client, - }, nil -} - -// s3Key generates the S3 key for a GRIB file -// Path format: gfs.YYYYMMDD/HH/atmos/gfs.tHHz.pgrb2.0p50.fFFF -func (d *S3Downloader) s3Key(run string, hour int, step int) string { - return fmt.Sprintf("gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2.0p50.f%03d", run, hour, hour, step) -} - -// CheckFileExists checks if a file exists in S3 using HeadObject -func (d *S3Downloader) CheckFileExists(ctx context.Context, key string) (bool, int64, error) { - input := &s3.HeadObjectInput{ - Bucket: aws.String(d.Bucket), - Key: aws.String(key), - } - - result, err := d.Client.HeadObject(ctx, input) - if err != nil { - // Check if error is NotFound - // AWS SDK v2 doesn't export specific error types, check error string - if isNotFoundError(err) { - return false, 0, nil - } - return false, 0, errcodes.Wrap(err, "failed to check file existence") - } - - size := int64(0) - if result.ContentLength != nil { - size = *result.ContentLength - } - - return true, size, nil -} - -// isNotFoundError checks if error is a NotFound error -func isNotFoundError(err error) bool { - if err == nil { - return false - } - // AWS SDK v2 error handling - errStr := err.Error() - return contains(errStr, "NotFound") || contains(errStr, "404") || contains(errStr, "NoSuchKey") -} - -func contains(s, substr string) bool { - return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && findSubstring(s, substr)) -} - -func findSubstring(s, substr string) bool { - for i := 0; i <= len(s)-len(substr); i++ { - if s[i:i+len(substr)] == substr { - return true - } - } - return false -} - -// ListAvailableFiles lists all available files for a given run -func (d *S3Downloader) ListAvailableFiles(ctx context.Context, run string, hour int) ([]string, error) { - prefix := fmt.Sprintf("gfs.%s/%02d/atmos/", run, hour) - - input := &s3.ListObjectsV2Input{ - Bucket: aws.String(d.Bucket), - Prefix: aws.String(prefix), - } - - var files []string - paginator := s3.NewListObjectsV2Paginator(d.Client, input) - - for paginator.HasMorePages() { - page, err := paginator.NextPage(ctx) - if err != nil { - return nil, errcodes.Wrap(err, "failed to list S3 objects") - } - - for _, obj := range page.Contents { - if obj.Key != nil { - files = append(files, *obj.Key) - } - } - } - - return files, nil -} - -// fetchFromS3 downloads a file from S3 to local disk with retry logic -func (d *S3Downloader) fetchFromS3(ctx context.Context, key, dst string) (err error) { - // Check if final file already exists - if _, err := os.Stat(dst); err == nil { - return nil - } - - const maxRetries = 3 - var lastErr error - - for attempt := 0; attempt < maxRetries; attempt++ { - if attempt > 0 { - // Exponential backoff: 2s, 4s, 8s - waitTime := time.Duration(1< 0 && written != size { - return errcodes.Wrap(errcodes.ErrDownload, fmt.Sprintf("size mismatch: got %d bytes, expected %d", written, size)) - } - - // Close file before rename - if err := f.Close(); err != nil { - return err - } - fileClosed = true - - // If rename fails, err will be set and defer will cleanup .part file - return os.Rename(tmp, dst) -} - -// Run downloads all required GRIB files for a forecast run -func (d *S3Downloader) Run(ctx context.Context, run time.Time) error { - runStr := run.Format("20060102") - hour := run.Hour() - - // First, list available files to verify they exist - availableFiles, err := d.ListAvailableFiles(ctx, runStr, hour) - if err != nil { - return errcodes.Wrap(err, "failed to list available files") - } - - if len(availableFiles) == 0 { - return errcodes.Wrap(errcodes.ErrDownload, fmt.Sprintf("no files found for run %s/%02d", runStr, hour)) - } - - // Build a map of available files for quick lookup - availableMap := make(map[string]bool) - for _, file := range availableFiles { - availableMap[file] = true - } - - g, ctx := errgroup.WithContext(ctx) - sem := make(chan struct{}, d.Parallel) - - for _, step := range steps { - step := step - key := d.s3Key(runStr, hour, step) - - // Check if file is available in S3 - if !availableMap[key] { - // Log warning but don't fail - some forecast hours might not be available yet - continue - } - - sem <- struct{}{} - g.Go(func() error { - defer func() { <-sem }() - dst := filepath.Join(d.Dir, fileName(run, step)) - return d.fetchFromS3(ctx, key, dst) - }) - } - - return g.Wait() -} diff --git a/internal/pkg/grib/util.go b/internal/pkg/grib/util.go index 8de4af7..1145a61 100644 --- a/internal/pkg/grib/util.go +++ b/internal/pkg/grib/util.go @@ -6,25 +6,11 @@ import ( "time" ) -// Generate steps from 0 to 96 with step 1 hour (97 steps total) -// GFS provides hourly data for 0-120 hours, we use first 96 hours -var steps = func() []int { - result := make([]int, 0, 97) - for i := 0; i <= 96; i++ { - result = append(result, i) - } - return result -}() - func nearestRun(t time.Time) time.Time { h := t.UTC().Hour() - t.UTC().Hour()%6 return time.Date(t.Year(), t.Month(), t.Day(), h, 0, 0, 0, time.UTC) } -func fileName(run time.Time, step int) string { - return fmt.Sprintf("gfs.t%02dz.pgrb2.0p50.f%03d", run.Hour(), step) -} - func encodeKey(a ...any) uint64 { h := fnv.New64a() for _, v := range a { diff --git a/internal/service/predictor.go b/internal/service/predictor.go index e5c6fad..9138c5e 100644 --- a/internal/service/predictor.go +++ b/internal/service/predictor.go @@ -23,6 +23,22 @@ type Stage struct { EndTime time.Time } +// shouldSimulateStage checks if a given stage should be simulated based on the SimulateStages filter +func shouldSimulateStage(params ds.PredictionParameters, stage string) bool { + // If no filter is specified, simulate all stages + if len(params.SimulateStages) == 0 { + return true + } + + // Check if the stage is in the filter list + for _, s := range params.SimulateStages { + if s == stage { + return true + } + } + return false +} + // CustomCurve represents a custom ascent/descent curve type CustomCurve struct { Altitude []float64 `json:"altitude"` @@ -74,7 +90,7 @@ func (s *Service) PerformPrediction(ctx context.Context, params ds.PredictionPar } } - log.Ctx(ctx).Info("Starting prediction", + log.Ctx(ctx).Warn("🚀 PREDICTION STARTING", zap.String("profile", profile), zap.Float64("lat", *params.LaunchLatitude), zap.Float64("lon", *params.LaunchLongitude), @@ -103,16 +119,27 @@ func (s *Service) PerformPrediction(ctx context.Context, params ds.PredictionPar func (s *Service) standardProfile(ctx context.Context, params ds.PredictionParameters, ascentRate, burstAltitude, descentRate float64, ascentCurve, descentCurve *CustomCurve) []ds.PredicitonResult { var results []ds.PredicitonResult + var lastResult ds.PredicitonResult // Stage 1: Ascent - ascentResults := s.simulateAscent(ctx, params, ascentRate, burstAltitude, ascentCurve) - results = append(results, ascentResults...) + if shouldSimulateStage(params, "ascent") { + ascentResults := s.simulateAscent(ctx, params, ascentRate, burstAltitude, ascentCurve) + results = append(results, ascentResults...) + if len(ascentResults) > 0 { + lastResult = ascentResults[len(ascentResults)-1] + } + } else { + // If ascent is skipped, use initial position as starting point + lastResult = ds.PredicitonResult{ + Latitude: params.LaunchLatitude, + Longitude: params.LaunchLongitude, + Altitude: &burstAltitude, + Timestamp: params.LaunchDatetime, + } + } - if len(ascentResults) > 0 { - // Get final position from ascent - lastResult := ascentResults[len(ascentResults)-1] - - // Stage 2: Descent + // Stage 2: Descent + if shouldSimulateStage(params, "descent") && lastResult.Latitude != nil { descentParams := ds.PredictionParameters{ LaunchLatitude: lastResult.Latitude, LaunchLongitude: lastResult.Longitude, @@ -129,45 +156,36 @@ func (s *Service) standardProfile(ctx context.Context, params ds.PredictionParam func (s *Service) floatProfile(ctx context.Context, params ds.PredictionParameters, ascentRate, burstAltitude, floatAltitude, descentRate float64, ascentCurve, descentCurve *CustomCurve) []ds.PredicitonResult { var results []ds.PredicitonResult + var lastResult ds.PredicitonResult // Stage 1: Ascent to float altitude - ascentResults := s.simulateAscent(ctx, params, ascentRate, floatAltitude, ascentCurve) - results = append(results, ascentResults...) - - if len(ascentResults) > 0 { - // Stage 2: Float (simulate for some time) - lastResult := ascentResults[len(ascentResults)-1] - floatResults := s.simulateFloat(ctx, lastResult, 30*time.Minute) // Float for 30 minutes - results = append(results, floatResults...) - - if len(floatResults) > 0 { - // Stage 3: Descent - finalFloat := floatResults[len(floatResults)-1] - descentParams := ds.PredictionParameters{ - LaunchLatitude: finalFloat.Latitude, - LaunchLongitude: finalFloat.Longitude, - LaunchAltitude: finalFloat.Altitude, - LaunchDatetime: finalFloat.Timestamp, - } - - descentResults := s.simulateDescent(ctx, descentParams, descentRate, 0, descentCurve) - results = append(results, descentResults...) + if shouldSimulateStage(params, "ascent") { + ascentResults := s.simulateAscent(ctx, params, ascentRate, floatAltitude, ascentCurve) + results = append(results, ascentResults...) + if len(ascentResults) > 0 { + lastResult = ascentResults[len(ascentResults)-1] + } + } else { + // If ascent is skipped, use initial position at float altitude as starting point + lastResult = ds.PredicitonResult{ + Latitude: params.LaunchLatitude, + Longitude: params.LaunchLongitude, + Altitude: &floatAltitude, + Timestamp: params.LaunchDatetime, } } - return results -} + // Stage 2: Float (simulate for some time) + if shouldSimulateStage(params, "float") && lastResult.Latitude != nil { + floatResults := s.simulateFloat(ctx, lastResult, 30*time.Minute) // Float for 30 minutes + results = append(results, floatResults...) + if len(floatResults) > 0 { + lastResult = floatResults[len(floatResults)-1] + } + } -func (s *Service) reverseProfile(ctx context.Context, params ds.PredictionParameters, ascentRate, burstAltitude, descentRate float64, ascentCurve, descentCurve *CustomCurve) []ds.PredicitonResult { - var results []ds.PredicitonResult - - // Stage 1: Ascent - ascentResults := s.simulateAscent(ctx, params, ascentRate, burstAltitude, ascentCurve) - results = append(results, ascentResults...) - - if len(ascentResults) > 0 { - // Stage 2: Descent to float altitude - lastResult := ascentResults[len(ascentResults)-1] + // Stage 3: Descent + if shouldSimulateStage(params, "descent") && lastResult.Latitude != nil { descentParams := ds.PredictionParameters{ LaunchLatitude: lastResult.Latitude, LaunchLongitude: lastResult.Longitude, @@ -175,21 +193,67 @@ func (s *Service) reverseProfile(ctx context.Context, params ds.PredictionParame LaunchDatetime: lastResult.Timestamp, } - // Descent to float altitude (if specified) - floatAlt := 0.0 - if params.FloatAltitude != nil { - floatAlt = *params.FloatAltitude + descentResults := s.simulateDescent(ctx, descentParams, descentRate, 0, descentCurve) + results = append(results, descentResults...) + } + + return results +} + +func (s *Service) reverseProfile(ctx context.Context, params ds.PredictionParameters, ascentRate, burstAltitude, descentRate float64, ascentCurve, descentCurve *CustomCurve) []ds.PredicitonResult { + var results []ds.PredicitonResult + var lastResult ds.PredicitonResult + + // Stage 1: Ascent + if shouldSimulateStage(params, "ascent") { + ascentResults := s.simulateAscent(ctx, params, ascentRate, burstAltitude, ascentCurve) + results = append(results, ascentResults...) + if len(ascentResults) > 0 { + lastResult = ascentResults[len(ascentResults)-1] + } + } else { + // If ascent is skipped, use initial position at burst altitude as starting point + lastResult = ds.PredicitonResult{ + Latitude: params.LaunchLatitude, + Longitude: params.LaunchLongitude, + Altitude: &burstAltitude, + Timestamp: params.LaunchDatetime, + } + } + + // Stage 2: Descent to float altitude + floatAlt := 0.0 + if params.FloatAltitude != nil { + floatAlt = *params.FloatAltitude + } + + if shouldSimulateStage(params, "descent") && lastResult.Latitude != nil { + descentParams := ds.PredictionParameters{ + LaunchLatitude: lastResult.Latitude, + LaunchLongitude: lastResult.Longitude, + LaunchAltitude: lastResult.Altitude, + LaunchDatetime: lastResult.Timestamp, } descentResults := s.simulateDescent(ctx, descentParams, descentRate, floatAlt, descentCurve) results = append(results, descentResults...) - - if floatAlt > 0 && len(descentResults) > 0 { - // Stage 3: Float - finalDescent := descentResults[len(descentResults)-1] - floatResults := s.simulateFloat(ctx, finalDescent, 30*time.Minute) - results = append(results, floatResults...) + if len(descentResults) > 0 { + lastResult = descentResults[len(descentResults)-1] } + } else if floatAlt > 0 { + // If descent is skipped but we need to float, position at float altitude + lastResult = ds.PredicitonResult{ + Latitude: lastResult.Latitude, + Longitude: lastResult.Longitude, + Altitude: &floatAlt, + Timestamp: lastResult.Timestamp, + } + } + + // Stage 3: Float + if shouldSimulateStage(params, "float") && floatAlt > 0 && lastResult.Latitude != nil { + floatResults := s.simulateFloat(ctx, lastResult, 30*time.Minute) + results = append(results, floatResults...) } return results @@ -197,14 +261,27 @@ func (s *Service) reverseProfile(ctx context.Context, params ds.PredictionParame func (s *Service) customProfile(ctx context.Context, params ds.PredictionParameters, ascentCurve, descentCurve *CustomCurve) []ds.PredicitonResult { var results []ds.PredicitonResult + var lastResult ds.PredicitonResult - if ascentCurve != nil { + // Custom ascent + if shouldSimulateStage(params, "ascent") && ascentCurve != nil { ascentResults := s.simulateCustomAscent(ctx, params, ascentCurve) results = append(results, ascentResults...) + if len(ascentResults) > 0 { + lastResult = ascentResults[len(ascentResults)-1] + } + } else if len(results) == 0 { + // If ascent is skipped, use initial position + lastResult = ds.PredicitonResult{ + Latitude: params.LaunchLatitude, + Longitude: params.LaunchLongitude, + Altitude: params.LaunchAltitude, + Timestamp: params.LaunchDatetime, + } } - if descentCurve != nil && len(results) > 0 { - lastResult := results[len(results)-1] + // Custom descent + if shouldSimulateStage(params, "descent") && descentCurve != nil && lastResult.Latitude != nil { descentParams := ds.PredictionParameters{ LaunchLatitude: lastResult.Latitude, LaunchLongitude: lastResult.Longitude, @@ -248,6 +325,10 @@ func (s *Service) simulateAscent(ctx context.Context, params ds.PredictionParame const dt = 10.0 // simulation step in seconds const outputInterval = 60.0 // output every 60 seconds + log.Ctx(ctx).Warn("⬆️ ASCENT SIMULATION STARTING", + zap.Float64("ascentRate", ascentRate), + zap.Float64("targetAlt", targetAltitude)) + lat := *params.LaunchLatitude lon := *params.LaunchLongitude alt := *params.LaunchAltitude @@ -272,12 +353,29 @@ func (s *Service) simulateAscent(ctx context.Context, params ds.PredictionParame }) nextOutputTime := timeCur.Add(time.Duration(outputInterval) * time.Second) + firstExtraction := true windFunc := func(lat, lon, alt float64, t time.Time) (float64, float64) { w, err := s.ExtractWind(ctx, lat, lon, alt, t) if err != nil { - log.Ctx(ctx).Warn("Wind extraction failed during ascent", zap.Error(err)) + log.Ctx(ctx).Error("Wind extraction FAILED during ascent", + zap.Error(err), + zap.Float64("lat", lat), + zap.Float64("lon", lon), + zap.Float64("alt", alt), + zap.Time("time", t)) return 0, 0 } + // Log only first extraction and when wind is zero + if firstExtraction || (w[0] == 0 && w[1] == 0) { + log.Ctx(ctx).Warn("Wind data check", + zap.Bool("first", firstExtraction), + zap.Float64("lat", lat), + zap.Float64("lon", lon), + zap.Float64("alt", alt), + zap.Float64("u", w[0]), + zap.Float64("v", w[1])) + firstExtraction = false + } return w[0], w[1] } @@ -293,6 +391,23 @@ func (s *Service) simulateAscent(ctx context.Context, params ds.PredictionParame alt = altNew if alt >= targetAltitude { + alt = targetAltitude + // Record burst point + wU, wV := windFunc(lat, lon, alt, timeCur) + latCopy := lat + lonCopy := lon + altCopy := alt + timeCopy := timeCur + windU := wU + windV := wV + results = append(results, ds.PredicitonResult{ + Latitude: &latCopy, + Longitude: &lonCopy, + Altitude: &altCopy, + Timestamp: &timeCopy, + WindU: &windU, + WindV: &windV, + }) break } @@ -350,14 +465,19 @@ func (s *Service) simulateDescent(ctx context.Context, params ds.PredictionParam windFunc := func(lat, lon, alt float64, t time.Time) (float64, float64) { w, err := s.ExtractWind(ctx, lat, lon, alt, t) if err != nil { - log.Ctx(ctx).Warn("Wind extraction failed during descent", zap.Error(err)) + log.Ctx(ctx).Error("Wind extraction FAILED during descent", + zap.Error(err), + zap.Float64("lat", lat), + zap.Float64("lon", lon), + zap.Float64("alt", alt), + zap.Time("time", t)) return 0, 0 } return w[0], w[1] } for alt > targetAltitude { - altRate := -descentRate + altRate := -descentRateAtAlt(descentRate, alt) if customCurve != nil { altRate = -s.getCustomAltitudeRate(customCurve, alt, descentRate) } @@ -368,6 +488,23 @@ func (s *Service) simulateDescent(ctx context.Context, params ds.PredictionParam alt = altNew if alt <= targetAltitude { + alt = targetAltitude + // Record landing point + wU, wV := windFunc(lat, lon, alt, timeCur) + latCopy := lat + lonCopy := lon + altCopy := alt + timeCopy := timeCur + windU := wU + windV := wV + results = append(results, ds.PredicitonResult{ + Latitude: &latCopy, + Longitude: &lonCopy, + Altitude: &altCopy, + Timestamp: &timeCopy, + WindU: &windU, + WindV: &windV, + }) break } @@ -462,6 +599,37 @@ func (s *Service) simulateFloat(ctx context.Context, startResult ds.PredicitonRe return results } +// airDensity returns ISA air density in kg/m³ at given altitude in meters +func airDensity(h float64) float64 { + var T, p float64 + switch { + case h < 11000: + T = 288.15 - 0.0065*h + p = 101325 * math.Pow(T/288.15, 5.2561) + case h < 20000: + T = 216.65 + p = 22632.1 * math.Exp(-0.00015769*(h-11000)) + case h < 32000: + T = 216.65 + 0.001*(h-20000) + p = 5474.89 * math.Pow(T/216.65, -34.1632) + default: + T = 228.65 + 0.0028*(h-32000) + p = 868.019 * math.Pow(T/228.65, -12.2009) + } + return p / (287.05 * T) +} + +// descentRateAtAlt returns descent rate adjusted for air density at altitude. +// descent_rate parameter is the sea-level rate. At altitude, thinner air means faster descent. +func descentRateAtAlt(seaLevelRate, alt float64) float64 { + rho0 := airDensity(0) + rhoH := airDensity(alt) + if rhoH <= 0 { + return seaLevelRate + } + return seaLevelRate * math.Sqrt(rho0/rhoH) +} + func (s *Service) simulateCustomAscent(ctx context.Context, params ds.PredictionParameters, curve *CustomCurve) []ds.PredicitonResult { // Implementation for custom ascent curve // This would interpolate the altitude rate from the custom curve diff --git a/internal/service/predictor_test.go b/internal/service/predictor_test.go new file mode 100644 index 0000000..66ab76e --- /dev/null +++ b/internal/service/predictor_test.go @@ -0,0 +1,492 @@ +package service + +import ( + "context" + "testing" + "time" + + "git.intra.yksa.space/gsn/predictor/internal/pkg/ds" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +// MockGrib is a mock implementation of the Grib interface +type MockGrib struct { + mock.Mock +} + +func (m *MockGrib) Update(ctx context.Context) error { + args := m.Called(ctx) + return args.Error(0) +} + +func (m *MockGrib) Extract(ctx context.Context, lat, lon, alt float64, t time.Time) ([2]float64, error) { + args := m.Called(ctx, lat, lon, alt, t) + return args.Get(0).([2]float64), args.Error(1) +} + +func (m *MockGrib) Close() error { + args := m.Called() + return args.Error(0) +} + +// Helper function to create a test service with mocked GRIB +func createTestService() (*Service, *MockGrib) { + mockGrib := new(MockGrib) + + // Default mock behavior: return constant wind (5 m/s east, 3 m/s north) + mockGrib.On("Extract", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([2]float64{5.0, 3.0}, nil) + + service := &Service{ + grib: mockGrib, + } + + return service, mockGrib +} + +// Helper function to create basic prediction parameters +func createBasicParams() ds.PredictionParameters { + lat := 40.0 + lon := -105.0 + alt := 1000.0 + launchTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + profile := "standard_profile" + ascentRate := 5.0 + burstAltitude := 10000.0 + descentRate := 5.0 + + return ds.PredictionParameters{ + LaunchLatitude: &lat, + LaunchLongitude: &lon, + LaunchAltitude: &alt, + LaunchDatetime: &launchTime, + Profile: &profile, + AscentRate: &ascentRate, + BurstAltitude: &burstAltitude, + DescentRate: &descentRate, + } +} + +func TestRestrictedPrediction_OnlyAscent(t *testing.T) { + service, _ := createTestService() + params := createBasicParams() + + // Restrict to ascent only + params.SimulateStages = []string{"ascent"} + + results, err := service.PerformPrediction(context.Background(), params) + + assert.NoError(t, err) + assert.NotEmpty(t, results) + + // Verify all results are during ascent phase (altitude increasing) + for i := 1; i < len(results); i++ { + assert.GreaterOrEqual(t, *results[i].Altitude, *results[i-1].Altitude, + "Altitude should be increasing or equal during ascent") + } + + // Last altitude should be near burst altitude + lastAlt := *results[len(results)-1].Altitude + burstAlt := *params.BurstAltitude + assert.InDelta(t, burstAlt, lastAlt, 500.0, "Last altitude should be near burst altitude") +} + +func TestRestrictedPrediction_OnlyDescent(t *testing.T) { + service, _ := createTestService() + params := createBasicParams() + + // Restrict to descent only + params.SimulateStages = []string{"descent"} + + results, err := service.PerformPrediction(context.Background(), params) + + assert.NoError(t, err) + assert.NotEmpty(t, results) + + // First result should be at burst altitude (since ascent was skipped) + firstAlt := *results[0].Altitude + burstAlt := *params.BurstAltitude + assert.Equal(t, burstAlt, firstAlt, "Should start at burst altitude when ascent is skipped") + + // Verify all results are during descent phase (altitude decreasing) + for i := 1; i < len(results); i++ { + assert.LessOrEqual(t, *results[i].Altitude, *results[i-1].Altitude, + "Altitude should be decreasing or equal during descent") + } + + // Last altitude should be near ground + lastAlt := *results[len(results)-1].Altitude + assert.Less(t, lastAlt, 1000.0, "Last altitude should be near ground") +} + +func TestRestrictedPrediction_AscentAndDescent(t *testing.T) { + service, _ := createTestService() + params := createBasicParams() + + // Include both ascent and descent + params.SimulateStages = []string{"ascent", "descent"} + + results, err := service.PerformPrediction(context.Background(), params) + + assert.NoError(t, err) + assert.NotEmpty(t, results) + + // Find the peak altitude (transition point) + maxAlt := 0.0 + maxIdx := 0 + for i, result := range results { + if *result.Altitude > maxAlt { + maxAlt = *result.Altitude + maxIdx = i + } + } + + // Verify ascent phase + for i := 1; i <= maxIdx; i++ { + assert.GreaterOrEqual(t, *results[i].Altitude, *results[i-1].Altitude, + "Altitude should increase during ascent phase") + } + + // Verify descent phase + for i := maxIdx + 1; i < len(results); i++ { + assert.LessOrEqual(t, *results[i].Altitude, *results[i-1].Altitude, + "Altitude should decrease during descent phase") + } +} + +func TestRestrictedPrediction_FloatProfile_OnlyFloat(t *testing.T) { + service, _ := createTestService() + params := createBasicParams() + + profile := "float_profile" + floatAlt := 15000.0 + params.Profile = &profile + params.FloatAltitude = &floatAlt + + // Restrict to float only + params.SimulateStages = []string{"float"} + + results, err := service.PerformPrediction(context.Background(), params) + + assert.NoError(t, err) + assert.NotEmpty(t, results) + + // All results should be at the float altitude + for _, result := range results { + assert.Equal(t, floatAlt, *result.Altitude, + "Altitude should remain constant at float altitude") + } + + // Verify horizontal movement (lat/lon changes due to wind) + firstLat := *results[0].Latitude + lastLat := *results[len(results)-1].Latitude + assert.NotEqual(t, firstLat, lastLat, "Latitude should change during float due to wind") +} + +func TestRestrictedPrediction_FloatProfile_AllStages(t *testing.T) { + service, _ := createTestService() + params := createBasicParams() + + profile := "float_profile" + floatAlt := 15000.0 + params.Profile = &profile + params.FloatAltitude = &floatAlt + + // Include all stages + params.SimulateStages = []string{"ascent", "float", "descent"} + + results, err := service.PerformPrediction(context.Background(), params) + + assert.NoError(t, err) + assert.NotEmpty(t, results) + + // Verify we have ascending, constant, and descending altitude patterns + hasAscent := false + hasFloat := false + hasDescent := false + + const altTolerance = 50.0 // Tolerance for altitude comparison + + for i := 1; i < len(results); i++ { + altDiff := *results[i].Altitude - *results[i-1].Altitude + + if altDiff > altTolerance { + hasAscent = true + } else if altDiff < -altTolerance { + hasDescent = true + } else if *results[i].Altitude > 10000 { // Float happens at high altitude + hasFloat = true + } + } + + assert.True(t, hasAscent, "Should have ascent phase") + assert.True(t, hasFloat, "Should have float phase") + assert.True(t, hasDescent, "Should have descent phase") + + // Verify maximum altitude is near float altitude + maxAlt := 0.0 + for _, result := range results { + if *result.Altitude > maxAlt { + maxAlt = *result.Altitude + } + } + assert.InDelta(t, floatAlt, maxAlt, 1000.0, "Max altitude should be near float altitude") +} + +func TestRestrictedPrediction_ReverseProfile_OnlyFloat(t *testing.T) { + service, _ := createTestService() + params := createBasicParams() + + profile := "reverse_profile" + floatAlt := 5000.0 + params.Profile = &profile + params.FloatAltitude = &floatAlt + + // Restrict to float only + params.SimulateStages = []string{"float"} + + results, err := service.PerformPrediction(context.Background(), params) + + assert.NoError(t, err) + assert.NotEmpty(t, results) + + // All results should be at the float altitude + for _, result := range results { + assert.InDelta(t, floatAlt, *result.Altitude, 10.0, + "Altitude should remain near float altitude") + } +} + +func TestRestrictedPrediction_EmptyStages_SimulatesAll(t *testing.T) { + service, _ := createTestService() + params := createBasicParams() + + // Empty SimulateStages should simulate all stages + params.SimulateStages = []string{} + + results, err := service.PerformPrediction(context.Background(), params) + + assert.NoError(t, err) + assert.NotEmpty(t, results) + + // Should have both ascent and descent + // Find the peak + maxAlt := 0.0 + hasAscent := false + hasDescent := false + + for i := 1; i < len(results); i++ { + if *results[i].Altitude > *results[i-1].Altitude { + hasAscent = true + } + if *results[i].Altitude < *results[i-1].Altitude { + hasDescent = true + } + if *results[i].Altitude > maxAlt { + maxAlt = *results[i].Altitude + } + } + + assert.True(t, hasAscent, "Should have ascent phase") + assert.True(t, hasDescent, "Should have descent phase") +} + +func TestRestrictedPrediction_NilStages_SimulatesAll(t *testing.T) { + service, _ := createTestService() + params := createBasicParams() + + // Nil SimulateStages should simulate all stages + params.SimulateStages = nil + + results, err := service.PerformPrediction(context.Background(), params) + + assert.NoError(t, err) + assert.NotEmpty(t, results) + + // Should have both ascent and descent + maxAlt := 0.0 + minAltAfterMax := 1000000.0 + + for _, result := range results { + if *result.Altitude > maxAlt { + maxAlt = *result.Altitude + } + } + + foundMax := false + for _, result := range results { + if *result.Altitude == maxAlt { + foundMax = true + } + if foundMax && *result.Altitude < minAltAfterMax { + minAltAfterMax = *result.Altitude + } + } + + // Should reach high altitude and come back down + assert.Greater(t, maxAlt, 5000.0, "Should reach high altitude") + assert.Less(t, minAltAfterMax, maxAlt, "Should descend after reaching max altitude") +} + +func TestRestrictedPrediction_InvalidStage_IgnoresInvalid(t *testing.T) { + service, _ := createTestService() + params := createBasicParams() + + // Include invalid stage name (should be ignored) + params.SimulateStages = []string{"ascent", "invalid_stage", "descent"} + + results, err := service.PerformPrediction(context.Background(), params) + + assert.NoError(t, err) + assert.NotEmpty(t, results) + // Should still simulate ascent and descent, ignoring the invalid stage +} + +func TestRestrictedPrediction_WindImpact(t *testing.T) { + service, mockGrib := createTestService() + + // Override mock to return strong eastward wind + mockGrib.ExpectedCalls = nil + mockGrib.On("Extract", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([2]float64{20.0, 0.0}, nil) // Strong eastward wind + + params := createBasicParams() + params.SimulateStages = []string{"ascent"} + + results, err := service.PerformPrediction(context.Background(), params) + + assert.NoError(t, err) + assert.NotEmpty(t, results) + + // Longitude should increase significantly due to eastward wind + firstLon := *results[0].Longitude + lastLon := *results[len(results)-1].Longitude + assert.Greater(t, lastLon, firstLon, "Longitude should increase with eastward wind") + + // Verify wind values are captured in results + for _, result := range results { + if result.WindU != nil { + // Wind values should be present in results + assert.NotNil(t, result.WindV, "WindV should be present if WindU is present") + } + } +} + +func TestRestrictedPrediction_MissingRequiredParams(t *testing.T) { + service, _ := createTestService() + + testCases := []struct { + name string + params ds.PredictionParameters + }{ + { + name: "Missing latitude", + params: ds.PredictionParameters{ + LaunchLongitude: floatPtr(-105.0), + LaunchAltitude: floatPtr(1000.0), + LaunchDatetime: timePtr(time.Now()), + }, + }, + { + name: "Missing longitude", + params: ds.PredictionParameters{ + LaunchLatitude: floatPtr(40.0), + LaunchAltitude: floatPtr(1000.0), + LaunchDatetime: timePtr(time.Now()), + }, + }, + { + name: "Missing altitude", + params: ds.PredictionParameters{ + LaunchLatitude: floatPtr(40.0), + LaunchLongitude: floatPtr(-105.0), + LaunchDatetime: timePtr(time.Now()), + }, + }, + { + name: "Missing datetime", + params: ds.PredictionParameters{ + LaunchLatitude: floatPtr(40.0), + LaunchLongitude: floatPtr(-105.0), + LaunchAltitude: floatPtr(1000.0), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tc.params.SimulateStages = []string{"ascent"} + results, err := service.PerformPrediction(context.Background(), tc.params) + + assert.Error(t, err) + assert.Equal(t, ErrInvalidParameters, err) + assert.Nil(t, results) + }) + } +} + +func TestShouldSimulateStage(t *testing.T) { + testCases := []struct { + name string + stages []string + queryStage string + shouldSimulate bool + }{ + { + name: "Empty filter simulates all", + stages: []string{}, + queryStage: "ascent", + shouldSimulate: true, + }, + { + name: "Nil filter simulates all", + stages: nil, + queryStage: "descent", + shouldSimulate: true, + }, + { + name: "Stage in filter", + stages: []string{"ascent", "descent"}, + queryStage: "ascent", + shouldSimulate: true, + }, + { + name: "Stage not in filter", + stages: []string{"ascent"}, + queryStage: "descent", + shouldSimulate: false, + }, + { + name: "Float stage in filter", + stages: []string{"float"}, + queryStage: "float", + shouldSimulate: true, + }, + { + name: "Multiple stages excluding one", + stages: []string{"ascent", "float"}, + queryStage: "descent", + shouldSimulate: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + params := ds.PredictionParameters{ + SimulateStages: tc.stages, + } + result := shouldSimulateStage(params, tc.queryStage) + assert.Equal(t, tc.shouldSimulate, result) + }) + } +} + +// Helper functions +func floatPtr(f float64) *float64 { + return &f +} + +func timePtr(t time.Time) *time.Time { + return &t +} diff --git a/pkg/rest/oas_cfg_gen.go b/pkg/rest/oas_cfg_gen.go index 1845c4f..6b8f72d 100644 --- a/pkg/rest/oas_cfg_gen.go +++ b/pkg/rest/oas_cfg_gen.go @@ -5,14 +5,14 @@ package gsn import ( "net/http" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/trace" - ht "github.com/ogen-go/ogen/http" "github.com/ogen-go/ogen/middleware" "github.com/ogen-go/ogen/ogenerrors" "github.com/ogen-go/ogen/otelogen" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" ) var ( @@ -32,6 +32,7 @@ type otelConfig struct { Tracer trace.Tracer MeterProvider metric.MeterProvider Meter metric.Meter + Attributes []attribute.KeyValue } func (cfg *otelConfig) initOTEL() { @@ -215,6 +216,13 @@ func WithMeterProvider(provider metric.MeterProvider) Option { }) } +// WithAttributes specifies default otel attributes. +func WithAttributes(attributes ...attribute.KeyValue) Option { + return otelOptionFunc(func(cfg *otelConfig) { + cfg.Attributes = attributes + }) +} + // WithClient specifies http client to use. func WithClient(client ht.Client) ClientOption { return optionFunc[clientConfig](func(cfg *clientConfig) { diff --git a/pkg/rest/oas_client_gen.go b/pkg/rest/oas_client_gen.go index 19822e8..0484f94 100644 --- a/pkg/rest/oas_client_gen.go +++ b/pkg/rest/oas_client_gen.go @@ -9,16 +9,15 @@ import ( "time" "github.com/go-faster/errors" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/metric" - semconv "go.opentelemetry.io/otel/semconv/v1.26.0" - "go.opentelemetry.io/otel/trace" - "github.com/ogen-go/ogen/conv" ht "github.com/ogen-go/ogen/http" "github.com/ogen-go/ogen/otelogen" "github.com/ogen-go/ogen/uri" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/trace" ) func trimTrailingSlashes(u *url.URL) { @@ -103,8 +102,9 @@ func (c *Client) sendPerformPrediction(ctx context.Context, params PerformPredic otelAttrs := []attribute.KeyValue{ otelogen.OperationID("performPrediction"), semconv.HTTPRequestMethodKey.String("GET"), - semconv.HTTPRouteKey.String("/api/v1/prediction"), + semconv.URLTemplateKey.String("/api/v1/prediction"), } + otelAttrs = append(otelAttrs, c.cfg.Attributes...) // Run stopwatch. startTime := time.Now() @@ -345,6 +345,32 @@ func (c *Client) sendPerformPrediction(ctx context.Context, params PerformPredic return res, errors.Wrap(err, "encode query") } } + { + // Encode "simulate_stages" parameter. + cfg := uri.QueryParameterEncodingConfig{ + Name: "simulate_stages", + Style: uri.QueryStyleForm, + Explode: true, + } + + if err := q.EncodeParam(cfg, func(e uri.Encoder) error { + if params.SimulateStages != nil { + return e.EncodeArray(func(e uri.Encoder) error { + for i, item := range params.SimulateStages { + if err := func() error { + return e.EncodeValue(conv.StringToString(string(item))) + }(); err != nil { + return errors.Wrapf(err, "[%d]", i) + } + } + return nil + }) + } + return nil + }); err != nil { + return res, errors.Wrap(err, "encode query") + } + } { // Encode "interpolate" parameter. cfg := uri.QueryParameterEncodingConfig{ @@ -434,8 +460,9 @@ func (c *Client) sendReadinessCheck(ctx context.Context) (res *ReadinessResponse otelAttrs := []attribute.KeyValue{ otelogen.OperationID("readinessCheck"), semconv.HTTPRequestMethodKey.String("GET"), - semconv.HTTPRouteKey.String("/ready"), + semconv.URLTemplateKey.String("/ready"), } + otelAttrs = append(otelAttrs, c.cfg.Attributes...) // Run stopwatch. startTime := time.Now() diff --git a/pkg/rest/oas_handlers_gen.go b/pkg/rest/oas_handlers_gen.go index 76ab8ad..b7e53e4 100644 --- a/pkg/rest/oas_handlers_gen.go +++ b/pkg/rest/oas_handlers_gen.go @@ -8,16 +8,15 @@ import ( "time" "github.com/go-faster/errors" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/metric" - semconv "go.opentelemetry.io/otel/semconv/v1.26.0" - "go.opentelemetry.io/otel/trace" - ht "github.com/ogen-go/ogen/http" "github.com/ogen-go/ogen/middleware" "github.com/ogen-go/ogen/ogenerrors" "github.com/ogen-go/ogen/otelogen" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/trace" ) type codeRecorder struct { @@ -30,6 +29,10 @@ func (c *codeRecorder) WriteHeader(status int) { c.ResponseWriter.WriteHeader(status) } +func (c *codeRecorder) Unwrap() http.ResponseWriter { + return c.ResponseWriter +} + // handlePerformPredictionRequest handles performPrediction operation. // // Perform prediction. @@ -86,7 +89,7 @@ func (s *Server) handlePerformPredictionRequest(args [0]string, argsEscaped bool // unless there was another error (e.g., network error receiving the response body; or 3xx codes with // max redirects exceeded), in which case status MUST be set to Error. code := statusWriter.status - if code >= 100 && code < 500 { + if code < 100 || code >= 500 { span.SetStatus(codes.Error, stage) } @@ -115,6 +118,8 @@ func (s *Server) handlePerformPredictionRequest(args [0]string, argsEscaped bool return } + var rawBody []byte + var response *PredictionResult if m := s.cfg.Middleware; m != nil { mreq := middleware.Request{ @@ -123,6 +128,7 @@ func (s *Server) handlePerformPredictionRequest(args [0]string, argsEscaped bool OperationSummary: "Perform prediction", OperationID: "performPrediction", Body: nil, + RawBody: rawBody, Params: middleware.Parameters{ { Name: "launch_latitude", @@ -172,6 +178,10 @@ func (s *Server) handlePerformPredictionRequest(args [0]string, argsEscaped bool Name: "descent_curve", In: "query", }: params.DescentCurve, + { + Name: "simulate_stages", + In: "query", + }: params.SimulateStages, { Name: "interpolate", In: "query", @@ -291,7 +301,7 @@ func (s *Server) handleReadinessCheckRequest(args [0]string, argsEscaped bool, w // unless there was another error (e.g., network error receiving the response body; or 3xx codes with // max redirects exceeded), in which case status MUST be set to Error. code := statusWriter.status - if code >= 100 && code < 500 { + if code < 100 || code >= 500 { span.SetStatus(codes.Error, stage) } @@ -306,6 +316,8 @@ func (s *Server) handleReadinessCheckRequest(args [0]string, argsEscaped bool, w err error ) + var rawBody []byte + var response *ReadinessResponse if m := s.cfg.Middleware; m != nil { mreq := middleware.Request{ @@ -314,6 +326,7 @@ func (s *Server) handleReadinessCheckRequest(args [0]string, argsEscaped bool, w OperationSummary: "Readiness check", OperationID: "readinessCheck", Body: nil, + RawBody: rawBody, Params: middleware.Parameters{}, Raw: r, } diff --git a/pkg/rest/oas_json_gen.go b/pkg/rest/oas_json_gen.go index ea3d61c..9707b6f 100644 --- a/pkg/rest/oas_json_gen.go +++ b/pkg/rest/oas_json_gen.go @@ -9,7 +9,6 @@ import ( "github.com/go-faster/errors" "github.com/go-faster/jx" - "github.com/ogen-go/ogen/json" "github.com/ogen-go/ogen/validate" ) @@ -607,6 +606,8 @@ func (s *PredictionResultPredictionItemStage) Decode(d *jx.Decoder) error { *s = PredictionResultPredictionItemStageAscent case PredictionResultPredictionItemStageDescent: *s = PredictionResultPredictionItemStageDescent + case PredictionResultPredictionItemStageFloat: + *s = PredictionResultPredictionItemStageFloat default: *s = PredictionResultPredictionItemStage(v) } diff --git a/pkg/rest/oas_parameters_gen.go b/pkg/rest/oas_parameters_gen.go index 23cc5d8..da12715 100644 --- a/pkg/rest/oas_parameters_gen.go +++ b/pkg/rest/oas_parameters_gen.go @@ -3,11 +3,11 @@ package gsn import ( + "fmt" "net/http" "time" "github.com/go-faster/errors" - "github.com/ogen-go/ogen/conv" "github.com/ogen-go/ogen/middleware" "github.com/ogen-go/ogen/ogenerrors" @@ -17,21 +17,22 @@ import ( // PerformPredictionParams is parameters of performPrediction operation. type PerformPredictionParams struct { - LaunchLatitude OptFloat64 - LaunchLongitude OptFloat64 - LaunchDatetime OptDateTime - LaunchAltitude OptFloat64 - Profile OptPerformPredictionProfile - AscentRate OptFloat64 - BurstAltitude OptFloat64 - DescentRate OptFloat64 - FloatAltitude OptFloat64 - StopDatetime OptDateTime - AscentCurve OptString - DescentCurve OptString - Interpolate OptBool - Format OptPerformPredictionFormat - Dataset OptDateTime + LaunchLatitude OptFloat64 `json:",omitempty,omitzero"` + LaunchLongitude OptFloat64 `json:",omitempty,omitzero"` + LaunchDatetime OptDateTime `json:",omitempty,omitzero"` + LaunchAltitude OptFloat64 `json:",omitempty,omitzero"` + Profile OptPerformPredictionProfile `json:",omitempty,omitzero"` + AscentRate OptFloat64 `json:",omitempty,omitzero"` + BurstAltitude OptFloat64 `json:",omitempty,omitzero"` + DescentRate OptFloat64 `json:",omitempty,omitzero"` + FloatAltitude OptFloat64 `json:",omitempty,omitzero"` + StopDatetime OptDateTime `json:",omitempty,omitzero"` + AscentCurve OptString `json:",omitempty,omitzero"` + DescentCurve OptString `json:",omitempty,omitzero"` + SimulateStages []PerformPredictionSimulateStagesItem `json:",omitempty"` + Interpolate OptBool `json:",omitempty,omitzero"` + Format OptPerformPredictionFormat `json:",omitempty,omitzero"` + Dataset OptDateTime `json:",omitempty,omitzero"` } func unpackPerformPredictionParams(packed middleware.Parameters) (params PerformPredictionParams) { @@ -143,6 +144,15 @@ func unpackPerformPredictionParams(packed middleware.Parameters) (params Perform params.DescentCurve = v.(OptString) } } + { + key := middleware.ParameterKey{ + Name: "simulate_stages", + In: "query", + } + if v, ok := packed[key]; ok { + params.SimulateStages = v.([]PerformPredictionSimulateStagesItem) + } + } { key := middleware.ParameterKey{ Name: "interpolate", @@ -787,6 +797,71 @@ func decodePerformPredictionParams(args [0]string, argsEscaped bool, r *http.Req Err: err, } } + // Decode query: simulate_stages. + if err := func() error { + cfg := uri.QueryParameterDecodingConfig{ + Name: "simulate_stages", + Style: uri.QueryStyleForm, + Explode: true, + } + + if err := q.HasParam(cfg); err == nil { + if err := q.DecodeParam(cfg, func(d uri.Decoder) error { + return d.DecodeArray(func(d uri.Decoder) error { + var paramsDotSimulateStagesVal PerformPredictionSimulateStagesItem + if err := func() error { + val, err := d.DecodeValue() + if err != nil { + return err + } + + c, err := conv.ToString(val) + if err != nil { + return err + } + + paramsDotSimulateStagesVal = PerformPredictionSimulateStagesItem(c) + return nil + }(); err != nil { + return err + } + params.SimulateStages = append(params.SimulateStages, paramsDotSimulateStagesVal) + return nil + }) + }); err != nil { + return err + } + if err := func() error { + var failures []validate.FieldError + for i, elem := range params.SimulateStages { + if err := func() error { + if err := elem.Validate(); err != nil { + return err + } + return nil + }(); err != nil { + failures = append(failures, validate.FieldError{ + Name: fmt.Sprintf("[%d]", i), + Error: err, + }) + } + } + if len(failures) > 0 { + return &validate.Error{Fields: failures} + } + return nil + }(); err != nil { + return err + } + } + return nil + }(); err != nil { + return params, &ogenerrors.DecodeParamError{ + Name: "simulate_stages", + In: "query", + Err: err, + } + } // Decode query: interpolate. if err := func() error { cfg := uri.QueryParameterDecodingConfig{ diff --git a/pkg/rest/oas_response_decoders_gen.go b/pkg/rest/oas_response_decoders_gen.go index 3c148fd..352068c 100644 --- a/pkg/rest/oas_response_decoders_gen.go +++ b/pkg/rest/oas_response_decoders_gen.go @@ -9,7 +9,6 @@ import ( "github.com/go-faster/errors" "github.com/go-faster/jx" - "github.com/ogen-go/ogen/ogenerrors" "github.com/ogen-go/ogen/validate" ) diff --git a/pkg/rest/oas_response_encoders_gen.go b/pkg/rest/oas_response_encoders_gen.go index 8f24cd5..a95167d 100644 --- a/pkg/rest/oas_response_encoders_gen.go +++ b/pkg/rest/oas_response_encoders_gen.go @@ -7,10 +7,9 @@ import ( "github.com/go-faster/errors" "github.com/go-faster/jx" + ht "github.com/ogen-go/ogen/http" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" - - ht "github.com/ogen-go/ogen/http" ) func encodePerformPredictionResponse(response *PredictionResult, w http.ResponseWriter, span trace.Span) error { diff --git a/pkg/rest/oas_router_gen.go b/pkg/rest/oas_router_gen.go index 1eea998..5f7617a 100644 --- a/pkg/rest/oas_router_gen.go +++ b/pkg/rest/oas_router_gen.go @@ -109,12 +109,13 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Route is route object. type Route struct { - name string - summary string - operationID string - pathPattern string - count int - args [0]string + name string + summary string + operationID string + operationGroup string + pathPattern string + count int + args [0]string } // Name returns ogen operation name. @@ -134,6 +135,11 @@ func (r Route) OperationID() string { return r.operationID } +// OperationGroup returns the x-ogen-operation-group value. +func (r Route) OperationGroup() string { + return r.operationGroup +} + // PathPattern returns OpenAPI path. func (r Route) PathPattern() string { return r.pathPattern @@ -209,6 +215,7 @@ func (s *Server) FindPath(method string, u *url.URL) (r Route, _ bool) { r.name = PerformPredictionOperation r.summary = "Perform prediction" r.operationID = "performPrediction" + r.operationGroup = "" r.pathPattern = "/api/v1/prediction" r.args = args r.count = 0 @@ -233,6 +240,7 @@ func (s *Server) FindPath(method string, u *url.URL) (r Route, _ bool) { r.name = ReadinessCheckOperation r.summary = "Readiness check" r.operationID = "readinessCheck" + r.operationGroup = "" r.pathPattern = "/ready" r.args = args r.count = 0 diff --git a/pkg/rest/oas_schemas_gen.go b/pkg/rest/oas_schemas_gen.go index 26808cc..b484e26 100644 --- a/pkg/rest/oas_schemas_gen.go +++ b/pkg/rest/oas_schemas_gen.go @@ -430,6 +430,54 @@ func (s *PerformPredictionProfile) UnmarshalText(data []byte) error { } } +type PerformPredictionSimulateStagesItem string + +const ( + PerformPredictionSimulateStagesItemAscent PerformPredictionSimulateStagesItem = "ascent" + PerformPredictionSimulateStagesItemDescent PerformPredictionSimulateStagesItem = "descent" + PerformPredictionSimulateStagesItemFloat PerformPredictionSimulateStagesItem = "float" +) + +// AllValues returns all PerformPredictionSimulateStagesItem values. +func (PerformPredictionSimulateStagesItem) AllValues() []PerformPredictionSimulateStagesItem { + return []PerformPredictionSimulateStagesItem{ + PerformPredictionSimulateStagesItemAscent, + PerformPredictionSimulateStagesItemDescent, + PerformPredictionSimulateStagesItemFloat, + } +} + +// MarshalText implements encoding.TextMarshaler. +func (s PerformPredictionSimulateStagesItem) MarshalText() ([]byte, error) { + switch s { + case PerformPredictionSimulateStagesItemAscent: + return []byte(s), nil + case PerformPredictionSimulateStagesItemDescent: + return []byte(s), nil + case PerformPredictionSimulateStagesItemFloat: + return []byte(s), nil + default: + return nil, errors.Errorf("invalid value: %q", s) + } +} + +// UnmarshalText implements encoding.TextUnmarshaler. +func (s *PerformPredictionSimulateStagesItem) UnmarshalText(data []byte) error { + switch PerformPredictionSimulateStagesItem(data) { + case PerformPredictionSimulateStagesItemAscent: + *s = PerformPredictionSimulateStagesItemAscent + return nil + case PerformPredictionSimulateStagesItemDescent: + *s = PerformPredictionSimulateStagesItemDescent + return nil + case PerformPredictionSimulateStagesItemFloat: + *s = PerformPredictionSimulateStagesItemFloat + return nil + default: + return errors.Errorf("invalid value: %q", data) + } +} + // Ref: #/components/schemas/PredictionResult type PredictionResult struct { Metadata PredictionResultMetadata `json:"metadata"` @@ -511,6 +559,7 @@ type PredictionResultPredictionItemStage string const ( PredictionResultPredictionItemStageAscent PredictionResultPredictionItemStage = "ascent" PredictionResultPredictionItemStageDescent PredictionResultPredictionItemStage = "descent" + PredictionResultPredictionItemStageFloat PredictionResultPredictionItemStage = "float" ) // AllValues returns all PredictionResultPredictionItemStage values. @@ -518,6 +567,7 @@ func (PredictionResultPredictionItemStage) AllValues() []PredictionResultPredict return []PredictionResultPredictionItemStage{ PredictionResultPredictionItemStageAscent, PredictionResultPredictionItemStageDescent, + PredictionResultPredictionItemStageFloat, } } @@ -528,6 +578,8 @@ func (s PredictionResultPredictionItemStage) MarshalText() ([]byte, error) { return []byte(s), nil case PredictionResultPredictionItemStageDescent: return []byte(s), nil + case PredictionResultPredictionItemStageFloat: + return []byte(s), nil default: return nil, errors.Errorf("invalid value: %q", s) } @@ -542,6 +594,9 @@ func (s *PredictionResultPredictionItemStage) UnmarshalText(data []byte) error { case PredictionResultPredictionItemStageDescent: *s = PredictionResultPredictionItemStageDescent return nil + case PredictionResultPredictionItemStageFloat: + *s = PredictionResultPredictionItemStageFloat + return nil default: return errors.Errorf("invalid value: %q", data) } diff --git a/pkg/rest/oas_validators_gen.go b/pkg/rest/oas_validators_gen.go index 0cd02b6..1207d8f 100644 --- a/pkg/rest/oas_validators_gen.go +++ b/pkg/rest/oas_validators_gen.go @@ -6,7 +6,6 @@ import ( "fmt" "github.com/go-faster/errors" - "github.com/ogen-go/ogen/validate" ) @@ -34,6 +33,19 @@ func (s PerformPredictionProfile) Validate() error { } } +func (s PerformPredictionSimulateStagesItem) Validate() error { + switch s { + case "ascent": + return nil + case "descent": + return nil + case "float": + return nil + default: + return errors.Errorf("invalid value: %v", s) + } +} + func (s *PredictionResult) Validate() error { if s == nil { return validate.ErrNilPointer @@ -131,6 +143,8 @@ func (s PredictionResultPredictionItemStage) Validate() error { return nil case "descent": return nil + case "float": + return nil default: return errors.Errorf("invalid value: %v", s) } diff --git a/scripts/compare.go b/scripts/compare.go new file mode 100644 index 0000000..5e50653 --- /dev/null +++ b/scripts/compare.go @@ -0,0 +1,114 @@ +package main + +import ( + "encoding/json" + "fmt" + "math" + "os" +) + +type Point struct { + Datetime string `json:"datetime"` + Latitude float64 `json:"latitude"` + Longitude float64 `json:"longitude"` + Altitude float64 `json:"altitude"` +} + +type Stage struct { + Stage string `json:"stage"` + Trajectory []Point `json:"trajectory"` +} + +type Prediction struct { + Prediction []Stage `json:"prediction"` +} + +func haversine(lat1, lon1, lat2, lon2 float64) float64 { + R := 6371000.0 + phi1, phi2 := lat1*math.Pi/180, lat2*math.Pi/180 + dphi := (lat2 - lat1) * math.Pi / 180 + dlam := (lon2 - lon1) * math.Pi / 180 + a := math.Sin(dphi/2)*math.Sin(dphi/2) + math.Cos(phi1)*math.Cos(phi2)*math.Sin(dlam/2)*math.Sin(dlam/2) + return R * 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a)) +} + +func load(path string) Prediction { + data, _ := os.ReadFile(path) + var p Prediction + json.Unmarshal(data, &p) + return p +} + +func main() { + our := load("c:/tmp/our.json") + taw := load("c:/tmp/tawhiri.json") + + // Find burst and landing points + var ourBurst, ourLand, tawBurst, tawLand Point + for _, s := range our.Prediction { + t := s.Trajectory + if s.Stage == "ascent" { + ourBurst = t[len(t)-1] + } + if s.Stage == "descent" { + ourLand = t[len(t)-1] + } + } + for _, s := range taw.Prediction { + t := s.Trajectory + if s.Stage == "ascent" { + tawBurst = t[len(t)-1] + } + if s.Stage == "descent" { + tawLand = t[len(t)-1] + } + } + + fmt.Println("=== Burst Point ===") + fmt.Printf(" Our: lat=%.4f, lon=%.4f, alt=%.0f, time=%s\n", ourBurst.Latitude, ourBurst.Longitude, ourBurst.Altitude, ourBurst.Datetime) + fmt.Printf(" Tawhiri: lat=%.4f, lon=%.4f, alt=%.0f, time=%s\n", tawBurst.Latitude, tawBurst.Longitude, tawBurst.Altitude, tawBurst.Datetime) + burstDist := haversine(ourBurst.Latitude, ourBurst.Longitude, tawBurst.Latitude, tawBurst.Longitude) + fmt.Printf(" Distance: %.2f km\n", burstDist/1000) + + fmt.Println() + fmt.Println("=== Landing Point ===") + fmt.Printf(" Our: lat=%.4f, lon=%.4f, alt=%.0f, time=%s\n", ourLand.Latitude, ourLand.Longitude, ourLand.Altitude, ourLand.Datetime) + fmt.Printf(" Tawhiri: lat=%.4f, lon=%.4f, alt=%.0f, time=%s\n", tawLand.Latitude, tawLand.Longitude, tawLand.Altitude, tawLand.Datetime) + landDist := haversine(ourLand.Latitude, ourLand.Longitude, tawLand.Latitude, tawLand.Longitude) + fmt.Printf(" Distance: %.2f km\n", landDist/1000) + + fmt.Println() + fmt.Println("=== Trajectory Comparison (every 10 min) ===") + ourPts := map[string]Point{} + tawPts := map[string]Point{} + for _, s := range our.Prediction { + for _, p := range s.Trajectory { + ourPts[p.Datetime] = p + } + } + for _, s := range taw.Prediction { + for _, p := range s.Trajectory { + tawPts[p.Datetime] = p + } + } + + // Collect common times + var common []string + for _, s := range our.Prediction { + for _, p := range s.Trajectory { + if _, ok := tawPts[p.Datetime]; ok { + common = append(common, p.Datetime) + } + } + } + + for i, t := range common { + if i%10 == 0 { + o := ourPts[t] + tw := tawPts[t] + d := haversine(o.Latitude, o.Longitude, tw.Latitude, tw.Longitude) + fmt.Printf(" %s: dist=%.2f km (our: %.3f,%.3f vs taw: %.3f,%.3f)\n", + t, d/1000, o.Latitude, o.Longitude, tw.Latitude, tw.Longitude) + } + } +} diff --git a/scripts/rebuild_cube.go b/scripts/rebuild_cube.go new file mode 100644 index 0000000..240b707 --- /dev/null +++ b/scripts/rebuild_cube.go @@ -0,0 +1,44 @@ +package main + +import ( + "context" + "fmt" + "os" + "time" + + "git.intra.yksa.space/gsn/predictor/internal/pkg/grib" +) + +func main() { + ctx := context.Background() + + cfg := &grib.Config{ + Dir: "C:/tmp/grib", + TTL: 48 * time.Hour, + CacheTTL: 1 * time.Hour, + Parallel: 8, + } + + svc, err := grib.New(cfg) + if err != nil { + fmt.Printf("Error: %v\n", err) + return + } + + // Delete old cube to force rebuild + cubePath := "C:/tmp/grib/20260212_12.cube" + if err := os.Remove(cubePath); err != nil && !os.IsNotExist(err) { + fmt.Printf("Remove cube error: %v\n", err) + } else { + fmt.Println("Old cube removed") + } + + // Update will download missing pgrb2b files and rebuild cube + fmt.Println("Starting update (download pgrb2b + rebuild cube)...") + start := time.Now() + if err := svc.Update(ctx); err != nil { + fmt.Printf("Update error: %v\n", err) + return + } + fmt.Printf("Done in %v\n", time.Since(start)) +} diff --git a/scripts/test_download.go b/scripts/test_download.go new file mode 100644 index 0000000..73934a5 --- /dev/null +++ b/scripts/test_download.go @@ -0,0 +1,36 @@ +package main + +import ( + "context" + "fmt" + "time" + + "git.intra.yksa.space/gsn/predictor/internal/pkg/grib" +) + +func main() { + ctx := context.Background() + + // Найти последний доступный прогноз + run, err := grib.GetLatestModelRun(ctx) + if err != nil { + fmt.Printf("Error finding model run: %v\n", err) + return + } + fmt.Printf("Found model run: %v\n", run) + + // Создать downloader + dl := grib.NewPartialDownloader("C:/tmp/grib", 8) + + // Запустить загрузку + start := time.Now() + fmt.Println("Starting download...") + + err = dl.Run(ctx, run) + if err != nil { + fmt.Printf("Download error: %v\n", err) + return + } + + fmt.Printf("Download completed in %v\n", time.Since(start)) +} diff --git a/scripts/test_gh.go b/scripts/test_gh.go new file mode 100644 index 0000000..bfb592c --- /dev/null +++ b/scripts/test_gh.go @@ -0,0 +1,60 @@ +package main + +import ( + "encoding/binary" + "fmt" + "math" + "os" + + mmap "github.com/edsrzf/mmap-go" +) + +var pressureLevels = []float64{ + 1000, 975, 950, 925, 900, 875, 850, 825, 800, 775, + 750, 725, 700, 675, 650, 625, 600, 575, 550, 525, + 500, 475, 450, 425, 400, 375, 350, 325, 300, 275, + 250, 225, 200, 175, 150, 125, 100, 70, 50, 30, + 20, 10, 7, 5, 3, 2, 1, +} + +func main() { + f, _ := os.Open("C:/tmp/grib/20260212_12.cube") + mm, _ := mmap.Map(f, mmap.RDONLY, 0) + defer mm.Unmap() + defer f.Close() + + const ( + nT = 97 + nP = 47 + nLat = 721 + nLon = 1440 + ) + bytesPerVar := int64(nT * nP * nLat * nLon * 4) + + val := func(varIdx, ti, pi, y, x int) float32 { + idx := (((ti*nP + pi) * nLat) + y) * nLon + x + off := int64(varIdx)*bytesPerVar + int64(idx)*4 + bits := binary.LittleEndian.Uint32(mm[off : off+4]) + return math.Float32frombits(bits) + } + + // Check gh values at lat=52.2N (y=(90-52.2)*4=151.2 → y=151), lon=0.1E (x=0.1*4=0.4 → x=0) + // Time step 9 (9 hours into forecast) + ti := 9 + y := 151 + x := 0 + + fmt.Println("GH values at (52.25N, 0E), t=+9h:") + fmt.Printf("%8s %8s %10s\n", "Level", "hPa", "GH(m)") + for pi := 0; pi < nP; pi++ { + gh := val(0, ti, pi, y, x) + fmt.Printf("%8d %8.0f %10.1f\n", pi, pressureLevels[pi], gh) + } + + fmt.Println("\nU-wind values at same point:") + fmt.Printf("%8s %8s %10s\n", "Level", "hPa", "U(m/s)") + for pi := 0; pi < nP; pi++ { + u := val(1, ti, pi, y, x) + fmt.Printf("%8d %8.0f %10.2f\n", pi, pressureLevels[pi], u) + } +} diff --git a/scripts/test_grib_read.go b/scripts/test_grib_read.go new file mode 100644 index 0000000..3b8b7eb --- /dev/null +++ b/scripts/test_grib_read.go @@ -0,0 +1,38 @@ +package main + +import ( + "fmt" + "os" + + "github.com/nilsmagnus/grib/griblib" +) + +func main() { + f, err := os.Open("C:/tmp/grib/gfs.t18z.pgrb2.0p25.f000") + if err != nil { + fmt.Printf("Error opening file: %v\n", err) + return + } + defer f.Close() + + messages, err := griblib.ReadMessages(f) + if err != nil { + fmt.Printf("Error reading GRIB: %v\n", err) + return + } + + fmt.Printf("Found %d messages\n\n", len(messages)) + + for i, m := range messages { + product := m.Section4.ProductDefinitionTemplate + if product.ParameterCategory != 2 || product.ParameterNumber != 2 { + continue // only u-wind + } + fmt.Printf("UGRD Msg %d: SurfType=%d SurfValue=%d SurfScale=%d DataLen=%d\n", + i, + product.FirstSurface.Type, + product.FirstSurface.Value, + product.FirstSurface.Scale, + len(m.Data())) + } +} diff --git a/scripts/test_prediction.go b/scripts/test_prediction.go new file mode 100644 index 0000000..fb56c5a --- /dev/null +++ b/scripts/test_prediction.go @@ -0,0 +1,87 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "git.intra.yksa.space/gsn/predictor/internal/pkg/grib" +) + +func main() { + ctx := context.Background() + + // Инициализируем GRIB сервис + cfg := &grib.Config{ + Dir: "C:/tmp/grib", + TTL: 48 * time.Hour, + CacheTTL: 1 * time.Hour, + Parallel: 8, + } + + svc, err := grib.New(cfg) + if err != nil { + fmt.Printf("Error creating service: %v\n", err) + return + } + + // Обновляем данные (создаёт куб) + fmt.Println("Updating GRIB data (building cube)...") + start := time.Now() + if err := svc.Update(ctx); err != nil { + fmt.Printf("Update error: %v\n", err) + return + } + fmt.Printf("Cube built in %v\n", time.Since(start)) + + // Тестируем извлечение ветра + fmt.Println("\nTesting wind extraction...") + lat, lon, alt := 52.2, 0.1, 10000.0 + ts := time.Date(2026, 2, 11, 12, 0, 0, 0, time.UTC) + + wind, err := svc.Extract(ctx, lat, lon, alt, ts) + if err != nil { + fmt.Printf("Extract error: %v\n", err) + return + } + fmt.Printf("Wind at (%.2f, %.2f, %.0fm) at %v:\n", lat, lon, alt, ts) + fmt.Printf(" U (east): %.2f m/s\n", wind[0]) + fmt.Printf(" V (north): %.2f m/s\n", wind[1]) + + // Сравниваем с Tawhiri + fmt.Println("\nComparing with Tawhiri API...") + tawhiriURL := fmt.Sprintf( + "https://api.v2.sondehub.org/tawhiri?launch_latitude=%.2f&launch_longitude=%.2f&launch_altitude=0&launch_datetime=%s&ascent_rate=5&burst_altitude=30000&descent_rate=5", + lat, lon, ts.Format(time.RFC3339), + ) + + resp, err := http.Get(tawhiriURL) + if err != nil { + fmt.Printf("Tawhiri request error: %v\n", err) + return + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + var tawhiriResp map[string]interface{} + json.Unmarshal(body, &tawhiriResp) + + // Выводим финальную точку приземления + if prediction, ok := tawhiriResp["prediction"].([]interface{}); ok { + for _, stage := range prediction { + stageMap := stage.(map[string]interface{}) + if stageMap["stage"] == "descent" { + trajectory := stageMap["trajectory"].([]interface{}) + if len(trajectory) > 0 { + last := trajectory[len(trajectory)-1].(map[string]interface{}) + fmt.Printf("\nTawhiri landing point:\n") + fmt.Printf(" Lat: %.4f\n", last["latitude"]) + fmt.Printf(" Lon: %.4f\n", last["longitude"]) + } + } + } + } +} diff --git a/scripts/test_s3_download.go b/scripts/test_s3_download.go deleted file mode 100644 index 0391cd6..0000000 --- a/scripts/test_s3_download.go +++ /dev/null @@ -1,89 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "os" - "time" - - "git.intra.yksa.space/gsn/predictor/internal/pkg/grib" -) - -func main() { - ctx := context.Background() - - // Create S3 downloader - downloader, err := grib.NewS3Downloader( - "/tmp/grib_test", - 4, // parallel downloads - "noaa-gfs-bdp-pds", - "us-east-1", - ) - if err != nil { - log.Fatalf("Failed to create S3 downloader: %v", err) - } - - // Ensure directory exists - if err := os.MkdirAll("/tmp/grib_test", 0o755); err != nil { - log.Fatalf("Failed to create directory: %v", err) - } - - // Find nearest run (6-hour intervals: 00, 06, 12, 18 UTC) - now := time.Now().UTC() - hour := now.Hour() - (now.Hour() % 6) - // Use data from 6 hours ago to ensure it's available - run := time.Date(now.Year(), now.Month(), now.Day(), hour, 0, 0, 0, time.UTC).Add(-6 * time.Hour) - - fmt.Printf("Testing S3 download for run: %s\n", run.Format("2006-01-02 15:04 MST")) - - // List available files first - runStr := run.Format("20060102") - fmt.Printf("Listing available files for %s/%02d...\n", runStr, run.Hour()) - files, err := downloader.ListAvailableFiles(ctx, runStr, run.Hour()) - if err != nil { - log.Fatalf("Failed to list files: %v", err) - } - - fmt.Printf("Found %d files in S3:\n", len(files)) - if len(files) > 0 { - // Show first 5 files - for i, file := range files { - if i >= 5 { - fmt.Printf("... and %d more files\n", len(files)-5) - break - } - fmt.Printf(" - %s\n", file) - } - } - - // Try downloading just first 3 forecast hours (f000, f001, f002) - fmt.Println("\nTesting download of first 3 forecast hours...") - testRun := run - - // Create a timeout context for the download - downloadCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) - defer cancel() - - if err := downloader.Run(downloadCtx, testRun); err != nil { - log.Fatalf("Failed to download: %v", err) - } - - fmt.Println("\nDownload completed successfully!") - - // Check downloaded files - entries, err := os.ReadDir("/tmp/grib_test") - if err != nil { - log.Fatalf("Failed to read directory: %v", err) - } - - fmt.Printf("\nDownloaded %d files:\n", len(entries)) - for i, entry := range entries { - if i >= 10 { - fmt.Printf("... and %d more files\n", len(entries)-10) - break - } - info, _ := entry.Info() - fmt.Printf(" - %s (%.2f MB)\n", entry.Name(), float64(info.Size())/1024/1024) - } -} diff --git a/scripts/test_s3_simple.go b/scripts/test_s3_simple.go deleted file mode 100644 index 2ae8414..0000000 --- a/scripts/test_s3_simple.go +++ /dev/null @@ -1,68 +0,0 @@ -package main - -import ( - "context" - "fmt" - "io" - "log" - "os" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/service/s3" -) - -func main() { - ctx := context.Background() - - // Create AWS config with anonymous credentials - cfg, err := config.LoadDefaultConfig(ctx, - config.WithRegion("us-east-1"), - config.WithCredentialsProvider(aws.AnonymousCredentials{}), - ) - if err != nil { - log.Fatalf("Failed to load config: %v", err) - } - - client := s3.NewFromConfig(cfg) - - // Try to download a single file - bucket := "noaa-gfs-bdp-pds" - key := "gfs.20251020/00/atmos/gfs.t00z.pgrb2.0p50.f000" - - fmt.Printf("Downloading: s3://%s/%s\n", bucket, key) - - input := &s3.GetObjectInput{ - Bucket: aws.String(bucket), - Key: aws.String(key), - } - - result, err := client.GetObject(ctx, input) - if err != nil { - log.Fatalf("Failed to get object: %v", err) - } - defer result.Body.Close() - - // Create output file - outFile := "/tmp/test_grib.part" - f, err := os.Create(outFile) - if err != nil { - log.Fatalf("Failed to create file: %v", err) - } - defer f.Close() - - // Copy data - written, err := io.Copy(f, result.Body) - if err != nil { - log.Fatalf("Failed to copy data: %v (wrote %d bytes)", err, written) - } - - fmt.Printf("Successfully downloaded %d bytes\n", written) - - // Rename - if err := os.Rename(outFile, "/tmp/test_grib"); err != nil { - log.Fatalf("Failed to rename: %v", err) - } - - fmt.Println("Download complete!") -} diff --git a/scripts/test_wind.go b/scripts/test_wind.go new file mode 100644 index 0000000..61a231a --- /dev/null +++ b/scripts/test_wind.go @@ -0,0 +1,55 @@ +package main + +import ( + "context" + "fmt" + "time" + + "git.intra.yksa.space/gsn/predictor/internal/pkg/grib" +) + +func main() { + ctx := context.Background() + + cfg := &grib.Config{ + Dir: "C:/tmp/grib", + TTL: 48 * time.Hour, + CacheTTL: 1 * time.Hour, + Parallel: 8, + } + + svc, err := grib.New(cfg) + if err != nil { + fmt.Printf("Error: %v\n", err) + return + } + + if err := svc.Update(ctx); err != nil { + fmt.Printf("Update error: %v\n", err) + return + } + + // Test wind at lat=52.2, lon=0.1 at various altitudes + // Run is 2026-02-12T12:00Z, request time 21:00Z = +9 hours + ts := time.Date(2026, 2, 12, 21, 0, 0, 0, time.UTC) + lat, lon := 52.2, 0.1 + + fmt.Println("Wind at (52.2°N, 0.1°E) at 2026-02-12T21:00Z:") + fmt.Printf("%8s %8s %8s\n", "Alt(m)", "U(m/s)", "V(m/s)") + + for _, alt := range []float64{0, 1000, 3000, 5000, 7000, 10000, 15000, 20000, 25000, 30000} { + w, err := svc.Extract(ctx, lat, lon, alt, ts) + if err != nil { + fmt.Printf("%8.0f Error: %v\n", alt, err) + continue + } + fmt.Printf("%8.0f %8.2f %8.2f\n", alt, w[0], w[1]) + } + + // Also test at a few nearby points to check spatial consistency + fmt.Println("\nWind at 10km altitude, varying longitude:") + for _, testLon := range []float64{0.0, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0, 350.0, 359.75} { + w, _ := svc.Extract(ctx, lat, testLon, 10000, ts) + fmt.Printf(" lon=%6.2f: U=%8.2f V=%8.2f\n", testLon, w[0], w[1]) + } +} diff --git a/start_with_http.sh b/start_with_http.sh new file mode 100644 index 0000000..4173c2b --- /dev/null +++ b/start_with_http.sh @@ -0,0 +1,11 @@ +#!/bin/bash +# Start API with HTTP downloads instead of S3 + +export GSN_PREDICTOR_GRIB_USE_S3=false + +echo "Starting API with HTTP downloads from NOMADS..." +echo "USE_S3 = $GSN_PREDICTOR_GRIB_USE_S3" +echo "" + +cd "$(dirname "$0")" +go run ./cmd/api/main.go