diff --git a/.claude/settings.local.json b/.claude/settings.local.json deleted file mode 100644 index af05740..0000000 --- a/.claude/settings.local.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "permissions": { - "allow": [ - "Bash(cat:*)", - "Bash(xargs:*)", - "Bash(ls:*)", - "Bash(done)", - "Bash(curl:*)", - "WebFetch(domain:raw.githubusercontent.com)", - "WebFetch(domain:github.com)", - "Bash(go run:*)", - "Bash(pkill:*)" - ], - "deny": [], - "ask": [] - } -} diff --git a/.dockerignore b/.dockerignore index 22e12b6..287fc68 100644 --- a/.dockerignore +++ b/.dockerignore @@ -47,7 +47,7 @@ predictor # Temporary files /tmp/ /temp/ -*.py + # Test coverage *.out diff --git a/.gitignore b/.gitignore index 9ffe4dd..8519b69 100644 --- a/.gitignore +++ b/.gitignore @@ -4,7 +4,6 @@ *.dll *.so *.dylib -*.ps1 # Test binary, built with `go test -c` *.test @@ -29,9 +28,6 @@ go.work *.swp *.swo *~ -*.bak -*.py -*.json # OS generated files .DS_Store @@ -63,6 +59,4 @@ Thumbs.db # Tawhiri /tawhiri -/tawhiri/* - -*.md \ No newline at end of file +/tawhiri/* \ No newline at end of file diff --git a/Makefile b/Makefile index 42d11c5..c6c35d9 100644 --- a/Makefile +++ b/Makefile @@ -105,9 +105,7 @@ help: @echo " run-local - Run locally" @echo " fmt - Format code" @echo " lint - Lint code" - @echo " generate-ogen - Generate OpenAPI code from swagger spec" @echo " help - Show this help" -.PHONY: generate-ogen generate-ogen: go run github.com/ogen-go/ogen/cmd/ogen@latest --target pkg/rest -package gsn --clean api/rest/predictor.swagger.yml \ No newline at end of file diff --git a/api/rest/predictor.swagger.yml b/api/rest/predictor.swagger.yml index fc408f9..73e3381 100644 --- a/api/rest/predictor.swagger.yml +++ b/api/rest/predictor.swagger.yml @@ -61,13 +61,6 @@ paths: name: descent_curve schema: type: string - - in: query - name: simulate_stages - schema: - type: array - items: - type: string - enum: [ascent, descent, float] - in: query name: interpolate schema: @@ -154,7 +147,7 @@ components: properties: stage: type: string - enum: ["ascent", "descent", "float"] + enum: ["ascent", "descent"] trajectory: type: array items: diff --git a/assemble_cube.go b/assemble_cube.go deleted file mode 100644 index c624e92..0000000 --- a/assemble_cube.go +++ /dev/null @@ -1,26 +0,0 @@ -package main - -import ( - "fmt" - "os" - "time" -) - -// This will be a simple wrapper that calls the internal assembleCube function -// We'll compile it as part of the grib package - -func main() { - dir := "C:/tmp/grib" - run := time.Date(2025, 12, 6, 0, 0, 0, 0, time.UTC) - cubePath := fmt.Sprintf("%s/%s.cube", dir, run.Format("20060102_15")) - - fmt.Printf("Assembling cube from existing GRIB files...\n") - fmt.Printf("Directory: %s\n", dir) - fmt.Printf("Run: %s\n", run.Format("2006-01-02 15:04 MST")) - fmt.Printf("Output: %s\n", cubePath) - fmt.Println() - - // Just print instructions - we'll do it directly - fmt.Println("Run this Go code to assemble:") - fmt.Printf("cd internal/pkg/grib && go test -run TestAssemble\n") -} diff --git a/cmd/api/main.go b/cmd/api/main.go index f250199..2e9bf6b 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -24,7 +24,6 @@ func main() { panic(err) } defer lg.Sync() - zap.ReplaceGlobals(lg) ctx := log.ToCtx(context.Background(), lg) schedulerConfig, err := scheduler.NewConfig() diff --git a/go.mod b/go.mod index b186e37..079e12c 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/nilsmagnus/grib v1.2.8 github.com/ogen-go/ogen v1.16.0 github.com/rs/cors v1.11.1 - github.com/stretchr/testify v1.11.1 go.opentelemetry.io/otel v1.38.0 go.opentelemetry.io/otel/metric v1.38.0 go.opentelemetry.io/otel/trace v1.38.0 @@ -20,7 +19,24 @@ require ( ) require ( - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/aws/aws-sdk-go-v2 v1.39.3 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 // indirect + github.com/aws/aws-sdk-go-v2/config v1.31.13 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.18.17 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.10 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.10 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.10 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.10 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.10 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.10 // indirect + github.com/aws/aws-sdk-go-v2/service/s3 v1.88.5 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.29.7 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.2 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.38.7 // indirect + github.com/aws/smithy-go v1.23.1 // indirect github.com/dlclark/regexp2 v1.11.5 // indirect github.com/fatih/color v1.18.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect @@ -30,11 +46,9 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/segmentio/asm v1.2.1 // indirect github.com/shopspring/decimal v1.4.0 // indirect - github.com/stretchr/objx v0.5.2 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect @@ -43,5 +57,4 @@ require ( golang.org/x/sys v0.37.0 // indirect golang.org/x/text v0.30.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 7c95f5c..eaa1790 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,39 @@ +github.com/aws/aws-sdk-go-v2 v1.39.3 h1:h7xSsanJ4EQJXG5iuW4UqgP7qBopLpj84mpkNx3wPjM= +github.com/aws/aws-sdk-go-v2 v1.39.3/go.mod h1:yWSxrnioGUZ4WVv9TgMrNUeLV3PFESn/v+6T/Su8gnM= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 h1:t9yYsydLYNBk9cJ73rgPhPWqOh/52fcWDQB5b1JsKSY= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2/go.mod h1:IusfVNTmiSN3t4rhxWFaBAqn+mcNdwKtPcV16eYdgko= +github.com/aws/aws-sdk-go-v2/config v1.31.13 h1:wcqQB3B0PgRPUF5ZE/QL1JVOyB0mbPevHFoAMpemR9k= +github.com/aws/aws-sdk-go-v2/config v1.31.13/go.mod h1:ySB5D5ybwqGbT6c3GszZ+u+3KvrlYCUQNo62+hkKOFk= +github.com/aws/aws-sdk-go-v2/credentials v1.18.17 h1:skpEwzN/+H8cdrrtT8y+rvWJGiWWv0DeNAe+4VTf+Vs= +github.com/aws/aws-sdk-go-v2/credentials v1.18.17/go.mod h1:Ed+nXsaYa5uBINovJhcAWkALvXw2ZLk36opcuiSZfJM= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.10 h1:UuGVOX48oP4vgQ36oiKmW9RuSeT8jlgQgBFQD+HUiHY= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.10/go.mod h1:vM/Ini41PzvudT4YkQyE/+WiQJiQ6jzeDyU8pQKwCac= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.10 h1:mj/bdWleWEh81DtpdHKkw41IrS+r3uw1J/VQtbwYYp8= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.10/go.mod h1:7+oEMxAZWP8gZCyjcm9VicI0M61Sx4DJtcGfKYv2yKQ= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.10 h1:wh+/mn57yhUrFtLIxyFPh2RgxgQz/u+Yrf7hiHGHqKY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.10/go.mod h1:7zirD+ryp5gitJJ2m1BBux56ai8RIRDykXZrJSp540w= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.10 h1:FHw90xCTsofzk6vjU808TSuDtDfOOKPNdz5Weyc3tUI= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.10/go.mod h1:n8jdIE/8F3UYkg8O4IGkQpn2qUmapg/1K1yl29/uf/c= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2 h1:xtuxji5CS0JknaXoACOunXOYOQzgfTvGAc9s2QdCJA4= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2/go.mod h1:zxwi0DIR0rcRcgdbl7E2MSOvxDyyXGBlScvBkARFaLQ= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.1 h1:ne+eepnDB2Wh5lHKzELgEncIqeVlQ1rSF9fEa4r5I+A= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.1/go.mod h1:u0Jkg0L+dcG1ozUq21uFElmpbmjBnhHR5DELHIme4wg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.10 h1:DRND0dkCKtJzCj4Xl4OpVbXZgfttY5q712H9Zj7qc/0= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.10/go.mod h1:tGGNmJKOTernmR2+VJ0fCzQRurcPZj9ut60Zu5Fi6us= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.10 h1:DA+Hl5adieRyFvE7pCvBWm3VOZTRexGVkXw33SUqNoY= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.10/go.mod h1:L+A89dH3/gr8L4ecrdzuXUYd1znoko6myzndVGZx/DA= +github.com/aws/aws-sdk-go-v2/service/s3 v1.88.5 h1:FlGScxzCGNzT+2AvHT1ZGMvxTwAMa6gsooFb1pO/AiM= +github.com/aws/aws-sdk-go-v2/service/s3 v1.88.5/go.mod h1:N/iojY+8bW3MYol9NUMuKimpSbPEur75cuI1SmtonFM= +github.com/aws/aws-sdk-go-v2/service/sso v1.29.7 h1:fspVFg6qMx0svs40YgRmE7LZXh9VRZvTT35PfdQR6FM= +github.com/aws/aws-sdk-go-v2/service/sso v1.29.7/go.mod h1:BQTKL3uMECaLaUV3Zc2L4Qybv8C6BIXjuu1dOPyxTQs= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.2 h1:scVnW+NLXasGOhy7HhkdT9AGb6kjgW7fJ5xYkUaqHs0= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.2/go.mod h1:FRNCY3zTEWZXBKm2h5UBUPvCVDOecTad9KhynDyGBc0= +github.com/aws/aws-sdk-go-v2/service/sts v1.38.7 h1:VEO5dqFkMsl8QZ2yHsFDJAIZLAkEbaYDB+xdKi0Feic= +github.com/aws/aws-sdk-go-v2/service/sts v1.38.7/go.mod h1:L1xxV3zAdB+qVrVW/pBIrIAnHFWHo6FBbFe4xOGsG/o= +github.com/aws/smithy-go v1.23.1 h1:sLvcH6dfAFwGkHLZ7dGiYF7aK6mg4CgKA/iDKjLDt9M= +github.com/aws/smithy-go v1.23.1/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/caarlos0/env/v11 v11.3.1 h1:cArPWC15hWmEt+gWk7YBi7lEXTXCvpaSdCiZE2X5mCA= github.com/caarlos0/env/v11 v11.3.1/go.mod h1:qupehSf/Y0TUTsxKywqRt/vJjN5nz6vauiYEUUr8P4U= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -21,6 +57,8 @@ github.com/go-faster/jx v1.1.0/go.mod h1:vKDNikrKoyUmpzaJ0OkIkRQClNHFX/nF3dnTJZb github.com/go-faster/yaml v0.4.6 h1:lOK/EhI04gCpPgPhgt0bChS6bvw7G3WwI8xxVe0sw9I= github.com/go-faster/yaml v0.4.6/go.mod h1:390dRIvV4zbnO7qC9FGo6YYutc+wyyUSHBgbXL52eXk= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -39,12 +77,17 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/nilsmagnus/grib v1.2.8 h1:H7ch/1/agaCqM3MC8hW1Ft+EJ+q2XB757uml/IfPvp4= github.com/nilsmagnus/grib v1.2.8/go.mod h1:XHm+5zuoOk0NSIWaGmA3JaAxI4i50YvD1L1vz+aqPOQ= +github.com/ogen-go/ogen v1.14.0 h1:TU1Nj4z9UBsAfTkf+IhuNNp7igdFQKqkk9+6/y4XuWg= +github.com/ogen-go/ogen v1.14.0/go.mod h1:Iw1vkqkx6SU7I9th5ceP+fVPJ6Wge4e3kAVzAxJEpPE= github.com/ogen-go/ogen v1.16.0 h1:fKHEYokW/QrMzVNXId74/6RObRIUs9T2oroGKtR25Iw= github.com/ogen-go/ogen v1.16.0/go.mod h1:s3nWiMzybSf8fhxckyO+wtto92+QHpEL8FmkPnhL3jI= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -54,10 +97,13 @@ github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= -github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA= github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= +github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= +github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0= github.com/segmentio/asm v1.2.1/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= @@ -65,22 +111,30 @@ github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+D github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= -github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= -github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg= +go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E= go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= +go.opentelemetry.io/otel/metric v1.36.0 h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCREuTYufE= +go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs= go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= +go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w= +go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA= go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= @@ -90,15 +144,26 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/exp v0.0.0-20230725093048-515e97ebf090 h1:Di6/M8l0O2lCLc6VVRWhgCiApHV8MnQurBnFSHsQtNY= +golang.org/x/exp v0.0.0-20230725093048-515e97ebf090/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/exp v0.0.0-20251017212417-90e834f514db h1:by6IehL4BH5k3e3SJmcoNbOobMey2SLpAF79iPOEBvw= golang.org/x/exp v0.0.0-20251017212417-90e834f514db/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70= +golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= +golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4= golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210= +golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= +golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= +golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k= golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/pkg/ds/predictor.go b/internal/pkg/ds/predictor.go index c7c33c8..753b0b2 100644 --- a/internal/pkg/ds/predictor.go +++ b/internal/pkg/ds/predictor.go @@ -19,7 +19,6 @@ type PredictionParameters struct { StopDatetime *time.Time AscentCurve *string // base64 DescentCurve *string // base64 - SimulateStages []string Interpolate *bool Format *string Dataset *time.Time @@ -86,11 +85,5 @@ func ConvertFlatPredictionParams(params api.PerformPredictionParams) *Prediction if v, ok := params.Dataset.Get(); ok { out.Dataset = &v } - if len(params.SimulateStages) > 0 { - out.SimulateStages = make([]string, len(params.SimulateStages)) - for i, stage := range params.SimulateStages { - out.SimulateStages[i] = string(stage) - } - } return out } diff --git a/internal/pkg/grib/assemble_test.go b/internal/pkg/grib/assemble_test.go deleted file mode 100644 index 8d9d7ce..0000000 --- a/internal/pkg/grib/assemble_test.go +++ /dev/null @@ -1,25 +0,0 @@ -package grib - -import ( - "testing" - "time" -) - -func TestAssembleCubeFromExisting(t *testing.T) { - dir := "C:/tmp/grib" - run := time.Date(2026, 1, 16, 6, 0, 0, 0, time.UTC) - cubePath := dir + "/" + run.Format("20060102_15") + ".cube" - - t.Logf("Assembling cube from existing GRIB files...") - t.Logf("Directory: %s", dir) - t.Logf("Run: %s", run.Format("2006-01-02 15:04 MST")) - t.Logf("Output: %s", cubePath) - - err := assembleCube(dir, run, cubePath) - if err != nil { - t.Fatalf("Failed to assemble cube: %v", err) - } - - t.Logf("✓ Cube assembled successfully!") - t.Logf("Cube file: %s", cubePath) -} diff --git a/internal/pkg/grib/config.go b/internal/pkg/grib/config.go index c80462e..009645d 100644 --- a/internal/pkg/grib/config.go +++ b/internal/pkg/grib/config.go @@ -1,130 +1,23 @@ package grib import ( - "fmt" "time" "git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes" env "github.com/caarlos0/env/v11" ) -// DatasetConfig описывает параметры GFS-датасета: сетку, временные шаги, -// уровни давления и URL для загрузки. -type DatasetConfig struct { - // Сетка - Resolution float64 // шаг сетки в градусах (0.25 или 0.5) - NLat int // точек по широте (721 для 0.25°, 361 для 0.5°) - NLon int // точек по долготе (1440 для 0.25°, 720 для 0.5°) - - // Время - NT int // кол-во временных шагов (97 для 0–96 ч с шагом 1) - MaxHour int // последний час прогноза (96) - TimeStep int // интервал между шагами, часы (1 или 3) - - // Вертикаль - NP int // кол-во уровней давления - Levels []float64 // уровни давления в гПа, по убыванию (1000 … 1) - - // Переменные в кубе (порядок важен: индексы 0, 1, 2, …) - NVar int // кол-во переменных - Variables []string // GRIB-имена для фильтрации idx (HGT, UGRD, VGRD) - - // URL загрузки (fmt-шаблоны: date, hour, hour, step) - URLMask string // основной pgrb2 - URLMaskB string // дополнительный pgrb2b - - // Имена файлов - FileSuffix string // токен разрешения в именах файлов ("0p25", "0p50") -} - -// SizePerVar возвращает размер одной переменной в кубе, байт. -func (dc *DatasetConfig) SizePerVar() int64 { - return int64(dc.NT) * int64(dc.NP) * int64(dc.NLat) * int64(dc.NLon) * 4 -} - -// CubeSize возвращает полный размер куба, байт. -func (dc *DatasetConfig) CubeSize() int64 { - return dc.SizePerVar() * int64(dc.NVar) -} - -// GridSize возвращает NLat * NLon. -func (dc *DatasetConfig) GridSize() int { - return dc.NLat * dc.NLon -} - -// InvResolution возвращает 1/Resolution — множитель для перевода координат в индексы. -func (dc *DatasetConfig) InvResolution() float64 { - return 1.0 / dc.Resolution -} - -// Steps возвращает список часов прогноза [0, TimeStep, 2*TimeStep, …, MaxHour]. -func (dc *DatasetConfig) Steps() []int { - out := make([]int, 0, dc.NT) - for h := 0; h <= dc.MaxHour; h += dc.TimeStep { - out = append(out, h) - } - return out -} - -// FileName возвращает имя основного GRIB-файла (pgrb2). -func (dc *DatasetConfig) FileName(run time.Time, step int) string { - return fmt.Sprintf("gfs.t%02dz.pgrb2.%s.f%03d", run.Hour(), dc.FileSuffix, step) -} - -// FileNameB возвращает имя вторичного GRIB-файла (pgrb2b). -func (dc *DatasetConfig) FileNameB(run time.Time, step int) string { - return fmt.Sprintf("gfs.t%02dz.pgrb2b.%s.f%03d", run.Hour(), dc.FileSuffix, step) -} - -// GribURL возвращает URL основного GRIB-файла. -func (dc *DatasetConfig) GribURL(run time.Time, step int) string { - return fmt.Sprintf(dc.URLMask, run.Format("20060102"), run.Hour(), run.Hour(), step) -} - -// GribURLB возвращает URL вторичного GRIB-файла. -func (dc *DatasetConfig) GribURLB(run time.Time, step int) string { - return fmt.Sprintf(dc.URLMaskB, run.Format("20060102"), run.Hour(), run.Hour(), step) -} - -// DefaultDatasetConfig возвращает конфиг GFS 0.25° / 1 час / 47 уровней. -func DefaultDatasetConfig() DatasetConfig { - return DatasetConfig{ - Resolution: 0.25, - NLat: 721, - NLon: 1440, - - NT: 97, - MaxHour: 96, - TimeStep: 1, - - NP: 47, - Levels: []float64{ - 1000, 975, 950, 925, 900, 875, 850, 825, 800, 775, - 750, 725, 700, 675, 650, 625, 600, 575, 550, 525, - 500, 475, 450, 425, 400, 375, 350, 325, 300, 275, - 250, 225, 200, 175, 150, 125, 100, 70, 50, 30, - 20, 10, 7, 5, 3, 2, 1, - }, - - NVar: 3, - Variables: []string{"HGT", "UGRD", "VGRD"}, - - URLMask: "https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2.0p25.f%03d", - URLMaskB: "https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2b.0p25.f%03d", - - FileSuffix: "0p25", - } -} - -// --------------------------------------------------------------------------- - type Config struct { - Dir string `env:"DIR" envDefault:"C:/tmp/grib"` - TTL time.Duration `env:"TTL" envDefault:"48h"` - CacheTTL time.Duration `env:"CACHE_TTL" envDefault:"1h"` - Parallel int `env:"PARALLEL" envDefault:"8"` - - Dataset DatasetConfig + Dir string `env:"DIR" envDefault:"/tmp/grib"` + TTL time.Duration `env:"TTL" envDefault:"24h"` + CacheTTL time.Duration `env:"CACHE_TTL" envDefault:"1h"` + Parallel int `env:"PARALLEL" envDefault:"8"` + DatasetURL string `env:"DATASET_URL" envDefault:"https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod"` + // S3 configuration + UseS3 bool `env:"USE_S3" envDefault:"true"` + S3Bucket string `env:"S3_BUCKET" envDefault:"noaa-gfs-bdp-pds"` + S3Region string `env:"S3_REGION" envDefault:"us-east-1"` + S3Timeout time.Duration `env:"S3_TIMEOUT" envDefault:"300s"` } func NewConfig() (*Config, error) { @@ -134,6 +27,6 @@ func NewConfig() (*Config, error) { }); err != nil { return nil, errcodes.Wrap(err, "failed to parse GRIB config") } - cfg.Dataset = DefaultDatasetConfig() + return cfg, nil } diff --git a/internal/pkg/grib/create_dataset.go b/internal/pkg/grib/create_dataset.go deleted file mode 100644 index e69de29..0000000 diff --git a/internal/pkg/grib/cube.go b/internal/pkg/grib/cube.go index dfc3606..d2015ec 100644 --- a/internal/pkg/grib/cube.go +++ b/internal/pkg/grib/cube.go @@ -15,7 +15,7 @@ type cube struct { file *os.File } -func openCube(path string, dc *DatasetConfig) (*cube, error) { +func openCube(path string) (*cube, error) { f, err := os.Open(path) if err != nil { return nil, err @@ -27,15 +27,14 @@ func openCube(path string, dc *DatasetConfig) (*cube, error) { return nil, err } - return &cube{ - mm: mm, - t: dc.NT, - p: dc.NP, - lat: dc.NLat, - lon: dc.NLon, - bytesPerVar: dc.SizePerVar(), - file: f, - }, nil + const ( + nT = 97 // 0-96 hours with step 1 hour + nP = 47 // 47 pressure levels matching tawhiri + nLat = 361 + nLon = 720 + ) + + return &cube{mm: mm, t: nT, p: nP, lat: nLat, lon: nLon, bytesPerVar: int64(nT * nP * nLat * nLon * 4), file: f}, nil } func (c *cube) val(varIdx, ti, pi, y, x int) float32 { diff --git a/internal/pkg/grib/dataset.go b/internal/pkg/grib/dataset.go index f994dc9..e539f65 100644 --- a/internal/pkg/grib/dataset.go +++ b/internal/pkg/grib/dataset.go @@ -2,7 +2,6 @@ package grib type dataset struct { cube *cube - ds *DatasetConfig runUTC int64 // unix seconds } diff --git a/internal/pkg/grib/downloader.go b/internal/pkg/grib/downloader.go new file mode 100644 index 0000000..a93a64c --- /dev/null +++ b/internal/pkg/grib/downloader.go @@ -0,0 +1,91 @@ +package grib + +import ( + "context" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "time" + + "git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes" + "golang.org/x/sync/errgroup" +) + +type Downloader struct { + Dir string + Parallel int + Client *http.Client + DatasetURL string +} + +func (d *Downloader) fileURL(run string, hour int, step int) string { + return fmt.Sprintf("%s/gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2.0p50.f%03d", d.DatasetURL, run, hour, hour, step) +} + +func (d *Downloader) fetch(ctx context.Context, url, dst string) (err error) { + // Check if final file already exists + if _, err := os.Stat(dst); err == nil { + return nil + } + + tmp := dst + ".part" + + // Remove old .part file if it exists (fixes race condition) + os.Remove(tmp) + + f, err := os.Create(tmp) + if err != nil { + return err + } + + // Cleanup .part file on any error (using named return value) + defer func() { + f.Close() + if err != nil { + os.Remove(tmp) + } + }() + + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + resp, err := d.Client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return errcodes.Wrap(errcodes.ErrDownload, "bad status: "+resp.Status) + } + + if _, err := io.Copy(f, resp.Body); err != nil { + return err + } + + // Close file before rename + if err := f.Close(); err != nil { + return err + } + + // If rename fails, err will be set and defer will cleanup .part file + return os.Rename(tmp, dst) +} + +func (d *Downloader) Run(ctx context.Context, run time.Time) error { + runStr := run.Format("20060102") + hour := run.Hour() + g, ctx := errgroup.WithContext(ctx) + sem := make(chan struct{}, d.Parallel) + for _, step := range steps { + step := step + sem <- struct{}{} + g.Go(func() error { + defer func() { <-sem }() + url := d.fileURL(runStr, hour, step) + dst := filepath.Join(d.Dir, fileName(run, step)) + return d.fetch(ctx, url, dst) + }) + } + return g.Wait() +} diff --git a/internal/pkg/grib/extractor.go b/internal/pkg/grib/extractor.go index d27f3a4..769b7cd 100644 --- a/internal/pkg/grib/extractor.go +++ b/internal/pkg/grib/extractor.go @@ -4,100 +4,29 @@ import "math" func lerp(a, b, t float64) float64 { return a + t*(b-a) } -// ghInterp returns interpolated geopotential height at given time/pressure/lat/lon -func (d *dataset) ghInterp(ti, pi int, y0, y1, x0, x1 int, wy, wx float64) float64 { - g00 := d.cube.val(0, ti, pi, y0, x0) - g10 := d.cube.val(0, ti, pi, y0, x1) - g01 := d.cube.val(0, ti, pi, y1, x0) - g11 := d.cube.val(0, ti, pi, y1, x1) - return (1-wy)*((1-wx)*float64(g00)+wx*float64(g10)) + wy*((1-wx)*float64(g01)+wx*float64(g11)) -} - -// searchAltLevel uses geopotential height to find pressure level bracket for target altitude. -func (d *dataset) searchAltLevel(alt float64, ti, y0, y1, x0, x1 int, wy, wx float64) (int, float64) { - levels := d.ds.Levels - nLevels := len(levels) - - lo, hi := 0, nLevels-1 - for lo < hi-1 { - mid := (lo + hi) / 2 - ghMid := d.ghInterp(ti, mid, y0, y1, x0, x1, wy, wx) - if ghMid < alt { - lo = mid - } else { - hi = mid - } - } - - ghLo := d.ghInterp(ti, lo, y0, y1, x0, x1, wy, wx) - ghHi := d.ghInterp(ti, hi, y0, y1, x0, x1, wy, wx) - - wp := 0.0 - if ghHi != ghLo { - wp = (alt - ghLo) / (ghHi - ghLo) - } - if wp < 0 { - wp = 0 - } - if wp > 1 { - wp = 1 - } - - return lo, wp -} - -// uv выполняет интерполяцию ветра по 4 измерениям (time, pressure, lat, lon). +// Interpolate 16‑point (time, p, lat, lon) func (d *dataset) uv(lat, lon, alt float64, tHours float64) (float64, float64) { if lon < 0 { lon += 360 } - - inv := d.ds.InvResolution() - - // GRIB scan north→south: index 0 = 90°N - iy := (90 - lat) * inv + iy := (lat + 90) * 2 y0 := int(math.Floor(iy)) - if y0 < 0 { - y0 = 0 - } - if y0 >= d.cube.lat-1 { - y0 = d.cube.lat - 2 - } y1 := y0 + 1 wy := iy - float64(y0) - - ix := lon * inv + ix := lon * 2 x0 := int(math.Floor(ix)) % d.cube.lon x1 := (x0 + 1) % d.cube.lon wx := ix - float64(x0) - - // Время: tHours делим на шаг, чтобы получить индекс в кубе - tIdx := tHours / float64(d.ds.TimeStep) - it0 := int(math.Floor(tIdx)) - if it0 < 0 { - it0 = 0 - } - if it0 >= d.cube.t-1 { - it0 = d.cube.t - 2 - } - wt := tIdx - float64(it0) - - // ISA: высота → давление → индекс уровня - levels := d.ds.Levels + // For hourly data (step = 1 hour) + it0 := int(math.Floor(tHours)) + wt := tHours - float64(it0) p := pressureFromAlt(alt) ip0 := 0 - for ip0+1 < len(levels) && levels[ip0+1] > p { + for ip0+1 < len(pressureLevels) && pressureLevels[ip0+1] > p { ip0++ } ip1 := ip0 + 1 - if ip1 >= len(levels) { - ip1 = len(levels) - 1 - } - wp := 0.0 - if levels[ip0] != levels[ip1] { - wp = (levels[ip0] - p) / (levels[ip0] - levels[ip1]) - } - + wp := (pressureLevels[ip0] - p) / (pressureLevels[ip0] - pressureLevels[ip1]) fetch := func(ti, pi int) (float64, float64) { u00 := d.cube.val(1, ti, pi, y0, x0) u10 := d.cube.val(1, ti, pi, y0, x1) @@ -111,7 +40,6 @@ func (d *dataset) uv(lat, lon, alt float64, tHours float64) (float64, float64) { vxy := (1-wy)*((1-wx)*float64(v00)+wx*float64(v10)) + wy*((1-wx)*float64(v01)+wx*float64(v11)) return uxy, vxy } - u0p0, v0p0 := fetch(it0, ip0) u0p1, v0p1 := fetch(it0, ip1) u1p0, v1p0 := fetch(it0+1, ip0) diff --git a/internal/pkg/grib/grib.go b/internal/pkg/grib/grib.go index 7bf6bfc..32c4466 100644 --- a/internal/pkg/grib/grib.go +++ b/internal/pkg/grib/grib.go @@ -4,6 +4,7 @@ import ( "context" "encoding/binary" "math" + "net/http" "os" "path/filepath" "strings" @@ -40,12 +41,15 @@ func New(cfg *Config) (Service, error) { // Try to load existing dataset on startup if err := s.loadExistingDataset(); err != nil { // Log error but don't fail startup - dataset will be loaded on first Update() + // This allows the service to start even if no data is available yet } return s, nil } +// loadExistingDataset tries to load the most recent available dataset func (s *service) loadExistingDataset() error { + // Find the most recent cube file pattern := filepath.Join(s.cfg.Dir, "*.cube") matches, err := filepath.Glob(pattern) if err != nil { @@ -56,6 +60,7 @@ func (s *service) loadExistingDataset() error { return errcodes.ErrNoCubeFilesFound } + // Sort by modification time (newest first) var latestFile string var latestTime time.Time @@ -64,6 +69,7 @@ func (s *service) loadExistingDataset() error { if err != nil { continue } + if info.ModTime().After(latestTime) { latestTime = info.ModTime() latestFile = match @@ -74,16 +80,18 @@ func (s *service) loadExistingDataset() error { return errcodes.ErrNoValidCubeFilesFound } + // Check if the file is fresh enough if time.Since(latestTime) > s.cfg.TTL { return errcodes.Wrap(errcodes.ErrLatestCubeFileIsTooOld, "latest cube file is too old") } - dc := &s.cfg.Dataset - c, err := openCube(latestFile, dc) + // Load the dataset + c, err := openCube(latestFile) if err != nil { return err } + // Extract run time from filename base := filepath.Base(latestFile) runStr := strings.TrimSuffix(base, ".cube") run, err := time.Parse("20060102_15", runStr) @@ -92,70 +100,94 @@ func (s *service) loadExistingDataset() error { return err } - s.data.Store(&dataset{cube: c, ds: dc, runUTC: run.Unix()}) + ds := &dataset{cube: c, runUTC: run.Unix()} + s.data.Store(ds) + return nil } +// Update() downloads missing GRIBs, assembles cube into a single mmap‑file. func (s *service) Update(ctx context.Context) error { + // Check if we already have fresh data if d := s.data.Load(); d != nil { runTime := time.Unix(d.runUTC, 0) if time.Since(runTime) < s.cfg.TTL { + // Data is still fresh, no need to update return nil } } + // Check again after acquiring lock (double-checked locking pattern) if d := s.data.Load(); d != nil { runTime := time.Unix(d.runUTC, 0) if time.Since(runTime) < s.cfg.TTL { + // Another instance already updated the data return nil } } - dc := &s.cfg.Dataset - run := nearestRun(time.Now().UTC().Add(-6 * time.Hour)) + run := nearestRun(time.Now().UTC().Add(-24 * time.Hour)) + // Check if we already have this run cubePath := filepath.Join(s.cfg.Dir, run.Format("20060102_15")) + ".cube" if _, err := os.Stat(cubePath); err == nil { + // File exists, check if it's fresh info, err := os.Stat(cubePath) if err == nil && time.Since(info.ModTime()) < s.cfg.TTL { - c, err := openCube(cubePath, dc) + // File is fresh, just load it + c, err := openCube(cubePath) if err != nil { return err } - s.data.Store(&dataset{cube: c, ds: dc, runUTC: run.Unix()}) + ds := &dataset{cube: c, runUTC: run.Unix()} + s.data.Store(ds) s.cache = memCache{ttl: s.cfg.CacheTTL} return nil } } - downloadCtx, cancel := context.WithTimeout(ctx, 60*time.Minute) - defer cancel() - - dl := NewPartialDownloader(s.cfg.Dir, s.cfg.Parallel, dc) - if err := dl.Run(downloadCtx, run); err != nil { - return err + // Download new data using S3 or HTTP + var downloadErr error + if s.cfg.UseS3 { + s3dl, err := NewS3Downloader(s.cfg.Dir, s.cfg.Parallel, s.cfg.S3Bucket, s.cfg.S3Region) + if err != nil { + return errcodes.Wrap(err, "failed to create S3 downloader") + } + downloadErr = s3dl.Run(ctx, run) + } else { + dl := Downloader{ + Dir: s.cfg.Dir, + Parallel: s.cfg.Parallel, + Client: http.DefaultClient, + DatasetURL: s.cfg.DatasetURL, + } + downloadErr = dl.Run(ctx, run) } + if downloadErr != nil { + return downloadErr + } + + // Assemble cube if it doesn't exist if _, err := os.Stat(cubePath); err != nil { - if err := assembleCube(s.cfg.Dir, run, cubePath, dc); err != nil { + if err := assembleCube(s.cfg.Dir, run, cubePath); err != nil { return err } } - c, err := openCube(cubePath, dc) + c, err := openCube(cubePath) if err != nil { return err } - s.data.Store(&dataset{cube: c, ds: dc, runUTC: run.Unix()}) + ds := &dataset{cube: c, runUTC: run.Unix()} + s.data.Store(ds) s.cache = memCache{ttl: s.cfg.CacheTTL} return nil } -func assembleCube(dir string, run time.Time, cubePath string, dc *DatasetConfig) error { - sizePerVar := dc.SizePerVar() - total := dc.CubeSize() - gridBytes := int64(dc.GridSize()) * 4 - +func assembleCube(dir string, run time.Time, cubePath string) error { + const sizePerVar = 97 * 47 * 361 * 720 * 4 // 97 time steps (0-96 hours), 47 pressure levels + total := int64(sizePerVar * 3) // 3 variables: gh, u, v f, err := os.Create(cubePath) if err != nil { return err @@ -171,23 +203,27 @@ func assembleCube(dir string, run time.Time, cubePath string, dc *DatasetConfig) defer f.Close() pIndex := make(map[int]int) - for i, p := range dc.Levels { + for i, p := range pressureLevels { pIndex[int(math.Round(p))] = i } - processFile := func(fn string, ti int) error { + for ti, step := range steps { + fn := filepath.Join(dir, fileName(run, step)) file, err := os.Open(fn) if err != nil { return err } messages, err := griblib.ReadMessages(file) - file.Close() + file.Close() // Close immediately after reading if err != nil { return err } for _, m := range messages { + // Check if this is a wind component (u or v) or geopotential height + // ParameterCategory 2 = momentum, ParameterNumber 2 = u-wind, 3 = v-wind + // ParameterCategory 3 = mass, ParameterNumber 5 = geopotential height if m.Section4.ProductDefinitionTemplateNumber != 0 { continue } @@ -195,6 +231,7 @@ func assembleCube(dir string, run time.Time, cubePath string, dc *DatasetConfig) product := m.Section4.ProductDefinitionTemplate var varIdx int + // Match tawhiri variable order: ['gh', 'u', 'v'] (indices 0, 1, 2) if product.ParameterCategory == 2 { switch product.ParameterNumber { case 2: // u-wind @@ -205,15 +242,18 @@ func assembleCube(dir string, run time.Time, cubePath string, dc *DatasetConfig) continue } } else if product.ParameterCategory == 3 && product.ParameterNumber == 5 { - varIdx = 0 // geopotential height + // geopotential height + varIdx = 0 } else { continue } + // Check if this is a pressure level (type 100) if product.FirstSurface.Type != 100 { continue } + // Get pressure level in hPa pressure := float64(product.FirstSurface.Value) / 100.0 pIdx, ok := pIndex[int(math.Round(pressure))] if !ok { @@ -221,27 +261,14 @@ func assembleCube(dir string, run time.Time, cubePath string, dc *DatasetConfig) } vals := m.Data() + // GRIB library returns scan north->south, west->east already in row-major order raw := make([]byte, len(vals)*4) for i, v := range vals { binary.LittleEndian.PutUint32(raw[i*4:], math.Float32bits(float32(v))) } - base := int64(varIdx)*sizePerVar + (int64(ti)*int64(dc.NP)+int64(pIdx))*gridBytes + base := int64(varIdx*sizePerVar + (ti*47+pIdx)*361*720*4) copy(mm[base:base+int64(len(raw))], raw) } - return nil - } - - steps := dc.Steps() - for ti, step := range steps { - fn := filepath.Join(dir, dc.FileName(run, step)) - if err := processFile(fn, ti); err != nil { - return err - } - - fnB := filepath.Join(dir, dc.FileNameB(run, step)) - if err := processFile(fnB, ti); err != nil { - return err - } } return mm.Flush() } @@ -252,21 +279,24 @@ func (s *service) Extract(ctx context.Context, lat, lon, alt float64, ts time.Ti if d == nil { return zero, errcodes.ErrNoDataset } - maxDur := time.Duration(s.cfg.Dataset.MaxHour) * time.Hour - if ts.Before(time.Unix(d.runUTC, 0)) || ts.After(time.Unix(d.runUTC, 0).Add(maxDur)) { + if ts.Before(time.Unix(d.runUTC, 0)) || ts.After(time.Unix(d.runUTC, 0).Add(96*time.Hour)) { return zero, errcodes.ErrOutOfBounds } + // Try memory cache first key := encodeKey(lat, lon, alt, ts) if v, ok := s.cache.get(key); ok { return [2]float64(v), nil } + // Calculate result td := ts.Sub(time.Unix(d.runUTC, 0)).Hours() u, v := d.uv(lat, lon, alt, td) out := [2]float64{u, v} + // Cache in memory s.cache.set(key, vec(out)) + return out, nil } diff --git a/internal/pkg/grib/partial_downloader.go b/internal/pkg/grib/partial_downloader.go deleted file mode 100644 index 3752504..0000000 --- a/internal/pkg/grib/partial_downloader.go +++ /dev/null @@ -1,350 +0,0 @@ -package grib - -import ( - "bufio" - "context" - "fmt" - "io" - "net/http" - "os" - "path/filepath" - "strconv" - "strings" - "time" - - "git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes" - "git.intra.yksa.space/gsn/predictor/internal/pkg/log" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" -) - -// PartialDownloader загружает только необходимые поля из GRIB файлов -// используя HTTP Range requests и .idx индексные файлы -type PartialDownloader struct { - Dir string - Parallel int - Client *http.Client - Variables []string - ds *DatasetConfig -} - -// NewPartialDownloader создаёт новый partial downloader -func NewPartialDownloader(dir string, parallel int, dc *DatasetConfig) *PartialDownloader { - return &PartialDownloader{ - Dir: dir, - Parallel: parallel, - Client: &http.Client{ - Timeout: 60 * time.Second, - }, - Variables: dc.Variables, - ds: dc, - } -} - -// idxEntry представляет запись из .idx файла -type idxEntry struct { - Index int - ByteStart int64 - Date string - Variable string - Level string - Forecast string -} - -type ProgressWriter struct { - Total int64 - Downloaded int64 - OnProgress func(percent float64) -} - -func (pw *ProgressWriter) Write(p []byte) (int, error) { - n := len(p) - pw.Downloaded += int64(n) - if pw.Total > 0 && pw.OnProgress != nil { - percent := float64(pw.Downloaded) / float64(pw.Total) * 100 - pw.OnProgress(percent) - } - return n, nil -} - -// parseIdx парсит .idx файл и возвращает записи -func (d *PartialDownloader) parseIdx(body []byte) []idxEntry { - var entries []idxEntry - lines := strings.Split(string(body), "\n") - - for _, line := range lines { - if line == "" { - continue - } - parts := strings.Split(line, ":") - if len(parts) < 7 { - continue - } - - byteStart, _ := strconv.ParseInt(parts[1], 10, 64) - entries = append(entries, idxEntry{ - Index: len(entries), - ByteStart: byteStart, - Date: parts[2], - Variable: parts[3], - Level: parts[4], - Forecast: parts[5], - }) - } - return entries -} - -// filterEntries фильтрует записи по нужным переменным и уровням давления -func (d *PartialDownloader) filterEntries(entries []idxEntry) []idxEntry { - var filtered []idxEntry - - for _, e := range entries { - isNeededVar := false - for _, v := range d.Variables { - if v == e.Variable { - isNeededVar = true - break - } - } - - isPressureLevel := strings.HasSuffix(e.Level, " mb") - - if isNeededVar && isPressureLevel { - filtered = append(filtered, e) - } - } - - return filtered -} - -// Вспомогательная функция для выполнения запроса с повторами -func (d *PartialDownloader) doWithRetry(ctx context.Context, req *http.Request) (*http.Response, error) { - var resp *http.Response - var err error - - backoff := 1 * time.Second - maxRetries := 3 - - for i := 0; i < maxRetries; i++ { - resp, err = d.Client.Do(req) - if err == nil && resp.StatusCode < 500 { - return resp, nil - } - - if resp != nil { - resp.Body.Close() - } - - log.Ctx(ctx).Warn("retry download", zap.Int("attempt", i+1), zap.Error(err)) - - select { - case <-time.After(backoff): - backoff *= 2 - case <-ctx.Done(): - return nil, ctx.Err() - } - } - return nil, err -} - -// downloadRange загружает диапазон байтов из URL -func (d *PartialDownloader) downloadRange(ctx context.Context, url string, start, end int64, out io.Writer) error { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - if err != nil { - return err - } - - rangeHeader := fmt.Sprintf("bytes=%d-", start) - if end > 0 { - rangeHeader = fmt.Sprintf("bytes=%d-%d", start, end) - } - req.Header.Set("Range", rangeHeader) - - resp, err := d.Client.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusPartialContent && resp.StatusCode != http.StatusOK { - return errcodes.Wrap(errcodes.ErrDownload, "bad status: "+resp.Status) - } - - _, err = io.Copy(out, resp.Body) - return err -} - -func (d *PartialDownloader) downloadFieldsFromURL(ctx context.Context, url string, dst string, step int) (err error) { - idxURL := url + ".idx" - tmp := dst + ".part" - - if info, err := os.Stat(dst); err == nil && info.Size() > 0 { - return nil - } - - reqIdx, _ := http.NewRequestWithContext(ctx, http.MethodGet, idxURL, nil) - respIdx, err := d.doWithRetry(ctx, reqIdx) - if err != nil { - return errcodes.Wrap(err, "failed to get idx") - } - defer respIdx.Body.Close() - - idxBody, _ := io.ReadAll(respIdx.Body) - entries := d.parseIdx(idxBody) - filtered := d.filterEntries(entries) - if len(filtered) == 0 { - return nil - } - - var totalBytes int64 - type chunk struct{ start, end int64 } - chunks := make([]chunk, 0, len(filtered)) - - for _, entry := range filtered { - var endByte int64 = -1 - for j, e := range entries { - if e.ByteStart == entry.ByteStart && j+1 < len(entries) { - endByte = entries[j+1].ByteStart - 1 - break - } - } - chunks = append(chunks, chunk{entry.ByteStart, endByte}) - if endByte > 0 { - totalBytes += (endByte - entry.ByteStart + 1) - } - } - - f, err := os.OpenFile(tmp, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) - if err != nil { - return err - } - - var downloaded int64 - - err = func() error { - defer f.Close() - bufWriter := bufio.NewWriterSize(f, 1024*1024) - - for i, c := range chunks { - countingWriter := &proxyWriter{ - Writer: bufWriter, - OnWrite: func(n int) { - downloaded += int64(n) - if totalBytes > 0 && i%20 == 0 { - pct := float64(downloaded) / float64(totalBytes) * 100 - log.Ctx(ctx).Debug("download progress", - zap.Int("step", step), - zap.String("pct", fmt.Sprintf("%.1f%%", pct))) - } - }, - } - - if err := d.downloadRange(ctx, url, c.start, c.end, countingWriter); err != nil { - return err - } - } - return bufWriter.Flush() - }() - - if err != nil { - f.Close() - os.Remove(tmp) - return err - } - - return d.safeRename(tmp, dst) -} - -type proxyWriter struct { - io.Writer - OnWrite func(int) -} - -func (p *proxyWriter) Write(data []byte) (int, error) { - n, err := p.Writer.Write(data) - if n > 0 && p.OnWrite != nil { - p.OnWrite(n) - } - return n, err -} - -func (d *PartialDownloader) safeRename(src, dst string) error { - var lastErr error - for i := 0; i < 5; i++ { - if err := os.Rename(src, dst); err == nil { - return nil - } else { - lastErr = err - } - time.Sleep(150 * time.Millisecond) - } - return fmt.Errorf("rename failed: %w", lastErr) -} - -// Run запускает загрузку всех необходимых файлов (pgrb2 + pgrb2b) -func (d *PartialDownloader) Run(ctx context.Context, run time.Time) error { - log.Ctx(ctx).Info("starting partial download", - zap.Time("run", run), - zap.Strings("variables", d.Variables)) - - g, ctx := errgroup.WithContext(ctx) - sem := make(chan struct{}, d.Parallel) - steps := d.ds.Steps() - - for _, step := range steps { - step := step - - // Download primary pgrb2 - sem <- struct{}{} - g.Go(func() error { - defer func() { <-sem }() - url := d.ds.GribURL(run, step) - dst := filepath.Join(d.Dir, d.ds.FileName(run, step)) - return d.downloadFieldsFromURL(ctx, url, dst, step) - }) - - // Download secondary pgrb2b - sem <- struct{}{} - g.Go(func() error { - defer func() { <-sem }() - url := d.ds.GribURLB(run, step) - dst := filepath.Join(d.Dir, d.ds.FileNameB(run, step)) - return d.downloadFieldsFromURL(ctx, url, dst, step) - }) - } - - return g.Wait() -} - -// GetLatestModelRun находит последний доступный прогноз GFS -func GetLatestModelRun(ctx context.Context, dc *DatasetConfig) (time.Time, error) { - now := time.Now().UTC() - hour := now.Hour() - (now.Hour() % 6) - current := time.Date(now.Year(), now.Month(), now.Day(), hour, 0, 0, 0, time.UTC) - - client := &http.Client{Timeout: 10 * time.Second} - - for i := 0; i < 8; i++ { - url := dc.GribURL(current, dc.MaxHour) - - req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil) - if err != nil { - current = current.Add(-6 * time.Hour) - continue - } - - resp, err := client.Do(req) - if err == nil && resp.StatusCode == http.StatusOK { - resp.Body.Close() - log.Ctx(ctx).Info("found latest model run", zap.Time("run", current)) - return current, nil - } - if resp != nil { - resp.Body.Close() - } - - current = current.Add(-6 * time.Hour) - } - - return time.Time{}, errcodes.Wrap(errcodes.ErrDownload, "no recent GFS forecast found") -} diff --git a/internal/pkg/grib/pressure.go b/internal/pkg/grib/pressure.go index 1b6dec0..add6ff0 100644 --- a/internal/pkg/grib/pressure.go +++ b/internal/pkg/grib/pressure.go @@ -2,6 +2,15 @@ package grib import "math" +// 47 pressure levels matching tawhiri configuration +var pressureLevels = []float64{ + 1000, 975, 950, 925, 900, 875, 850, 825, 800, 775, + 750, 725, 700, 675, 650, 625, 600, 575, 550, 525, + 500, 475, 450, 425, 400, 375, 350, 325, 300, 275, + 250, 225, 200, 175, 150, 125, 100, 70, 50, 30, + 20, 10, 7, 5, 3, 2, 1, +} + func pressureFromAlt(alt float64) float64 { // ICAO ISA return 1013.25 * math.Pow(1-alt/44307.69396, 5.255877) } diff --git a/internal/pkg/grib/s3_downloader.go b/internal/pkg/grib/s3_downloader.go new file mode 100644 index 0000000..0fa4c70 --- /dev/null +++ b/internal/pkg/grib/s3_downloader.go @@ -0,0 +1,265 @@ +package grib + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + "time" + + "git.intra.yksa.space/gsn/predictor/internal/pkg/errcodes" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "golang.org/x/sync/errgroup" +) + +// S3Downloader downloads GRIB files from AWS S3 +type S3Downloader struct { + Dir string + Parallel int + Bucket string + Region string + Client *s3.Client +} + +// NewS3Downloader creates a new S3 downloader with anonymous access +func NewS3Downloader(dir string, parallel int, bucket, region string) (*S3Downloader, error) { + // Create AWS config with anonymous credentials for public bucket + cfg, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion(region), + config.WithCredentialsProvider(aws.AnonymousCredentials{}), + ) + if err != nil { + return nil, errcodes.Wrap(err, "failed to load AWS config") + } + + client := s3.NewFromConfig(cfg) + + return &S3Downloader{ + Dir: dir, + Parallel: parallel, + Bucket: bucket, + Region: region, + Client: client, + }, nil +} + +// s3Key generates the S3 key for a GRIB file +// Path format: gfs.YYYYMMDD/HH/atmos/gfs.tHHz.pgrb2.0p50.fFFF +func (d *S3Downloader) s3Key(run string, hour int, step int) string { + return fmt.Sprintf("gfs.%s/%02d/atmos/gfs.t%02dz.pgrb2.0p50.f%03d", run, hour, hour, step) +} + +// CheckFileExists checks if a file exists in S3 using HeadObject +func (d *S3Downloader) CheckFileExists(ctx context.Context, key string) (bool, int64, error) { + input := &s3.HeadObjectInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(key), + } + + result, err := d.Client.HeadObject(ctx, input) + if err != nil { + // Check if error is NotFound + // AWS SDK v2 doesn't export specific error types, check error string + if isNotFoundError(err) { + return false, 0, nil + } + return false, 0, errcodes.Wrap(err, "failed to check file existence") + } + + size := int64(0) + if result.ContentLength != nil { + size = *result.ContentLength + } + + return true, size, nil +} + +// isNotFoundError checks if error is a NotFound error +func isNotFoundError(err error) bool { + if err == nil { + return false + } + // AWS SDK v2 error handling + errStr := err.Error() + return contains(errStr, "NotFound") || contains(errStr, "404") || contains(errStr, "NoSuchKey") +} + +func contains(s, substr string) bool { + return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && findSubstring(s, substr)) +} + +func findSubstring(s, substr string) bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false +} + +// ListAvailableFiles lists all available files for a given run +func (d *S3Downloader) ListAvailableFiles(ctx context.Context, run string, hour int) ([]string, error) { + prefix := fmt.Sprintf("gfs.%s/%02d/atmos/", run, hour) + + input := &s3.ListObjectsV2Input{ + Bucket: aws.String(d.Bucket), + Prefix: aws.String(prefix), + } + + var files []string + paginator := s3.NewListObjectsV2Paginator(d.Client, input) + + for paginator.HasMorePages() { + page, err := paginator.NextPage(ctx) + if err != nil { + return nil, errcodes.Wrap(err, "failed to list S3 objects") + } + + for _, obj := range page.Contents { + if obj.Key != nil { + files = append(files, *obj.Key) + } + } + } + + return files, nil +} + +// fetchFromS3 downloads a file from S3 to local disk with retry logic +func (d *S3Downloader) fetchFromS3(ctx context.Context, key, dst string) (err error) { + // Check if final file already exists + if _, err := os.Stat(dst); err == nil { + return nil + } + + const maxRetries = 3 + var lastErr error + + for attempt := 0; attempt < maxRetries; attempt++ { + if attempt > 0 { + // Exponential backoff: 2s, 4s, 8s + waitTime := time.Duration(1< 0 && written != size { + return errcodes.Wrap(errcodes.ErrDownload, fmt.Sprintf("size mismatch: got %d bytes, expected %d", written, size)) + } + + // Close file before rename + if err := f.Close(); err != nil { + return err + } + fileClosed = true + + // If rename fails, err will be set and defer will cleanup .part file + return os.Rename(tmp, dst) +} + +// Run downloads all required GRIB files for a forecast run +func (d *S3Downloader) Run(ctx context.Context, run time.Time) error { + runStr := run.Format("20060102") + hour := run.Hour() + + // First, list available files to verify they exist + availableFiles, err := d.ListAvailableFiles(ctx, runStr, hour) + if err != nil { + return errcodes.Wrap(err, "failed to list available files") + } + + if len(availableFiles) == 0 { + return errcodes.Wrap(errcodes.ErrDownload, fmt.Sprintf("no files found for run %s/%02d", runStr, hour)) + } + + // Build a map of available files for quick lookup + availableMap := make(map[string]bool) + for _, file := range availableFiles { + availableMap[file] = true + } + + g, ctx := errgroup.WithContext(ctx) + sem := make(chan struct{}, d.Parallel) + + for _, step := range steps { + step := step + key := d.s3Key(runStr, hour, step) + + // Check if file is available in S3 + if !availableMap[key] { + // Log warning but don't fail - some forecast hours might not be available yet + continue + } + + sem <- struct{}{} + g.Go(func() error { + defer func() { <-sem }() + dst := filepath.Join(d.Dir, fileName(run, step)) + return d.fetchFromS3(ctx, key, dst) + }) + } + + return g.Wait() +} diff --git a/internal/pkg/grib/util.go b/internal/pkg/grib/util.go index 1145a61..8de4af7 100644 --- a/internal/pkg/grib/util.go +++ b/internal/pkg/grib/util.go @@ -6,11 +6,25 @@ import ( "time" ) +// Generate steps from 0 to 96 with step 1 hour (97 steps total) +// GFS provides hourly data for 0-120 hours, we use first 96 hours +var steps = func() []int { + result := make([]int, 0, 97) + for i := 0; i <= 96; i++ { + result = append(result, i) + } + return result +}() + func nearestRun(t time.Time) time.Time { h := t.UTC().Hour() - t.UTC().Hour()%6 return time.Date(t.Year(), t.Month(), t.Day(), h, 0, 0, 0, time.UTC) } +func fileName(run time.Time, step int) string { + return fmt.Sprintf("gfs.t%02dz.pgrb2.0p50.f%03d", run.Hour(), step) +} + func encodeKey(a ...any) uint64 { h := fnv.New64a() for _, v := range a { diff --git a/internal/service/predictor.go b/internal/service/predictor.go index 9138c5e..e5c6fad 100644 --- a/internal/service/predictor.go +++ b/internal/service/predictor.go @@ -23,22 +23,6 @@ type Stage struct { EndTime time.Time } -// shouldSimulateStage checks if a given stage should be simulated based on the SimulateStages filter -func shouldSimulateStage(params ds.PredictionParameters, stage string) bool { - // If no filter is specified, simulate all stages - if len(params.SimulateStages) == 0 { - return true - } - - // Check if the stage is in the filter list - for _, s := range params.SimulateStages { - if s == stage { - return true - } - } - return false -} - // CustomCurve represents a custom ascent/descent curve type CustomCurve struct { Altitude []float64 `json:"altitude"` @@ -90,7 +74,7 @@ func (s *Service) PerformPrediction(ctx context.Context, params ds.PredictionPar } } - log.Ctx(ctx).Warn("🚀 PREDICTION STARTING", + log.Ctx(ctx).Info("Starting prediction", zap.String("profile", profile), zap.Float64("lat", *params.LaunchLatitude), zap.Float64("lon", *params.LaunchLongitude), @@ -119,27 +103,16 @@ func (s *Service) PerformPrediction(ctx context.Context, params ds.PredictionPar func (s *Service) standardProfile(ctx context.Context, params ds.PredictionParameters, ascentRate, burstAltitude, descentRate float64, ascentCurve, descentCurve *CustomCurve) []ds.PredicitonResult { var results []ds.PredicitonResult - var lastResult ds.PredicitonResult // Stage 1: Ascent - if shouldSimulateStage(params, "ascent") { - ascentResults := s.simulateAscent(ctx, params, ascentRate, burstAltitude, ascentCurve) - results = append(results, ascentResults...) - if len(ascentResults) > 0 { - lastResult = ascentResults[len(ascentResults)-1] - } - } else { - // If ascent is skipped, use initial position as starting point - lastResult = ds.PredicitonResult{ - Latitude: params.LaunchLatitude, - Longitude: params.LaunchLongitude, - Altitude: &burstAltitude, - Timestamp: params.LaunchDatetime, - } - } + ascentResults := s.simulateAscent(ctx, params, ascentRate, burstAltitude, ascentCurve) + results = append(results, ascentResults...) - // Stage 2: Descent - if shouldSimulateStage(params, "descent") && lastResult.Latitude != nil { + if len(ascentResults) > 0 { + // Get final position from ascent + lastResult := ascentResults[len(ascentResults)-1] + + // Stage 2: Descent descentParams := ds.PredictionParameters{ LaunchLatitude: lastResult.Latitude, LaunchLongitude: lastResult.Longitude, @@ -156,45 +129,30 @@ func (s *Service) standardProfile(ctx context.Context, params ds.PredictionParam func (s *Service) floatProfile(ctx context.Context, params ds.PredictionParameters, ascentRate, burstAltitude, floatAltitude, descentRate float64, ascentCurve, descentCurve *CustomCurve) []ds.PredicitonResult { var results []ds.PredicitonResult - var lastResult ds.PredicitonResult // Stage 1: Ascent to float altitude - if shouldSimulateStage(params, "ascent") { - ascentResults := s.simulateAscent(ctx, params, ascentRate, floatAltitude, ascentCurve) - results = append(results, ascentResults...) - if len(ascentResults) > 0 { - lastResult = ascentResults[len(ascentResults)-1] - } - } else { - // If ascent is skipped, use initial position at float altitude as starting point - lastResult = ds.PredicitonResult{ - Latitude: params.LaunchLatitude, - Longitude: params.LaunchLongitude, - Altitude: &floatAltitude, - Timestamp: params.LaunchDatetime, - } - } + ascentResults := s.simulateAscent(ctx, params, ascentRate, floatAltitude, ascentCurve) + results = append(results, ascentResults...) - // Stage 2: Float (simulate for some time) - if shouldSimulateStage(params, "float") && lastResult.Latitude != nil { + if len(ascentResults) > 0 { + // Stage 2: Float (simulate for some time) + lastResult := ascentResults[len(ascentResults)-1] floatResults := s.simulateFloat(ctx, lastResult, 30*time.Minute) // Float for 30 minutes results = append(results, floatResults...) + if len(floatResults) > 0 { - lastResult = floatResults[len(floatResults)-1] - } - } + // Stage 3: Descent + finalFloat := floatResults[len(floatResults)-1] + descentParams := ds.PredictionParameters{ + LaunchLatitude: finalFloat.Latitude, + LaunchLongitude: finalFloat.Longitude, + LaunchAltitude: finalFloat.Altitude, + LaunchDatetime: finalFloat.Timestamp, + } - // Stage 3: Descent - if shouldSimulateStage(params, "descent") && lastResult.Latitude != nil { - descentParams := ds.PredictionParameters{ - LaunchLatitude: lastResult.Latitude, - LaunchLongitude: lastResult.Longitude, - LaunchAltitude: lastResult.Altitude, - LaunchDatetime: lastResult.Timestamp, + descentResults := s.simulateDescent(ctx, descentParams, descentRate, 0, descentCurve) + results = append(results, descentResults...) } - - descentResults := s.simulateDescent(ctx, descentParams, descentRate, 0, descentCurve) - results = append(results, descentResults...) } return results @@ -202,32 +160,14 @@ func (s *Service) floatProfile(ctx context.Context, params ds.PredictionParamete func (s *Service) reverseProfile(ctx context.Context, params ds.PredictionParameters, ascentRate, burstAltitude, descentRate float64, ascentCurve, descentCurve *CustomCurve) []ds.PredicitonResult { var results []ds.PredicitonResult - var lastResult ds.PredicitonResult // Stage 1: Ascent - if shouldSimulateStage(params, "ascent") { - ascentResults := s.simulateAscent(ctx, params, ascentRate, burstAltitude, ascentCurve) - results = append(results, ascentResults...) - if len(ascentResults) > 0 { - lastResult = ascentResults[len(ascentResults)-1] - } - } else { - // If ascent is skipped, use initial position at burst altitude as starting point - lastResult = ds.PredicitonResult{ - Latitude: params.LaunchLatitude, - Longitude: params.LaunchLongitude, - Altitude: &burstAltitude, - Timestamp: params.LaunchDatetime, - } - } + ascentResults := s.simulateAscent(ctx, params, ascentRate, burstAltitude, ascentCurve) + results = append(results, ascentResults...) - // Stage 2: Descent to float altitude - floatAlt := 0.0 - if params.FloatAltitude != nil { - floatAlt = *params.FloatAltitude - } - - if shouldSimulateStage(params, "descent") && lastResult.Latitude != nil { + if len(ascentResults) > 0 { + // Stage 2: Descent to float altitude + lastResult := ascentResults[len(ascentResults)-1] descentParams := ds.PredictionParameters{ LaunchLatitude: lastResult.Latitude, LaunchLongitude: lastResult.Longitude, @@ -235,25 +175,21 @@ func (s *Service) reverseProfile(ctx context.Context, params ds.PredictionParame LaunchDatetime: lastResult.Timestamp, } + // Descent to float altitude (if specified) + floatAlt := 0.0 + if params.FloatAltitude != nil { + floatAlt = *params.FloatAltitude + } + descentResults := s.simulateDescent(ctx, descentParams, descentRate, floatAlt, descentCurve) results = append(results, descentResults...) - if len(descentResults) > 0 { - lastResult = descentResults[len(descentResults)-1] - } - } else if floatAlt > 0 { - // If descent is skipped but we need to float, position at float altitude - lastResult = ds.PredicitonResult{ - Latitude: lastResult.Latitude, - Longitude: lastResult.Longitude, - Altitude: &floatAlt, - Timestamp: lastResult.Timestamp, - } - } - // Stage 3: Float - if shouldSimulateStage(params, "float") && floatAlt > 0 && lastResult.Latitude != nil { - floatResults := s.simulateFloat(ctx, lastResult, 30*time.Minute) - results = append(results, floatResults...) + if floatAlt > 0 && len(descentResults) > 0 { + // Stage 3: Float + finalDescent := descentResults[len(descentResults)-1] + floatResults := s.simulateFloat(ctx, finalDescent, 30*time.Minute) + results = append(results, floatResults...) + } } return results @@ -261,27 +197,14 @@ func (s *Service) reverseProfile(ctx context.Context, params ds.PredictionParame func (s *Service) customProfile(ctx context.Context, params ds.PredictionParameters, ascentCurve, descentCurve *CustomCurve) []ds.PredicitonResult { var results []ds.PredicitonResult - var lastResult ds.PredicitonResult - // Custom ascent - if shouldSimulateStage(params, "ascent") && ascentCurve != nil { + if ascentCurve != nil { ascentResults := s.simulateCustomAscent(ctx, params, ascentCurve) results = append(results, ascentResults...) - if len(ascentResults) > 0 { - lastResult = ascentResults[len(ascentResults)-1] - } - } else if len(results) == 0 { - // If ascent is skipped, use initial position - lastResult = ds.PredicitonResult{ - Latitude: params.LaunchLatitude, - Longitude: params.LaunchLongitude, - Altitude: params.LaunchAltitude, - Timestamp: params.LaunchDatetime, - } } - // Custom descent - if shouldSimulateStage(params, "descent") && descentCurve != nil && lastResult.Latitude != nil { + if descentCurve != nil && len(results) > 0 { + lastResult := results[len(results)-1] descentParams := ds.PredictionParameters{ LaunchLatitude: lastResult.Latitude, LaunchLongitude: lastResult.Longitude, @@ -325,10 +248,6 @@ func (s *Service) simulateAscent(ctx context.Context, params ds.PredictionParame const dt = 10.0 // simulation step in seconds const outputInterval = 60.0 // output every 60 seconds - log.Ctx(ctx).Warn("⬆️ ASCENT SIMULATION STARTING", - zap.Float64("ascentRate", ascentRate), - zap.Float64("targetAlt", targetAltitude)) - lat := *params.LaunchLatitude lon := *params.LaunchLongitude alt := *params.LaunchAltitude @@ -353,29 +272,12 @@ func (s *Service) simulateAscent(ctx context.Context, params ds.PredictionParame }) nextOutputTime := timeCur.Add(time.Duration(outputInterval) * time.Second) - firstExtraction := true windFunc := func(lat, lon, alt float64, t time.Time) (float64, float64) { w, err := s.ExtractWind(ctx, lat, lon, alt, t) if err != nil { - log.Ctx(ctx).Error("Wind extraction FAILED during ascent", - zap.Error(err), - zap.Float64("lat", lat), - zap.Float64("lon", lon), - zap.Float64("alt", alt), - zap.Time("time", t)) + log.Ctx(ctx).Warn("Wind extraction failed during ascent", zap.Error(err)) return 0, 0 } - // Log only first extraction and when wind is zero - if firstExtraction || (w[0] == 0 && w[1] == 0) { - log.Ctx(ctx).Warn("Wind data check", - zap.Bool("first", firstExtraction), - zap.Float64("lat", lat), - zap.Float64("lon", lon), - zap.Float64("alt", alt), - zap.Float64("u", w[0]), - zap.Float64("v", w[1])) - firstExtraction = false - } return w[0], w[1] } @@ -391,23 +293,6 @@ func (s *Service) simulateAscent(ctx context.Context, params ds.PredictionParame alt = altNew if alt >= targetAltitude { - alt = targetAltitude - // Record burst point - wU, wV := windFunc(lat, lon, alt, timeCur) - latCopy := lat - lonCopy := lon - altCopy := alt - timeCopy := timeCur - windU := wU - windV := wV - results = append(results, ds.PredicitonResult{ - Latitude: &latCopy, - Longitude: &lonCopy, - Altitude: &altCopy, - Timestamp: &timeCopy, - WindU: &windU, - WindV: &windV, - }) break } @@ -465,19 +350,14 @@ func (s *Service) simulateDescent(ctx context.Context, params ds.PredictionParam windFunc := func(lat, lon, alt float64, t time.Time) (float64, float64) { w, err := s.ExtractWind(ctx, lat, lon, alt, t) if err != nil { - log.Ctx(ctx).Error("Wind extraction FAILED during descent", - zap.Error(err), - zap.Float64("lat", lat), - zap.Float64("lon", lon), - zap.Float64("alt", alt), - zap.Time("time", t)) + log.Ctx(ctx).Warn("Wind extraction failed during descent", zap.Error(err)) return 0, 0 } return w[0], w[1] } for alt > targetAltitude { - altRate := -descentRateAtAlt(descentRate, alt) + altRate := -descentRate if customCurve != nil { altRate = -s.getCustomAltitudeRate(customCurve, alt, descentRate) } @@ -488,23 +368,6 @@ func (s *Service) simulateDescent(ctx context.Context, params ds.PredictionParam alt = altNew if alt <= targetAltitude { - alt = targetAltitude - // Record landing point - wU, wV := windFunc(lat, lon, alt, timeCur) - latCopy := lat - lonCopy := lon - altCopy := alt - timeCopy := timeCur - windU := wU - windV := wV - results = append(results, ds.PredicitonResult{ - Latitude: &latCopy, - Longitude: &lonCopy, - Altitude: &altCopy, - Timestamp: &timeCopy, - WindU: &windU, - WindV: &windV, - }) break } @@ -599,37 +462,6 @@ func (s *Service) simulateFloat(ctx context.Context, startResult ds.PredicitonRe return results } -// airDensity returns ISA air density in kg/m³ at given altitude in meters -func airDensity(h float64) float64 { - var T, p float64 - switch { - case h < 11000: - T = 288.15 - 0.0065*h - p = 101325 * math.Pow(T/288.15, 5.2561) - case h < 20000: - T = 216.65 - p = 22632.1 * math.Exp(-0.00015769*(h-11000)) - case h < 32000: - T = 216.65 + 0.001*(h-20000) - p = 5474.89 * math.Pow(T/216.65, -34.1632) - default: - T = 228.65 + 0.0028*(h-32000) - p = 868.019 * math.Pow(T/228.65, -12.2009) - } - return p / (287.05 * T) -} - -// descentRateAtAlt returns descent rate adjusted for air density at altitude. -// descent_rate parameter is the sea-level rate. At altitude, thinner air means faster descent. -func descentRateAtAlt(seaLevelRate, alt float64) float64 { - rho0 := airDensity(0) - rhoH := airDensity(alt) - if rhoH <= 0 { - return seaLevelRate - } - return seaLevelRate * math.Sqrt(rho0/rhoH) -} - func (s *Service) simulateCustomAscent(ctx context.Context, params ds.PredictionParameters, curve *CustomCurve) []ds.PredicitonResult { // Implementation for custom ascent curve // This would interpolate the altitude rate from the custom curve diff --git a/internal/service/predictor_test.go b/internal/service/predictor_test.go deleted file mode 100644 index 66ab76e..0000000 --- a/internal/service/predictor_test.go +++ /dev/null @@ -1,492 +0,0 @@ -package service - -import ( - "context" - "testing" - "time" - - "git.intra.yksa.space/gsn/predictor/internal/pkg/ds" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" -) - -// MockGrib is a mock implementation of the Grib interface -type MockGrib struct { - mock.Mock -} - -func (m *MockGrib) Update(ctx context.Context) error { - args := m.Called(ctx) - return args.Error(0) -} - -func (m *MockGrib) Extract(ctx context.Context, lat, lon, alt float64, t time.Time) ([2]float64, error) { - args := m.Called(ctx, lat, lon, alt, t) - return args.Get(0).([2]float64), args.Error(1) -} - -func (m *MockGrib) Close() error { - args := m.Called() - return args.Error(0) -} - -// Helper function to create a test service with mocked GRIB -func createTestService() (*Service, *MockGrib) { - mockGrib := new(MockGrib) - - // Default mock behavior: return constant wind (5 m/s east, 3 m/s north) - mockGrib.On("Extract", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return([2]float64{5.0, 3.0}, nil) - - service := &Service{ - grib: mockGrib, - } - - return service, mockGrib -} - -// Helper function to create basic prediction parameters -func createBasicParams() ds.PredictionParameters { - lat := 40.0 - lon := -105.0 - alt := 1000.0 - launchTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) - profile := "standard_profile" - ascentRate := 5.0 - burstAltitude := 10000.0 - descentRate := 5.0 - - return ds.PredictionParameters{ - LaunchLatitude: &lat, - LaunchLongitude: &lon, - LaunchAltitude: &alt, - LaunchDatetime: &launchTime, - Profile: &profile, - AscentRate: &ascentRate, - BurstAltitude: &burstAltitude, - DescentRate: &descentRate, - } -} - -func TestRestrictedPrediction_OnlyAscent(t *testing.T) { - service, _ := createTestService() - params := createBasicParams() - - // Restrict to ascent only - params.SimulateStages = []string{"ascent"} - - results, err := service.PerformPrediction(context.Background(), params) - - assert.NoError(t, err) - assert.NotEmpty(t, results) - - // Verify all results are during ascent phase (altitude increasing) - for i := 1; i < len(results); i++ { - assert.GreaterOrEqual(t, *results[i].Altitude, *results[i-1].Altitude, - "Altitude should be increasing or equal during ascent") - } - - // Last altitude should be near burst altitude - lastAlt := *results[len(results)-1].Altitude - burstAlt := *params.BurstAltitude - assert.InDelta(t, burstAlt, lastAlt, 500.0, "Last altitude should be near burst altitude") -} - -func TestRestrictedPrediction_OnlyDescent(t *testing.T) { - service, _ := createTestService() - params := createBasicParams() - - // Restrict to descent only - params.SimulateStages = []string{"descent"} - - results, err := service.PerformPrediction(context.Background(), params) - - assert.NoError(t, err) - assert.NotEmpty(t, results) - - // First result should be at burst altitude (since ascent was skipped) - firstAlt := *results[0].Altitude - burstAlt := *params.BurstAltitude - assert.Equal(t, burstAlt, firstAlt, "Should start at burst altitude when ascent is skipped") - - // Verify all results are during descent phase (altitude decreasing) - for i := 1; i < len(results); i++ { - assert.LessOrEqual(t, *results[i].Altitude, *results[i-1].Altitude, - "Altitude should be decreasing or equal during descent") - } - - // Last altitude should be near ground - lastAlt := *results[len(results)-1].Altitude - assert.Less(t, lastAlt, 1000.0, "Last altitude should be near ground") -} - -func TestRestrictedPrediction_AscentAndDescent(t *testing.T) { - service, _ := createTestService() - params := createBasicParams() - - // Include both ascent and descent - params.SimulateStages = []string{"ascent", "descent"} - - results, err := service.PerformPrediction(context.Background(), params) - - assert.NoError(t, err) - assert.NotEmpty(t, results) - - // Find the peak altitude (transition point) - maxAlt := 0.0 - maxIdx := 0 - for i, result := range results { - if *result.Altitude > maxAlt { - maxAlt = *result.Altitude - maxIdx = i - } - } - - // Verify ascent phase - for i := 1; i <= maxIdx; i++ { - assert.GreaterOrEqual(t, *results[i].Altitude, *results[i-1].Altitude, - "Altitude should increase during ascent phase") - } - - // Verify descent phase - for i := maxIdx + 1; i < len(results); i++ { - assert.LessOrEqual(t, *results[i].Altitude, *results[i-1].Altitude, - "Altitude should decrease during descent phase") - } -} - -func TestRestrictedPrediction_FloatProfile_OnlyFloat(t *testing.T) { - service, _ := createTestService() - params := createBasicParams() - - profile := "float_profile" - floatAlt := 15000.0 - params.Profile = &profile - params.FloatAltitude = &floatAlt - - // Restrict to float only - params.SimulateStages = []string{"float"} - - results, err := service.PerformPrediction(context.Background(), params) - - assert.NoError(t, err) - assert.NotEmpty(t, results) - - // All results should be at the float altitude - for _, result := range results { - assert.Equal(t, floatAlt, *result.Altitude, - "Altitude should remain constant at float altitude") - } - - // Verify horizontal movement (lat/lon changes due to wind) - firstLat := *results[0].Latitude - lastLat := *results[len(results)-1].Latitude - assert.NotEqual(t, firstLat, lastLat, "Latitude should change during float due to wind") -} - -func TestRestrictedPrediction_FloatProfile_AllStages(t *testing.T) { - service, _ := createTestService() - params := createBasicParams() - - profile := "float_profile" - floatAlt := 15000.0 - params.Profile = &profile - params.FloatAltitude = &floatAlt - - // Include all stages - params.SimulateStages = []string{"ascent", "float", "descent"} - - results, err := service.PerformPrediction(context.Background(), params) - - assert.NoError(t, err) - assert.NotEmpty(t, results) - - // Verify we have ascending, constant, and descending altitude patterns - hasAscent := false - hasFloat := false - hasDescent := false - - const altTolerance = 50.0 // Tolerance for altitude comparison - - for i := 1; i < len(results); i++ { - altDiff := *results[i].Altitude - *results[i-1].Altitude - - if altDiff > altTolerance { - hasAscent = true - } else if altDiff < -altTolerance { - hasDescent = true - } else if *results[i].Altitude > 10000 { // Float happens at high altitude - hasFloat = true - } - } - - assert.True(t, hasAscent, "Should have ascent phase") - assert.True(t, hasFloat, "Should have float phase") - assert.True(t, hasDescent, "Should have descent phase") - - // Verify maximum altitude is near float altitude - maxAlt := 0.0 - for _, result := range results { - if *result.Altitude > maxAlt { - maxAlt = *result.Altitude - } - } - assert.InDelta(t, floatAlt, maxAlt, 1000.0, "Max altitude should be near float altitude") -} - -func TestRestrictedPrediction_ReverseProfile_OnlyFloat(t *testing.T) { - service, _ := createTestService() - params := createBasicParams() - - profile := "reverse_profile" - floatAlt := 5000.0 - params.Profile = &profile - params.FloatAltitude = &floatAlt - - // Restrict to float only - params.SimulateStages = []string{"float"} - - results, err := service.PerformPrediction(context.Background(), params) - - assert.NoError(t, err) - assert.NotEmpty(t, results) - - // All results should be at the float altitude - for _, result := range results { - assert.InDelta(t, floatAlt, *result.Altitude, 10.0, - "Altitude should remain near float altitude") - } -} - -func TestRestrictedPrediction_EmptyStages_SimulatesAll(t *testing.T) { - service, _ := createTestService() - params := createBasicParams() - - // Empty SimulateStages should simulate all stages - params.SimulateStages = []string{} - - results, err := service.PerformPrediction(context.Background(), params) - - assert.NoError(t, err) - assert.NotEmpty(t, results) - - // Should have both ascent and descent - // Find the peak - maxAlt := 0.0 - hasAscent := false - hasDescent := false - - for i := 1; i < len(results); i++ { - if *results[i].Altitude > *results[i-1].Altitude { - hasAscent = true - } - if *results[i].Altitude < *results[i-1].Altitude { - hasDescent = true - } - if *results[i].Altitude > maxAlt { - maxAlt = *results[i].Altitude - } - } - - assert.True(t, hasAscent, "Should have ascent phase") - assert.True(t, hasDescent, "Should have descent phase") -} - -func TestRestrictedPrediction_NilStages_SimulatesAll(t *testing.T) { - service, _ := createTestService() - params := createBasicParams() - - // Nil SimulateStages should simulate all stages - params.SimulateStages = nil - - results, err := service.PerformPrediction(context.Background(), params) - - assert.NoError(t, err) - assert.NotEmpty(t, results) - - // Should have both ascent and descent - maxAlt := 0.0 - minAltAfterMax := 1000000.0 - - for _, result := range results { - if *result.Altitude > maxAlt { - maxAlt = *result.Altitude - } - } - - foundMax := false - for _, result := range results { - if *result.Altitude == maxAlt { - foundMax = true - } - if foundMax && *result.Altitude < minAltAfterMax { - minAltAfterMax = *result.Altitude - } - } - - // Should reach high altitude and come back down - assert.Greater(t, maxAlt, 5000.0, "Should reach high altitude") - assert.Less(t, minAltAfterMax, maxAlt, "Should descend after reaching max altitude") -} - -func TestRestrictedPrediction_InvalidStage_IgnoresInvalid(t *testing.T) { - service, _ := createTestService() - params := createBasicParams() - - // Include invalid stage name (should be ignored) - params.SimulateStages = []string{"ascent", "invalid_stage", "descent"} - - results, err := service.PerformPrediction(context.Background(), params) - - assert.NoError(t, err) - assert.NotEmpty(t, results) - // Should still simulate ascent and descent, ignoring the invalid stage -} - -func TestRestrictedPrediction_WindImpact(t *testing.T) { - service, mockGrib := createTestService() - - // Override mock to return strong eastward wind - mockGrib.ExpectedCalls = nil - mockGrib.On("Extract", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return([2]float64{20.0, 0.0}, nil) // Strong eastward wind - - params := createBasicParams() - params.SimulateStages = []string{"ascent"} - - results, err := service.PerformPrediction(context.Background(), params) - - assert.NoError(t, err) - assert.NotEmpty(t, results) - - // Longitude should increase significantly due to eastward wind - firstLon := *results[0].Longitude - lastLon := *results[len(results)-1].Longitude - assert.Greater(t, lastLon, firstLon, "Longitude should increase with eastward wind") - - // Verify wind values are captured in results - for _, result := range results { - if result.WindU != nil { - // Wind values should be present in results - assert.NotNil(t, result.WindV, "WindV should be present if WindU is present") - } - } -} - -func TestRestrictedPrediction_MissingRequiredParams(t *testing.T) { - service, _ := createTestService() - - testCases := []struct { - name string - params ds.PredictionParameters - }{ - { - name: "Missing latitude", - params: ds.PredictionParameters{ - LaunchLongitude: floatPtr(-105.0), - LaunchAltitude: floatPtr(1000.0), - LaunchDatetime: timePtr(time.Now()), - }, - }, - { - name: "Missing longitude", - params: ds.PredictionParameters{ - LaunchLatitude: floatPtr(40.0), - LaunchAltitude: floatPtr(1000.0), - LaunchDatetime: timePtr(time.Now()), - }, - }, - { - name: "Missing altitude", - params: ds.PredictionParameters{ - LaunchLatitude: floatPtr(40.0), - LaunchLongitude: floatPtr(-105.0), - LaunchDatetime: timePtr(time.Now()), - }, - }, - { - name: "Missing datetime", - params: ds.PredictionParameters{ - LaunchLatitude: floatPtr(40.0), - LaunchLongitude: floatPtr(-105.0), - LaunchAltitude: floatPtr(1000.0), - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - tc.params.SimulateStages = []string{"ascent"} - results, err := service.PerformPrediction(context.Background(), tc.params) - - assert.Error(t, err) - assert.Equal(t, ErrInvalidParameters, err) - assert.Nil(t, results) - }) - } -} - -func TestShouldSimulateStage(t *testing.T) { - testCases := []struct { - name string - stages []string - queryStage string - shouldSimulate bool - }{ - { - name: "Empty filter simulates all", - stages: []string{}, - queryStage: "ascent", - shouldSimulate: true, - }, - { - name: "Nil filter simulates all", - stages: nil, - queryStage: "descent", - shouldSimulate: true, - }, - { - name: "Stage in filter", - stages: []string{"ascent", "descent"}, - queryStage: "ascent", - shouldSimulate: true, - }, - { - name: "Stage not in filter", - stages: []string{"ascent"}, - queryStage: "descent", - shouldSimulate: false, - }, - { - name: "Float stage in filter", - stages: []string{"float"}, - queryStage: "float", - shouldSimulate: true, - }, - { - name: "Multiple stages excluding one", - stages: []string{"ascent", "float"}, - queryStage: "descent", - shouldSimulate: false, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - params := ds.PredictionParameters{ - SimulateStages: tc.stages, - } - result := shouldSimulateStage(params, tc.queryStage) - assert.Equal(t, tc.shouldSimulate, result) - }) - } -} - -// Helper functions -func floatPtr(f float64) *float64 { - return &f -} - -func timePtr(t time.Time) *time.Time { - return &t -} diff --git a/pkg/rest/oas_cfg_gen.go b/pkg/rest/oas_cfg_gen.go index 6b8f72d..1845c4f 100644 --- a/pkg/rest/oas_cfg_gen.go +++ b/pkg/rest/oas_cfg_gen.go @@ -5,14 +5,14 @@ package gsn import ( "net/http" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" + ht "github.com/ogen-go/ogen/http" "github.com/ogen-go/ogen/middleware" "github.com/ogen-go/ogen/ogenerrors" "github.com/ogen-go/ogen/otelogen" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/trace" ) var ( @@ -32,7 +32,6 @@ type otelConfig struct { Tracer trace.Tracer MeterProvider metric.MeterProvider Meter metric.Meter - Attributes []attribute.KeyValue } func (cfg *otelConfig) initOTEL() { @@ -216,13 +215,6 @@ func WithMeterProvider(provider metric.MeterProvider) Option { }) } -// WithAttributes specifies default otel attributes. -func WithAttributes(attributes ...attribute.KeyValue) Option { - return otelOptionFunc(func(cfg *otelConfig) { - cfg.Attributes = attributes - }) -} - // WithClient specifies http client to use. func WithClient(client ht.Client) ClientOption { return optionFunc[clientConfig](func(cfg *clientConfig) { diff --git a/pkg/rest/oas_client_gen.go b/pkg/rest/oas_client_gen.go index 0484f94..19822e8 100644 --- a/pkg/rest/oas_client_gen.go +++ b/pkg/rest/oas_client_gen.go @@ -9,15 +9,16 @@ import ( "time" "github.com/go-faster/errors" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" + "go.opentelemetry.io/otel/trace" + "github.com/ogen-go/ogen/conv" ht "github.com/ogen-go/ogen/http" "github.com/ogen-go/ogen/otelogen" "github.com/ogen-go/ogen/uri" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/metric" - semconv "go.opentelemetry.io/otel/semconv/v1.37.0" - "go.opentelemetry.io/otel/trace" ) func trimTrailingSlashes(u *url.URL) { @@ -102,9 +103,8 @@ func (c *Client) sendPerformPrediction(ctx context.Context, params PerformPredic otelAttrs := []attribute.KeyValue{ otelogen.OperationID("performPrediction"), semconv.HTTPRequestMethodKey.String("GET"), - semconv.URLTemplateKey.String("/api/v1/prediction"), + semconv.HTTPRouteKey.String("/api/v1/prediction"), } - otelAttrs = append(otelAttrs, c.cfg.Attributes...) // Run stopwatch. startTime := time.Now() @@ -345,32 +345,6 @@ func (c *Client) sendPerformPrediction(ctx context.Context, params PerformPredic return res, errors.Wrap(err, "encode query") } } - { - // Encode "simulate_stages" parameter. - cfg := uri.QueryParameterEncodingConfig{ - Name: "simulate_stages", - Style: uri.QueryStyleForm, - Explode: true, - } - - if err := q.EncodeParam(cfg, func(e uri.Encoder) error { - if params.SimulateStages != nil { - return e.EncodeArray(func(e uri.Encoder) error { - for i, item := range params.SimulateStages { - if err := func() error { - return e.EncodeValue(conv.StringToString(string(item))) - }(); err != nil { - return errors.Wrapf(err, "[%d]", i) - } - } - return nil - }) - } - return nil - }); err != nil { - return res, errors.Wrap(err, "encode query") - } - } { // Encode "interpolate" parameter. cfg := uri.QueryParameterEncodingConfig{ @@ -460,9 +434,8 @@ func (c *Client) sendReadinessCheck(ctx context.Context) (res *ReadinessResponse otelAttrs := []attribute.KeyValue{ otelogen.OperationID("readinessCheck"), semconv.HTTPRequestMethodKey.String("GET"), - semconv.URLTemplateKey.String("/ready"), + semconv.HTTPRouteKey.String("/ready"), } - otelAttrs = append(otelAttrs, c.cfg.Attributes...) // Run stopwatch. startTime := time.Now() diff --git a/pkg/rest/oas_handlers_gen.go b/pkg/rest/oas_handlers_gen.go index b7e53e4..76ab8ad 100644 --- a/pkg/rest/oas_handlers_gen.go +++ b/pkg/rest/oas_handlers_gen.go @@ -8,15 +8,16 @@ import ( "time" "github.com/go-faster/errors" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" + "go.opentelemetry.io/otel/trace" + ht "github.com/ogen-go/ogen/http" "github.com/ogen-go/ogen/middleware" "github.com/ogen-go/ogen/ogenerrors" "github.com/ogen-go/ogen/otelogen" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/metric" - semconv "go.opentelemetry.io/otel/semconv/v1.37.0" - "go.opentelemetry.io/otel/trace" ) type codeRecorder struct { @@ -29,10 +30,6 @@ func (c *codeRecorder) WriteHeader(status int) { c.ResponseWriter.WriteHeader(status) } -func (c *codeRecorder) Unwrap() http.ResponseWriter { - return c.ResponseWriter -} - // handlePerformPredictionRequest handles performPrediction operation. // // Perform prediction. @@ -89,7 +86,7 @@ func (s *Server) handlePerformPredictionRequest(args [0]string, argsEscaped bool // unless there was another error (e.g., network error receiving the response body; or 3xx codes with // max redirects exceeded), in which case status MUST be set to Error. code := statusWriter.status - if code < 100 || code >= 500 { + if code >= 100 && code < 500 { span.SetStatus(codes.Error, stage) } @@ -118,8 +115,6 @@ func (s *Server) handlePerformPredictionRequest(args [0]string, argsEscaped bool return } - var rawBody []byte - var response *PredictionResult if m := s.cfg.Middleware; m != nil { mreq := middleware.Request{ @@ -128,7 +123,6 @@ func (s *Server) handlePerformPredictionRequest(args [0]string, argsEscaped bool OperationSummary: "Perform prediction", OperationID: "performPrediction", Body: nil, - RawBody: rawBody, Params: middleware.Parameters{ { Name: "launch_latitude", @@ -178,10 +172,6 @@ func (s *Server) handlePerformPredictionRequest(args [0]string, argsEscaped bool Name: "descent_curve", In: "query", }: params.DescentCurve, - { - Name: "simulate_stages", - In: "query", - }: params.SimulateStages, { Name: "interpolate", In: "query", @@ -301,7 +291,7 @@ func (s *Server) handleReadinessCheckRequest(args [0]string, argsEscaped bool, w // unless there was another error (e.g., network error receiving the response body; or 3xx codes with // max redirects exceeded), in which case status MUST be set to Error. code := statusWriter.status - if code < 100 || code >= 500 { + if code >= 100 && code < 500 { span.SetStatus(codes.Error, stage) } @@ -316,8 +306,6 @@ func (s *Server) handleReadinessCheckRequest(args [0]string, argsEscaped bool, w err error ) - var rawBody []byte - var response *ReadinessResponse if m := s.cfg.Middleware; m != nil { mreq := middleware.Request{ @@ -326,7 +314,6 @@ func (s *Server) handleReadinessCheckRequest(args [0]string, argsEscaped bool, w OperationSummary: "Readiness check", OperationID: "readinessCheck", Body: nil, - RawBody: rawBody, Params: middleware.Parameters{}, Raw: r, } diff --git a/pkg/rest/oas_json_gen.go b/pkg/rest/oas_json_gen.go index 9707b6f..ea3d61c 100644 --- a/pkg/rest/oas_json_gen.go +++ b/pkg/rest/oas_json_gen.go @@ -9,6 +9,7 @@ import ( "github.com/go-faster/errors" "github.com/go-faster/jx" + "github.com/ogen-go/ogen/json" "github.com/ogen-go/ogen/validate" ) @@ -606,8 +607,6 @@ func (s *PredictionResultPredictionItemStage) Decode(d *jx.Decoder) error { *s = PredictionResultPredictionItemStageAscent case PredictionResultPredictionItemStageDescent: *s = PredictionResultPredictionItemStageDescent - case PredictionResultPredictionItemStageFloat: - *s = PredictionResultPredictionItemStageFloat default: *s = PredictionResultPredictionItemStage(v) } diff --git a/pkg/rest/oas_parameters_gen.go b/pkg/rest/oas_parameters_gen.go index da12715..23cc5d8 100644 --- a/pkg/rest/oas_parameters_gen.go +++ b/pkg/rest/oas_parameters_gen.go @@ -3,11 +3,11 @@ package gsn import ( - "fmt" "net/http" "time" "github.com/go-faster/errors" + "github.com/ogen-go/ogen/conv" "github.com/ogen-go/ogen/middleware" "github.com/ogen-go/ogen/ogenerrors" @@ -17,22 +17,21 @@ import ( // PerformPredictionParams is parameters of performPrediction operation. type PerformPredictionParams struct { - LaunchLatitude OptFloat64 `json:",omitempty,omitzero"` - LaunchLongitude OptFloat64 `json:",omitempty,omitzero"` - LaunchDatetime OptDateTime `json:",omitempty,omitzero"` - LaunchAltitude OptFloat64 `json:",omitempty,omitzero"` - Profile OptPerformPredictionProfile `json:",omitempty,omitzero"` - AscentRate OptFloat64 `json:",omitempty,omitzero"` - BurstAltitude OptFloat64 `json:",omitempty,omitzero"` - DescentRate OptFloat64 `json:",omitempty,omitzero"` - FloatAltitude OptFloat64 `json:",omitempty,omitzero"` - StopDatetime OptDateTime `json:",omitempty,omitzero"` - AscentCurve OptString `json:",omitempty,omitzero"` - DescentCurve OptString `json:",omitempty,omitzero"` - SimulateStages []PerformPredictionSimulateStagesItem `json:",omitempty"` - Interpolate OptBool `json:",omitempty,omitzero"` - Format OptPerformPredictionFormat `json:",omitempty,omitzero"` - Dataset OptDateTime `json:",omitempty,omitzero"` + LaunchLatitude OptFloat64 + LaunchLongitude OptFloat64 + LaunchDatetime OptDateTime + LaunchAltitude OptFloat64 + Profile OptPerformPredictionProfile + AscentRate OptFloat64 + BurstAltitude OptFloat64 + DescentRate OptFloat64 + FloatAltitude OptFloat64 + StopDatetime OptDateTime + AscentCurve OptString + DescentCurve OptString + Interpolate OptBool + Format OptPerformPredictionFormat + Dataset OptDateTime } func unpackPerformPredictionParams(packed middleware.Parameters) (params PerformPredictionParams) { @@ -144,15 +143,6 @@ func unpackPerformPredictionParams(packed middleware.Parameters) (params Perform params.DescentCurve = v.(OptString) } } - { - key := middleware.ParameterKey{ - Name: "simulate_stages", - In: "query", - } - if v, ok := packed[key]; ok { - params.SimulateStages = v.([]PerformPredictionSimulateStagesItem) - } - } { key := middleware.ParameterKey{ Name: "interpolate", @@ -797,71 +787,6 @@ func decodePerformPredictionParams(args [0]string, argsEscaped bool, r *http.Req Err: err, } } - // Decode query: simulate_stages. - if err := func() error { - cfg := uri.QueryParameterDecodingConfig{ - Name: "simulate_stages", - Style: uri.QueryStyleForm, - Explode: true, - } - - if err := q.HasParam(cfg); err == nil { - if err := q.DecodeParam(cfg, func(d uri.Decoder) error { - return d.DecodeArray(func(d uri.Decoder) error { - var paramsDotSimulateStagesVal PerformPredictionSimulateStagesItem - if err := func() error { - val, err := d.DecodeValue() - if err != nil { - return err - } - - c, err := conv.ToString(val) - if err != nil { - return err - } - - paramsDotSimulateStagesVal = PerformPredictionSimulateStagesItem(c) - return nil - }(); err != nil { - return err - } - params.SimulateStages = append(params.SimulateStages, paramsDotSimulateStagesVal) - return nil - }) - }); err != nil { - return err - } - if err := func() error { - var failures []validate.FieldError - for i, elem := range params.SimulateStages { - if err := func() error { - if err := elem.Validate(); err != nil { - return err - } - return nil - }(); err != nil { - failures = append(failures, validate.FieldError{ - Name: fmt.Sprintf("[%d]", i), - Error: err, - }) - } - } - if len(failures) > 0 { - return &validate.Error{Fields: failures} - } - return nil - }(); err != nil { - return err - } - } - return nil - }(); err != nil { - return params, &ogenerrors.DecodeParamError{ - Name: "simulate_stages", - In: "query", - Err: err, - } - } // Decode query: interpolate. if err := func() error { cfg := uri.QueryParameterDecodingConfig{ diff --git a/pkg/rest/oas_response_decoders_gen.go b/pkg/rest/oas_response_decoders_gen.go index 352068c..3c148fd 100644 --- a/pkg/rest/oas_response_decoders_gen.go +++ b/pkg/rest/oas_response_decoders_gen.go @@ -9,6 +9,7 @@ import ( "github.com/go-faster/errors" "github.com/go-faster/jx" + "github.com/ogen-go/ogen/ogenerrors" "github.com/ogen-go/ogen/validate" ) diff --git a/pkg/rest/oas_response_encoders_gen.go b/pkg/rest/oas_response_encoders_gen.go index a95167d..8f24cd5 100644 --- a/pkg/rest/oas_response_encoders_gen.go +++ b/pkg/rest/oas_response_encoders_gen.go @@ -7,9 +7,10 @@ import ( "github.com/go-faster/errors" "github.com/go-faster/jx" - ht "github.com/ogen-go/ogen/http" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" + + ht "github.com/ogen-go/ogen/http" ) func encodePerformPredictionResponse(response *PredictionResult, w http.ResponseWriter, span trace.Span) error { diff --git a/pkg/rest/oas_router_gen.go b/pkg/rest/oas_router_gen.go index 5f7617a..1eea998 100644 --- a/pkg/rest/oas_router_gen.go +++ b/pkg/rest/oas_router_gen.go @@ -109,13 +109,12 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Route is route object. type Route struct { - name string - summary string - operationID string - operationGroup string - pathPattern string - count int - args [0]string + name string + summary string + operationID string + pathPattern string + count int + args [0]string } // Name returns ogen operation name. @@ -135,11 +134,6 @@ func (r Route) OperationID() string { return r.operationID } -// OperationGroup returns the x-ogen-operation-group value. -func (r Route) OperationGroup() string { - return r.operationGroup -} - // PathPattern returns OpenAPI path. func (r Route) PathPattern() string { return r.pathPattern @@ -215,7 +209,6 @@ func (s *Server) FindPath(method string, u *url.URL) (r Route, _ bool) { r.name = PerformPredictionOperation r.summary = "Perform prediction" r.operationID = "performPrediction" - r.operationGroup = "" r.pathPattern = "/api/v1/prediction" r.args = args r.count = 0 @@ -240,7 +233,6 @@ func (s *Server) FindPath(method string, u *url.URL) (r Route, _ bool) { r.name = ReadinessCheckOperation r.summary = "Readiness check" r.operationID = "readinessCheck" - r.operationGroup = "" r.pathPattern = "/ready" r.args = args r.count = 0 diff --git a/pkg/rest/oas_schemas_gen.go b/pkg/rest/oas_schemas_gen.go index b484e26..26808cc 100644 --- a/pkg/rest/oas_schemas_gen.go +++ b/pkg/rest/oas_schemas_gen.go @@ -430,54 +430,6 @@ func (s *PerformPredictionProfile) UnmarshalText(data []byte) error { } } -type PerformPredictionSimulateStagesItem string - -const ( - PerformPredictionSimulateStagesItemAscent PerformPredictionSimulateStagesItem = "ascent" - PerformPredictionSimulateStagesItemDescent PerformPredictionSimulateStagesItem = "descent" - PerformPredictionSimulateStagesItemFloat PerformPredictionSimulateStagesItem = "float" -) - -// AllValues returns all PerformPredictionSimulateStagesItem values. -func (PerformPredictionSimulateStagesItem) AllValues() []PerformPredictionSimulateStagesItem { - return []PerformPredictionSimulateStagesItem{ - PerformPredictionSimulateStagesItemAscent, - PerformPredictionSimulateStagesItemDescent, - PerformPredictionSimulateStagesItemFloat, - } -} - -// MarshalText implements encoding.TextMarshaler. -func (s PerformPredictionSimulateStagesItem) MarshalText() ([]byte, error) { - switch s { - case PerformPredictionSimulateStagesItemAscent: - return []byte(s), nil - case PerformPredictionSimulateStagesItemDescent: - return []byte(s), nil - case PerformPredictionSimulateStagesItemFloat: - return []byte(s), nil - default: - return nil, errors.Errorf("invalid value: %q", s) - } -} - -// UnmarshalText implements encoding.TextUnmarshaler. -func (s *PerformPredictionSimulateStagesItem) UnmarshalText(data []byte) error { - switch PerformPredictionSimulateStagesItem(data) { - case PerformPredictionSimulateStagesItemAscent: - *s = PerformPredictionSimulateStagesItemAscent - return nil - case PerformPredictionSimulateStagesItemDescent: - *s = PerformPredictionSimulateStagesItemDescent - return nil - case PerformPredictionSimulateStagesItemFloat: - *s = PerformPredictionSimulateStagesItemFloat - return nil - default: - return errors.Errorf("invalid value: %q", data) - } -} - // Ref: #/components/schemas/PredictionResult type PredictionResult struct { Metadata PredictionResultMetadata `json:"metadata"` @@ -559,7 +511,6 @@ type PredictionResultPredictionItemStage string const ( PredictionResultPredictionItemStageAscent PredictionResultPredictionItemStage = "ascent" PredictionResultPredictionItemStageDescent PredictionResultPredictionItemStage = "descent" - PredictionResultPredictionItemStageFloat PredictionResultPredictionItemStage = "float" ) // AllValues returns all PredictionResultPredictionItemStage values. @@ -567,7 +518,6 @@ func (PredictionResultPredictionItemStage) AllValues() []PredictionResultPredict return []PredictionResultPredictionItemStage{ PredictionResultPredictionItemStageAscent, PredictionResultPredictionItemStageDescent, - PredictionResultPredictionItemStageFloat, } } @@ -578,8 +528,6 @@ func (s PredictionResultPredictionItemStage) MarshalText() ([]byte, error) { return []byte(s), nil case PredictionResultPredictionItemStageDescent: return []byte(s), nil - case PredictionResultPredictionItemStageFloat: - return []byte(s), nil default: return nil, errors.Errorf("invalid value: %q", s) } @@ -594,9 +542,6 @@ func (s *PredictionResultPredictionItemStage) UnmarshalText(data []byte) error { case PredictionResultPredictionItemStageDescent: *s = PredictionResultPredictionItemStageDescent return nil - case PredictionResultPredictionItemStageFloat: - *s = PredictionResultPredictionItemStageFloat - return nil default: return errors.Errorf("invalid value: %q", data) } diff --git a/pkg/rest/oas_validators_gen.go b/pkg/rest/oas_validators_gen.go index 1207d8f..0cd02b6 100644 --- a/pkg/rest/oas_validators_gen.go +++ b/pkg/rest/oas_validators_gen.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/go-faster/errors" + "github.com/ogen-go/ogen/validate" ) @@ -33,19 +34,6 @@ func (s PerformPredictionProfile) Validate() error { } } -func (s PerformPredictionSimulateStagesItem) Validate() error { - switch s { - case "ascent": - return nil - case "descent": - return nil - case "float": - return nil - default: - return errors.Errorf("invalid value: %v", s) - } -} - func (s *PredictionResult) Validate() error { if s == nil { return validate.ErrNilPointer @@ -143,8 +131,6 @@ func (s PredictionResultPredictionItemStage) Validate() error { return nil case "descent": return nil - case "float": - return nil default: return errors.Errorf("invalid value: %v", s) } diff --git a/scripts/compare.go b/scripts/compare.go deleted file mode 100644 index 5e50653..0000000 --- a/scripts/compare.go +++ /dev/null @@ -1,114 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "math" - "os" -) - -type Point struct { - Datetime string `json:"datetime"` - Latitude float64 `json:"latitude"` - Longitude float64 `json:"longitude"` - Altitude float64 `json:"altitude"` -} - -type Stage struct { - Stage string `json:"stage"` - Trajectory []Point `json:"trajectory"` -} - -type Prediction struct { - Prediction []Stage `json:"prediction"` -} - -func haversine(lat1, lon1, lat2, lon2 float64) float64 { - R := 6371000.0 - phi1, phi2 := lat1*math.Pi/180, lat2*math.Pi/180 - dphi := (lat2 - lat1) * math.Pi / 180 - dlam := (lon2 - lon1) * math.Pi / 180 - a := math.Sin(dphi/2)*math.Sin(dphi/2) + math.Cos(phi1)*math.Cos(phi2)*math.Sin(dlam/2)*math.Sin(dlam/2) - return R * 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a)) -} - -func load(path string) Prediction { - data, _ := os.ReadFile(path) - var p Prediction - json.Unmarshal(data, &p) - return p -} - -func main() { - our := load("c:/tmp/our.json") - taw := load("c:/tmp/tawhiri.json") - - // Find burst and landing points - var ourBurst, ourLand, tawBurst, tawLand Point - for _, s := range our.Prediction { - t := s.Trajectory - if s.Stage == "ascent" { - ourBurst = t[len(t)-1] - } - if s.Stage == "descent" { - ourLand = t[len(t)-1] - } - } - for _, s := range taw.Prediction { - t := s.Trajectory - if s.Stage == "ascent" { - tawBurst = t[len(t)-1] - } - if s.Stage == "descent" { - tawLand = t[len(t)-1] - } - } - - fmt.Println("=== Burst Point ===") - fmt.Printf(" Our: lat=%.4f, lon=%.4f, alt=%.0f, time=%s\n", ourBurst.Latitude, ourBurst.Longitude, ourBurst.Altitude, ourBurst.Datetime) - fmt.Printf(" Tawhiri: lat=%.4f, lon=%.4f, alt=%.0f, time=%s\n", tawBurst.Latitude, tawBurst.Longitude, tawBurst.Altitude, tawBurst.Datetime) - burstDist := haversine(ourBurst.Latitude, ourBurst.Longitude, tawBurst.Latitude, tawBurst.Longitude) - fmt.Printf(" Distance: %.2f km\n", burstDist/1000) - - fmt.Println() - fmt.Println("=== Landing Point ===") - fmt.Printf(" Our: lat=%.4f, lon=%.4f, alt=%.0f, time=%s\n", ourLand.Latitude, ourLand.Longitude, ourLand.Altitude, ourLand.Datetime) - fmt.Printf(" Tawhiri: lat=%.4f, lon=%.4f, alt=%.0f, time=%s\n", tawLand.Latitude, tawLand.Longitude, tawLand.Altitude, tawLand.Datetime) - landDist := haversine(ourLand.Latitude, ourLand.Longitude, tawLand.Latitude, tawLand.Longitude) - fmt.Printf(" Distance: %.2f km\n", landDist/1000) - - fmt.Println() - fmt.Println("=== Trajectory Comparison (every 10 min) ===") - ourPts := map[string]Point{} - tawPts := map[string]Point{} - for _, s := range our.Prediction { - for _, p := range s.Trajectory { - ourPts[p.Datetime] = p - } - } - for _, s := range taw.Prediction { - for _, p := range s.Trajectory { - tawPts[p.Datetime] = p - } - } - - // Collect common times - var common []string - for _, s := range our.Prediction { - for _, p := range s.Trajectory { - if _, ok := tawPts[p.Datetime]; ok { - common = append(common, p.Datetime) - } - } - } - - for i, t := range common { - if i%10 == 0 { - o := ourPts[t] - tw := tawPts[t] - d := haversine(o.Latitude, o.Longitude, tw.Latitude, tw.Longitude) - fmt.Printf(" %s: dist=%.2f km (our: %.3f,%.3f vs taw: %.3f,%.3f)\n", - t, d/1000, o.Latitude, o.Longitude, tw.Latitude, tw.Longitude) - } - } -} diff --git a/scripts/rebuild_cube.go b/scripts/rebuild_cube.go deleted file mode 100644 index 240b707..0000000 --- a/scripts/rebuild_cube.go +++ /dev/null @@ -1,44 +0,0 @@ -package main - -import ( - "context" - "fmt" - "os" - "time" - - "git.intra.yksa.space/gsn/predictor/internal/pkg/grib" -) - -func main() { - ctx := context.Background() - - cfg := &grib.Config{ - Dir: "C:/tmp/grib", - TTL: 48 * time.Hour, - CacheTTL: 1 * time.Hour, - Parallel: 8, - } - - svc, err := grib.New(cfg) - if err != nil { - fmt.Printf("Error: %v\n", err) - return - } - - // Delete old cube to force rebuild - cubePath := "C:/tmp/grib/20260212_12.cube" - if err := os.Remove(cubePath); err != nil && !os.IsNotExist(err) { - fmt.Printf("Remove cube error: %v\n", err) - } else { - fmt.Println("Old cube removed") - } - - // Update will download missing pgrb2b files and rebuild cube - fmt.Println("Starting update (download pgrb2b + rebuild cube)...") - start := time.Now() - if err := svc.Update(ctx); err != nil { - fmt.Printf("Update error: %v\n", err) - return - } - fmt.Printf("Done in %v\n", time.Since(start)) -} diff --git a/scripts/test_download.go b/scripts/test_download.go deleted file mode 100644 index 73934a5..0000000 --- a/scripts/test_download.go +++ /dev/null @@ -1,36 +0,0 @@ -package main - -import ( - "context" - "fmt" - "time" - - "git.intra.yksa.space/gsn/predictor/internal/pkg/grib" -) - -func main() { - ctx := context.Background() - - // Найти последний доступный прогноз - run, err := grib.GetLatestModelRun(ctx) - if err != nil { - fmt.Printf("Error finding model run: %v\n", err) - return - } - fmt.Printf("Found model run: %v\n", run) - - // Создать downloader - dl := grib.NewPartialDownloader("C:/tmp/grib", 8) - - // Запустить загрузку - start := time.Now() - fmt.Println("Starting download...") - - err = dl.Run(ctx, run) - if err != nil { - fmt.Printf("Download error: %v\n", err) - return - } - - fmt.Printf("Download completed in %v\n", time.Since(start)) -} diff --git a/scripts/test_gh.go b/scripts/test_gh.go deleted file mode 100644 index bfb592c..0000000 --- a/scripts/test_gh.go +++ /dev/null @@ -1,60 +0,0 @@ -package main - -import ( - "encoding/binary" - "fmt" - "math" - "os" - - mmap "github.com/edsrzf/mmap-go" -) - -var pressureLevels = []float64{ - 1000, 975, 950, 925, 900, 875, 850, 825, 800, 775, - 750, 725, 700, 675, 650, 625, 600, 575, 550, 525, - 500, 475, 450, 425, 400, 375, 350, 325, 300, 275, - 250, 225, 200, 175, 150, 125, 100, 70, 50, 30, - 20, 10, 7, 5, 3, 2, 1, -} - -func main() { - f, _ := os.Open("C:/tmp/grib/20260212_12.cube") - mm, _ := mmap.Map(f, mmap.RDONLY, 0) - defer mm.Unmap() - defer f.Close() - - const ( - nT = 97 - nP = 47 - nLat = 721 - nLon = 1440 - ) - bytesPerVar := int64(nT * nP * nLat * nLon * 4) - - val := func(varIdx, ti, pi, y, x int) float32 { - idx := (((ti*nP + pi) * nLat) + y) * nLon + x - off := int64(varIdx)*bytesPerVar + int64(idx)*4 - bits := binary.LittleEndian.Uint32(mm[off : off+4]) - return math.Float32frombits(bits) - } - - // Check gh values at lat=52.2N (y=(90-52.2)*4=151.2 → y=151), lon=0.1E (x=0.1*4=0.4 → x=0) - // Time step 9 (9 hours into forecast) - ti := 9 - y := 151 - x := 0 - - fmt.Println("GH values at (52.25N, 0E), t=+9h:") - fmt.Printf("%8s %8s %10s\n", "Level", "hPa", "GH(m)") - for pi := 0; pi < nP; pi++ { - gh := val(0, ti, pi, y, x) - fmt.Printf("%8d %8.0f %10.1f\n", pi, pressureLevels[pi], gh) - } - - fmt.Println("\nU-wind values at same point:") - fmt.Printf("%8s %8s %10s\n", "Level", "hPa", "U(m/s)") - for pi := 0; pi < nP; pi++ { - u := val(1, ti, pi, y, x) - fmt.Printf("%8d %8.0f %10.2f\n", pi, pressureLevels[pi], u) - } -} diff --git a/scripts/test_grib_read.go b/scripts/test_grib_read.go deleted file mode 100644 index 3b8b7eb..0000000 --- a/scripts/test_grib_read.go +++ /dev/null @@ -1,38 +0,0 @@ -package main - -import ( - "fmt" - "os" - - "github.com/nilsmagnus/grib/griblib" -) - -func main() { - f, err := os.Open("C:/tmp/grib/gfs.t18z.pgrb2.0p25.f000") - if err != nil { - fmt.Printf("Error opening file: %v\n", err) - return - } - defer f.Close() - - messages, err := griblib.ReadMessages(f) - if err != nil { - fmt.Printf("Error reading GRIB: %v\n", err) - return - } - - fmt.Printf("Found %d messages\n\n", len(messages)) - - for i, m := range messages { - product := m.Section4.ProductDefinitionTemplate - if product.ParameterCategory != 2 || product.ParameterNumber != 2 { - continue // only u-wind - } - fmt.Printf("UGRD Msg %d: SurfType=%d SurfValue=%d SurfScale=%d DataLen=%d\n", - i, - product.FirstSurface.Type, - product.FirstSurface.Value, - product.FirstSurface.Scale, - len(m.Data())) - } -} diff --git a/scripts/test_prediction.go b/scripts/test_prediction.go deleted file mode 100644 index fb56c5a..0000000 --- a/scripts/test_prediction.go +++ /dev/null @@ -1,87 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "time" - - "git.intra.yksa.space/gsn/predictor/internal/pkg/grib" -) - -func main() { - ctx := context.Background() - - // Инициализируем GRIB сервис - cfg := &grib.Config{ - Dir: "C:/tmp/grib", - TTL: 48 * time.Hour, - CacheTTL: 1 * time.Hour, - Parallel: 8, - } - - svc, err := grib.New(cfg) - if err != nil { - fmt.Printf("Error creating service: %v\n", err) - return - } - - // Обновляем данные (создаёт куб) - fmt.Println("Updating GRIB data (building cube)...") - start := time.Now() - if err := svc.Update(ctx); err != nil { - fmt.Printf("Update error: %v\n", err) - return - } - fmt.Printf("Cube built in %v\n", time.Since(start)) - - // Тестируем извлечение ветра - fmt.Println("\nTesting wind extraction...") - lat, lon, alt := 52.2, 0.1, 10000.0 - ts := time.Date(2026, 2, 11, 12, 0, 0, 0, time.UTC) - - wind, err := svc.Extract(ctx, lat, lon, alt, ts) - if err != nil { - fmt.Printf("Extract error: %v\n", err) - return - } - fmt.Printf("Wind at (%.2f, %.2f, %.0fm) at %v:\n", lat, lon, alt, ts) - fmt.Printf(" U (east): %.2f m/s\n", wind[0]) - fmt.Printf(" V (north): %.2f m/s\n", wind[1]) - - // Сравниваем с Tawhiri - fmt.Println("\nComparing with Tawhiri API...") - tawhiriURL := fmt.Sprintf( - "https://api.v2.sondehub.org/tawhiri?launch_latitude=%.2f&launch_longitude=%.2f&launch_altitude=0&launch_datetime=%s&ascent_rate=5&burst_altitude=30000&descent_rate=5", - lat, lon, ts.Format(time.RFC3339), - ) - - resp, err := http.Get(tawhiriURL) - if err != nil { - fmt.Printf("Tawhiri request error: %v\n", err) - return - } - defer resp.Body.Close() - - body, _ := io.ReadAll(resp.Body) - var tawhiriResp map[string]interface{} - json.Unmarshal(body, &tawhiriResp) - - // Выводим финальную точку приземления - if prediction, ok := tawhiriResp["prediction"].([]interface{}); ok { - for _, stage := range prediction { - stageMap := stage.(map[string]interface{}) - if stageMap["stage"] == "descent" { - trajectory := stageMap["trajectory"].([]interface{}) - if len(trajectory) > 0 { - last := trajectory[len(trajectory)-1].(map[string]interface{}) - fmt.Printf("\nTawhiri landing point:\n") - fmt.Printf(" Lat: %.4f\n", last["latitude"]) - fmt.Printf(" Lon: %.4f\n", last["longitude"]) - } - } - } - } -} diff --git a/scripts/test_s3_download.go b/scripts/test_s3_download.go new file mode 100644 index 0000000..0391cd6 --- /dev/null +++ b/scripts/test_s3_download.go @@ -0,0 +1,89 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "time" + + "git.intra.yksa.space/gsn/predictor/internal/pkg/grib" +) + +func main() { + ctx := context.Background() + + // Create S3 downloader + downloader, err := grib.NewS3Downloader( + "/tmp/grib_test", + 4, // parallel downloads + "noaa-gfs-bdp-pds", + "us-east-1", + ) + if err != nil { + log.Fatalf("Failed to create S3 downloader: %v", err) + } + + // Ensure directory exists + if err := os.MkdirAll("/tmp/grib_test", 0o755); err != nil { + log.Fatalf("Failed to create directory: %v", err) + } + + // Find nearest run (6-hour intervals: 00, 06, 12, 18 UTC) + now := time.Now().UTC() + hour := now.Hour() - (now.Hour() % 6) + // Use data from 6 hours ago to ensure it's available + run := time.Date(now.Year(), now.Month(), now.Day(), hour, 0, 0, 0, time.UTC).Add(-6 * time.Hour) + + fmt.Printf("Testing S3 download for run: %s\n", run.Format("2006-01-02 15:04 MST")) + + // List available files first + runStr := run.Format("20060102") + fmt.Printf("Listing available files for %s/%02d...\n", runStr, run.Hour()) + files, err := downloader.ListAvailableFiles(ctx, runStr, run.Hour()) + if err != nil { + log.Fatalf("Failed to list files: %v", err) + } + + fmt.Printf("Found %d files in S3:\n", len(files)) + if len(files) > 0 { + // Show first 5 files + for i, file := range files { + if i >= 5 { + fmt.Printf("... and %d more files\n", len(files)-5) + break + } + fmt.Printf(" - %s\n", file) + } + } + + // Try downloading just first 3 forecast hours (f000, f001, f002) + fmt.Println("\nTesting download of first 3 forecast hours...") + testRun := run + + // Create a timeout context for the download + downloadCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + + if err := downloader.Run(downloadCtx, testRun); err != nil { + log.Fatalf("Failed to download: %v", err) + } + + fmt.Println("\nDownload completed successfully!") + + // Check downloaded files + entries, err := os.ReadDir("/tmp/grib_test") + if err != nil { + log.Fatalf("Failed to read directory: %v", err) + } + + fmt.Printf("\nDownloaded %d files:\n", len(entries)) + for i, entry := range entries { + if i >= 10 { + fmt.Printf("... and %d more files\n", len(entries)-10) + break + } + info, _ := entry.Info() + fmt.Printf(" - %s (%.2f MB)\n", entry.Name(), float64(info.Size())/1024/1024) + } +} diff --git a/scripts/test_s3_simple.go b/scripts/test_s3_simple.go new file mode 100644 index 0000000..2ae8414 --- /dev/null +++ b/scripts/test_s3_simple.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "fmt" + "io" + "log" + "os" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +func main() { + ctx := context.Background() + + // Create AWS config with anonymous credentials + cfg, err := config.LoadDefaultConfig(ctx, + config.WithRegion("us-east-1"), + config.WithCredentialsProvider(aws.AnonymousCredentials{}), + ) + if err != nil { + log.Fatalf("Failed to load config: %v", err) + } + + client := s3.NewFromConfig(cfg) + + // Try to download a single file + bucket := "noaa-gfs-bdp-pds" + key := "gfs.20251020/00/atmos/gfs.t00z.pgrb2.0p50.f000" + + fmt.Printf("Downloading: s3://%s/%s\n", bucket, key) + + input := &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + } + + result, err := client.GetObject(ctx, input) + if err != nil { + log.Fatalf("Failed to get object: %v", err) + } + defer result.Body.Close() + + // Create output file + outFile := "/tmp/test_grib.part" + f, err := os.Create(outFile) + if err != nil { + log.Fatalf("Failed to create file: %v", err) + } + defer f.Close() + + // Copy data + written, err := io.Copy(f, result.Body) + if err != nil { + log.Fatalf("Failed to copy data: %v (wrote %d bytes)", err, written) + } + + fmt.Printf("Successfully downloaded %d bytes\n", written) + + // Rename + if err := os.Rename(outFile, "/tmp/test_grib"); err != nil { + log.Fatalf("Failed to rename: %v", err) + } + + fmt.Println("Download complete!") +} diff --git a/scripts/test_wind.go b/scripts/test_wind.go deleted file mode 100644 index 61a231a..0000000 --- a/scripts/test_wind.go +++ /dev/null @@ -1,55 +0,0 @@ -package main - -import ( - "context" - "fmt" - "time" - - "git.intra.yksa.space/gsn/predictor/internal/pkg/grib" -) - -func main() { - ctx := context.Background() - - cfg := &grib.Config{ - Dir: "C:/tmp/grib", - TTL: 48 * time.Hour, - CacheTTL: 1 * time.Hour, - Parallel: 8, - } - - svc, err := grib.New(cfg) - if err != nil { - fmt.Printf("Error: %v\n", err) - return - } - - if err := svc.Update(ctx); err != nil { - fmt.Printf("Update error: %v\n", err) - return - } - - // Test wind at lat=52.2, lon=0.1 at various altitudes - // Run is 2026-02-12T12:00Z, request time 21:00Z = +9 hours - ts := time.Date(2026, 2, 12, 21, 0, 0, 0, time.UTC) - lat, lon := 52.2, 0.1 - - fmt.Println("Wind at (52.2°N, 0.1°E) at 2026-02-12T21:00Z:") - fmt.Printf("%8s %8s %8s\n", "Alt(m)", "U(m/s)", "V(m/s)") - - for _, alt := range []float64{0, 1000, 3000, 5000, 7000, 10000, 15000, 20000, 25000, 30000} { - w, err := svc.Extract(ctx, lat, lon, alt, ts) - if err != nil { - fmt.Printf("%8.0f Error: %v\n", alt, err) - continue - } - fmt.Printf("%8.0f %8.2f %8.2f\n", alt, w[0], w[1]) - } - - // Also test at a few nearby points to check spatial consistency - fmt.Println("\nWind at 10km altitude, varying longitude:") - for _, testLon := range []float64{0.0, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0, 350.0, 359.75} { - w, _ := svc.Extract(ctx, lat, testLon, 10000, ts) - fmt.Printf(" lon=%6.2f: U=%8.2f V=%8.2f\n", testLon, w[0], w[1]) - } -} diff --git a/start_with_http.sh b/start_with_http.sh deleted file mode 100644 index 4173c2b..0000000 --- a/start_with_http.sh +++ /dev/null @@ -1,11 +0,0 @@ -#!/bin/bash -# Start API with HTTP downloads instead of S3 - -export GSN_PREDICTOR_GRIB_USE_S3=false - -echo "Starting API with HTTP downloads from NOMADS..." -echo "USE_S3 = $GSN_PREDICTOR_GRIB_USE_S3" -echo "" - -cd "$(dirname "$0")" -go run ./cmd/api/main.go